In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  # This uses Koalas that is included in PySpark version 3.2 or newer.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

In [0]:
# Azure Database for MySQL Connection Information ##############
# The host name and port 3306 below identify a MySQL endpoint (not SQL Server).
jdbc_hostname = "mysql-czhong.mysql.database.azure.com"
jdbc_port = 3306
src_database = "final_proj"

# NOTE(review): credentials are hard-coded in this notebook. Consider moving them to a
# Databricks secret scope (dbutils.secrets.get) before sharing or committing the notebook.
connection_properties = {
  "user" : "dsadmin",
  "password" : "CatoZ0426",
  # The driver has to accept the "jdbc:mysql://" URL that get_sql_dataframe() builds.
  # The SQL Server driver configured previously rejects that URL; the MariaDB driver
  # (the one Databricks documents for MySQL connections) accepts it.
  "driver" : "org.mariadb.jdbc.Driver"
}

# MongoDB Atlas Connection Information ########################
# NOTE(review): the Atlas password is also hard-coded; same recommendation as above.
atlas_cluster_name = "DS2002"
atlas_database_name = "hr_db"
atlas_user_name = "root"
atlas_password = "gthgthgth"

# Data Files (JSON) Information ###############################
dst_database = "hr_database"

# Root folder of the project on DBFS, and the folder backing the destination database.
base_dir = "dbfs:/FileStore/ds2002finalProject"
database_dir = f"{base_dir}/{dst_database}"

# Source files: batch files are loaded once, stream files are picked up incrementally.
data_dir = f"{base_dir}/source_data"
batch_dir = f"{data_dir}/batch"
stream_dir = f"{data_dir}/stream"

# Output locations for the bronze / silver / gold stages of the fact table.
output_bronze = f"{database_dir}/factTable/bronze"
output_silver = f"{database_dir}/factTable/silver"
output_gold   = f"{database_dir}/factTable/gold"

# Delete the Streaming Files ##################################
# Recursive delete (second argument True) of the fact-table output and checkpoints.
dbutils.fs.rm(f"{database_dir}/factTable", True)

# Delete the Database Files ###################################
# Recursive delete of the whole database folder so every run starts from a clean slate.
dbutils.fs.rm(database_dir, True)

In [0]:
# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the Azure MySQL database server.
# ######################################################################################################################
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    """Read a table (or sub-query) from a MySQL server over JDBC.

    Args:
        host_name: Host name of the database server.
        port: TCP port the server listens on.
        db_name: Name of the source database; becomes the path part of the JDBC URL.
        conn_props: Dict of JDBC connection properties, passed through unchanged as
            ``properties`` to ``spark.read.jdbc``.
        sql_query: Value passed as the ``table`` argument of ``spark.read.jdbc``.

    Returns:
        Whatever ``spark.read.jdbc`` returns for the given URL, table and properties
        (a Spark DataFrame, not a Pandas one).
    """
    # Build the JDBC URL once; the scheme is MySQL, matching the server in the config cell.
    jdbc_url = f"jdbc:mysql://{host_name}:{port}/{db_name}"

    # `spark` is the session object provided by the notebook runtime.
    return spark.read.jdbc(url=jdbc_url, table=sql_query, properties=conn_props)


# ######################################################################################################################
# Use this Function to Fetch a DataFrame from the MongoDB Atlas database server Using PyMongo.
# ######################################################################################################################
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    """Query a MongoDB Atlas collection and return the documents as a DataFrame.

    Each of ``conditions``, ``projection`` and ``sort`` is applied whenever it is
    provided (truthy), independently of the other two. Previously a filter given
    without a projection, or a sort given without a filter, was silently ignored.

    Args:
        user_id: Atlas user name.
        pwd: Atlas password.
        cluster_name: Atlas cluster name; used as the first label of the host name.
        db_name: Database to query.
        collection: Name of the collection to read.
        conditions: Query filter document, or a falsy value for "no filter".
        projection: Projection document, or a falsy value for "all fields".
        sort: Sort specification handed to ``Cursor.sort``, or a falsy value for "unsorted".

    Returns:
        ``pd.DataFrame`` built from the list of matching documents (``pd`` is
        ``pyspark.pandas`` in this notebook).
    """
    # Create a client connection to MongoDB.
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.20qvgzt.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)

    try:
        # Query MongoDB and materialise the cursor while the connection is still open.
        cursor = client[db_name][collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        documents = list(cursor)
    finally:
        # Always release the connection, even when the query raises.
        client.close()

    return pd.DataFrame(documents)

# ######################################################################################################################
# Use this Function to Create New Collections by Uploading JSON file(s) to the MongoDB Atlas server.
# ######################################################################################################################
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    """(Re)create MongoDB collections from JSON files.

    For every ``collection name -> file name`` pair in ``json_files`` the collection is
    dropped, the file is parsed with ``json.load`` and its content is inserted with
    ``insert_many``.

    Args:
        user_id: Atlas user name.
        pwd: Atlas password.
        cluster_name: Atlas cluster name; used as the first label of the host name.
        db_name: Database in which the collections are created.
        src_file_path: Directory containing the JSON files; opened with the built-in
            ``open``, so it must be a path the Python process can read directly.
        json_files: Mapping of collection name to JSON file name.

    Returns:
        The result of the last ``insert_many`` call, or ``None`` when ``json_files``
        is empty (previously an empty mapping raised ``UnboundLocalError``).
    """
    # Create a client connection to MongoDB.
    mongo_uri = f"mongodb+srv://{user_id}:{pwd}@{cluster_name}.20qvgzt.mongodb.net/{db_name}?retryWrites=true&w=majority"
    client = pymongo.MongoClient(mongo_uri)

    result = None
    try:
        db = client[db_name]

        # Read in each JSON file, and use it to create a new collection.
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)
            result = db[collection_name].insert_many(documents)
    finally:
        # Release the connection even if a file is missing or an insert fails.
        client.close()

    return result

In [0]:
%sql
-- Start from a clean slate: drop the destination database if an earlier run left it behind.
-- CASCADE is required so the drop also succeeds when the database still contains tables.
DROP DATABASE IF EXISTS hr_database CASCADE;

In [0]:
%sql
-- (Re)create the destination database; its files live under the project folder on DBFS.
CREATE DATABASE IF NOT EXISTS hr_database
  COMMENT "Final Project Database"
  LOCATION "dbfs:/FileStore/ds2002finalProject/hr_database"
  WITH DBPROPERTIES (
    contains_pii = true,
    purpose = "DS-3002 Final Project"
  );

In [0]:
%sql
-- Expose the source `employees` table through a JDBC-backed temporary view.
-- The server is a MySQL endpoint (host *.mysql.database.azure.com, port 3306), so the URL
-- must use the jdbc:mysql scheme with the database as the path; the former jdbc:sqlserver
-- URL cannot connect to it.
-- NOTE(review): the password is stored in clear text here; consider a secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_employees
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://mysql-czhong.mysql.database.azure.com:3306/final_proj",
  dbtable "employees",
  user "dsadmin",
  password "CatoZ0426"
)

In [0]:
%sql
USE DATABASE hr_database;

-- Materialise the JDBC view as a table inside hr_database.
-- The table comment previously read "Products Dimension Table", a leftover from another
-- notebook; this table holds employees.
CREATE TABLE IF NOT EXISTS hr_database.employees
COMMENT "Employees Dimension Table"
LOCATION "dbfs:/FileStore/ds2002finalProject/hr_database/employees"
AS SELECT * FROM view_employees

In [0]:
%sql
-- Expose the source `dim_date` table through a JDBC-backed temporary view.
-- The server is a MySQL endpoint (host *.mysql.database.azure.com, port 3306), so the URL
-- must use the jdbc:mysql scheme with the database as the path; the former jdbc:sqlserver
-- URL cannot connect to it.
-- NOTE(review): the password is stored in clear text here; consider a secret scope.
CREATE OR REPLACE TEMPORARY VIEW view_date
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:mysql://mysql-czhong.mysql.database.azure.com:3306/final_proj",
  dbtable "dim_date",
  user "dsadmin",
  password "CatoZ0426"
)

In [0]:
# Upload the batch JSON file(s) to MongoDB Atlas, one collection per entry in json_files.
# The directory uses the /dbfs/... form (not dbfs:/...) because set_mongo_collection
# reads the files with Python's built-in open().
source_dir = '/dbfs/FileStore/ds2002finalProject/source_data/batch'
json_files = dict(employees='employees.json')

set_mongo_collection(
    user_id=atlas_user_name,
    pwd=atlas_password,
    cluster_name=atlas_cluster_name,
    db_name=atlas_database_name,
    src_file_path=source_dir,
    json_files=json_files,
)

In [0]:
%scala
// Overwrites the Delta table adventure_works.dim_customer with the contents of df_customer.
// NOTE(review): neither df_customer nor the adventure_works database is defined anywhere in
// the visible part of this notebook, so this cell looks like a leftover from a different
// notebook and would fail on a fresh run — TODO confirm and remove or replace.
df_customer.write.format("delta").mode("overwrite").saveAsTable("adventure_works.dim_customer")

In [0]:
# Load the dependents batch file; the first row is the header and column types are inferred.
dependents_csv = f"{batch_dir}/dependents.csv"

df_dependents = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(dependents_csv)
)
display(df_dependents)

In [0]:
df_dependents.write.format("delta").mode("overwrite").saveAsTable("hr_db.dependents")

In [0]:
%sql
USE hr_db;
SHOW TABLES

In [0]:
# Incrementally read the streaming JSON files with Auto Loader and expose them to SQL.
#
# Column types are supplied through ONE "cloudFiles.schemaHints" option holding a
# comma-separated list. Reader options are a key/value map, so repeating .option() with the
# same key keeps only the last value; the previous eighteen "cloudFiles.schema" calls
# therefore collapsed into a single entry ("salary INT").
# NOTE(review): phone_number is hinted as FLOAT exactly as before; if the files hold
# formatted phone numbers, STRING is probably what is wanted — verify against the data.
(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints",
         "location_id INT, country_id INT, department_id INT, employee_id INT, "
         "job_id INT, manager_id INT, dependent_id INT, "
         "street_address STRING, postal_code STRING, city STRING, state_province STRING, "
         "department_name STRING, first_name STRING, last_name STRING, email STRING, "
         "phone_number FLOAT, hire_date TIMESTAMP, salary INT")
 .option("cloudFiles.schemaLocation", output_bronze)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(stream_dir)
 .createOrReplaceTempView("orders_raw_tempview"))

In [0]:
%sql
/* Add Metadata for Traceability */
-- The %sql magic is required: without it this cell is parsed as Python and fails.
-- receipt_time records when the row was ingested; source_file records the file it came from.
CREATE OR REPLACE TEMPORARY VIEW orders_bronze_tempview AS (
  SELECT *, current_timestamp() receipt_time, input_file_name() source_file
  FROM orders_raw_tempview
)

In [0]:
%sql
-- Preview the bronze view (the %sql magic is required for a SQL cell in this notebook).
SELECT * FROM orders_bronze_tempview

In [0]:
# Append the bronze view (raw rows plus receipt_time / source_file) to the Delta table
# factTable_bronze; streaming progress is tracked in a checkpoint under the bronze folder.
(
    spark.table("orders_bronze_tempview")
    .writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", f"{output_bronze}/_checkpoint")
    .table("factTable_bronze")
)

In [0]:
# Re-open the bronze Delta table as a stream and make it queryable from SQL
# under the name orders_silver_tempview.
spark.readStream.table("factTable_bronze").createOrReplaceTempView("orders_silver_tempview")

In [0]:
%sql
-- Silver view: enrich every streamed fact row with job attributes and dependent details.
-- Fixes: the cell needs the %sql magic to be run as SQL, and the parenthesis opened after
-- AS was never closed.
-- NOTE(review): final_proj.jobs and final_proj.dependents are not created in the visible
-- part of this notebook (the dependents CSV is saved elsewhere) — confirm that these
-- tables exist in the metastore or point the joins at the right database.
CREATE OR REPLACE TEMPORARY VIEW factTable_silver_tempview AS (
  SELECT
      t.fact_key
    , t.location_id
    , t.country_id
    , t.department_id
    , t.employee_id
    , t.job_id
    , t.manager_id
    , t.dependent_id
    , t.street_address
    , t.postal_code
    , t.city
    , t.state_province
    , t.department_name
    , t.first_name
    , t.last_name
    , t.email
    , t.phone_number
    , t.hire_date
    , t.salary
    , j.job_title
    , j.max_salary
    , j.min_salary
    , d.first_name AS dependent_first_name
    , d.last_name AS dependent_last_name
    , d.relationship
  FROM orders_silver_tempview t
  INNER JOIN final_proj.jobs j
    ON t.job_id = j.job_id
  INNER JOIN final_proj.dependents d
    ON t.dependent_id = d.dependent_id
)

In [0]:
# Append the enriched silver view to the Delta table factTable_silver; streaming progress
# is tracked in a checkpoint under the silver folder.
(
    spark.table("factTable_silver_tempview")
    .writeStream
    .outputMode("append")
    .format("delta")
    .option("checkpointLocation", f"{output_silver}/_checkpoint")
    .table("factTable_silver")
)

In [0]:
%sql
-- Gold query: highest job salary ceiling per employee, ordered by phone number (descending).
-- Fixes:
--   * every non-aggregated column in SELECT / ORDER BY must appear in GROUP BY; the query
--     previously grouped by hire_date only, which Spark rejects. hire_date is kept in the
--     grouping key and the remaining columns are added to it.
--   * the silver view defined in this notebook produces country_id but no country_name,
--     so country_id is used for the `country` output column.
-- NOTE(review): the silver stream writes to the unqualified table factTable_silver; confirm
-- that final_proj is the database that table actually lands in.
SELECT
    first_name
  , last_name
  , country_id AS country
  , MAX(max_salary) AS max_salary
FROM final_proj.factTable_silver
GROUP BY hire_date, phone_number, first_name, last_name, country_id
ORDER BY phone_number DESC