In [1]:
import os
import pymysql
import numpy
import pandas as pd
import mysql.connector
from sqlalchemy import create_engine

In [2]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Suchocki7152!"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x12f63f4d0>

Extract Data from SQL

In [5]:
sql_customers = "SELECT * FROM sakila.customer;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)

drop_cols = ['email','active','create_date','last_update']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"customer_id":"customer_key"}, inplace=True)

df_customers.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,address_id
0,1,1,MARY,SMITH,5
1,2,1,PATRICIA,JOHNSON,6


In [6]:
sql_staff = "SELECT * FROM sakila.staff;"
df_staff = get_dataframe(user_id, pwd, host_name, src_dbname, sql_staff)

drop_cols = ['picture','email','username','password','last_update']
df_staff.drop(drop_cols, axis=1, inplace=True)
df_staff.rename(columns={"staff_id":"staff_key"}, inplace=True)

df_staff.head(2)

Unnamed: 0,staff_key,first_name,last_name,address_id,store_id,active
0,1,Mike,Hillyer,3,1,1
1,2,Jon,Stephens,4,2,1


In [7]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)

drop_cols = ['last_update']
df_rental.drop(drop_cols, axis=1, inplace=True)
df_rental.rename(columns={"customer_id":"customer_key", "staff_id":"staff_key", "rental_id":"rental_key"}, inplace=True)

df_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_id,customer_key,return_date,staff_key
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1


In [8]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)

drop_cols = ['last_update', 'customer_id']
df_payment.drop(drop_cols, axis=1, inplace=True)
df_payment.rename(columns={"payment_id":"payment_key", "amount":"amount_paid","rental_id":"rental_key", 'staff_id':'staff_key'}, inplace=True)

df_payment.head(2)

Unnamed: 0,payment_key,staff_key,rental_key,amount_paid,payment_date
0,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,573,0.99,2005-05-28 10:35:23


Load Into DB

In [9]:
db_operation = "insert"

tables = [('customers', df_customers, 'customer_key'),
          ('staff', df_staff, 'staff_key'),
          ('rental', df_rental, 'rental_key'),
          ('payment', df_payment, 'payment_key')]

In [10]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

Merge Tables

In [11]:
df_rental= pd.merge(df_staff, df_payment, on= 'staff_key',how= 'inner')
#df_rental.drop({'status_id'}, axis=1, inplace= True)
df_rental.head(2)

Unnamed: 0,staff_key,first_name,last_name,address_id,store_id,active,payment_key,rental_key,amount_paid,payment_date
0,1,Mike,Hillyer,3,1,1,1,76,2.99,2005-05-25 11:30:37
1,1,Mike,Hillyer,3,1,1,2,573,0.99,2005-05-28 10:35:23


See Who Sold the Most

In [12]:
sql_test = """
SELECT 
    SUM(pay.amount_paid) AS dollars_sold,
    staff.staff_key,
    staff.first_name,
    staff.last_name,
    staff.store_id
FROM `{0}`.payment AS pay
INNER JOIN `{0}`.staff AS staff
ON pay.staff_key = staff.staff_key
GROUP BY staff.staff_key
ORDER BY dollars_sold DESC
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)

In [13]:
df_test.head()

Unnamed: 0,dollars_sold,staff_key,first_name,last_name,store_id
0,33924.06,2,Jon,Stephens,2
1,33482.5,1,Mike,Hillyer,1


Load Data into Mongo DB

In [14]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

In [15]:
mysql_uid = "root"
mysql_pwd = "Suchocki7152!"
mysql_hostname = "localhost"
mysql_port= "3306"

atlas_cluster_name = "cluster1.pe4zc7j"
atlas_user_name = "os7th"
atlas_password = "Suchocki"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://os7th:Suchocki@cluster1.pe4zc7j.mongodb.net


In [16]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [17]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]


# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(),'downloads/sakila_data')

json_files = {"rental":'sakila_rental.json',
              "inventory" : 'sakila_inventory.json',
              "film" : 'sakila_film_text.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        print(f"{file} was successfully loaded.")

        
client.close()        

Collection(Database(MongoClient(host=['ac-xdqemsl-shard-00-02.pe4zc7j.mongodb.net:27017', 'ac-xdqemsl-shard-00-00.pe4zc7j.mongodb.net:27017', 'ac-xdqemsl-shard-00-01.pe4zc7j.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-ymaogh-shard-0', tls=True), 'sakila'), 'rental') was successfully loaded.
Collection(Database(MongoClient(host=['ac-xdqemsl-shard-00-02.pe4zc7j.mongodb.net:27017', 'ac-xdqemsl-shard-00-00.pe4zc7j.mongodb.net:27017', 'ac-xdqemsl-shard-00-01.pe4zc7j.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atlas-ymaogh-shard-0', tls=True), 'sakila'), 'inventory') was successfully loaded.
Collection(Database(MongoClient(host=['ac-xdqemsl-shard-00-02.pe4zc7j.mongodb.net:27017', 'ac-xdqemsl-shard-00-00.pe4zc7j.mongodb.net:27017', 'ac-xdqemsl-shard-00-01.pe4zc7j.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, authsource='admin', replicaset='atl

In [18]:
query = {} # Select all elements (columns), and all documents (rows).
collection = "rental"

df_rental = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)  # Specify 'atlas', or 'local'
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [19]:
query = {} # Select all elements (columns), and all documents (rows).
collection = "inventory"

df_inventory = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)  # Specify 'atlas', or 'local'
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [20]:
query = {} # Select all elements (columns), and all documents (rows).
collection = "film"

df_film = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)  # Specify 'atlas', or 'local'
df_film.head(2)

Unnamed: 0,film_id,title,description
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...


Necessary Transformations

In [21]:
df_rental.rename(columns={"rental_id":"rental_key","inventory_id":"inventory_key"}, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [22]:
df_inventory.rename(columns={"inventory_id":"inventory_key","film_id":"film_key"}, inplace=True)
df_inventory.head(2)

Unnamed: 0,inventory_key,film_key,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [23]:
df_film.rename(columns={"film_id":"film_key"}, inplace=True)
df_film.head(2)

Unnamed: 0,film_key,title,description
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...


Load by Creating New Tables

In [24]:
dataframe = df_rental
table_name = 'dim_rental'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [25]:
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [26]:
dataframe = df_film
table_name = 'dim_film'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

Validate New Tables

In [27]:
sql_suppliers = "SELECT * FROM sakila_dw.dim_rental;"
df_dim_rental = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_suppliers)
df_dim_rental.head(2)

Unnamed: 0,rental_key,rental_date,inventory_key,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [28]:
sql_suppliers = "SELECT * FROM sakila_dw.dim_inventory;"
df_dim_inventory = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_suppliers)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,film_key,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


In [29]:
sql_suppliers = "SELECT * FROM sakila_dw.dim_film;"
df_dim_film = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_suppliers)
df_dim_film.head(2)

Unnamed: 0,film_key,title,description
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...


See how many times a movie with a specific title was rented

In [30]:
sql_test2 = """
SELECT f.title, COUNT(*) as times_rented
FROM film AS f
INNER JOIN inventory AS i ON f.film_id = i.film_id
GROUP BY f.title
ORDER BY times_rented DESC
""".format(dst_dbname)

df_times_rented = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test2)
df_times_rented.head(2)

Unnamed: 0,title,times_rented
0,ACADEMY DINOSAUR,8
1,APACHE DIVINE,8


In [31]:
df_film_list = pd.merge(df_inventory, df_film, on='film_key', how='inner')
df_film_list.drop(['store_id','last_update','description'], axis=1, inplace=True)
df_film_list.head(2)

Unnamed: 0,inventory_key,film_key,title
0,1,1,ACADEMY DINOSAUR
1,2,1,ACADEMY DINOSAUR


In [32]:
dataframe = df_film_list
table_name = 'film_list'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

JSON local file

In [33]:
import json
import numpy as np
with open('/Users/liv/downloads/sakila_data/sakila_rental.json') as f:
    rental_dates = json.load(f)

df_rental_dates= pd.DataFrame(rental_dates)
df_rental_dates.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


In [34]:
import json
import numpy as np
with open('/Users/liv/downloads/sakila_data/sakila_customer.json') as c:
    rental_customers = json.load(c)

df_rental_customers= pd.DataFrame(rental_customers)
df_rental_customers.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [35]:
drop_cols = ['inventory_id','staff_id','last_update']
df_rental_dates.drop(drop_cols, axis=1, inplace=True)
df_rental_dates.head(2)

Unnamed: 0,rental_id,rental_date,customer_id,return_date
0,1,2005-05-24 22:53:30,130,2005-05-26 22:04:30
1,2,2005-05-24 22:54:33,459,2005-05-28 19:40:33


In [36]:
drop_cols = ['store_id','email','address_id','active','create_date','last_update']
df_rental_customers.drop(drop_cols, axis=1, inplace=True)
df_rental_customers.head(2)

Unnamed: 0,customer_id,first_name,last_name
0,1,MARY,SMITH
1,2,PATRICIA,JOHNSON


In [37]:
df_rent= pd.merge(df_rental_dates, df_rental_customers, on= 'customer_id',how= 'inner')
df_rent.head(2)

Unnamed: 0,rental_id,rental_date,customer_id,return_date,first_name,last_name
0,1,2005-05-24 22:53:30,130,2005-05-26 22:04:30,CHARLOTTE,HUNTER
1,746,2005-05-29 09:25:10,130,2005-06-02 04:20:10,CHARLOTTE,HUNTER


In [38]:
sql_dim_date = "SELECT date_key, full_date FROM sakila.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


Create Date Table of Dates Rented by Customer (rental_date and return_date in datetime)

In [65]:
df_rent= pd.merge(df_rental_dates, df_rental_customers, on= 'customer_id',how= 'inner')
df_rent.head(2)

Unnamed: 0,rental_id,rental_date,customer_id,return_date,first_name,last_name
0,1,2005-05-24 22:53:30,130,2005-05-26 22:04:30,CHARLOTTE,HUNTER
1,746,2005-05-29 09:25:10,130,2005-06-02 04:20:10,CHARLOTTE,HUNTER
