## DS-3002: Sample Capstone Project
This notebook demonstrates many of the software libraries and programming techniques required to fulfill the requirements of the final end-of-session capstone project for course **DS-3002: Data Systems** at the University of Virginia School of Data Science. The spirit of the project is to provide a capstone challenge that requires students to demonstrate a practical and functional understanding of each of the data systems and architectural principles covered throughout the session.

**These include:**
- Relational Database Management Systems (e.g., MySQL, Microsoft SQL Server, Oracle, IBM DB2)
  - Online Transaction Processing Systems (OLTP): *Relational Databases Optimized for High-Volume Write Operations; Normalized to 3rd Normal Form.*
  - Online Analytical Processing Systems (OLAP): *Relational Databases Optimized for Read/Aggregation Operations; Dimensional Model (i.e., Star Schema)*
- NoSQL *(Not Only SQL)* Systems (e.g., MongoDB, CosmosDB, Cassandra, HBase, Redis)
- File System *(Data Lake)* Source Systems (e.g., AWS S3, Microsoft Azure Data Lake Storage)
  - Various Datafile Formats (e.g., JSON, CSV, Parquet, Text, Binary)
- Massively Parallel Processing *(MPP)* Data Integration Systems (e.g., Apache Spark, Databricks)
- Data Integration Patterns (e.g., Extract-Transform-Load, Extract-Load-Transform, Extract-Load-Transform-Load, Lambda & Kappa Architectures)

What's more, this project requires students to make effective decisions regarding whether to implement a Cloud-hosted, on-premises hosted, or hybrid architecture.

### Section I: Prerequisites

#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#### 2.0. Instantiate Global Variables

In [0]:
# Azure Database for MySQL Connection Information ##############
# NOTE(review): these credentials (and the copies repeated in the %sql JDBC cells) are
# committed in plain text. They can now be overridden through environment variables;
# move them to a Databricks secret scope and rotate the exposed passwords.
jdbc_hostname = "laurenmds2002.mysql.database.azure.com"
jdbc_port = 3306
src_database = "police_killings"

connection_properties = {
  "user" : os.environ.get("MYSQL_USER", "lmarkwart"),
  "password" : os.environ.get("MYSQL_PASSWORD", "Langley123"),
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
# NOTE(review): the Mongo helper functions interpolate this name into the Atlas host
# name ("<cluster>.jeczt7v.mongodb.net"); host names cannot contain spaces, so confirm
# that "Project 0" is really the cluster's host prefix.
atlas_cluster_name = "Project 0"
atlas_database_name = "policekillings"
atlas_user_name = os.environ.get("ATLAS_USER", "laurenmarkwart")
atlas_password = os.environ.get("ATLAS_PASSWORD", "Password123")

# Data Files (JSON) Information ###############################
dst_database = "police_killings"

base_dir = "dbfs:/FileStore/ds3002-capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# Bronze / silver / gold streaming output folders (the "fact_sales_orders" directory
# name is carried over from the course template).
output_bronze = f"{database_dir}/fact_sales_orders/bronze"
output_silver = f"{database_dir}/fact_sales_orders/silver"
output_gold   = f"{database_dir}/fact_sales_orders/gold"

# Delete the Database Files ###################################
# The streaming outputs and checkpoints above live underneath database_dir, so this
# single recursive delete also clears them and every run starts from a clean slate.
dbutils.fs.rm(database_dir, True)

#### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    """Fetch a Spark DataFrame from a MySQL database through Spark's JDBC reader.

    Args:
        host_name: Host name of the MySQL server.
        port: TCP port of the MySQL server.
        db_name: Database to connect to; it becomes the path of the JDBC URL.
        conn_props: JDBC connection properties (e.g. ``user``, ``password``, ``driver``).
        sql_query: A table/view name, an already parenthesised-and-aliased sub-query,
            or a plain ``SELECT`` statement. A plain ``SELECT`` is wrapped as
            ``(<query>) AS src_query`` because ``spark.read.jdbc`` places this value
            in a ``FROM`` clause, where a bare ``SELECT`` is a syntax error.

    Returns:
        A Spark DataFrame (not a pandas DataFrame) over the table or query result.
    """
    jdbc_url = f"jdbc:mysql://{host_name}:{port}/{db_name}"

    table_expr = sql_query
    stripped_query = sql_query.strip()
    if stripped_query.lower().startswith("select"):
        # A derived table may not carry a trailing statement terminator.
        table_expr = f"({stripped_query.rstrip(';')}) AS src_query"

    return spark.read.jdbc(url=jdbc_url, table=table_expr, properties=conn_props)


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Each of ``conditions``, ``projection`` and ``sort`` is applied independently
    whenever it is truthy, so e.g. a filter without a projection is still honoured.

    Args:
        user_id: Atlas user name (percent-escaped before being placed in the URI).
        pwd: Atlas password (percent-escaped before being placed in the URI).
        cluster_name: Cluster host prefix, combined with ``.jeczt7v.mongodb.net``.
        db_name: Database to query.
        collection: Collection to query.
        conditions: Filter document for ``find()``; falsy means "match everything".
        projection: Projection document for ``find()``; falsy means "all fields".
        sort: Specification passed to ``Cursor.sort()``; falsy means "no sorting".

    Returns:
        A ``pd.DataFrame`` (``pd`` is ``pyspark.pandas`` in this notebook) holding the
        fetched documents.
    """
    from urllib.parse import quote_plus

    # Reserved characters such as '@', ':' or '/' in credentials would corrupt the URI.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.jeczt7v.mongodb.net/{db_name}?retryWrites=true&w=majority"
    )

    client = pymongo.MongoClient(mongo_uri)
    try:
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialise the cursor before the connection is closed.
        documents = list(cursor)
    finally:
        client.close()

    return pd.DataFrame(documents)

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """(Re)create MongoDB collections from JSON files on the local file system.

    For every ``collection_name -> file_name`` entry in ``json_files`` the collection
    is dropped and then repopulated from ``os.path.join(src_file_path, file_name)``,
    so calling this repeatedly leaves the same collection contents.

    Args:
        user_id: Atlas user name (percent-escaped before being placed in the URI).
        pwd: Atlas password (percent-escaped before being placed in the URI).
        cluster_name: Cluster host prefix, combined with ``.jeczt7v.mongodb.net``.
        db_name: Target database.
        src_file_path: Local directory containing the JSON files.
        json_files: Mapping of collection name to JSON file name.

    Returns:
        The ``InsertManyResult`` of the most recent ``insert_many`` call, or ``None``
        when nothing was inserted (empty mapping, or only empty JSON arrays).
    """
    from urllib.parse import quote_plus

    # Reserved characters such as '@', ':' or '/' in credentials would corrupt the URI.
    mongo_uri = (
        f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
        f"@{cluster_name}.jeczt7v.mongodb.net/{db_name}?retryWrites=true&w=majority"
    )

    result = None
    client = pymongo.MongoClient(mongo_uri)
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)

            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, "r", encoding="utf-8") as json_file:
                documents = json.load(json_file)

            # A file holding one JSON object becomes a single-document collection.
            if isinstance(documents, dict):
                documents = [documents]
            # insert_many() rejects an empty batch; leave the dropped collection empty.
            if documents:
                result = db[collection_name].insert_many(documents)
    finally:
        client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an Azure Database for MySQL Server
##### 1.1. Create a New Databricks Metadata Database, and then Create a New Table that Sources its Data from a View over an Azure Database for MySQL server.

In [0]:
%sql
-- Start from scratch: CASCADE removes every table and view inside the schema too.
DROP SCHEMA IF EXISTS police_killings CASCADE;

In [0]:
%sql
-- Project schema, stored under the capstone FileStore folder and tagged as holding PII.
CREATE SCHEMA IF NOT EXISTS police_killings
  COMMENT "Capstone Project Database"
  LOCATION "dbfs:/FileStore/ds3002-capstone/police_killings"
  WITH DBPROPERTIES (contains_pii = true, purpose = "DS-3002 Capstone Project");

In [0]:
%sql
-- Expose the remote police_killings.victim relation to Spark SQL via the JDBC source.
-- NOTE(review): credentials are hard-coded here; prefer a Databricks secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_victim
USING JDBC
OPTIONS (
  url = "jdbc:mysql://laurenmds2002.mysql.database.azure.com:3306/police_killings",
  dbtable = "police_killings.victim",
  user = "lmarkwart",
  password = "Langley123"
)

In [0]:
%sql
USE DATABASE police_killings;

-- Materialise the victim dimension from the JDBC-backed temporary view.
-- Stored in its own "victim" folder so it cannot collide with a date dimension's path.
CREATE TABLE IF NOT EXISTS police_killings.victim
COMMENT "Victim Dimension Table"
LOCATION "dbfs:/FileStore/ds3002-capstone/police_killings/victim"
AS SELECT * FROM view_victim

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Quick look at a handful of victim rows.
SELECT *
FROM police_killings.victim
LIMIT 5

victim_ID,name,age,gender,race
30,A'donte Washington,16,Male,Black
31,Aaron Rutledge,27,Male,White
32,Aaron Siler,26,Male,White
33,Aaron Valdez,25,Male,Hispanic/Latino
34,Adam Jovicic,29,Male,White


In [0]:
%sql
-- Column types plus storage/metadata details for the victim dimension.
DESC TABLE EXTENDED police_killings.victim;

col_name,data_type,comment
victim_ID,int,
name,string,
age,string,
gender,string,
race,string,
,,
# Partitioning,,
Not partitioned,,
,,
# Detailed Table Information,,


##### 1.2. Create a New Table that Sources its Data from a Table in an Azure Database for MySQL server.

In [0]:
%sql
-- Expose the remote police_killings.location relation to Spark SQL via the JDBC source.
-- NOTE(review): credentials are hard-coded here; prefer a Databricks secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_location
USING JDBC
OPTIONS (
  url = "jdbc:mysql://laurenmds2002.mysql.database.azure.com:3306/police_killings",
  dbtable = "police_killings.location",
  user = "lmarkwart",
  password = "Langley123"
)

In [0]:
%sql
USE DATABASE police_killings;

-- Materialise the location dimension from the JDBC-backed temporary view.
CREATE TABLE IF NOT EXISTS police_killings.location
COMMENT "Location Dimension Table"
LOCATION "dbfs:/FileStore/ds3002-capstone/police_killings/location"
AS SELECT * FROM view_location

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Quick look at a handful of location rows.
SELECT *
FROM police_killings.location
LIMIT 5

location_ID,street_address,city,state,latitude,longitude
30,Clearview Ln,Millbrook,AL,32.529577,-86.362829
31,300 block Iris Park Dr,Pineville,LA,31.3217392,-92.4348602
32,22nd Ave and 56th St,Kenosha,WI,42.5835597,-87.8357101
33,3000 Seminole Ave,South Gate,CA,33.9392976,-118.2194634
34,364 Hiwood Ave,Munroe Falls,OH,41.1485748,-81.4298782


In [0]:
%sql
-- Column types plus storage/metadata details for the location dimension.
DESC TABLE EXTENDED police_killings.location;

col_name,data_type,comment
location_ID,int,
street_address,string,
city,string,
state,string,
latitude,string,
longitude,string,
,,
# Partitioning,,
Not partitioned,,
,,


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
# List the batch source files available on DBFS.
batch_files = dbutils.fs.ls(batch_dir)
display(batch_files)

##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
# set_mongo_collection() reads files with Python's open(), which needs the local
# "/dbfs/..." path rather than the "dbfs:/" URI; derive it from batch_dir so the two
# locations cannot drift apart.
source_dir = batch_dir.replace("dbfs:/", "/dbfs/", 1)
json_files = {"population_facts" : 'population_facts.json'}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

##### 2.3. Fetch Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// Load the population_facts collection through the MongoDB Spark connector.
// NOTE(review): no connection URI is set here; presumably it comes from the cluster's Spark config.
val df_population_facts = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("database", "policekillings")
  .option("collection", "population_facts")
  .load()

display(df_population_facts)

In [0]:
%scala
// Print the schema Spark derived from the MongoDB documents.
df_population_facts.printSchema()

##### 2.4. Use the Spark DataFrame to Create a New Table in the Databricks (police_killings) Metadata Database

In [0]:
%scala
// Persist the MongoDB-sourced DataFrame as a Delta table, replacing any previous copy.
df_population_facts.write
  .format("delta")
  .mode("overwrite")
  .saveAsTable("police_killings.population_facts")

In [0]:
%sql
-- Column types plus storage/metadata details for the population facts table.
DESC TABLE EXTENDED police_killings.population_facts

##### 2.5. Query the New Table in the Databricks Metadata Database

In [0]:
%sql
-- Quick look at a handful of population facts rows.
SELECT *
FROM police_killings.population_facts
LIMIT 5

#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
cause_csv = f"{batch_dir}/cause.csv"

# Read the CSV treating the first line as a header and letting Spark infer column types.
df_incident_facts = spark.read.csv(cause_csv, header=True, inferSchema=True)
display(df_incident_facts)

law_enforcement_agency,cause,armed
Millbrook Police Department,Gunshot,No
Rapides Parish Sheriff's Office,Gunshot,No
Kenosha Police Department,Gunshot,No
South Gate Police Department,Gunshot,Firearm
Kent Police Department,Gunshot,No
Phoenix Police Department,Gunshot,No
Bakersfield Police Department,Gunshot,Firearm
Los Angeles Police Department,Gunshot,Non-lethal firearm
Wise County Sheriff's Department and Texas DPS,Gunshot,Firearm
Kentwood Police Department and Wyoming DPS,Gunshot,Other


In [0]:
# Show the column names and types Spark inferred from cause.csv.
df_incident_facts.printSchema()

In [0]:
# Persist the CSV data as a Delta table, replacing any previous copy.
(df_incident_facts.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("police_killings.incident_facts"))

In [0]:
%sql
-- Column types plus storage/metadata details for the incident facts table.
DESC TABLE EXTENDED police_killings.incident_facts;

col_name,data_type,comment
law_enforcement_agency,string,
cause,string,
armed,string,
,,
# Partitioning,,
Not partitioned,,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,police_killings,


In [0]:
%sql
-- Quick look at a handful of incident facts rows.
SELECT *
FROM police_killings.incident_facts
LIMIT 5;

law_enforcement_agency,cause,armed
Millbrook Police Department,Gunshot,No
Rapides Parish Sheriff's Office,Gunshot,No
Kenosha Police Department,Gunshot,No
South Gate Police Department,Gunshot,Firearm
Kent Police Department,Gunshot,No


##### Verify Dimension Tables

In [0]:
%sql
-- Switch to the project schema and list what it contains (temporary views included).
USE DATABASE police_killings;
SHOW TABLES;

database,tableName,isTemporary
police_killings,incident_facts,False
police_killings,location,False
police_killings,victim,False
,view_date,True
,view_location,True
,view_victim,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:
# Spark's reader keeps only the LAST value set for a given option key, so separate
# .option("cloudFiles.schemaHints", ...) calls overwrite one another. All hints are
# therefore joined into a single comma-separated cloudFiles.schemaHints string.
# NOTE(review): victim_ID is hinted as STRING while the victim dimension stores it as
# int — confirm the intended type.
bronze_schema_hints = ", ".join([
    "Incident_ID INT",
    "law_enforcement_agency STRING",
    "cause STRING",
    "armed STRING",
    "location_ID INT",
    "street_address STRING",
    "city STRING",
    "state STRING",
    "victim_ID STRING",
    "name STRING",
    "age STRING",
    "gender STRING",
    "race STRING",
])

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", bronze_schema_hints)
 # Auto Loader persists the inferred schema here between runs.
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
-- Bronze layer: tag every raw record with when it was received and which file it came from.
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT
    *,
    current_timestamp() AS receipt_time,
    input_file_name() AS source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
-- Inspect the bronze view (backed by the Auto Loader stream).
SELECT *
FROM orders_bronze_tempview

age,armed,cause,city,college_degree_rate,gender,latitude,law_enforcement_agency,longitude,median_household_income,median_personal_income,name,population_size,poverty_rate,race,share_black,share_hispanic,share_white,state,street_address,unemployment_rate,_rescued_data,receipt_time,source_file
,,,,0.168509509,,,,,51367.0,28375,,3779.0,14.1,,30.5,5.6,60.5,,,0.097686375,,2022-12-14T05:20:59.828+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.111402359,,,,,27972.0,14678,,2769.0,28.8,,36.2,0.5,53.8,,,0.065723794,,2022-12-14T05:20:59.828+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.147312269,,,,,45365.0,25286,,4079.0,14.6,,7.7,16.8,73.8,,,0.166293142,,2022-12-14T05:20:59.828+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.050132928,,,,,48295.0,17194,,4343.0,11.7,,0.6,98.8,1.2,,,0.124827269,,2022-12-14T05:20:59.828+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.403954214,,,,,68785.0,33954,,6809.0,1.9,,1.4,1.7,92.5,,,0.063549832,,2022-12-14T05:20:59.828+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.102955195,,,,,20833.0,15523,,4682.0,58,,7.7,79,7,,,0.073651452,,2022-12-14T05:20:59.828+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.20380117,,,,,58068.0,25949,,5027.0,17.2,,0.3,44.2,50.8,,,0.131461131,,2022-12-14T05:20:59.828+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.090437601,,,,,66543.0,25043,,5238.0,12.2,,0.2,84.1,8.6,,,0.094346979,,2022-12-14T05:20:59.828+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.047601339,,,,,30391.0,16778,,4832.0,37.7,,17.7,66.3,14.6,,,0.140832976,,2022-12-14T05:20:59.828+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.102691511,,,,,44553.0,22005,,3795.0,18.4,,7.7,26.5,63.6,,,0.174167417,,2022-12-14T05:20:59.828+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json


In [0]:
# Append the bronze stream into a Delta table; streaming progress is kept in the checkpoint folder.
bronze_checkpoint = f"{output_bronze}/_checkpoint"
(spark.table("orders_bronze_tempview")
    .writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", bronze_checkpoint)
    .table("fact_orders_bronze"))

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Re-read the bronze Delta table as a stream and expose it to SQL for the silver step.
bronze_stream = spark.readStream.table("fact_orders_bronze")
bronze_stream.createOrReplaceTempView("orders_silver_tempview")

In [0]:
%sql
-- Inspect the streaming view fed by the bronze Delta table.
SELECT *
FROM orders_silver_tempview

age,armed,cause,city,college_degree_rate,gender,latitude,law_enforcement_agency,longitude,median_household_income,median_personal_income,name,population_size,poverty_rate,race,share_black,share_hispanic,share_white,state,street_address,unemployment_rate,_rescued_data,receipt_time,source_file
,,,,0.168509509,,,,,51367.0,28375,,3779.0,14.1,,30.5,5.6,60.5,,,0.097686375,,2022-12-14T05:23:26.001+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.111402359,,,,,27972.0,14678,,2769.0,28.8,,36.2,0.5,53.8,,,0.065723794,,2022-12-14T05:23:26.001+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.147312269,,,,,45365.0,25286,,4079.0,14.6,,7.7,16.8,73.8,,,0.166293142,,2022-12-14T05:23:26.001+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.050132928,,,,,48295.0,17194,,4343.0,11.7,,0.6,98.8,1.2,,,0.124827269,,2022-12-14T05:23:26.001+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.403954214,,,,,68785.0,33954,,6809.0,1.9,,1.4,1.7,92.5,,,0.063549832,,2022-12-14T05:23:26.001+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.102955195,,,,,20833.0,15523,,4682.0,58,,7.7,79,7,,,0.073651452,,2022-12-14T05:23:26.001+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.20380117,,,,,58068.0,25949,,5027.0,17.2,,0.3,44.2,50.8,,,0.131461131,,2022-12-14T05:23:26.001+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.090437601,,,,,66543.0,25043,,5238.0,12.2,,0.2,84.1,8.6,,,0.094346979,,2022-12-14T05:23:26.001+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.047601339,,,,,30391.0,16778,,4832.0,37.7,,17.7,66.3,14.6,,,0.140832976,,2022-12-14T05:23:26.001+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json
,,,,0.102691511,,,,,44553.0,22005,,3795.0,18.4,,7.7,26.5,63.6,,,0.174167417,,2022-12-14T05:23:26.001+0000,dbfs:/FileStore/ds3002-capstone/source_data/stream/population.json


In [0]:
%sql
-- Column types of the streaming view fed by the bronze table.
DESC TABLE EXTENDED orders_silver_tempview

col_name,data_type,comment
age,string,
armed,string,
cause,string,
city,string,
college_degree_rate,string,
gender,string,
latitude,double,
law_enforcement_agency,string,
longitude,double,
median_household_income,string,


In [0]:
%sql
/* Silver layer: enrich each streamed record with the victim (c) and location (sa)
   dimensions. Population and incident attributes are taken from the stream itself,
   which carries those columns; the population_facts / incident_facts tables are not
   joined because no join key to them is available.
   Each output column name appears once, since the view feeds a Delta table.
   NOTE(review): the join keys assume Incident_ID / population_facts_ID follow the same
   row numbering as victim_ID / location_ID in the dimension tables — confirm. */
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT c.victim_ID
    , c.name AS VictimName
    , c.age AS VictimAge
    , c.race AS VictimRace
    , t.population_size
    , t.share_white
    , t.share_black
    , t.share_hispanic
    , t.poverty_rate
    , t.unemployment_rate
    , t.college_degree_rate
    , t.median_personal_income
    , t.law_enforcement_agency
    , t.armed
    , t.cause
    , sa.location_ID
    , sa.street_address
    , sa.city
    , sa.state
    , t.Incident_ID
    , t.population_facts_ID
  FROM orders_silver_tempview t
  INNER JOIN police_killings.victim c
  ON t.Incident_ID = c.victim_ID
  INNER JOIN police_killings.location sa
  ON t.population_facts_ID = CAST(sa.location_ID AS BIGINT)
)

In [0]:
# Append the enriched silver stream into a Delta table; progress is kept in the checkpoint folder.
silver_checkpoint = f"{output_silver}/_checkpoint"
(spark.table("fact_orders_silver_tempview")
    .writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", silver_checkpoint)
    .table("fact_orders_silver"))

In [0]:
%sql
-- Inspect the rows written to the silver Delta table.
SELECT *
FROM fact_orders_silver

In [0]:
%sql
-- Column types plus storage/metadata details for the silver fact table.
DESC TABLE EXTENDED police_killings.fact_orders_silver

##### 6.3. Gold Table: Perform Aggregations

In [0]:
%sql
-- Number of silver rows per victim; a count above 1 means the victim appears in
-- several silver rows. The count gets its own name so it cannot be mistaken for the
-- victim_ID column when sorting.
SELECT victim_ID
  , VictimName
  , VictimAge
  , VictimRace
  , COUNT(victim_ID) AS VictimCount
FROM police_killings.fact_orders_silver
GROUP BY victim_ID, VictimAge, VictimRace, VictimName
ORDER BY VictimCount DESC

In [0]:
%sql
-- For each population_facts_ID: the number of victims in the silver table, shown next
-- to every record's cause of death, armed status and poverty rate.
-- Every selected column is either grouped in the sub-query or read from the outer row.
SELECT pc.population_facts_ID
  , os.cause AS CauseOfDeath
  , os.armed
  , os.poverty_rate
  , pc.VictimCount
FROM police_killings.fact_orders_silver AS os
INNER JOIN (
  SELECT population_facts_ID
  , COUNT(victim_ID) AS VictimCount
  FROM police_killings.fact_orders_silver
  GROUP BY population_facts_ID
) AS pc
ON pc.population_facts_ID = os.population_facts_ID
ORDER BY pc.population_facts_ID DESC