# Midterm Project
The goal of this project is to demonstrate an understanding of, and competence in, creating and implementing basic data science systems such as pipelines, scripts, data transformations, APIs, databases, and cloud services. In this project, I designed a dimensional data warehouse that represents movie rental transactions and developed an ETL pipeline that extracts, transforms, and loads data into it. My SQL statements show the relationships between customers, stores, payments, rentals, and the films involved.

#### Import the Necessary Libraries

In [1]:
import os
import numpy
import pandas as pd
import json
import datetime
import pymongo
from sqlalchemy import create_engine

#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working

In [2]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "sakila"
dst_dbname = "sakila_dw_final"
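
The `port` variable above is declared, but the connection strings built later in this notebook do not include it, and a password containing characters such as `@` or `/` would break a URL assembled with a plain f-string. As a minimal sketch (the helper name `build_mysql_url` is hypothetical and not part of the original notebook), the URL could be assembled with the password percent-encoded:

```python
from urllib.parse import quote_plus


def build_mysql_url(user, password, host, port, db_name=None):
    """Return a SQLAlchemy-style MySQL URL with user and password percent-encoded."""
    url = f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}"
    if db_name:
        url += f"/{db_name}"
    return url


# Made-up credentials, used only to show the encoding
print(build_mysql_url("root", "p@ss/word", "localhost", "3306", "sakila"))
# mysql+pymysql://root:p%40ss%2Fword@localhost:3306/sakila
```

The resulting string can be passed to `create_engine` in the same way as the f-strings used in this notebook.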

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection)
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()
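
In `set_dataframe`, the "insert" operation replaces the whole table, while "update" appends rows to an existing table. The sketch below illustrates that difference on a toy table. It assumes an in-memory SQLite database as a stand-in for MySQL, and it leaves out the `ALTER TABLE ... ADD PRIMARY KEY` step because SQLite does not support that statement. The helper name `write_table` is hypothetical.

```python
import sqlite3

import pandas as pd


def write_table(conn, df, table_name, db_operation):
    # "insert" recreates the table; "update" appends rows to it
    mode = {"insert": "replace", "update": "append"}[db_operation]
    df.to_sql(table_name, con=conn, index=False, if_exists=mode)


conn = sqlite3.connect(":memory:")  # stand-in for the MySQL warehouse
first = pd.DataFrame({"store_key": [1, 2], "manager_staff_id": [1, 2]})
extra = pd.DataFrame({"store_key": [3], "manager_staff_id": [3]})

write_table(conn, first, "dim_stores", "insert")
write_table(conn, extra, "dim_stores", "update")
rows_after_update = conn.execute("SELECT COUNT(*) FROM dim_stores").fetchone()[0]

# A second "insert" drops the three existing rows and writes two new ones
write_table(conn, first, "dim_stores", "insert")
rows_after_reinsert = conn.execute("SELECT COUNT(*) FROM dim_stores").fetchone()[0]

print(rows_after_update, rows_after_reinsert)  # 3 2
```

Because "insert" maps to `if_exists='replace'`, rerunning a load cell rebuilds the table rather than duplicating its rows.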

#### Create the New Data Warehouse Database and Switch the Connection Context to Use It

Here, I create a new data warehouse to use as the target for all subsequent operations.

In [4]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1c472ecc8d0>

## 1.0. SQL

### Create & Populate the Dimension Tables

#### Extract Data from the Source Database Tables 

The main dimensions I am using are customers and the stores they shopped at.

In [5]:
sql_customers = "SELECT * FROM sakila.customer;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,customer_id,store_id,first_name,last_name,email,address_id,active,create_date,last_update
0,1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
1,2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [6]:
sql_stores = "SELECT * FROM sakila.store;"
df_stores = get_dataframe(user_id, pwd, host_name, src_dbname, sql_stores)
df_stores.head(2)

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-15 04:57:12
1,2,2,2,2006-02-15 04:57:12


#### Perform Any Necessary Transformations

Here, I drop columns that are unnecessary and rename id columns to keys to match data warehouse standards.

In [7]:
drop_cols = ['email','address_id','active','last_update']
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.rename(columns={"customer_id":"customer_key"}, inplace=True)

df_customers.head(2)

Unnamed: 0,customer_key,store_id,first_name,last_name,create_date
0,1,1,MARY,SMITH,2006-02-14 22:04:36
1,2,1,PATRICIA,JOHNSON,2006-02-14 22:04:36


In [8]:
drop_cols = ['last_update']
df_stores.drop(drop_cols, axis=1, inplace=True)
df_stores.rename(columns={"store_id":"store_key"}, inplace=True)

df_stores.head(2)

Unnamed: 0,store_key,manager_staff_id,address_id
0,1,1,1
1,2,2,2
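
The two transformation cells above follow the same pattern: drop the columns the warehouse does not need, then rename the id column to a key. As a rough sketch, the pattern can be wrapped in one function that returns a new DataFrame instead of modifying its input in place. The helper name `to_dimension` and the sample rows are illustrative assumptions, not part of the original notebook.

```python
import pandas as pd


def to_dimension(df, drop_cols, key_map):
    """Return a copy with unneeded columns removed and id columns renamed to keys."""
    return df.drop(columns=drop_cols).rename(columns=key_map)


# Sample rows shaped like the store table shown above
source = pd.DataFrame({
    "store_id": [1, 2],
    "manager_staff_id": [1, 2],
    "address_id": [1, 2],
    "last_update": ["2006-02-15 04:57:12", "2006-02-15 04:57:12"],
})

dim_stores = to_dimension(source, ["last_update"], {"store_id": "store_key"})
print(list(dim_stores.columns))  # ['store_key', 'manager_staff_id', 'address_id']
```

Returning a copy keeps the extracted DataFrame intact, so the transformation cell can be rerun without first repeating the extract.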


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

Here, I list each dimension table with its DataFrame and primary key column, then load them into the data warehouse.

In [9]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_stores', df_stores, 'store_key')]

In [10]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Create & Populate the Fact Table Using Pandas DataFrames

In this section, I create a fact table with payment and rental information.

##### Get all the data from each of the tables involved

Here, I retrieve the data from the tables of interest and make a few transformations.

In [11]:
sql_payments = "SELECT * FROM sakila.payment;"
df_payments = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payments)
drop_cols = ['last_update']
df_payments.drop(drop_cols, axis=1, inplace=True)
df_payments.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date
0,1,1,1,76,2.99,2005-05-25 11:30:37
1,2,1,1,573,0.99,2005-05-28 10:35:23


In [12]:
sql_rentals = "SELECT * FROM sakila.rental;"
df_rentals = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rentals)
drop_cols = ['last_update']
df_rentals.drop(drop_cols, axis=1, inplace=True)
df_rentals.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1


##### Join the Payments and Rentals DataFrames

In [13]:
df_fact_info = pd.merge(df_payments, df_rentals, on=['customer_id','rental_id','staff_id'], how='left')
df_fact_info.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,rental_date,inventory_id,return_date
0,1,1,1,76,2.99,2005-05-25 11:30:37,NaT,,NaT
1,2,1,1,573,0.99,2005-05-28 10:35:23,2005-05-28 10:35:23,4020.0,2005-06-03 06:32:23
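
The first row of the joined output has no rental information (`NaT` and an empty `inventory_id`). With a left join on three columns, a payment finds its rental only when all three values agree. One possible cause, stated here as an assumption rather than a verified fact about the source data, is that the `staff_id` recorded on the payment differs from the one recorded on the rental. The sketch below uses made-up rows and the `indicator` parameter of `pd.merge` to list the payments that found no match, then compares a join that leaves `staff_id` out of the key.

```python
import pandas as pd

# Made-up rows for illustration only
payments = pd.DataFrame({
    "payment_id": [1, 2],
    "customer_id": [1, 1],
    "staff_id": [1, 1],
    "rental_id": [76, 573],
})
rentals = pd.DataFrame({
    "rental_id": [76, 573],
    "customer_id": [1, 1],
    "staff_id": [2, 1],  # assumption: rental 76 was handled by another staff member
    "inventory_id": [3021, 4020],
})

# Join on all three columns, as in the cell above, and flag where each row came from
strict = payments.merge(rentals, on=["customer_id", "rental_id", "staff_id"],
                        how="left", indicator=True)
unmatched = strict.loc[strict["_merge"] == "left_only", "payment_id"].tolist()
print(unmatched)  # [1]

# Join without staff_id in the key; both staff columns are kept with suffixes
loose = payments.merge(rentals, on=["customer_id", "rental_id"], how="left",
                       suffixes=("_payment", "_rental"), indicator=True)
print((loose["_merge"] == "both").all())  # True
```

Whether `staff_id` belongs in the join key depends on what the fact table is meant to record, so this is a diagnostic aid rather than a correction.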


##### Get the Data from the Date Dimension Table.

In [14]:
sql_dim_date = "SELECT date_key, full_date FROM sakila.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = pd.to_datetime(df_dim_date.full_date).dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


##### Lookup the DateKeys from the Date Dimension Table.

Here, I replace the payment and rental date columns in the fact data with the matching date keys from the dim_date dimension.

In [15]:
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_info.payment_date = pd.to_datetime(df_fact_info.payment_date).dt.date
df_fact_info = pd.merge(df_fact_info, df_dim_payment_date, on='payment_date', how='left')
df_fact_info.drop(['payment_date'], axis=1, inplace=True)
df_fact_info.head(2)


Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,rental_date,inventory_id,return_date,payment_date_key
0,1,1,1,76,2.99,NaT,,NaT,20050525
1,2,1,1,573,0.99,2005-05-28 10:35:23,4020.0,2005-06-03 06:32:23,20050528


In [16]:
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_info.rental_date = pd.to_datetime(df_fact_info.rental_date).dt.date
df_fact_info = pd.merge(df_fact_info, df_dim_rental_date, on='rental_date', how='left')
df_fact_info.drop(['rental_date'], axis=1, inplace=True)
df_fact_info.head(2)


Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,inventory_id,return_date,payment_date_key,rental_date_key
0,1,1,1,76,2.99,,NaT,20050525,
1,2,1,1,573,0.99,4020.0,2005-06-03 06:32:23,20050528,20050528.0
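
In the output above, `rental_date_key` appears as a float (`20050528.0`) because rows without a rental date get a missing value, and a plain integer column cannot hold one. As a small sketch on made-up rows, the same lookup can be followed by a cast to the pandas nullable integer type `Int64`, which keeps the keys as integers while still allowing missing values. The variable names here are illustrative.

```python
import pandas as pd

dim_date = pd.DataFrame({
    "date_key": [20050525, 20050528],
    "full_date": pd.to_datetime(["2005-05-25", "2005-05-28"]),
})
fact = pd.DataFrame({
    "payment_id": [1, 2],
    "rental_date": pd.to_datetime([None, "2005-05-28 10:35:23"]),
})

# Strip the time of day so the timestamps line up with the date dimension
fact["rental_date"] = fact["rental_date"].dt.normalize()

lookup = dim_date.rename(columns={"date_key": "rental_date_key",
                                  "full_date": "rental_date"})
fact = fact.merge(lookup, on="rental_date", how="left").drop(columns=["rental_date"])

# The nullable integer dtype keeps keys as integers and missing values as <NA>
fact["rental_date_key"] = fact["rental_date_key"].astype("Int64")
print(fact)
```

The first row keeps a missing key and the second row gets the integer key `20050528`.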


##### Perform any Additional Transformations

In [17]:
df_fact_info.rename(columns={"payment_id":"payment_key", "customer_id":"customer_key"
                        ,"rental_id":"rental_key","inventory_id":"inventory_key"
                        ,"staff_id":"staff_key"}, inplace=True)

ordered_columns = ['customer_key','payment_key','rental_key','inventory_key'
                   ,'amount','return_date','payment_date_key','rental_date_key','staff_key']
df_fact_info = df_fact_info[ordered_columns]

df_fact_info.insert(0, "fact_info_key", range(1, df_fact_info.shape[0]+1))
df_fact_info.head(2)

Unnamed: 0,fact_info_key,customer_key,payment_key,rental_key,inventory_key,amount,return_date,payment_date_key,rental_date_key,staff_key
0,1,1,1,76,,2.99,NaT,20050525,,1
1,2,1,2,573,4020.0,0.99,2005-06-03 06:32:23,20050528,20050528.0,1


##### Write the DataFrame Back to the Database

In [18]:
table_name = "fact_info"
primary_key = "fact_info_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_info, table_name, primary_key, db_operation)

##### Validate that the DataFrame Was Written to the Database

In [22]:
sql_fact_info = "SELECT * FROM sakila_dw_final.fact_info;"
df_fact_info = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_fact_info)
df_fact_info.head(2)

Unnamed: 0,fact_info_key,customer_key,payment_key,rental_key,inventory_key,amount,return_date,payment_date_key,rental_date_key,staff_key
0,1,1,1,76,,2.99,NaT,20050525,,1
1,2,1,2,573,4020.0,0.99,2005-06-03 06:32:23,20050528,20050528.0,1


## 2.0. MongoDB

Here, I load the film and film category tables from the sakila database, exported as JSON files, into MongoDB, then extract and transform them for the data warehouse.

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [20]:
mysql_uid = "root"
mysql_pwd = "Passw0rd123"
mysql_hostname = "localhost"

atlas_cluster_name = "film.uy9lfwn"
atlas_user_name = "scj5sa"
atlas_password = "TNE1A7JoLcsQTBdK"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "sakila_films"
dst_dbname = "sakila_dw_final"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://scj5sa:TNE1A7JoLcsQTBdK@film.uy9lfwn.mongodb.net
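
The cell above stores the Atlas credentials in the notebook and prints them as part of the connection string. One common alternative, sketched here under the assumption that the credentials are available as environment variables, is to read them at run time and to mask the password before printing. The variable names `ATLAS_USER` and `ATLAS_PASSWORD`, the helper names, and the cluster name are all hypothetical.

```python
import os
from urllib.parse import quote_plus


def atlas_uri(cluster_name, env=None):
    """Build a mongodb+srv URI from credentials held in environment variables."""
    env = os.environ if env is None else env
    user = quote_plus(env["ATLAS_USER"])          # hypothetical variable name
    password = quote_plus(env["ATLAS_PASSWORD"])  # hypothetical variable name
    return f"mongodb+srv://{user}:{password}@{cluster_name}.mongodb.net"


def masked(uri):
    """Hide the password part of a URI before printing it."""
    scheme, rest = uri.split("://", 1)
    credentials, host = rest.rsplit("@", 1)
    user = credentials.split(":", 1)[0]
    return f"{scheme}://{user}:****@{host}"


# A plain dict stands in for os.environ so the example runs anywhere
demo_env = {"ATLAS_USER": "demo_user", "ATLAS_PASSWORD": "demo@pass"}
uri = atlas_uri("cluster0.example", env=demo_env)
print(masked(uri))  # mongodb+srv://demo_user:****@cluster0.example.mongodb.net
```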


#### Define Functions for Getting Data From and Setting Data Into Databases

In [21]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Query the MySQL database and return the result as a Pandas DataFrame.'''
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    # Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn)
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Query a MongoDB collection and return the documents as a Pandas DataFrame.'''
    # Create a connection to MongoDB
    client = pymongo.MongoClient(connect_str)
    
    # Query MongoDB, and fill a python list with documents to create a DataFrame
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    # errors='ignore' avoids a KeyError when the query returns no documents
    dframe.drop(['_id'], axis=1, inplace=True, errors='ignore')
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Write a Pandas DataFrame to a MySQL table, either replacing or appending to it.'''
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    # Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()
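
`get_mongo_dataframe` turns the documents returned by a query into a DataFrame and drops MongoDB's `_id` field. The sketch below isolates that conversion step so it can be tried without a MongoDB server. The helper name `documents_to_dataframe` and the sample documents are assumptions made for illustration.

```python
import pandas as pd


def documents_to_dataframe(documents):
    """Build a DataFrame from MongoDB-style documents, dropping '_id' if present."""
    dframe = pd.DataFrame(list(documents))
    # errors="ignore" keeps this safe when there are no documents, or no '_id' column
    return dframe.drop(columns=["_id"], errors="ignore")


docs = [
    {"_id": "a1", "film_id": 1, "category_id": 6},
    {"_id": "a2", "film_id": 2, "category_id": 11},
]
df_docs = documents_to_dataframe(docs)
print(list(df_docs.columns))             # ['film_id', 'category_id']
print(documents_to_dataframe([]).empty)  # True
```

Any iterable of dictionaries can be passed in, including the cursor returned by `find`.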

#### Populate MongoDB with Source Data

In [23]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"film_info" : 'sakila_film.json',
              "film_category" : 'sakila_category.json'}

for collection_name, file_name in json_files.items():
    db.drop_collection(collection_name)
    json_file = os.path.join(data_dir, file_name)
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        result = db[collection_name].insert_many(json_object)
        
client.close()
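
The loop above passes the parsed JSON straight to `insert_many`, which expects a non-empty list of documents. As a hedged sketch, the file contents could be checked before any collection is dropped or written. The helper name `load_documents` and the sample file are hypothetical, and the example writes its own temporary file so that it runs without the `data` directory.

```python
import json
import os
import tempfile


def load_documents(json_path):
    """Load a JSON file and confirm it holds a non-empty list of objects."""
    with open(json_path, "r", encoding="utf-8") as handle:
        documents = json.load(handle)
    if not isinstance(documents, list) or not documents:
        raise ValueError(f"{json_path} must contain a non-empty JSON array")
    if not all(isinstance(doc, dict) for doc in documents):
        raise ValueError(f"{json_path} must contain only JSON objects")
    return documents


with tempfile.TemporaryDirectory() as tmp_dir:
    sample_path = os.path.join(tmp_dir, "sample_category.json")
    with open(sample_path, "w", encoding="utf-8") as handle:
        json.dump([{"film_id": 1, "category_id": 6}], handle)
    documents = load_documents(sample_path)

print(len(documents))  # 1
```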

### Create and Populate the New Dimension Tables

#### Extract Data from the Source MongoDB Collections Into DataFrames

For this section, I will follow a similar process as above but focus on the possible films customers can rent and which categories they are in.

In [24]:
query = {} 
collection = "film_info"

df_film_info = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)  
df_film_info.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [25]:
query = {}
collection = "film_category"

df_film_category = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_film_category.head(2)

Unnamed: 0,film_id,category_id,last_update
0,1,6,2006-02-15 05:07:09
1,2,11,2006-02-15 05:07:09


#### Perform Any Necessary Transformations to the DataFrames

In [26]:
df_film_info.rename(columns={"film_id":"film_key"}, inplace=True)
df_film_info.head(2)

Unnamed: 0,film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [27]:
df_film_category.rename(columns={"film_id":"film_key","category_id":"category_key"}, inplace=True)
df_film_category.head(2)

Unnamed: 0,film_key,category_key,last_update
0,1,6,2006-02-15 05:07:09
1,2,11,2006-02-15 05:07:09


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [28]:
dataframe = df_film_info
table_name = 'dim_film_info'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [29]:
dataframe = df_film_category
table_name = 'dim_film_category'
primary_key = 'film_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validate that the New Dimension Tables were Created.

In [30]:
sql_film_info = "SELECT * FROM sakila_dw_final.dim_film_info;"
df_dim_film_info = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_film_info)
df_dim_film_info.head(2)

Unnamed: 0,film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


In [31]:
sql_film_category = "SELECT * FROM sakila_dw_final.dim_film_category;"
df_dim_film_category = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_film_category)
df_dim_film_category.head(2)

Unnamed: 0,film_key,category_key,last_update
0,1,6,2006-02-15 05:07:09
1,2,11,2006-02-15 05:07:09


#### Get the Data from the Date Dimension Table.

In [32]:
sql_dim_date = "SELECT date_key, full_date FROM sakila.dim_date;"
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_date)
df_dim_date.full_date = pd.to_datetime(df_dim_date.full_date).dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


#### Lookup the DateKeys from the Date Dimension Table.

In [33]:
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_film_info.last_update = pd.to_datetime(df_film_info.last_update).dt.date
df_film_info = pd.merge(df_film_info, df_dim_last_update, on='last_update', how='left')
df_film_info.drop(['last_update'], axis=1, inplace=True)
df_film_info.head(2)

Unnamed: 0,film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update_key
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",20060215
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",20060215


In [34]:
df_dim_last_update = df_dim_date.rename(columns={"date_key" : "last_update_key", "full_date" : "last_update"})
df_film_category.last_update = pd.to_datetime(df_film_category.last_update).dt.date
df_film_category = pd.merge(df_film_category, df_dim_last_update, on='last_update', how='left')
df_film_category.drop(['last_update'], axis=1, inplace=True)
df_film_category.head(2)

Unnamed: 0,film_key,category_key,last_update_key
0,1,6,20060215
1,2,11,20060215


#### Perform Any Necessary Transformations to the DataFrames

In [35]:
column_name_map = {"film_id" : "film_key"}
df_film_info.rename(columns=column_name_map, inplace=True)

drop_cols = ['language_id','original_language_id']
df_film_info.drop(drop_cols, axis=1, inplace=True)

reordered_columns = ['film_key','title','rating','description','release_year','rental_duration'
                     ,'rental_rate', 'length','replacement_cost', 'special_features','last_update_key']

df_film_info = df_film_info[reordered_columns]

df_film_info.insert(0, "fact_film_info_key", range(1, df_film_info.shape[0]+1))
df_film_info.head(2)

Unnamed: 0,fact_film_info_key,film_key,title,rating,description,release_year,rental_duration,rental_rate,length,replacement_cost,special_features,last_update_key
0,1,1,ACADEMY DINOSAUR,PG,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,"Deleted Scenes,Behind the Scenes",20060215
1,2,2,ACE GOLDFINGER,G,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,"Trailers,Deleted Scenes",20060215


In [36]:
column_name_map = {"category_id" : "category_key", "film_id" : "film_key"}
df_film_category.rename(columns=column_name_map, inplace=True)

reordered_columns = ['category_key', 'film_key']

df_film_category = df_film_category[reordered_columns]

df_film_category.insert(0, "fact_film_category_key", range(1, df_film_category.shape[0]+1))
df_film_category.head(2)

Unnamed: 0,fact_film_category_key,category_key,film_key
0,1,6,1
1,2,11,2


#### Load Newly Transformed MongoDB Data into the sakila_dw_final Data Warehouse

Here, I load the film fact tables into the same data warehouse that holds the customer, store, payment, and rental tables, so all the data is in one place.

In [37]:
dataframe = df_film_info
table_name = 'fact_film_info'
primary_key = 'fact_film_info_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [38]:
dataframe = df_film_category
table_name = 'fact_film_category'
primary_key = 'fact_film_category_key'
db_operation = "insert"

set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### Validate that the New Fact Tables were Created

In [39]:
sql_film_info = "SELECT * FROM sakila_dw_final.fact_film_info;"
df_fact_film_info = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_film_info)
df_fact_film_info.head(2)

Unnamed: 0,fact_film_info_key,film_key,title,rating,description,release_year,rental_duration,rental_rate,length,replacement_cost,special_features,last_update_key
0,1,1,ACADEMY DINOSAUR,PG,A Epic Drama of a Feminist And a Mad Scientist...,2006,6,0.99,86,20.99,"Deleted Scenes,Behind the Scenes",20060215
1,2,2,ACE GOLDFINGER,G,A Astounding Epistle of a Database Administrat...,2006,3,4.99,48,12.99,"Trailers,Deleted Scenes",20060215


In [40]:
sql_film_category = "SELECT * FROM sakila_dw_final.fact_film_category;"
df_fact_film_category = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_film_category)
df_fact_film_category.head(2)

Unnamed: 0,fact_film_category_key,category_key,film_key
0,1,6,1
1,2,11,2


## 3.0. Local File System

Here, I load the film text information from a local CSV file to complement the data collected using MongoDB.

#### Extract Data from the Source CSV File

In [41]:
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'sakila_film_text.csv')

df_film = pd.read_csv(data_file, delimiter=';', header=0)
df_film.head(5)

Unnamed: 0,film_id,title,description
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...
2,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...
3,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...
4,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...
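
The CSV file in this section is separated by semicolons, so `read_csv` needs the delimiter to be stated explicitly. The short sketch below reads an in-memory sample with the same layout. The sample text is made up, and `sep` is the parameter for which `delimiter` is an alias.

```python
import io

import pandas as pd

# In-memory sample with the same column layout as the film text file
sample = io.StringIO(
    "film_id;title;description\n"
    "1;ACADEMY DINOSAUR;A Epic Drama\n"
    "2;ACE GOLDFINGER;A Astounding Epistle\n"
)

df_sample = pd.read_csv(sample, sep=";", header=0)
print(df_sample.shape)          # (2, 3)
print(list(df_sample.columns))  # ['film_id', 'title', 'description']
```

Without the `sep` argument, pandas would split on commas and read each line as a single column.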


#### Perform Any Necessary Transformations

Here, I rename the id column to a key to match data warehouse standards.

In [42]:
df_film.rename(columns={"film_id": "film_key"}, inplace=True)
df_film.head(5)

Unnamed: 0,film_key,title,description
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...
2,3,ADAPTATION HOLES,A Astounding Reflection of a Lumberjack And a ...
3,4,AFFAIR PREJUDICE,A Fanciful Documentary of a Frisbee And a Lumb...
4,5,AFRICAN EGG,A Fast-Paced Documentary of a Pastry Chef And ...


##### Add transformed data to the new data warehouse

In [43]:
table_name = "dim_film"
primary_key = "film_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_film, table_name, primary_key, db_operation)

## Query Statements

How much did each customer spend at different stores? 

In [44]:
sql_test = f"""
SELECT customers.`last_name` AS `Customer Name`,
    store.`store_key` AS `Store`,
    SUM(payment.amount) AS `Total Amount`
    FROM `sakila_dw_final`.`dim_customers` AS customers
    INNER JOIN `sakila_dw_final`.`fact_info` AS payment
    ON payment.customer_key = customers.customer_key
    LEFT OUTER JOIN `sakila_dw_final`.`dim_stores` AS store
    ON customers.store_id = store.store_key
    GROUP BY customers.`last_name`, store.`store_key`
    ORDER BY `Total Amount` DESC;
"""

df_test = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_test)
df_test.head()

Unnamed: 0,Customer Name,Store,Total Amount
0,SEAL,2,221.55
1,HUNT,1,216.54
2,SHAW,1,195.58
3,KENNEDY,2,194.61
4,SNYDER,2,194.61
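
The query above joins the customer dimension to the fact table and the store dimension, then sums payments per customer and store. The store in the result is the one recorded on the customer row (`customers.store_id`). To show the shape of the query on data small enough to check by hand, the sketch below runs an equivalent statement against an in-memory SQLite database. SQLite is only a stand-in for MySQL here, and the rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE dim_customers (customer_key INTEGER PRIMARY KEY, store_id INTEGER, last_name TEXT);
CREATE TABLE dim_stores (store_key INTEGER PRIMARY KEY);
CREATE TABLE fact_info (fact_info_key INTEGER PRIMARY KEY, customer_key INTEGER, amount REAL);
INSERT INTO dim_customers VALUES (1, 1, 'SMITH'), (2, 2, 'JOHNSON');
INSERT INTO dim_stores VALUES (1), (2);
INSERT INTO fact_info VALUES (1, 1, 2.99), (2, 1, 0.99), (3, 2, 5.99);
""")

rows = conn.execute("""
SELECT customers.last_name AS customer_name,
       store.store_key AS store,
       ROUND(SUM(payment.amount), 2) AS total_amount
FROM dim_customers AS customers
INNER JOIN fact_info AS payment
    ON payment.customer_key = customers.customer_key
LEFT OUTER JOIN dim_stores AS store
    ON customers.store_id = store.store_key
GROUP BY customers.last_name, store.store_key
ORDER BY total_amount DESC;
""").fetchall()

for row in rows:
    print(row)
```

With these rows, JOHNSON comes first with a total of 5.99 at store 2, followed by SMITH with 3.98 at store 1.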


What is the average film length in each category of films?

In [46]:
sql_test2 = f"""
SELECT categories.`category_key` AS `Category`,
    AVG(films.`length`) AS `Average Length`
    FROM `sakila_dw_final`.`fact_film_info` AS films
    INNER JOIN `sakila_dw_final`.`fact_film_category` AS categories
    ON films.film_key = categories.film_key
    GROUP BY categories.`category_key`
    ORDER BY `Category`, `Average Length` DESC;
"""

df_test2 = get_dataframe(user_id, pwd, host_name, dst_dbname, sql_test2)
df_test2.head()

Unnamed: 0,Category,Average Length
0,1,111.6094
1,2,111.0152
2,3,109.8
3,4,111.6667
4,5,115.8276
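
The same average can be computed with pandas alone, which gives a way to cross-check the SQL result before or after loading. The sketch below uses three made-up films. The column names follow the fact tables above, and the variable names are illustrative.

```python
import pandas as pd

films = pd.DataFrame({"film_key": [1, 2, 3], "length": [86, 48, 50]})
categories = pd.DataFrame({"film_key": [1, 2, 3], "category_key": [6, 11, 6]})

avg_length = (
    films.merge(categories, on="film_key", how="inner")   # INNER JOIN on film_key
         .groupby("category_key", as_index=False)["length"]
         .mean()                                           # AVG(length) per category
         .rename(columns={"category_key": "Category", "length": "Average Length"})
         .sort_values("Category")
)
print(avg_length)
```

Category 6 averages 68.0 minutes (films 1 and 3), and category 11 averages 48.0 minutes.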
