## DS-3002: Sample Capstone Project
This notebook demonstrates many of the software libraries and programming techniques required to fulfill the requirements of the final end-of-session capstone project for course **DS-3002: Data Systems** at the University of Virginia School of Data Science. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Relational Databases Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Relational Databases Optimized for Read/Aggregation Operations; Dimensional Model (i.e., Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

What's more, this project requires students to make effective decisions regarding whether to implement a Cloud-hosted, on-premises hosted, or hybrid architecture.

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

[0;31m---------------------------------------------------------------------------[0m
[0;31mModuleNotFoundError[0m                       Traceback (most recent call last)
[0;32m<command-479912801394481>[0m in [0;36m<cell line: 3>[0;34m()[0m
[1;32m      1[0m [0;32mimport[0m [0mos[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m [0;32mimport[0m [0mjson[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 3[0;31m [0;32mimport[0m [0mpymongo[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      4[0m [0;32mimport[0m [0mpyspark[0m[0;34m.[0m[0mpandas[0m [0;32mas[0m [0mpd[0m  [0;31m# This uses Koalas that is included in PySpark version 3.2 or newer.[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m [0;32mfrom[0m [0mpyspark[0m[0;34m.[0m[0msql[0m[0;34m.[0m[0mfunctions[0m [0;32mimport[0m [0mcol[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/python_shell/dbruntime/PythonPackageImportsInstrumentation/__init__.py[0m in [0;36mimport_patch[0;34m(name, globals, local

#### 2.0. Instantiate Global Variables

In [0]:
# Azure SQL Server Connection Information #####################
# Had trouble connecting to my own SQL server so decided to use the one provided
# NOTE(review): credentials are hard-coded in plain text here (and repeated in the
# %sql view definitions); consider moving them to a Databricks secret scope.
jdbc_hostname = "ds3002-sql.database.windows.net"
jdbc_port = 1433
src_database = "AdventureWorksLT"

connection_properties = {
  "user" : "root",
  "password" : "Suchottv20!",
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# MongoDB Atlas Connection Information ########################
# using the atlas connection from Lab 4 (which is Tupitza's)
atlas_cluster_name = "sandbox"
atlas_database_name = "sample_airbnb"
atlas_user_name = "m001-student"
atlas_password = "m001-mongodb-basics"

# Data Files (JSON) Information ###############################
dst_database = "Chinook"

base_dir = "dbfs:/FileStore/ds3002-capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

output_bronze = f"{database_dir}/fact_sales_orders/bronze"
output_silver = f"{database_dir}/fact_sales_orders/silver"
output_gold   = f"{database_dir}/fact_sales_orders/gold"

# Reset Output From Previous Runs #############################
# The streaming output/checkpoints (fact_sales_orders) live underneath database_dir,
# so a single recursive delete clears both the streaming files and the database files.
dbutils.fs.rm(database_dir, True)

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
# ######################################################################################################################
def _as_jdbc_table(sql_query):
    '''Return sql_query in a form accepted by the JDBC "dbtable" option.

    Table/view names and already-parenthesized subqueries are returned unchanged.
    A bare SELECT statement is wrapped as a derived table, because the JDBC source
    places this value inside a FROM clause, where an unwrapped query is invalid.
    '''
    text = sql_query.strip()
    first_word = text.split(None, 1)[0].lower() if text else ""
    if first_word != "select":
        return sql_query
    # A trailing semicolon is not allowed inside a derived table.
    return f"({text.rstrip(';').rstrip()}) AS src"


def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Fetch a Spark DataFrame from an Azure SQL Database over JDBC.

    :param host_name: SQL Server host name.
    :param port: SQL Server port.
    :param db_name: name of the database to connect to.
    :param conn_props: JDBC connection properties (user, password, driver, ...).
    :param sql_query: a table/view name, a parenthesized subquery with an alias,
        or a bare SELECT statement (wrapped automatically).
    :returns: the Spark (not pandas) DataFrame produced by spark.read.jdbc().
    '''
    jdbc_url = f"jdbc:sqlserver://{host_name}:{port};database={db_name}"
    return spark.read.jdbc(url=jdbc_url, table=_as_jdbc_table(sql_query), properties=conn_props)


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    :param conditions: filter document; falsy means "match every document".
    :param projection: projection document; falsy means "return every field".
    :param sort: sort specification passed to Cursor.sort(); falsy means "no sort".
    :returns: a pd.DataFrame (pyspark.pandas, per this notebook's imports) built from the documents.
    '''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zibbf.mongodb.net/{db_name}?retryWrites=true&w=majority"

    # The context manager closes the client even if the query raises.
    with pymongo.MongoClient(mongo_uri) as client:
        # Filter, projection and sort are each applied independently of the others.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialize the cursor before the client is closed.
        documents = list(cursor)

    # NOTE(review): documents include MongoDB's _id (ObjectId) unless projected out;
    # confirm pd.DataFrame can convert it.
    return pd.DataFrame(documents)

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''(Re)create MongoDB collections from JSON files.

    :param src_file_path: local directory containing the JSON files.
    :param json_files: mapping of collection name -> JSON file name inside src_file_path.
        Each collection is dropped before it is reloaded, so re-running replaces its data.
    :returns: the InsertManyResult of the last insert performed, or None if nothing was inserted.
    '''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.zibbf.mongodb.net/{db_name}?retryWrites=true&w=majority"
    result = None

    # The context manager closes the client even if a load fails.
    with pymongo.MongoClient(mongo_uri) as client:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)
            # A file holding a single JSON object becomes a one-document collection.
            if isinstance(documents, dict):
                documents = [documents]
            # insert_many() rejects an empty list; the dropped collection simply stays empty.
            if documents:
                result = db[collection_name].insert_many(documents)

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure SQL Database
##### 1.1. Create a New Databricks Metadata Database, and then Create a New Table that Sources its Data from a View in an Azure SQL database.

In [0]:
%sql
-- Start from a clean slate: drop the Chinook database along with the objects it contains.
DROP SCHEMA IF EXISTS Chinook CASCADE;

In [0]:
%sql
-- Create the capstone metadata database. Its name must be "Chinook" to match the
-- USE / CREATE TABLE statements in the following cells.
CREATE DATABASE IF NOT EXISTS Chinook
COMMENT "Capstone Project Database"
LOCATION "dbfs:/FileStore/ds3002-capstone/Chinook"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-3002 Capstone Project");

In [0]:
%sql
-- Expose the Azure SQL view SalesLT.vDimProducts as a session-scoped Spark view over JDBC.
CREATE OR REPLACE TEMPORARY VIEW view_product
USING org.apache.spark.sql.jdbc
OPTIONS (
  dbtable "SalesLT.vDimProducts",
  url "jdbc:sqlserver://ds3002-sql.database.windows.net:1433;database=AdventureWorksLT",
  password "Suchottv20!",
  user "root"
)

In [0]:
%sql
USE DATABASE Chinook;

-- NOTE(review): despite its name, this table is populated from view_product
-- (SalesLT.vDimProducts); confirm whether it should be called dim_product.
-- The LOCATION sits inside the Chinook database directory, so the notebook's
-- reset step (recursive delete of that directory) also clears this table's data.
CREATE TABLE IF NOT EXISTS Chinook.dim_customer
COMMENT "Customer Dimension Table"
LOCATION "dbfs:/FileStore/ds3002-capstone/Chinook/dim_customer"
AS SELECT * FROM view_product

In [0]:
%sql
-- Preview the first five rows of the new table.
SELECT *
FROM Chinook.dim_customer
LIMIT 5

In [0]:
%sql
-- Show the table's columns together with its extended metadata.
DESCRIBE TABLE EXTENDED Chinook.dim_customer;

##### 1.2. Create a New Table that Sources its Data from a Table in an Azure SQL database.

In [0]:
%sql
-- Expose the Azure SQL table dbo.DimDate as a session-scoped Spark view over JDBC.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  dbtable "dbo.DimDate",
  url "jdbc:sqlserver://ds3002-sql.database.windows.net:1433;database=AdventureWorksLT",
  password "Suchottv20!",
  user "root"
)

In [0]:
%sql
USE DATABASE Chinook;

-- NOTE(review): despite its name, this table is populated from view_date
-- (dbo.DimDate); confirm whether it should be called dim_date.
-- The LOCATION sits inside the Chinook database directory, so the notebook's
-- reset step (recursive delete of that directory) also clears this table's data.
CREATE TABLE IF NOT EXISTS Chinook.dim_employee
COMMENT "Employee Dimension Table"
LOCATION "dbfs:/FileStore/ds3002-capstone/Chinook/dim_employee"
AS SELECT * FROM view_date

In [0]:
%sql
-- Preview the first five rows of the new table.
SELECT *
FROM Chinook.dim_employee
LIMIT 5

In [0]:
%sql
-- Show the table's columns together with its extended metadata.
DESCRIBE TABLE EXTENDED Chinook.dim_employee;

#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
# List the batch source files staged on DBFS.
batch_files = dbutils.fs.ls(batch_dir)
display(batch_files)

##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
# Local path to the batch directory: the same location as batch_dir, but with the
# "dbfs:/" URI scheme swapped for the "/dbfs/" mount so Python's open() can read it.
source_dir = batch_dir.replace("dbfs:/", "/dbfs/", 1)
json_files = {"customers" : 'AdventureWorksLT_DimCustomer.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

##### 2.3. Fetch Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// Read back the "customers" collection loaded by the previous cell, which writes into
// atlas_database_name ("sample_airbnb").
// NOTE(review): assumes the Mongo Spark connector's configured URI points at the same
// Atlas cluster used by set_mongo_collection(); confirm in the cluster's Spark config.
val df_customer = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "sample_airbnb").option("collection", "customers").load()
display(df_customer)

In [0]:
%scala
// Print the schema the Mongo connector inferred for the loaded customers DataFrame.
df_customer.printSchema()

##### 2.4. Use the Spark DataFrame to Create a New Table in the Databricks (Adventure Works) Metadata Database

In [0]:
%scala
// Persist the customers DataFrame as a Delta table, replacing any previous contents.
// NOTE(review): this writes adventure_works.dim_customer, while the following cells
// describe/query chinook.dim_track; confirm which table name is intended.
df_customer.write
  .mode("overwrite")
  .format("delta")
  .saveAsTable("adventure_works.dim_customer")

In [0]:
%sql
-- NOTE(review): the table written in the previous cell is adventure_works.dim_customer,
-- not chinook.dim_track; confirm the intended table.
DESCRIBE TABLE EXTENDED chinook.dim_track

##### 2.5. Query the New Table in the Databricks Metadata Database

In [0]:
%sql
-- Preview the first five rows.
-- NOTE(review): confirm this should query the table written by the Mongo step above.
SELECT *
FROM Chinook.dim_track
LIMIT 5

#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
# Load the address dimension from its batch CSV file: the header row supplies the
# column names and Spark infers each column's type.
address_csv = f"{batch_dir}/AdventureWorksLT_DimAddress.csv"
df_address = spark.read.csv(address_csv, header=True, inferSchema=True)
display(df_address)

In [0]:
# Show the column names and types inferred from the CSV.
df_address.printSchema()

In [0]:
# Persist the addresses as a Delta table, replacing any previous contents.
# NOTE(review): this writes adventure_works.dim_address, while the following cells
# describe/query Chinook.dim_album; confirm which table name is intended.
(df_address.write
    .mode("overwrite")
    .format("delta")
    .saveAsTable("adventure_works.dim_address"))

In [0]:
%sql
-- NOTE(review): the table written in the previous cell is adventure_works.dim_address,
-- not Chinook.dim_album; confirm the intended table.
DESCRIBE TABLE EXTENDED Chinook.dim_album;

In [0]:
%sql
-- Preview the first five rows.
SELECT *
FROM Chinook.dim_album
LIMIT 5;

##### Verify Dimension Tables

In [0]:
%sql
-- Make Chinook the current database, then list its tables.
USE DATABASE Chinook;
SHOW TABLES;

### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use Auto Loader to Process Streaming (Hot Path) Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Auto Loader picks up each new JSON file that lands in stream_dir.
# All column type hints must go into ONE comma-separated "cloudFiles.schemaHints"
# value: calling .option() repeatedly with the same key keeps only the last value.
orders_schema_hints = ", ".join([
    "SalesOrderID INT",
    "RevisionNumber TINYINT",
    "OrderDate TIMESTAMP",
    "DueDate TIMESTAMP",
    "ShipDate TIMESTAMP",
    "Status TINYINT",
    # NOTE(review): OnlineOrderFlag is left to type inference; a BINARY hint cannot
    # hold a JSON true/false or 0/1 value. Confirm the encoding used in the files.
    "SalesOrderNumber STRING",
    "PurchaseOrderNumber STRING",
    "AccountNumber STRING",
    "CustomerID INT",
    "ShipToAddressID INT",
    "BillToAddressID INT",
    "ShipMethod STRING",
    "SubTotal FLOAT",
    "TaxAmt FLOAT",
    "Freight FLOAT",
    "TotalDue FLOAT",
    "SalesOrderDetailID INT",
    "OrderQty SMALLINT",
    "ProductID INT",
    "UnitPrice FLOAT",
    "UnitPriceDiscount FLOAT",
    # A bare DECIMAL is DECIMAL(10,0), which would drop the fractional part.
    "LineTotal DECIMAL(38,6)",
    "rowguid STRING",
    "ModifiedDate TIMESTAMP",
])

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", orders_schema_hints)
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Stamp each raw row with its arrival time and originating file for traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT
    *,
    current_timestamp() AS receipt_time,
    input_file_name() AS source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
-- Inspect the streaming bronze view.
SELECT *
FROM orders_bronze_tempview

In [0]:
# Continuously append the bronze view into the fact_orders_bronze Delta table;
# the checkpoint directory records stream progress.
(spark.table("orders_bronze_tempview")
      .writeStream
      .outputMode("append")
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .table("fact_orders_bronze"))

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Stream from the bronze Delta table and expose it to SQL as orders_silver_tempview.
bronze_stream = spark.readStream.table("fact_orders_bronze")
bronze_stream.createOrReplaceTempView("orders_silver_tempview")

In [0]:
%sql
-- Inspect the streaming silver input view.
SELECT *
FROM orders_silver_tempview

In [0]:
%sql
-- Show the columns (and extended metadata) of the silver input view.
DESCRIBE TABLE EXTENDED orders_silver_tempview

In [0]:
%sql
-- Enrich each streaming order line with customer, ship-to/bill-to address, product
-- and order/due/ship date attributes from the dimension tables.
-- The date parts are aliased consistently as <Order|Due|Ship>Month/DayName/Day/Year;
-- "DueDay" avoids clashing with the DueDate timestamp column of the raw orders.
-- NOTE(review): the dimensions are read from the adventure_works database, not Chinook;
-- confirm those tables exist. INNER JOINs drop any order line without a matching row.
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT t.SalesOrderID
    , t.RevisionNumber
    , od.MonthName AS OrderMonth
    , od.WeekDayName AS OrderDayName
    , od.Day AS OrderDay
    , od.Year AS OrderYear
    , dd.MonthName AS DueMonth
    , dd.WeekDayName AS DueDayName
    , dd.Day AS DueDay
    , dd.Year AS DueYear
    , sd.MonthName AS ShipMonth
    , sd.WeekDayName AS ShipDayName
    , sd.Day AS ShipDay
    , sd.Year AS ShipYear
    , t.Status
    , t.OnlineOrderFlag
    , t.SalesOrderNumber
    , t.PurchaseOrderNumber
    , t.AccountNumber
    , c.CustomerID
    , c.FirstName
    , c.LastName
    , t.ShipToAddressID
    , sa.AddressLine1 AS ShipToAddressLine1
    , sa.AddressLine2 AS ShipToAddressLine2
    , sa.City AS ShipToCity
    , sa.StateProvince AS ShipToStateProvince
    , sa.PostalCode AS ShipToPostalCode
    , t.BillToAddressID
    , ba.AddressLine1 AS BillToAddressLine1
    , ba.AddressLine2 AS BillToAddressLine2
    , ba.City AS BillToCity
    , ba.StateProvince AS BillToStateProvince
    , ba.PostalCode AS BillToPostalCode
    , t.ShipMethod
    , t.SubTotal
    , t.TaxAmt
    , t.Freight
    , t.TotalDue
    , t.SalesOrderDetailID
    , t.OrderQty
    , p.ProductID
    , p.ProductNumber
    , t.UnitPrice
    , t.UnitPriceDiscount
    , t.LineTotal
    , t.rowguid
    , t.ModifiedDate
    , t.receipt_time
    , t.source_file
  FROM orders_silver_tempview t
  INNER JOIN adventure_works.dim_customer c
  ON t.CustomerID = c.CustomerID
  INNER JOIN adventure_works.dim_address sa
  ON t.ShipToAddressID = CAST(sa.AddressID AS BIGINT)
  INNER JOIN adventure_works.dim_address ba
  ON t.BillToAddressID = CAST(ba.AddressID AS BIGINT)
  INNER JOIN adventure_works.dim_product p
  ON t.ProductID = p.ProductID
  INNER JOIN adventure_works.dim_date od
  ON CAST(t.OrderDate AS DATE) = od.Date
  INNER JOIN adventure_works.dim_date dd
  ON CAST(t.DueDate AS DATE) = dd.Date
  INNER JOIN adventure_works.dim_date sd
  ON CAST(t.ShipDate AS DATE) = sd.Date)

In [0]:
# Continuously append the enriched silver view into the fact_orders_silver Delta table;
# the checkpoint directory records stream progress.
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .outputMode("append")
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .table("fact_orders_silver"))

In [0]:
%sql
-- Inspect the rows written to the silver Delta table so far.
SELECT *
FROM fact_orders_silver

In [0]:
%sql
-- fact_orders_silver is written without a database qualifier, so it resolves against
-- the current database (Chinook after the earlier USE statements), not adventure_works.
DESCRIBE EXTENDED fact_orders_silver

##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
-- Count order lines (products) per customer per order month, busiest first.
-- fact_orders_silver is written without a database qualifier, so it resolves against
-- the current database (Chinook after the earlier USE statements), not adventure_works.
SELECT CustomerID
  , LastName
  , FirstName
  , OrderMonth
  , COUNT(ProductID) AS ProductCount
FROM fact_orders_silver
GROUP BY CustomerID, LastName, FirstName, OrderMonth
ORDER BY ProductCount DESC

In [0]:
%sql
-- List each order line's product next to its customer's total order-line count.
-- fact_orders_silver is written without a database qualifier, so it resolves against
-- the current database (Chinook after the earlier USE statements), not adventure_works.
SELECT pc.CustomerID
  , os.LastName AS CustomerName
  , os.ProductNumber
  , pc.ProductCount
FROM fact_orders_silver AS os
INNER JOIN (
  SELECT CustomerID
  , COUNT(ProductID) AS ProductCount
  FROM fact_orders_silver
  GROUP BY CustomerID
) AS pc
ON pc.CustomerID = os.CustomerID
ORDER BY ProductCount DESC