## Midterm Project Part 2

### Prerequisites:
This notebook uses the PyMongo database connectivity library to connect to MySQL databases; therefore, you must have first installed that libary into your python environment by executing the following command in a Terminal window.

- `python -m pip install pymongo[srv]`

#### Import the Necessary Libraries

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 1.4.39
Running PyMongo Version: 4.6.3


#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [3]:
mysql_args = {
    "uid" : "root",
    "pwd" : "Passw0rd123",
    "hostname" : "localhost",
    "dbname" : "sakila_dw2"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "hnj4jk",
    "password" : "Passw0rd100603",
    "cluster_name" : "projectCluser",
    "cluster_subnet" : "qwakpze",
    "cluster_location" : "atlas", # "local"
    "db_name" : "sakila_database",
}

#### Define Functions for Getting Data From and Setting Data Into Databases

In [4]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### Populate MongoDB with Source Data


In [5]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join('C:\\Users\\ds2002-student\\Documents\\DS-2002-main\\Projects\\midterm')

json_files = {"address" : 'address.json',
              "category" : 'category.json',
              "city" : 'city.json',
              "country" : 'country.json',
              "film_category" : 'film_category.json',
              "inventory" : 'inventory.json',
              "rental" : 'rental.json',
              "store" : 'store.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)         

### 1.0. Create and Populate the New Dimension Tables
#### 1.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [6]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "address"

df_address = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_address.head(2)

Unnamed: 0,address_id,city_id
0,56,1
1,105,2


In [7]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "category"

df_category = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_category.head(2)

Unnamed: 0,category_id,name
0,1,Action
1,2,Animation


In [8]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "city"

df_city = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_city.head(2)

Unnamed: 0,city_id,country_id
0,251,1
1,59,2


In [9]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "country"

df_country = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_country.head(2)

Unnamed: 0,country_id,country
0,1,Afghanistan
1,2,Algeria


In [10]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "film_category"

df_film_category = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_film_category.head(2)

Unnamed: 0,category_id,film_id
0,1,19
1,1,21


In [11]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "inventory"

df_inventory = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_inventory.head(2)

Unnamed: 0,inventory_id,film_id,store_id
0,1,1,1
1,2,1,1


In [12]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "rental"

df_rental = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_rental.head(2)

Unnamed: 0,inventory_id,rental_id
0,1,4863
1,1,11433


In [13]:
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "store"

df_store = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_store.head(2)

Unnamed: 0,store_id,address_id
0,1,1
1,2,2


#### 1.2. Lookup the Invoice Date Keys from the Date Dimension Table.
Here we see an example of where a dimension cross-references another dimension; the Date dimension.  The Date dimension is a classic example of a **Role-Playing dimension**. Dates in-and-of themselves are universal; however, when applied to specific events they take on the identity of those events. Here, the *dim_date* table takes on the identity of *dim_invoice_date* to supply invoice date keys to the *dim_invoices* table.

##### 1.2.1. Get the Data from the Date Dimension Table.
First, fetch the Surrogate Primary Key (date_key) and the Business Key (full_date) from the Date Dimension table using the **get_sql_dataframe()** function. Be certain to cast the **full_date** column to the **datetime64[ns]** data type using the **.astype()** function that is native to Pandas DataFrame columns. Also, extract the **date** portion using the **.dt.date** attribute of the **datetime64[ns]** datatype.

In [14]:
sql_dim_date = "SELECT date_key, full_date FROM sakila_dw2.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20050524,2005-05-24
1,20050525,2005-05-25


#### 1.3. Perform Any Necessary Transformations to the DataFrames

In [15]:
# Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_address.insert(0, "address_key", range(1, df_address.shape[0]+1))
df_address.head(2)

Unnamed: 0,address_key,address_id,city_id
0,1,56,1
1,2,105,2


In [16]:
df_category.insert(0, "category_key", range(1, df_category.shape[0]+1))
df_category.head(2)

Unnamed: 0,category_key,category_id,name
0,1,1,Action
1,2,2,Animation


In [17]:
df_city.insert(0, "city_key", range(1, df_city.shape[0]+1))
df_city.head(2)

Unnamed: 0,city_key,city_id,country_id
0,1,251,1
1,2,59,2


In [18]:
df_country.insert(0, "country_key", range(1, df_country.shape[0]+1))
df_country.head(2)

Unnamed: 0,country_key,country_id,country
0,1,1,Afghanistan
1,2,2,Algeria


In [19]:
df_film_category.insert(0, "film_category_key", range(1, df_film_category.shape[0]+1))
df_film_category.head(2)

Unnamed: 0,film_category_key,category_id,film_id
0,1,1,19
1,2,1,21


In [20]:
df_inventory.insert(0, "inventory_key", range(1, df_inventory.shape[0]+1))
df_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id
0,1,1,1,1
1,2,2,1,1


In [21]:
df_rental.insert(0, "rental_key", range(1, df_rental.shape[0]+1))
df_rental.head(2)

Unnamed: 0,rental_key,inventory_id,rental_id
0,1,1,4863
1,2,1,11433


In [22]:
df_store.insert(0, "store_key", range(1, df_store.shape[0]+1))
df_store.head(2)

Unnamed: 0,store_key,store_id,address_id
0,1,1,1
1,2,2,2


#### 1.4. Load the DataFrames into the New Data Warehouse by Creating New Tables


In [23]:
dataframe = df_address
table_name = 'dim_address'
primary_key = 'address_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [24]:
dataframe = df_category
table_name = 'dim_category'
primary_key = 'category_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [25]:
dataframe = df_city
table_name = 'dim_city'
primary_key = 'city_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [26]:
dataframe = df_country
table_name = 'dim_country'
primary_key = 'country_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [27]:
dataframe = df_film_category
table_name = 'dim_film_category'
primary_key = 'film_category_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [28]:
dataframe = df_inventory
table_name = 'dim_inventory'
primary_key = 'inventory_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [29]:
dataframe = df_rental
table_name = 'dim_rental'
primary_key = 'rental_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [30]:
dataframe = df_store
table_name = 'dim_store'
primary_key = 'store_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### 1.4. Validate that the New Dimension Tables were Created.

In [31]:
sql_address = "SELECT * FROM sakila_dw2.dim_address;"
df_dim_address = get_sql_dataframe(sql_address, **mysql_args)
df_dim_address.head(2)

Unnamed: 0,address_key,address_id,city_id
0,1,56,1
1,2,105,2


In [32]:
sql_category = "SELECT * FROM sakila_dw2.dim_category;"
df_dim_category = get_sql_dataframe(sql_category, **mysql_args)
df_dim_category.head(2)

Unnamed: 0,category_key,category_id,name
0,1,1,Action
1,2,2,Animation


In [33]:
sql_city = "SELECT * FROM sakila_dw2.dim_city;"
df_dim_city = get_sql_dataframe(sql_city, **mysql_args)
df_dim_city.head(2)

Unnamed: 0,city_key,city_id,country_id
0,1,251,1
1,2,59,2


In [34]:
sql_country = "SELECT * FROM sakila_dw2.dim_country;"
df_dim_country = get_sql_dataframe(sql_country, **mysql_args)
df_dim_country.head(2)

Unnamed: 0,country_key,country_id,country
0,1,1,Afghanistan
1,2,2,Algeria


In [35]:
sql_film_category = "SELECT * FROM sakila_dw2.dim_film_category;"
df_dim_film_category = get_sql_dataframe(sql_film_category, **mysql_args)
df_dim_film_category.head(2)

Unnamed: 0,film_category_key,category_id,film_id
0,1,1,19
1,2,1,21


In [36]:
sql_inventory = "SELECT * FROM sakila_dw2.dim_inventory;"
df_dim_inventory = get_sql_dataframe(sql_inventory, **mysql_args)
df_dim_inventory.head(2)

Unnamed: 0,inventory_key,inventory_id,film_id,store_id
0,1,1,1,1
1,2,2,1,1


In [37]:
sql_rental = "SELECT * FROM sakila_dw2.dim_rental;"
df_dim_rental = get_sql_dataframe(sql_rental, **mysql_args)
df_dim_rental.head(2)

Unnamed: 0,rental_key,inventory_id,rental_id
0,1,1,4863
1,2,1,11433


In [38]:
sql_store = "SELECT * FROM sakila_dw2.dim_store;"
df_dim_store = get_sql_dataframe(sql_store, **mysql_args)
df_dim_store.head(2)

Unnamed: 0,store_key,store_id,address_id
0,1,1,1
1,2,2,2


### 2.0. Create and Populate the New Fact Tables
#### 2.1. Extract Data from the Source MongoDB Collections Into DataFrames

##### 2.3.2. Next, use Business Keys to lookup corresponding Surrogate Primary Keys in the Dimension tables
**2.3.2.1. Inventory Fact table.**  The inventory fact table contains a reference to the *dim_products* table by way of the *product_id* column that enables us to lookup the corresponding *product_key*.

In [39]:
# TODO: Merge 
df_fact_table = pd.merge(df_film_category, df_category, on='category_id',how='right')
df_fact_table.drop(['film_category_key','category_id','category_key'], axis=1, inplace=True)
df_fact_table

Unnamed: 0,film_id,name
0,19,Action
1,21,Action
2,29,Action
3,38,Action
4,56,Action
...,...,...
995,931,Travel
996,977,Travel
997,981,Travel
998,988,Travel


In [40]:
df_fact_table = pd.merge(df_fact_table, df_inventory, on='film_id',how='right')
df_fact_table.drop(['inventory_key','film_id'], axis=1, inplace=True)
df_fact_table

Unnamed: 0,name,inventory_id,store_id
0,Documentary,1,1
1,Documentary,2,1
2,Documentary,3,1
3,Documentary,4,1
4,Horror,16,1
...,...,...,...
995,Sports,1967,1
996,Classics,1979,1
997,Classics,1980,1
998,Classics,1981,1


In [41]:
df_fact_table = pd.merge(df_fact_table, df_store, on='store_id',how='inner')
df_fact_table.drop(['store_id','store_key'], axis=1, inplace=True)
df_fact_table

Unnamed: 0,name,inventory_id,address_id
0,Documentary,1,1
1,Documentary,2,1
2,Documentary,3,1
3,Documentary,4,1
4,Horror,16,1
...,...,...,...
995,Sports,1967,1
996,Classics,1979,1
997,Classics,1980,1
998,Classics,1981,1


In [42]:
df_fact_table = pd.merge(df_fact_table, df_address, on='address_id',how='inner')
df_fact_table.drop(['address_key','address_id'], axis=1, inplace=True)
df_fact_table

Unnamed: 0,name,inventory_id,city_id
0,Documentary,1,300
1,Documentary,2,300
2,Documentary,3,300
3,Documentary,4,300
4,Horror,16,300
...,...,...,...
995,Sports,1967,300
996,Classics,1979,300
997,Classics,1980,300
998,Classics,1981,300


In [43]:
df_fact_table = pd.merge(df_fact_table, df_city, on='city_id',how='left')
df_fact_table.drop(['city_key','city_id'], axis=1, inplace=True)
df_fact_table

Unnamed: 0,name,inventory_id,country_id
0,Documentary,1,20
1,Documentary,2,20
2,Documentary,3,20
3,Documentary,4,20
4,Horror,16,20
...,...,...,...
995,Sports,1967,20
996,Classics,1979,20
997,Classics,1980,20
998,Classics,1981,20


In [44]:
df_fact_table = pd.merge(df_fact_table, df_country, on='country_id',how='inner')
df_fact_table.drop(['country_id','country_key'], axis=1, inplace=True)
df_fact_table

Unnamed: 0,name,inventory_id,country
0,Documentary,1,Canada
1,Documentary,2,Canada
2,Documentary,3,Canada
3,Documentary,4,Canada
4,Horror,16,Canada
...,...,...,...
995,Sports,1967,Canada
996,Classics,1979,Canada
997,Classics,1980,Canada
998,Classics,1981,Canada


In [45]:
df_fact_table = pd.merge(df_fact_table, df_rental, on='inventory_id',how='left')
df_fact_table.drop(['inventory_id','rental_id','rental_key'], axis=1, inplace=True)
df_fact_table

Unnamed: 0,name,country
0,Documentary,Canada
1,Documentary,Canada
2,Documentary,Canada
3,Documentary,Canada
4,Documentary,Canada
...,...,...
1346,Sports,Canada
1347,Classics,Canada
1348,Classics,Canada
1349,Classics,Canada


In [46]:
# TODO: Insert a new 'fact_rental_key' column, with an ever-incrementing
# numeric value, to serve as the surrogate primary key. Then display the results.
df_fact_table.insert(0, "fact_rental_key", range(1, df_fact_table.shape[0]+1))
df_fact_table

Unnamed: 0,fact_rental_key,name,country
0,1,Documentary,Canada
1,2,Documentary,Canada
2,3,Documentary,Canada
3,4,Documentary,Canada
4,5,Documentary,Canada
...,...,...,...
1346,1347,Sports,Canada
1347,1348,Classics,Canada
1348,1349,Classics,Canada
1349,1350,Classics,Canada


#### 2.4. Perform Any Necessary Transformations to the DataFrames

In [47]:
df_fact_table

Unnamed: 0,fact_rental_key,name,country
0,1,Documentary,Canada
1,2,Documentary,Canada
2,3,Documentary,Canada
3,4,Documentary,Canada
4,5,Documentary,Canada
...,...,...,...
1346,1347,Sports,Canada
1347,1348,Classics,Canada
1348,1349,Classics,Canada
1349,1350,Classics,Canada


In [48]:
# Rename the name column 
df_fact_table.rename(columns={'name': 'category_name'}, inplace=True)

# Calculate how many rentals of each movie type in a country
df_fact_table['number_of_rentals'] = df_fact_table.groupby(['category_name', 'country']).transform('count')
df_fact_table = df_fact_table.drop_duplicates(subset=['category_name', 'country', 'number_of_rentals'])

df_fact_table


Unnamed: 0,fact_rental_key,category_name,country,number_of_rentals
0,1,Documentary,Canada,68
12,13,Horror,Canada,87
25,26,Foreign,Canada,81
36,37,Comedy,Canada,82
50,51,Sports,Canada,69
74,75,Music,Canada,98
108,109,Animation,Canada,84
121,122,Action,Canada,113
154,155,New,Canada,98
207,208,Sci-Fi,Canada,129


In [49]:
df_fact_table.drop(['fact_rental_key'], axis=1, inplace=True)
df_fact_table.insert(0, "fact_rentals_key", range(1, df_fact_table.shape[0]+1))

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_fact_table.drop(['fact_rental_key'], axis=1, inplace=True)


#### 2.5. Load Newly Transformed MongoDB Data into the Northwind_DW2 Data Warehouse

In [50]:
dataframe = df_fact_table
table_name = 'fact_rentals'
primary_key = 'fact_rentals_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)