# DS 2002 Midterm Project by Tatiana Soto-Montero
The project used the MySQL Sample Database (Link: https://www.mysqltutorial.org/getting-started-with-mysql/mysql-sample-database/).

#### Imported necessary libraries

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.30
Running PyMongo Version: 4.10.1


#### Declare and assign connection variables for the MYSQL Server, the MongoDB Server, and Databases that will be used

In [4]:
uid = "root"
pwd = "Treat137lol!*"
hostname = "localhost"
src_dbname = "company"
dbname = "company_dw2"

mongodb_args = {
    "user_name" : "tatianaasoto17",
    "password" : "Aut4mn31",
    "cluster_name" : "Lab04",
    "cluster_subnet" : "iopdr",
    "cluster_location" : "atlas", # "local"
    "db_name" : "project"
}

#### Define Functions for Getting Data and Setting Data into databases

In [6]:

def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()
    
def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Create the New Data Warehouse Database and switch the connection context to use it

In [8]:
conn_str = f"mysql+pymysql://{uid}:{pwd}@{hostname}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dbname}`;"))
connection.execute(text(f"USE {dbname};"))

connection.close()

#### Populate MongoDB with source data

In [10]:
client = get_mongo_client(**mongodb_args)
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"products" : 'products_company.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)         

## 1. Create and Populate the Dimention Tables

#### 1.1 Extract Data from Source MySQL Database Tables
Get data from customers table from database company.

In [13]:
sql_customers = "SELECT * FROM company.customers;"
df_customers = get_dataframe(uid, pwd, hostname, src_dbname, sql_customers)
df_customers.head(5)

Unnamed: 0,customerNumber,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
1,112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0
2,114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia,1611.0,117300.0
3,119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",,Nantes,,44000,France,1370.0,118200.0
4,121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,,Stavern,,4110,Norway,1504.0,81700.0


#### 1.2 Extract Data from Source MongoDB Collections in Dataframes
Connects to MongoDB, then queries the products collection and loads the data into a Pandas DataFrame.

In [15]:
mongo_client = get_mongo_client(**mongodb_args)
query = {}
collection = "products"
df_products = get_mongo_dataframe(mongo_client, mongodb_args["db_name"], collection, query)
df_products.head(2)

Unnamed: 0,productLine,textDescription,productCode,productName,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,Classic Cars,Attention car enthusiasts: Make your wildest c...,S10_1949,1952 Alpine Renault 1300,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3
1,Classic Cars,Attention car enthusiasts: Make your wildest c...,S10_4757,1972 Alfa Romeo GTA,1:10,Motor City Art Classics,Features include: Turnable front wheels; steer...,3252,85.68,136.0


#### 1.3 Extract Data from CSV file (local source)
Reads form a csv file located in data folder, which loads the content into a Pandas DataFrame.

In [17]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'company_employees.csv')
df_employees = pd.read_csv(data_file, header=0, index_col=0)
df_employees.head(2)

Unnamed: 0_level_0,lastName,firstName,email,jobTitle,officeCity,officeCountry
employeeNumber,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1002,Murphy,Diane,dmurphy@classicmodelcars.com,President,San Francisco,USA
1056,Patterson,Mary,mpatterso@classicmodelcars.com,VP Sales,San Francisco,USA


#### 1.4 Perform Transformation Necessary
Drop unnecessary columns for each of the dataframes.

##### Transformation for df_customers. Dropped irrelevant contents and renamed customerNumber to custer_key.

In [24]:
drop_cols = ["contactFirstName", "contactLastName", "state", "salesRepEmployeeNumber"]
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"customerNumber":"customer_key"}, inplace=True)
df_customers.head(2)

Unnamed: 0,customer_key,customerName,phone,addressLine1,addressLine2,city,postalCode,country,creditLimit
0,103,Atelier graphique,40.32.2555,"54, rue Royale",,Nantes,44000,France,21000.0
1,112,Signal Gift Stores,7025551838,8489 Strong St.,,Las Vegas,83030,USA,71800.0


##### Transformation for df_products. Dropped irrelevant contents; unfortunately wasn't able to rename productCode to product_key because there seems to be a wrong translation in MongoDB. It seemed the column was translated as a TEXT/BLOB, which prevent the renaming. However, I created a new column for product_key and decided to keep product_code since it had relevant information.

In [27]:
drop_cols = ['textDescription', 'productScale', 'productVendor']
df_products.drop(drop_cols, axis=1, inplace=True)
df_products['product_key'] = range(1, len(df_products) + 1)
cols = ['product_key'] + [col for col in df_products.columns if col != 'product_key']
df_products = df_products[cols]
df_products.head(2)

Unnamed: 0,product_key,productLine,productCode,productName,productDescription,quantityInStock,buyPrice,MSRP
0,1,Classic Cars,S10_1949,1952 Alpine Renault 1300,Turnable front wheels; steering function; deta...,7305,98.58,214.3
1,2,Classic Cars,S10_4757,1972 Alfa Romeo GTA,Features include: Turnable front wheels; steer...,3252,85.68,136.0


##### Transformation for df_employees. Dropped irrelevant contents and had to reset index because I had an issue renaming employeeNumber which was an issue I was having with the csv file. However, I was able to reset and rename employeeNumber to employee_key.

In [29]:
drop_cols = ['email', 'officeCountry']
df_employees.drop(drop_cols, axis=1, inplace=True)
df_employees.reset_index(inplace=True)
df_employees.rename(columns={"employeeNumber": "employee_key"}, inplace=True)
df_employees.head(2)

Unnamed: 0,employee_key,lastName,firstName,jobTitle,officeCity
0,1002,Murphy,Diane,President,San Francisco
1,1056,Patterson,Mary,VP Sales,San Francisco


#### 1.5 Load the transformed dataframes into the new data warehouse using new created tables

In [31]:
db_operation = "insert"

tables = [('dim_customer', df_customers, 'customer_key'),
          ('dim_product', df_products, 'product_key'),
          ('dim_employee', df_employees, 'employee_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(uid, pwd, hostname, dbname, dataframe, table_name, primary_key, db_operation)

#### 1.6 Validate that the dim tables were created (dim_customer, dim_product, dim_employee)

In [33]:
sql_customers = "SELECT * FROM company_dw2.dim_customer;"
df_dim_customer = get_dataframe(uid, pwd, hostname, dbname, sql_customers)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customerName,phone,addressLine1,addressLine2,city,postalCode,country,creditLimit
0,103,Atelier graphique,40.32.2555,"54, rue Royale",,Nantes,44000,France,21000.0
1,112,Signal Gift Stores,7025551838,8489 Strong St.,,Las Vegas,83030,USA,71800.0


In [34]:
sql_products = "SELECT * FROM company_dw2.dim_product;"
df_dim_product = get_dataframe(uid, pwd, hostname, dbname, sql_products)
df_dim_product.head(2)

Unnamed: 0,product_key,productLine,productCode,productName,productDescription,quantityInStock,buyPrice,MSRP
0,1,Classic Cars,S10_1949,1952 Alpine Renault 1300,Turnable front wheels; steering function; deta...,7305,98.58,214.3
1,2,Classic Cars,S10_4757,1972 Alfa Romeo GTA,Features include: Turnable front wheels; steer...,3252,85.68,136.0


In [35]:
sql_employees = "SELECT * FROM company_dw2.dim_employee;"
df_dim_employee = get_dataframe(uid, pwd, hostname, dbname, sql_employees)
df_dim_employee.head(2)

Unnamed: 0,employee_key,lastName,firstName,jobTitle,officeCity
0,1002,Murphy,Diane,President,San Francisco
1,1056,Patterson,Mary,VP Sales,San Francisco


## 2. Create and Populate the Fact Table

#### 2.1 Get data from each of the tables: orders (orderDate, requiredDate, shippedDate) and orderdetails

In [38]:
sql_orders = "SELECT * FROM company.orders;"
df_orders = get_dataframe(uid, pwd, hostname, dbname, sql_orders)
df_orders['orderDate'] = df_orders['orderDate'].astype('datetime64[ns]').dt.date
df_orders['requiredDate'] = df_orders['requiredDate'].astype('datetime64[ns]').dt.date
df_orders['shippedDate'] = df_orders['shippedDate'].astype('datetime64[ns]').dt.date
df_orders.head(2)

Unnamed: 0,orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128


In [39]:
sql_ods = "SELECT * FROM company.orderdetails;"
df_ods = get_dataframe(uid, pwd, hostname, dbname, sql_ods)
df_ods.head(2)

Unnamed: 0,orderNumber,productCode,quantityOrdered,priceEach,orderLineNumber
0,10100,S18_1749,30,136.0,3
1,10100,S18_2248,50,55.09,2


#### 2.2 Merge orders and orderdetails

In [43]:
df_fact_order = pd.merge(df_orders, df_ods, on='orderNumber', how='left')
df_fact_order.head(2)

Unnamed: 0,orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber,productCode,quantityOrdered,priceEach,orderLineNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363,S18_1749,30,136.0,3
1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363,S18_2248,50,55.09,2


#### 2.3 Get data from the date_dim table
Please now run company_dim_date in MySQL before running the cell below.

In [57]:
sql_date = "SELECT date_key, full_date FROM company_dw2.dim_date;"
df_dim_date = get_dataframe(uid, pwd, hostname, dbname, sql_date)
df_dim_date['full_date'] = pd.to_datetime(df_dim_date['full_date']).dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


#### 2.4 Lookup the DateKeys from the date_dim table and merge into df_fact_orders (orderDate, requiredDate, and shippedDate)

In [60]:
df_dim_order_date = df_dim_date.rename(columns={"date_key": "order_date_key", "full_date": "orderDate"})
df_fact_orders = pd.merge(df_fact_order, df_dim_order_date, on="orderDate", how="left")
df_fact_orders.drop(["orderDate"], axis=1, inplace=True)
df_fact_orders.head()

Unnamed: 0,orderNumber,requiredDate,shippedDate,status,comments,customerNumber,productCode,quantityOrdered,priceEach,orderLineNumber,order_date_key
0,10100,2003-01-13,2003-01-10,Shipped,,363,S18_1749,30,136.0,3,20030106
1,10100,2003-01-13,2003-01-10,Shipped,,363,S18_2248,50,55.09,2,20030106
2,10100,2003-01-13,2003-01-10,Shipped,,363,S18_4409,22,75.46,4,20030106
3,10100,2003-01-13,2003-01-10,Shipped,,363,S24_3969,49,35.29,1,20030106
4,10101,2003-01-18,2003-01-11,Shipped,Check on availability.,128,S18_2325,25,108.06,4,20030109


In [62]:
df_dim_required_date = df_dim_date.rename(columns={"date_key": "required_date_key", "full_date": "requiredDate"})
df_fact_orders = pd.merge(df_fact_orders, df_dim_required_date, on="requiredDate", how="left")
df_fact_orders.drop(["requiredDate"], axis=1, inplace=True)
df_fact_orders.head()

Unnamed: 0,orderNumber,shippedDate,status,comments,customerNumber,productCode,quantityOrdered,priceEach,orderLineNumber,order_date_key,required_date_key
0,10100,2003-01-10,Shipped,,363,S18_1749,30,136.0,3,20030106,20030113
1,10100,2003-01-10,Shipped,,363,S18_2248,50,55.09,2,20030106,20030113
2,10100,2003-01-10,Shipped,,363,S18_4409,22,75.46,4,20030106,20030113
3,10100,2003-01-10,Shipped,,363,S24_3969,49,35.29,1,20030106,20030113
4,10101,2003-01-11,Shipped,Check on availability.,128,S18_2325,25,108.06,4,20030109,20030118


In [64]:
df_dim_shipped_date = df_dim_date.rename(columns={"date_key": "shipped_date_key", "full_date": "shippedDate"})
df_fact_orders = pd.merge(df_fact_orders, df_dim_shipped_date, on="shippedDate", how="left")
df_fact_orders.drop(["shippedDate"], axis=1, inplace=True)
df_fact_orders.head()

Unnamed: 0,orderNumber,status,comments,customerNumber,productCode,quantityOrdered,priceEach,orderLineNumber,order_date_key,required_date_key,shipped_date_key
0,10100,Shipped,,363,S18_1749,30,136.0,3,20030106,20030113,20030110.0
1,10100,Shipped,,363,S18_2248,50,55.09,2,20030106,20030113,20030110.0
2,10100,Shipped,,363,S18_4409,22,75.46,4,20030106,20030113,20030110.0
3,10100,Shipped,,363,S24_3969,49,35.29,1,20030106,20030113,20030110.0
4,10101,Shipped,Check on availability.,128,S18_2325,25,108.06,4,20030109,20030118,20030111.0


#### 2.5 Perform addition transformations

Drop columns with no relevance for the fact table. Then, rename the foreign key column from id to key and insert a new column that will be the primary key. Same problem with product_key here too, so I kept them separate.

In [67]:
drop_columns = ['comments', 'status']
df_fact_orders.drop(drop_columns, axis=1, inplace=True)

# Rename columns, but leave productCode intact
df_fact_orders.rename(columns={
    'customerNumber': 'customer_key',
    'orderNumber': 'order_key',
}, inplace=True)
df_fact_orders['product_key'] = range(1, len(df_fact_orders) + 1)
df_fact_orders.insert(0, "fact_order_key", range(1, df_fact_order.shape[0]+1))
df_fact_orders.head()

Unnamed: 0,fact_order_key,order_key,customer_key,productCode,quantityOrdered,priceEach,orderLineNumber,order_date_key,required_date_key,shipped_date_key,product_key
0,1,10100,363,S18_1749,30,136.0,3,20030106,20030113,20030110.0,1
1,2,10100,363,S18_2248,50,55.09,2,20030106,20030113,20030110.0,2
2,3,10100,363,S18_4409,22,75.46,4,20030106,20030113,20030110.0,3
3,4,10100,363,S24_3969,49,35.29,1,20030106,20030113,20030110.0,4
4,5,10101,128,S18_2325,25,108.06,4,20030109,20030118,20030111.0,5


#### 2.6 Write the Dataframe back to the database

In [70]:
table_name = "fact_order"
primary_key = "fact_order_key"
db_operation = "insert"

set_dataframe(uid, pwd, hostname, dbname, df_fact_orders, table_name, primary_key, db_operation)

## 3. Validate that the new fact table was created

#### 3.1 Validate from dim_products and fact_order
Find the total number of units ordered (total_quantity), find the total sales revenue generated (total_sales), find the average price of the products in line (avg_price), and find the min and max price of products in the line (min_price and max_price).

In [74]:
sql_test = """
SELECT 
    dp.productLine,
    SUM(fo.quantityOrdered) AS total_quantity,
    SUM(fo.quantityOrdered * fo.priceEach) AS total_sales,
    AVG(fo.priceEach) AS avg_price,
    MIN(fo.priceEach) AS min_price,
    MAX(fo.priceEach) AS max_price
FROM 
    fact_order AS fo
LEFT JOIN 
    dim_product AS dp ON fo.product_key = dp.product_key
GROUP BY 
    dp.productLine;
""".format(dbname)

# Run the query and load the result into a DataFrame
df_test = get_dataframe(uid, pwd, hostname, dbname, sql_test)


In [76]:
df_test.head()

Unnamed: 0,productLine,total_quantity,total_sales,avg_price,min_price,max_price
0,Classic Cars,1325.0,114986.85,92.407105,30.41,214.3
1,Motorcycles,491.0,48766.82,99.752308,53.31,205.72
2,Planes,422.0,34261.19,81.859167,44.77,134.04
3,Ships,339.0,24788.93,75.69,35.78,113.9
4,Trains,96.0,11222.76,119.856667,81.35,172.36


#### 3.2 Validate from dim_customers and fact_order
Find the number of orders each customer has place (total_orders) and find the total number of revenue generated (total_sales).

In [79]:
sql_test = """SELECT 
    c.customerName,
    COUNT(fo.order_key) AS total_orders,
    SUM(fo.quantityOrdered * fo.priceEach) AS total_sales
FROM 
    fact_order AS fo
JOIN 
    dim_customer AS c ON fo.customer_key = c.customer_key
GROUP BY 
    c.customerName
ORDER BY 
    total_sales DESC;
""".format(dbname)
df_test = get_dataframe(uid, pwd, hostname, dbname, sql_test)

In [81]:
df_test.head()

Unnamed: 0,customerName,total_orders,total_sales
0,Euro+ Shopping Channel,259,820689.54
1,Mini Gifts Distributors Ltd.,180,591827.34
2,"Australian Collectors, Co.",55,180585.07
3,Muscle Machine Inc,48,177913.95
4,La Rochelle Gifts,53,158573.12
