Install Libraries:

!pip install snowflake-snowpark-python

! pip install ipynb

! pip install import-ipynb

In [343]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, count, sum, lit, round, rank, max, parse_json
from snowflake.snowpark.window import Window
from snowflake.snowpark import DataFrame
from typing import Dict

## Connect to Snowflake

In [352]:
# Parameters: fully-qualified Snowflake table names and column rename maps.

# Bronze layer (raw) source tables.
t_raw_emp = "CASE_STUDY.BRONZE.RAW_EMPLOYEE"
t_raw_order = "CASE_STUDY.BRONZE.RAW_ORDER"
t_raw_emp_ord_map = "CASE_STUDY.BRONZE.RAW_EMP_ORD_BRIDGE"

# Silver layer (intermediate) target tables.
t_int_emp = "CASE_STUDY.SILVER.INT_EMPLOYEE"
t_int_order = "CASE_STUDY.SILVER.INT_ORDER"
t_int_emp_ord_map = "CASE_STUDY.SILVER.INT_EMP_ORD_BRIDGE"

# Gold layer (primary) target table.
t_prm_emp = "CASE_STUDY.GOLD.EMP_ORDERS"

# Rename maps: raw column name -> intermediate column name.
# The keys also decide which raw columns are selected downstream.
rename_emp_dict = dict(EMP_ID="employee_id", EMP_NAME="employee_name")
rename_emp_ord_map_dict = dict(EMP_ID="employee_id", ORDER_ID="order_id")

In [None]:
# Create the Snowpark session used by every cell below.
# NOTE(review): snowflake_connect is not defined or imported anywhere in the
# visible part of this notebook - presumably it comes from another notebook or
# module; confirm it is in scope on a fresh kernel before running this cell.
session = snowflake_connect()
print("SNOWFLAKE CONNECTION SUCCESSFUL")

## Data Cleaning Libraries

In [345]:
def rename_columns_from_dict(df: DataFrame, rename_map: Dict[str, str]) -> DataFrame:
    """
    Project a Snowpark DataFrame with some of its columns renamed.

    Every column of ``df`` is kept, in its existing order. A column whose
    name is a key of ``rename_map`` is aliased to the mapped value; all other
    columns are passed through by name unchanged. Keys of ``rename_map`` that
    do not match any column are ignored.

    Args:
        df (DataFrame): The input DataFrame.
        rename_map (Dict[str, str]): Current column name -> new column name.
            Matching against ``df.columns`` is an exact string comparison.

    Returns:
        DataFrame: The result of ``df.select(...)`` over the renamed and
        pass-through columns.
    """
    projection = []
    for existing_name in df.columns:
        if existing_name in rename_map:
            # Renamed column: select it under its new alias.
            projection.append(df[existing_name].alias(rename_map[existing_name]))
        else:
            # Untouched column: select it by name as-is.
            projection.append(existing_name)
    return df.select(projection)

def sanitize_snowpark_column_names(df: DataFrame) -> DataFrame:
    """
    Normalise the column names of a Snowpark DataFrame.

    Each column is re-selected under an alias built from its current name:
    the name is converted with ``str()``, leading and trailing double-quote
    characters are stripped, and the remainder is upper-cased. Column order
    is preserved.

    Args:
        df (DataFrame): The input DataFrame.

    Returns:
        DataFrame: The result of ``df.select(...)`` over the re-aliased columns.
    """
    projection = []
    for original_name in df.columns:
        # Drop surrounding quotes, then upper-case what is left.
        clean_name = str(original_name).strip('"').upper()
        projection.append(col(original_name).alias(clean_name))
    return df.select(projection)

In [350]:
def int_emp(df: DataFrame) -> int:
    """
    Load the raw employee table into the intermediate employee table.

    Queries every row of ``t_raw_emp`` through the module-level ``session``,
    keeps only the columns named by the keys of ``rename_emp_dict``, renames
    them per that mapping, normalises the resulting column names, and appends
    the rows to ``t_int_emp``.

    Args:
        df (DataFrame): Accepted but never read by the body; the source rows
            are always re-queried from ``t_raw_emp``.

    Returns:
        int: Always 0.
    """
    raw_emp = session.sql(f"""SELECT * FROM {t_raw_emp}""")
    selected = raw_emp.select(list(rename_emp_dict.keys()))
    renamed = rename_columns_from_dict(selected, rename_emp_dict)
    cleaned = sanitize_snowpark_column_names(renamed)
    cleaned.write.mode("append").save_as_table(t_int_emp)
    return 0
    

## Intermediate Layer Data Processing

### 1. Employee

In [346]:
# Raw employee rows -> keep the mapped columns -> rename -> normalise names.
df_int_emp = session.sql(f"""SELECT * FROM {t_raw_emp}""")
processed_emp = sanitize_snowpark_column_names(
    rename_columns_from_dict(
        df_int_emp.select(list(rename_emp_dict)),
        rename_emp_dict,
    )
)
# NOTE(review): the whole raw table is read and written in append mode, so
# re-running this cell adds the same rows again - confirm that is intended.
processed_emp.write.mode("append").save_as_table(t_int_emp)


### 2. Orders

In [341]:
# Only the JSON payload column of the raw order table is needed.
df_int_ord = session.sql(f"""SELECT RECORD_CONTENT FROM {t_raw_order}""")

# Parse the payload, then flatten the fields of interest into named columns.
# NOTE(review): the extracted fields are not cast to explicit types here -
# presumably they stay VARIANT-typed; confirm whether casts are wanted.
processed_ord = (
    df_int_ord
    .with_column("RECORD_CONTENT", parse_json(col("RECORD_CONTENT")))
    .select(
        # Attributes read directly from the top level of the payload.
        *[
            col("RECORD_CONTENT")[json_key].as_(column_name)
            for json_key, column_name in (
                ("itemid", "ITEM_ID"),
                ("orderid", "ORDER_ID"),
                ("ordertime", "ORDER_TIME"),
                ("orderunits", "ORDER_UNITS"),
            )
        ],
        # Attributes read from the nested "address" object.
        *[
            col("RECORD_CONTENT")["address"][json_key].as_(column_name)
            for json_key, column_name in (
                ("city", "CITY"),
                ("state", "STATE"),
                ("zipcode", "ZIPCODE"),
            )
        ],
    )
)

# Normalise column names and append to the intermediate order table.
(
    sanitize_snowpark_column_names(processed_ord)
    .write
    .mode("append")
    .save_as_table(t_int_order)
)
    

### 3. Order Map

In [342]:
# Raw employee/order bridge rows -> keep mapped columns -> rename -> normalise.
df_int_emp_ord_map = session.sql(f"""SELECT * FROM {t_raw_emp_ord_map}""")
processed_emp_ord_map = sanitize_snowpark_column_names(
    rename_columns_from_dict(
        df_int_emp_ord_map.select(list(rename_emp_ord_map_dict)),
        rename_emp_ord_map_dict,
    )
)
# NOTE(review): the whole raw table is read and written in append mode, so
# re-running this cell adds the same rows again - confirm that is intended.
processed_emp_ord_map.write.mode("append").save_as_table(t_int_emp_ord_map)

## Primary Layer Data Processing

In [273]:
# Read the three intermediate tables back, in the same order as the targets:
# employee, order, employee/order bridge.
df_emp, df_ord, df_emp_ord_map = (
    session.sql(f"""SELECT * FROM {table_name}""")
    for table_name in (t_int_emp, t_int_order, t_int_emp_ord_map)
)

In [274]:
# Orders joined to employees through the bridge table (inner joins, so only
# orders with a mapped, known employee are kept).
# `round` is the Snowpark function imported at the top of the notebook, so the
# rounding is expressed on the column rather than evaluated in Python.
df_master = (
    df_ord
    .join(df_emp_ord_map, "ORDER_ID", "inner")
    .join(df_emp, "EMPLOYEE_ID", "inner")
    .with_column("ORDER_UNITS", round(col("ORDER_UNITS"), 2))
)

In [275]:
# Grand total of units across all joined orders, pulled back to Python as a
# scalar: the aggregate yields a single row, whose only column is read by name.
# `round` and `sum` are the Snowpark functions imported at the top.
total_units_sold = (
    df_master
    .select(round(sum(col("ORDER_UNITS")), 2).alias("TOTAL_UNITS_SOLD"))
    .collect()[0]["TOTAL_UNITS_SOLD"]
)

In [276]:
# Per-employee order count and units sold, plus each employee's share of the
# grand total as a percentage rounded to two decimals.
# NOTE(review): total_units_sold is used as the divisor without a zero/None
# guard - confirm the joined data can never be empty or sum to zero.
df_emp_metrics = (
    df_master
    .group_by(col("EMPLOYEE_ID"), col("EMPLOYEE_NAME"))
    .agg(
        count(col("ORDER_ID")).alias("EMPLOYEE_ORDER_COUNT"),
        round(sum(col("ORDER_UNITS")), 2).alias("EMPLOYEE_UNITS_SOLD"),
    )
    .with_column(
        "CONTRIBUTION_PERCENTAGE",
        round(col("EMPLOYEE_UNITS_SOLD") / lit(total_units_sold) * 100, 2),
    )
)


In [277]:
# Persist the per-employee metrics to the primary-layer table.
# NOTE(review): append mode adds a fresh set of metric rows on every run -
# confirm that accumulating rows across runs is intended.
(
    df_emp_metrics
    .write
    .mode("append")
    .save_as_table(t_prm_emp)
)

In [278]:
# session.close()
# print("Session closed")

Session closed
