## Here I will Use Structured Streaming to Create the fact_cannabis Table and Load it into MongoDB and SQL

In [0]:
import json
import os
from urllib.parse import quote_plus

import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
#import sqlalchemy
#import pymysql
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

#for writing the data to sql
#from sqlalchemy import text, create_engine


In [0]:
# Path definitions used by the streaming cells below.
# A CSV file was pre-loaded under `data_location` to kick off the readStream.

stream_dir = "dbfs:/FileStore/tables"
data_location = f"{stream_dir}/stream/data"
schemaLocation = f"{stream_dir}/stream/schemaLocation"

bronzeCP = f"{stream_dir}/bronzeCheckPoint"
silverCP = f"{stream_dir}/silverCheckPoint"
goldCP = f"{stream_dir}/goldCheckPoint"

# Reset state left over from a previous run so the streams start from scratch.
# The variables defined above are reused (instead of repeating the literal
# paths) so the cleanup can never drift away from the paths the streams use.
for checkpoint_dir in (bronzeCP, silverCP, goldCP):
    dbutils.fs.rm(checkpoint_dir, True)

# Remove the schema that Auto Loader inferred on the previous run.
dbutils.fs.rm(schemaLocation, True)

# Remove the database files.
# NOTE(review): the database is created as `ds2002_Midterm2` (underscore) while
# this path uses a hyphen; confirm this is where the database files actually live.
dbutils.fs.rm(f"{stream_dir}/ds2002-Midterm2", True)






Out[5]: False

## Now I will load all of the data from the SQL Database Created in the Jupyter Notebook of this Repo

In [0]:
%sql
-- Start from a clean slate: drop the database (CASCADE also drops its tables), then recreate it.
DROP DATABASE IF EXISTS `ds2002_Midterm2` CASCADE;

-- NOTE(review): the COMMENT text still refers to "Lab 06"; confirm whether it should describe this project instead.
CREATE DATABASE IF NOT EXISTS `ds2002_Midterm2`
COMMENT "DS-2002 Lab 06 Database"

In [0]:
%sql
-- Expose the MySQL `dim_date` table to Spark through a JDBC-backed temporary view.
-- NOTE(review): the password below looks like a redacted placeholder; prefer a secret store over inline credentials.
CREATE OR REPLACE TEMPORARY VIEW `dim_date_view`
  USING org.apache.spark.sql.jdbc
  OPTIONS (
    url "jdbc:mysql://wfl9zymysql.mysql.database.azure.com:3306/ds2002-midterm2",
    dbtable "dim_date",
    user "wloving77",
    password "PASSWORD"
  );

USE DATABASE `ds2002_Midterm2`;

-- Copy the rows behind the JDBC view into a Delta table named `dim_date`.
CREATE OR REPLACE TABLE `dim_date`
  USING DELTA
  OPTIONS (
  'delta.columnMapping.mode'='name',
  'delta.minReaderVersion'='2',
  'delta.minWriterVersion'='5'
  )
  AS SELECT * FROM `dim_date_view`;

  -- Preview a few rows to confirm the copy.
  SELECT * FROM dim_date
  LIMIT 5;

date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,week_of_year,month_name,month_of_year,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
20050420,2005-04-20,2005/04/20,04/20/2005,20/04/2005,4,Wednesday,20,110,Weekday,16,April,4,N,2,2005,2005-04,2005Q2,10,4,2005,2005-10,2005Q4
20050421,2005-04-21,2005/04/21,04/21/2005,21/04/2005,5,Thursday,21,111,Weekday,16,April,4,N,2,2005,2005-04,2005Q2,10,4,2005,2005-10,2005Q4
20050422,2005-04-22,2005/04/22,04/22/2005,22/04/2005,6,Friday,22,112,Weekday,16,April,4,N,2,2005,2005-04,2005Q2,10,4,2005,2005-10,2005Q4
20050423,2005-04-23,2005/04/23,04/23/2005,23/04/2005,7,Saturday,23,113,Weekend,16,April,4,N,2,2005,2005-04,2005Q2,10,4,2005,2005-10,2005Q4
20050424,2005-04-24,2005/04/24,04/24/2005,24/04/2005,1,Sunday,24,114,Weekend,16,April,4,N,2,2005,2005-04,2005Q2,10,4,2005,2005-10,2005Q4


In [0]:
%sql
-- Expose the MySQL `income` table to Spark through a JDBC-backed temporary view.
-- NOTE(review): the password below looks like a redacted placeholder; prefer a secret store over inline credentials.
CREATE OR REPLACE TEMPORARY VIEW `income_view`
  USING org.apache.spark.sql.jdbc
  OPTIONS (
    url "jdbc:mysql://wfl9zymysql.mysql.database.azure.com:3306/ds2002-midterm2",
    dbtable "income",
    user "wloving77",
    password "PASSWORD"
  );

USE DATABASE `ds2002_Midterm2`;

-- Copy the rows behind the JDBC view into a Delta table named `income`.
CREATE OR REPLACE TABLE `income`
  USING DELTA
  OPTIONS (
  'delta.columnMapping.mode'='name',
  'delta.minReaderVersion'='2',
  'delta.minWriterVersion'='5'
  )
  AS SELECT * FROM `income_view`;

  -- Preview a few rows to confirm the copy.
  SELECT * FROM income
  LIMIT 5;

REF_DATE,GEO,DGUID,Estimate,Industry,UOM,UOM_ID,SCALAR_FACTOR,SCALAR_ID,VECTOR,COORDINATE,VALUE,DECIMALS
1961,Canada,2016A000011124,Gross domestic product (GDP),Medical cannabis industry,Dollars,81,millions,6,v1001809219,1.1.1,0,0
1961,Canada,2016A000011124,Gross domestic product (GDP),Non-medical cannabis industry,Dollars,81,millions,6,v1001809224,1.1.2,94,0
1961,Canada,2016A000011124,Compensation of employees,Medical cannabis industry,Dollars,81,millions,6,v1001809220,1.2.1,0,0
1961,Canada,2016A000011124,Compensation of employees,Non-medical cannabis industry,Dollars,81,millions,6,v1001809225,1.2.2,0,0
1961,Canada,2016A000011124,Gross operating surplus,Medical cannabis industry,Dollars,81,millions,6,v1001809221,1.3.1,0,0


In [0]:
%sql
-- Expose the MySQL `licensed_industry_stats` table to Spark through a JDBC-backed temporary view.
-- NOTE(review): the password below looks like a redacted placeholder; prefer a secret store over inline credentials.
CREATE OR REPLACE TEMPORARY VIEW `licensed_industry_stats_view`
  USING org.apache.spark.sql.jdbc
  OPTIONS (
    url "jdbc:mysql://wfl9zymysql.mysql.database.azure.com:3306/ds2002-midterm2",
    dbtable "licensed_industry_stats",
    user "wloving77",
    password "PASSWORD"
  );

USE DATABASE `ds2002_Midterm2`;

-- Copy the rows behind the JDBC view into a Delta table named `licensed_industry_stats`.
CREATE OR REPLACE TABLE `licensed_industry_stats`
  USING DELTA
  OPTIONS (
  'delta.columnMapping.mode'='name',
  'delta.minReaderVersion'='2',
  'delta.minWriterVersion'='5'
  )
  AS SELECT * FROM `licensed_industry_stats_view`;

-- Preview a few rows to confirm the copy.
SELECT * FROM licensed_industry_stats
LIMIT 5;

REF_DATE,GEO,DGUID,Indicator,UOM,UOM_ID,SCALAR_FACTOR,SCALAR_ID,VECTOR,COORDINATE,VALUE,DECIMALS
2016,Canada,2016A000011124,Total revenue,Dollars,81,units,0,v1001849328,1.1,245732000.0,0
2016,Canada,2016A000011124,Cannabis revenue,Dollars,81,units,0,v1001849329,1.2,239363000.0,0
2016,Canada,2016A000011124,Other cannabis-related revenue,Dollars,81,units,0,v1001849330,1.3,3088000.0,0
2016,Canada,2016A000011124,Revenues not related to cannabis,Dollars,81,units,0,v1001849331,1.4,3281000.0,0
2016,Canada,2016A000011124,Total sales,Dollars,81,units,0,v1001849332,1.5,77309.0,0


In [0]:
%sql

-- Switch to the project database and list its tables together with the session's temporary views.
USE DATABASE`ds2002_Midterm2`;

SHOW TABLES;

database,tableName,isTemporary
ds2002_midterm2,dim_date,False
ds2002_midterm2,income,False
ds2002_midterm2,licensed_industry_stats,False
,dim_date_view,True
,income_view,True
,licensed_industry_stats_view,True


## Now We Can Begin Building the Structured Streaming Using Bronze, Silver, and Gold Tables:
- Bronze First:

In [0]:
# Bronze source: stream the CSV files found under `data_location` using the
# "cloudFiles" source, letting it infer column types and keep the inferred
# schema under `schemaLocation`.
# NOTE(review): no `header` option is set here; confirm the CSV column names
# used downstream (GEO, Cannabis_price, ...) are picked up as expected.
bronze_reader = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("multiLine", "true")
    .option("cloudFiles.schemaLocation", schemaLocation)
)
bronze_stream_df = bronze_reader.load(data_location)

# Register the stream under a name the following %sql cell can query.
bronze_stream_df.createOrReplaceTempView("consumer_and_producer_view")

In [0]:
%sql
/* Add Metadata for Traceability */
-- Wraps the bronze source view and appends two columns to every row:
--   receipt_time: current_timestamp() at evaluation time
--   source_file:  input_file_name(), the file the row was read from
CREATE OR REPLACE TEMPORARY VIEW consumer_and_producer_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM consumer_and_producer_view
);

In [0]:
# Bronze sink: append the metadata-enriched stream to the Delta table
# `fact_consumer_and_producer`, tracking progress under the bronze checkpoint.
bronze_writer = (
    spark.table("consumer_and_producer_tempview")
    .writeStream
    .format("delta")
    .option("checkpointLocation", f"{bronzeCP}/_checkpoint")
    .outputMode("append")
)

# Starting the query is the cell's last expression, so the StreamingQuery
# handle is still shown as the cell output.
bronze_writer.table("fact_consumer_and_producer")

Out[15]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f85c57feb20>

## Now Silver:

In [0]:
# Silver source: read the bronze Delta table back as a stream and register it
# under a name the following %sql cell can query.
silver_source_df = spark.readStream.table("fact_consumer_and_producer")
silver_source_df.createOrReplaceTempView("consumer_and_producer_silver_view")

- Now to Build The Actual Fact Table:

In [0]:
%sql

-- Builds the silver fact view by joining the streaming bronze rows (alias `cas`)
-- with the `income` and `licensed_industry_stats` tables.
-- Both joins match on GEO only, so every streamed row is paired with each
-- income row and each licensed_industry_stats row that shares its GEO.
-- Columns that exist in more than one source (UOM, UOM_ID, SCALAR_FACTOR) are
-- aliased with a CaS_ / income_ / LicensedIndustryStats_ prefix to keep them apart.
CREATE OR REPLACE TEMPORARY VIEW consumer_and_producer_silver_tempview AS (
  SELECT cas.GEO, 
    cas.Cannabis_price,
    cas.Units_of_measure,
    cas.UOM as CaS_UOM,
    cas.UOM_ID as CaS_UOM_ID, 
    cas.SCALAR_FACTOR as CaS_Scalar_Factor,
    income.Estimate,
    income.Industry as income_Industry,
    income.UOM as income_UOM,
    income.UOM_ID as income_UOM_ID,
    income.SCALAR_FACTOR as income_Scalar_Factor,
    lis.Indicator as LicensedIndustryStats_Indicator,
    lis.UOM as LicensedIndustryStats_UOM,
    lis.UOM_ID as LicensedIndustryStats_UOM_ID,
    lis.SCALAR_FACTOR as LicensedIndustryStats_Scalar_Factor
  FROM consumer_and_producer_silver_view as cas
  INNER JOIN income
  ON cas.GEO = income.GEO
  INNER JOIN licensed_industry_stats as lis
  ON cas.GEO = lis.GEO
)


In [0]:
# Silver sink: append the joined rows to the Delta table `fact_cannabis_silver`,
# tracking progress under the silver checkpoint.
silver_writer = (
    spark.table("consumer_and_producer_silver_tempview")
    .writeStream
    .format("delta")
    .option("checkpointLocation", f"{silverCP}/_checkpoint")
    .outputMode("append")
)

# Starting the query is the cell's last expression, so the StreamingQuery
# handle is still shown as the cell output.
silver_writer.table("fact_cannabis_silver")

Out[18]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f85c56d0610>

In [0]:
%sql
-- Preview the first rows the silver stream has written so far.
SELECT * FROM `fact_cannabis_silver`
LIMIT 5;

GEO,Cannabis_price,Units_of_measure,CaS_UOM,CaS_UOM_ID,CaS_Scalar_Factor,Estimate,income_Industry,income_UOM,income_UOM_ID,income_Scalar_Factor,LicensedIndustryStats_Indicator,LicensedIndustryStats_UOM,LicensedIndustryStats_UOM_ID,LicensedIndustryStats_Scalar_Factor
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Producer price per gram,Dollars,81,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Registered clients,Number,223,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Number of cannabis for medical use licences issued,Number,223,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Shipments to registered clients,Number,223,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Average amount of dried cannabis for medical use per client shipment,Grams per day,380,units


## Now to Build a Gold Table

In [0]:
%sql
-- Example one: silver fact rows whose GEO is "Canada".
SELECT * FROM fact_cannabis_silver
WHERE fact_cannabis_silver.GEO = "Canada"
LIMIT 5;

GEO,Cannabis_price,Units_of_measure,CaS_UOM,CaS_UOM_ID,CaS_Scalar_Factor,Estimate,income_Industry,income_UOM,income_UOM_ID,income_Scalar_Factor,LicensedIndustryStats_Indicator,LicensedIndustryStats_UOM,LicensedIndustryStats_UOM_ID,LicensedIndustryStats_Scalar_Factor
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Producer price per gram,Dollars,81,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Registered clients,Number,223,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Number of cannabis for medical use licences issued,Number,223,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Shipments to registered clients,Number,223,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Average amount of dried cannabis for medical use per client shipment,Grams per day,380,units


In [0]:
%sql
-- Example two: income figures reported in millions for the non-medical cannabis industry.

SELECT * FROM fact_cannabis_silver
WHERE income_Scalar_Factor = "millions"
AND income_industry="Non-medical cannabis industry"
LIMIT 5;

GEO,Cannabis_price,Units_of_measure,CaS_UOM,CaS_UOM_ID,CaS_Scalar_Factor,Estimate,income_Industry,income_UOM,income_UOM_ID,income_Scalar_Factor,LicensedIndustryStats_Indicator,LicensedIndustryStats_UOM,LicensedIndustryStats_UOM_ID,LicensedIndustryStats_Scalar_Factor
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Producer price per gram,Dollars,81,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Registered clients,Number,223,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Number of cannabis for medical use licences issued,Number,223,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Shipments to registered clients,Number,223,units
Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Average amount of dried cannabis for medical use per client shipment,Grams per day,380,units


## Getting the fact table back into SQL and Mongo:

In [0]:
# Batch (non-streaming) read of the silver fact table; it contains whatever the stream has written so far.
df = spark.read.table("fact_cannabis_silver")

In [0]:
# Convert to a pandas DataFrame for the export helpers below; toPandas() collects every row onto the driver.
df_pandas = df.toPandas()

In [0]:
# Preview the first rows of the pandas copy.
df_pandas.head()

Unnamed: 0,GEO,Cannabis_price,Units_of_measure,CaS_UOM,CaS_UOM_ID,CaS_Scalar_Factor,Estimate,income_Industry,income_UOM,income_UOM_ID,income_Scalar_Factor,LicensedIndustryStats_Indicator,LicensedIndustryStats_UOM,LicensedIndustryStats_UOM_ID,LicensedIndustryStats_Scalar_Factor
0,Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Producer price per gram,Dollars,81,units
1,Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Registered clients,Number,223,units
2,Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Number of cannabis for medical use licences is...,Number,223,units
3,Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Shipments to registered clients,Number,223,units
4,Canada,Consumer price,Price per gram,Dollars,81,units,Taxes less subsidies,Non-medical cannabis industry,Dollars,81,millions,Average amount of dried cannabis for medical ...,Grams per day,380,units


## I now have the table as a DataFrame and can reuse the methods from the Jupyter Notebook. First, I will create the connection string information

In [0]:
# SQL (MySQL) connection settings.
host_name = "wfl9zymysql.mysql.database.azure.com"  # the host name
user_id = "wloving77"  # the user id of the account
# The password comes from an environment variable set in a cell that was
# deleted afterwards, so it never appears in the notebook.
# NOTE(review): "PWD" is also the POSIX working-directory variable; consider a
# dedicated name (e.g. MYSQL_PASSWORD) so the two cannot be confused.
pwd = os.environ["PWD"]
db_name_sql = "ds2002-Midterm2"

# Percent-encode the credentials: characters such as "@", ":" or "/" in a raw
# password would otherwise be parsed as URL delimiters and corrupt the URL.
conn_str_sql = f"mysql+pymysql://{quote_plus(user_id)}:{quote_plus(pwd)}@{host_name}"



In [0]:
# MongoDB Atlas connection settings.
atlas_cluster_name = "dscluster"
atlas_user_name = "wloving77"
# The password comes from an environment variable; the cell that defined it was
# omitted for password security.
# NOTE(review): "PWD" is also the POSIX working-directory variable; consider a
# dedicated name (e.g. ATLAS_PASSWORD) so the two cannot be confused.
atlas_password = os.environ["PWD"]
atlas_special_key = "iynrpaa"

db_name_mongo = "ds2002-Midterm2"

# MongoDB URIs require the user name and password to be percent-encoded;
# a raw "@", ":" or "/" in either one would make the URI invalid.
conn_str_mongo = (
    f"mongodb+srv://{quote_plus(atlas_user_name)}:{quote_plus(atlas_password)}"
    f"@{atlas_cluster_name}.{atlas_special_key}.mongodb.net"
)

In [0]:
#Function for creating a table on a SQL server
def set_dataframe(conn_str, df, table_name, db_name, db_operation):
    '''Write a pandas DataFrame to a table on a SQL server.

    Args:
        conn_str: SQLAlchemy connection URL for the server.
        df: pandas DataFrame to write (its index is not written).
        table_name: name of the destination table.
        db_name: schema (database) that holds the table.
        db_operation: "insert" replaces the table with `df`;
            "update" appends the rows of `df` to the table.

    Raises:
        ValueError: if `db_operation` is neither "insert" nor "update".
    '''
    # Map each supported operation to the pandas `if_exists` policy it uses.
    if_exists_by_operation = {"insert": "replace", "update": "append"}
    if db_operation not in if_exists_by_operation:
        # Fail loudly instead of silently writing nothing on a misspelled operation.
        raise ValueError(
            f"Unsupported db_operation {db_operation!r}; expected 'insert' or 'update'."
        )

    # Imported here because the notebook-level sqlalchemy import is commented
    # out, which left `create_engine` undefined when this function was called.
    from sqlalchemy import create_engine

    sqlEngine = create_engine(conn_str, pool_recycle=360)
    try:
        # Both operations go through the engine; pandas opens and closes its
        # own connection for the write.
        df.to_sql(
            table_name,
            schema=db_name,
            con=sqlEngine,
            index=False,
            if_exists=if_exists_by_operation[db_operation],
        )
    finally:
        # Release pooled connections even when the write fails.
        sqlEngine.dispose()

In [0]:
#My function that inserts the rows of the provided dataframe into a mongodb collection
def set_mongo_dataframe(connection_str, df, db_name, collection):
    '''Insert every row of a pandas DataFrame into a MongoDB collection.

    Rows are added with `insert_many`; documents already in the collection are
    left untouched, so calling this twice with the same frame stores the rows twice.

    Args:
        connection_str: MongoDB connection URI.
        df: pandas DataFrame whose rows become documents (one per row).
        db_name: name of the target database.
        collection: name of the target collection.
    '''
    records = df.to_dict(orient="records")
    if not records:
        # insert_many rejects an empty document list, so there is nothing to do.
        return

    client = pymongo.MongoClient(connection_str)
    try:
        client[db_name][collection].insert_many(records)
    finally:
        # Close the client even when the insert fails.
        client.close()

### Now to send the fact table to both MongoDB and SQL:

In [0]:
# Export the fact table: "insert" replaces the `fact_cannabis` table on the SQL server,
# then the same rows are inserted into the `fact_cannabis` MongoDB collection.
set_dataframe(conn_str_sql, df_pandas, "fact_cannabis", db_name_sql, "insert")
set_mongo_dataframe(conn_str_mongo, df_pandas, db_name_mongo, "fact_cannabis")