## DS 2002: Final Project
The goal of the second data project, building upon the first project, is to further demonstrate 
(1) your understanding of and 
(2) competence implementing the data science systems covered throughout this course (e.g., Relational & NoSQL databases, ETL process pipelines, data transformations, SQL and Python scripts, APIs and cloud services).

**Requirements:**

-  Date dimension to enable the analysis of the business process over various intervals of time
-  Must include at least 3 additional dimension tables (e.g., buyers, sellers, products)
-  Must populate its dimensions using data originating from the following sources: 
  1. A relational database like MySQL, Oracle or SQL Server
  2. NoSQL database like MongoDB, Redis, Cassandra or HBase
  3. A Cloud file system like Azure Data Lake, AWS S3 hosting file-based data (e.g., JSON, CSV)
  4. An API that returns a message payload (e.g., JSON, CSV, text) **optional**
- At least 1 fact table that captures the business process transaction
- Integrate data of differing granularity (static and near real-time); i.e., implement Databricks Structured Streaming to integrate hot-path and cold-path data
- Include one or more visualizations that demonstrate the business value of your solution. For example, a “dashboard” developed using Excel, Power BI, Tableau or other data visualization tool capable of demonstrating the use of PivotTables and/or Pivot Charts

### Section I: Prerequisites
#### 1.0. Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

##### 2.0. Instantiate Global Variables

In [0]:
# Azure MySQL Server Connection Information ###################
# NOTE(review): these credentials (and the copies hard-coded in the %sql and
# %scala cells below) are stored in plain text. Move them to a Databricks
# secret scope and read them with dbutils.secrets.get(...) before sharing.
jdbc_hostname = "rrc2faz-mysql-azure-ws.mysql.database.azure.com"
jdbc_port = 3306
src_database = "sakila_dw"

connection_properties = {
  "user" : "rrc2faz",
  "password" : "Passw0rd123",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "cluster0.8y01vxo"
atlas_database_name = "sakila"
atlas_user_name = "rrc2faz"
atlas_password = "Passw0rd123"

# Data Files (JSON) Information ###############################
dst_database = "sakila_dlh"

base_dir = "dbfs:/FileStore"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# NOTE(review): the rentals stream is read from the "orders" folder (a name
# left over from the Northwind template) — confirm the rental JSON files live there.
rentals_stream_dir = f"{stream_dir}/orders"

# Bronze/silver/gold storage (schema locations and checkpoints) for the
# rentals fact table, named after the fact_rentals_* tables they back.
rentals_output_dir    = f"{database_dir}/fact_rentals"
rentals_output_bronze = f"{rentals_output_dir}/bronze"
rentals_output_silver = f"{rentals_output_dir}/silver"
rentals_output_gold   = f"{rentals_output_dir}/gold"

# Start from a clean slate: deleting the database directory also deletes the
# streaming checkpoints and schema locations stored beneath it.
dbutils.fs.rm(database_dir, True)

Out[32]: True

##### 3.0. Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Args:
        user_id, pwd: MongoDB Atlas credentials; URL-escaped before being put in the URI.
        cluster_name: Atlas cluster host prefix (e.g. "cluster0.8y01vxo").
        db_name: database to query.
        collection: collection to query.
        conditions: filter document for find(); falsy means "all documents".
        projection: projection document for find(); falsy means "all fields".
        sort: sort specification passed to Cursor.sort(); falsy means unsorted.

    Returns:
        A DataFrame built with the module-level ``pd`` holding one row per document.
    '''
    from urllib.parse import quote_plus

    # Credentials must be percent-escaped, otherwise characters such as '@' or ':'
    # in the password break the connection string.
    mongo_uri = (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
                 f"@{cluster_name}.mongodb.net/{db_name}")
    client = pymongo.MongoClient(mongo_uri)
    try:
        # Apply each optional argument independently; previously `conditions`
        # was ignored unless `projection` was also supplied, and `sort` was
        # ignored unless both were.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        documents = list(cursor)
    finally:
        # Always release the connection, even if the query fails.
        client.close()

    return pd.DataFrame(documents)

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''(Re)create MongoDB collections from JSON files on the driver's file system.

    Each key of ``json_files`` is a collection name and each value is a JSON
    file name relative to ``src_file_path``. Any existing collection with that
    name is dropped first, so re-running the function reloads the data instead
    of appending duplicate documents.

    Args:
        user_id, pwd: MongoDB Atlas credentials; URL-escaped before being put in the URI.
        cluster_name: Atlas cluster host prefix (e.g. "cluster0.8y01vxo").
        db_name: target database name.
        src_file_path: local directory containing the JSON files.
        json_files: mapping of collection name -> JSON file name.

    Returns:
        The insert_many() result for the last non-empty collection loaded,
        or None when nothing was inserted (e.g. ``json_files`` is empty).
    '''
    from urllib.parse import quote_plus

    mongo_uri = (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
                 f"@{cluster_name}.mongodb.net/{db_name}")
    client = pymongo.MongoClient(mongo_uri)
    result = None
    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)
            # A file holding a single JSON object is loaded as one document.
            if isinstance(documents, dict):
                documents = [documents]
            # insert_many() rejects an empty list, so skip empty files.
            if documents:
                result = db[collection_name].insert_many(documents)
    finally:
        # Always release the connection, even if a file is missing or malformed.
        client.close()

    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data

#### 1.0. Fetch Reference Data From an Azure MySQL Database

##### 1.1. Create a New Databricks Metadata Database.

In [0]:
%sql
-- Start from a clean slate: drop the metadata database together with every table in it.
DROP SCHEMA IF EXISTS sakila_dlh CASCADE;

In [0]:
%sql
-- Create the metadata database; its tables are stored under dbfs:/FileStore/sakila_dlh.
CREATE DATABASE IF NOT EXISTS sakila_dlh
COMMENT "DS-2002 Final Project"
LOCATION "dbfs:/FileStore/sakila_dlh"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-2002 Final Project");

##### 1.2. Create a New Table that Sources Date Dimension Data from a Table in an Azure MySQL database.

In [0]:
%sql
-- JDBC-backed temporary view over the dim_date table of the Azure MySQL sakila_dw database.
-- NOTE(review): credentials are hard-coded; prefer a Databricks secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url      "jdbc:mysql://rrc2faz-mysql-azure-ws.mysql.database.azure.com:3306/sakila_dw",
  user     "rrc2faz",
  password "Passw0rd123",
  dbtable  "dim_date"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Materialize the date dimension as a Delta table from the JDBC-backed
-- temporary view view_date (created in the previous cell). Selecting from
-- "dim_date" here would read the table being replaced, not the MySQL source.
CREATE OR REPLACE TABLE sakila_dlh.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/sakila_dlh/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Column list plus storage/metadata details of the date dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_date;
     

col_name,data_type,comment
date_key,int,
full_date,date,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


In [0]:
%sql
-- Preview a few rows of the date dimension.
SELECT *
FROM sakila_dlh.dim_date
LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20050501,2005-05-01,2005/05/01,05/01/2005,01/05/2005,1,Sunday,1,121,Weekend,17,May,5,N,2,2005,2005-05,2005Q2,11,4,2005,2005-11,2005Q4
20050502,2005-05-02,2005/05/02,05/02/2005,02/05/2005,2,Monday,2,122,Weekday,18,May,5,N,2,2005,2005-05,2005Q2,11,4,2005,2005-11,2005Q4
20050503,2005-05-03,2005/05/03,05/03/2005,03/05/2005,3,Tuesday,3,123,Weekday,18,May,5,N,2,2005,2005-05,2005Q2,11,4,2005,2005-11,2005Q4
20050504,2005-05-04,2005/05/04,05/04/2005,04/05/2005,4,Wednesday,4,124,Weekday,18,May,5,N,2,2005,2005-05,2005Q2,11,4,2005,2005-11,2005Q4
20050505,2005-05-05,2005/05/05,05/05/2005,05/05/2005,5,Thursday,5,125,Weekday,18,May,5,N,2,2005,2005-05,2005Q2,11,4,2005,2005-11,2005Q4


##### 1.3. Create a New Table that Sources Customer Dimension Data from an Azure MySQL database.

In [0]:
%sql
-- Temporary view named "view_customers" over the "customers" table of the
-- Azure MySQL sakila_dw database, read through the Spark JDBC data source.
-- NOTE(review): confirm the MySQL table name ("customers" vs "dim_customers"),
-- and move the hard-coded credentials into a Databricks secret scope.

CREATE OR REPLACE TEMPORARY VIEW view_customers
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://rrc2faz-mysql-azure-ws.mysql.database.azure.com:3306/sakila_dw",
  dbtable "customers",
  user "rrc2faz",
  password "Passw0rd123"
)

In [0]:
%sql
USE DATABASE sakila_dlh;

-- Materialize the customer dimension as a Delta table from the JDBC-backed
-- temporary view view_customers (created in the previous cell). Selecting from
-- "dim_customers" here would read the table being replaced, not the MySQL source.
CREATE OR REPLACE TABLE sakila_dlh.dim_customers
COMMENT "Customers Dimension Table"
LOCATION "dbfs:/FileStore/sakila_dlh/dim_customers"
AS SELECT * FROM view_customers

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Column list plus storage/metadata details of the customer dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_customers;
     

col_name,data_type,comment
customer_key,bigint,
first_name,string,
last_name,string,
create_date,timestamp,
last_update,timestamp,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_customers,


In [0]:
%sql
-- Preview a few rows of the customer dimension.
SELECT *
FROM sakila_dlh.dim_customers
LIMIT 5

customer_key,first_name,last_name,create_date,last_update
1,MARY,SMITH,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
2,PATRICIA,JOHNSON,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
3,LINDA,WILLIAMS,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
4,BARBARA,JONES,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000
5,ELIZABETH,BROWN,2006-02-14T22:04:36.000+0000,2006-02-15T04:57:20.000+0000


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

In [0]:
# List the batch source files staged on DBFS.
batch_listing = dbutils.fs.ls(batch_dir)
display(batch_listing)


[0;31m---------------------------------------------------------------------------[0m
[0;31mExecutionError[0m                            Traceback (most recent call last)
[0;32m<command-1447972713418238>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m [0mdisplay[0m[0;34m([0m[0mdbutils[0m[0;34m.[0m[0mfs[0m[0;34m.[0m[0mls[0m[0;34m([0m[0mbatch_dir[0m[0;34m)[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/python_shell/dbruntime/dbutils.py[0m in [0;36mf_with_exception_handling[0;34m(*args, **kwargs)[0m
[1;32m    360[0m                     [0mexc[0m[0;34m.[0m[0m__context__[0m [0;34m=[0m [0;32mNone[0m[0;34m[0m[0;34m[0m[0m
[1;32m    361[0m                     [0mexc[0m[0;34m.[0m[0m__cause__[0m [0;34m=[0m [0;32mNone[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 362[0;31m                     [0;32mraise[0m [0mexc[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m    363[0m [0;34m[0m[0m
[1;32m    364[0m          

##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection

NOTE: The following cell can be run more than once: set_mongo_collection() drops each collection before reloading it, so re-running it does not duplicate documents.

In [0]:
# Local (FUSE) path of the batch source files: the same folder as batch_dir,
# addressed through /dbfs/ so Python's built-in open() can read it. Deriving it
# from batch_dir keeps a single source of truth for the location.
source_dir = batch_dir.replace("dbfs:/", "/dbfs/", 1)

# Collection name -> JSON file (relative to source_dir) to load into it.
json_files = {
    "customers" : 'sakila_customers.json',
    "inventory" : 'sakila_inventory.json',
    "films" : 'sakila_films.json',
    "rentals" : 'sakila_rentals.json',
}

set_mongo_collection(atlas_user_name, atlas_password, atlas_cluster_name, atlas_database_name, source_dir, json_files)

[0;31m---------------------------------------------------------------------------[0m
[0;31mFileNotFoundError[0m                         Traceback (most recent call last)
[0;32m<command-1447972713418240>[0m in [0;36m<cell line: 4>[0;34m()[0m
[1;32m      2[0m [0mjson_files[0m [0;34m=[0m [0;34m{[0m[0;34m"customers"[0m [0;34m:[0m [0;34m'sakila_customers.json'[0m[0;34m,[0m [0;34m"inventory"[0m [0;34m:[0m [0;34m'sakila_inventory.json'[0m[0;34m,[0m [0;34m"films"[0m [0;34m:[0m [0;34m'sakila_films.json'[0m[0;34m,[0m [0;34m"rentals"[0m [0;34m:[0m [0;34m'sakila_rentals.json'[0m[0;34m}[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m [0;34m[0m[0m
[0;32m----> 4[0;31m [0mset_mongo_collection[0m[0;34m([0m[0matlas_user_name[0m[0;34m,[0m [0matlas_password[0m[0;34m,[0m [0matlas_cluster_name[0m[0;34m,[0m [0matlas_database_name[0m[0;34m,[0m [0msource_dir[0m[0;34m,[0m [0mjson_files[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m


##### 2.3.1. Fetch Inventory Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// Read the inventory collection from MongoDB Atlas with the MongoDB Spark connector.
// NOTE(review): this URI (cluster "ds2002.8y01vxo", collection "df_inventory") differs from
// the Python loader settings (cluster0.8y01vxo / "inventory") — confirm which is current,
// and move the embedded credentials into a secret scope.
val atlas_uri = "mongodb+srv://rrc2faz:Passw0rd123@ds2002.8y01vxo.mongodb.net/"

val df_inventory = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", atlas_uri)
  .option("database", "sakila")
  .option("collection", "df_inventory")
  .load()
  .select("inventory_id", "film_id", "store_id", "last_update")

display(df_inventory)

inventory_id,film_id,store_id,last_update
1,1,1,2006-02-15 05:09:17
2,1,1,2006-02-15 05:09:17
3,1,1,2006-02-15 05:09:17
4,1,1,2006-02-15 05:09:17
5,1,2,2006-02-15 05:09:17
6,1,2,2006-02-15 05:09:17
7,1,2,2006-02-15 05:09:17
8,1,2,2006-02-15 05:09:17
9,2,2,2006-02-15 05:09:17
10,2,2,2006-02-15 05:09:17


In [0]:
%scala
// Print the column names and types read from the inventory collection.
df_inventory.printSchema()
     

##### 2.3.2. Use the Spark DataFrame to Create a New Inventory Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
// Persist the inventory DataFrame as the Delta table sakila_dlh.dim_inventory, replacing any previous contents.
df_inventory.write
  .mode("overwrite")
  .format("delta")
  .saveAsTable("sakila_dlh.dim_inventory")

In [0]:
%sql
-- Column list plus storage/metadata details of the inventory dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_inventory

col_name,data_type,comment
inventory_id,int,
film_id,int,
store_id,int,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,sakila_dlh,
Table,dim_inventory,
Type,MANAGED,


In [0]:
%sql
-- Preview a few rows of the inventory dimension.
SELECT *
FROM sakila_dlh.dim_inventory
LIMIT 5

inventory_id,film_id,store_id,last_update
1,1,1,2006-02-15 05:09:17
2,1,1,2006-02-15 05:09:17
3,1,1,2006-02-15 05:09:17
4,1,1,2006-02-15 05:09:17
5,1,2,2006-02-15 05:09:17


##### 2.4.1. Fetch Film Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// Read the films collection from MongoDB Atlas with the MongoDB Spark connector.
// NOTE(review): credentials are embedded in the URI; move them into a secret scope.
val atlas_uri = "mongodb+srv://rrc2faz:Passw0rd123@ds2002.8y01vxo.mongodb.net/"

val df_films = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", atlas_uri)
  .option("database", "sakila")
  .option("collection", "df_films")
  .load()
  .select("film_id", "title", "description", "release_year", "rating",
          "length", "rental_duration", "rental_rate", "last_update")

display(df_films)

film_id,title,description,release_year,rating,length,rental_duration,rental_rate,last_update
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,PG,86,6,0.99,2006-02-15 00:03:42
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,G,48,3,4.99,2006-02-15 00:03:42
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,NC-17,50,7,2.99,2006-02-15 00:03:42
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,G,117,5,2.99,2006-02-15 00:03:42
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,G,130,6,2.99,2006-02-15 00:03:42
6,AGENT TRUMAN,A Intrepid Panorama of a Robot And a Boy who must Escape a Sumo Wrestler in Ancient China,2006,PG,169,3,2.99,2006-02-15 00:03:42
7,AIRPLANE SIERRA,A Touching Saga of a Hunter And a Butler who must Discover a Butler in A Jet Boat,2006,PG-13,62,6,4.99,2006-02-15 00:03:42
8,AIRPORT POLLOCK,A Epic Tale of a Moose And a Girl who must Confront a Monkey in Ancient India,2006,R,54,6,4.99,2006-02-15 00:03:42
9,ALABAMA DEVIL,A Thoughtful Panorama of a Database Administrator And a Mad Scientist who must Outgun a Mad Scientist in A Jet Boat,2006,PG-13,114,3,2.99,2006-02-15 00:03:42
10,ALADDIN CALENDAR,A Action-Packed Tale of a Man And a Lumberjack who must Reach a Feminist in Ancient China,2006,NC-17,63,6,4.99,2006-02-15 00:03:42


In [0]:

%scala
// Print the column names and types read from the films collection.
df_films.printSchema()
     

##### 2.4.2. Use the Spark DataFrame to Create a New Films Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
// Persist the films DataFrame as the Delta table sakila_dlh.dim_films, replacing any previous contents.
df_films.write
  .mode("overwrite")
  .format("delta")
  .saveAsTable("sakila_dlh.dim_films")


In [0]:
%sql
-- Column list plus storage/metadata details of the films dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_films
     

col_name,data_type,comment
film_id,int,
title,string,
description,string,
release_year,int,
rating,string,
length,int,
rental_duration,int,
rental_rate,double,
last_update,string,
,,


In [0]:
%sql
-- Preview a few rows of the films dimension.
SELECT *
FROM sakila_dlh.dim_films
LIMIT 5

film_id,title,description,release_year,rating,length,rental_duration,rental_rate,last_update
1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist who must Battle a Teacher in The Canadian Rockies,2006,PG,86,6,0.99,2006-02-15 00:03:42
2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrator And a Explorer who must Find a Car in Ancient China,2006,G,48,3,4.99,2006-02-15 00:03:42
3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a Car who must Sink a Lumberjack in A Baloon Factory,2006,NC-17,50,7,2.99,2006-02-15 00:03:42
4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumberjack who must Chase a Monkey in A Shark Tank,2006,G,117,5,2.99,2006-02-15 00:03:42
5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And a Dentist who must Pursue a Forensic Psychologist in The Gulf of Mexico,2006,G,130,6,2.99,2006-02-15 00:03:42


##### 2.5.1. Fetch Payment Dimension Data from the New MongoDB Collection

In [0]:
%scala
import com.mongodb.spark._

// Read the payment collection from MongoDB Atlas with the MongoDB Spark connector.
// NOTE(review): credentials are embedded in the URI; move them into a secret scope.
val atlas_uri = "mongodb+srv://rrc2faz:Passw0rd123@ds2002.8y01vxo.mongodb.net/"

val df_payment = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", atlas_uri)
  .option("database", "sakila")
  .option("collection", "df_payment")
  .load()
  .select("payment_id", "customer_id", "staff_id", "rental_id",
          "amount", "payment_date", "last_update")

display(df_payment)

payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30
3,1,1,1185,5.99,2005-06-15 00:54:12,2006-02-15 22:12:30
4,1,2,1422,0.99,2005-06-15 18:02:53,2006-02-15 22:12:30
5,1,2,1476,9.99,2005-06-15 21:08:46,2006-02-15 22:12:30
6,1,1,1725,4.99,2005-06-16 15:18:57,2006-02-15 22:12:30
7,1,1,2308,4.99,2005-06-18 08:41:48,2006-02-15 22:12:30
8,1,2,2363,0.99,2005-06-18 13:33:59,2006-02-15 22:12:30
9,1,1,3284,3.99,2005-06-21 06:24:45,2006-02-15 22:12:30
10,1,2,4526,5.99,2005-07-08 03:17:05,2006-02-15 22:12:30


In [0]:
%scala
// Print the column names and types read from the payment collection.
df_payment.printSchema()

##### 2.5.2. Use the Spark DataFrame to Create a New Payment Dimension Table in the Databricks Metadata Database (sakila_dlh)

In [0]:
%scala
// Persist the payment DataFrame as the Delta table sakila_dlh.dim_payment, replacing any previous contents.
df_payment.write
  .mode("overwrite")
  .format("delta")
  .saveAsTable("sakila_dlh.dim_payment")

In [0]:
%sql
-- Column list plus storage/metadata details of the payment dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_payment

col_name,data_type,comment
payment_id,int,
customer_id,int,
staff_id,int,
rental_id,int,
amount,double,
payment_date,string,
last_update,string,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
-- Preview a few rows of the payment dimension.
SELECT *
FROM sakila_dlh.dim_payment
LIMIT 5
     

payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30
3,1,1,1185,5.99,2005-06-15 00:54:12,2006-02-15 22:12:30
4,1,2,1422,0.99,2005-06-15 18:02:53,2006-02-15 22:12:30
5,1,2,1476,9.99,2005-06-15 21:08:46,2006-02-15 22:12:30


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read Rental Data From a CSV File

In [0]:
# Read the rentals batch file: first row is the header, column types are inferred.
rentals_csv = f"{batch_dir}/sakila_rentals.csv"
df_rentals = spark.read.csv(rentals_csv, header=True, inferSchema=True)
display(df_rentals)
     

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1447972713418263>[0m in [0;36m<cell line: 3>[0;34m()[0m
[1;32m      1[0m [0mrentals_csv[0m [0;34m=[0m [0;34mf"{batch_dir}/sakila_rentals.csv"[0m[0;34m[0m[0;34m[0m[0m
[1;32m      2[0m [0;34m[0m[0m
[0;32m----> 3[0;31m [0mdf_rentals[0m [0;34m=[0m [0mspark[0m[0;34m.[0m[0mread[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m'csv'[0m[0;34m)[0m[0;34m.[0m[0moptions[0m[0;34m([0m[0mheader[0m[0;34m=[0m[0;34m'true'[0m[0;34m,[0m [0minferSchema[0m[0;34m=[0m[0;34m'true'[0m[0;34m)[0m[0;34m.[0m[0mload[0m[0;34m([0m[0mrentals_csv[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      4[0m [0mdisplay[0m[0;34m([0m[0mdf_rentals[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m [0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/ins

In [0]:
# Print the column names and inferred types of the rentals DataFrame.
df_rentals.printSchema()

In [0]:
# Persist the CSV-sourced rentals DataFrame (df_rentals, read above) as a Delta table.
# (The previous code referenced df_employee, which is never defined in this notebook.)
df_rentals.write.format("delta").mode("overwrite").saveAsTable("sakila_dlh.dim_rentals")

In [0]:
%sql
-- Column list plus storage/metadata details of the rentals table.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_rentals;

In [0]:
%sql
-- Preview a few rows of the rentals table.
SELECT *
FROM sakila_dlh.dim_rentals
LIMIT 5;

##### 3.2. Use PySpark to Read Customer Dimension Data from a CSV File

In [0]:

# Read the customers batch file: first row is the header, column types are inferred.
customers_csv = f"{batch_dir}/sakila_customers.csv"
df_customers = spark.read.csv(customers_csv, header=True, inferSchema=True)
display(df_customers)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1447972713418269>[0m in [0;36m<cell line: 6>[0;34m()[0m
[1;32m      4[0m [0mcustomers_csv[0m [0;34m=[0m [0;34mf"{batch_dir}/sakila_customers.csv"[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m [0;34m[0m[0m
[0;32m----> 6[0;31m [0mdf_customers[0m [0;34m=[0m [0mspark[0m[0;34m.[0m[0mread[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m'csv'[0m[0;34m)[0m[0;34m.[0m[0moptions[0m[0;34m([0m[0mheader[0m [0;34m=[0m [0;34m'true'[0m[0;34m,[0m [0minferSchema[0m [0;34m=[0m [0;34m'true'[0m[0;34m)[0m[0;34m.[0m[0mload[0m[0;34m([0m[0mcustomers_csv[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      7[0m [0mdisplay[0m[0;34m([0m[0mdf_customers[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py

In [0]:
# Print the column names and inferred types of the customers DataFrame.
df_customers.printSchema()

In [0]:

# Persist the CSV-sourced customers DataFrame (df_customers, read above) as a Delta table.
# (The previous code referenced df_shipper, which is never defined in this notebook.)
# NOTE(review): this overwrites the MySQL-sourced sakila_dlh.dim_customers from section 1.3,
# and Delta may reject the overwrite if the CSV schema differs (see the overwriteSchema
# option) — confirm which source should populate this dimension.
df_customers.write.format('delta').mode('overwrite').saveAsTable('sakila_dlh.dim_customers')

In [0]:
%sql
-- Column list plus storage/metadata details of the customer dimension.
DESCRIBE TABLE EXTENDED sakila_dlh.dim_customers;

In [0]:
%sql
-- Preview a few rows of the customer dimension.
SELECT *
FROM sakila_dlh.dim_customers
LIMIT 5;
     

##### Verify Dimension Tables

In [0]:
%sql
-- Make sakila_dlh the current database (the streaming cells below create tables
-- with unqualified names) and list its tables to verify the dimensions exist.
USE DATABASE sakila_dlh;
SHOW TABLES;
     

database,tableName,isTemporary
sakila_dlh,dim_customers,False
sakila_dlh,dim_date,False
sakila_dlh,dim_films,False
sakila_dlh,dim_inventory,False
sakila_dlh,dim_payment,False


### Section III: Integrate Reference Data with Real-Time Data

#### 6.0. Use Auto Loader to Process Streaming (Hot Path) Rentals Fact Data

##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:

# Auto Loader: incrementally ingest the raw rental JSON files as a stream.
# All schema hints must be passed in ONE comma-separated option — calling
# .option("cloudFiles.schemaHints", ...) repeatedly overwrites the previous
# value, so only the last hint would take effect.
rental_schema_hints = ", ".join([
    "rental_id INT",
    "payment_id SMALLINT",
    "customer_id SMALLINT",
    "staff_id TINYINT",
    "film_id SMALLINT",
    "inventory_id INT",          # Spark SQL has no MEDIUMINT type
    "rental_rate DECIMAL(4,2)",  # bare DECIMAL means DECIMAL(10,0) and would drop the cents
    "return_date TIMESTAMP",     # Spark SQL has no DATETIME type
    "last_update TIMESTAMP",
])

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", rental_schema_hints)
 .option("cloudFiles.schemaLocation", rentals_output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(rentals_stream_dir)
 .createOrReplaceTempView("rentals_raw_tempview"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-1447972713418277>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m (spark.readStream
[0m[1;32m      2[0m  [0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"cloudFiles"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m  [0;34m.[0m[0moption[0m[0;34m([0m[0;34m"cloudFiles.format"[0m[0;34m,[0m [0;34m"json"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m  [0;34m.[0m[0moption[0m[0;34m([0m[0;34m"cloudFiles.schemaHints"[0m[0;34m,[0m [0;34m"customer_id SMALLINT"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m  [0;34m.[0m[0moption[0m[0;34m([0m[0;34m"cloudFiles.schemaHints"[0m[0;34m,[0m [0;34m"payment_id SMALLINT"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/sql/streaming/readwriter.py[0m i

In [0]:
%sql
/* Add metadata for traceability: when each record was received and which file it came from */
CREATE OR REPLACE TEMPORARY VIEW rentals_bronze_tempview AS (
  SELECT
    *,
    current_timestamp() AS receipt_time,
    input_file_name()   AS source_file
  FROM rentals_raw_tempview
)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1447972713418278>[0m in [0;36m<cell line: 1>[0;34m()[0m
[1;32m      5[0m     [0mdisplay[0m[0;34m([0m[0mdf[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      6[0m     [0;32mreturn[0m [0mdf[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 7[0;31m   [0m_sqldf[0m [0;34m=[0m [0m____databricks_percent_sql[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      8[0m [0;32mfinally[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m      9[0m   [0;32mdel[0m [0m____databricks_percent_sql[0m[0;34m[0m[0;34m[0m[0m

[0;32m<command-1447972713418278>[0m in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      2[0m   [0;32mdef[0m [0m____databricks_percent_sql[0m[0;34m([0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m     [0;32mimport

In [0]:
%sql
-- Inspect the bronze stream (raw rentals plus traceability columns).
SELECT *
FROM rentals_bronze_tempview

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1447972713418279>[0m in [0;36m<cell line: 1>[0;34m()[0m
[1;32m      5[0m     [0mdisplay[0m[0;34m([0m[0mdf[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      6[0m     [0;32mreturn[0m [0mdf[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 7[0;31m   [0m_sqldf[0m [0;34m=[0m [0m____databricks_percent_sql[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      8[0m [0;32mfinally[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m      9[0m   [0;32mdel[0m [0m____databricks_percent_sql[0m[0;34m[0m[0;34m[0m[0m

[0;32m<command-1447972713418279>[0m in [0;36m____databricks_percent_sql[0;34m()[0m
[1;32m      2[0m   [0;32mdef[0m [0m____databricks_percent_sql[0m[0;34m([0m[0;34m)[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m     [0;32mimport

In [0]:

# Persist the bronze stream (raw rentals + receipt_time/source_file metadata) to
# the fact_rentals_bronze Delta table. The source view name previously had a
# typo ("rentalss_..."), so the query could never resolve it.
(spark.table("rentals_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{rentals_output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_rentals_bronze"))

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-1447972713418280>[0m in [0;36m<cell line: 1>[0;34m()[0m
[0;32m----> 1[0;31m (spark.table("orders_bronze_tempview")
[0m[1;32m      2[0m       [0;34m.[0m[0mwriteStream[0m[0;34m[0m[0;34m[0m[0m
[1;32m      3[0m       [0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"delta"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      4[0m       [0;34m.[0m[0moption[0m[0;34m([0m[0;34m"checkpointLocation"[0m[0;34m,[0m [0;34mf"{orders_output_bronze}/_checkpoint"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m      5[0m       [0;34m.[0m[0moutputMode[0m[0;34m([0m[0;34m"append"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [

##### 6.2. Silver Table: Include Reference Data

In [0]:
# Re-read the bronze Delta table as a stream and expose it to SQL as
# rentals_silver_tempview so it can be joined with the dimension tables.
rentals_bronze_stream = spark.readStream.table("fact_rentals_bronze")
rentals_bronze_stream.createOrReplaceTempView("rentals_silver_tempview")

In [0]:
%sql
-- Inspect the streaming view built from the bronze table.
SELECT *
FROM rentals_silver_tempview

In [0]:
%sql
-- Column list and metadata of the streaming view built from the bronze table.
DESCRIBE TABLE EXTENDED rentals_silver_tempview

In [0]:
%sql
-- Enrich the streaming rentals with the reference dimensions. Because these are
-- inner joins, rentals whose inventory, film or customer is missing from the
-- dimension tables are dropped.
CREATE OR REPLACE TEMPORARY VIEW fact_rentals_silver_tempview AS (
  SELECT r.rental_id,
      r.rental_date,
      r.staff_id,
      r.customer_id,
      r.return_date,
      r.last_update,
      r.film_id,
      r.rental_rate,
      r.inventory_id
  FROM rentals_silver_tempview AS r
  INNER JOIN sakila_dlh.dim_inventory AS i
  ON r.inventory_id = i.inventory_id
  INNER JOIN sakila_dlh.dim_films AS f
  ON r.film_id = f.film_id
  -- The MySQL-sourced dim_customers keys customers by customer_key; the previous
  -- condition (r.customer_id = r.customer_id) was always true and joined every customer.
  INNER JOIN sakila_dlh.dim_customers AS c
  ON r.customer_id = c.customer_key
)

In [0]:
# Write the enriched rentals stream to the fact_rentals_silver Delta table.
silver_writer = (
    spark.table("fact_rentals_silver_tempview")
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", f"{rentals_output_silver}/_checkpoint")
)
silver_writer.table("fact_rentals_silver")
     

In [0]:
%sql
-- Inspect the silver fact table.
SELECT *
FROM fact_rentals_silver
     

In [0]:
%sql
-- Column list plus storage/metadata details of the silver fact table.
DESCRIBE TABLE EXTENDED sakila_dlh.fact_rentals_silver

##### 6.3. Gold Table: Perform Aggregations

In [0]:

%sql
-- Number of rentals per customer, most active customers first.
-- (The count is of rental_id, so it is labelled RentalCount rather than ProductCount.)
SELECT customer_id AS CustomerID
  , COUNT(rental_id) AS RentalCount
FROM sakila_dlh.fact_rentals_silver
GROUP BY customer_id
ORDER BY RentalCount DESC

In [0]:
%sql
-- NOTE(review): inactive leftover from the Northwind-based template. It queries
-- sakila_dlh.fact_orders_silver (customer_key / product_key / customer_last_name),
-- which this notebook never creates, so it would fail if uncommented as-is.
/*
SELECT pc.CustomerID
  , os.customer_last_name AS CustomerName
  , os.product_key AS ProductNumber
  , pc.ProductCount
FROM sakila_dlh.fact_orders_silver AS os
INNER JOIN (
  SELECT customer_key AS CustomerID
  , COUNT(product_key) AS ProductCount
  FROM sakila_dlh.fact_orders_silver
  GROUP BY customer_key
) AS pc
ON pc.CustomerID = os.customer_key
ORDER BY ProductCount DESC

*/

#### 9.0. Clean up the File System

In [0]:
# Stop any streaming queries still writing under the database directory, then
# remove the Delta tables, checkpoints and schema locations this notebook created.
# (The previous path, /FileStore/ds2002-lab06/, is not used anywhere in this notebook.)
for active_query in spark.streams.active:
    active_query.stop()

dbutils.fs.rm(database_dir, True)