In [1]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

/opt/anaconda3/lib/python3.12/site-packages/pyspark


#### Declare and Assign Variables 

In [2]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "adventureworks",
    "conn_props" : {
        "user" : "root",
        "password" : "mypassword",
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}
# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "cluster_location" : "atlas",
    "user_name" : "miamccarrick",
    "password" : "mypassword",
    "cluster_name" : "mycluster",
    "cluster_subnet" : "zl4ms",
    "db_name" : "northwind",
    "collection" : "",
    "null_column_threshold" : 0.5
}


base_dir = os.path.join(os.getcwd(), 'lab_data')
data_dir = os.path.join(base_dir, 'adventureworks')
stream_dir = os.path.join(data_dir, 'streaming')

sales_stream_dir = os.path.join(stream_dir, 'sales')
# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "northwind_data_lakehouse"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

sales_output_bronze = os.path.join(database_dir, 'fact_sales', 'bronze')
sales_output_silver = os.path.join(database_dir, 'fact_sales', 'silver')
sales_output_gold = os.path.join(database_dir, 'fact_sales', 'gold')

### Define Global Functions

In [3]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold] 
    df_dropped = df.drop(*columns_with_nulls) 
    
    return df_dropped
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe


def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "
    
    sparkConf_args = {
        "app_name" : "PySpark Northwind Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf




def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()
    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe

### Initialize Data Lakehouse Directory Structure
Remove the Data Lakehouse Database Directory Structure to Ensure Idempotency

In [4]:
remove_directory_tree(database_dir)

"Directory '/Users/miamccarrick/DS2002/Project 2/spark-warehouse/northwind_data_lakehouse.db' has been removed successfully."

### Create a New Spark Session

In [5]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.3.0", "mysql-connector-j-9.3.0.jar")
#mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark

25/04/25 11:22:47 WARN Utils: Your hostname, Mias-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.25.64.204 instead (on interface en0)
25/04/25 11:22:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/miamccarrick/.ivy2/cache
The jars for the packages stored in: /Users/miamccarrick/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-90ae5798-958f-4738-9000-d6b6a51d852f;1.0
	confs: [default]


:: loading settings :: url = jar:file:/opt/anaconda3/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 143ms :: artifacts dl 6ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   4   |   0   |   0   |   0   ||   4   |   0   |
	---------------------------------------------------------------------
:: retrieving ::

### Create a New Metadata Database

In [6]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Lab 06 Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Lab 6.0');
"""
spark.sql(sql_create_db)

DataFrame[]

#### Getting Data from MongoDB

In [7]:
sql_products = """
SELECT * FROM dim_products
"""
product_data = get_mysql_dataframe(spark, sql_products, **mysql_args)
pandas_df = product_data.toPandas()
pandas_df.to_json("products.json", orient="records", lines=False)

                                                                                

In [8]:
#Populate MongoDB with product data
client = get_mongo_client(**mongodb_args)

data_dir = os.getcwd()

json_files = {"products" : 'products.json'}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)  

In [9]:
#Extract Products from MongoDB
mongodb_args["collection"] = "products"

df_dim_products = get_mongodb_dataframe(spark, **mongodb_args)

                                                                                

#### Make Necessary Transformations to the New Dataframe¶

In [10]:
# ----------------------------------------------------------------------------------
# Rename the 'id' column to 'customer_id' ------------------------------------------
# ----------------------------------------------------------------------------------
df_dim_products = df_dim_products.withColumnRenamed("ProductID", "product_id")
# ----------------------------------------------------------------------------------
# Reorder Columns and display the first two rows in a Pandas dataframe
# ----------------------------------------------------------------------------------
ordered_columns = ['product_key', 'product_id', 'Name', 'ProductNumber','ListPrice', 'Color', 'ProductLine', 
                   'ProductModelID', 'ProductSubcategoryID', 'ReorderPoint', 'SafetyStockLevel', 'SellStartDate', 'StandardCost']

df_dim_products = df_dim_products[ordered_columns]
df_dim_products.toPandas().head(2)

Unnamed: 0,product_key,product_id,Name,ProductNumber,ListPrice,Color,ProductLine,ProductModelID,ProductSubcategoryID,ReorderPoint,SafetyStockLevel,SellStartDate,StandardCost
0,1,1,Adjustable Race,AR-5381,0.0,,,,,750,1000,896659200000,0.0
1,2,2,Bearing Ball,BA-8327,0.0,,,,,750,1000,896659200000,0.0


#### Save df_dim_products to Data Lakehouse

In [11]:
df_dim_products.write.saveAsTable(f"{dest_database}.dim_products", mode="overwrite")

                                                                                

#### Getting Data from MySQL

In [12]:
#Get Customers data
sql_customers = "SELECT * FROM adventureworks.dim_customers"
df_customers = get_mysql_dataframe(spark, sql_customers, **mysql_args)

#### Save as the dim_customers table in the Data Lakehouse

In [13]:
df_dim_customers = df_customers.withColumnRenamed("CustomerID", "customer_id")

ordered_columns = ['customer_key', 'customer_id', 'AccountNumber',
                  'CustomerType', 'TerritoryID']  

df_dim_customers = df_dim_customers.select(ordered_columns)
df_dim_customers.toPandas().head(2)

Unnamed: 0,customer_key,customer_id,AccountNumber,CustomerType,TerritoryID
0,1,1,AW00000001,S,1
1,2,2,AW00000002,S,1


In [14]:
df_dim_customers.write.saveAsTable(f"{dest_database}.dim_customers", mode="overwrite")

In [15]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_customers;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_customers LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|        customer_key|              bigint|   NULL|
|         customer_id|              bigint|   NULL|
|       AccountNumber|              string|   NULL|
|        CustomerType|              string|   NULL|
|         TerritoryID|              bigint|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|northwind_data_la...|       |
|               Table|       dim_customers|       |
|        Created Time|Fri Apr 25 11:23:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.5|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|            Location|file:/Users/miamc...|       |
+-----------

Unnamed: 0,customer_key,customer_id,AccountNumber,CustomerType,TerritoryID
0,1,1,AW00000001,S,1
1,2,2,AW00000002,S,1


In [16]:
#Get Date Data
sql_dim_date = "SELECT * FROM adventureworks.dim_date"
df_dim_date = get_mysql_dataframe(spark, sql_dim_date, **mysql_args)
df_dim_date.toPandas().head(2)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### Save as the dim_date table in the Data Lakehouse

In [17]:
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

In [18]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|            date_key|      int|   NULL|
|           full_date|     date|   NULL|
|           date_name| char(11)|   NULL|
|        date_name_us| char(11)|   NULL|
|        date_name_eu| char(11)|   NULL|
|         day_of_week|  tinyint|   NULL|
|    day_name_of_week| char(10)|   NULL|
|        day_of_month|  tinyint|   NULL|
|         day_of_year|      int|   NULL|
|     weekday_weekend| char(10)|   NULL|
|        week_of_year|  tinyint|   NULL|
|          month_name| char(10)|   NULL|
|       month_of_year|  tinyint|   NULL|
|is_last_day_of_month|  char(1)|   NULL|
|    calendar_quarter|  tinyint|   NULL|
|       calendar_year|      int|   NULL|
| calendar_year_month| char(10)|   NULL|
|   calendar_year_qtr| char(10)|   NULL|
|fiscal_month_of_year|  tinyint|   NULL|
|      fiscal_quarter|  tinyint|   NULL|
+--------------------+---------+-------+
only showing top

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### Getting Data from local CSV

In [19]:
#Query products to export as CSV
sql_salesperson = """
SELECT SalesPersonID, TerritoryID, SalesQuota, Bonus,
CommissionPct, SalesYTD, SalesLastYear
FROM adventureworks.salesperson
"""
salesperson_data = get_mysql_dataframe(spark, sql_salesperson, **mysql_args)
salesperson_data = salesperson_data.toPandas()
salesperson_data.to_csv('salesperson.csv', index=False)

#### Use PySpark to Read data from a CSV file

In [20]:
df_dim_salesperson = spark.read.option("header", True).csv("salesperson.csv")
df_dim_salesperson.show(2)

+-------------+-----------+----------+------+-------------+------------+-------------+
|SalesPersonID|TerritoryID|SalesQuota| Bonus|CommissionPct|    SalesYTD|SalesLastYear|
+-------------+-----------+----------+------+-------------+------------+-------------+
|          268|       NULL|      NULL|   0.0|          0.0| 677558.4653|          0.0|
|          275|        2.0|  300000.0|4100.0|        0.012|4557045.0459| 1750406.4785|
+-------------+-----------+----------+------+-------------+------------+-------------+
only showing top 2 rows



In [21]:
# ----------------------------------------------------------------------------------
# Rename the 'id' column to 'employee_id' ------------------------------------------
# ----------------------------------------------------------------------------------
df_dim_salesperson = df_dim_salesperson.withColumnRenamed("SalesPersonID", "salesperson_id")

# ----------------------------------------------------------------------------------
# Add Primary Key column using SQL Windowing function: ROW_NUMBER() 
# ----------------------------------------------------------------------------------
df_dim_salesperson.createOrReplaceTempView("salesperson")
sql_salesperson = f"""
    SELECT *, ROW_NUMBER() OVER (ORDER BY salesperson_id) AS salesperson_key
    FROM salesperson;
"""
df_dim_salesperson = spark.sql(sql_salesperson)

ordered_columns = ['salesperson_key', 'salesperson_id', 'SalesQuota', 'SalesYTD'
                   , 'TerritoryID', 'SalesLastYear', 'Bonus', 'CommissionPct']

df_dim_salesperson = df_dim_salesperson[ordered_columns]

df_dim_salesperson.toPandas().head(2)

Unnamed: 0,salesperson_key,salesperson_id,SalesQuota,SalesYTD,TerritoryID,SalesLastYear,Bonus,CommissionPct
0,1,268,,677558.4653,,0.0,0.0,0.0
1,2,275,300000.0,4557045.0459,2.0,1750406.4785,4100.0,0.012


#### Save Salesperson Dimension in Data Lakehouse

In [22]:
df_dim_salesperson.write.saveAsTable(f"{dest_database}.dim_salesperson", mode="overwrite")

In [23]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_salesperson;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_salesperson LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|     salesperson_key|                 int|   NULL|
|      salesperson_id|              string|   NULL|
|          SalesQuota|              string|   NULL|
|            SalesYTD|              string|   NULL|
|         TerritoryID|              string|   NULL|
|       SalesLastYear|              string|   NULL|
|               Bonus|              string|   NULL|
|       CommissionPct|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|northwind_data_la...|       |
|               Table|     dim_salesperson|       |
|        Created Time|Fri Apr 25 11:23:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.5|       |
|           

Unnamed: 0,salesperson_key,salesperson_id,SalesQuota,SalesYTD,TerritoryID,SalesLastYear,Bonus,CommissionPct
0,1,268,,677558.4653,,0.0,0.0,0.0
1,2,275,300000.0,4557045.0459,2.0,1750406.4785,4100.0,0.012


### Verify Dimension Tables

In [24]:
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,northwind_data_lakehouse,dim_customers,False
1,northwind_data_lakehouse,dim_date,False
2,northwind_data_lakehouse,dim_products,False
3,northwind_data_lakehouse,dim_salesperson,False
4,,salesperson,True


### Use PySpark Structured Streaming to Process (Hot Path) <span style="color:darkred">Orders</span> Fact Data  
#### Verify the location of the source data files on the file system

In [25]:
get_file_info(sales_stream_dir)

Unnamed: 0,name,size,modification_time
0,sales_1.json,114732,2025-04-24 16:14:31.247002840
1,sales_2.json,101409,2025-04-24 16:14:29.302740335
2,sales_3.json,112668,2025-04-24 16:14:00.144544125


#### Create the Bronze Layer: Stage <span style="color:darkred">Orders Fact table</span> Data
##### Read "Raw" JSON file data into a Stream

In [26]:
df_sales_bronze = (
    spark.readStream \
    .option("schemaLocation", sales_output_bronze) \
    .option("maxFilesPerTrigger", 1) \
    .option("multiLine", "true") \
    .json(sales_stream_dir)
)

df_sales_bronze.isStreaming

True

In [27]:
sales_checkpoint_bronze = os.path.join(sales_output_bronze, '_checkpoint')

sales_bronze_query = (
    df_sales_bronze
    .withColumn("receipt_time", current_timestamp())
    .withColumn("source_file", input_file_name())
    
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("sales_bronze")
    .trigger(availableNow = True) \
    .option("checkpointLocation", sales_checkpoint_bronze) \
    .option("compression", "snappy") \
    .start(sales_output_bronze)
)

In [28]:
print(f"Query ID: {sales_bronze_query.id}")
print(f"Query Name: {sales_bronze_query.name}")
print(f"Query Status: {sales_bronze_query.status}")

Query ID: c337c195-9439-4318-a1fd-de6f07aa78b4
Query Name: sales_bronze
Query Status: {'message': 'Writing offsets to log', 'isDataAvailable': False, 'isTriggerActive': True}


In [29]:
sales_bronze_query.awaitTermination()

#### Create the Silver Layer: Integrate "Cold-path" Data & Make Transformations

In [30]:
df_dim_order_date = df_dim_date.select(col("date_key").alias("order_date_key"), col("full_date").alias("order_full_date"))

In [31]:
df_sales_silver = spark.readStream.format("parquet").load(sales_output_bronze) \
    .join(broadcast(df_dim_customers), "customer_key", "inner") \
    .join(broadcast(df_dim_salesperson), "salesperson_key", "inner") \
    .join(broadcast(df_dim_products), "product_key", "inner") \
    .join(broadcast(df_dim_order_date), df_dim_order_date.order_full_date.cast(DateType()) == col("order_full_date").cast(DateType()), "inner") \
    .select(
        col("SalesOrderID").cast(LongType()),
        df_dim_customers.customer_key.cast(LongType()),
        df_dim_salesperson.salesperson_key.cast(LongType()),
        df_dim_products.product_key.cast(LongType()),
        df_dim_order_date.order_date_key.cast(LongType()),
        col("UnitPrice"),
        col("OrderQty"),
        col("UnitPriceDiscount")
    )

In [32]:
df_sales_silver.isStreaming

True

In [33]:
df_sales_silver.printSchema()

root
 |-- SalesOrderID: long (nullable = true)
 |-- customer_key: long (nullable = true)
 |-- salesperson_key: long (nullable = false)
 |-- product_key: long (nullable = true)
 |-- order_date_key: long (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- OrderQty: long (nullable = true)
 |-- UnitPriceDiscount: double (nullable = true)



##### Write the Transformed Streaming data to the Data Lakehouse

In [34]:
sales_checkpoint_silver = os.path.join(sales_output_silver, '_checkpoint')

sales_silver_query = (
    df_sales_silver.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("sales_silver")
    .trigger(availableNow = True) \
    .option("checkpointLocation", sales_checkpoint_silver) \
    .option("compression", "snappy") \
    .start(sales_output_silver)
)

In [35]:
print(f"Query ID: {sales_silver_query.id}")
print(f"Query Name: {sales_silver_query.name}")
print(f"Query Status: {sales_silver_query.status}")

Query ID: 3f7f4425-abfd-47f9-8f40-4cac4e61aaec
Query Name: sales_silver
Query Status: {'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}


In [36]:
sales_silver_query.awaitTermination()

                                                                                

#### Create Gold Layer: Perform Aggregations
#### Sales per rep by month

In [37]:
df_sales_by_rep_gold = spark.readStream.format("parquet").load(sales_output_silver) \
    .join(df_dim_date, col("order_date_key") == col("date_key")) \
    .groupBy("salesperson_key", "month_of_year", "month_name") \
    .agg(sum("OrderQty").alias("units_sold"),
        sum((col("UnitPrice") * col("OrderQty") * (1 - col("UnitPriceDiscount")))).alias("revenue")
    ) \
    .orderBy("salesperson_key", "month_of_year")

In [38]:
df_sales_by_rep_gold.printSchema()

root
 |-- salesperson_key: long (nullable = true)
 |-- month_of_year: byte (nullable = true)
 |-- month_name: string (nullable = true)
 |-- units_sold: long (nullable = true)
 |-- revenue: double (nullable = true)



In [39]:
sales_gold_query = (
    df_sales_by_rep_gold.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("sales_by_rep")
    .start()
)

In [40]:
wait_until_stream_is_ready(sales_gold_query, 1)

                                                                                

The stream has processed 1 batchs


In [41]:
df_fact_sales_per_rep = spark.sql("SELECT * FROM sales_by_rep")
df_fact_sales_per_rep.printSchema()

root
 |-- salesperson_key: long (nullable = true)
 |-- month_of_year: byte (nullable = true)
 |-- month_name: string (nullable = true)
 |-- units_sold: long (nullable = true)
 |-- revenue: double (nullable = true)



In [42]:
df_fact_sales_per_rep_gold_final = df_fact_sales_per_rep \
.select(col("salesperson_key").alias("Salesperson Key"), \
        col("revenue").alias("Salesperson Revenue"), \
        col("units_sold").alias("Total Units Sold")) \
.orderBy(asc("month_of_year"))

In [43]:
df_fact_sales_per_rep_gold_final.write.saveAsTable(f"{dest_database}.sales_by_rep", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.sales_by_rep").toPandas()

Unnamed: 0,Salesperson Key,Salesperson Revenue,Total Units Sold
0,1,7.005743e+06,6138
1,2,5.150264e+07,76725
2,3,5.113608e+07,68541
3,4,7.384120e+07,96844
4,5,3.910760e+07,45694
...,...,...,...
115,6,9.874770e+07,119009
116,7,3.510461e+07,35464
117,8,2.920725e+07,76725
118,9,9.094743e+07,152427
