In [1]:
import os
import numpy
import pandas as pd
import datetime
import json

import pymongo
from sqlalchemy import create_engine

In [2]:
host_name = "localhost"
host_ip = "127.0.0.1"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1590da58310>

In [5]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"

#atlas_cluster_name = "sandbox.zibbf"
#atlas_user_name = "m001-student"
#atlas_password = "m001-mongodb-basics"

conn_str = {"local" : f"mongodb://localhost:27017/",
}

src_dbname = "sakila"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {conn_str['local']}")
#print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/


In [6]:
def get_sql_dataframe(user_id, pwd, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@localhost/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [7]:
client = pymongo.MongoClient(conn_str["local"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"film" : 'sakila.film.json',
              #"address" : 'sakila.address.csv'
              #"customer" : 'sakila.customer.json',
              #"staff" : 'sakila.staff.json' 
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close() 

In [8]:
## Create and populate dimension tables

query = {}
collection = "film"

df_film = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [9]:
sql_customer = "SELECT * FROM sakila.customer;"
df_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customer)
df_customer.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [10]:
## Perfom any necessary transformations

sql_customers = """
SELECT * FROM sakila.customer 
    LEFT JOIN sakila.address
    ON address.address_id = customer.address_id
    LEFT JOIN sakila.city
    ON city.city_id = address.city_id 
"""

df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)


drop_cols = ['email','store_id', 'location', 'active', 'create_date', 'last_update', 'address_id', 'address2', 'city_id', 'country_id']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"customer_id":"customer_key"}, inplace=True)

df_customers.head(2)

Unnamed: 0,customer_key,first_name,last_name,address,district,postal_code,phone,city
0,1,MARY,SMITH,1913 Hanoi Way,Nagasaki,35200,28303384290,Sasebo
1,2,PATRICIA,JOHNSON,1121 Loja Avenue,California,17886,838635286649,San Bernardino


In [11]:
drop_cols = ['description','language_id', 'original_language_id', 'length', 'rating', 'special_features']
df_film.drop(drop_cols, axis=1, inplace=True)
df_film.rename(columns={"film_id":"film_key"}, inplace=True)

df_film.head(2)

Unnamed: 0,film_key,title,release_year,rental_duration,rental_rate,replacement_cost,last_update
0,1,ACADEMY DINOSAUR,2006,6,0.99,20.99,2006-02-15 05:03:42
1,2,ACE GOLDFINGER,2006,3,4.99,12.99,2006-02-15 05:03:42


In [12]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'sakila.staff.csv')

df_staff = pd.read_csv(data_file, header=0)
df_staff.head()

Unnamed: 0,staff_id,first_name,last_name,address_id,picture,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,...,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-15 03:57:16
1,2,Jon,Stephens,4,,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-15 03:57:16


In [13]:
drop_cols = ['picture','email','active', 'username', 'password', 'address_id', 'store_id']
df_staff.drop(drop_cols, axis=1, inplace=True)
df_staff.rename(columns={"staff_id":"staff_key"}, inplace=True)

df_staff.head(2)

Unnamed: 0,staff_key,first_name,last_name,last_update
0,1,Mike,Hillyer,2006-02-15 03:57:16
1,2,Jon,Stephens,2006-02-15 03:57:16


In [14]:
## Load Tranfomred Datatables into New Datawarehouse
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_staff', df_staff, 'staff_key'),
          ('dim_film', df_film, 'film_key')]

In [15]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [16]:
## Create Fact Table

sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_rental.rename(columns={"rental_id":"rental_key"}, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [17]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.rename(columns={"rental_id": "rental_key"}, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_key,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


In [18]:
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_dataframe(user_id, pwd, host_name, src_dbname, sql_inventory)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [19]:
df_rental = pd.merge(df_rental, df_inventory, on='inventory_id', how='right')
df_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_id,return_date,staff_id,last_update_x,film_id,store_id,last_update_y
0,4863.0,2005-07-08 19:03:15,1,431.0,2005-07-11 21:29:15,2.0,2006-02-15 21:30:53,1,1,2006-02-15 05:09:17
1,11433.0,2005-08-02 20:13:10,1,518.0,2005-08-11 21:35:10,1.0,2006-02-15 21:30:53,1,1,2006-02-15 05:09:17


In [20]:
df_fact_rental = pd.merge(df_rental, df_payment, on='rental_key', how='inner')
df_fact_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_id_x,return_date,staff_id_x,last_update_x,film_id,store_id,last_update_y,payment_id,customer_id_y,staff_id_y,amount,payment_date,last_update
0,4863.0,2005-07-08 19:03:15,1,431.0,2005-07-11 21:29:15,2.0,2006-02-15 21:30:53,1,1,2006-02-15 05:09:17,11630,431,1,0.99,2005-07-08 19:03:15,2006-02-15 22:18:35
1,11433.0,2005-08-02 20:13:10,1,518.0,2005-08-11 21:35:10,1.0,2006-02-15 21:30:53,1,1,2006-02-15 05:09:17,13956,518,2,3.99,2005-08-02 20:13:10,2006-02-15 22:20:51


In [21]:
# Get the data from the date dimension table
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64')
df_dim_date.head(2)


Unnamed: 0,date_key,full_date
0,20030101,2003-01-01
1,20030102,2003-01-02


In [22]:
df_fact_rental['rental_date'] = df_fact_rental['rental_date'].dt.date
df_fact_rental.rental_date = df_fact_rental.rental_date.astype('datetime64')

df_fact_rental['return_date'] = df_fact_rental['return_date'].dt.date
df_fact_rental.return_date = df_fact_rental.return_date.astype('datetime64')

In [23]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "rental_date" Column.
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_rental_date, on="rental_date", how='left')
df_fact_rental.drop(['rental_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_key,inventory_id,customer_id_x,return_date,staff_id_x,last_update_x,film_id,store_id,last_update_y,payment_id,customer_id_y,staff_id_y,amount,payment_date,last_update,rental_date_key
0,4863.0,1,431.0,2005-07-11,2.0,2006-02-15 21:30:53,1,1,2006-02-15 05:09:17,11630,431,1,0.99,2005-07-08 19:03:15,2006-02-15 22:18:35,20050708
1,11433.0,1,518.0,2005-08-11,1.0,2006-02-15 21:30:53,1,1,2006-02-15 05:09:17,13956,518,2,3.99,2005-08-02 20:13:10,2006-02-15 22:20:51,20050802


In [24]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "return_date" Column.
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rental = pd.merge(df_fact_rental, df_dim_return_date, on='return_date', how='left')
df_fact_rental.drop(['return_date'], axis=1, inplace=True)
df_fact_rental.head(2)

Unnamed: 0,rental_key,inventory_id,customer_id_x,staff_id_x,last_update_x,film_id,store_id,last_update_y,payment_id,customer_id_y,staff_id_y,amount,payment_date,last_update,rental_date_key,return_date_key
0,4863.0,1,431.0,2.0,2006-02-15 21:30:53,1,1,2006-02-15 05:09:17,11630,431,1,0.99,2005-07-08 19:03:15,2006-02-15 22:18:35,20050708,20050711.0
1,11433.0,1,518.0,1.0,2006-02-15 21:30:53,1,1,2006-02-15 05:09:17,13956,518,2,3.99,2005-08-02 20:13:10,2006-02-15 22:20:51,20050802,20050811.0


In [25]:
drop_columns = ['customer_id_x', 'staff_id_x', 'last_update_x', 'last_update_y', 'inventory_id', 'payment_id']
df_fact_rental.drop(drop_columns, axis=1, inplace=True)
df_fact_rental.rename(columns={"customer_id_y": "customer_key"}, inplace=True)
df_fact_rental.rename(columns={"staff_id_y": "staff_key"}, inplace=True)
df_fact_rental.rename(columns={"film_id": "film_key"}, inplace=True)

df_fact_rental.head(2)

Unnamed: 0,rental_key,film_key,store_id,customer_key,staff_key,amount,payment_date,last_update,rental_date_key,return_date_key
0,4863.0,1,1,431,1,0.99,2005-07-08 19:03:15,2006-02-15 22:18:35,20050708,20050711.0
1,11433.0,1,1,518,2,3.99,2005-08-02 20:13:10,2006-02-15 22:20:51,20050802,20050811.0


In [26]:
table_name = "fact_rental"
primary_key = "rental_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_rental, table_name, primary_key, db_operation)

In [41]:
sql_test = """
    SELECT customers.`last_name`, 
        SUM(rentals.`amount`) AS `total_amount`
    FROM `{0}`.`fact_rental` AS rentals
    INNER JOIN `{0}`.`dim_customers` AS customers 
    ON rentals.customer_key = customers.customer_key
    GROUP BY customers.`last_name`
    ORDER BY total_amount DESC;
""".format(dst_dbname)


df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)

In [42]:
df_test.head()

Unnamed: 0,last_name,total_amount
0,SEAL,221.55
1,HUNT,216.54
2,SHAW,195.58
3,KENNEDY,194.61
4,SNYDER,194.61
