##DS 3002: Data Science Systems
####Capstone Project
###### Matthew Beck

### Section Prerequisites
#### 1.0 Import Required Libraries

In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType
from sqlalchemy import create_engine
import numpy
import datetime
import pymysql

#### 2.0 Instantiate Global Variables

In [0]:
# Azure SQL Server Connection Information #####################
jdbc_hostname = "ds3002project.mysql.database.azure.com"
jdbc_port = 3306
src_database = "classicmodels"

connection_properties = {
  "user" : "matt93002",
  "password" : "Hookey.93002",
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0"
atlas_database_name = "classicmodels"
atlas_user_name = "matt93002"
atlas_password = "Hookey.93002"

# Data Files (JSON) Information ###############################
dst_database = "classicmodels_dw"

base_dir = "dbfs:/FileStore/ds3002-capstone"
database_dir = f"{base_dir}/{dst_database}"

data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{base_dir}/stream"

output_bronze = f"{database_dir}/fact_sales_orders/bronze"
output_silver = f"{database_dir}/fact_sales_orders/silver"
output_gold   = f"{database_dir}/fact_sales_orders/gold"

# Delete the Streaming Files ################################## 
dbutils.fs.rm(f"{database_dir}/fact_sales_orders", True)

# Delete the Database Files ###################################
dbutils.fs.rm(database_dir, True)

Out[174]: True

#### 3.0 Define Global Functions

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure SQL database server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Create a JDBC URL to the Azure SQL Database'''
    jdbcUrl = f"jdbc:mysql://{host_name}:{port}/{db_name}"
    
    '''Invoke the spark.read.jdbc() function to query the database, and fill a Pandas DataFrame.'''
    dframe = spark.read.jdbc(url=jdbcUrl, table=sql_query, properties=conn_props)
    
    return dframe


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.m9vzszr.mongodb.net/{db_name}?retryWrites=true&w=majority"
    
    client = pymongo.MongoClient(mongo_uri)

    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    if conditions and projection and sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection).sort(sort)))
    elif conditions and projection and not sort:
        dframe = pd.DataFrame(list(db[collection].find(conditions, projection)))
    else:
        dframe = pd.DataFrame(list(db[collection].find()))

    client.close()
    
    return dframe

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Create a client connection to MongoDB'''
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.m9vzszr.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)
    db = client[db_name]
    
    '''Read in a JSON file, and Use It to Create a New Collection'''
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(src_file_path, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)

    client.close()
    
    return result

### Section II: Populate Dimensions by Ingesting Reference (Cold-path) Data 
#### 1.0. Fetch Reference Data From an MySQL Database
##### 1.1. Create a New Databricks Metadata Database, and then Create a New Table that Sources its Data from a View in a MySQL database.

In [0]:
%sql
DROP DATABASE IF EXISTS classicmodels CASCADE;

In [0]:
%sql
CREATE DATABASE IF NOT EXISTS classicmodels
COMMENT "Capstone Project Database"
LOCATION "dbfs:/FileStore/ds3002-capstone/classicmodels"
WITH DBPROPERTIES (contains_pii = true, purpose = "DS-3002 Capstone Project");

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_custs
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds3002project.mysql.database.azure.com:3306/classicmodels", 
  dbtable "customers",
  user "matt93002",
  password "Hookey.93002"
)

In [0]:
%sql
USE DATABASE classicmodels;

CREATE TABLE IF NOT EXISTS classicmodels.dim_customers
COMMENT "Customers Dimension Table"
LOCATION "dbfs:/FileStore/ds3002-capstone/classicmodels/dim_customers"
AS SELECT * FROM view_custs

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM classicmodels.dim_customers LIMIT 5

customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370,21000.0
112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166,71800.0
114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia,1611,117300.0
119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",,Nantes,,44000,France,1370,118200.0
121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,,Stavern,,4110,Norway,1504,81700.0


In [0]:
%sql
DESCRIBE EXTENDED classicmodels.dim_customers;

col_name,data_type,comment
customerNumber,int,
customerName,string,
contactLastName,string,
contactFirstName,string,
phone,string,
addressLine1,string,
addressLine2,string,
city,string,
state,string,
postalCode,string,


##### 1.2. Create a New Table that Sources its Data from a Table in an MySQL database.

In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://ds3002project.mysql.database.azure.com:3306/classicmodels",
  dbtable "dim_date",
  user "matt93002",
  password "Hookey.93002"
)

In [0]:
%sql
USE DATABASE classicmodels;

CREATE TABLE IF NOT EXISTS classicmodels.dim_date
COMMENT "Date Dimension Table"
LOCATION "dbfs:/FileStore/ds3002-capstone/classicmodels/dim_date"
AS SELECT * FROM view_date

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM classicmodels.dim_date LIMIT 5

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,52,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000103,2000-01-03,2000/01/03,01/03/2000,03/01/2000,2,Monday,3,3,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000104,2000-01-04,2000/01/04,01/04/2000,04/01/2000,3,Tuesday,4,4,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
20000105,2000-01-05,2000/01/05,01/05/2000,05/01/2000,4,Wednesday,5,5,Weekday,1,January,1,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [0]:
%sql
DESCRIBE EXTENDED classicmodels.dim_date;

col_name,data_type,comment
date_key,int,
full_date,string,
date_name,string,
date_name_us,string,
date_name_eu,string,
day_of_week,int,
day_name_of_week,string,
day_of_month,int,
day_of_year,int,
weekday_weekend,string,


#### 2.0. Fetch Reference Data from a MongoDB Atlas Database
##### 2.1. View the Data Files on the Databricks File System

##### 2.2. Create a New MongoDB Database, and Load JSON Data Into a New MongoDB Collection
**NOTE:** The following cell **can** be run more than once because the **set_mongo_collection()** function **is** idempotent.

In [0]:
%scala
import com.mongodb.spark._

val df_employees = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "classicmodels").option("collection", "employees").load()
display(df_employees)

_id,email,employeeNumber,extension,firstName,jobTitle,lastName,officeCode,reportsTo
List(63699ad30227705ff11d5cf2),jfirrelli@classicmodelcars.com,1188,x2173,Julie,Sales Rep,Firrelli,2,1143.0
List(63699ad30227705ff11d5cf5),gvanauf@classicmodelcars.com,1323,x4102,George,Sales Rep,Vanauf,3,1143.0
List(63699ad30227705ff11d5cf3),spatterson@classicmodelcars.com,1216,x4334,Steve,Sales Rep,Patterson,2,1143.0
List(63699ad30227705ff11d5cf6),lbondur@classicmodelcars.com,1337,x6493,Loui,Sales Rep,Bondur,4,1102.0
List(63699ad30227705ff11d5cfa),bjones@classicmodelcars.com,1504,x102,Barry,Sales Rep,Jones,7,1102.0
List(63699ad30227705ff11d5cfd),tking@classicmodelcars.com,1619,x103,Tom,Sales Rep,King,6,1088.0
List(63699ad30227705ff11d5cff),ykato@classicmodelcars.com,1625,x102,Yoshimi,Sales Rep,Kato,5,1621.0
List(63699ad30227705ff11d5d00),mgerard@classicmodelcars.com,1702,x2312,Martin,Sales Rep,Gerard,4,1102.0
List(63699ad30227705ff11d5cef),abow@classicmodelcars.com,1143,x5428,Anthony,Sales Manager (NA),Bow,1,1056.0
List(63699ad30227705ff11d5cea),dmurphy@classicmodelcars.com,1002,x5800,Diane,President,Murphy,1,


In [0]:
%scala
df_employees.printSchema()

##### 2.4. Use the Spark DataFrame to Create a New Table in the Databricks (Adventure Works) Metadata Database

In [0]:
%scala
df_employees.write.format("delta").mode("overwrite").saveAsTable("classicmodels.dim_employees")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels.dim_employees

col_name,data_type,comment
_id,struct,
email,string,
employeeNumber,int,
extension,string,
firstName,string,
jobTitle,string,
lastName,string,
officeCode,string,
reportsTo,int,
,,


##### 2.5. Query the New Table in the Databricks Metadata Database

In [0]:
%sql
SELECT * FROM classicmodels.dim_employees LIMIT 5

_id,email,employeeNumber,extension,firstName,jobTitle,lastName,officeCode,reportsTo
List(63699ad30227705ff11d5cf2),jfirrelli@classicmodelcars.com,1188,x2173,Julie,Sales Rep,Firrelli,2,1143
List(63699ad30227705ff11d5cf5),gvanauf@classicmodelcars.com,1323,x4102,George,Sales Rep,Vanauf,3,1143
List(63699ad30227705ff11d5cf3),spatterson@classicmodelcars.com,1216,x4334,Steve,Sales Rep,Patterson,2,1143
List(63699ad30227705ff11d5cf6),lbondur@classicmodelcars.com,1337,x6493,Loui,Sales Rep,Bondur,4,1102
List(63699ad30227705ff11d5cfa),bjones@classicmodelcars.com,1504,x102,Barry,Sales Rep,Jones,7,1102


#### 3.0. Fetch Data from a File System
##### 3.1. Use PySpark to Read From a CSV File

In [0]:
orders_csv = f"{base_dir}/orders.csv"

df_orders = spark.read.format('csv').options(header='true', inferSchema='true').load(orders_csv)
display(df_orders)

orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber
10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128
10102,2003-01-10,2003-01-18,2003-01-14,Shipped,,181
10103,2003-01-29,2003-02-07,2003-02-02,Shipped,,121
10104,2003-01-31,2003-02-09,2003-02-01,Shipped,,141
10105,2003-02-11,2003-02-21,2003-02-12,Shipped,,145
10106,2003-02-17,2003-02-24,2003-02-21,Shipped,,278
10107,2003-02-24,2003-03-03,2003-02-26,Shipped,Difficult to negotiate with customer. We need more marketing materials,131
10108,2003-03-03,2003-03-12,2003-03-08,Shipped,,385
10109,2003-03-10,2003-03-19,2003-03-11,Shipped,Customer requested that FedEx Ground is used for this shipping,486


In [0]:
df_orders.printSchema()

root
 |-- orderNumber: integer (nullable = true)
 |-- orderDate: date (nullable = true)
 |-- requiredDate: date (nullable = true)
 |-- shippedDate: date (nullable = true)
 |-- status: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- customerNumber: integer (nullable = true)



In [0]:
df_orders.write.format("delta").mode("overwrite").saveAsTable("classicmodels.dim_orders")

In [0]:
%sql
DESCRIBE EXTENDED classicmodels.dim_orders;

col_name,data_type,comment
orderNumber,int,
orderDate,date,
requiredDate,date,
shippedDate,date,
status,string,
comments,string,
customerNumber,int,
,,
# Detailed Table Information,,
Catalog,spark_catalog,


In [0]:
%sql
SELECT * FROM classicmodels.dim_orders LIMIT 5;

orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber
10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128
10102,2003-01-10,2003-01-18,2003-01-14,Shipped,,181
10103,2003-01-29,2003-02-07,2003-02-02,Shipped,,121
10104,2003-01-31,2003-02-09,2003-02-01,Shipped,,141


##### Verify Dimension Tables

In [0]:
%sql
USE classicmodels;
SHOW TABLES

database,tableName,isTemporary
classicmodels,dim_customers,False
classicmodels,dim_date,False
classicmodels,dim_employees,False
classicmodels,dim_orders,False
,display_query_1,True
,display_query_10,True
,display_query_11,True
,display_query_12,True
,display_query_13,True
,display_query_2,True


### Section III: Integrate Reference Data with Real-Time Data
#### 6.0. Use AutoLoader to Process Streaming (Hot Path) Data 
##### 6.1. Bronze Table: Process 'Raw' JSON Data

In [0]:

(spark.readStream.format("cloudFiles")\
 .option("cloudFiles.format", "json")\
.option("cloudFiles.schemaHints", "ordersNumber INT")\
.option("cloudFiles.schemaHints", "orderDate DATE")\
.option("cloudFiles.schemaHints", "requiredDate DATE")\
.option("cloudFiles.schemaHints", "shippedDate DATE")\
.option("cloudFiles.schemaHints", "status STRING")\
.option("cloudFiles.schemaHints", "comments STRING")\
.option("cloudFiles.schemaHints", "customerNumber INT")\
.option("cloudFiles.schemaLocation", output_bronze)\
.option("cloudFiles.inferColumnTypes", "true")\
.load(stream_dir)\
.createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
SELECT * FROM orders_bronze_tempview

customerNumber,orderDate,orders_key,requiredDate,shippedDate,status,_rescued_data,receipt_time,source_file
151,2003-06-03,10127,2003-06-09,2003-06-06,Shipped,,2022-12-15T04:45:34.685+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
141,2003-06-06,10128,2003-06-12,2003-06-11,Shipped,,2022-12-15T04:45:34.685+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
324,2003-06-12,10129,2003-06-18,2003-06-14,Shipped,,2022-12-15T04:45:34.685+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
198,2003-06-16,10130,2003-06-24,2003-06-21,Shipped,,2022-12-15T04:45:34.685+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
447,2003-06-16,10131,2003-06-25,2003-06-21,Shipped,,2022-12-15T04:45:34.685+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
323,2003-06-25,10132,2003-07-01,2003-06-28,Shipped,,2022-12-15T04:45:34.685+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
141,2003-06-27,10133,2003-07-04,2003-07-03,Shipped,,2022-12-15T04:45:34.685+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
250,2003-07-01,10134,2003-07-10,2003-07-05,Shipped,,2022-12-15T04:45:34.685+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
124,2003-07-02,10135,2003-07-12,2003-07-03,Shipped,,2022-12-15T04:45:34.685+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
242,2003-07-04,10136,2003-07-14,2003-07-06,Shipped,,2022-12-15T04:45:34.685+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json


In [0]:
(spark.table("orders_bronze_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_bronze}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_bronze"))

Out[197]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fbfb2480a30>

##### 6.2. Silver Table: Include Reference Data

In [0]:
(spark.readStream
  .table("fact_orders_bronze")
  .createOrReplaceTempView("orders_silver_tempview"))

In [0]:
%sql
SELECT * FROM orders_silver_tempview

customerNumber,orderDate,orders_key,requiredDate,shippedDate,status,_rescued_data,receipt_time,source_file
151,2003-06-03,10127,2003-06-09,2003-06-06,Shipped,,2022-12-15T04:45:36.677+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
141,2003-06-06,10128,2003-06-12,2003-06-11,Shipped,,2022-12-15T04:45:36.677+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
324,2003-06-12,10129,2003-06-18,2003-06-14,Shipped,,2022-12-15T04:45:36.677+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
198,2003-06-16,10130,2003-06-24,2003-06-21,Shipped,,2022-12-15T04:45:36.677+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
447,2003-06-16,10131,2003-06-25,2003-06-21,Shipped,,2022-12-15T04:45:36.677+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
323,2003-06-25,10132,2003-07-01,2003-06-28,Shipped,,2022-12-15T04:45:36.677+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
141,2003-06-27,10133,2003-07-04,2003-07-03,Shipped,,2022-12-15T04:45:36.677+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
250,2003-07-01,10134,2003-07-10,2003-07-05,Shipped,,2022-12-15T04:45:36.677+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
124,2003-07-02,10135,2003-07-12,2003-07-03,Shipped,,2022-12-15T04:45:36.677+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json
242,2003-07-04,10136,2003-07-14,2003-07-06,Shipped,,2022-12-15T04:45:36.677+0000,dbfs:/FileStore/ds3002-capstone/stream/ordersstream.json


In [0]:
%sql
DESCRIBE EXTENDED orders_silver_tempview

col_name,data_type,comment
customerNumber,int,
orderDate,string,
orders_key,bigint,
requiredDate,string,
shippedDate,string,
status,string,
_rescued_data,string,
receipt_time,timestamp,
source_file,string,


In [0]:
%sql
CREATE OR REPLACE TEMPORARY VIEW fact_orders_silver_tempview AS (
  SELECT t.orders_key,
  od.month_name AS OrderMonth,
  od.day_name_of_week AS OrderDayName,
  od.day_of_month AS OrderDay,
  od.calendar_year AS OrderYear,
  rd.month_name AS RequiredMonth,
  rd.day_name_of_week AS RequiredDayName,
  rd.day_of_month AS RequiredDay,
  rd.calendar_year AS RequiredYear,
  sd.month_name AS ShippedMonth,
  sd.day_name_of_week AS ShippedDayName,
  sd.day_of_month AS ShippedDay,
  sd.calendar_year AS ShippedYear,
  t.status,
  c.customerName,
  c.contactLastName,
  c.contactFirstname,
  c.phone,
  c.addressLine1,
  c.city,
  c.state,
  c.postalCode,
  c.country,
  c.creditLimit,
  emp.lastName AS EmployeeLastName,
  emp.firstName AS EmployeeFirstName,
  emp.extension AS EmployeeExtension,
  emp.email AS EmployeeEmail,
  emp.officeCode AS EmployeeOffice,
  emp.reportsTo AS EmployeeBoss,
  emp.jobTitle As EmployeeJobTitle
  FROM orders_silver_tempview t
  INNER JOIN classicmodels.dim_customers c
  ON t.customerNumber = c.customerNumber
  INNER JOIN classicmodels.dim_employees emp
  ON c.salesRepEmployeeNumber = emp.employeeNumber
  INNER JOIN classicmodels.dim_date od
  ON CAST(t.orderDate AS DATE) = od.full_date
  INNER JOIN classicModels.dim_date rd
  ON CAST (t.requiredDate AS DATE) = rd.full_date
  INNER JOIN classicmodels.dim_date sd
  ON CAST (t.shippedDate AS DATE) = sd.full_date)
 

In [0]:
(spark.table("fact_orders_silver_tempview")
      .writeStream
      .format("delta")
      .option("checkpointLocation", f"{output_silver}/_checkpoint")
      .outputMode("append")
      .table("fact_orders_silver"))

Out[202]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fbf83639d90>

In [0]:
%sql
SELECT * FROM fact_orders_silver

orders_key,OrderMonth,OrderDayName,OrderDay,OrderYear,RequiredMonth,RequiredDayName,RequiredDay,RequiredYear,ShippedMonth,ShippedDayName,ShippedDay,ShippedYear,status,customerName,contactLastName,contactFirstname,phone,addressLine1,city,state,postalCode,country,creditLimit,EmployeeLastName,EmployeeFirstName,EmployeeExtension,EmployeeEmail,EmployeeOffice,EmployeeBoss,EmployeeJobTitle
10142,August,Friday,8,2003,August,Saturday,16,2003,August,Wednesday,13,2003,Shipped,Mini Gifts Distributors Ltd.,Nelson,Susan,4155551450,5677 Strong St.,San Rafael,CA,97562,USA,210500.0,Jennings,Leslie,x3291,ljennings@classicmodelcars.com,1,1143,Sales Rep
10135,July,Wednesday,2,2003,July,Saturday,12,2003,July,Thursday,3,2003,Shipped,Mini Gifts Distributors Ltd.,Nelson,Susan,4155551450,5677 Strong St.,San Rafael,CA,97562,USA,210500.0,Jennings,Leslie,x3291,ljennings@classicmodelcars.com,1,1143,Sales Rep
10133,June,Friday,27,2003,July,Friday,4,2003,July,Thursday,3,2003,Shipped,Euro+ Shopping Channel,Freyre,Diego,(91) 555 94 44,"C/ Moralzarzal, 86",Madrid,,28034,Spain,227600.0,Hernandez,Gerard,x2028,ghernande@classicmodelcars.com,4,1102,Sales Rep
10128,June,Friday,6,2003,June,Thursday,12,2003,June,Wednesday,11,2003,Shipped,Euro+ Shopping Channel,Freyre,Diego,(91) 555 94 44,"C/ Moralzarzal, 86",Madrid,,28034,Spain,227600.0,Hernandez,Gerard,x2028,ghernande@classicmodelcars.com,4,1102,Sales Rep
10150,September,Friday,19,2003,September,Saturday,27,2003,September,Sunday,21,2003,Shipped,"Dragon Souveniers, Ltd.",Natividad,Eric,+65 221 7555,Bronz Sok.,Singapore,,079903,Singapore,103800.0,Nishi,Mami,x101,mnishi@classicmodelcars.com,5,1056,Sales Rep
10127,June,Tuesday,3,2003,June,Monday,9,2003,June,Friday,6,2003,Shipped,Muscle Machine Inc,Young,Jeff,2125557413,4092 Furth Circle,NYC,NY,10022,USA,138500.0,Tseng,Foon Yue,x2248,ftseng@classicmodelcars.com,3,1143,Sales Rep
10140,July,Thursday,24,2003,August,Saturday,2,2003,July,Wednesday,30,2003,Shipped,Technics Stores Inc.,Hashimoto,Juri,6505556809,9408 Furth Circle,Burlingame,CA,94217,USA,84600.0,Jennings,Leslie,x3291,ljennings@classicmodelcars.com,1,1143,Sales Rep
10130,June,Monday,16,2003,June,Tuesday,24,2003,June,Saturday,21,2003,Shipped,Auto-Moto Classics Inc.,Taylor,Leslie,6175558428,16780 Pompton St.,Brickhaven,MA,58339,USA,23000.0,Patterson,Steve,x4334,spatterson@classicmodelcars.com,2,1143,Sales Rep
10145,August,Monday,25,2003,September,Tuesday,2,2003,August,Sunday,31,2003,Shipped,Toys4GrownUps.com,Young,Julie,6265557265,78934 Hillside Dr.,Pasadena,CA,90003,USA,90700.0,Thompson,Leslie,x4065,lthompson@classicmodelcars.com,1,1143,Sales Rep
10136,July,Friday,4,2003,July,Monday,14,2003,July,Sunday,6,2003,Shipped,Alpha Cognac,Roulet,Annette,61.77.6555,1 rue Alsace-Lorraine,Toulouse,,31000,France,61100.0,Hernandez,Gerard,x2028,ghernande@classicmodelcars.com,4,1102,Sales Rep


In [0]:
%sql
DESCRIBE EXTENDED classicmodels.fact_orders_silver

col_name,data_type,comment
orders_key,bigint,
OrderMonth,string,
OrderDayName,string,
OrderDay,int,
OrderYear,int,
RequiredMonth,string,
RequiredDayName,string,
RequiredDay,int,
RequiredYear,int,
ShippedMonth,string,


##### 6.4. Gold Table: Perform Aggregations

In [0]:
%sql
SELECT customerName
  , EmployeeLastName
  , OrderMonth
  , COUNT(orders_key) AS OrdersCount
FROM classicmodels.fact_orders_silver
GROUP BY customerName, EmployeeLastName, OrderMonth
ORDER BY OrdersCount DESC

customerName,EmployeeLastName,OrderMonth,OrdersCount
Euro+ Shopping Channel,Hernandez,June,2
Gift Ideas Corp.,Vanauf,September,1
Toys4GrownUps.com,Thompson,August,1
Gift Ideas Corp.,Vanauf,June,1
Signal Collectibles Ltd.,Jennings,September,1
Mini Gifts Distributors Ltd.,Jennings,August,1
Auto-Moto Classics Inc.,Patterson,June,1
Souveniers And Things Co.,Fixter,July,1
"Stylish Desk Decors, Co.",Bott,June,1
Muscle Machine Inc,Tseng,June,1


In [0]:
%sql
SELECT os.EmployeeOffice,
    COUNT(OrderSubCount) AS OrderCount
FROM classicmodels.fact_orders_silver as os
INNER JOIN (
  SELECT EmployeeLastName
  , COUNT(orders_key) AS OrderSubCount
  FROM classicmodels.fact_orders_silver
  GROUP BY EmployeeLastName
) AS oc
ON oc.EmployeeLastName = os.EmployeeLastName
GROUP BY EmployeeOffice
ORDER BY OrderCount DESC

EmployeeOffice,OrderCount
4,6
6,5
1,5
7,3
3,3
2,3
5,1
