In [1]:
import os
import numpy
import requests
import datetime
import json
import pandas as pd
from sqlalchemy import create_engine 
import pymongo

In [2]:
#Connect to MySQL
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila2"

In [3]:
#Define functions for getting and setting a data frame
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

In [5]:
#Get data frame for the MySQL table sakila.customer
sql_customer = "SELECT * FROM sakila.customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-14 23:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-14 23:57:20


In [6]:
#Drop unnecessary columns and rename id columns to key columns
drop_cols = ['create_date','last_update']
df_customer.drop(drop_cols, axis=1, inplace=True)
df_customer.rename(columns={"customer_id":"customer_key", "store_id":"store_key", "address_id":"address_key"}, inplace=True)

df_customer.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,active
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1


In [7]:
#Insert dim_customer dataframe into sakila2
dataframe = df_customer
table_name = 'dim_customer'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [8]:
#Get data from the local file sakila_rental
df_rental = pd.read_csv("Downloads/sakila_rental.csv")
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 16:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 16:30:53


In [9]:
#Drop unnecessary columns and rename id columns to key columns
drop_cols = ['last_update', 'inventory_id', 'customer_id', 'staff_id']
df_rental.drop(drop_cols, axis=1, inplace=True)
df_rental.rename(columns={"rental_id":"rental_key"}, inplace=True)

df_rental.head(2)

Unnamed: 0,rental_key,rental_date,return_date
0,1,2005-05-24 22:53:30,2005-05-26 22:04:30
1,2,2005-05-24 22:54:33,2005-05-28 19:40:33


In [10]:
#Insert dim_rental data frame into sakila2
dataframe = df_rental
table_name = 'dim_rental'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [11]:
#Establish credentials for accessing MongoDB
mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "ds2002.kz1qzp7"
atlas_user_name = "zmm8xd"
atlas_password = "Ronaldo03"

conn_str = {"local" : f"mongodb://localhost:27017/",
            "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila"
dst_dbname = "sakila2"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://zmm8xd:Ronaldo03@ds2002.kz1qzp7.mongodb.net


In [12]:
#Define getters and setter for data frame creation using MongoDB
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [13]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"payment" : 'sakila_payment.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()     

In [14]:
# Get data frame for sakila_payment as a json file
query = {} # Select all elements (columns), and all documents (rows).
collection = "payment"

df_payment = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)  # Specify 'atlas', or 'local'
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 17:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 17:12:30


In [15]:
#Drop unnecessary columns and rename id columns to key columns
drop_cols = ['last_update']
df_payment.drop(drop_cols, axis=1, inplace=True)
df_payment.rename(columns={"payment_id":"payment_key", "customer_id":"customer_key", "staff_id":"staff_key",
                          "rental_id":"rental_key"}, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_key,customer_key,staff_key,rental_key,amount,payment_date
0,1,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,1,573,0.99,2005-05-28 10:35:23


In [16]:
#Insert dim_payment data frame into sakila2
dataframe = df_payment
table_name = 'dim_payment'
primary_key = 'payment_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [17]:
#Gets data from dim_customer
sql_customer = "SELECT * FROM sakila2.dim_customer;"
dim_customer = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_customer)
dim_customer.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,active
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1


In [18]:
#Gets data from dim_payment
sql_payment = "SELECT * FROM sakila2.dim_payment;"
dim_payment = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_customer)
dim_payment.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,active
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1


In [19]:
#Gets data from dim_rental
sql_rental = "SELECT * FROM sakila2.dim_rental;"
dim_rental = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_customer)
dim_rental.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,active
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1


In [20]:
#Merges the customer and payment data frames on the customer key to create a new table, which will then be merged with
#rental to create the fact table
df_merge = pd.merge(df_customer, df_payment, on='customer_key', how='inner')
df_merge.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,active,payment_key,staff_key,rental_key,amount,payment_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,1,1,76,2.99,2005-05-25 11:30:37
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2,1,573,0.99,2005-05-28 10:35:23


In [21]:
#Merges previously made table with rental to make the fact_orders table
df_fact_orders = pd.merge(df_merge, df_rental, on='rental_key', how='inner')
df_fact_orders.head(2)

Unnamed: 0,customer_key,store_key,first_name,last_name,email,address_key,active,payment_key,staff_key,rental_key,amount,payment_date,rental_date,return_date
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,1,1,76,2.99,2005-05-25 11:30:37,2005-05-25 11:30:37,2005-06-03 12:00:37
1,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2,1,573,0.99,2005-05-28 10:35:23,2005-05-28 10:35:23,2005-06-03 06:32:23


In [22]:
#Drop unnecessary columns and inserts a fact_orders_key to functions as the primary key
drop_cols = ['first_name', 'last_name', 'email', 'active', 'amount']
df_fact_orders.drop(drop_cols, axis=1, inplace=True)

df_fact_orders.insert(0, "fact_orders_key", range(1, df_fact_orders.shape[0]+1))
df_fact_orders.head(2)

Unnamed: 0,fact_orders_key,customer_key,store_key,address_key,payment_key,staff_key,rental_key,payment_date,rental_date,return_date
0,1,1,1,5,1,1,76,2005-05-25 11:30:37,2005-05-25 11:30:37,2005-06-03 12:00:37
1,2,1,1,5,2,1,573,2005-05-28 10:35:23,2005-05-28 10:35:23,2005-06-03 06:32:23


In [23]:
#Selects info from sakila2.dim_date
sql_dim_date = "SELECT date_key, full_date FROM sakila2.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [24]:
#Convert rental_date into rental_date_key for the fact table
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_orders.rental_date = df_fact_orders.rental_date.astype('datetime64').dt.date.astype('datetime64')
df_fact_orders = pd.merge(df_fact_orders, df_dim_rental_date, on='rental_date', how='left')
df_fact_orders.drop(['rental_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,fact_orders_key,customer_key,store_key,address_key,payment_key,staff_key,rental_key,payment_date,return_date,rental_date_key
0,1,1,1,5,1,1,76,2005-05-25 11:30:37,2005-06-03 12:00:37,20050525
1,2,1,1,5,2,1,573,2005-05-28 10:35:23,2005-06-03 06:32:23,20050528


In [25]:
#Convert payment_date into payment_date_key for the fact table
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_orders.payment_date = df_fact_orders.payment_date.astype('datetime64').dt.date.astype('datetime64')
df_fact_orders = pd.merge(df_fact_orders, df_dim_payment_date, on='payment_date', how='left')
df_fact_orders.drop(['payment_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,fact_orders_key,customer_key,store_key,address_key,payment_key,staff_key,rental_key,return_date,rental_date_key,payment_date_key
0,1,1,1,5,1,1,76,2005-06-03 12:00:37,20050525,20050525
1,2,1,1,5,2,1,573,2005-06-03 06:32:23,20050528,20050528


In [26]:
#Convert return_date in return_date_key for the fact table
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_orders.return_date = df_fact_orders.return_date.astype('datetime64').dt.date.astype('datetime64')
df_fact_orders = pd.merge(df_fact_orders, df_dim_return_date, on='return_date', how='left')
df_fact_orders.drop(['return_date'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,fact_orders_key,customer_key,store_key,address_key,payment_key,staff_key,rental_key,rental_date_key,payment_date_key,return_date_key
0,1,1,1,5,1,1,76,20050525,20050525,20050603
1,2,1,1,5,2,1,573,20050528,20050528,20050603


In [27]:
#Insert complete dim_fact_orders table into sakila2
dataframe = df_fact_orders
table_name = 'dim_fact_orders'
primary_key = 'fact_orders_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [28]:
#SQl test Query
#Returns the number of days a rental lasted (from dim_rental), the amount paid for that rental (from dim_payment),
#and the rental_date_key for that transaction (from dim_fact_orders)
test = """
SELECT
    DATEDIFF(dim_rental.return_date, dim_rental.rental_date) + 1 AS rental_duration_days,
    MAX(dim_payment.amount) as amount_paid,
    MAX(dim_fact_orders.rental_date_key) as rental_date_key
FROM
    dim_rental
INNER JOIN dim_payment ON dim_rental.rental_key = dim_payment.rental_key
LEFT JOIN dim_fact_orders ON dim_rental.rental_key = dim_fact_orders.rental_key
GROUP BY
    rental_duration_days
ORDER BY
    rental_duration_days;
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, dst_dbname, test)
df_test.head(5)

Unnamed: 0,rental_duration_days,amount_paid,rental_date_key
0,2,4.99,20050530
1,3,2.99,20050530
2,4,2.99,20050530
3,5,4.99,20050530
4,6,3.99,20050529
