# Rachel Ney-Grimm Midterm Project - Sakila Datamart

First, I imported the libraries and set up the connection settings for the MySQL server and MongoDB, which are the sources of the data. Then I defined helper functions for reading data from and writing data to the databases, and created the `sakila_2` database, the destination of the ETL pipeline.

In [None]:
#libraries
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine
import json
import datetime
import pymongo

#connection setup
# The MySQL endpoint and credentials can be supplied through environment
# variables so the password does not have to live in the notebook; the
# literals below are only fallbacks for a local development server.
host_name = os.environ.get("SAKILA_MYSQL_HOST", "localhost")
host_ip = "127.0.0.1"
# NOTE(review): port is not used by any of the connection strings in this file
port = os.environ.get("SAKILA_MYSQL_PORT", "3306")
user_id = os.environ.get("SAKILA_MYSQL_USER", "root")
pwd = os.environ.get("SAKILA_MYSQL_PWD", "Passw0rd123")

# source database (extract) and destination data warehouse (load)
src_dbname = "sakila"
dst_dbname = "sakila_2"
#database function definitions
def get_mongo_dataframe(connect_str, db_name, collection, query):
    """Run a ``find()`` query against MongoDB and return the results as a DataFrame.

    Parameters
    ----------
    connect_str : str
        MongoDB connection URI passed to ``pymongo.MongoClient``.
    db_name : str
        Name of the MongoDB database to read from.
    collection : str
        Name of the collection to query.
    query : dict
        Filter document for ``find()``; ``{}`` selects every document.

    Returns
    -------
    pandas.DataFrame
        One row per matching document, without MongoDB's ``_id`` column.
        An empty DataFrame is returned when nothing matches.
    """
    client = pymongo.MongoClient(connect_str)
    try:
        documents = list(client[db_name][collection].find(query))
    finally:
        # release the client even if the query fails
        client.close()

    dframe = pd.DataFrame(documents)
    # '_id' is Mongo's internal ObjectId; errors='ignore' because an empty
    # result (or a projection without _id) has no '_id' column to drop.
    dframe.drop(columns=['_id'], inplace=True, errors='ignore')
    return dframe

def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    """Run a SQL query against a MySQL database and return the result set.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials; they are URL-escaped so special characters such
        as '@' or '/' do not corrupt the connection URL.
    host_name : str
        MySQL server host.
    db_name : str
        Database to connect to.
    sql_query : str
        Query to execute.

    Returns
    -------
    pandas.DataFrame
        The query result.
    """
    from urllib.parse import quote

    conn_str = (
        f"mysql+pymysql://{quote(user_id, safe='')}:{quote(pwd, safe='')}"
        f"@{host_name}/{db_name}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # the with-block closes the connection even if the query raises
        with sqlEngine.connect() as connection:
            dframe = pd.read_sql(sql_query, connection)
    finally:
        # a fresh engine is built per call, so release its connection pool
        sqlEngine.dispose()
    return dframe

def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    """Write a DataFrame to a MySQL table.

    Parameters
    ----------
    user_id, pwd : str
        MySQL credentials (URL-escaped before being placed in the URL).
    host_name : str
        MySQL server host.
    db_name : str
        Destination database.
    df : pandas.DataFrame
        Rows to write; the DataFrame index is not written.
    table_name : str
        Destination table.
    pk_column : str
        Column (or comma-separated columns) for the primary key; used only
        by the "insert" operation.
    db_operation : str
        ``"insert"`` replaces any existing table with ``df`` and adds a
        primary key on ``pk_column``; ``"update"`` appends ``df`` to the table.

    Raises
    ------
    ValueError
        If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
    """
    from urllib.parse import quote
    from sqlalchemy import text

    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"db_operation must be 'insert' or 'update', got {db_operation!r}"
        )

    conn_str = (
        f"mysql+pymysql://{quote(user_id, safe='')}:{quote(pwd, safe='')}"
        f"@{host_name}/{db_name}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # begin() commits on success and rolls back on error; SQLAlchemy 2.x
        # no longer autocommits, and Engine.execute() no longer exists there.
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.execute(
                    text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
                )
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # a fresh engine is built per call, so release its connection pool
        sqlEngine.dispose()
    
#creating new database
# (Re)create an empty destination database so the notebook can be re-run
# from a clean state.
from urllib.parse import quote
from sqlalchemy import text

conn_str = f"mysql+pymysql://{quote(user_id, safe='')}:{quote(pwd, safe='')}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

# Engine.execute() was removed in SQLAlchemy 2.0; run the statements on an
# explicit connection inside a transaction block instead.
with sqlEngine.begin() as connection:
    connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
    connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
    connection.execute(text(f"USE `{dst_dbname}`;"))

## Customer Dimension Table
I created a 'customer' dimension table from data in the MySQL `sakila` database. I transformed this data by renaming the column that will become the table's key, dropping the columns that aren't needed in the new schema, and attaching each customer's address, city, and country.

In [None]:
#extracting the data from the source with a sql select statement into a dataframe
# the Sakila schema names this table 'customer' (singular)
sql_customers = "SELECT * FROM sakila.customer;"
df_customers = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

In [None]:
#rename the key column to fit the data warehouse standard, drop undesired columns
# Sakila's customer table is keyed on 'customer_id' (it has no 'id' column),
# so that is the column that becomes the dimension's surrogate key.
df_customers.rename(columns={"customer_id": "customer_key"}, inplace=True)
df_customers.drop(['store_id'], axis=1, inplace=True)
df_customers.head(2)

In [None]:
# Pull the address -> city -> country lookup tables that will be flattened
# into the customer dimension.
sql_address = "SELECT * FROM sakila.address;"
sql_city = "SELECT city_id , city , country_id FROM sakila.city;"
sql_country = "SELECT country_id, country FROM sakila.country;"

# 'address2' is empty and 'location' holds binary large objects that are not
# useful for analysis; 'last_update' is dropped as well.
df_address = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_address).drop(
    columns=['location', 'address2', 'last_update']
)
df_city = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_city)
df_country = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_country)

In [None]:
# Flatten the address hierarchy into the customer dimension. Left joins keep
# every customer even if part of its address chain has no match.
#merge country into city
df_city = pd.merge(df_city, df_country, on='country_id', how='left')
#merge city into address
df_address = pd.merge(df_address, df_city, on='city_id', how='left')
#merge address into the customer table - end goal
df_customers = pd.merge(df_customers, df_address, on='address_id', how='left')
#the address information is now attached in the dimension table

In [None]:
#load the customer dimension into the data warehouse
# each entry is (destination table, source DataFrame, primary-key column);
# "insert" replaces any existing table and then adds the primary key
db_operation = "insert"
tables = [('dim_customers', df_customers, 'customer_key')]
for table_name, dataframe, primary_key in tables:
    set_dataframe(
        user_id=user_id,
        pwd=pwd,
        host_name=host_name,
        db_name=dst_dbname,
        df=dataframe,
        table_name=table_name,
        pk_column=primary_key,
        db_operation=db_operation,
    )

## Film Dimension Table
Data on each film's language is read from a CSV file on the local file system, and the film data itself is retrieved from MongoDB. Simple transformations (dropping unnecessary or duplicate columns and renaming the key) are applied. The film dataframe is then merged with the language dataframe on their shared `language_id` column.

In [None]:
#get the sakila language data from the file system
#will later merge to become part of the film dimension table
data_file = os.path.join(os.getcwd(), 'sakila_language_data.csv')
# NOTE(review): index_col=0 turns the first CSV column into the index; the
# later merge on 'language_id' needs that name as a column or index level,
# so confirm it against the CSV header.
df_language = pd.read_csv(data_file, header=0, index_col=0)
df_language.drop(['last_update'], axis=1, inplace=True)
df_language.head()

In [None]:
#get sakila film data from mongodb
# conn_str holds the MySQL server URL (a plain string), so it cannot be
# indexed with 'local'; pass a MongoDB URI explicitly instead.
# NOTE(review): assumes a default local MongoDB server — adjust if needed.
mongo_conn_str = "mongodb://localhost:27017/"
query = {}  # empty filter: select every document in the collection
collection = "film"
df_film = get_mongo_dataframe(mongo_conn_str, src_dbname, collection, query)
df_film.head(2)

In [None]:
#perform initial transformations on film data
# 'description' and 'original_language_id' are not needed for analyzing the
# business processes; the key column is renamed to the warehouse standard.
# NOTE(review): Sakila's film table is keyed on 'film_id'; confirm the MongoDB
# documents really use 'id', otherwise this rename does nothing.
df_film = df_film.drop(columns=['description', 'original_language_id'])
df_film = df_film.rename(columns={"id": "film_key"})
df_film.head(2)

In [None]:
#merge language and film on language id (left) using pandas
df_film = pd.merge(df_film, df_language, on='language_id', how='left')
# rename() returns a new DataFrame, so the result must be assigned back;
# 'film_language' is a more descriptive name for the language column
df_film = df_film.rename(columns={"name": "film_language"})
df_film.drop(['language_id'], axis=1, inplace=True)
df_film.head(2)

In [None]:
#load the film dataframe into a newly created film table in the data warehouse
dataframe = df_film
table_name = 'dim_film'
primary_key = 'film_key'
db_operation = "insert"

# pass the connection settings from the setup cell, in set_dataframe's
# parameter order (user, password, host, database, ...)
set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#validate that it was created and loaded successfully
sql_film = f"SELECT * FROM {dst_dbname}.{table_name};"
df_dim_film = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_film)
df_dim_film.head(2)

## Fact Table
The fact table is based primarily on the rental process modeled by the Sakila database. It also includes data about the payments for each rental, which provides more quantitative information about the transaction. To create this fact table, Sakila's rental and payment tables are joined using the pandas `merge` function.

In [None]:
# Extract the full rental table; the fact table is built mainly from it.
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_sql_dataframe(user_id, pwd, host_name,
                              db_name=src_dbname, sql_query=sql_rental)


In [None]:
# Extract the full payment table so payment data can be attached to rentals.
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_sql_dataframe(user_id, pwd, host_name,
                               db_name=src_dbname, sql_query=sql_payment)

In [None]:
# Attach payment details to each rental; the left join keeps rentals that
# have no matching payment.
# NOTE(review): rental and payment presumably share other column names
# (e.g. customer_id, staff_id, last_update), which pandas suffixes with
# _x/_y — consider dropping or renaming them before loading.
df_rental = df_rental.merge(df_payment, on='rental_id', how='left')