# Tamera Fang (ven7sg) - DS2002 Final Capstone Project 
### This project is mainly adapted from our Lab 6 assignment, modified to fit the Chinook database

#### For reference:
##### - SQL Files: chinook_dim_date.sql, chinook_merge_tables.sql, create_chinook.sql
##### - MongoDB: chinook_customer.json
##### - Local: chinook_track_genre.csv
##### - Stream: invoices_1.json, invoices_2.json, invoices_3.json

# 1.0. Import Required Libraries

In [None]:
import os
import json
import pymongo
import pyspark.pandas as pd # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

# 2.0. Instantiate Global Variables

In [None]:
# Azure MySQL Server Connection Information ###################
# Credentials may be supplied through environment variables so they do not have to be
# committed with the notebook; the fallbacks are the values this project was built with.
# NOTE(review): the %sql and %scala cells below still embed these credentials directly.
jdbc_hostname = "ven7sg-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "chinook"

connection_properties = {
  "user" : os.environ.get("CHINOOK_MYSQL_USER", "ven7sg"),
  "password" : os.environ.get("CHINOOK_MYSQL_PASSWORD", "Passw0rd123"),
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0.dkc1g"
atlas_database_name = "chinook"
atlas_user_name = os.environ.get("CHINOOK_ATLAS_USER", "ven7sg")
atlas_password = os.environ.get("CHINOOK_ATLAS_PASSWORD", "Passw0rd123")

# Data Files (JSON) Information ###############################
dst_database = "chinook_dlh"

base_dir = "dbfs:/FileStore/capstone_project"
database_dir = f"{base_dir}/{dst_database}"  # root folder of the lakehouse database

data_dir = f"{base_dir}/data"
batch_dir = f"{data_dir}/batch"
orders_stream_dir = f"{data_dir}/stream"  # source folder for the streaming invoice files

# Output roots (data + checkpoints) for the bronze/silver/gold fact_orders pipeline.
orders_output_bronze = f"{database_dir}/fact_orders/bronze"
orders_output_silver = f"{database_dir}/fact_orders/silver"
orders_output_gold   = f"{database_dir}/fact_orders/gold"

# Delete the Database Files (including the Streaming Files) ###
# One recursive delete of `database_dir` also removes everything under
# `{database_dir}/fact_orders` (streaming outputs and checkpoints), so a separate
# delete of that sub-folder is unnecessary.
dbutils.fs.rm(database_dir, True)

True

# 3.0. Define Global Functions

In [None]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Args:
        user_id: Atlas user name (percent-escaped before being placed in the URI).
        pwd: Atlas password (percent-escaped before being placed in the URI).
        cluster_name: Atlas cluster host prefix; ".mongodb.net" is appended to it.
        db_name: Name of the database to query.
        collection: Name of the collection to query.
        conditions: MongoDB filter document; a falsy value matches every document.
        projection: MongoDB projection document; a falsy value returns every field.
        sort: Sort specification passed to ``Cursor.sort``; a falsy value leaves
            the results unsorted.

    Returns:
        A ``pyspark.pandas`` DataFrame built from the fetched documents.
    """
    from urllib.parse import quote_plus

    # Credentials embedded in a MongoDB URI must be percent-escaped, otherwise
    # characters such as '@', ':' or '/' in a password corrupt the URI.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )

    client = pymongo.MongoClient(mongo_uri)
    try:
        db = client[db_name]
        # Apply each query argument independently. The previous version silently
        # dropped `conditions` unless `projection` was also supplied.
        cursor = db[collection].find(conditions or None, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialize the cursor while the client is still open.
        documents = list(cursor)
    finally:
        # Always release the connection, even if the query fails.
        client.close()

    return pd.DataFrame(documents)


# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """(Re)create MongoDB Atlas collections from local JSON files.

    For every ``collection_name -> file_name`` entry in *json_files*, the existing
    collection is dropped. It is then repopulated with the documents stored in
    ``os.path.join(src_file_path, file_name)``.

    Args:
        user_id: Atlas user name (percent-escaped before being placed in the URI).
        pwd: Atlas password (percent-escaped before being placed in the URI).
        cluster_name: Atlas cluster host prefix; ".mongodb.net" is appended to it.
        db_name: Name of the target database.
        src_file_path: Local directory containing the JSON files (read with ``open``).
        json_files: Mapping of collection name to JSON file name.

    Returns:
        The ``InsertManyResult`` of the last collection loaded, or ``None`` when
        *json_files* is empty.
    """
    from urllib.parse import quote_plus

    # Credentials embedded in a MongoDB URI must be percent-escaped.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.mongodb.net/{db_name}"
    )

    client = pymongo.MongoClient(mongo_uri)
    # Initialized up front so an empty `json_files` returns None instead of
    # raising UnboundLocalError.
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, file_name)
            # JSON text is UTF-8; don't depend on the platform's default encoding.
            with open(json_path, "r", encoding="utf-8") as openfile:
                documents = json.load(openfile)
            # insert_many needs a list; treat a single top-level object as one document.
            if isinstance(documents, dict):
                documents = [documents]
            result = db[collection_name].insert_many(documents)
    finally:
        # Always release the connection, even if a load fails part-way.
        client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure MySQL Database
##### 1.1. Create a New Databricks Metadata Database.

In [None]:
%sql
-- Start from a clean slate: drop the lakehouse database together with all of its tables (CASCADE).
DROP DATABASE IF EXISTS chinook_dlh CASCADE;

In [None]:
%sql
-- Register the lakehouse database. Its LOCATION is the same folder as `database_dir`
-- in the Python globals cell, so tables created without their own LOCATION land there.
CREATE DATABASE IF NOT EXISTS chinook_dlh
COMMENT "Capstone Project Database"
LOCATION "dbfs:/FileStore/capstone_project/chinook_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "Capstone Project");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [None]:
%sql
-- Expose the MySQL table chinook.dim_date to Spark through the JDBC data source
-- as a session-scoped temporary view.
-- NOTE(review): credentials are hard-coded; consider a Databricks secret scope instead.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ven7sg-mysql.mysql.database.azure.com:3306/chinook", 
  dbtable "dim_date",
  user "ven7sg",
  password "Passw0rd123"
);

In [None]:
%sql
USE DATABASE chinook_dlh;

-- Copy the JDBC-backed view into a lakehouse table stored at an explicit DBFS location,
-- so later queries no longer hit MySQL.
CREATE OR REPLACE TABLE chinook_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/capstone_project/chinook_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [None]:
%sql
-- Sanity check: preview the first rows of the date dimension.
SELECT * FROM chinook_dlh.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20200101,2020-01-01,2020/01/01,01/01/2020,01/01/2020,4,Wednesday,1,1,Weekday,1,January,1,N,1,2020,2020-01,2020Q1,7,3,2020,2020-07,2020Q3
20200102,2020-01-02,2020/01/02,01/02/2020,02/01/2020,5,Thursday,2,2,Weekday,1,January,1,N,1,2020,2020-01,2020Q1,7,3,2020,2020-07,2020Q3
20200103,2020-01-03,2020/01/03,01/03/2020,03/01/2020,6,Friday,3,3,Weekday,1,January,1,N,1,2020,2020-01,2020Q1,7,3,2020,2020-07,2020Q3
20200104,2020-01-04,2020/01/04,01/04/2020,04/01/2020,7,Saturday,4,4,Weekend,1,January,1,N,1,2020,2020-01,2020Q1,7,3,2020,2020-07,2020Q3
20200105,2020-01-05,2020/01/05,01/05/2020,05/01/2020,1,Sunday,5,5,Weekend,1,January,1,N,1,2020,2020-01,2020Q1,7,3,2020,2020-07,2020Q3


##### 1.3. Create a New Table that Sources Employee Dimension Data from a Table in an Azure MySQL database.

In [None]:
%sql
-- Expose the MySQL table chinook.employee to Spark through the JDBC data source
-- as a session-scoped temporary view.
-- NOTE(review): credentials are hard-coded; consider a Databricks secret scope instead.
CREATE OR REPLACE TEMPORARY VIEW view_employee
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ven7sg-mysql.mysql.database.azure.com:3306/chinook",
  dbtable "employee",
  user "ven7sg",
  password "Passw0rd123"
)

In [None]:
%sql
USE DATABASE chinook_dlh;

-- Copy the JDBC-backed employee view into a lakehouse table at an explicit DBFS location.
-- The silver fact view later joins it on customer.SupportRepId = EmployeeId.
CREATE OR REPLACE TABLE chinook_dlh.dim_employee
COMMENT "Employee dimension table"
LOCATION "dbfs:/FileStore/capstone_project/chinook_dlh/dim_employee"
AS SELECT * FROM view_employee

num_affected_rows,num_inserted_rows


In [None]:
%sql
-- Sanity check: preview the first rows of the employee dimension.
SELECT * FROM chinook_dlh.dim_employee LIMIT 5

EmployeeId,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
1,Adams,Andrew,General Manager,,1962-02-18T00:00:00Z,2002-08-14T00:00:00Z,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
2,Edwards,Nancy,Sales Manager,1.0,1958-12-08T00:00:00Z,2002-05-01T00:00:00Z,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com
3,Peacock,Jane,Sales Support Agent,2.0,1973-08-29T00:00:00Z,2002-04-01T00:00:00Z,1111 6 Ave SW,Calgary,AB,Canada,T2P 5M5,+1 (403) 262-3443,+1 (403) 262-6712,jane@chinookcorp.com
4,Park,Margaret,Sales Support Agent,2.0,1947-09-19T00:00:00Z,2003-05-03T00:00:00Z,683 10 Street SW,Calgary,AB,Canada,T2P 5G3,+1 (403) 263-4423,+1 (403) 263-4289,margaret@chinookcorp.com
5,Johnson,Steve,Sales Support Agent,2.0,1965-03-03T00:00:00Z,2003-10-17T00:00:00Z,7727B 41 Ave,Calgary,AB,Canada,T3B 1Y7,1 (780) 836-9987,1 (780) 836-9543,steve@chinookcorp.com


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database

In [None]:
# set_mongo_collection reads the files with the built-in open(), so it needs the local
# "/dbfs/..." form of `data_dir` rather than the "dbfs:/..." URI. Deriving it here keeps
# the two paths from drifting apart.
source_dir = data_dir.replace("dbfs:/", "/dbfs/", 1)
# Collection name -> JSON file to load into it.
json_files = {"customers" : 'chinook_customer.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files) 

<pymongo.results.InsertManyResult at 0x7f52b44927c0>

In [None]:
%scala
import com.mongodb.spark._

// Atlas connection settings for the MongoDB Spark connector.
// NOTE(review): credentials are hard-coded; consider a Databricks secret scope instead.
val userName = "ven7sg"
val pwd = "Passw0rd123"
val clusterName = "Cluster0.dkc1g"
// The `s` interpolator is required. Without it, "$userName", "$pwd" and "$clusterName"
// stay as literal text and the connector receives an invalid URI.
val atlas_uri = s"mongodb+srv://$userName:$pwd@$clusterName.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"

In [None]:
%scala

// Load the `customers` collection (populated by the Python set_mongo_collection call)
// through the MongoDB Spark connector. Keep only the columns needed for the customer
// dimension. Relies on `atlas_uri` defined in the previous Scala cell.
val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", atlas_uri)
.option("database", "chinook")
.option("collection", "customers").load()
.select("CustomerId","FirstName","LastName","Country","Phone","SupportRepId")


In [None]:
%scala
// Persist the customer dimension as a Delta table in chinook_dlh, replacing any previous contents.
df_customer.write.format("delta").mode("overwrite").saveAsTable("chinook_dlh.dim_customer")

In [None]:
%sql
-- Sanity check: preview the first rows of the customer dimension.
SELECT * FROM chinook_dlh.dim_customer LIMIT 5

CustomerId,FirstName,LastName,Country,Phone,SupportRepId
1,Luís,Gonçalves,Brazil,+55 (12) 3923-5555,3
2,Leonie,Köhler,Germany,+49 0711 2842222,5
3,François,Tremblay,Canada,+1 (514) 721-4711,3
4,Bjørn,Hansen,Norway,+47 22 44 22 22,4
5,František,Wichterlová,Czech Republic,+420 2 4172 5555,4


#### 3.0. Fetch Track Reference Data from a Local CSV File

In [None]:
# Track/genre reference data uploaded alongside the other source files.
track_csv = f"{data_dir}/chinook_track_genre.csv"

# The first row holds the column names; Spark infers each column's type from the data.
df_track = spark.read.csv(track_csv, header='true', inferSchema='true')

In [None]:
# Persist the track dimension as a Delta table, replacing any previous contents.
df_track.write.saveAsTable("chinook_dlh.dim_track", format="delta", mode="overwrite")

In [None]:
%sql
-- Sanity check: preview the first rows of the track dimension.
SELECT * FROM chinook_dlh.dim_track LIMIT 5;

TrackId,track_name,AlbumId,MediaTypeId,GenreId,Composer,Milliseconds,Bytes,UnitPrice,genre_name
1,For Those About To Rock (We Salute You),1,1,1,"Angus Young, Malcolm Young, Brian Johnson",343719,11170334,0.99,Rock
2,Balls to the Wall,2,2,1,"U. Dirkschneider, W. Hoffmann, H. Frank, P. Baltes, S. Kaufmann, G. Hoffmann",342562,5510424,0.99,Rock
3,Fast As a Shark,3,2,1,"F. Baltes, S. Kaufman, U. Dirkscneider & W. Hoffman",230619,3990994,0.99,Rock
4,Restless and Wild,3,2,1,"F. Baltes, R.A. Smith-Diesel, S. Kaufman, U. Dirkscneider & W. Hoffman",252051,4331779,0.99,Rock
5,Princess of the Dawn,3,2,1,Deaffy & R.A. Smith-Diesel,375418,6290521,0.99,Rock


#### 4.0. Build the Fact Table by Joining Streaming Invoice Data with the Dimension Tables

In [None]:
# Incrementally ingest the invoice JSON files with Auto Loader ("cloudFiles") and expose
# the resulting streaming DataFrame to SQL as `orders_raw_tempview`.
(
    spark.readStream
    .format("cloudFiles")
    .options(**{
        "cloudFiles.format": "json",
        # Where Auto Loader persists the schema it infers across runs.
        "cloudFiles.schemaLocation": orders_output_bronze,
        "cloudFiles.inferColumnTypes": "true",
        # JSON records may span several lines in the source files.
        "multiLine": "true",
    })
    .load(orders_stream_dir)
    .createOrReplaceTempView("orders_raw_tempview")
)

In [None]:
%sql
/* Add Metadata for Traceability */
/* receipt_time: processing-time timestamp evaluated when the query runs (not the file's arrival time).
   source_file: path of the input file each row was read from. */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [None]:
%sql
-- Inspect the bronze rows. The view is backed by a streaming source, so in a notebook this
-- presumably keeps updating until the cell is cancelled.
SELECT * FROM orders_bronze_tempview 

BillingAddress,BillingCity,BillingCountry,BillingPostalCode,BillingState,CustomerId,InvoiceDate,InvoiceId,InvoiceLineId,Quantity,Total,TrackId,UnitPrice,_rescued_data,receipt_time,source_file
1498 rue Bélanger,Montréal,Canada,H2G 1A7,QC,3,20220421,110,601,1,13.86,157,0.99,,2024-05-09T16:30:51.436Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
1498 rue Bélanger,Montréal,Canada,H2G 1A7,QC,3,20220421,110,602,1,13.86,166,0.99,,2024-05-09T16:30:51.436Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
1498 rue Bélanger,Montréal,Canada,H2G 1A7,QC,3,20220421,110,603,1,13.86,175,0.99,,2024-05-09T16:30:51.436Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
1498 rue Bélanger,Montréal,Canada,H2G 1A7,QC,3,20220421,110,604,1,13.86,184,0.99,,2024-05-09T16:30:51.436Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
1498 rue Bélanger,Montréal,Canada,H2G 1A7,QC,3,20220421,110,605,1,13.86,193,0.99,,2024-05-09T16:30:51.436Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
1 Microsoft Way,Redmond,USA,98052-8300,WA,17,20220429,111,606,1,0.99,207,0.99,,2024-05-09T16:30:51.436Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
627 Broadway,New York,USA,10012-2612,NY,18,20220512,112,607,1,1.98,208,0.99,,2024-05-09T16:30:51.436Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
627 Broadway,New York,USA,10012-2612,NY,18,20220512,112,608,1,1.98,209,0.99,,2024-05-09T16:30:51.436Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
541 Del Medio Avenue,Mountain View,USA,94040-111,CA,20,20220512,113,609,1,1.98,211,0.99,,2024-05-09T16:30:51.436Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
541 Del Medio Avenue,Mountain View,USA,94040-111,CA,20,20220512,113,610,1,1.98,213,0.99,,2024-05-09T16:30:51.436Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json


In [None]:
# Continuously append the raw invoice rows (plus traceability metadata) to the
# `fact_orders_bronze` Delta table; progress is tracked in the bronze checkpoint folder.
(
    spark.table("orders_bronze_tempview")
    .writeStream
    .outputMode("append")
    .options(checkpointLocation=f"{orders_output_bronze}/_checkpoint")
    .format("delta")
    .table("fact_orders_bronze")
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f527282a9e0>

In [None]:
# Read the bronze Delta table as a streaming source and register it for the silver-layer SQL joins.
spark.readStream.table("fact_orders_bronze").createOrReplaceTempView("orders_silver_tempview")

In [None]:
%sql
-- Inspect the streamed bronze rows feeding the silver layer (a streaming view; presumably
-- keeps updating until the cell is cancelled).
SELECT * FROM orders_silver_tempview

BillingAddress,BillingCity,BillingCountry,BillingPostalCode,BillingState,CustomerId,InvoiceDate,InvoiceId,InvoiceLineId,Quantity,Total,TrackId,UnitPrice,_rescued_data,receipt_time,source_file
1498 rue Bélanger,Montréal,Canada,H2G 1A7,QC,3,20220421,110,601,1,13.86,157,0.99,,2024-05-09T16:31:47.282Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
1498 rue Bélanger,Montréal,Canada,H2G 1A7,QC,3,20220421,110,602,1,13.86,166,0.99,,2024-05-09T16:31:47.282Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
1498 rue Bélanger,Montréal,Canada,H2G 1A7,QC,3,20220421,110,603,1,13.86,175,0.99,,2024-05-09T16:31:47.282Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
1498 rue Bélanger,Montréal,Canada,H2G 1A7,QC,3,20220421,110,604,1,13.86,184,0.99,,2024-05-09T16:31:47.282Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
1498 rue Bélanger,Montréal,Canada,H2G 1A7,QC,3,20220421,110,605,1,13.86,193,0.99,,2024-05-09T16:31:47.282Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
1 Microsoft Way,Redmond,USA,98052-8300,WA,17,20220429,111,606,1,0.99,207,0.99,,2024-05-09T16:31:47.282Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
627 Broadway,New York,USA,10012-2612,NY,18,20220512,112,607,1,1.98,208,0.99,,2024-05-09T16:31:47.282Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
627 Broadway,New York,USA,10012-2612,NY,18,20220512,112,608,1,1.98,209,0.99,,2024-05-09T16:31:47.282Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
541 Del Medio Avenue,Mountain View,USA,94040-111,CA,20,20220512,113,609,1,1.98,211,0.99,,2024-05-09T16:31:47.282Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json
541 Del Medio Avenue,Mountain View,USA,94040-111,CA,20,20220512,113,610,1,1.98,213,0.99,,2024-05-09T16:31:47.282Z,dbfs:/FileStore/capstone_project/data/stream/invoices_3.json


In [None]:
%sql
-- Silver layer: enrich each streamed invoice line with customer, support-rep (employee)
-- and track/genre attributes. INNER JOINs drop any line whose customer, support rep or
-- track is missing from the dimensions.
-- NOTE: total_price is o.Total, which appears to be the invoice-level total repeated on
-- every line of the same invoice (e.g. invoice 110 shows 13.86 on each line). It is not a
-- line amount, so do not SUM it per line downstream.
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT o.InvoiceId,
      o.CustomerId,
      c.FirstName AS customer_first_name,
      c.LastName AS customer_last_name,
      o.BillingCountry AS country,
      e.EmployeeId,
      e.FirstName AS employee_first_name,
      e.LastName AS employee_last_name,
      o.Quantity,
      o.Total AS total_price,
      o.TrackId,
      t.track_name,
      t.genre_name,
      o.InvoiceDate
  FROM orders_silver_tempview AS o
  INNER JOIN chinook_dlh.dim_customer AS c
  ON c.CustomerId = o.CustomerId
  -- A customer's support rep is the employee credited with the order.
  INNER JOIN chinook_dlh.dim_employee AS e
  ON e.EmployeeId = c.SupportRepId
  INNER JOIN chinook_dlh.dim_track AS t
  ON t.TrackId = o.TrackId
)

In [None]:
# Continuously append the enriched silver rows to the `fact_orders_silver` Delta table;
# progress is tracked in the silver checkpoint folder.
(
    spark.table("fact_orders_silver_tempview")
    .writeStream
    .outputMode("append")
    .options(checkpointLocation=f"{orders_output_silver}/_checkpoint")
    .format("delta")
    .table("fact_orders_silver")
)

<pyspark.sql.streaming.query.StreamingQuery at 0x7f5271f8a350>

In [None]:
%sql
-- Inspect the rows the silver stream has written so far.
SELECT * FROM fact_orders_silver

InvoiceId,CustomerId,customer_first_name,customer_last_name,country,EmployeeId,employee_first_name,employee_last_name,Quantity,total_price,TrackId,track_name,genre_name,InvoiceDate
1,2,Leonie,Köhler,Germany,5,Steve,Johnson,1,1.98,2,Balls to the Wall,Rock,20210101
1,2,Leonie,Köhler,Germany,5,Steve,Johnson,1,1.98,4,Restless and Wild,Rock,20210101
2,4,Bjørn,Hansen,Norway,4,Margaret,Park,1,3.96,6,Put The Finger On You,Rock,20210102
2,4,Bjørn,Hansen,Norway,4,Margaret,Park,1,3.96,8,Inject The Venom,Rock,20210102
2,4,Bjørn,Hansen,Norway,4,Margaret,Park,1,3.96,10,Evil Walks,Rock,20210102
2,4,Bjørn,Hansen,Norway,4,Margaret,Park,1,3.96,12,Breaking The Rules,Rock,20210102
3,8,Daan,Peeters,Belgium,4,Margaret,Park,1,5.94,16,Dog Eat Dog,Rock,20210103
3,8,Daan,Peeters,Belgium,4,Margaret,Park,1,5.94,20,Overdose,Rock,20210103
3,8,Daan,Peeters,Belgium,4,Margaret,Park,1,5.94,24,Love In An Elevator,Rock,20210103
3,8,Daan,Peeters,Belgium,4,Margaret,Park,1,5.94,28,Janie's Got A Gun,Rock,20210103


##### Fact table to summarize order quantities and total prices grouped by country and employee

In [None]:
%sql
-- Gold layer: order quantity, revenue and genre variety per support rep and billing country.
-- total_price in the silver table is the invoice-level total repeated on every line of an
-- invoice. It is therefore counted only once per invoice (its first line); summing it per
-- line would multiply each invoice's revenue by its line count.
CREATE OR REPLACE TABLE chinook_dlh.fact_orders_by_country_employee_gold AS
WITH numbered_lines AS (
  SELECT *
    , ROW_NUMBER() OVER (PARTITION BY InvoiceId ORDER BY TrackId) AS invoice_line_number
  FROM chinook_dlh.fact_orders_silver
)
SELECT employee_first_name
  , employee_last_name
  , country
  , SUM(Quantity) AS order_quantity
  -- Rounded to cents to remove floating-point noise from the sum.
  , ROUND(SUM(CASE WHEN invoice_line_number = 1 THEN total_price ELSE 0 END), 2) AS total_price
  -- DISTINCT: count genres, not invoice lines.
  , COUNT(DISTINCT genre_name) AS number_of_genres
FROM numbered_lines
GROUP BY employee_first_name, employee_last_name, country
-- Cosmetic only: a table's stored row order is not guaranteed when it is read back.
ORDER BY total_price DESC;

SELECT * FROM chinook_dlh.fact_orders_by_country_employee_gold;

employee_first_name,employee_last_name,country,order_quantity,total_price,number_of_genres
Jane,Peacock,Canada,20,203.94000000000003,20
Steve,Johnson,Germany,18,200.97000000000003,18
Jane,Peacock,USA,21,198.98999999999995,21
Margaret,Park,Poland,12,146.51999999999998,12
Jane,Peacock,India,16,142.56,16
Steve,Johnson,Brazil,11,142.56,11
Margaret,Park,France,17,134.64,17
Margaret,Park,USA,11,132.66,11
Jane,Peacock,United Kingdom,11,100.98,11
Steve,Johnson,United Kingdom,7,84.14999999999999,7
