# DS 2002 Project 1 - Nishita Cheekatamarla

For this project, I decided to use the Sakila database in MySQL in order to create a new Data Warehouse. I looked at different tables in this database such as customer, actor, rental, and payment and used the data to create my new Data Warehouse.

### Initial Set Up

Importing libraries:

In [1]:
import os 
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

Assigning connection variables for MySQL:

In [2]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

Assigning connection variables for MongoDB:

In [8]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "ds2002.fbfl2si"
atlas_user_name = "nishitac15"
atlas_password = "NishAnsh12"

conn_str2 = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {conn_str2['local']}")
print(f"Atlas Connection String: {conn_str2['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://nishitac15:NishAnsh12@ds2002.fbfl2si.mongodb.net


Defining functions to get data and set data:

In [9]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [11]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Creating new datawarehouse:
Creating new database in MySQL and reading in json file to MongoDB

In [12]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x149845c8110>

In [15]:
client = pymongo.MongoClient(conn_str2["atlas"])
db = client[src_dbname]

data_dir = os.getcwd()

json_files = {"actor" : 'actor.json'}

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()        

## Creating and Populating Data Tables
Extracting Data from SQL Source Database, performing transformations. I used the customer table from the Sakila database in MySQL to get this data.

In [16]:
sql_customers = "SELECT * FROM sakila.customer;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.rename(columns={"customer_id":"customer_key"}, inplace=True)
df_customers.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-14 23:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-14 23:57:20


Extracting Data from JSON file using MongoDB, performing transformations. I used an exported JSON file containing information from the actor table in the Sakila Database of MySQL. I then used the MongoDB server to read it into the pandas dataframe.

In [18]:
query = {} 
collection = "actor"

df_actor = get_mongo_dataframe(conn_str2['atlas'], src_dbname, collection, query) 
df_actor.rename(columns={"actor_id":"actor_key"}, inplace=True)
df_actor.head(2)

Unnamed: 0,actor_key,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-14 23:34:33
1,2,NICK,WAHLBERG,2006-02-14 23:34:33


Loading transformed data into Data Warehouse. I put both the dataframes into tables to write back into the Data Warehouse.

In [19]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_actor', df_actor, 'actor_key')]

In [20]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

## Creating and Populating Fact Table

Get data from relevant tables and make transformations. I used the rental and payment tables from the Sakila Database in MySQL. I read in the data to a pandas dataframe to work with in this notebook.

In [21]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
drop_cols = ['last_update', 'customer_id']
df_rental.drop(drop_cols, axis=1, inplace=True)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,return_date,staff_id
0,1,2005-05-24 22:53:30,367,2005-05-26 22:04:30,1
1,2,2005-05-24 22:54:33,1525,2005-05-28 19:40:33,1


In [22]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
drop_cols = ['staff_id', 'last_update']
df_payment.drop(drop_cols, axis=1, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,rental_id,amount,payment_date
0,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,573,0.99,2005-05-28 10:35:23


Merge tables into Fact Table

In [23]:
df_fact_table = pd.merge(df_rental, df_payment, on='rental_id', how='right')
df_fact_table.insert(0, "fact_table_key", range(1, df_fact_table.shape[0]+1))
df_fact_table.head(2)

Unnamed: 0,fact_table_key,rental_id,rental_date,inventory_id,return_date,staff_id,payment_id,customer_id,amount,payment_date
0,1,76,2005-05-25 11:30:37,3021,2005-06-03 12:00:37,2,1,1,2.99,2005-05-25 11:30:37
1,2,573,2005-05-28 10:35:23,4020,2005-06-03 06:32:23,1,2,1,0.99,2005-05-28 10:35:23


## Date Dimension
#### Run code locally in MySQL to create initial Date Dimension

Get Data from Date Dimension in MySQL

In [24]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


Lookup date keys for corresponding columns in Fact Table. I altered the 3 different datetime objects in my fact table to include just date information as a date key. These three tables were rental date, return date, and payment date.

In [25]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key": "rental_date_key", "full_date": "rental_date"})
df_fact_table.rental_date = df_fact_table.rental_date.astype('datetime64').dt.date
df_fact_table = pd.merge(df_fact_table, df_dim_rental_date, on='rental_date', how='left')
df_fact_table.drop(['rental_date'], axis=1, inplace=True)
df_fact_table.head(2)

  df_fact_table.rental_date = df_fact_table.rental_date.astype('datetime64').dt.date


Unnamed: 0,fact_table_key,rental_id,inventory_id,return_date,staff_id,payment_id,customer_id,amount,payment_date,rental_date_key
0,1,76,3021,2005-06-03 12:00:37,2,1,1,2.99,2005-05-25 11:30:37,20050525
1,2,573,4020,2005-06-03 06:32:23,1,2,1,0.99,2005-05-28 10:35:23,20050528


In [26]:
df_dim_return_date = df_dim_date.rename(columns={"date_key": "return_date_key", "full_date": "return_date"})
df_fact_table.return_date = df_fact_table.return_date.astype('datetime64').dt.date
df_fact_table = pd.merge(df_fact_table, df_dim_return_date, on='return_date', how='left')
df_fact_table.drop(['return_date'], axis=1, inplace=True)
df_fact_table.head(2)

  df_fact_table.return_date = df_fact_table.return_date.astype('datetime64').dt.date


Unnamed: 0,fact_table_key,rental_id,inventory_id,staff_id,payment_id,customer_id,amount,payment_date,rental_date_key,return_date_key
0,1,76,3021,2,1,1,2.99,2005-05-25 11:30:37,20050525,20050603.0
1,2,573,4020,1,2,1,0.99,2005-05-28 10:35:23,20050528,20050603.0


In [27]:
df_dim_payment_date = df_dim_date.rename(columns={"date_key": "payment_date_key", "full_date": "payment_date"})
df_fact_table.payment_date = df_fact_table.payment_date.astype('datetime64').dt.date
df_fact_table = pd.merge(df_fact_table, df_dim_payment_date, on='payment_date', how='left')
df_fact_table.drop(['payment_date'], axis=1, inplace=True)
df_fact_table.head(2)

  df_fact_table.payment_date = df_fact_table.payment_date.astype('datetime64').dt.date


Unnamed: 0,fact_table_key,rental_id,inventory_id,staff_id,payment_id,customer_id,amount,rental_date_key,return_date_key,payment_date_key
0,1,76,3021,2,1,1,2.99,20050525,20050603.0,20050525
1,2,573,4020,1,2,1,0.99,20050528,20050603.0,20050528


Write the dataframe back to the Database. The fact table is now in my Sakila Data Warehouse.

In [28]:
table_name = "fact_table"
primary_key = "fact_table_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_table, table_name, primary_key, db_operation)

## Demonstrating Validity and Functionality
I selected data from all 3 tables (customers, actors, and fact table). I grouped the actor's dimension by first name to see the most popular first names among actors in this list of movies. I also filtered the fact table to show only rows where the amount was greater that $5.00 to see the list of some of the more expensive movies in this list.

In [29]:
sql_test_customers = "SELECT * FROM sakila_dw.dim_customers"
df_test_customers = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_test_customers)
df_test_customers.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-14 23:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-14 23:57:20


In [30]:
sql_test_actor = "SELECT * FROM sakila_dw.dim_actor"
df_test_actor = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_test_actor)
df_test_actor.head(2)

Unnamed: 0,actor_key,first_name,last_name,last_update
0,1,PENELOPE,GUINESS,2006-02-14 23:34:33
1,2,NICK,WAHLBERG,2006-02-14 23:34:33


In [31]:
sql_test_fact = "SELECT * FROM sakila_dw.fact_table"
df_test_fact = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_test_fact)
df_test_fact.head(2)

Unnamed: 0,fact_table_key,rental_id,inventory_id,staff_id,payment_id,customer_id,amount,rental_date_key,return_date_key,payment_date_key
0,1,76,3021,2,1,1,2.99,20050525,20050603.0,20050525
1,2,573,4020,1,2,1,0.99,20050528,20050603.0,20050528


In [39]:
sql_test_actor2 = "SELECT first_name, COUNT(*) AS row_count FROM sakila_dw.dim_actor GROUP BY first_name"
df_grouped_actors = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_test_actor2)
df_grouped_actors.head()

Unnamed: 0,first_name,row_count
0,PENELOPE,4
1,NICK,3
2,ED,3
3,JENNIFER,1
4,JOHNNY,2


In [34]:
sql_test_fact2 = "SELECT * FROM sakila_dw.fact_table WHERE amount > 5"
df_test_fact2 = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_test_fact2)
df_test_fact2.head()

Unnamed: 0,fact_table_key,rental_id,inventory_id,staff_id,payment_id,customer_id,amount,rental_date_key,return_date_key,payment_date_key
0,3,1185,2785,2,3,1,5.99,20050615,20050623.0,20050615
1,5,1476,1407,1,5,1,9.99,20050615,20050625.0,20050615
2,10,4526,1443,2,10,1,5.99,20050708,20050714.0,20050708
3,11,4611,3486,2,11,1,5.99,20050708,20050712.0,20050708
4,14,6163,1330,2,14,1,7.99,20050711,20050719.0,20050711
