In [0]:
import os
import json
import pymongo
import pyspark.pandas as pd  
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, BinaryType
from pyspark.sql.types import ByteType, ShortType, IntegerType, LongType, FloatType, DecimalType

In [0]:
# Azure MySQL Server Connection Information ###################
# Passwords are read from the environment instead of being hardcoded, so they are not
# committed to source control or exposed when the notebook is shared.
# NOTE(review): the passwords previously committed in this cell should be treated as
# compromised and rotated. In Databricks, dbutils.secrets.get(scope=..., key=...) is the
# usual alternative to environment variables.
jdbc_hostname = "ds-2002-mysql.mysql.database.azure.com"
jdbc_port = 3306
src_database = "ds_2002_hinton_final_project"

connection_properties = {
    "user": os.environ.get("MYSQL_USER", "mary"),
    "password": os.environ.get("MYSQL_PASSWORD"),
    "driver": "org.mariadb.jdbc.Driver",
}

# MongoDB Atlas Connection Information ########################
atlas_cluster_name = "Cluster0.ew2virq"
atlas_database_name = "ds_2002_hinton_final_project"
atlas_user_name = os.environ.get("ATLAS_USER_NAME", "cme4hj")
atlas_password = os.environ.get("ATLAS_PASSWORD")

# Destination (Databricks) database and its DBFS storage location.
dst_database = "ds_2002_hinton_final_project"
base_dir = "dbfs:/FileStore/final_project"
database_dir = f"{base_dir}/{dst_database}"


In [0]:
def get_sql_dataframe(host_name, port, db_name, conn_props, sql_query):
    '''Run a query against the Azure MySQL database and return a Spark DataFrame.

    Parameters
    ----------
    host_name : str
        Fully-qualified MySQL server host name.
    port : int
        MySQL server port.
    db_name : str
        Database (schema) to connect to.
    conn_props : dict
        JDBC connection properties (e.g. "user", "password", "driver"); every entry is
        forwarded to the JDBC reader as an option.
    sql_query : str
        Either a SELECT / WITH statement, or a table name / parenthesised subquery
        with alias such as "(SELECT ...) AS t".

    Returns
    -------
    pyspark.sql.DataFrame
        The rows returned by the query (evaluated lazily by Spark).
    '''
    # The server is Azure MySQL and the driver is MariaDB Connector/J, so the URL uses the
    # mysql scheme (a jdbc:sqlserver URL would target Microsoft SQL Server instead).
    jdbc_url = f"jdbc:mysql://{host_name}:{port}/{db_name}"

    reader = (spark.read.format("jdbc")
              .option("url", jdbc_url)
              .options(**conn_props))

    # The "dbtable" option only accepts a table name or an aliased subquery; a bare
    # SELECT statement has to go through the "query" option instead.
    tokens = sql_query.split(maxsplit=1)
    is_statement = bool(tokens) and tokens[0].lower() in ("select", "with")
    if is_statement:
        reader = reader.option("query", sql_query)
    else:
        reader = reader.option("dbtable", sql_query)

    return reader.load()

In [0]:
def get_mongo_dataframe(user_id, pwd, cluster_name, db_name, collection, conditions, projection, sort):
    '''Query a MongoDB Atlas collection and return the matching documents as a DataFrame.

    Parameters
    ----------
    user_id, pwd : str
        Atlas credentials. They are percent-escaped before being placed in the URI.
    cluster_name : str
        Atlas cluster prefix; the host becomes "<cluster_name>.mongodb.net".
    db_name : str
        Database to query.
    collection : str
        Collection to query.
    conditions : dict or None
        Filter document passed to find(). A falsy value matches all documents.
    projection : dict or None
        Projection passed to find(). A falsy value returns whole documents.
    sort : list or None
        Sort specification passed to cursor.sort(). A falsy value means no sort.

    Returns
    -------
    DataFrame
        Built with `pd.DataFrame` (pyspark.pandas in this notebook) from the fetched documents.
    '''
    from urllib.parse import quote_plus

    # pymongo requires the user name and password in the URI to be percent-escaped.
    mongo_uri = (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
                 f"@{cluster_name}.mongodb.net/{db_name}")

    client = pymongo.MongoClient(mongo_uri)
    try:
        db = client[db_name]
        # Apply each argument independently. The previous if/elif chain silently ignored
        # a filter given without a projection, and a sort unless both were given.
        cursor = db[collection].find(conditions or {}, projection or None)
        if sort:
            cursor = cursor.sort(sort)
        # Materialize the documents before the client is closed.
        documents = list(cursor)
    finally:
        client.close()

    return pd.DataFrame(documents)

In [0]:
def set_mongo_collection(user_id, pwd, cluster_name, db_name, src_file_path, json_files):
    '''Drop and recreate MongoDB collections from local JSON files.

    Parameters
    ----------
    user_id, pwd : str
        Atlas credentials. They are percent-escaped before being placed in the URI.
    cluster_name : str
        Atlas cluster prefix; the host becomes "<cluster_name>.mongodb.net".
    db_name : str
        Target database.
    src_file_path : str
        Local directory that contains the JSON files.
    json_files : dict
        Maps collection name -> JSON file name (relative to src_file_path).
        Each file must hold a JSON array of documents or a single JSON object.

    Returns
    -------
    pymongo.results.InsertManyResult or None
        The result of the most recent insert. None if nothing was inserted,
        e.g. because json_files is empty.
    '''
    from urllib.parse import quote_plus

    mongo_uri = (f"mongodb+srv://{quote_plus(user_id)}:{quote_plus(pwd)}"
                 f"@{cluster_name}.mongodb.net/{db_name}")
    client = pymongo.MongoClient(mongo_uri)
    result = None  # previously unbound (UnboundLocalError) when json_files was empty

    try:
        db = client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)

            json_path = os.path.join(src_file_path, file_name)
            with open(json_path, 'r', encoding='utf-8') as json_fh:
                documents = json.load(json_fh)

            # A file containing a single JSON object is loaded as one document.
            if isinstance(documents, dict):
                documents = [documents]
            # insert_many rejects an empty list, so an empty file just leaves the
            # collection dropped.
            if documents:
                result = db[collection_name].insert_many(documents)
    finally:
        client.close()

    return result

In [0]:
# Load the batch retail CSV, using its header row for column names and letting Spark infer types.
# NOTE(review): `batch_dir` is not defined in any earlier cell of this notebook, so this
# raises NameError on a fresh Run All. Define it in the configuration cell.
file_out_csv = f"{batch_dir}/file_out.csv"

df_retail = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .format("csv")
    .load(file_out_csv)
)
display(df_retail)

In [0]:
# Print the column names and types Spark inferred (inferSchema='true') for the CSV load above.
df_retail.printSchema()

In [0]:
%scala
import com.mongodb.spark._

// Read the "retail" collection from MongoDB Atlas through the Mongo Spark connector.
// Scala cells cannot see Python variables, so the connection details are rebuilt here.
// Credentials come from the environment instead of being hardcoded.
// NOTE(review): the ATLAS_USER_NAME / ATLAS_PASSWORD variable names are assumptions; confirm them,
// or switch to dbutils.secrets.get(scope, key).
val atlasUser = java.net.URLEncoder.encode(sys.env("ATLAS_USER_NAME"), "UTF-8")
val atlasPassword = java.net.URLEncoder.encode(sys.env("ATLAS_PASSWORD"), "UTF-8")
val atlasCluster = "Cluster0.ew2virq"
val atlasDatabase = "ds_2002_hinton_final_project"

val mongo_uri = s"mongodb+srv://${atlasUser}:${atlasPassword}@${atlasCluster}.mongodb.net/${atlasDatabase}"

// "customer_key" was previously selected twice. Duplicate column names make the later
// Delta write fail.
// NOTE(review): the second entry may have been meant as another key (e.g. fact_order_key); confirm.
val df_retail = spark.read.format("com.mongodb.spark.sql.DefaultSource")
  .option("database", atlasDatabase)
  .option("collection", "retail")
  .option("uri", mongo_uri)
  .load()
  .select("customer_key", "date_key", "invoice_key", "item_key")

display(df_retail)

In [0]:
%scala
// Persist the Scala df_retail DataFrame as a Delta table, overwriting any existing contents.
// NOTE(review): the table is named dim_customer but df_retail holds retail key columns;
// confirm the intended target table.
df_retail.write
  .mode("overwrite")
  .format("delta")
  .saveAsTable("ds_2002_hinton_final_project.dim_customer")

In [0]:
%sql
-- Inspect the Delta table written by the saveAsTable cell: its schema plus detailed table metadata.
-- The name previously repeated ".dim_customer", so it was parsed as a three-part
-- catalog.schema.table name that does not exist.
DESCRIBE EXTENDED ds_2002_hinton_final_project.dim_customer

In [0]:
# Incrementally ingest the streaming retail-transaction JSON files with Auto Loader and
# register them as a temp view.
# All schema hints must go in ONE comma-separated string. Calling .option() again with the
# same key overwrites the earlier value, so previously only "item_key BIGINT" took effect.
retail_schema_hints = ", ".join(
    f"{column} BIGINT"
    for column in ("fact_order_key", "date_key", "invoice_key", "customer_key", "item_key")
)

(spark.readStream
 .format("cloudFiles")
 .option("cloudFiles.format", "json")
 .option("cloudFiles.schemaHints", retail_schema_hints)
 # NOTE(review): this was misspelled `reatail_trans_output`. Neither that name nor
 # retail_trans_stream_dir is defined in an earlier cell; define both in the config cell.
 .option("cloudFiles.schemaLocation", retail_trans_output)
 .option("cloudFiles.inferColumnTypes", "true")
 .option("multiLine", "true")
 .load(retail_trans_stream_dir)
 .createOrReplaceTempView("retail_transactions_raw_tempview"))