### Importing the necessary libraries

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.6.2


### Declaring & assigning the connection variables for the MongoDB Server, the MySQL Server, and databases that will be used



In [3]:
mysql_args = {
    "uid" : "root",
    "pwd" : "Passw0rd123",
    "hostname" : "localhost",
    "dbname" : "sakila"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "tmf8dy",
    "password" : "RFZRxVe7nNHuqyg9",
    "cluster_name" : "cluster0",
    "cluster_subnet" : "bs8vmg2",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila_inventory"
}

### Defining the functions for getting data from and setting data into databases

In [4]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

import os
print(os.listdir())

['.conda', '.condarc', '.continuum', '.ipynb_checkpoints', '.ipython', '.jupyter', '3D Objects', 'anaconda3', 'AppData', 'Application Data', 'Contacts', 'Cookies', 'data', 'Desktop', 'Documents', 'Downloads', 'Favorites', 'Links', 'Local Settings', 'Music', 'My Documents', 'NetHood', 'NTUSER.DAT', 'ntuser.dat.LOG1', 'ntuser.dat.LOG2', 'NTUSER.DAT{b1015c0c-1ca2-11ee-bad0-00155d004702}.TM.blf', 'NTUSER.DAT{b1015c0c-1ca2-11ee-bad0-00155d004702}.TMContainer00000000000000000001.regtrans-ms', 'NTUSER.DAT{b1015c0c-1ca2-11ee-bad0-00155d004702}.TMContainer00000000000000000002.regtrans-ms', 'ntuser.ini', 'Pictures', 'PrintHood', 'Project 1.ipynb', 'Recent', 'Saved Games', 'Searches', 'SendTo', 'Start Menu', 'Templates', 'Videos']


### Populating MongoDB with source data



In [5]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook, then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

json_files = {"store" : 'sakila_store.json',
              "film" : 'sakila_film.json',
              "inventory" : 'sakila_inventory.json',
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)  

### Creating and populating the new dimension tables and extracting data from the source MongoDB collections into dataframes

In [6]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "store"

df_store = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_store.head(2)

Unnamed: 0,store_id,manager_staff_id,address_id,last_update
0,1,1,1,2006-02-14 23:57:12
1,2,2,2,2006-02-14 23:57:12


In [7]:
client = get_mongo_client(**mongodb_args)
query = {} # Select all elements (columns), and all documents (rows).
collection = "film"

df_film = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_film.head(2)

Unnamed: 0,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


### Looking up the invoice date keys from the date dimension table and getting the data from the date dimension table

In [8]:
# Getting the data from the date dimension table 

sql_dim_date = "SELECT date_key, full_date FROM sakila.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


### Performing necessary transformations to the dataframes

In [9]:
df_store.insert(0, "store_key", range(1, df_store.shape[0]+1))
df_store.head(2)

Unnamed: 0,store_key,store_id,manager_staff_id,address_id,last_update
0,1,1,1,1,2006-02-14 23:57:12
1,2,2,2,2,2006-02-14 23:57:12


In [10]:
df_film.insert(0, "film_key", range(1, df_film.shape[0]+1))
df_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


### Loading the dataframes into the new data warehouse by creating new tables

In [11]:
dataframe = df_store
table_name = 'dim_store'
primary_key = 'store_id'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [12]:
dataframe = df_film
table_name = 'dim_film'
primary_key = 'film_id'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

### Validating that the new dimension tables were created

In [13]:
sql_store = "SELECT * FROM sakila.dim_store;"
df_dim_store = get_sql_dataframe(sql_store, **mysql_args)
df_dim_store.head(2)

Unnamed: 0,store_key,store_id,manager_staff_id,address_id,last_update
0,1,1,1,1,2006-02-14 23:57:12
1,2,2,2,2,2006-02-14 23:57:12


In [14]:
sql_film = "SELECT * FROM sakila.dim_film;"
df_dim_film = get_sql_dataframe(sql_film, **mysql_args)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,2,ACE GOLDFINGER,A Astounding Epistle of a Database Administrat...,2006,1,0,3,4.99,48,12.99,G,"Trailers,Deleted Scenes",2006-02-15 00:03:42


### Creating and populating the new fact table, then extracting data from the source MongoDB collections into dataframes

In [15]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "inventory"

df_fact_inventory = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_fact_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,last_update
0,1,1,1,2006-02-15 00:09:17
1,2,1,1,2006-02-15 00:09:17


### Looking up the surrogate primary key (date_key) that corresponds to the payment_date column

In [16]:
df_fact_inventory['last_update'] = pd.to_datetime(df_fact_inventory['last_update']).dt.date

df_fact_inventory_date = df_dim_date.rename(columns={"date_key": "inventory_date_key", "full_date": "last_update"})
df_fact_inventory.last_update = df_fact_inventory.last_update.astype('datetime64[ns]').dt.date
df_fact_inventory = pd.merge(df_fact_inventory, df_fact_inventory_date, on='last_update', how='left')
df_fact_inventory.drop(['last_update'], axis=1, inplace=True)
df_fact_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id,inventory_date_key
0,1,1,1,20060215
1,2,1,1,20060215


### Looking up the primary keys from the dimension tables, by first fetching the surrogate primary key and the business key from each dimension table.

In [17]:
sql_dim_store = "SELECT store_key, store_id FROM sakila.dim_store;"
df_dim_store = get_sql_dataframe(sql_dim_store, **mysql_args)
df_dim_store.head(2)

Unnamed: 0,store_key,store_id
0,1,1
1,2,2


In [18]:
sql_dim_film = "SELECT film_key, film_id FROM sakila.dim_film;"
df_dim_film = get_sql_dataframe(sql_dim_film, **mysql_args)
df_dim_film.head(2)

Unnamed: 0,film_key,film_id
0,1,1
1,2,2


### Using business keys to lookup corresponding surrogate primary keys in the dimension tables

In [19]:
df_fact_inventory = pd.merge(df_fact_inventory, df_film, on='film_id', how='left')
df_fact_inventory.drop('film_id', axis=1, inplace=True)
df_fact_inventory

Unnamed: 0,inventory_id,store_id,inventory_date_key,film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update
0,1,1,20060215,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
1,2,1,20060215,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
2,3,1,20060215,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
3,4,1,20060215,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
4,5,2,20060215,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4576,4577,1,20060215,1000,ZORRO ARK,A Intrepid Panorama of a Mad Scientist And a B...,2006,1,0,3,4.99,50,18.99,NC-17,"Trailers,Commentaries,Behind the Scenes",2006-02-15 00:03:42
4577,4578,2,20060215,1000,ZORRO ARK,A Intrepid Panorama of a Mad Scientist And a B...,2006,1,0,3,4.99,50,18.99,NC-17,"Trailers,Commentaries,Behind the Scenes",2006-02-15 00:03:42
4578,4579,2,20060215,1000,ZORRO ARK,A Intrepid Panorama of a Mad Scientist And a B...,2006,1,0,3,4.99,50,18.99,NC-17,"Trailers,Commentaries,Behind the Scenes",2006-02-15 00:03:42
4579,4580,2,20060215,1000,ZORRO ARK,A Intrepid Panorama of a Mad Scientist And a B...,2006,1,0,3,4.99,50,18.99,NC-17,"Trailers,Commentaries,Behind the Scenes",2006-02-15 00:03:42


In [20]:
df_fact_inventory['fact_inventory_rental_key'] = range(1, len(df_fact_inventory) + 1)
df_fact_inventory

Unnamed: 0,inventory_id,store_id,inventory_date_key,film_key,title,description,release_year,language_id,original_language_id,rental_duration,rental_rate,length,replacement_cost,rating,special_features,last_update,fact_inventory_rental_key
0,1,1,20060215,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42,1
1,2,1,20060215,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42,2
2,3,1,20060215,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42,3
3,4,1,20060215,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42,4
4,5,2,20060215,1,ACADEMY DINOSAUR,A Epic Drama of a Feminist And a Mad Scientist...,2006,1,0,6,0.99,86,20.99,PG,"Deleted Scenes,Behind the Scenes",2006-02-15 00:03:42,5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
4576,4577,1,20060215,1000,ZORRO ARK,A Intrepid Panorama of a Mad Scientist And a B...,2006,1,0,3,4.99,50,18.99,NC-17,"Trailers,Commentaries,Behind the Scenes",2006-02-15 00:03:42,4577
4577,4578,2,20060215,1000,ZORRO ARK,A Intrepid Panorama of a Mad Scientist And a B...,2006,1,0,3,4.99,50,18.99,NC-17,"Trailers,Commentaries,Behind the Scenes",2006-02-15 00:03:42,4578
4578,4579,2,20060215,1000,ZORRO ARK,A Intrepid Panorama of a Mad Scientist And a B...,2006,1,0,3,4.99,50,18.99,NC-17,"Trailers,Commentaries,Behind the Scenes",2006-02-15 00:03:42,4579
4579,4580,2,20060215,1000,ZORRO ARK,A Intrepid Panorama of a Mad Scientist And a B...,2006,1,0,3,4.99,50,18.99,NC-17,"Trailers,Commentaries,Behind the Scenes",2006-02-15 00:03:42,4580


### Performing necessary transformations to the dataframe

In [21]:
ordered_columns = ['fact_inventory_rental_key', 'inventory_date_key','inventory_id','store_id', 'film_key', 'title','rental_duration', 'rental_rate',
                   'length', 'replacement_cost','last_update']

df_fact_inventory = df_fact_inventory[ordered_columns]
df_fact_inventory.head(2)

Unnamed: 0,fact_inventory_rental_key,inventory_date_key,inventory_id,store_id,film_key,title,rental_duration,rental_rate,length,replacement_cost,last_update
0,1,20060215,1,1,1,ACADEMY DINOSAUR,6,0.99,86,20.99,2006-02-15 00:03:42
1,2,20060215,2,1,1,ACADEMY DINOSAUR,6,0.99,86,20.99,2006-02-15 00:03:42


###  Loading newly transformed MongoDB data into the data warehouse

In [22]:
dataframe = df_fact_inventory
table_name = 'fact_inventory_rentals'
primary_key = 'fact_inventory_rental_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

### Validating that the new fact tables was created

In [23]:
sql_inventory_rentals = "SELECT * FROM sakila.fact_inventory_rentals;"
df_fact_inventory_rentals = get_sql_dataframe(sql_inventory_rentals, **mysql_args)
df_fact_inventory_rentals.head(2)

Unnamed: 0,fact_inventory_rental_key,inventory_date_key,inventory_id,store_id,film_key,title,rental_duration,rental_rate,length,replacement_cost,last_update
0,1,20060215,1,1,1,ACADEMY DINOSAUR,6,0.99,86,20.99,2006-02-15 00:03:42
1,2,20060215,2,1,1,ACADEMY DINOSAUR,6,0.99,86,20.99,2006-02-15 00:03:42


### Demonstrating that the new data warehouse exists and contains the correct data

In [31]:
sql_inventory_rentals = """
    SELECT 
        f.title AS 'Film Title',
        s.store_id AS 'Store ID',
        FORMAT(SUM(TIMESTAMPDIFF(DAY, r.rental_date, r.return_date) * p.amount), 2) AS 'Total Sales'
    FROM 
        rental r
    INNER JOIN inventory i 
        ON r.inventory_id = i.inventory_id
    INNER JOIN film f 
        ON i.film_id = f.film_id
    INNER JOIN store s 
        ON i.store_id = s.store_id
    INNER JOIN payment p
        ON r.rental_id = p.rental_id
    GROUP BY 
        f.title, 
        s.store_id
    ORDER BY 
        'Total Sales' DESC, 
        'Film Title' ASC;
"""

In [32]:
df_fact_inventory_rentals = get_sql_dataframe(sql_inventory_rentals, **mysql_args)
df_fact_inventory_rentals

Unnamed: 0,Film Title,Store ID,Total Sales
0,BLANKET BEVERLY,1,228.37
1,FREAKY POCUS,2,80.73
2,GRADUATE LORD,2,128.67
3,LOVE SUICIDES,1,223.06
4,IDOLS SNATCHERS,2,374.13
...,...,...,...
1516,RACER EGG,1,32.89
1517,WAR NOTTING,2,97.82
1518,CIDER DESIRE,1,75.77
1519,REMEMBER DIARY,1,67.83
