In [49]:
import datetime
import json
import os
from urllib.parse import quote, quote_plus

import certifi
import numpy
import pandas as pd
import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [50]:
# Report the installed SQLAlchemy and PyMongo versions so the environment used
# for this run is recorded alongside the notebook output.
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.39
Running PyMongo Version: 4.15.3


In [51]:
# Connection settings are read from environment variables so that passwords are
# never stored in (or committed with) the notebook. Non-secret settings fall
# back to the values this notebook was developed against; the passwords have no
# default and must be supplied through MYSQL_PASSWORD / MONGODB_PASSWORD before
# the kernel is started.
mysql_args = {
    "uid" : os.environ.get("MYSQL_USER", "root"),
    "pwd" : os.environ.get("MYSQL_PASSWORD", ""),
    "hostname" : os.environ.get("MYSQL_HOST", "localhost"),
    "dbname" : os.environ.get("MYSQL_DATABASE", "adventureworks_dw")
}

mongodb_args = {
    "user_name" : os.environ.get("MONGODB_USER", "kieranylewis_db_user"),
    "password" : os.environ.get("MONGODB_PASSWORD", ""),
    "cluster_name" : os.environ.get("MONGODB_CLUSTER_NAME", "Cluster0"),
    "cluster_subnet" : os.environ.get("MONGODB_CLUSTER_SUBNET", "zsltihk"),
    "cluster_location" : os.environ.get("MONGODB_CLUSTER_LOCATION", "atlas"), # "atlas" or "local"
    "db_name" : os.environ.get("MONGODB_DATABASE", "northwind_purchasing")
}

In [52]:
def get_sql_dataframe(sql_query, **args):
    '''Run a SQL query against MySQL and return the result as a Pandas DataFrame.

    Parameters:
        sql_query: the SQL text to execute.
        **args: connection settings; the keys 'uid', 'pwd', 'hostname' and
            'dbname' are required.

    Returns:
        A DataFrame filled by pd.read_sql() with the rows produced by the query.
    '''
    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # inside a user name or password cannot corrupt the connection URL.
    uid = quote(str(args['uid']), safe='')
    pwd = quote(str(args['pwd']), safe='')
    conn_str = f"mysql+pymysql://{uid}:{pwd}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager releases the connection even when the query fails.
        with sqlEngine.connect() as connection:
            dframe = pd.read_sql(text(sql_query), connection)
    finally:
        # A new engine is created on every call, so dispose of its pool here;
        # otherwise each call would leave pooled connections behind.
        sqlEngine.dispose()

    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write a DataFrame to a MySQL table.

    Parameters:
        df: the DataFrame to write.
        table_name: name of the target table.
        pk_column: column made the primary key after an "insert".
        db_operation: "insert" replaces the table with the DataFrame contents
            and adds the primary key; "update" appends the rows to the table.
        **args: connection settings; the keys 'uid', 'pwd', 'hostname' and
            'dbname' are required.

    Raises:
        ValueError: if db_operation is neither "insert" nor "update".

    Note: table_name and pk_column are interpolated into the ALTER TABLE
    statement as identifiers, so they must come from trusted code.
    '''
    # Fail loudly instead of connecting and then silently writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"Unsupported db_operation {db_operation!r}; expected 'insert' or 'update'."
        )

    # Percent-encode the credentials so special characters cannot corrupt the URL.
    uid = quote(str(args['uid']), safe='')
    pwd = quote(str(args['pwd']), safe='')
    conn_str = f"mysql+pymysql://{uid}:{pwd}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # engine.begin() commits when the block succeeds, rolls back on error,
        # and always releases the connection.
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # The engine is created per call; dispose of its pool so connections do not leak.
        sqlEngine.dispose()

def get_mongo_client(**args):
    '''Create a MongoClient for either an Atlas cluster or a local server.

    Parameters:
        **args: connection settings. 'cluster_location' must be 'atlas' or
            'local'. For 'atlas' the keys 'user_name', 'password',
            'cluster_name' and 'cluster_subnet' are also required.

    Returns:
        A pymongo.MongoClient instance.

    Raises:
        ValueError: if cluster_location is missing or is not 'atlas'/'local'.
    '''
    location = args.get("cluster_location")
    if location not in ('atlas', 'local'):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # User name and password must be percent-escaped before they are placed in
    # a MongoDB URI; otherwise characters such as '@' or ':' break URI parsing.
    user_name = quote_plus(str(args['user_name']))
    password = quote_plus(str(args['password']))
    connect_str = f"mongodb+srv://{user_name}:{password}@"
    connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query a MongoDB collection and return the matching documents as a DataFrame.

    Parameters:
        mongo_client: an open client; it is closed before this function
            returns, so callers must create a new client for each call.
        db_name: name of the database to read from.
        collection: name of the collection to query.
        query: filter passed to the collection's find().

    Returns:
        A DataFrame with one row per matching document and the '_id' column
        removed. The DataFrame is empty when no documents match.
    '''
    try:
        db = mongo_client[db_name]
        dframe = pd.DataFrame(list(db[collection].find(query)))
    finally:
        # Close the client even if the query fails.
        mongo_client.close()

    # errors='ignore' keeps an empty result (which has no '_id' column) from
    # raising a KeyError.
    return dframe.drop(columns=['_id'], errors='ignore')


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''Replace MongoDB collections with the contents of JSON files.

    Parameters:
        mongo_client: an open client; it is closed before this function returns.
        db_name: name of the database that holds the collections.
        data_directory: directory that contains the JSON files.
        json_files: dict mapping each collection name to the file (inside
            data_directory) whose documents should populate it.

    Each collection is dropped and then refilled from its file. A file is read
    and parsed before its collection is dropped, so an unreadable or malformed
    file leaves the existing collection untouched.
    '''
    try:
        db = mongo_client[db_name]

        for collection_name, file_name in json_files.items():
            json_path = os.path.join(data_directory, file_name)
            with open(json_path, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)

            # A file that holds a single JSON object is loaded as one document.
            if isinstance(documents, dict):
                documents = [documents]

            db.drop_collection(collection_name)

            # insert_many() rejects an empty list, so an empty file simply
            # leaves the collection empty.
            if documents:
                db[collection_name].insert_many(documents)
    finally:
        # Close the client even if loading one of the files fails.
        mongo_client.close()

In [53]:
# Directory (under the current working directory) that holds the JSON source files.
data_dir = os.path.join(os.getcwd(), "midterm data")

# Maps each target MongoDB collection name to the file that supplies its documents.
json_files = {
    "products_meta": "products.txt",
    "customers_meta": "customers.txt"
}

# Drop and reload each collection; set_mongo_collections() closes the client when it is done.
client = get_mongo_client(**mongodb_args)
set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)


In [54]:
# get_mongo_dataframe() closes the client it is given, so a new client is
# created before each collection is read.
client = get_mongo_client(**mongodb_args)

df_products = get_mongo_dataframe(client, mongodb_args["db_name"], "products_meta", {})
client = get_mongo_client(**mongodb_args)
df_customers = get_mongo_dataframe(client, mongodb_args["db_name"], "customers_meta", {})

# Print the first rows of each frame as a quick check of what was loaded.
print("products sample:\n", df_products.head(), "\n")
print("customers sample:\n", df_customers.head(), "\n")

products sample:
    product_id   color size             tags
0         707   black    m  [commuter, new]
1         708     red    l           [sale]
2         709   blue    XL               [] 

customers sample:
    customer_id account_number customer_type
0        11000     AW00011000         Store
1        11001     AW00011001        Person
2        11002    AW00011002                



In [55]:
# Products: read every document (empty filter) of the products_meta collection
# into df_products, replacing any previously loaded frame of that name.
client = get_mongo_client(**mongodb_args)
query = {}

collection = "products_meta"
df_products = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_products.head(2)

Unnamed: 0,product_id,color,size,tags
0,707,black,m,"[commuter, new]"
1,708,red,l,[sale]


In [56]:
# Customers: read every document (empty filter) of the customers_meta collection
# into df_customers, replacing any previously loaded frame of that name.

client = get_mongo_client(**mongodb_args)
query = {}

collection = "customers_meta"
df_customers = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customers.head(2)

Unnamed: 0,customer_id,account_number,customer_type
0,11000,AW00011000,Store
1,11001,AW00011001,Person


In [57]:
# Upper-case the size codes so mixed-case source values ("m", "l", "XL") share one spelling.
# The column is overwritten in place; re-running this cell gives the same result.
df_products["size"] = df_products["size"].str.upper()
df_products.head(2)

Unnamed: 0,product_id,color,size,tags
0,707,black,M,"[commuter, new]"
1,708,red,L,[sale]


In [66]:
# Normalise column names so the lookups below do not depend on source casing or padding.
df_products.columns  = df_products.columns.str.strip().str.lower()
df_customers.columns = df_customers.columns.str.strip().str.lower()

# Keys become nullable integers; values that cannot be parsed become <NA>.
df_products["product_id"]  = pd.to_numeric(df_products["product_id"], errors="coerce").astype("Int64")
df_customers["customer_id"] = pd.to_numeric(df_customers["customer_id"], errors="coerce").astype("Int64")

# Title-case only the values that are present. Casting the whole column with
# astype(str) would turn missing values into the literal strings "Nan"/"None",
# which would then be written to MySQL instead of NULL and would bypass the
# NULL/'' -> 'Unknown' rule applied when dim_customer is updated.
df_customers["customer_type"] = df_customers["customer_type"].where(
    df_customers["customer_type"].isna(),
    df_customers["customer_type"].astype(str).str.title(),
)

# Serialise the tag lists to JSON text; anything that is not a list becomes "[]".
if "tags" in df_products.columns:
    df_products["tags_json"] = df_products["tags"].apply(
        lambda v: json.dumps(v if isinstance(v, list) else [])
    )
else:
    df_products["tags_json"] = "[]"

# Keep only the columns that are staged in MySQL.
prod_stage = df_products[["product_id","color","size","tags_json"]].copy()
cust_stage = df_customers[["customer_id","account_number","customer_type"]].copy()

# "insert" replaces each staging table and adds its primary key.
set_dataframe(prod_stage, "stg_dim_product_enrich", "product_id", "insert", **mysql_args)
set_dataframe(cust_stage, "stg_dim_customer_enrich", "customer_id", "insert", **mysql_args)

In [67]:
from sqlalchemy import create_engine, text

# Percent-encode the credentials so characters such as '@', ':' or '/' inside a
# user name or password cannot corrupt the connection URL.
engine = create_engine(
    f"mysql+pymysql://{quote(str(mysql_args['uid']), safe='')}:{quote(str(mysql_args['pwd']), safe='')}"
    f"@{mysql_args['hostname']}/{mysql_args['dbname']}",
    pool_recycle=3600,
)

# engine.begin() commits when the block succeeds and rolls back on error.
with engine.begin() as con:
    # Check whether dim_product already has the tags_json column.
    exists = con.execute(text("""
        SELECT COUNT(*) 
        FROM information_schema.columns
        WHERE table_schema = :schema
          AND table_name   = 'dim_product'
          AND column_name  = 'tags_json'
    """), {"schema": mysql_args["dbname"]}).scalar()

    if exists == 0:
        # Prefer a native JSON column; fall back to LONGTEXT when the server
        # rejects the JSON type. Only database errors trigger the fallback, so
        # unrelated failures are not masked.
        try:
            con.execute(text("ALTER TABLE dim_product ADD COLUMN tags_json JSON NULL"))
            coltype = "JSON"
        except sqlalchemy.exc.SQLAlchemyError:
            con.execute(text("ALTER TABLE dim_product ADD COLUMN tags_json LONGTEXT NULL"))
            coltype = "TEXT"
    else:
        # The column already exists: read its actual type to pick the update path.
        coltype = con.execute(text("""
            SELECT DATA_TYPE
            FROM information_schema.columns
            WHERE table_schema = :schema
              AND table_name   = 'dim_product'
              AND column_name  = 'tags_json'
        """), {"schema": mysql_args["dbname"]}).scalar().upper()

    # Copy color and size from staging; empty strings are stored as NULL.
    con.execute(text("""
        UPDATE dim_product dp
        JOIN stg_dim_product_enrich s USING (product_id)
        SET dp.color = NULLIF(s.color,''),
            dp.size  = NULLIF(s.size,'')
    """))

    if coltype == "JSON":
        # Overwrite rather than JSON_MERGE_PRESERVE: merging appended the staged
        # tags again on every run, so re-running the notebook duplicated them.
        # Overwriting makes the cell idempotent and matches the TEXT branch.
        con.execute(text("""
            UPDATE dim_product dp
            JOIN stg_dim_product_enrich s USING (product_id)
            SET dp.tags_json = CAST(s.tags_json AS JSON)
        """))
    else:
        con.execute(text("""
            UPDATE dim_product dp
            JOIN stg_dim_product_enrich s USING (product_id)
            SET dp.tags_json = s.tags_json
        """))

    # Copy customer attributes; a missing or empty customer_type becomes 'Unknown'.
    con.execute(text("""
        UPDATE dim_customer dc
        JOIN stg_dim_customer_enrich s USING (customer_id)
        SET dc.account_number = NULLIF(s.account_number,''),
            dc.customer_type  = CASE 
                WHEN s.customer_type IS NULL OR s.customer_type = '' THEN 'Unknown'
                ELSE s.customer_type END
    """))
