In [None]:
--!jinja
# Deployment environment name. The placeholder on the next line is presumably
# substituted by Jinja templating at deploy time (enabled by the directive on
# the first line). ENV is used below as the prefix of the schema names.
ENV = '{{ env }}'
print("Running notebook in environment:", ENV)


from snowflake.snowpark import Session
from snowflake.snowpark import Session, functions as F
from snowflake.snowpark.functions import col, min, max
from snowflake.snowpark.window import Window
from copy import copy

import re

session = Session.get_active_session()

# ENV is interpolated into schema names and SQL text, so only accept a plain
# unquoted identifier. This also fails fast if the template placeholder was
# never rendered (the raw placeholder contains braces and spaces).
if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", ENV):
    raise ValueError(f"ENV must be a plain identifier, got {ENV!r}")

# ALTER SESSION SET only accepts predefined Snowflake session parameters and
# ENV is not one of them; SET defines a session variable (readable as $ENV).
session.sql(f"SET ENV = '{ENV}'").collect()

RAW_SCHEMA = f"{ENV}_RAW_SCHEMA"             # e.g., DEV_RAW_SCHEMA (where your 6 raw tables reside)

HARMONIZED_SCHEMA = f"{ENV}_HARMONIZED_SCHEMA"   # Target schema for harmonized views/tables
 

In [None]:
# Scalar SQL UDF returning the reciprocal of an exchange rate.
# A rate of 0 maps to 0 instead of raising a division-by-zero error;
# a NULL rate falls through to the ELSE branch and yields NULL.
create_udf_sql = f"""
CREATE OR REPLACE FUNCTION {HARMONIZED_SCHEMA}.USD_CONVERSION_UDF(EXCHANGE_RATE NUMBER(35,4))
RETURNS NUMBER(35,4)
LANGUAGE SQL
AS
$$
    CASE
        WHEN EXCHANGE_RATE = 0 THEN 0
        ELSE 1 / EXCHANGE_RATE
    END
$$;
"""

# Execute the SQL to create the UDF
session.sql(create_udf_sql).collect()

print("✅ USD_CONVERSION_UDF created successfully!")
 

In [None]:
# Frequency group -> raw tables (in RAW_SCHEMA) that are harmonized together.
# The group key decides how the tables are processed and how the resulting
# view is named; the table names must match the raw tables created earlier.
raw_data = dict(
    RAW_DAILY=["DEXINUS", "DEXUSEU", "DEXUSUK"],
    RAW_MONTHLY=["EXINUS", "EXUSEU", "EXUSUK"],
)
 
def fill_missing_dates(df):
    """Densify a DATE/VALUE series to one row per calendar day, forward-filling gaps.

    Args:
        df: Snowpark DataFrame with columns DATE and VALUE (uppercase, as in the
            raw tables).

    Returns:
        Snowpark DataFrame with columns DATE and VALUE containing every day from
        MIN(DATE) to MAX(DATE) inclusive, sorted by DATE. A day with no source
        row takes the most recent earlier non-null VALUE (NULL if there is none).
        If MIN(DATE) is NULL (no rows, or no non-null dates), df's DATE and VALUE
        columns are returned unchanged instead of building an invalid range query.
    """
    # One round trip for both the first date and the span length in days.
    bounds = df.agg(
        F.min("DATE").alias("MIN_DATE"),
        F.datediff("day", F.min("DATE"), F.max("DATE")).alias("DATE_DIFF"),
    ).collect()[0]
    min_date, date_diff = bounds["MIN_DATE"], bounds["DATE_DIFF"]

    if min_date is None:
        # Nothing to densify; avoid interpolating 'None' into the SQL below.
        return df.select(F.col("DATE"), F.col("VALUE"))

    # SEQ4() is not guaranteed to be gap-free, so derive the day offsets from
    # ROW_NUMBER() to get exactly 0..date_diff with no skipped days.
    full_date_range_df = session.sql(f"""
        SELECT CAST(DATEADD(DAY, ROW_NUMBER() OVER (ORDER BY SEQ4()) - 1, '{min_date}') AS DATE) AS FULL_DATE
        FROM TABLE(GENERATOR(ROWCOUNT => {date_diff + 1}))
    """)

    # Alias both sides to disambiguate their columns in the join and window.
    fdr = full_date_range_df.alias("fdr")
    d = df.alias("d")

    # Left join keeps every calendar day; d's columns are NULL on days with no data.
    joined_df = fdr.join(d, fdr["FULL_DATE"] == d["DATE"], how="left")

    # Cumulative frame up to and including the current day, ordered by calendar date.
    window_spec = Window.order_by(fdr["FULL_DATE"]).rows_between(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW)

    # Forward fill: the current VALUE if present, else the last non-null VALUE so far.
    filled_df = joined_df.with_column(
        "FILLED_VALUE",
        F.coalesce(d["VALUE"], F.last_value(d["VALUE"], True).over(window_spec)),
    )

    return filled_df.select(
        fdr["FULL_DATE"].alias("DATE"),
        F.col("FILLED_VALUE").alias("VALUE"),
    ).sort("DATE")
 
def create_harmonized_view():
    """Build one harmonized view per frequency group in raw_data.

    For each group, every raw table's DATE and VALUE columns are renamed to a
    shared date key (DDATE for RAW_DAILY, MDATE otherwise) and to a column named
    after the table. The tables are then full-outer-joined on that key. Daily
    tables are first densified with fill_missing_dates. Selected rate columns
    also get a <COLUMN>_CONVERTED column computed by USD_CONVERSION_UDF. The
    result is saved as HARMONIZED_<SUFFIX>_V (e.g. HARMONIZED_DAILY_V).

    Side effects: switches the session's current schema to HARMONIZED_SCHEMA;
    groups with no tables are skipped.
    """
    # Rate columns passed through USD_CONVERSION_UDF, per frequency group.
    converted_columns = {
        "RAW_DAILY": ["DEXUSEU", "DEXUSUK"],
        "RAW_MONTHLY": ["EXUSEU", "EXUSUK"],
    }
    udf_name = f"{HARMONIZED_SCHEMA}.USD_CONVERSION_UDF"

    # Switch to harmonized schema so the views are created there.
    session.use_schema(HARMONIZED_SCHEMA)

    for schema_name, tables in raw_data.items():
        is_daily = schema_name == "RAW_DAILY"
        date_key = "DDATE" if is_daily else "MDATE"
        base_df = None

        for table in tables:
            df = session.table(f"{RAW_SCHEMA}.{table}").select(F.col("DATE"), F.col("VALUE"))
            if is_daily:
                df = fill_missing_dates(df)
            df = df.select(F.col("DATE").alias(date_key), F.col("VALUE").alias(table))

            # Join a copy, as the original did, to avoid Snowpark self-join
            # ambiguity between frames built from similar plans.
            df_copy = copy(df)
            base_df = df_copy if base_df is None else base_df.join(df_copy, on=date_key, how="outer")

        if base_df is None:
            # No tables listed for this group; there is nothing to build a view from.
            continue

        for column in converted_columns.get(schema_name, []):
            base_df = base_df.with_column(f"{column}_CONVERTED", F.call_udf(udf_name, F.col(column)))

        # e.g. RAW_DAILY -> HARMONIZED_DAILY_V, RAW_MONTHLY -> HARMONIZED_MONTHLY_V
        view_name = f"HARMONIZED_{schema_name.split('_')[1]}_V".upper()
        base_df.create_or_replace_view(view_name)
        print(f"✅ {view_name} created successfully!")
 
def create_harmonized_stream():
    """Snapshot each harmonized view into a table and create a stream on it.

    For every group in raw_data, the suffix is taken from the group key the same
    way create_harmonized_view names its views (RAW_DAILY -> DAILY,
    RAW_MONTHLY -> MONTHLY). This recreates HARMONIZED_<SUFFIX>_TBL from
    HARMONIZED_<SUFFIX>_V, then recreates HARMONIZED_<SUFFIX>_STREAM on that
    table with SHOW_INITIAL_ROWS = TRUE, so the stream's first read includes the
    rows already in the table.

    Side effects: switches the session's current schema to HARMONIZED_SCHEMA.
    """
    session.use_schema(HARMONIZED_SCHEMA)

    for schema_name in raw_data.keys():
        suffix = schema_name.split('_')[1].upper()

        session.sql(f"""
            CREATE OR REPLACE TABLE HARMONIZED_{suffix}_TBL AS
            SELECT * FROM HARMONIZED_{suffix}_V
        """).collect()

        session.sql(f"""
            CREATE OR REPLACE STREAM HARMONIZED_{suffix}_STREAM
            ON TABLE HARMONIZED_{suffix}_TBL
            SHOW_INITIAL_ROWS = TRUE
        """).collect()

        # e.g. "Daily Stream created successfully!"
        print(f"{suffix.capitalize()} Stream created successfully!")
 
# Execute the pipeline: build the harmonized views, then snapshot them into
# tables with streams on top.
create_harmonized_view()

create_harmonized_stream()

print("🔄 Harmonization process completed.")

 

In [None]:
# Optional final step: build the combined harmonized table if that function has
# been defined. The views and streams are built by create_harmonized_view() and
# create_harmonized_stream(); re-running them here would only recreate the same
# objects. Guarding the lookup keeps Restart & Run All from failing with a
# NameError when the combined-table function does not exist.
combined_step = globals().get("create_harmonized_combined_table")
if combined_step is None:
    print("Skipping combined table: create_harmonized_combined_table is not defined.")
else:
    combined_step()

print("🔄 Harmonization process completed.")