# Import the Necessary Libraries

In [119]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

In [120]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.6.3


#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 


In [121]:
mysql_args = {
    "uid" : "root",
    "pwd" : "Passw0rd123",
    "hostname" : "localhost",
    "dbname" : "adventureworks"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "",
    "password" : "password",
    "cluster_name" : "cluster_name",
    "cluster_subnet" : "xxxxx",
    "cluster_location" : "local",
    "db_name" : "adventureworks"
}

#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working 

In [122]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "adventureworks"
dst_dbname = "adventureworks_dw"

#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context.

In [123]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(f"CREATE DATABASE IF NOT EXISTS`{dst_dbname}`;")
connection.execute(f"USE {dst_dbname};")

connection.close()

#### Define Functions for Getting Data From and Setting Data Into Databases

In [124]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe
    


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Populate MongoDB with Source Data


In [125]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"customers" : 'adventureworks_customer.json',
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)         

### 1.0. Create and Populate the New Dimension Tables
#### 1.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [126]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "customers"

df_customers = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customers.head(2)

Unnamed: 0,CustomerID,TerritoryID,AccountNumber,CustomerType,ModifiedDate
0,1,1,AW00000001,S,2004-10-13 11:15:07
1,2,1,AW00000002,S,2004-10-13 11:15:07


In [127]:
adventureworks_sales = "SELECT * FROM adventureworks.salesorderdetail;"
df_sales = get_sql_dataframe(adventureworks_sales, **mysql_args)
df_sales.head(2)

Unnamed: 0,SalesOrderID,SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
0,43659,1,4911-403C-98,1,776,1,2024.994,0.0,2024.994,"b'm\xc9\x07\xb2\xe6\xd9+@\x84p,\xc1v\xc4""\x83'",2001-07-01
1,43659,2,4911-403C-98,3,777,1,2024.994,0.0,6074.982,"b'\r`\xbbzw\x1e\xbeA\x9f\xe5\xb9\x14,\xfc\x08\...",2001-07-01


In [128]:
query = "SELECT * FROM adventureworks_dw.dim_date;"
df_dim_date = get_sql_dataframe(query, **mysql_args)
df_dim_date.head(2)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### Perform Any Necessary Transformations

In [129]:
df_customers.rename(columns={"CustomerID":"CustomerKey"}, inplace=True)
drop_cols = ['ModifiedDate']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.head(2)

Unnamed: 0,CustomerKey,TerritoryID,AccountNumber,CustomerType
0,1,1,AW00000001,S
1,2,1,AW00000002,S


In [130]:
df_sales.rename(columns={"SalesOrderDetailID":"SalesOrderKey"}, inplace=True)
drop_cols = ['rowguid','ModifiedDate']
df_sales.drop(drop_cols, axis=1, inplace=True)
df_sales.head(2)

Unnamed: 0,SalesOrderID,SalesOrderKey,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal
0,43659,1,4911-403C-98,1,776,1,2024.994,0.0,2024.994
1,43659,2,4911-403C-98,3,777,1,2024.994,0.0,6074.982


#### 1.3. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

Here we will call our **set_dataframe( )** function to create each dimension table. This function expects a number of parameters including the usual connection information (e.g., user_id, password, MySQL server name and database), the *table_name* we need to assign to the table, the *pandas DataFrame* we crafted to define & populate the table, the *name* of the column we wish to designate as the *primary_key* column, and finally, the database operation (insert or update). 

In [131]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'CustomerKey'),
          ('dim_sales', df_sales, 'SalesOrderKey'),
          ('dim_date', df_dim_date, 'date_key')]

In [132]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)