In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# We can also use Snowpark for our analyses!
# Reuse the Snowpark session the Snowflake notebook is already running in;
# later cells pass `session` to the Python loader as their Snowpark session.
from snowflake.snowpark.context import get_active_session
session = get_active_session()


# Data Reporting and Machine Learning: Burn Rate Forecasting

---

## In-Scope Data Sources

* **Costpoint**: budgets, actuals, labor data
* **PDF / Excel files**: pricing & contract data
* **HCSS**: labor data

---

## Functional Requirements

### Data Integration

* Build pipelines from Costpoint to extract budget, actual, and committed cost data.
* Integrate labor data from Costpoint, Workday, HCSS, and Viewpoint for hours worked and labor categories.
* Standardize data into a unified model (project ID, task, labor category, time period).
* Add manual adjustments/forecast inputs from Excel, Word, and/or SharePoint if needed.

### Data Transformations & Forecasting

* Calculate the **burn rate** over time as (actual cost ÷ total budget) × 100, i.e., the percentage of budget consumed.
* Forecast future spend based on historical velocity and staffing plans.
* Create margin projections based on the burn rate vs. revenue accruals.
* Flag projects that exceed burn thresholds (e.g., ≥80% of budget consumed at only 50% project completion).

### Security & Governance

* Apply **Role-Based Access Control (RBAC)**: project-level visibility for PMs; Finance can view all.
* Mask or restrict access to individual wage data unless the user has an HR role.
* Classify datasets (e.g., sensitive labor cost fields).
* Track lineage from each source system (e.g., Costpoint) through the data lake to the report.

### Metadata & Documentation

* Tag datasets as “financial-critical”, “forecasting”, “labor-sensitive”.
* Add business glossary terms (e.g., burn rate, earned value, estimate at completion (EAC)).
* Assign a data steward to the forecasting dataset.
* Document assumptions for all forecast logic in the catalog.

### Reporting & Insights

* Build dashboards for:
    * Budget vs. Actual by month
    * Forecast burn rate vs. baseline
    * Heatmaps for over/under-spending by project
    * Variance explanations (with auto-tagged reasons)
* Export forecast summaries to Excel/CSV for program reviews.


In [None]:
-- Set the session context that the unqualified names in later cells resolve against.
USE DATABASE BURN_RATE_FORECAST;
USE SCHEMA RAW;

### Upload the files to a stage (here, `RAWFILES`) in the database above. You can use the Snowsight UI (from the database navigator) or the Snowflake command-line interface (Snowflake CLI).

In [None]:
-- Show the files currently uploaded to the RAWFILES stage.
LIST @RAWFILES;

### Structured and semi-structured files (JSON, CSV, Parquet, etc.) can be queried directly on the stage. We'll address the PDFs in a moment.

In [None]:
-- Named JSON file format used when querying the staged files below.
-- STRIP_OUTER_ARRAY = TRUE: if a file's top level is a JSON array, each element
-- of that array is returned as a separate row.
CREATE OR REPLACE FILE FORMAT my_json_format
    TYPE = 'JSON'
    STRIP_OUTER_ARRAY = TRUE;

In [None]:

    SELECT
    -- With my_json_format, $1 is already a VARIANT (one row per outer-array element),
    -- so fields are read with ':' paths directly; PARSE_JSON is unnecessary.
    -- Both cost fields are FLOAT so they can be combined (burn rate = actual / budget).
    $1:ActualCost::FLOAT       AS ACTUAL_COST,
    $1:BudgetedCost::FLOAT     AS BudgetedCost,
    $1:ContractId::VARCHAR     AS ContractId,
    $1:CostCategory::VARCHAR   AS CostCategory,
    $1:Id::VARCHAR             AS Id,
    $1:Month::VARCHAR          AS Month,
    -- NOTE(review): Variance is kept as VARCHAR; cast to FLOAT if it is always numeric.
    $1:Variance::VARCHAR       AS Variance,

    -- File metadata columns identify where each row came from.
    METADATA$FILENAME          AS source_filename,
    METADATA$FILE_ROW_NUMBER   AS row_in_file
FROM
    @BURN_RATE_FORECAST.RAW.RAWFILES/costpoint_burn_rate.json (FILE_FORMAT => 'my_json_format');

### That was unnecessarily difficult. When we load the data, Snowflake can detect the schema on ingest and flatten it automatically.

(Go to the UI, walk through loading a file, and look at the SQL it generates.)

So much simpler. The cells below show the result of using this process:

In [None]:
-- Show the columns and types Snowflake infers for hcss_burn_rate.json
-- (one output row per detected column).
SELECT *
FROM TABLE(
    INFER_SCHEMA(
        LOCATION => '@"BURN_RATE_FORECAST"."RAW"."RAWFILES"',
        FILES => ('hcss_burn_rate.json'),
        FILE_FORMAT => 'my_json_format',
        MAX_RECORDS_PER_FILE => 10000
    )
);

In [None]:
-- Preview the loaded COSTPOINT_BURN_RATE table (up to 100 rows).
SELECT *
FROM COSTPOINT_BURN_RATE
LIMIT 100;

### Naturally, we aren't going to upload files one by one through a UI, and it is easy to build a pipeline to handle this. I'll use Python as an example — it's very useful that you can alternate between SQL and Python in the same notebook. The pipeline saves its tables to the CORE schema in case the files were already loaded into RAW.

In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
import json # Used for pretty printing the inferred schema (optional)

def _json_table_name(file_name: str) -> str:
    """Map a staged JSON file name to its target table name.

    'costpoint_burn_rate.json' -> 'COSTPOINT_BURN_RATE_TST'. Only the trailing
    five-character '.json' suffix is removed, whatever its case, because the
    caller's filter is case-insensitive. The caller must have checked the suffix.
    """
    return (file_name[: -len(".json")] + "_TST").upper()


def _quote_ident(name: str) -> str:
    """Return `name` as an upper-cased, double-quoted Snowflake identifier.

    Upper-casing keeps ordinary names identical to their unquoted form. The quotes
    let JSON keys containing spaces or punctuation become valid column names.
    """
    return '"' + name.upper().replace('"', '""') + '"'


def _table_exists(session: "Session", database_name: str, schema_name: str, table_name: str) -> bool:
    """Return True if `schema_name.table_name` is listed in `database_name`'s INFORMATION_SCHEMA.TABLES."""
    check_table_sql = f"""
    SELECT COUNT(*)
    FROM {database_name}.INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = '{schema_name.upper()}'
      AND TABLE_NAME = '{table_name.upper()}'
    """
    return session.sql(check_table_sql).collect()[0][0] > 0


def _create_table_from_inferred_schema(session: "Session", stage_name: str, file_name: str,
                                       table_name: str, target_table: str) -> bool:
    """Create `target_table` with columns inferred from `@stage_name/file_name`.

    Returns True if the table was created. Returns False, after printing a warning,
    if INFER_SCHEMA produced no columns. The temporary file format used for
    inference is dropped on every path, including errors.
    """
    temp_file_format_name = f"{table_name}_TEMP_FF"
    # STRIP_OUTER_ARRAY = TRUE: if the file's top level is a JSON array, each element
    # becomes its own row. The caller's COPY INTO uses the same setting so that the
    # loaded rows match the inferred columns.
    session.sql(f"""
    CREATE OR REPLACE TEMPORARY FILE FORMAT {temp_file_format_name}
        TYPE = 'JSON'
        STRIP_OUTER_ARRAY = TRUE
    """).collect()
    print(f"Created temporary file format: {temp_file_format_name}")

    try:
        infer_schema_sql = f"""
        SELECT *
        FROM TABLE(
            INFER_SCHEMA(
                LOCATION => '@{stage_name}/{file_name}',
                FILE_FORMAT => '{temp_file_format_name}'
            )
        )
        """
        inferred_columns = session.sql(infer_schema_sql).collect()

        # Inferred types are used as-is. Adjust here if a different type is wanted,
        # e.g. INTEGER instead of an inferred NUMBER(38,0).
        columns_ddl = [f"{_quote_ident(row['COLUMN_NAME'])} {row['TYPE']}" for row in inferred_columns]
        if not columns_ddl:
            print(f"Warning: No columns inferred for {file_name}. Skipping table creation.")
            return False

        session.sql(f"""
        CREATE OR REPLACE TABLE {target_table} (
            {', '.join(columns_ddl)}
        )
        """).collect()
        print(f"Created table {table_name} with inferred schema:")
        for col_def in columns_ddl:
            print(f"  - {col_def}")
        return True
    finally:
        session.sql(f"DROP FILE FORMAT IF EXISTS {temp_file_format_name}").collect()
        print(f"Dropped temporary file format: {temp_file_format_name}")


def load_json_files_with_infer_schema(session: "Session", stage_name: str, database_name: str, schema_name: str, dest_schema_name: str):
    """
    Load every JSON file on a stage into its own flattened table.

    For each ``*.json`` file (suffix matched case-insensitively) listed on ``@stage_name``:

    1. Derive the target table name ``<FILE_BASENAME>_TST``, upper-cased.
    2. If that table does not exist in ``database_name.dest_schema_name``, infer
       its columns with INFER_SCHEMA and create it there.
    3. COPY the file into that same table, matching JSON keys to columns by name
       (case-insensitive). Rows that fail to load are skipped (ON_ERROR = 'CONTINUE').

    Errors are printed per file and do not stop processing of the remaining files.

    Args:
        session (Session): The Snowpark session object.
        stage_name (str): Stage holding the JSON files (e.g., 'RAWFILES'). It is
            resolved against ``database_name.schema_name``.
        database_name (str): Database containing the stage and the target tables.
        schema_name (str): Schema containing the stage. It is also set as the
            session's current schema.
        dest_schema_name (str): Schema in which target tables are looked up,
            created, and loaded.
    """
    print(f"Starting JSON file loading process from stage: {stage_name} with INFER_SCHEMA")

    # The stage (and temporary file formats) are resolved in this context.
    session.use_database(database_name)
    session.use_schema(schema_name)

    stage_files = session.sql(f"LS @{stage_name}").collect()

    for file_info in stage_files:
        file_name_raw = file_info['name'].split('/')[-1]
        if not file_name_raw.lower().endswith('.json'):
            print(f"Skipping non-JSON file: {file_name_raw}")
            continue

        table_name = _json_table_name(file_name_raw)
        # The existence check, CREATE TABLE and COPY INTO must all use the destination schema.
        target_table = f"{database_name}.{dest_schema_name}.{table_name}"

        print(f"\nProcessing file: {file_name_raw}")
        print(f"Target table: {table_name}")

        try:
            if _table_exists(session, database_name, dest_schema_name, table_name):
                print(f"Table {table_name} already exists. Appending data (assuming schema matches).")
            else:
                print(f"Table {table_name} does not exist. Inferring schema and creating table.")
                if not _create_table_from_inferred_schema(session, stage_name, file_name_raw,
                                                          table_name, target_table):
                    continue

            # MATCH_BY_COLUMN_NAME maps top-level JSON keys onto the inferred columns.
            # STRIP_OUTER_ARRAY must match the setting used for schema inference.
            copy_into_sql = f"""
            COPY INTO {target_table}
            FROM @{stage_name}/{file_name_raw}
            FILE_FORMAT = (TYPE = 'JSON' STRIP_OUTER_ARRAY = TRUE)
            MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
            ON_ERROR = 'CONTINUE'
            """
            copy_result = session.sql(copy_into_sql).collect()
            print(f"Successfully loaded {file_name_raw} into {table_name} with flattened data.")
            # With ON_ERROR = 'CONTINUE' some rows may have been skipped; show the COPY summary.
            print(f"COPY INTO result: {copy_result}")

        except Exception as e:
            # Best-effort per file: report and move on to the next file.
            print(f"Error processing {file_name_raw}: {e}")

# --- Example usage in a Snowflake Notebook ---
# Alias the notebook's active Snowpark session for the loader call below.
current_session = session

# --- Configuration ---
# IMPORTANT: replace these with your actual stage, database, and schema names.
my_stage = 'RAWFILES'                # stage holding the JSON files, e.g. 'RAW_JSON_STAGE'
my_database = 'BURN_RATE_FORECAST'   # e.g. 'DEMO_DB'
my_schema = 'RAW'                    # schema containing the stage, e.g. 'PUBLIC'
dest_schema = 'CORE'                 # schema in which the loader creates its tables

# --- Run the loader ---
load_json_files_with_infer_schema(
    session=current_session,
    stage_name=my_stage,
    database_name=my_database,
    schema_name=my_schema,
    dest_schema_name=dest_schema,
)

### So what about the PDF files? There are several options, but let's jump out to Document AI and see how easy it is for a business user to create a model that extracts the salient information from a document such as the contract PDF (`contract_document.pdf` on the stage).

In [None]:
-- Here is how we call the model that 'predicts' the values we defined in Document AI:
-- <model>!PREDICT(<presigned file URL>, <model version>) returns the extracted fields as JSON.
-- IF NOT EXISTS stores the prediction once, so re-running the cell does not call the model again.
CREATE TABLE IF NOT EXISTS contract_data AS
SELECT BURN_RATE_FORECAST.RAW.CONTRACTS_EXTRACTION!PREDICT(
  GET_PRESIGNED_URL(@RAWFILES, 'contract_document.pdf'), 1) JSON_DATA;


-- Each extracted field is read below as an array of {value, score} objects.
-- OUTER => TRUE keeps the document's row when a field came back empty or missing
-- (a plain FLATTEN would drop the whole row), leaving that field's columns NULL.
-- Note: a field with several values still multiplies rows (one per combination).
-- To persist the result, prefix the query with: CREATE TABLE PDF_OCR_OUTPUT AS
SELECT
    t.JSON_DATA:"__documentMetadata".ocrScore::FLOAT AS OCR_SCORE,
    f_client.value:value::STRING AS CLIENT_VALUE,
    f_client.value:score::FLOAT AS CLIENT_SCORE,
    f_contract_ceiling.value:value::STRING AS CONTRACT_CEILING_VALUE,
    f_contract_ceiling.value:score::FLOAT AS CONTRACT_CEILING_SCORE,
    f_rate.value:value::STRING AS RATE_VALUE,
    f_rate.value:score::FLOAT AS RATE_SCORE,
    f_reporting_date.value:value::STRING AS REPORTING_DATE_VALUE,
    f_reporting_date.value:score::FLOAT AS REPORTING_DATE_SCORE,
    f_term_end.value:value::STRING AS TERM_END_VALUE,
    f_term_end.value:score::FLOAT AS TERM_END_SCORE,
    f_term_start.value:value::STRING AS TERM_START_VALUE,
    f_term_start.value:score::FLOAT AS TERM_START_SCORE
FROM
    contract_data t,
    LATERAL FLATTEN(input => t.JSON_DATA:client, OUTER => TRUE) f_client,
    LATERAL FLATTEN(input => t.JSON_DATA:contract_ceiling, OUTER => TRUE) f_contract_ceiling,
    LATERAL FLATTEN(input => t.JSON_DATA:rate, OUTER => TRUE) f_rate,
    LATERAL FLATTEN(input => t.JSON_DATA:reporting_date, OUTER => TRUE) f_reporting_date,
    LATERAL FLATTEN(input => t.JSON_DATA:term_end, OUTER => TRUE) f_term_end,
    LATERAL FLATTEN(input => t.JSON_DATA:term_start, OUTER => TRUE) f_term_start;