## Lab 06: Data Lakehouse with Structured Streaming
This lab will help you learn to use many of the software libraries and programming techniques required to fulfill the requirements of the final end-of-session capstone project for course **DS-2002: Data Systems**. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Optimized for Read/Aggregation Operations; Dimensional Model (i.e, Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [1]:
import findspark
findspark.init()
print(findspark.find())

import os
import sys
import json
import time
import pymongo
import certifi
import shutil
import pandas as pd

from pyspark import SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

C:\spark-3.5.4-bin-hadoop3


#### 2.0. Instantiate Global Variables

In [2]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
mysql_args = {
    "host_name" : "localhost",
    "port" : "3306",
    "db_name" : "estella_dw",  
    "conn_props" : {
        "user" : "root",
        "password" : "DwightSchrute21!",  
        "driver" : "com.mysql.cj.jdbc.Driver"
    }
}


# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------

mongodb_args = {
    "cluster_location" : "atlas",
    "user_name" : "stellarhill",
    "password" : "DwightSchrute21",
    "cluster_name" : "StellCluster",
    "cluster_subnet" : "p4yt3yg",
    "db_name" : "estella_dw",  
    "collection" : "",
    "null_column_threshold" : 0.5
}


# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
base_dir = os.path.join(os.getcwd(), 'lab_data')
data_dir = os.path.join(base_dir, 'estella_dw')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

fact_sales_stream_dir = os.path.join(stream_dir, 'fact_sales')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "estella_dw"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

fact_sales_output_bronze = os.path.join(database_dir, 'fact_sales', 'bronze')
fact_sales_output_silver = os.path.join(database_dir, 'fact_sales', 'silver')
fact_sales_output_gold = os.path.join(database_dir, 'fact_sales', 'gold')

In [3]:
print("Batch dir:", batch_dir)
print("Fact sales stream dir:", fact_sales_stream_dir)
print("Warehouse dir:", database_dir)


Batch dir: C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\batch
Fact sales stream dir: C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\streaming\fact_sales
Warehouse dir: C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\spark-warehouse\estella_dw.db


### 3.0. Define Global Functions

In [4]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    """
    Drop columns that have a percentage of NULL values exceeding the threshold.
    Skip if DataFrame has only _corrupt_record column.
    """
    if df.columns == ["_corrupt_record"]:
        print("Warning: DataFrame only has _corrupt_record column. Skipping drop_null_columns.")
        return df
    
    total_rows = df.count()
    if total_rows == 0:
        return df

    columns_with_nulls = [
        col for col in df.columns if df.filter(df[col].isNull()).count() / total_rows > threshold
    ]
    df_dropped = df.drop(*columns_with_nulls)
    return df_dropped


    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['host_name']}:{args['port']}/{args['db_name']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['conn_props']['driver']) \
    .option("user", args['conn_props']['user']) \
    .option("password", args['conn_props']['password']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "
    
    sparkConf_args = {
        "app_name" : "PySpark Estella Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf


def get_mongo_client(**args):
    '''Get MongoDB Client Connection'''
    mongo_uri = get_mongo_uri(**args)
    if args['cluster_location'] == "atlas":
        client = pymongo.MongoClient(mongo_uri, tlsCAFile=certifi.where())

    elif args['cluster_location'] == "local":
        client = pymongo.MongoClient(mongo_uri)
        
    else:
        raise Exception("A MongoDB Client could not be created.")

    return client
    
    
# TODO: Rewrite this to leverage PySpark?
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()


    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("database", args['db_name']) \
        .option("collection", args['collection']).load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe

### 4.0. Initialize Data Lakehouse Directory Structure
Remove the Data Lakehouse Database Directory Structure to Ensure Idempotency

In [5]:
database_dir = os.path.join(sql_warehouse_dir, "estella_dw.db")
remove_directory_tree(database_dir)

"Directory 'C:\\Users\\stell\\OneDrive\\Desktop\\Documents\\DS-2002-main\\04-PySpark\\spark-warehouse\\estella_dw.db' has been removed successfully."

### 5.0. Create a New Spark Session

In [6]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"

jars = []
mysql_spark_jar = os.path.join(os.getcwd(), "mysql-connector-j-9.1.0", "mysql-connector-j-9.1.0.jar")
mssql_spark_jar = os.path.join(os.getcwd(), "sqljdbc_12.8", "enu", "jars", "mssql-jdbc-12.8.1.jre11.jar")

jars.append(mysql_spark_jar)
#jars.append(mssql_spark_jar)

sparkConf_args = get_spark_conf_args(jars, **mongodb_args)

sparkConf = get_spark_conf(**sparkConf_args)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("OFF")
spark


### 6.0. Create a New Metadata Database.

In [7]:
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 AdventureWorks Data Lakehouse'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Final Project');
"""
spark.sql(sql_create_db)

DataFrame[]

## Section II: Populate Dimensions by Ingesting "Cold-path" Reference Data 
### 1.0. Fetch Data from the File System
#### 1.1. Verify the location of the source data files on the file system

In [8]:
get_file_info(batch_dir)



Unnamed: 0,name,size,modification_time
0,dim_customer.csv,916342,2025-12-17 19:18:00.996943474
1,dim_date.csv,64182,2025-12-17 19:21:36.731117249
2,dim_product.csv,12368,2025-12-17 19:22:34.330742836
3,dim_shipping_region.csv,113,2025-12-17 19:23:08.750877619


#### 1.2. Populate the <span style="color:darkred">Customer Dimension</span>
##### 1.2.1. Use PySpark to Read data from a CSV file

In [9]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
import os

batch_dir = r"C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\batch"

files_and_schemas = {
    "dim_customer.csv": StructType([
        StructField("customer_id", IntegerType(), True),
        StructField("full_name", StringType(), True),
        StructField("city", StringType(), True),
        StructField("state", StringType(), True),
        StructField("country", StringType(), True)
    ]),
    "dim_date.csv": StructType([
        StructField("date_key", IntegerType(), True),
        StructField("full_date", DateType(), True),
        StructField("day", IntegerType(), True),
        StructField("month", IntegerType(), True),
        StructField("year", IntegerType(), True),
        StructField("quarter", IntegerType(), True)
    ]),
    "dim_product.csv": StructType([
        StructField("product_id", IntegerType(), True),
        StructField("product_name", StringType(), True),
        StructField("category", StringType(), True),
        StructField("subcategory", StringType(), True),
        StructField("list_price", DoubleType(), True)
    ]),
    "dim_shipping_region.csv": StructType([
        StructField("region_id", IntegerType(), True),
        StructField("region_name", StringType(), True)
    ])
}

spark = SparkSession.builder.getOrCreate()

df_dimensions = {} 

for file_name, schema in files_and_schemas.items():
    file_path = os.path.join(batch_dir, file_name)
    print(f"Reading {file_path}")
    
    df = spark.read.csv(file_path, header=True, schema=schema)
    
   
    primary_col = schema.fields[0].name  
    window_spec = Window.orderBy(primary_col)
    df = df.withColumn(f"{primary_col}_key", row_number().over(window_spec))
    

    cols = [f"{primary_col}_key"] + [f.name for f in schema.fields]
    df = df.select(*cols)
    
    df.show(5)
    
    df_dimensions[file_name] = df


Reading C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\batch\dim_customer.csv
+---------------+-----------+---------+-------+-------+-------+
|customer_id_key|customer_id|full_name|   city|  state|country|
+---------------+-----------+---------+-------+-------+-------+
|              1|          2|  Unknown|Unknown|Unknown|Unknown|
|              2|          3|  Unknown|Unknown|Unknown|Unknown|
|              3|          4|  Unknown|Unknown|Unknown|Unknown|
|              4|          5|  Unknown|Unknown|Unknown|Unknown|
|              5|          6|  Unknown|Unknown|Unknown|Unknown|
+---------------+-----------+---------+-------+-------+-------+
only showing top 5 rows

Reading C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\batch\dim_date.csv
+------------+--------+----------+----+-----+----+-------+
|date_key_key|date_key| full_date| day|month|year|quarter|
+------------+--------+----------+----+-----+----+-

In [10]:
df_dim_customer = df_dimensions["dim_customer.csv"]
df_dim_customer.show(5)


+---------------+-----------+---------+-------+-------+-------+
|customer_id_key|customer_id|full_name|   city|  state|country|
+---------------+-----------+---------+-------+-------+-------+
|              1|          2|  Unknown|Unknown|Unknown|Unknown|
|              2|          3|  Unknown|Unknown|Unknown|Unknown|
|              3|          4|  Unknown|Unknown|Unknown|Unknown|
|              4|          5|  Unknown|Unknown|Unknown|Unknown|
|              5|          6|  Unknown|Unknown|Unknown|Unknown|
+---------------+-----------+---------+-------+-------+-------+
only showing top 5 rows



##### 1.2.2. Make Necessary Transformations to the New DataFrame

In [11]:
df_dim_customer = df_dim_customer.withColumnRenamed("customer_id", "customer_id_orig")  

df_dim_customer.createOrReplaceTempView("customers")
sql_customers = """
    SELECT *, ROW_NUMBER() OVER (ORDER BY customer_id_orig) AS customer_key
    FROM customers
"""
df_dim_customer = spark.sql(sql_customers)

ordered_columns = ['customer_key', 'customer_id_orig', 'full_name', 'city', 'state', 'country']
df_dim_customer = df_dim_customer[ordered_columns]

df_dim_customer.limit(5).toPandas()



Unnamed: 0,customer_key,customer_id_orig,full_name,city,state,country
0,1,2,Unknown,Unknown,Unknown,Unknown
1,2,3,Unknown,Unknown,Unknown,Unknown
2,3,4,Unknown,Unknown,Unknown,Unknown
3,4,5,Unknown,Unknown,Unknown,Unknown
4,5,6,Unknown,Unknown,Unknown,Unknown


##### 1.2.3. Save as the <span style="color:darkred">dim_customer</span> table in the Data Lakehouse

In [12]:
df_dim_customer.write.saveAsTable(f"{dest_database}.dim_customer", mode="overwrite")

##### 1.2.4. Unit Test: Describe and Preview Table

In [13]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_customer").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_customer LIMIT 2").toPandas()


+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|        customer_key|                 int|   NULL|
|    customer_id_orig|                 int|   NULL|
|           full_name|              string|   NULL|
|                city|              string|   NULL|
|               state|              string|   NULL|
|             country|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|          estella_dw|       |
|               Table|        dim_customer|       |
|        Created Time|Thu Dec 18 00:21:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.4|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|           

Unnamed: 0,customer_key,customer_id_orig,full_name,city,state,country
0,1,2,Unknown,Unknown,Unknown,Unknown
1,2,3,Unknown,Unknown,Unknown,Unknown


#### 1.3. Populate the <span style="color:darkred">Date Dimension</span>
##### 1.3.1. Use PySpark to Read Data from a CSV File

In [14]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window
import os

df_dim_date_raw = spark.read.csv(
    os.path.join(batch_dir, "dim_date.csv"),
    header=False,
    inferSchema=True
)

df_dim_date_raw.show(5)

df_dim_date = (
    df_dim_date_raw
    .select(
        col("_c0").cast("int").alias("date_key"),
        col("_c1").cast("date").alias("full_date"),
        col("_c2").cast("int").alias("year"),
        col("_c3").cast("int").alias("day_of_year"),
        col("_c4").cast("int").alias("day"),
        col("_c5").cast("int").alias("week"),
        col("_c6").cast("int").alias("month")
    )
)
from pyspark.sql.functions import ceil, col

df_dim_date = df_dim_date.withColumn(
    "quarter",
    ceil(col("month") / 3)
)

window_spec = Window.orderBy("date_key")

df_dim_date = (
    df_dim_date
    .withColumn("date_key_surrogate", row_number().over(window_spec))
    .select(
        "date_key_surrogate",
        "date_key",
        "full_date",
        "day",
        "month",
        "year",
        "quarter"
    )
)

df_dim_date.show(5)


+--------+-------------------+----+---+---+---+---+
|     _c0|                _c1| _c2|_c3|_c4|_c5|_c6|
+--------+-------------------+----+---+---+---+---+
|20010701|2001-07-01 00:00:00|2001|  7|  1|  7|  1|
|20010702|2001-07-02 00:00:00|2001|  7|  2|  1|  0|
|20010703|2001-07-03 00:00:00|2001|  7|  3|  2|  0|
|20010704|2001-07-04 00:00:00|2001|  7|  4|  3|  0|
|20010705|2001-07-05 00:00:00|2001|  7|  5|  4|  0|
+--------+-------------------+----+---+---+---+---+
only showing top 5 rows

+------------------+--------+----------+---+-----+----+-------+
|date_key_surrogate|date_key| full_date|day|month|year|quarter|
+------------------+--------+----------+---+-----+----+-------+
|                 1|20010701|2001-07-01|  1|    1|2001|      1|
|                 2|20010702|2001-07-02|  2|    0|2001|      0|
|                 3|20010703|2001-07-03|  3|    0|2001|      0|
|                 4|20010704|2001-07-04|  4|    0|2001|      0|
|                 5|20010705|2001-07-05|  5|    0|2001|    

##### 1.3.2 Make Necessary Transformations to the New DataFrame

In [15]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.orderBy("date_key")

df_dim_date = (
    df_dim_date
    .withColumn("date_key_surrogate", row_number().over(window_spec))
    .select(
        "date_key_surrogate",
        "date_key",
        "full_date",
        "day",
        "month",
        "year",
        "quarter"
    )
)

df_dim_date.show(5)



+------------------+--------+----------+---+-----+----+-------+
|date_key_surrogate|date_key| full_date|day|month|year|quarter|
+------------------+--------+----------+---+-----+----+-------+
|                 1|20010701|2001-07-01|  1|    1|2001|      1|
|                 2|20010702|2001-07-02|  2|    0|2001|      0|
|                 3|20010703|2001-07-03|  3|    0|2001|      0|
|                 4|20010704|2001-07-04|  4|    0|2001|      0|
|                 5|20010705|2001-07-05|  5|    0|2001|      0|
+------------------+--------+----------+---+-----+----+-------+
only showing top 5 rows



##### 1.3.3. Save as the <span style="color:darkred">dim_date</span> table in the Data Lakehouse

In [16]:
df_dim_date.write.saveAsTable(f"{dest_database}.dim_date", mode="overwrite")

##### 1.3.4. Unit Test: Describe and Preview Table

In [17]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_date").show()

spark.sql(f"SELECT * FROM {dest_database}.dim_date LIMIT 2").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|  date_key_surrogate|                 int|   NULL|
|            date_key|                 int|   NULL|
|           full_date|                date|   NULL|
|                 day|                 int|   NULL|
|               month|                 int|   NULL|
|                year|                 int|   NULL|
|             quarter|              bigint|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|          estella_dw|       |
|               Table|            dim_date|       |
|        Created Time|Thu Dec 18 00:22:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.4|       |
|                Type|             MANAGED|       |
|           

Unnamed: 0,date_key_surrogate,date_key,full_date,day,month,year,quarter
0,1,20010701,2001-07-01,1,1,2001,1
1,2,20010702,2001-07-02,2,0,2001,0


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
#### 2.1. Create a New MongoDB Database, and Load Each JSON File into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [18]:
import pandas as pd
import os

csv_files = {
    "dim_product": "dim_product.csv",
    "dim_shipping_region": "dim_shipping_region.csv"
}


client = get_mongo_client(**mongodb_args)
db = client[mongodb_args["db_name"]]


for collection_name, csv_file in csv_files.items():
    csv_path = os.path.join(batch_dir, csv_file)
    df = pd.read_csv(csv_path)
    records = df.to_dict(orient="records")
    
    
    db.drop_collection(collection_name)
    db[collection_name].insert_many(records)
    print(f"Loaded {len(records)} records into MongoDB collection '{collection_name}'")


client.close()



Loaded 265 records into MongoDB collection 'dim_product'
Loaded 4 records into MongoDB collection 'dim_shipping_region'


#### 2.2. Populate the <span style="color:darkred">Product</span>
##### 2.2.1. Fetch Data from the New MongoDB <span style="color:darkred">Product</span> Collection



In [19]:

mongodb_args["collection"] = "dim_product"


df_dim_product = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("database", mongodb_args['db_name']) \
    .option("collection", mongodb_args['collection']).load()

if "_id" in df_dim_product.columns:
    df_dim_product = df_dim_product.drop("_id")

df_dim_product.limit(5).toPandas()


Unnamed: 0,34.99,707,Red,"Sport-100 Helmet, Red"
0,,708,Black,"Sport-100 Helmet, Black"
1,,709,White,"Mountain Bike Socks, M"
2,,710,White,"Mountain Bike Socks, L"
3,,711,Blue,"Sport-100 Helmet, Blue"
4,,712,Multi,AWC Logo Cap


In [20]:
correct_columns = [
    "product_id", "product_number", "color", "name"
]
df_dim_product = df_dim_product.toDF(*correct_columns)

df_dim_product.show(5)


+----------+--------------+-----+--------------------+
|product_id|product_number|color|                name|
+----------+--------------+-----+--------------------+
|      NULL|           708|Black|Sport-100 Helmet,...|
|      NULL|           709|White|Mountain Bike Soc...|
|      NULL|           710|White|Mountain Bike Soc...|
|      NULL|           711| Blue|Sport-100 Helmet,...|
|      NULL|           712|Multi|        AWC Logo Cap|
+----------+--------------+-----+--------------------+
only showing top 5 rows



In [21]:
df_dim_product.write.saveAsTable(f"{dest_database}.dim_product", mode="overwrite")


##### 2.2.2. Make Necessary Transformations to the New Dataframe

In [22]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.orderBy("product_id")
df_dim_product = df_dim_product.withColumn("product_key", row_number().over(window_spec))

df_dim_product_clean = df_dim_product.select(
    df_dim_product["product_key"],
    df_dim_product["product_id"],
    df_dim_product["product_number"],
    df_dim_product["name"],
    df_dim_product["color"]
  
)

df_dim_product_clean.limit(5).toPandas()




Unnamed: 0,product_key,product_id,product_number,name,color
0,1,,708,"Sport-100 Helmet, Black",Black
1,2,,709,"Mountain Bike Socks, M",White
2,3,,710,"Mountain Bike Socks, L",White
3,4,,711,"Sport-100 Helmet, Blue",Blue
4,5,,712,AWC Logo Cap,Multi


##### 2.2.3. Save as the <span style="color:darkred">dim_product</span> table in the Data lakehouse

In [23]:

df_dim_product_clean.write.saveAsTable(f"{dest_database}.dim_product", mode="overwrite")

##### 2.2.4. Unit Test: Describe and Preview Table

In [24]:

spark.sql(f"SHOW TABLES IN {dest_database}").show()

spark.sql(f"SELECT * FROM {dest_database}.dim_product LIMIT 5").toPandas()


+----------+------------+-----------+
| namespace|   tableName|isTemporary|
+----------+------------+-----------+
|estella_dw|dim_customer|      false|
|estella_dw|    dim_date|      false|
|estella_dw| dim_product|      false|
|          |   customers|       true|
+----------+------------+-----------+



Unnamed: 0,product_key,product_id,product_number,name,color
0,1,,708,"Sport-100 Helmet, Black",Black
1,2,,709,"Mountain Bike Socks, M",White
2,3,,710,"Mountain Bike Socks, L",White
3,4,,711,"Sport-100 Helmet, Blue",Blue
4,5,,712,AWC Logo Cap,Multi


#### 2.4. Populate the <span style="color:darkred">Shipping Regions</span>
##### 2.3.1. Fetch Data from the New MongoDB <span style="color:darkred">Shipping Regions</span> Collection

In [25]:

mongodb_args["collection"] = "dim_shipping_region"

df_dim_shipping_region = get_mongodb_dataframe(spark, **mongodb_args)


##### 2.3.2. Make Necessary Transformations to the New Dataframe

In [26]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

df_dim_shipping_region = df_dim_shipping_region.toDF(
    "shippingregion_id",  
    "name",               
    "country"            
)


window_spec = Window.orderBy("shippingregion_id")
df_dim_shipping_region = df_dim_shipping_region.withColumn("shippingregion_key", row_number().over(window_spec))


df_dim_shipping_region_clean = df_dim_shipping_region.select(
    "shippingregion_key",
    "shippingregion_id",
    "name",
    "country"
)


df_dim_shipping_region_clean.limit(5).toPandas()



Unnamed: 0,shippingregion_key,shippingregion_id,name,country
0,1,2,West Coast,USA
1,2,3,Midwest,USA
2,3,4,South,USA
3,4,5,International,Global


##### 2.3.3. Save as the <span style="color:darkred">dim_shipping_regions</span> table in the Data lakehouse

In [27]:

df_dim_shipping_region_clean.write.saveAsTable(
    f"{dest_database}.dim_shipping_region",
    mode="overwrite"
)


##### 2.3.4. Unit Test: Describe and Preview Table

In [28]:

spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_shipping_region").show(truncate=False)

spark.sql(f"SELECT * FROM {dest_database}.dim_shipping_region LIMIT 5").toPandas()


+----------------------------+-------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                |comment|
+----------------------------+-------------------------------------------------------------------------------------------------------------------------+-------+
|shippingregion_key          |int                                                                                                                      |NULL   |
|shippingregion_id           |int                                                                                                                      |NULL   |
|name                        |string                                                                                                                   |NULL   |
|country                     |stri

Unnamed: 0,shippingregion_key,shippingregion_id,name,country
0,1,2,West Coast,USA
1,2,3,Midwest,USA
2,3,4,South,USA
3,4,5,International,Global


### 3.0. Fetch Reference Data from a MySQL Database
#### 3.1. Populate the <span style="color:darkred">Currency Dimension</span>
##### 3.1.1 Fetch data from the <span style="color:darkred">dim_currency</span> table in MySQL

In [29]:

sql_dim_currency = f"SELECT * FROM {mysql_args['db_name']}.dim_currency"

df_dim_currency = get_mysql_dataframe(spark, sql_dim_currency, **mysql_args)

df_dim_currency.limit(5).toPandas()


Unnamed: 0,CurrencyCode,Name,ModifiedDate
0,AED,Emirati Dirham,1998-06-01
1,AFA,Afghani,1998-06-01
2,ALL,Lek,1998-06-01
3,AMD,Armenian Dram,1998-06-01
4,ANG,Netherlands Antillian Guilder,1998-06-01


##### 3.1.2. Save as the <span style="color:darkred">dim_date</span> table in the Data Lakehouse

In [30]:
df_dim_currency.write.saveAsTable(f"{dest_database}.dim_currency", mode="overwrite")


##### 3.1.3. Unit Test: Describe and Preview Table

In [31]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_currency").show()

spark.sql(f"SELECT * FROM {dest_database}.dim_currency LIMIT 5").toPandas()


+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|        CurrencyCode|          varchar(3)|   NULL|
|                Name|         varchar(50)|   NULL|
|        ModifiedDate|           timestamp|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|          estella_dw|       |
|               Table|        dim_currency|       |
|        Created Time|Thu Dec 18 00:22:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.4|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|            Location|file:/C:/Users/st...|       |
+--------------------+--------------------+-------+



Unnamed: 0,CurrencyCode,Name,ModifiedDate
0,AED,Emirati Dirham,1998-06-01
1,AFA,Afghani,1998-06-01
2,ALL,Lek,1998-06-01
3,AMD,Armenian Dram,1998-06-01
4,ANG,Netherlands Antillian Guilder,1998-06-01


#### 3.2. Populate the <span style="color:darkred">Product Dimension</span>
##### 3.2.1. Fetch data from the <span style="color:darkred">Products</span> table in MySQL

In [32]:
sql_dim_culture = f"SELECT * FROM {mysql_args['db_name']}.dim_culture"

df_dim_culture = get_mysql_dataframe(spark, sql_dim_culture, **mysql_args)

df_dim_culture.limit(5).toPandas()


Unnamed: 0,CultureID,Name,ModifiedDate
0,,Invariant Language (Invariant Country),1998-06-01
1,ar,Arabic,1998-06-01
2,en,English,1998-06-01
3,es,Spanish,1998-06-01
4,fr,French,1998-06-01


##### 3.2.3. Save as the <span style="color:darkred">dim_products</span> table in the Data Lakehouse

In [33]:
df_dim_currency.write.saveAsTable(f"{dest_database}.dim_culture", mode="overwrite")

##### 3.2.4. Unit Test: Describe and Preview Table

In [34]:
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_culture").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_culture LIMIT 5").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|        CurrencyCode|          varchar(3)|   NULL|
|                Name|         varchar(50)|   NULL|
|        ModifiedDate|           timestamp|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|          estella_dw|       |
|               Table|         dim_culture|       |
|        Created Time|Thu Dec 18 00:22:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 3.5.4|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|            Location|file:/C:/Users/st...|       |
+--------------------+--------------------+-------+



Unnamed: 0,CurrencyCode,Name,ModifiedDate
0,AED,Emirati Dirham,1998-06-01
1,AFA,Afghani,1998-06-01
2,ALL,Lek,1998-06-01
3,AMD,Armenian Dram,1998-06-01
4,ANG,Netherlands Antillian Guilder,1998-06-01


### 4.0. Verify Dimension Tables

In [35]:
spark.sql(f"USE {dest_database};")
spark.sql("SHOW TABLES").toPandas()

Unnamed: 0,namespace,tableName,isTemporary
0,estella_dw,dim_culture,False
1,estella_dw,dim_currency,False
2,estella_dw,dim_customer,False
3,estella_dw,dim_date,False
4,estella_dw,dim_product,False
5,estella_dw,dim_shipping_region,False
6,,customers,True


## Section III: Integrate Reference Data with Real-Time Data
### 6.0. Use PySpark Structured Streaming to Process (Hot Path) <span style="color:darkred">Fact Sales</span> Fact Data  
#### 6.1. Verify the location of the source data files on the file system

In [36]:
get_file_info(fact_sales_stream_dir)


Unnamed: 0,name,size,modification_time
0,fact_sales_43659_54147.json,218346,2025-12-17 18:32:01.172175407
1,fact_sales_54148_64636.json,216391,2025-12-17 18:32:36.945561647
2,fact_sales_64637_75123.json,216599,2025-12-17 18:33:36.349854708


#### 6.2. Create the Bronze Layer: Stage <span style="color:darkred">Fact Sales table</span> Data
##### 6.2.1. Read "Raw" JSON file data into a Stream

In [37]:
fact_sales_stream_dir = r"C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\streaming\fact_sales"
fact_sales_output_bronze = r"C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\streaming\fact_sales\Bronze"

df_fact_sales_bronze = (
    spark.readStream
    .option("schemaLocation", fact_sales_output_bronze)
    .option("maxFilesPerTrigger", 1)     
    .option("multiLine", "true")          
    .json(fact_sales_stream_dir)         
)

df_fact_sales_bronze.isStreaming


True

##### 6.2.2. Write the Streaming Data to a Parquet file

In [38]:
from pyspark.sql.functions import current_timestamp, input_file_name
import os

fact_sales_checkpoint_bronze = os.path.join(fact_sales_output_bronze, '_checkpoint')

fact_sales_bronze_query = (
    df_fact_sales_bronze
    .withColumn("receipt_time", current_timestamp())  
    .withColumn("source_file", input_file_name())    
    .writeStream
    .format("parquet")
    .outputMode("append")
    .queryName("fact_sales_bronze")
    .trigger(availableNow=True)                      
    .option("checkpointLocation", fact_sales_checkpoint_bronze)
    .option("compression", "snappy")
    .start(fact_sales_output_bronze)
)

fact_sales_bronze_query.isActive



True

##### 6.2.3. Unit Test: Implement Query Monitoring

In [39]:


print(f"Query ID: {fact_sales_bronze_query.id}")
print(f"Query Name: {fact_sales_bronze_query.name}")
print(f"Query Status: {fact_sales_bronze_query.status}")

fact_sales_bronze_query.awaitTermination()


Query ID: b67dbe89-9562-4776-866e-da325f8e3878
Query Name: fact_sales_bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [40]:
spark.read.parquet(fact_sales_output_bronze).printSchema()


root
 |-- customer_id: long (nullable = true)
 |-- date_key: long (nullable = true)
 |-- extended_amount: double (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_line_id: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- quantity: long (nullable = true)
 |-- region_id: long (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- receipt_time: timestamp (nullable = false)
 |-- source_file: string (nullable = false)



In [41]:

dim_tables = [
    "dim_customer",
    "dim_product",
    "dim_shipping_region",
    "dim_date", 
    "dim_culture", 
    "dim_currency", 
    
]

dest_database = "estella_dw"

for table in dim_tables:
    print(f"\n=== Table: {table} ===")
    df = spark.table(f"{dest_database}.{table}")
    print("Schema:")
    df.printSchema()
    print("Sample rows:")
    df.show(5, truncate=False)



=== Table: dim_customer ===
Schema:
root
 |-- customer_key: integer (nullable = true)
 |-- customer_id_orig: integer (nullable = true)
 |-- full_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)

Sample rows:
+------------+----------------+---------+-------+-------+-------+
|customer_key|customer_id_orig|full_name|city   |state  |country|
+------------+----------------+---------+-------+-------+-------+
|1           |2               |Unknown  |Unknown|Unknown|Unknown|
|2           |3               |Unknown  |Unknown|Unknown|Unknown|
|3           |4               |Unknown  |Unknown|Unknown|Unknown|
|4           |5               |Unknown  |Unknown|Unknown|Unknown|
|5           |6               |Unknown  |Unknown|Unknown|Unknown|
+------------+----------------+---------+-------+-------+-------+
only showing top 5 rows


=== Table: dim_product ===
Schema:
root
 |-- product_key: integer (nullable =

#### 6.3. Create the Silver Layer: Integrate "Cold-path" Data & Make Transformations
##### 6.3.1. Prepare Role-Playing Dimension Primary and Business Keys

In [42]:

from pyspark.sql.functions import col

df_dim_product = df_dim_product.withColumnRenamed("id", "product_key")
df_dim_product = df_dim_product.withColumnRenamed("product_number", "product_business_key")

df_dim_customer = df_dim_customer.withColumnRenamed("id", "customer_key")
df_dim_customer = df_dim_customer.withColumnRenamed("customer_id", "customer_id_orig")

df_dim_date = df_dim_date.withColumnRenamed("id", "date_key")
df_dim_date = df_dim_date.withColumnRenamed("full_date", "date_full_date")

df_dim_shipping_region = df_dim_shipping_region.withColumnRenamed("id", "shippingregion_key")

print("Dimension tables prepared with primary and business keys.")



Dimension tables prepared with primary and business keys.


##### 6.3.2. Define Silver Query to Join Streaming with Batch Data

In [43]:
fact_sales_output_bronze = r"C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\streaming\fact_sales\Bronze"


df_bronze_stream = spark.readStream.format("parquet").load(fact_sales_output_bronze)


df_silver_stream = (
    df_bronze_stream
    .join(
        df_dim_product,
        df_bronze_stream.product_id.cast("integer") == df_dim_product.product_business_key,
        "left"
    )
    .join(
        df_dim_date,
        df_bronze_stream.date_key == df_dim_date.date_key,
        "left"
    )
    .join(
        df_dim_customer,
        df_bronze_stream.customer_id == df_dim_customer.customer_id_orig,
        "left"
    )
    .join(
        df_dim_shipping_region,
        df_bronze_stream.region_id == df_dim_shipping_region.shippingregion_key,
        "left"
    )
    .select(
        df_bronze_stream.order_id,
        df_bronze_stream.order_line_id,
        df_bronze_stream.customer_id,
        df_bronze_stream.product_id.alias("bronze_product_id"),
        df_dim_product.product_key,
        df_dim_product.color.alias("product_category"),
        df_dim_product.name.alias("product_name"),
        df_bronze_stream.quantity,
        df_bronze_stream.unit_price,
        df_bronze_stream.extended_amount,
        df_bronze_stream.date_key.alias("order_date_key"),
        df_dim_date.date_full_date.alias("full_date"),
        df_dim_date.month,
        df_dim_date.year,
        df_dim_date.quarter,
        df_bronze_stream.region_id,
        df_dim_shipping_region.name.alias("region_name"),
        df_dim_customer.full_name.alias("customer_name"),
        df_bronze_stream.receipt_time,
        df_bronze_stream.source_file
    )
)

print(f"Is streaming? {df_silver_stream.isStreaming}")  


Is streaming? True


#### 6.3.3. Write the Transformed Streaming data to the Data Lakehouse

In [44]:
fact_sales_output_silver = r"C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\streaming\fact_sales\Silver"
checkpoint_location = r"C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\streaming\fact_sales\Silver_checkpoint"


df_silver_stream_query = (
    df_silver_stream.writeStream
    .format("parquet")
    .option("path", fact_sales_output_silver)
    .option("checkpointLocation", checkpoint_location)
    .outputMode("append")  
    .start()
)

print("Streaming Silver query started. Use df_silver_stream_query.awaitTermination() to keep it running.")


Streaming Silver query started. Use df_silver_stream_query.awaitTermination() to keep it running.


In [45]:
df_silver_stream_query.stop()

In [46]:
from pyspark.sql.functions import col, sum as _sum, when, isnull

df_silver_stream_query.awaitTermination()

df_verify = spark.read.parquet(fact_sales_output_silver)

print("\nVerified Silver schema:")
df_verify.printSchema()

print("\nSample rows from Silver:")
df_verify.show(10, truncate=False)

df_verify.select([
    _sum(when(isnull(c), 1).otherwise(0)).alias(c)
    for c in ["product_key", "product_category", "month", "year"]
]).show()

df_verify.groupBy("product_category").count().orderBy(col("count").desc()).show(20, truncate=False)

print("\nSilver layer monitoring complete.")



Verified Silver schema:
root
 |-- order_id: long (nullable = true)
 |-- order_line_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- bronze_product_id: long (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- extended_amount: double (nullable = true)
 |-- order_date_key: long (nullable = true)
 |-- full_date: date (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- quarter: long (nullable = true)
 |-- region_id: long (nullable = true)
 |-- region_name: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- receipt_time: timestamp (nullable = true)
 |-- source_file: string (nullable = true)


Sample rows from Silver:
+--------+-------------+-----------+-----------------+-----------+----------------+----------------

#### 6.4. Create Gold Layer: Perform Aggregations
##### 6.4.1. Define a Query to Create a Business Report
Create a new Gold table using the PySpark API. The table should include the number of Products sold per Category each Month. The results should include The Month, Product Category and Number of Products sold, sorted by the month number when the orders were placed: e.g., January, February, March.

In [49]:
from pyspark.sql.functions import col, sum as _sum, when
import os

fact_sales_output_silver = r"C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\streaming\fact_sales\Silver"
fact_sales_output_gold = r"C:\Users\stell\OneDrive\Desktop\Documents\DS-2002-main\04-PySpark\lab_data\estella_dw\streaming\fact_sales\Gold"

for stream in spark.streams.active:
    if stream.name in ["fact_sales_gold", "fact_sales_gold_memory"]:
        stream.stop()

df_silver_stream = spark.readStream.option("maxFilesPerTrigger", 1).parquet(fact_sales_output_silver)

df_gold_stream = (
    df_silver_stream
    .filter(col("product_category").isNotNull() & col("month").isNotNull() & col("year").isNotNull() & col("quantity").isNotNull())
    .withColumn("category_clean", when(col("product_category").isNull(), "Unknown").otherwise(col("product_category")))
    .groupBy("month", "year", "category_clean")
    .agg(_sum("quantity").alias("product_count"))
    .select(col("month").alias("Month"), col("year").alias("Year"), col("category_clean").alias("Product_Category"), col("product_count").alias("Product_Count"))
)


#### 6.4.2: Write Streaming data to MEMORY (Complete mode)

In [50]:
fact_sales_gold_memory_query = (
    df_gold_stream.writeStream
    .format("memory")
    .outputMode("complete")
    .queryName("fact_sales_gold_memory")
    .trigger(availableNow=True)
    .start()
)

fact_sales_gold_memory_query.awaitTermination()


#### 6.4.3: Query the Gold Data from Memory

In [51]:
df_gold_memory = spark.sql("SELECT * FROM fact_sales_gold_memory ORDER BY Year, Month, Product_Count DESC")
df_gold_memory.show(50, truncate=False)


+-----+----+----------------+-------------+
|Month|Year|Product_Category|Product_Count|
+-----+----+----------------+-------------+
|0    |2001|Red             |463          |
|0    |2001|Black           |370          |
|0    |2001|Silver          |209          |
|0    |2001|Multi           |176          |
|0    |2001|White           |69           |
|0    |2001|Blue            |34           |
|1    |2001|Red             |360          |
|1    |2001|Black           |236          |
|1    |2001|Multi           |124          |
|1    |2001|Silver          |50           |
|1    |2001|White           |43           |
|1    |2001|Blue            |33           |
|0    |2003|\N              |398          |
|0    |2003|Black           |91           |
|0    |2003|Yellow          |53           |
|0    |2003|Multi           |47           |
|0    |2003|Blue            |38           |
|0    |2003|Silver          |28           |
|0    |2003|White           |8            |
|0    |2003|Red             |1  

#### 6.4.4: Create the Final Selection / Write to Parquet

In [52]:
df_gold_memory.write.mode("overwrite").parquet(fact_sales_output_gold)

df_gold_verify = spark.read.parquet(fact_sales_output_gold)
df_gold_verify.orderBy("Year", "Month", col("Product_Count").desc()).show(100, truncate=False)

+-----+----+----------------+-------------+
|Month|Year|Product_Category|Product_Count|
+-----+----+----------------+-------------+
|0    |2001|Red             |463          |
|0    |2001|Black           |370          |
|0    |2001|Silver          |209          |
|0    |2001|Multi           |176          |
|0    |2001|White           |69           |
|0    |2001|Blue            |34           |
|1    |2001|Red             |360          |
|1    |2001|Black           |236          |
|1    |2001|Multi           |124          |
|1    |2001|Silver          |50           |
|1    |2001|White           |43           |
|1    |2001|Blue            |33           |
|0    |2003|\N              |398          |
|0    |2003|Black           |91           |
|0    |2003|Yellow          |53           |
|0    |2003|Multi           |47           |
|0    |2003|Blue            |38           |
|0    |2003|Silver          |28           |
|0    |2003|White           |8            |
|0    |2003|Red             |1  

#### 6.4.5: Load into Table and Display

In [53]:
dest_database = "estella_dw"

try:
    df_gold_verify.write.mode("overwrite").saveAsTable(f"{dest_database}.fact_products_sold_by_month")
    result = spark.sql(f"SELECT * FROM {dest_database}.fact_products_sold_by_month ORDER BY Year, Month, Product_Count DESC")
    result.show(50, truncate=False)
except:
    pass


+-----+----+----------------+-------------+
|Month|Year|Product_Category|Product_Count|
+-----+----+----------------+-------------+
|0    |2001|Red             |463          |
|0    |2001|Black           |370          |
|0    |2001|Silver          |209          |
|0    |2001|Multi           |176          |
|0    |2001|White           |69           |
|0    |2001|Blue            |34           |
|1    |2001|Red             |360          |
|1    |2001|Black           |236          |
|1    |2001|Multi           |124          |
|1    |2001|Silver          |50           |
|1    |2001|White           |43           |
|1    |2001|Blue            |33           |
|0    |2003|\N              |398          |
|0    |2003|Black           |91           |
|0    |2003|Yellow          |53           |
|0    |2003|Multi           |47           |
|0    |2003|Blue            |38           |
|0    |2003|Silver          |28           |
|0    |2003|White           |8            |
|0    |2003|Red             |1  

### 7 Stop the Spark Session

In [None]:
spark.stop()