In [None]:
#Importing the necessary libraries
import os
import json
import numpy
import datetime
import certifi
import mysql.connector
import pymysql
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

In [None]:
# Load the suppliers CSV that lives in the 'UnderwearData' folder under the
# current working directory.

data_dir = os.path.join(os.getcwd(), 'UnderwearData')
data_file = os.path.join(data_dir, 'suppliers.csv')

# The first row supplies the column names and the first column becomes the index.
df_suppliers = pd.read_csv(filepath_or_buffer=data_file, index_col=0, header=0)
df_suppliers.head()

In [None]:
# Load the purchase orders CSV that lives in the 'UnderwearData' folder under
# the current working directory.

data_dir2 = os.path.join(os.getcwd(), 'UnderwearData')
# Build the path from this cell's own directory variable so the cell does not
# depend on 'data_dir' from the suppliers cell having been run first.
data_file2 = os.path.join(data_dir2, 'purchase_orders.csv')

# The frame is named 'df_purchase_orders' because that is the name the
# fact-table merge further down reads.
df_purchase_orders = pd.read_csv(data_file2, header=0, index_col=0)
df_purchase_orders.head()

In [None]:
# Connection settings for the MySQL server and the MongoDB server.
#
# Credentials can be supplied through environment variables so they do not
# have to be committed with the notebook; the literals below are only the
# fallbacks used when a variable is not set.

mysql_args = {
    "uid" : os.environ.get("MYSQL_UID", "root"),
    "pwd" : os.environ.get("MYSQL_PWD", "Passw0rd123"),
    "hostname" : os.environ.get("MYSQL_HOST", "localhost"),
    "dbname" : "UnderwearData",        # source database
    "dst_dbname" : "UnderwearData2"    # data warehouse database created below
}


# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : os.environ.get("MONGODB_USER", ""),
    "password" : os.environ.get("MONGODB_PASSWORD", "password"),
    "cluster_name" : "cluster_underwearData",
    "cluster_subnet" : "123456",
    "cluster_location" : "local", # "atlas" or "local"
    "db_name" : "underwear_data"
}

In [None]:
#Defining functions for getting data from and setting data into databases

def get_sql_dataframe(sql_query, **args):
    '''Run a SQL query against the MySQL database and return a DataFrame.

    Parameters
    ----------
    sql_query : str
        Query passed to ``pd.read_sql``.
    **args
        Connection settings. The keys ``uid``, ``pwd``, ``hostname`` and
        ``dbname`` are read to build the connection URL; other keys are
        ignored.

    Returns
    -------
    pandas.DataFrame
        The result set of the query.

    Raises
    ------
    KeyError
        If one of the required connection keys is missing.
    '''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when the query raises.
        with sqlEngine.connect() as connection:
            return pd.read_sql(sql_query, connection)
    finally:
        # A new engine is built on every call, so release its pool here.
        sqlEngine.dispose()


def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write a DataFrame to a MySQL table.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to write; the index is not written.
    table_name : str
        Name of the table to create or append to.
    pk_column : str
        Column declared as primary key after an ``"insert"``.
    db_operation : str
        ``"insert"`` replaces the table and adds the primary key;
        ``"update"`` appends the rows to the table.
    **args
        Connection settings. ``uid``, ``pwd`` and ``hostname`` are required.
        The table is written to ``dst_dbname`` when that key is present and
        non-empty, otherwise to ``dbname``.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    KeyError
        If a required connection key is missing.
    '''
    # Fail loudly on a misspelled operation instead of silently writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"db_operation must be 'insert' or 'update', got {db_operation!r}"
        )

    # Prefer the warehouse database: writing with if_exists='replace' into the
    # source database would overwrite the tables the data was extracted from.
    target_db = args.get('dst_dbname') or args['dbname']
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{target_db}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when a write fails.
        with sqlEngine.connect() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # Raw SQL is wrapped in text(); SQLAlchemy 2.x no longer
                # accepts a plain string in Connection.execute().
                connection.execute(
                    sqlalchemy.text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
                )
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # A new engine is built on every call, so release its pool here.
        sqlEngine.dispose()


def get_mongo_client(**args):
    '''Build a ``pymongo.MongoClient`` for an Atlas cluster or a local server.

    ``args["cluster_location"]`` selects the target. For ``"atlas"`` the keys
    ``user_name``, ``password``, ``cluster_name`` and ``cluster_subnet`` are
    used to assemble a ``mongodb+srv://`` URI; for ``"local"`` the client
    connects to ``mongodb://localhost:27017/``. Any other value raises
    ``Exception``.
    '''
    location = args["cluster_location"]

    # Guard clause: reject anything other than the two supported locations.
    if location not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # Only 'atlas' can reach this point.
    connect_str = (
        f"mongodb+srv://{args['user_name']}:{args['password']}@"
        f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    )
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query a MongoDB collection and return the documents as a DataFrame.

    Parameters
    ----------
    mongo_client
        Client object indexed by database name; it is closed before this
        function returns, so the caller cannot reuse it afterwards.
    db_name : str
        Name of the database to read from.
    collection : str
        Name of the collection to query.
    query
        Filter passed to ``find``.

    Returns
    -------
    pandas.DataFrame
        One row per matching document, without the ``_id`` column. An empty
        DataFrame is returned when nothing matches.
    '''
    db = mongo_client[db_name]
    try:
        dframe = pd.DataFrame(list(db[collection].find(query)))
    finally:
        # Close the client even if the query itself fails.
        mongo_client.close()

    # An empty result has no '_id' column; errors='ignore' avoids a KeyError.
    return dframe.drop(columns=['_id'], errors='ignore')


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''Recreate MongoDB collections from JSON files.

    Parameters
    ----------
    mongo_client
        Client object indexed by database name; it is closed before this
        function returns.
    db_name : str
        Name of the database that receives the collections.
    data_directory : str
        Directory that contains the JSON files.
    json_files : dict
        Maps each collection name to the JSON file name that holds its
        documents.

    Each collection is dropped and then refilled with the documents loaded
    from its file. A file holding an empty list leaves the collection empty
    instead of handing an empty batch to ``insert_many``.
    '''
    db = mongo_client[db_name]

    try:
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)

            json_path = os.path.join(data_directory, file_name)
            with open(json_path, 'r') as openfile:
                documents = json.load(openfile)

            if documents:
                db[collection_name].insert_many(documents)
    finally:
        # Close the client even if a file is missing or an insert fails.
        mongo_client.close()

In [None]:
# Create a new data warehouse database and switch the connection context to it.
# The server-level URL (no database name) is built from mysql_args.

conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
dst_dbname = mysql_args["dst_dbname"]

# The context manager closes the connection even if a statement fails.
# Raw SQL is wrapped in text(); SQLAlchemy 2.x no longer accepts plain strings
# in Connection.execute().
with sqlEngine.connect() as connection:
    connection.execute(sqlalchemy.text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
    connection.execute(sqlalchemy.text(f"CREATE DATABASE `{dst_dbname}`;"))
    connection.execute(sqlalchemy.text(f"USE `{dst_dbname}`;"))

In [None]:
# Extract the products dimension table from the SQL database and clean it.
# The schema name uses the same casing as mysql_args['dbname'].
sql_products = "SELECT * FROM UnderwearData.dim_products;"

# The 'Color' column is removed while the frame is built, so re-running this
# cell always gives the same result.
df_products = get_sql_dataframe(sql_products, **mysql_args).drop(columns=['Color'])
df_products.head(2)

In [None]:
# Extract the inventory transactions dimension table from the SQL database and clean it.
# The schema name uses the same casing as mysql_args['dbname'].
sql_inventory_transactions = "SELECT * FROM UnderwearData.dim_inventory_transactions;"

# The three columns below are removed while the frame is built, so re-running
# this cell always gives the same result.
df_inventory_transactions = (
    get_sql_dataframe(sql_inventory_transactions, **mysql_args)
    .drop(columns=['MissingID', 'UnitPurchasePrice', 'QuantityMissing'])
)
df_inventory_transactions.head(2)

In [None]:
# Extract the date dimension table from the SQL database.
# The schema name uses the same casing as mysql_args['dbname'].
sql_dim_date = "SELECT * FROM UnderwearData.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.head(2)

In [None]:
# Build the fact table. The right join keeps every inventory transaction row
# and attaches the purchase order columns that share its PurchaseOrderID.
df_fact_orders = df_purchase_orders.merge(
    df_inventory_transactions,
    how='right',
    on='PurchaseOrderID',
)
df_fact_orders.head(2)

In [None]:
# Show (rows, columns) of the merged fact table.
df_fact_orders.shape

In [None]:
# Pull only the SupplierID column from the suppliers table.
sql_dim_suppliers = "SELECT SupplierID FROM UnderwearData.suppliers;"
df_dim_suppliers = get_sql_dataframe(sql_query=sql_dim_suppliers, **mysql_args)
df_dim_suppliers.head(2)

In [None]:
# Join the supplier keys with the fact table; the inner join keeps only the
# rows whose SupplierID appears in both frames.
df_fact_inventory = df_dim_suppliers.merge(
    df_fact_orders,
    how='inner',
    on='SupplierID',
)
df_fact_inventory.head(2)

In [None]:
# Write the cleaned products dimension with set_dataframe ("insert" replaces
# the table and adds the primary key).
dataframe, table_name, primary_key, db_operation = (
    df_products,
    'dim_products',
    'ProductID',
    "insert",
)

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [None]:
# Write the cleaned inventory transactions dimension with set_dataframe
# ("insert" replaces the table and adds the primary key).
dataframe, table_name, primary_key, db_operation = (
    df_inventory_transactions,
    'dim_inventory_transactions',
    'TransactionID',
    "insert",
)

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [None]:
# Insert the fact table into the SQL data warehouse.
dataframe = df_fact_inventory
table_name = 'fact_inventory_transactions'
# A supplier can appear on many fact rows, so SupplierID cannot be the primary
# key (ADD PRIMARY KEY fails on duplicate values).
# NOTE(review): TransactionID is assumed to be unique per fact row because it
# is the primary key of dim_inventory_transactions - confirm against the data.
primary_key = 'TransactionID'
db_operation = "insert"
set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [None]:
# SQL query: total purchase price per product in the data warehouse.
# NOTE(review): PurchasePrice is assumed to be a column of dim_products
# (alias s) - confirm against the table.
sql_fact_inventory = """

SELECT s.ProductID,
       SUM(s.PurchasePrice) AS 'Total Purchase Price'
FROM UnderwearData2.fact_inventory_transactions AS po
INNER JOIN UnderwearData2.dim_products AS s
ON po.ProductID = s.ProductID
GROUP BY s.ProductID;

"""

In [None]:
# SQL query: average purchase price per product in the data warehouse.
# MySQL's aggregate for the mean is AVG (there is no AVERAGE function).
# NOTE(review): PurchasePrice is assumed to be a column of dim_products
# (alias s) - confirm against the table.
sql_fact_inventory = """

SELECT s.ProductID,
       AVG(s.PurchasePrice) AS 'Average Purchase Price'
FROM UnderwearData2.fact_inventory_transactions AS po
INNER JOIN UnderwearData2.dim_products AS s
ON po.ProductID = s.ProductID
GROUP BY s.ProductID;

"""