Import Libraries

In [1]:
import os
import numpy
import json
import pandas as pd
import mysql.connector
import pymongo
from sqlalchemy import create_engine

Assign Connection Variables for SQL

In [2]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

dst_dbname = "coffee_bean_sales"
src_dbname = "coffee_bean_src"

Assign Connection Variables for Mongo DB

In [3]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "cluster0.nzdrkqt"
atlas_user_name = "kda7me"
atlas_password = "Moki1234"

conn_str_mongo = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname_mongo = "coffee_bean_src"
dst_dbname = "coffee_bean_sales"

print(f"Local Connection String: {conn_str_mongo['local']}")
print(f"Atlas Connection String: {conn_str_mongo['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://kda7me:Moki1234@cluster0.nzdrkqt.mongodb.net


In [4]:
##define functions for setting database

def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str_sql = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str_sql, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str_sql = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str_sql, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [5]:
conn_str_sql = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str_sql, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1bb9eb87990>

Populate Mongo DB with source data

In [6]:
client = pymongo.MongoClient(conn_str_mongo["atlas"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"orders" : 'coffee_bean_orders.json',
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")
        
client.close()   

Get data From Mongo DB for orders dataframe

In [7]:
query = {} # Select all elements (columns), and all documents (rows).
collection = "orders"

df_orders = get_mongo_dataframe(conn_str_mongo['atlas'], src_dbname, collection, query)  # Specify 'atlas', or 'local'
df_orders.head(2)

Unnamed: 0,Order ID,Order Date,Customer ID,Product ID,Quantity,Customer Name,Email,Country,Coffee Type,Roast Type,Size,Unit Price,Sales
0,QEV-37451-860,9/5/19,17670-51384-MA,R-M-1,2,,,,,,,,
1,QEV-37451-860,9/5/19,17670-51384-MA,E-M-0.5,5,,,,,,,,


In [8]:
##extract data from csv files

df_customers = pd.read_csv("coffee_bean_customers.csv")
df_products = pd.read_csv("coffee_bean_products.csv")

df_customers.head(2)

Unnamed: 0,Customer ID,Customer Name,Email,Phone Number,Address Line 1,City,Country,Postcode,Loyalty Card
0,17670-51384-MA,Aloisia Allner,aallner0@lulu.com,+1 (862) 817-0124,57999 Pepper Wood Alley,Paterson,United States,7505,Yes
1,73342-18763-UW,Piotr Bote,pbote1@yelp.com,+353 (913) 396-4653,2112 Ridgeway Hill,Crumlin,Ireland,D6W,No


In [9]:
df_products.head(2)

Unnamed: 0,Product ID,Coffee Type,Roast Type,Size,Unit Price,Price per 100g,Profit
0,A-L-0.2,Ara,L,0.2,3.885,1.9425,0.34965
1,A-L-0.5,Ara,L,0.5,7.77,1.554,0.6993


Extract Dim Date Table from SQL

In [10]:
sql_dim_date = "SELECT date_key, full_date FROM coffee_bean_src.dim_date"
df_dim_date = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


Transform

In [11]:
##customers

df_customers = df_customers.rename(columns={"Customer ID":"customer_key","Customer Name": "customer_name", 
                                            "Phone Number": "phone_number", "Address Line 1":"street_address"})

df_customers.head(2)

Unnamed: 0,customer_key,customer_name,Email,phone_number,street_address,City,Country,Postcode,Loyalty Card
0,17670-51384-MA,Aloisia Allner,aallner0@lulu.com,+1 (862) 817-0124,57999 Pepper Wood Alley,Paterson,United States,7505,Yes
1,73342-18763-UW,Piotr Bote,pbote1@yelp.com,+353 (913) 396-4653,2112 Ridgeway Hill,Crumlin,Ireland,D6W,No


In [12]:
##orders

df_orders = df_orders.rename(columns={"Order ID": "order_key", "Customer ID": "customer_key", "Product ID": "product_key",
                                     "Order Date": "order_date", "Unit Price": "unit_price"})
cols_to_drop = ['Email', 'Customer Name']
df_orders.drop(cols_to_drop, axis = 1, inplace = True)

df_orders["order_date"] = df_orders["order_date"].astype('datetime64')

reordered_cols = ["order_key", "customer_key", "product_key", "order_date", "Quantity", "Country", "Coffee Type", "Roast Type",
                 "Size", "unit_price", "Sales"]
df_orders = df_orders[reordered_cols]
df_orders.insert(0, "fact_order_key", range(1,df_orders.shape[0]+1))

df_orders.head(2)

Unnamed: 0,fact_order_key,order_key,customer_key,product_key,order_date,Quantity,Country,Coffee Type,Roast Type,Size,unit_price,Sales
0,1,QEV-37451-860,17670-51384-MA,R-M-1,2019-09-05,2,,,,,,
1,2,QEV-37451-860,17670-51384-MA,E-M-0.5,2019-09-05,5,,,,,,


In [13]:
##products

df_products = df_products.rename(columns={"Product ID":"product_key", "Unit Price": "unit_price"})

df_products.head(2)

Unnamed: 0,product_key,Coffee Type,Roast Type,Size,unit_price,Price per 100g,Profit
0,A-L-0.2,Ara,L,0.2,3.885,1.9425,0.34965
1,A-L-0.5,Ara,L,0.5,7.77,1.554,0.6993


In [14]:
##Write Data Frames Back to Database

db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key(255)'),
          ('dim_products', df_products, 'product_key(255)'),
         ('fact_orders', df_orders, 'fact_order_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

Verify that data was inserted

In [15]:
sql_test = """

SELECT customer_name AS `Customer Name`,
    SUM(quantity) AS `Total Quantity`,
    SUM(products.unit_price) AS 'Total Unit Price'
    
    
FROM `{0}`.`fact_orders` AS orders
INNER JOIN `{0}`.dim_customers AS customers
ON orders.customer_key = customers.customer_key
INNER JOIN `{0}`.dim_products AS products
ON products.product_key = orders.product_key
GROUP BY customers.`customer_name`
ORDER BY `Total Quantity` DESC;


""".format(dst_dbname)

df_test = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

Unnamed: 0,Customer Name,Total Quantity,Total Unit Price
0,Terri Farra,22.0,50.775
1,Nealson Cuttler,18.0,91.74
2,Jimmy Dymoke,18.0,42.78
3,Adrian Swaine,17.0,19.065
4,Marja Urion,17.0,43.405
