In [1]:
# Setup from Lab 4
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

# Setup from Lab 6
import findspark
findspark.init()
print(findspark.find())
import pyspark
print(pyspark.__version__)
import pymysql
from delta import *

import sys
import time
import shutil
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window as W

C:\Users\jocel\spark\spark-4.0.1-bin-hadoop3\
4.0.1


In [2]:
# --------------------------------------------------------------------------------
# Specify MySQL Server Connection Information
# --------------------------------------------------------------------------------
# mysql_args = {
#     "host_name" : "localhost",
#     "port" : "3306",
#     "db_name" : "northwind_dw2",
#     "conn_props" : {
#         "user" : "root",
#         "password" : "Zhu0z!m!m@37*",
#         "driver" : "com.mysql.cj.jdbc.Driver"
#     }
# }
mysql_args = {
    "uid" : "root",
    "pwd" : "Zhu0z!m!m%4037*",
    "hostname" : "localhost",
    "port" : "3306",
    "dbname" : "adventureworks",
    "driver" : "com.mysql.cj.jdbc.Driver"
}
# --------------------------------------------------------------------------------
# Specify MongoDB Cluster Connection Information
# --------------------------------------------------------------------------------
mongodb_args = {
    "cluster_location" : "atlas",
    "user_name" : "jocelynjiang1",
    "password" : "Zhu0z!m!m%4037*",
    "cluster_name" : "cluster0",
    "cluster_subnet" : "cqza4jd",
    "db_name" : "adventureworks",
    "collection" : "",
    "null_column_threshold" : 0.5
}


In [3]:
def get_file_info(path: str):
    file_sizes = []
    modification_times = []

    '''Fetch each item in the directory, and filter out any directories.'''
    items = os.listdir(path)
    files = sorted([item for item in items if os.path.isfile(os.path.join(path, item))])

    '''Populate lists with the Size and Last Modification DateTime for each file in the directory.'''
    for file in files:
        file_sizes.append(os.path.getsize(os.path.join(path, file)))
        modification_times.append(pd.to_datetime(os.path.getmtime(os.path.join(path, file)), unit='s'))

    data = list(zip(files, file_sizes, modification_times))
    column_names = ['name','size','modification_time']
    
    return pd.DataFrame(data=data, columns=column_names)


def wait_until_stream_is_ready(query, min_batches=1):
    while len(query.recentProgress) < min_batches:
        time.sleep(5)
        
    print(f"The stream has processed {len(query.recentProgress)} batchs")


def remove_directory_tree(path: str):
    '''If it exists, remove the entire contents of a directory structure at a given 'path' parameter's location.'''
    try:
        if os.path.exists(path):
            shutil.rmtree(path)
            return f"Directory '{path}' has been removed successfully."
        else:
            return f"Directory '{path}' does not exist."
            
    except Exception as e:
        return f"An error occurred: {e}"
        

def drop_null_columns(df, threshold):
    '''Drop Columns having a percentage of NULL values that exceeds the given 'threshold' parameter value.'''
    columns_with_nulls = [col for col in df.columns if df.filter(df[col].isNull()).count() / df.count() > threshold] 
    df_dropped = df.drop(*columns_with_nulls) 
    
    return df_dropped
    
    
def get_mysql_dataframe(spark_session, sql_query : str, **args):
    '''Create a JDBC URL to the MySQL Database'''
    jdbc_url = f"jdbc:mysql://{args['hostname']}:{args['port']}/{args['dbname']}"
    
    '''Invoke the spark.read.format("jdbc") function to query the database, and fill a DataFrame.'''
    dframe = spark_session.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("driver", args['driver']) \
    .option("user", args['uid']) \
    .option("password", args['pwd']) \
    .option("query", sql_query) \
    .load()
    
    return dframe
    

def get_mongo_uri(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the 'cluster_location' parameter.")
        
    if args['cluster_location'] == "atlas":
        uri = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        uri += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net/"
    else:
        uri = "mongodb://localhost:27017/"

    return uri


def get_spark_conf_args(spark_jars : list, **args):
    jars = ""
    for jar in spark_jars:
        jars += f"{jar}, "
    
    sparkConf_args = {
        "app_name" : "PySpark Northwind Data Lakehouse (Medallion Architecture)",
        "worker_threads" : f"local[{int(os.cpu_count()/2)}]",
        "shuffle_partitions" : int(os.cpu_count()),
        "mongo_uri" : get_mongo_uri(**args),
        "spark_jars" : jars[0:-2],
        "database_dir" : sql_warehouse_dir
    }
    
    return sparkConf_args
    

def get_spark_conf(**args):
    sparkConf = SparkConf().setAppName(args['app_name'])\
    .setMaster(args['worker_threads']) \
    .set('spark.driver.memory', '4g') \
    .set('spark.executor.memory', '2g') \
    .set('spark.jars', args['spark_jars']) \
    .set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.13:10.4.1') \
    .set('spark.mongodb.input.uri', args['mongo_uri']) \
    .set('spark.mongodb.output.uri', args['mongo_uri']) \
    .set('spark.sql.adaptive.enabled', 'false') \
    .set('spark.sql.debug.maxToStringFields', 35) \
    .set('spark.sql.shuffle.partitions', args['shuffle_partitions']) \
    .set('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .set('spark.sql.streaming.schemaInference', 'true') \
    .set('spark.sql.warehouse.dir', args['database_dir']) \
    .set('spark.streaming.stopGracefullyOnShutdown', 'true')
    
    return sparkConf

    
def set_mongo_collections(mongo_client, db_name : str, data_directory : str, json_files : list):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()
    

def get_mongodb_dataframe(spark_session, **args):
    '''Query MongoDB, and create a DataFrame'''
    # dframe = spark_session.read.format("com.mongodb.spark.sql.DefaultSource") \
    mongo_uri = get_mongo_uri(**mongodb_args)
    dframe = spark_session.read.format("mongodb") \
        .option("connection.uri", mongo_uri) \
        .option("database", args['db_name']) \
        .option("collection", args['collection']) \
            .load()

    '''Drop the '_id' index column to clean up the response.'''
    dframe = dframe.drop('_id')
    
    '''Call the drop_null_columns() function passing in the dataframe.'''
    dframe = drop_null_columns(dframe, args['null_column_threshold'])
    
    return dframe


def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe

In [4]:
# Date dimension: ran provided script in SSMS, exported result as csv, going to read into pd df

# SOURCE 1 -- local file system
df_dim_date = pd.read_csv('./data/dim_date.csv')
dataframe = df_dim_date
table_name = 'dim_date'
primary_key = 'DateKey'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

df_dim_date.head(2)

Unnamed: 0,DateKey,Date,Day,DaySuffix,Weekday,WeekDayName,WeekDayName_Short,WeekDayName_FirstLetter,DOWInMonth,DayOfYear,...,LastDateofQuater,FirstDateofMonth,LastDateofMonth,FirstDateofWeek,LastDateofWeek,CurrentYear,CurrentQuater,CurrentMonth,CurrentWeek,CurrentDay
0,20000101,2000-01-01,1,st,7,Saturday,SAT,S,1,1,...,2025-12-31,2000-01-01,2000-01-31,1999-12-26,2000-01-01,-25,-103,-309,-1347,-9427
1,20000102,2000-01-02,2,nd,1,Sunday,SUN,S,2,2,...,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-309,-1346,-9426


In [5]:
client = get_mongo_client(**mongodb_args)
data_dir = os.path.join(os.getcwd(), 'data')
# results of dim_employee_vw from AdventureWorks_Queries saved to json; save json to mongo
# can query other data on salesperson and salesorderdetails with mysql
json_files = {"employee_details" : 'employee_details_view.json',
             }

# SOURCE 2 -- mongoDB (this is the setting step; usage step is later)
set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

In [6]:
# Insight 1 that I want to observe: average sales last year for a sales representative given their age

# ***Assumption for this project: SalespersonID corresponds with EmployeeID. 
#   I assume this because all SalespersonIDs, when treated as EmployeeIDs, correspond with 
#   an Employee Title that is a sales-related role

# Step 1, going to insert age of salesperson (at time of sale) column into salesorderheader (fact table)
# by subtracting employee BirthDate and sales OrderDate

# SOURCE 3 -- RDBMS (MySQL)
sql_dim_employee = "SELECT EmployeeID, Title, BirthDate FROM employee;"
df_dim_employee = get_sql_dataframe(sql_dim_employee, **mysql_args)

sql_fact_salesorder = "SELECT * FROM salesorderheader;"
df_fact_salesorder = get_sql_dataframe(sql_fact_salesorder, **mysql_args)
df_fact_salesorder.insert(0, "fact_salesorder_key", range(1, df_fact_salesorder.shape[0]+1))
df_fact_salesorder.drop(columns=['RevisionNumber','Status','OnlineOrderFlag','SalesOrderNumber','PurchaseOrderNumber','AccountNumber','rowguid'], inplace=True)

df_fact_salesorder = pd.merge(df_fact_salesorder,df_dim_employee, left_on="SalesPersonID", right_on="EmployeeID",how="inner")
df_fact_salesorder.insert(df_fact_salesorder.shape[1], "SalesPersonAge", 
                          ((df_fact_salesorder["OrderDate"]-df_fact_salesorder["BirthDate"]).dt.days/365).round()) # looked up how to treat timestamp difference as days, round age to whole num

df_fact_salesorder.drop(columns=['EmployeeID','BirthDate'],inplace=True)


# Step 2, add SalesLastYear data for each order's salesperson (includes repeats if multiple sales by one person)
sql_dim_salesperson = "SELECT SalesPersonID, SalesLastYear FROM salesperson;"
df_dim_salesperson = get_sql_dataframe(sql_dim_salesperson, **mysql_args)

df_fact_salesorder = pd.merge(df_fact_salesorder, df_dim_salesperson, on="SalesPersonID", how="left")

# finally, modify dates to use datekey from date dim
# first, update dim_date to keep only date key and date cols:
sql_dim_date = "SELECT DateKey, Date FROM dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)

df_fact_salesorder.OrderDate = df_fact_salesorder.OrderDate.astype('datetime64[ns]').dt.date
df_fact_salesorder.DueDate = df_fact_salesorder.DueDate.astype('datetime64[ns]').dt.date
df_fact_salesorder.ShipDate = df_fact_salesorder.ShipDate.astype('datetime64[ns]').dt.date
df_dim_date.Date = df_dim_date.Date.astype('datetime64[ns]').dt.date

#
df_fact_salesorder = pd.merge(df_fact_salesorder, df_dim_date, left_on="OrderDate",right_on="Date",how="inner")
df_fact_salesorder = df_fact_salesorder.rename(columns={'DateKey':'OrderDateKey'})
df_fact_salesorder.drop(columns=['OrderDate','Date'],inplace=True)

df_fact_salesorder = pd.merge(df_fact_salesorder, df_dim_date, left_on="DueDate",right_on="Date",how="inner")
df_fact_salesorder = df_fact_salesorder.rename(columns={'DateKey':'DueDateKey'})
df_fact_salesorder.drop(columns=['DueDate','Date'],inplace=True)

df_fact_salesorder = pd.merge(df_fact_salesorder, df_dim_date, left_on="ShipDate",right_on="Date",how="inner")
df_fact_salesorder = df_fact_salesorder.rename(columns={'DateKey':'ShipDateKey'})
df_fact_salesorder.drop(columns=['ShipDate','Date'],inplace=True)

# put the fact table into MySQL
dataframe = df_fact_salesorder
table_name = 'fact_salesorder'
primary_key = 'fact_salesorder_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

df_fact_salesorder.head(2)

Unnamed: 0,fact_salesorder_key,SalesOrderID,CustomerID,ContactID,SalesPersonID,TerritoryID,BillToAddressID,ShipToAddressID,ShipMethodID,CreditCardID,...,Freight,TotalDue,Comment,ModifiedDate,Title,SalesPersonAge,SalesLastYear,OrderDateKey,DueDateKey,ShipDateKey
0,1,43659,676,378,279.0,5,985,985,5,16281.0,...,616.0984,27231.5495,,2001-07-08,Sales Representative,37.0,1849641.0,20010701,20010713,20010708
1,2,43660,117,216,279.0,5,921,921,5,5618.0,...,38.8276,1716.1794,,2001-07-08,Sales Representative,37.0,1849641.0,20010701,20010713,20010708


In [7]:
# Insight #1, continued:

# Now query to analyze sales last year given sales person age
# ***(Possible that same salesperson could be represented in two diff age groups
#   if salesperson had a birthday, but keeping salesperson repeats 
#   still lets us see how sales success correlated with age)

sql_age_sales_results = """
    SELECT 
        SalesPersonAge, 
        AVG(SalesLastYear) AS avg_sales_last_year
    FROM fact_salesorder
    GROUP BY SalesPersonAge
    ORDER BY SalesPersonAge
"""

df_age_sales = get_sql_dataframe(sql_age_sales_results, **mysql_args)
print(df_age_sales)

""" Results analysis:
    Values of zero in avg_sales_last_year column likely mean there were salespeople with that age,
    but they did not create any sales in the date range of this database.

    Generally, the average sales last year increased as age of the salesperson increased.
"""

    SalesPersonAge  avg_sales_last_year
0             31.0         1.439156e+06
1             32.0         1.439156e+06
2             33.0         1.439156e+06
3             34.0         1.439156e+06
4             35.0         0.000000e+00
5             36.0         1.472979e+05
6             37.0         1.919542e+06
7             38.0         1.785199e+06
8             39.0         1.630044e+06
9             40.0         1.699203e+06
10            42.0         2.177950e+06
11            43.0         1.967816e+06
12            44.0         1.806209e+06
13            45.0         1.769684e+06
14            46.0         1.800747e+06
15            48.0         2.038235e+06
16            49.0         2.014531e+06
17            50.0         2.014251e+06
18            51.0         1.997186e+06
19            52.0         1.997186e+06
20            55.0         0.000000e+00
21            56.0         0.000000e+00
22            57.0         0.000000e+00
23            60.0         0.000000e+00


' Results analysis:\n    Values of zero in avg_sales_last_year column likely mean there were salespeople with that age,\n    but they did not create any sales in the date range of this database.\n\n    Generally, the average sales last year increased as age of the salesperson increased.\n'

In [8]:
# Insight 2: How popularity of products changes with their price
sql_fact_productcosthist = "SELECT * FROM productcosthistory;"
df_fact_productcosthist = get_sql_dataframe(sql_fact_productcosthist, **mysql_args)
df_fact_productcosthist.insert(0, "fact_productcosthist_key", range(1, df_fact_productcosthist.shape[0]+1))
df_fact_productcosthist.drop(columns=['ModifiedDate'],inplace=True)

# convert dates to universal keys
df_fact_productcosthist.StartDate = df_fact_productcosthist.StartDate.astype('datetime64[ns]').dt.date
df_fact_productcosthist.EndDate = df_fact_productcosthist.EndDate.astype('datetime64[ns]').dt.date

df_fact_productcosthist = pd.merge(df_fact_productcosthist, df_dim_date, left_on="StartDate", right_on="Date", how="inner")
df_fact_productcosthist = df_fact_productcosthist.rename(columns={"DateKey":"StartDateKey"})
df_fact_productcosthist.drop(columns=['StartDate','Date'],inplace=True)

df_fact_productcosthist = pd.merge(df_fact_productcosthist, df_dim_date, left_on="EndDate", right_on="Date", how="inner")
df_fact_productcosthist = df_fact_productcosthist.rename(columns={"DateKey":"EndDateKey"})
df_fact_productcosthist.drop(columns=['EndDate','Date'],inplace=True)


# add product name to fact table for reference
sql_dim_product = "SELECT ProductID, Name FROM product;"
df_dim_product = get_sql_dataframe(sql_dim_product, **mysql_args)
df_fact_productcosthist = pd.merge(df_fact_productcosthist, df_dim_product, on='ProductID', how='inner')
df_fact_productcosthist.head(2)

# put costhist fact table in MySQL for analytics
dataframe = df_fact_productcosthist
table_name = 'fact_productcosthist'
primary_key = 'fact_productcosthist_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

# add in transaction date info from transaction history
sql_dim_transachist = "SELECT ProductID, TransactionDate, Quantity FROM transactionhistory;"
df_dim_transachist = get_sql_dataframe(sql_dim_transachist, **mysql_args)
df_dim_transachist.insert(0, "dim_transacthist_key", range(1, df_dim_transachist.shape[0]+1))
df_dim_transachist.TransactionDate = df_dim_transachist.TransactionDate.astype('datetime64[ns]').dt.date

df_dim_transachist = pd.merge(df_dim_transachist, df_dim_date, left_on="TransactionDate", right_on="Date", how="inner")
df_dim_transachist = df_dim_transachist.rename(columns={"DateKey":"TransactionDateKey"})
df_dim_transachist.drop(columns=['TransactionDate','Date'],inplace=True)

# put transachist dim table in MySQL for analytics
dataframe = df_dim_transachist
table_name = 'dim_transachist'
primary_key = 'dim_transacthist_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [9]:
# Insight 2, continued:
sql_price_changes = """
    SELECT
        t.*,
        p.StartDateKey,
        p.EndDateKey
    FROM fact_productcosthist p
    LEFT JOIN dim_transachist t
        ON p.ProductID = t.ProductID
    WHERE
        ((TransactionDateKey >= StartDateKey) AND (TransactionDateKey <= EndDateKey));
"""
df_price_changes = get_sql_dataframe(sql_price_changes, **mysql_args)
print(df_price_changes.head(5))
print(df_price_changes.shape)

""" 
    Results analysis:
    Dataframe df_price_changes was meant to show the transactions that occurred
    where the transaction took place during one of the time periods where the product price changed.
    The dataframe appeared empty, and after checking its shape, there are in fact
    no records where a product in the transaction history was purchased at a former price from productcosthistory.

    I then noticed that all of the TransactionDateKeys were values later than the EndDateKeys, meaning
    none of the transactions in transactionhistory took place in the periods where the product price was changing.

    As a result, I would not be able to sum the quantities purchased of each product as the price changed as planned
    (but wanted to demonstrate my work above). I instead shifted to investigate quantities purchased of each product on each transaction date.
"""

sql_product_qtys = """
    SELECT
        t.TransactionDateKey,
        p.ProductID,
        p.Name AS ProductName,
        SUM(t.Quantity) AS TotalProductQty
    FROM fact_productcosthist p
    JOIN dim_transachist t
        ON p.ProductID = t.ProductID
    GROUP BY
        p.ProductID,
        p.Name,
        t.TransactionDateKey;
"""
df_product_qtys = get_sql_dataframe(sql_product_qtys, **mysql_args)
df_product_qtys.head(5)

""" 
    Results analysis:
    Looking at more records (than 5) of df_product_qtys would give insight into
    which products are growing in popularity (i.e. higher quantities purchased) over time.
"""

Empty DataFrame
Columns: [dim_transacthist_key, ProductID, Quantity, TransactionDateKey, StartDateKey, EndDateKey]
Index: []
(0, 6)


' \n    Results analysis:\n    Looking at more records (than 5) of df_product_qtys would give insight into\n    which products are growing in popularity (i.e. higher quantities purchased) over time.\n'

## Additions for Final Project

In [None]:
""" Final/capstone: analyze how managers' employees perform as orders come in
- Stream order data using fact_salesorder, persisted to MySQL in midterm's half of the notebook
    - Achieve the 'real-time' data by querying for all fact_salesorder results, saving to 3 jsons (called 'orders_01.json', for example) in a streaming/
- Join with table of managers' names
- Join with the table of employee details view
- Join with salesperson table
- Now have info on: for each incoming sales order, which manager that salesperson works under
    - Final, organized output: show managers by name, the total sales last year of their salespeople who are generating new sales orders,
    and calculation of how much return on investment (in the form of salesperson bonus) that employee is generating for them.
"""

# --------------------------------------------------------------------------------
# Specify Directory Structure for Source Data
# --------------------------------------------------------------------------------
data_dir = os.path.join(os.getcwd(), 'data')
# data_dir = os.path.join(base_dir, 'retail-org')
batch_dir = os.path.join(data_dir, 'batch')
stream_dir = os.path.join(data_dir, 'streaming')

orders_stream_dir = os.path.join(stream_dir, 'orders')

# --------------------------------------------------------------------------------
# Create Directory Structure for Data Lakehouse Files
# --------------------------------------------------------------------------------
dest_database = "retail_dlh"
sql_warehouse_dir = os.path.abspath('spark-warehouse')
dest_database_dir = f"{dest_database}.db"
database_dir = os.path.join(sql_warehouse_dir, dest_database_dir)

orders_output_bronze = os.path.join(database_dir, 'fact_orders', 'bronze')
orders_output_silver = os.path.join(database_dir, 'fact_orders', 'silver')
orders_output_gold = os.path.join(database_dir, 'fact_orders', 'gold')

In [11]:
worker_threads = f"local[{int(os.cpu_count()/2)}]"
shuffle_partitions = int(os.cpu_count())

builder = pyspark.sql.SparkSession.builder \
    .appName('PySpark Final Project in Juptyer')\
    .master(worker_threads)\
    .config('spark.driver.memory', '4g') \
    .config('spark.executor.memory', '2g')\
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.adaptive.enabled', 'false') \
    .config('spark.sql.debug.maxToStringFields', 50) \
    .config('spark.sql.shuffle.partitions', shuffle_partitions) \
    .config('spark.sql.streaming.forceDeleteTempCheckpointLocation', 'true') \
    .config('spark.sql.streaming.schemaInference', 'true') \
    .config('spark.sql.warehouse.dir', database_dir) \
    .config('spark.streaming.stopGracefullyOnShutdown', 'true')

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark

In [12]:
# Creating the database
spark.sql(f"DROP DATABASE IF EXISTS {dest_database} CASCADE;")

sql_create_db = f"""
    CREATE DATABASE IF NOT EXISTS {dest_database}
    COMMENT 'DS-2002 Final Project Database'
    WITH DBPROPERTIES (contains_pii = true, purpose = 'DS-2002 Final');
"""
spark.sql(sql_create_db)

DataFrame[]

In [13]:
# Getting dimensions
# 1: Getting batch/static data (**local folder**)
# Table of managers achieved by running the following in MySQL Workbench, 
# exporting and saving result as json in /data/batch/
"""
    USE adventureworks;

    SELECT EmployeeID, Title FROM employee WHERE Title LIKE "%Manager%";
"""
get_file_info(batch_dir)

manager_csv = os.path.join(batch_dir,'managers.csv')
df_dim_managers = spark.read.format('csv').options(header='true',inferSchema='true').load(manager_csv)

# 2: Cleaning up dataframe
# Lowercase all columns
df_dim_managers = df_dim_managers.withColumnsRenamed({'EmployeeID':'employee_id','Title':'title'})

# add a PK
df_dim_managers.createOrReplaceTempView("managers")
sql_managers = """
                    SELECT 
                        *, ROW_NUMBER() OVER (ORDER BY employee_id) AS manager_key 
                    FROM 
                        managers;
                """
df_dim_managers = spark.sql(sql_managers)

# reorder columns
reordered_cols = df_dim_managers.columns
reordered_cols.remove('manager_key')
reordered_cols.insert(0,'manager_key')
df_dim_managers = df_dim_managers[reordered_cols]

df_dim_managers.toPandas().head(3)

Unnamed: 0,manager_key,employee_id,title
0,1,3,Engineering Manager
1,2,6,Marketing Manager
2,3,21,Production Control Manager


In [14]:
# 3: Save manager data to the DLH
df_dim_managers.write.saveAsTable(f'{dest_database}.dim_managers',mode='overwrite')
spark.sql(f"DESCRIBE EXTENDED {dest_database}.dim_managers;").show()
spark.sql(f"SELECT * FROM {dest_database}.dim_managers LIMIT 2;").toPandas()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|         manager_key|                 int|   NULL|
|         employee_id|                 int|   NULL|
|               title|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|             Catalog|       spark_catalog|       |
|            Database|          retail_dlh|       |
|               Table|        dim_managers|       |
|        Created Time|Wed Dec 03 04:41:...|       |
|         Last Access|             UNKNOWN|       |
|          Created By|         Spark 4.0.1|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|            Location|file:/c:/Users/jo...|       |
+--------------------+--------------------+-------+



Unnamed: 0,manager_key,employee_id,title
0,1,3,Engineering Manager
1,2,6,Marketing Manager


In [15]:
# Second dimension
# Get employee details data from **MongoDB**

query = {}
mongodb_args['collection'] = 'employee_details'
client = get_mongo_client(**mongodb_args)
df_dim_employee_detail = get_mongo_dataframe(client, mongodb_args['db_name'],'employee_details',query)
# df_dim_employee_detail = get_mongodb_dataframe(spark, **mongodb_args)

df_dim_employee_detail.head(3)

Unnamed: 0,EmployeeID,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,14417807,adventure-works\guy1,16.0,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15 00:00:00,M,M,1996-07-31 00:00:00,0,21,30,1
1,2,253022876,adventure-works\kevin0,6.0,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03 00:00:00,S,M,1997-02-26 00:00:00,0,42,41,1
2,3,509647174,adventure-works\roberto0,12.0,Roberto,,Tamburello,Engineering Manager,roberto0@adventure-works.com,0,212-555-0187,1964-12-13 00:00:00,M,M,1997-12-12 00:00:00,1,2,21,1


In [16]:
# Third dimension
# Salesperson dimension table, retrieved from local file system (including more data than used in midterm)
salesperson_csv_path = os.path.join(batch_dir, 'salesperson.csv')
df_dim_salesperson = spark.read.format('csv').options(inferSchema='true',header='true').load(salesperson_csv_path)

df_dim_salesperson.toPandas().head(3)


Unnamed: 0,SalesPersonID,TerritoryID,SalesQuota,Bonus,CommissionPct,SalesYTD,SalesLastYear,rowguid,ModifiedDate
0,268,,,0,0.0,677558.5,0.0,...,2001-01-28
1,275,2.0,300000.0,4100,0.012,4557045.0,1750406.0,...,2001-06-24
2,276,4.0,250000.0,2000,0.015,5200475.0,1439156.0,...,2001-06-24


### Bronze Layer

In [17]:
# orders fact table
get_file_info(stream_dir)

Unnamed: 0,name,size,modification_time


In [18]:
df_orders_bronze = (
    spark.readStream \
    .option("schemaLocation",orders_output_bronze) \
    .option("maxFilesPerTrigger",1) \
    .option("multiLine","true") \
    .json(orders_stream_dir)
)

df_orders_bronze.isStreaming

True

In [19]:
# Writing real-time order data to parquet file
orders_checkpoint_bronze = os.path.join(orders_output_bronze, '_checkpoint')

orders_bronze_query = (
    df_orders_bronze \
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("orders_bronze")
    .trigger(availableNow = True) \
    .option("checkpointLocation", orders_checkpoint_bronze) \
    .option("compression", "snappy") \
    .start(orders_output_bronze)
)

print(f"Query ID: {orders_bronze_query.id}")
print(f"Query Name: {orders_bronze_query.name}")
print(f"Query Status: {orders_bronze_query.status}")

Query ID: c4c6dee1-711b-421e-ae32-8eab078146f8
Query Name: orders_bronze
Query Status: {'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [20]:
orders_bronze_query.awaitTermination()

### Silver Layer

In [21]:
# Joining streaming and batch data
df_orders_silver = spark.readStream.format("parquet").load(orders_output_bronze) \
    .join(df_dim_salesperson,'SalesPersonID') \
    .select(
        col('fact_salesorder_key'),
        col('SalesOrderID'),
        col('SalesPersonID'),
        df_dim_salesperson.Bonus,
        df_dim_salesperson.SalesLastYear,
        col('OrderDateKey'),
        col('ShipDateKey')
    )

In [22]:
df_orders_silver.isStreaming

True

In [23]:
df_orders_silver.printSchema()

root
 |-- fact_salesorder_key: long (nullable = true)
 |-- SalesOrderID: long (nullable = true)
 |-- SalesPersonID: long (nullable = true)
 |-- Bonus: integer (nullable = true)
 |-- SalesLastYear: double (nullable = true)
 |-- OrderDateKey: long (nullable = true)
 |-- ShipDateKey: long (nullable = true)



In [24]:
# Writing joined sales order and salesperson data into DLH
orders_checkpoint_silver = os.path.join(orders_output_silver, '_checkpoint')

orders_silver_query = (
    df_orders_silver.writeStream \
    .format("parquet") \
    .outputMode("append") \
    .queryName("orders_silver")
    .trigger(availableNow = True) \
    .option("checkpointLocation", orders_checkpoint_silver) \
    .option("compression", "snappy") \
    .start(orders_output_silver)
)

In [25]:
orders_silver_query.awaitTermination()

### Gold Layer

In [26]:
# Writing to csv, then read from it to make dim_employee into spark df (for joining with stream data)
dim_employee_csv_path = os.path.join(batch_dir,'employee_detail.csv')
df_dim_employee_detail.to_csv(dim_employee_csv_path)
df_dim_employee_detail = spark.read.format('csv').options(inferSchema="true",header="true").load(dim_employee_csv_path)

df_ret_on_employee_gold = spark.readStream.format('parquet').load(orders_output_silver) \
    .join(df_dim_employee_detail, df_dim_employee_detail.EmployeeID==col('SalesPersonID')) \
    .join(df_dim_managers, df_dim_employee_detail.ManagerID==df_dim_managers.employee_id, 'left_outer') \
    .groupBy(df_dim_employee_detail.ManagerID, df_dim_employee_detail.FirstName, df_dim_employee_detail.LastName) \
    .agg(sum(col('SalesLastYear')).alias('return_on_their_employees')) \
    .select(
        df_dim_employee_detail.ManagerID,
        col('FirstName').alias('manager_first_nm'),
        col('LastName').alias('manager_last_nm'),
        col('return_on_their_employees')
    )

In [27]:
df_ret_on_employee_gold.printSchema()

root
 |-- ManagerID: double (nullable = true)
 |-- manager_first_nm: string (nullable = true)
 |-- manager_last_nm: string (nullable = true)
 |-- return_on_their_employees: double (nullable = true)



In [28]:
orders_gold_query = (
    df_ret_on_employee_gold.writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("fact_ret_on_employee")
    .start()
)

In [29]:
wait_until_stream_is_ready(orders_gold_query, 1)

The stream has processed 1 batchs


In [30]:
df_fact_ret_on_employee = spark.sql("SELECT * FROM fact_ret_on_employee")
df_fact_ret_on_employee.printSchema()

root
 |-- ManagerID: double (nullable = true)
 |-- manager_first_nm: string (nullable = true)
 |-- manager_last_nm: string (nullable = true)
 |-- return_on_their_employees: double (nullable = true)



In [32]:
df_fact_ret_on_employee_gold_final = df_fact_ret_on_employee \
.select(col("ManagerID").alias('manager_id'), \
        col("manager_first_nm"), \
        col("manager_last_nm"), \
        col("return_on_their_employees")) \
.orderBy(desc("return_on_their_employees"))

In [33]:
df_fact_ret_on_employee_gold_final.write.saveAsTable(f"{dest_database}.fact_ret_on_employee", mode="overwrite")
spark.sql(f"SELECT * FROM {dest_database}.fact_ret_on_employee").toPandas()

Unnamed: 0,manager_id,manager_first_nm,manager_last_nm,return_on_their_employees
0,273.0,Stephen,Jiang,0.0
1,273.0,Syed,Abbas,0.0
2,268.0,Tete,Mensa-Annan,0.0
3,273.0,Amy,Alberts,0.0
4,284.0,Ranjit,Varkey Chudukatil,289981300.0
5,268.0,Pamela,Ansman-Wolfe,163800000.0
6,268.0,Jillian,Carson,770913900.0
7,268.0,Tsvi,Reiter,660321800.0
8,268.0,Michael,Blythe,654652000.0
9,268.0,Garrett,Vargas,302991800.0


In [34]:
spark.stop()