In [1]:
# Michael McGeachy - mjm2xmm
import os
import pandas as pd
from sqlalchemy import create_engine
import json
import numpy
import datetime
import pymongo

#Data from Payment, Staff, and Rental Tables will be combined in order to
#analyze workplace performance of each member of sales team both over lifetime and during June, 2005

In [2]:
#SQL Data Source
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

#Run dim_date script in MYSQL after this step

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x283853a0f10>

In [5]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 16:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 16:30:53


In [6]:
#Modifying source table in dataframe

drop_cols = ['inventory_id', 'last_update', 'customer_id', 'staff_id']
df_rental.drop(drop_cols, axis=1, inplace=True)
#df_rental.rename(columns={"rental_id":"rental_key"}, inplace=True)

df_rental.head(2)

Unnamed: 0,rental_id,rental_date,return_date
0,1,2005-05-24 22:53:30,2005-05-26 22:04:30
1,2,2005-05-24 22:54:33,2005-05-28 19:40:33


In [7]:
#Sending MYSQL Data to new DB 
set_dataframe(user_id, pwd, host_name, dst_dbname, df_rental, 'dim_rental', 'rental_id', "insert")

In [8]:
#local file source
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'payment_data.csv')

df = pd.read_csv(data_file, header=0, index_col=False)
df.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 17:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 17:12:30


In [9]:
#Modifying Source Data in Dataframe
drop_cols = ['customer_id', 'last_update',]
df.drop(drop_cols, axis=1, inplace=True)
df.rename(columns={"payment_id":"payment_key"}, inplace=True)
df.head(2)

Unnamed: 0,payment_key,staff_id,rental_id,amount,payment_date
0,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,573,0.99,2005-05-28 10:35:23


In [10]:
#Sending Local File Data to DB
set_dataframe(user_id, pwd, host_name, dst_dbname, df, 'dim_payment', 'payment_key', "insert")

In [11]:
#pymongo data source
mysql_uid = "jtupitza"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "cluster0.wrb1goe"
atlas_user_name = "mjm2xmm"
atlas_password = "mjm2xmm"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://mjm2xmm:mjm2xmm@cluster0.wrb1goe.mongodb.net


In [12]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [13]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"staff" : 'staff_data.json'
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)

        
client.close()      

In [14]:
query = {} 
collection = "staff"

df_staff = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,address_id,email,store_id,active,username,password,last_update
0,1,Mike,Hillyer,3,Mike.Hillyer@sakilastaff.com,1,1,Mike,8cb2237d0679ca88db6464eac60da96345513964,2006-02-14 22:57:16
1,2,Jon,Stephens,4,Jon.Stephens@sakilastaff.com,2,1,Jon,,2006-02-14 22:57:16


In [15]:
#Modifying Source Data in Dateframe
drop_cols = ['address_id', 'active', 'last_update',]
df_staff.drop(drop_cols, axis=1, inplace=True)
df_staff.head(2)

Unnamed: 0,staff_id,first_name,last_name,email,store_id,username,password
0,1,Mike,Hillyer,Mike.Hillyer@sakilastaff.com,1,Mike,8cb2237d0679ca88db6464eac60da96345513964
1,2,Jon,Stephens,Jon.Stephens@sakilastaff.com,2,Jon,


In [16]:
#Sending PyMongo Data to DB
set_dataframe(user_id, pwd, host_name, dst_dbname, df_staff, 'dim_staff', 'staff_id', "insert")

In [17]:
#Combining Rental and Payment Tables
df_transaction = pd.merge(df_rental, df, on='rental_id' ,how='inner')
df_transaction.head(10)

Unnamed: 0,rental_id,rental_date,return_date,payment_key,staff_id,amount,payment_date
0,1,2005-05-24 22:53:30,2005-05-26 22:04:30,3504,1,2.99,2005-05-24 22:53:30
1,2,2005-05-24 22:54:33,2005-05-28 19:40:33,12377,2,2.99,2005-05-24 22:54:33
2,3,2005-05-24 23:03:39,2005-06-01 22:12:39,11032,2,3.99,2005-05-24 23:03:39
3,4,2005-05-24 23:04:41,2005-06-03 01:43:41,8987,1,4.99,2005-05-24 23:04:41
4,5,2005-05-24 23:05:21,2005-06-02 04:33:21,6003,1,6.99,2005-05-24 23:05:21
5,6,2005-05-24 23:08:07,2005-05-27 01:32:07,14728,1,0.99,2005-05-24 23:08:07
6,7,2005-05-24 23:11:53,2005-05-29 20:34:53,7274,2,1.99,2005-05-24 23:11:53
7,8,2005-05-24 23:31:46,2005-05-27 23:33:46,6440,2,4.99,2005-05-24 23:31:46
8,9,2005-05-25 00:00:40,2005-05-28 00:22:40,3386,1,4.99,2005-05-25 00:00:40
9,10,2005-05-25 00:02:21,2005-05-31 22:44:21,10785,2,5.99,2005-05-25 00:02:21


In [18]:
#Creating Fact Order Table with all Info
df_fact_orders = pd.merge(df_staff, df_transaction, on='staff_id', how='inner')
df_fact_orders.drop(['email', 'username', 'password', 'store_id', 'payment_key', 'payment_date', 'return_date',], axis=1, inplace=True)
df_fact_orders.head(10)

Unnamed: 0,staff_id,first_name,last_name,rental_id,rental_date,amount
0,1,Mike,Hillyer,1,2005-05-24 22:53:30,2.99
1,1,Mike,Hillyer,4,2005-05-24 23:04:41,4.99
2,1,Mike,Hillyer,5,2005-05-24 23:05:21,6.99
3,1,Mike,Hillyer,6,2005-05-24 23:08:07,0.99
4,1,Mike,Hillyer,9,2005-05-25 00:00:40,4.99
5,1,Mike,Hillyer,12,2005-05-25 00:19:27,4.99
6,1,Mike,Hillyer,13,2005-05-25 00:22:55,6.99
7,1,Mike,Hillyer,15,2005-05-25 00:39:22,9.99
8,1,Mike,Hillyer,16,2005-05-25 00:43:11,4.99
9,1,Mike,Hillyer,17,2005-05-25 01:06:36,2.99


In [19]:
#Date Dimension Component
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64')
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [20]:
#Adding Date Dimension into Fact Table
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_orders.rental_date = df_fact_orders.rental_date.astype('datetime64').dt.date.astype('datetime64')
df_fact_orders = pd.merge(df_fact_orders, df_dim_rental_date, on='rental_date', how='left')
df_fact_orders.drop(['rental_date'], axis=1, inplace=True)
df_fact_orders.head(2)

  df_fact_orders.rental_date = df_fact_orders.rental_date.astype('datetime64').dt.date.astype('datetime64')


Unnamed: 0,staff_id,first_name,last_name,rental_id,amount,rental_date_key
0,1,Mike,Hillyer,1,2.99,20050524
1,1,Mike,Hillyer,4,4.99,20050524


In [21]:
set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, 'dim_fact_orders', 'rental_id', "insert")

In [22]:
#SQL Test Statement 1
#Find Lifetime Sales and # of Sales to compare performance of Salesmen
sql_test = """
    SELECT   
          t.last_name AS 'Staff Last Name',
          SUM(orders.amount) AS 'Total Sales',
          COUNT(st.rental_id) AS 'Quantity of Orders'
    FROM dim_fact_orders AS orders 
    JOIN dim_rental st ON orders.rental_id=st.rental_id
    JOIN dim_staff t ON orders.staff_id=t.staff_id
    GROUP BY orders.staff_id
    

""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_test)
df_test.head()

Unnamed: 0,Staff Last Name,Total Sales,Quantity of Orders
0,Hillyer,33482.5,8054
1,Stephens,33924.06,7990


In [23]:
#SQL Test Statement 2
#Find Sales and # of Sales to compare performance of Salesmen in June, 2005
sql_test = """
    SELECT   
          t.last_name AS 'Staff Last Name',
          SUM(orders.amount) AS 'Total Sales',
          COUNT(st.rental_id) AS 'Quantity of Orders'
    FROM dim_fact_orders AS orders 
    JOIN dim_rental st ON orders.rental_id=st.rental_id
    JOIN dim_staff t ON orders.staff_id=t.staff_id
    WHERE (orders.rental_date_key >= 20050601) and (orders.rental_date_key <= 20050630)
    GROUP BY orders.staff_id
    

""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_test)
df_test.head()

Unnamed: 0,Staff Last Name,Total Sales,Quantity of Orders
0,Stephens,4855.52,1148
1,Hillyer,4774.37,1163
