## DS2002 - Project 1: ETL Pipeline
This project will collect data from three different sources and use said data to populate the data mart. This will complete this in three parts: part one will extract and transform data from a MySQL database, part two will extract and transform data from a local file (csv), and part three will extract and tranform data from MongoDB. A date dimension table will also be created in MySQL. Towards the end of this document, a SQL query will select data from three of the tables in the data mart to demonstrate functionality.
### Extracting SQL Data
In this section of the project, data will be extracted and transformed from the MySQL database, "sakila" and will be used to populate the new created database, "sakila_dw," using Pandas function and Datafrmaes along with python.

#### Importing the Necessary Libraries

In [1]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine

#### Declaring & Assigning Connection Variables for the MySQL Server & Databases

In [2]:
host_name = "ds2002-mysql-stokes.mysql.database.azure.com"
host_ip = "127.0.0.1"
port = "3306"
user_id = "sstokes001"
pwd = "Rw459484"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

#### Defining Functions for Getting Data From and Setting Data Into Databases
These will be used to import (set_dataframe) and extract (get_dataframe) data from MySQL using the assigned variables from above, taking in multiple different arguments. 

In [3]:
def get_dataframe(user_id, pwd, host_name, src_dbname, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{src_dbname}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, dst_dbname, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{dst_dbname}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=sqlEngine, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=sqlEngine, index=False, if_exists='append')
    
    connection.close()

#### Creating the New Data Warehouse (sakila_dw)and Establishing the Connection
If the data warehouse already exists, it will be dropped and a new one is created.

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1f340b077c0>

#### Extracting and Transforming Data from the Source Database Tables and Using it to Create and Populate a Fact Table
In the next cells, data will be extracted from the sakila database tables (rental, payment, inventory) using the get_dataframe() function and  any columns that may be unnecessary will be dropped (i.e last_update). The same process is used for each table that is extracted.

In [5]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_rental.head()

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53
2,3,2005-05-24 23:03:39,1711,408,2005-06-01 22:12:39,1,2006-02-15 21:30:53
3,4,2005-05-24 23:04:41,2452,333,2005-06-03 01:43:41,2,2006-02-15 21:30:53
4,5,2005-05-24 23:05:21,2079,222,2005-06-02 04:33:21,1,2006-02-15 21:30:53


In [6]:
drop_cols = ['last_update', 'return_date']
df_rental.drop(drop_cols, axis=1, inplace=True)

df_rental.head()

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,staff_id
0,1,2005-05-24 22:53:30,367,130,1
1,2,2005-05-24 22:54:33,1525,459,1
2,3,2005-05-24 23:03:39,1711,408,1
3,4,2005-05-24 23:04:41,2452,333,2
4,5,2005-05-24 23:05:21,2079,222,1


In [7]:
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_dataframe(user_id, pwd, host_name, src_dbname, sql_inventory)
df_inventory.head()

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17
2,3,1,1,2006-02-15 05:09:17
3,4,1,1,2006-02-15 05:09:17
4,5,1,2,2006-02-15 05:09:17


In [8]:
drop_cols = ['last_update']
df_inventory.drop(drop_cols, axis=1, inplace=True)

df_inventory.head()

Unnamed: 0,inventory_id,film_id,store_id
0,1,1,1
1,2,1,1
2,3,1,1
3,4,1,1
4,5,1,2


In [9]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.head()

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30
2,3,1,1,1185,5.99,2005-06-15 00:54:12,2006-02-15 22:12:30
3,4,1,2,1422,0.99,2005-06-15 18:02:53,2006-02-15 22:12:30
4,5,1,2,1476,9.99,2005-06-15 21:08:46,2006-02-15 22:12:30


In [10]:
drop_cols = ['last_update', 'customer_id', 'staff_id']
df_payment.drop(drop_cols, axis=1, inplace=True)

df_payment.head()

Unnamed: 0,payment_id,rental_id,amount,payment_date
0,1,76,2.99,2005-05-25 11:30:37
1,2,573,0.99,2005-05-28 10:35:23
2,3,1185,5.99,2005-06-15 00:54:12
3,4,1422,0.99,2005-06-15 18:02:53
4,5,1476,9.99,2005-06-15 21:08:46


#### Forming the Fact Table
Now that the dataframes needed to make the fact table have been exported, they will be joined together using the merge() function of the Pandas Dataframe. The dataframes I will use for the fact table are: df_rental, df_payment, and df_inventory. df_rental and df_inventory will be merged first, then merged with df_payment.

In [11]:
df_rental_inventory = pd.merge (df_rental, df_inventory, on='inventory_id', how='inner')
df_rental_inventory.head()

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,staff_id,film_id,store_id
0,1,2005-05-24 22:53:30,367,130,1,80,1
1,1577,2005-06-16 04:03:28,367,327,2,80,1
2,3584,2005-07-06 04:16:43,367,207,1,80,1
3,10507,2005-08-01 11:22:20,367,45,2,80,1
4,13641,2005-08-20 07:34:42,367,281,1,80,1


In [12]:
df_fact_rental_payment = pd.merge(df_rental_inventory, df_payment, on='rental_id', how = 'inner')
df_fact_rental_payment.head() 

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,staff_id,film_id,store_id,payment_id,amount,payment_date
0,1,2005-05-24 22:53:30,367,130,1,80,1,3504,2.99,2005-05-24 22:53:30
1,1577,2005-06-16 04:03:28,367,327,2,80,1,8828,3.99,2005-06-16 04:03:28
2,3584,2005-07-06 04:16:43,367,207,1,80,1,5603,2.99,2005-07-06 04:16:43
3,10507,2005-08-01 11:22:20,367,45,2,80,1,1244,2.99,2005-08-01 11:22:20
4,13641,2005-08-20 07:34:42,367,281,1,80,1,7623,2.99,2005-08-20 07:34:42


#### Further Transformations
The columns will now be reordered and the primary key is created: rental_key.

In [13]:
ordered_columns = ['rental_id','inventory_id','customer_id','payment_id','film_id','store_id',
                   'payment_date','rental_date','amount']


df_fact_rental_payment = df_fact_rental_payment[ordered_columns]

df_fact_rental_payment.insert(0, "rental_key", range(1, df_fact_rental_payment.shape[0]+1))
df_fact_rental_payment.head()


Unnamed: 0,rental_key,rental_id,inventory_id,customer_id,payment_id,film_id,store_id,payment_date,rental_date,amount
0,1,1,367,130,3504,80,1,2005-05-24 22:53:30,2005-05-24 22:53:30,2.99
1,2,1577,367,327,8828,80,1,2005-06-16 04:03:28,2005-06-16 04:03:28,3.99
2,3,3584,367,207,5603,80,1,2005-07-06 04:16:43,2005-07-06 04:16:43,2.99
3,4,10507,367,45,1244,80,1,2005-08-01 11:22:20,2005-08-01 11:22:20,2.99
4,5,13641,367,281,7623,80,1,2005-08-20 07:34:42,2005-08-20 07:34:42,2.99


#### Getting Data from the Date Dimension Table
Using MySQL, a date dimension table was created in "sakila_dw." Using the get_dataframe() function, and the .astype() Pandas Dataframe function, the primary key and business key will be obtained.

In [14]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64')
df_dim_date.head()

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02
2,20000103,2000-01-03
3,20000104,2000-01-04
4,20000105,2000-01-05


#### Obtaining DateKeys
In each date column in the fact table, the corresponding primary key column will be found.

In [15]:
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_rental_payment = pd.merge(df_fact_rental_payment, df_dim_payment_date, on='payment_date', how='inner')
df_fact_rental_payment.drop(['payment_date'], axis=1, inplace=True)
df_fact_rental_payment.head()

Unnamed: 0,rental_key,rental_id,inventory_id,customer_id,payment_id,film_id,store_id,rental_date,amount,payment_date_key
0,13815,6957,2231,518,13947,483,1,2005-07-27,2.99,20050727


In [16]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rental_payment = pd.merge(df_fact_rental_payment, df_dim_rental_date, on='rental_date', how='inner')
df_fact_rental_payment.drop(['rental_date'], axis=1, inplace=True)
df_fact_rental_payment.head()

Unnamed: 0,rental_key,rental_id,inventory_id,customer_id,payment_id,film_id,store_id,amount,payment_date_key,rental_date_key
0,13815,6957,2231,518,13947,483,1,2.99,20050727,20050727


#### Writing the Dataframe back to the Database
A table name and primary key will be assigned and used to populate the new dataframes into the tables of "sakila_dw" and will then be validated using the set_dataframe() function.

In [17]:
table_name = "fact_rental_payments"
primary_key = "rental_key"
db_operation = "insert"


set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_rental_payment, table_name, primary_key, db_operation)

#### Validating the New Fact Table

In [18]:
sql_fact_rental_payments = "SELECT * FROM sakila_dw.fact_rental_payments;"
df_fact_rental_payments = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_fact_rental_payments)
df_fact_rental_payments.head()

Unnamed: 0,rental_key,rental_id,inventory_id,customer_id,payment_id,film_id,store_id,amount,payment_date_key,rental_date_key
0,13815,6957,2231,518,13947,483,1,2.99,20050727,20050727


### Extracting CSV Data
In this section of the project, data will be extracted and transformed from a csv file (sakila_customers.csv) and will be used it to populate 'sakila_dw,' using the pandas function.

#### Importing the Necessary Libraries

In [19]:
import pymysql
import mysql.connector
import matplotlib.pyplot as plt

#### Loading the CSV File Data

In [20]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'sakila_customers.csv')

df_customers = pd.read_csv(data_file, header=0, index_col=0)
df_customers.head()

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1,2006-02-14 22:04:36,2006-02-15 04:57:20
3,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1,2006-02-14 22:04:36,2006-02-15 04:57:20
4,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1,2006-02-14 22:04:36,2006-02-15 04:57:20


#### Transforming the Data
Any possibly unncessary columns may be dropped and the dataframe will be given a primary key.

In [21]:
df_customers.drop(['create_date','last_update'], axis=1, inplace=True)
df_customers.rename(columns={"customer_id":"customer_key"}, inplace=True)
df_customers.head()

Unnamed: 0,customer_key,store_id,first_name,last_name,email,address_id,active
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
2,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
3,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
4,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1


#### Loading the Data
A table a name, the dataframe needed to populate it, and a primary_key column are assigned. The set_dataframe() function will be used to insert it into "sakila_dw."

In [22]:
db_operation = "insert"
table = [('dim_customers', df_customers, 'customer_key')]

In [23]:
for table_name, dataframe, primary_key in table:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validating the New Fact Table

In [24]:
sql_dim_customers = "SELECT * FROM sakila_dw.dim_customers;"
df_dim_customers = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_dim_customers)
df_dim_customers.head()

Unnamed: 0,customer_key,store_id,first_name,last_name,email,address_id,active
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1
2,3,1,LINDA,WILLIAMS,LINDA.WILLIAMS@sakilacustomer.org,7,1
3,4,2,BARBARA,JONES,BARBARA.JONES@sakilacustomer.org,8,1
4,5,1,ELIZABETH,BROWN,ELIZABETH.BROWN@sakilacustomer.org,9,1


### Extracting Data from MongoDB
In this section, data will be imported and extracted from MongoDB, and will be used to populate "sakila_dw" after it is transformed using a Pandas function.
#### Importing the Necessary Libraries

In [25]:
import json
import datetime
import pymongo

#### Declaring & Assign Connection Variables for the MongoDB Server, the MySQL Server & "Sakila" Databases 
Connecting to the local instance of MongoDB.

In [26]:
mysql_uid = "sstokes001"
mysql_pwd = "Rw459484"
mysql_host = "ds2002-mysql-stokes.mysql.database.azure.com"

conn_str = {"local" : f"mongodb://localhost:27017/"}

src_dbname = "sakila_rentals"
dst_dbname = "sakila_dw"

print(f"Local Connection String: {conn_str['local']}")

Local Connection String: mongodb://localhost:27017/


#### Defining Functions for Importing and Exporting Data From and Into Databases

In [27]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{mysql_host}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, db_name, host_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

#### Importing Data into MongoDB 
Using json files, the data is imported into the indicated database" "sakila_rentals," using files such as sakila_films.json and sakil_film_store.json through a for loop.

In [28]:
client = pymongo.MongoClient(conn_str["local"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"films" : 'sakila_film_inv.json',
              "address" : 'sakila_adress.json',
              "stores" : 'sakil_film_store.json',
             }

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()        


#### Extracting Data from MongoDB Collections and Creating and Populating the New Dimension Tables - Performing Transformations as necessary
Data will be extracted from MongoDB and will be used to populate the new dimension tables that will be added to "sakila_dw." Any potentially unnecessary columns may be dropped, and the new dimesnion tables are also given a primary key. Each of the new tables will then be validated. The same process will be used for each table obtained.

In [29]:
query = {}
collection = "films"

df_films = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_films.head()

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42
2,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...,2006,1,0,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes",2006-02-15 05:03:42
3,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...,2006,1,0,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes",2006-02-15 05:03:42
4,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...,2006,1,0,6,2.99,130,22.99,G,Deleted Scenes,2006-02-15 05:03:42


In [30]:
drop_cols = ['last_update']
df_films.drop(drop_cols, axis=1, inplace=True)
df_films.rename(columns={"film_id":"film_key"}, inplace=True)
df_films.head()

Unnamed: 0,film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
2,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...,2006,1,0,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
3,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...,2006,1,0,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
4,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...,2006,1,0,6,2.99,130,22.99,G,Deleted Scenes


In [31]:
query = {}
collection = "stores"

df_stores = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_stores.head()

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-15 04:57:12
1,2,2,2,2006-02-15 04:57:12


In [32]:
drop_cols = ['last_update']
df_stores.drop(drop_cols, axis=1, inplace=True)
df_stores.rename(columns={"store_id":"store_key"}, inplace=True)
df_stores.head()

Unnamed: 0,store_key,manager_staff_id,address_id
0,1,1,1
1,2,2,2


In [33]:
query = {}
collection = "address"

df_address = get_mongo_dataframe(conn_str['local'], src_dbname, collection, query)
df_address.head()

Unnamed: 0,address_id,address,address2,district,city_id,postal_code,phone,location,last_update
0,1,47 MySakila Drive,,Alberta,300,,,"{'type': 'Point', 'coordinates': [-112.8185647...",2014-09-25 22:30:27
1,2,28 MySQL Boulevard,,QLD,576,,,"{'type': 'Point', 'coordinates': [153.1408538,...",2014-09-25 22:30:09
2,3,23 Workhaven Lane,,Alberta,300,,14033335568.0,"{'type': 'Point', 'coordinates': [-112.8185673...",2014-09-25 22:30:27
3,4,1411 Lillydale Drive,,QLD,576,,6172235589.0,"{'type': 'Point', 'coordinates': [153.1913094,...",2014-09-25 22:30:09
4,5,1913 Hanoi Way,,Nagasaki,463,35200.0,28303384290.0,"{'type': 'Point', 'coordinates': [129.7227851,...",2014-09-25 22:31:53


In [34]:
drop_cols = ['last_update', 'location', 'phone']
df_address.drop(drop_cols, axis=1, inplace=True)
df_address.rename(columns={"address_id":"address_key"}, inplace=True)
df_address.head()

Unnamed: 0,address_key,address,address2,district,city_id,postal_code
0,1,47 MySakila Drive,,Alberta,300,
1,2,28 MySQL Boulevard,,QLD,576,
2,3,23 Workhaven Lane,,Alberta,300,
3,4,1411 Lillydale Drive,,QLD,576,
4,5,1913 Hanoi Way,,Nagasaki,463,35200.0


#### Loading the New DataFrames into the Data Warehouse by 
Using the set_dataframe() function, the new dimension tables are loaded in "sakila_dw." The process is similar to adding the dimension tables using MySQL data: assign a table name and primary key, use the newly transformed dataframes to insert the data into the table,s and then validate them using the get_sql_dataframe() function. The same process is used for each of the dataframes.

In [35]:
dataframe = df_films
table_name = 'dim_films'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, mysql_host, dataframe, table_name, primary_key, db_operation)

In [36]:
sql_films = "SELECT * FROM sakila_dw.dim_films;"
df_dim_films = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_host, dst_dbname, sql_films)
df_dim_films.head()

Unnamed: 0,film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes"
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes"
2,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...,2006,1,0,7,2.99,50,18.99,NC-17,"Trailers,Deleted Scenes"
3,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...,2006,1,0,5,2.99,117,26.99,G,"Commentaries,Behind the Scenes"
4,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...,2006,1,0,6,2.99,130,22.99,G,Deleted Scenes


In [37]:
dataframe = df_stores
table_name = 'dim_stores'
primary_key = 'store_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, mysql_host, dataframe, table_name, primary_key, db_operation)

In [38]:
sql_stores = "SELECT * FROM sakila_dw.dim_stores;"
df_dim_stores = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_host, dst_dbname, sql_stores)
df_dim_stores.head()

Unnamed: 0,store_key,manager_staff_id,address_id
0,1,1,1
1,2,2,2


In [39]:
dataframe = df_address
table_name = 'dim_address'
primary_key = 'address_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, dst_dbname, mysql_host, dataframe, table_name, primary_key, db_operation)

In [40]:
sql_address = "SELECT * FROM sakila_dw.dim_address;"
df_dim_address = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_host, dst_dbname, sql_address)
df_dim_address.head()

Unnamed: 0,address_key,address,address2,district,city_id,postal_code
0,1,47 MySakila Drive,,Alberta,300,
1,2,28 MySQL Boulevard,,QLD,576,
2,3,23 Workhaven Lane,,Alberta,300,
3,4,1411 Lillydale Drive,,QLD,576,
4,5,1913 Hanoi Way,,Nagasaki,463,35200.0


#### Authoring the SQL Query
To demonstrate functionality data from dim_customers, fact_rental_payments, and dim_films are selected. The code highlights the customer name, the title of the film rented, and total amount of the payment.

In [41]:
sql_test = """
    SELECT 
    dim_customers.last_name, 
    dim_films.title, 
    SUM(fact_rental_payments.amount) as total_rental_payments
FROM fact_rental_payments 
INNER JOIN dim_customers ON fact_rental_payments.customer_id = dim_customers.customer_key
INNER JOIN dim_films ON fact_rental_payments.film_id = dim_films.film_key
GROUP BY dim_customers.last_name, dim_films.title;
""".format(dst_dbname)

df_test = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_test)

In [42]:
df_test.head()

Unnamed: 0,last_name,title,total_rental_payments
0,HARDER,JERICHO MULAN,2.99
