#### Import Necessary Libraries

In [3]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [5]:
import pymysql

#### Check to make sure imported

In [8]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.30
Running PyMongo Version: 4.10.1


#### Connect to MySQL and MongoDB

In [11]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "warriors21!"

src_dbname = "sakila"
dst_dbname = "sakila_dw"

mysql_args = {
    "uid" : "root",
    "pwd" : "warriors21!",
    "hostname" : "localhost",
    "dbname" : "sakila_dw"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "kpunsalan",
    "password" : "GoHoos123",
    "cluster_name" : "Cluster0KP",
    "cluster_subnet" : "vcr5b",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila_category"
}

#### Define Functions for Getting Data and Setting Into Databases from SQL and MongoDB

In [14]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe

def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe
    

def set_sql_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Create the new sakila_dw database as target of operations

In [17]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

### Extract data for dim tables from MySQL (Films)

In [20]:
# Film table
sql_film = "SELECT * FROM sakila.film;"
df_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_film)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 05:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 05:03:42


#### Perform any transformations (drop columns, insert any keys needed for dim tables)

In [22]:
# Film

# drop any irrelevant columns
drop_cols = ['length', 'release_year', 'description','language_id','original_language_id','rental_duration','replacement_cost','special_features', 'last_update']
df_film.drop(drop_cols, axis=1, inplace=True)

# primary key already called film_id

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_film.insert(0, "film_key", range(1, df_film.shape[0]+1))

df_film.head(2)

Unnamed: 0,film_key,film_id,title,rental_rate,rating
0,1,1,ACADEMY DINOSAUR,0.99,PG
1,2,2,ACE GOLDFINGER,4.99,G


### Use MongoDB and json files to get data for dim tables

#### Populate MongoDB with film category json data

In [24]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

# Customer, Category, and Film Category tables
json_files = {"category" : 'sakila_category.json',
              "film_category" : 'sakila_film_category.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

#### Extract data from the MongoDB collections into tables

In [26]:
# Category table
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "category"

df_category = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_category.head(2)

Unnamed: 0,category_id,name,last_update
0,1,Action,2006-02-15 04:46:27
1,2,Animation,2006-02-15 04:46:27


In [28]:
# Film Category table
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "film_category"

df_film_category = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_film_category.head(2)

Unnamed: 0,film_id,category_id,last_update
0,1,6,2006-02-15 05:07:09
1,2,11,2006-02-15 05:07:09


#### Any transformations needed to the dataframes from MongoDB

In [30]:
# Category
# drop any irrelevant columns
drop_cols = ['last_update']
df_category.drop(drop_cols, axis=1, inplace=True)

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_category.insert(0, "category_key", range(1, df_category.shape[0]+1))

df_category.head(2)

Unnamed: 0,category_key,category_id,name
0,1,1,Action
1,2,2,Animation


In [32]:
# Film Category
# drop any irrelevant columns
drop_cols = ['last_update']
df_film_category.drop(drop_cols, axis=1, inplace=True)

# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_film_category.insert(0, "film_category_key", range(1, df_film_category.shape[0]+1))

df_film_category.head(2)

Unnamed: 0,film_category_key,film_id,category_id
0,1,1,6
1,2,2,11


### Merge category information into film table
To organize my data I am first going to get the film information merged, in the film table I want to have the film information and the category information.

In [34]:
# Match categories to film_categories on category id
# I want to match up film id with its actual category name
df_film_category = pd.merge(df_film_category, df_category, on='category_id', how='left')
df_film_category.rename(columns={"name":"category_name"}, inplace=True)

df_film_category.head(2)

Unnamed: 0,film_category_key,film_id,category_id,category_key,category_name
0,1,1,6,6,Documentary
1,2,2,11,11,Horror


In [36]:
# Category name is now matched to each film_id
# Use that to merge category names onto film table
df_film = pd.merge(df_film, df_film_category, on='film_id', how='left')
df_film.rename(columns={"title":"film_title"}, inplace=True)

df_film.head(2)

Unnamed: 0,film_key,film_id,film_title,rental_rate,rating,film_category_key,category_id,category_key,category_name
0,1,1,ACADEMY DINOSAUR,0.99,PG,1,6,6,Documentary
1,2,2,ACE GOLDFINGER,4.99,G,2,11,11,Horror


### Extract table data from csv file

In [38]:
# Customer table
data_dir = os.path.join(os.getcwd(), 'data')
data_file = os.path.join(data_dir, 'sakila_customer.csv')

df_customer = pd.read_csv(data_file, header=0, index_col=0)
df_customer.head(2)

Unnamed: 0_level_0,store_id,first_name,last_name,email,address_id,active,create_date,last_update
customer_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
1,1,MARY,SMITH,MARY.SMITH@sakilacustomer.org,5,1,2006-02-14 22:04:36,2006-02-15 04:57:20
2,1,PATRICIA,JOHNSON,PATRICIA.JOHNSON@sakilacustomer.org,6,1,2006-02-14 22:04:36,2006-02-15 04:57:20


In [40]:
# make any needed transformations
# drop any irrelevant columns
drop_cols = ['store_id', 'email', 'address_id', 'active', 'create_date', 'last_update']
df_customer.drop(drop_cols, axis=1, inplace=True)

# Insert new columns, with an ever-incrementing numeric value, to serve as the business and primary keys.
df_customer.insert(0, "customer_id", range(1, df_customer.shape[0]+1))
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0]+1))

df_customer.head(2)

Unnamed: 0_level_0,customer_key,customer_id,first_name,last_name
customer_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,1,1,MARY,SMITH
2,2,2,PATRICIA,JOHNSON


### Load the dim tables into new sakila_dw data warehouse

In [42]:
# Load new dimension tables: Film, Category, Film Category, and Customer
db_operation = "insert"

tables = [('dim_film', df_film, 'film_key'),
          ('dim_category', df_category, 'category_key'),
          ('dim_film_category', df_film_category, 'film_category_key'),
          ('dim_customer', df_customer, 'customer_id')]

In [44]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### Create the date dimension table

To create the dim_date table I executed the code from lab 2c in MySQL.
I changed the date range to be aligned with the sakila database which is from 2005-05-24 to 2006-02-14.

In [46]:
# demonstrate that dim_date table exists
sql_dim_date_query = """
    SELECT MIN(full_date) AS BeginDate, 
        MAX(full_date) AS EndDate
    FROM dim_date;
"""

In [48]:
df_dim_date_query = get_sql_dataframe(sql_dim_date_query, **mysql_args)
df_dim_date_query

Unnamed: 0,BeginDate,EndDate
0,2005-05-24,2006-02-14


### Create fact table with all of the transaction data from rental, payment, inventory data

#### Read in data from rental table

In [50]:
sql_rental = "SELECT * FROM sakila.rental;"
df_rental = get_dataframe(user_id, pwd, host_name, src_dbname, sql_rental)
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53


#### Read in data from payment table

In [53]:
sql_payment = "SELECT * FROM sakila.payment;"
df_payment = get_dataframe(user_id, pwd, host_name, src_dbname, sql_payment)
df_payment.head(2)

Unnamed: 0,payment_id,customer_id,staff_id,rental_id,amount,payment_date,last_update
0,1,1,1,76,2.99,2005-05-25 11:30:37,2006-02-15 22:12:30
1,2,1,1,573,0.99,2005-05-28 10:35:23,2006-02-15 22:12:30


#### Read in data from inventory table

In [56]:
sql_inventory = "SELECT * FROM sakila.inventory;"
df_inventory = get_dataframe(user_id, pwd, host_name, src_dbname, sql_inventory)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 05:09:17
1,2,1,1,2006-02-15 05:09:17


#### Perform any needed transformations before merging

In [58]:
drop_cols = ['staff_id', 'last_update', 'customer_id']
df_payment.drop(drop_cols, axis=1, inplace=True)
df_payment.head(2)

Unnamed: 0,payment_id,rental_id,amount,payment_date
0,1,76,2.99,2005-05-25 11:30:37
1,2,573,0.99,2005-05-28 10:35:23


In [60]:
drop_cols = ['store_id', 'last_update']
df_inventory.drop(drop_cols, axis=1, inplace=True)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id
0,1,1
1,2,1


#### Combine inventory onto rental

In [62]:
# need to match rentals with their film id
df_rental = pd.merge(df_rental, df_inventory, on='inventory_id', how='left') 
df_rental.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update,film_id
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,80
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,333


#### Combine rental and payment table data to get all transaction data in fact table

In [65]:
df_fact_rentals = pd.merge(df_rental, df_payment, on='rental_id', how='left') 
df_fact_rentals.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,customer_id,return_date,staff_id,last_update,film_id,payment_id,amount,payment_date
0,1,2005-05-24 22:53:30,367,130,2005-05-26 22:04:30,1,2006-02-15 21:30:53,80,3504,2.99,2005-05-24 22:53:30
1,2,2005-05-24 22:54:33,1525,459,2005-05-28 19:40:33,1,2006-02-15 21:30:53,333,12377,2.99,2005-05-24 22:54:33


### Fetch surrogate primary key and business key from each of the dimension tables

In [67]:
# Select 'customer_key' and 'customer_id' from sakila_dw.dim_customer
sql_dim_customer = "SELECT customer_key, customer_id FROM sakila_dw.dim_customer;"
df_dim_customer = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_customer)
df_dim_customer.head(2)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2


In [69]:
# Select 'film_key' and 'film_id' from sakila_dw.dim_film
sql_dim_film = "SELECT film_key, film_id FROM sakila_dw.dim_film;"
df_dim_film = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_film)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id
0,1,1
1,2,2


### Perform lookup operations to replace business codes in fact df with surrogate primary keys from each dim table

In [71]:
# Customers dimension
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_customer, on='customer_id', how='inner')
df_fact_rentals.drop(['customer_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,return_date,staff_id,last_update,film_id,payment_id,amount,payment_date,customer_key
0,1,2005-05-24 22:53:30,367,2005-05-26 22:04:30,1,2006-02-15 21:30:53,80,3504,2.99,2005-05-24 22:53:30,130
1,2,2005-05-24 22:54:33,1525,2005-05-28 19:40:33,1,2006-02-15 21:30:53,333,12377,2.99,2005-05-24 22:54:33,459


In [73]:
# Film dimension
df_fact_rentals = pd.merge(df_fact_rentals, df_dim_film, on='film_id', how='inner')
df_fact_rentals.drop(['film_id'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,rental_date,inventory_id,return_date,staff_id,last_update,payment_id,amount,payment_date,customer_key,film_key
0,1,2005-05-24 22:53:30,367,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3504,2.99,2005-05-24 22:53:30,130,80
1,2,2005-05-24 22:54:33,1525,2005-05-28 19:40:33,1,2006-02-15 21:30:53,12377,2.99,2005-05-24 22:54:33,459,333


### Replacing datetime values with date keys from dim_date table

#### Fetch surrogate and business keys from dim date table

In [77]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20050524,2005-05-24
1,20050525,2005-05-25


#### Lookup surrogate key that corresponds to each date column

In [79]:
# Rental date
df_dim_rental_date = df_dim_date.rename(columns={"date_key" : "rental_date_key", "full_date" : "rental_date"})
df_fact_rentals.rental_date = df_fact_rentals.rental_date.astype('datetime64[ns]').dt.date

df_fact_rentals = pd.merge(df_fact_rentals, df_dim_rental_date, on='rental_date', how='left')
df_fact_rentals.drop(['rental_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,return_date,staff_id,last_update,payment_id,amount,payment_date,customer_key,film_key,rental_date_key
0,1,367,2005-05-26 22:04:30,1,2006-02-15 21:30:53,3504,2.99,2005-05-24 22:53:30,130,80,20050524
1,2,1525,2005-05-28 19:40:33,1,2006-02-15 21:30:53,12377,2.99,2005-05-24 22:54:33,459,333,20050524


In [81]:
# return date
df_dim_return_date = df_dim_date.rename(columns={"date_key" : "return_date_key", "full_date" : "return_date"})
df_fact_rentals.return_date = df_fact_rentals.return_date.astype('datetime64[ns]').dt.date

df_fact_rentals = pd.merge(df_fact_rentals, df_dim_return_date, on='return_date', how='left')
df_fact_rentals.drop(['return_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,staff_id,last_update,payment_id,amount,payment_date,customer_key,film_key,rental_date_key,return_date_key
0,1,367,1,2006-02-15 21:30:53,3504,2.99,2005-05-24 22:53:30,130,80,20050524,20050526.0
1,2,1525,1,2006-02-15 21:30:53,12377,2.99,2005-05-24 22:54:33,459,333,20050524,20050528.0


In [83]:
# payment date
df_dim_payment_date = df_dim_date.rename(columns={"date_key" : "payment_date_key", "full_date" : "payment_date"})
df_fact_rentals.payment_date = df_fact_rentals.payment_date.astype('datetime64[ns]').dt.date

df_fact_rentals = pd.merge(df_fact_rentals, df_dim_payment_date, on='payment_date', how='left')
df_fact_rentals.drop(['payment_date'], axis=1, inplace=True)
df_fact_rentals.head(2)

Unnamed: 0,rental_id,inventory_id,staff_id,last_update,payment_id,amount,customer_key,film_key,rental_date_key,return_date_key,payment_date_key
0,1,367,1,2006-02-15 21:30:53,3504,2.99,130,80,20050524,20050526.0,20050524
1,2,1525,1,2006-02-15 21:30:53,12377,2.99,459,333,20050524,20050528.0,20050524


### Perform any additional transformations (drop unwanted columns, reorder columns, and create new primary key)

In [86]:
# 1. Drop the columns of no particular interest
drop_columns = ['staff_id','last_update']
df_fact_rentals.drop(drop_columns, axis=1, inplace=True)
                
# 2. Reorder the remaining columns
ordered_columns = ['rental_id', 'payment_id', 'customer_key', 'film_key', 'inventory_id',
                   'rental_date_key', 'return_date_key', 'payment_date_key', 'amount']
df_fact_rentals = df_fact_rentals[ordered_columns]
                
# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_fact_rentals.insert(0, 'fact_rental_key', range(1, df_fact_rentals.shape[0]+1))
                
# 4. Display the first 2 rows of the dataframe to validate your work
df_fact_rentals.head(2)

Unnamed: 0,fact_rental_key,rental_id,payment_id,customer_key,film_key,inventory_id,rental_date_key,return_date_key,payment_date_key,amount
0,1,1,3504,130,80,367,20050524,20050526.0,20050524,2.99
1,2,2,12377,459,333,1525,20050524,20050528.0,20050524,2.99


### Write the dataframe back to the database

In [89]:
table_name = "fact_rentals"
primary_key = "fact_rental_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_rentals, table_name, primary_key, db_operation)

### Demonstrate that sakila data warehouse exists with queries

#### A SQL Query that shows the most popular movie:
* Each Movie Title
* The total amount of customers who rented that movie
* The total amount of money spent on that movie from all payments

In [91]:
sql_test = """
    SELECT films.`film_title` AS `Film Title`,
        COUNT(DISTINCT customers.`customer_id`) AS `Total Customers`,
        SUM(rentals.`amount`) AS `Total Rental Payment Revenue`
    FROM `sakila_dw`.`fact_rentals` AS rentals
    INNER JOIN `sakila_dw`.`dim_film` AS films ON rentals.film_key = films.film_id
    INNER JOIN `sakila_dw`.`dim_customer` AS customers ON rentals.customer_key = customers.customer_id
    GROUP BY films.`film_title`
    ORDER BY 
        `Total Customers` DESC,
        `Total Rental Payment Revenue` DESC;
""".format(dst_dbname)

In [93]:
df_test = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test)
df_test.head()

Unnamed: 0,Film Title,Total Customers,Total Rental Payment Revenue
0,BUCKET BROTHERHOOD,33,180.66
1,SCALAWAG DUCK,32,172.68
2,RIDGEMONT SUBMARINE,32,130.68
3,FORWARD TEMPLE,32,128.68
4,GRIT CLOCKWORK,32,110.68


#### A SQL Query that shows the most popular genre:
* Each Genre
* The total amount of customers who rented that genre
* The total amount of money spent on that genre

In [95]:
sql_test2 = """
    SELECT films.`category_name` AS `Genre`,
        COUNT(DISTINCT customers.`customer_id`) AS `Total Customers`,
        SUM(rentals.`amount`) AS `Total Rental Payment Revenue`
    FROM `sakila_dw`.`fact_rentals` AS rentals
    INNER JOIN `sakila_dw`.`dim_film` AS films ON rentals.film_key = films.film_id
    INNER JOIN `sakila_dw`.`dim_customer` AS customers ON rentals.customer_key = customers.customer_id
    GROUP BY films.`category_name`
    ORDER BY 
        `Total Customers` DESC,
        `Total Rental Payment Revenue` DESC;
""".format(dst_dbname)

In [97]:
df_test2 = get_dataframe(user_id, pwd, host_name, src_dbname, sql_test2)
df_test2.head()

Unnamed: 0,Genre,Total Customers,Total Rental Payment Revenue
0,Sports,519,5314.21
1,Action,510,4375.85
2,Sci-Fi,507,4756.98
3,Drama,501,4587.39
4,Family,501,4226.07
