### Setting up this document

In [1]:
#importing libraries
import os
import json
import numpy
import datetime

import pandas as pd

import pymongo
from sqlalchemy import create_engine

In [2]:
#Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which I'll be Working 

host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}

user_id = "root"
pwd = "Suchottv20!"

#atlas_cluster_name = "ds2002"
#atlas_default_dbname = "sample_airbnb"
#atlas_user_name = "tytus30"
#atlas_password = "Suchottv20"

atlas_cluster_name = "sandbox"
atlas_default_dbname = "sample_airbnb"
atlas_user_name = "m001-student"
atlas_password = "m001-mongodb-basics"

conn_str = {"local" : f"mongodb://{host_name}:{ports['mongo']}/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.zibbf.mongodb.net/{atlas_default_dbname}?retryWrites=true&w=majority"
}


#conn_str = {"local" : f"mongodb://{host_name}:{ports['mongo']}/",
#    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.hrgo4u5.mongodb.net/{atlas_default_dbname}?retryWrites=true&w=majority"
#}

#mongodb+srv://tytus30:Suchottv20@ds2002.hrgo4u5.mongodb.net/test

src_dbname = "chinook"
dst_dbname = "chinook_dw"

In [3]:
# Define Functions for Getting Data From and Setting Data Into Databases

def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def get_mongo_dataframe_local(user_id, pwd, host_name, port, db_name, collection, query):
    '''Create a connection to MongoDB, with or without authentication credentials'''
    if user_id and pwd:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db_name)
        client = pymongo.MongoClient(mongo_uri)
    else:
        conn_str = f"mongodb://{host_name}:{port}/"
        client = pymongo.MongoClient(conn_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [7]:
# Populate MongoDB with Source Data

# RUN THIS CODE ONLY ONCE

client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"customer" : 'chinook_customer_table.json',
             "employee" : 'chinook_employee_table.json',
             "invoice" : 'chinook_invoice_table.json',
             "invoiceline" : 'chinook_invoiceline_table.json'}

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")


        
client.close()        

### Creating and populating the dimension tables
#### Extract Data from the Source MongoDB Collections Into DataFrames

In [11]:
query = {}
collection = "customer"

df_customer = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_customer.head(2)

Unnamed: 0,CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email,SupportRepId
0,1,LuÃ­s,GonÃ§alves,Embraer - Empresa Brasileira de AeronÃ¡utica S.A.,"Av. Brigadeiro Faria Lima, 2170",SÃ£o JosÃ© dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,+55 (12) 3923-5566,luisg@embraer.com.br,3
1,2,Leonie,KÃ¶hler,,Theodor-Heuss-StraÃŸe 34,Stuttgart,,Germany,70174,+49 0711 2842222,,leonekohler@surfeu.de,5


In [12]:
query = {}
collection = "employee"

df_employee = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_employee.head(2)

Unnamed: 0,EmployeeId,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,Adams,Andrew,General Manager,,1962-02-18 00:00:00,2002-08-14 00:00:00,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
1,2,Edwards,Nancy,Sales Manager,1.0,1958-12-08 00:00:00,2002-05-01 00:00:00,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [14]:
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'CustomerId'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [15]:
dataframe = df_employee
table_name = 'dim_employee'
primary_key = 'EmployeeId'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validate that the New Dimension Tables were Created.

In [17]:
sql_customer = "SELECT * FROM Chinook_dw.dim_customer;"
df_dim_customer = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_customer)
df_dim_customer.head(2)

Unnamed: 0,CustomerId,FirstName,LastName,Company,Address,City,State,Country,PostalCode,Phone,Fax,Email,SupportRepId
0,1,LuÃ­s,GonÃ§alves,Embraer - Empresa Brasileira de AeronÃ¡utica S.A.,"Av. Brigadeiro Faria Lima, 2170",SÃ£o JosÃ© dos Campos,SP,Brazil,12227-000,+55 (12) 3923-5555,+55 (12) 3923-5566,luisg@embraer.com.br,3
1,2,Leonie,KÃ¶hler,,Theodor-Heuss-StraÃŸe 34,Stuttgart,,Germany,70174,+49 0711 2842222,,leonekohler@surfeu.de,5


In [18]:
sql_employee = "SELECT * FROM Chinook_dw.dim_employee;"
df_dim_employee = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_employee)
df_dim_employee.head(2)

Unnamed: 0,EmployeeId,LastName,FirstName,Title,ReportsTo,BirthDate,HireDate,Address,City,State,Country,PostalCode,Phone,Fax,Email
0,1,Adams,Andrew,General Manager,,1962-02-18 00:00:00,2002-08-14 00:00:00,11120 Jasper Ave NW,Edmonton,AB,Canada,T5K 2N1,+1 (780) 428-9482,+1 (780) 428-3457,andrew@chinookcorp.com
1,2,Edwards,Nancy,Sales Manager,1.0,1958-12-08 00:00:00,2002-05-01 00:00:00,825 8 Ave SW,Calgary,AB,Canada,T2P 2T3,+1 (403) 262-3443,+1 (403) 262-3322,nancy@chinookcorp.com


### Create and Populate the New Fact Tables
#### Extract Data from the Source MongoDB Collections Into DataFrames

In [22]:
query = {} # Select all elements (columns), and all documents (rows).

collection = "invoice"

df_invoice = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_invoice.head(2)

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
0,1,2,2009-01-01 00:00:00,Theodor-Heuss-StraÃŸe 34,Stuttgart,,Germany,70174,1.98
1,2,4,2009-01-02 00:00:00,UllevÃ¥lsveien 14,Oslo,,Norway,171,3.96


In [23]:
query = {} # Select all elements (columns), and all documents (rows).

collection = "invoiceline"

df_invoiceline = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_invoiceline.head(2)

Unnamed: 0,InvoiceLineId,InvoiceId,TrackId,UnitPrice,Quantity
0,1,1,2,0.99,1
1,2,1,4,0.99,1


#### Load Newly Transformed MongoDB Data into the Northwind_DW2 Data Warehouse

In [24]:
dataframe = df_invoice
table_name = 'fact_invoice'
primary_key = 'InvoiceId'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [25]:
dataframe = df_invoiceline
table_name = 'fact_invoiceline'
primary_key = 'InvoiceLineId'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validate that the New Fact Tables were Created

In [29]:
sql_invoice = "SELECT * FROM Chinook_dw.fact_invoice;"
df_fact_invoice = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_invoice)
df_fact_invoice.head(2)

Unnamed: 0,InvoiceId,CustomerId,InvoiceDate,BillingAddress,BillingCity,BillingState,BillingCountry,BillingPostalCode,Total
0,1,2,2009-01-01 00:00:00,Theodor-Heuss-StraÃŸe 34,Stuttgart,,Germany,70174,1.98
1,2,4,2009-01-02 00:00:00,UllevÃ¥lsveien 14,Oslo,,Norway,171,3.96


In [30]:
sql_invoiceline = "SELECT * FROM Chinook_dw.fact_invoiceline;"
df_fact_invoiceline = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_invoiceline)
df_fact_invoiceline.head(2)

Unnamed: 0,InvoiceLineId,InvoiceId,TrackId,UnitPrice,Quantity
0,1,1,2,0.99,1
1,2,1,4,0.99,1
