## Using Python to Integrate MongoDB Data into an ETL Process
Modern data warehousing and analytics solutions frequently use languages like Python or Scala to extract data from numerous sources, including relational database management systems, NoSQL database systems, real-time streaming endpoints, and data lakes. The same languages can then apply many kinds of transformations before loading the data into a variety of destinations, including file systems and data warehouses, where data scientists or business analysts can consume it.

In this lab you will build upon the **Northwind_DW2** dimensional database from Lab 3 by integrating new data sourced from an instance of MongoDB. The new data covers two additional business processes: inventory and purchasing. You will continue to interact with both the source systems (MongoDB and MySQL) and the destination system (the Northwind_DW2 data warehouse) from a remote client running Python (Jupyter Notebooks).

Just as in Lab 3, you will fetch data into Pandas DataFrames, perform all the necessary transformations in memory on the client, and then push each transformed DataFrame to the RDBMS data warehouse using a Pandas function that creates the table and fills it with data in a single operation.
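
As a rough illustration of the extract, transform, and load flow described above, the following minimal sketch runs the same three steps end to end without any servers. It makes several assumptions that are not part of the lab: an in-memory SQLite database stands in for the MySQL data warehouse, a hard-coded list of dictionaries stands in for documents returned by MongoDB, and the table name `dim_demo` is hypothetical.

```python
import sqlite3
import pandas as pd

# Extract: stand-in for documents returned by a MongoDB query (hypothetical rows).
documents = [
    {"id": 1, "company": "Supplier A"},
    {"id": 2, "company": "Supplier B"},
]
df = pd.DataFrame(documents)

# Transform: rename the source identifier to a warehouse-style key, in memory.
df = df.rename(columns={"id": "supplier_key"})

# Load: create the table and fill it in one call (SQLite stands in for MySQL here).
conn = sqlite3.connect(":memory:")
df.to_sql("dim_demo", con=conn, index=False, if_exists="replace")

# Read the table back to confirm the round trip.
df_check = pd.read_sql("SELECT * FROM dim_demo", conn)
conn.close()
print(df_check)
```

The lab itself follows the same pattern, only with real MongoDB and MySQL connections wrapped in helper functions.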

### Prerequisites:
#### Import the Necessary Libraries

In [1]:
import os
import json
import numpy
import datetime
import pandas as pd

import pymongo
from sqlalchemy import create_engine

# The source data is a series of JSON files in the 'data' section on git/02-python

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working 

In [2]:
host_name = "localhost"
ports = {"mongo" : 27017, "mysql" : 3306}

user_id = "root"
pwd = "%"

src_dbname = "northwind_purchasing"  # source database to create in MongoDB
dst_dbname = "northwind_dw2"  # destination data warehouse, extended from Lab 3
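
Because these credentials are later interpolated directly into connection URLs, a password containing characters such as `@`, `%`, or `/` can be misread as URL syntax. One common remedy is to percent-encode the credentials first with the standard library's `urllib.parse.quote_plus`. The sketch below assumes a hypothetical password and reuses the `mysql+pymysql://` URL shape from this lab; it only builds the string and does not connect to anything.

```python
from urllib.parse import quote_plus

user_id = "root"
pwd = "p@ss%word/1"  # hypothetical password containing URL-reserved characters

# Percent-encode the credentials so '@', '%', and '/' are not read as URL syntax.
safe_user = quote_plus(user_id)
safe_pwd = quote_plus(pwd)

conn_str = f"mysql+pymysql://{safe_user}:{safe_pwd}@localhost/northwind_dw2"
print(conn_str)
```

The same encoding step should apply to the credentials placed in a `mongodb://` URI.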

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
# Build a DataFrame from the result of a query against a MySQL table
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Query the MySQL database and return the result as a Pandas DataFrame.'''
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    # Invoke the pd.read_sql() function to query the database and fill a DataFrame
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn)
    conn.close()
    
    return dframe

# Connect to MongoDB and populate a DataFrame with the query results
def get_mongo_dataframe(user_id, pwd, host_name, port, db_name, collection, query):
    '''Query a MongoDB collection and return the documents as a Pandas DataFrame.'''
    # Create a connection to MongoDB, with or without authentication credentials
    if user_id and pwd:
        mongo_uri = f"mongodb://{user_id}:{pwd}@{host_name}:{port}/{db_name}"
        client = pymongo.MongoClient(mongo_uri)
    else:
        conn_str = f"mongodb://{host_name}:{port}/"
        client = pymongo.MongoClient(conn_str)
    
    # Query MongoDB and fill a Python list with documents to create a DataFrame
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    # Drop MongoDB's internal '_id' field; errors='ignore' tolerates an empty result
    dframe.drop(['_id'], axis=1, inplace=True, errors='ignore')
    client.close()
    
    return dframe  # the query results as a Pandas DataFrame

# Write any DataFrame to a table on the MySQL server
def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create or append to a MySQL table from a Pandas DataFrame.'''
    # Create a connection to the MySQL database
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    # engine.begin() opens a connection and commits the work when the block exits
    with sqlEngine.begin() as connection:
        # Invoke the DataFrame .to_sql() function to either create, or append to, a table
        if db_operation == "insert":
            # "insert" replaces any existing table, then adds the primary key
            df.to_sql(table_name, con=connection, index=False, if_exists='replace')
            # Engine.execute() was removed in SQLAlchemy 2.0, so run the DDL on the connection
            connection.exec_driver_sql(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
        
        elif db_operation == "update":
            # "update" appends the DataFrame's rows to the existing table
            df.to_sql(table_name, con=connection, index=False, if_exists='append')
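
The two operations accepted by `set_dataframe()` map onto different `if_exists` modes of `DataFrame.to_sql()`: "insert" uses `'replace'` and "update" uses `'append'`. The following sketch shows how those two modes behave, under the assumption that an in-memory SQLite database is an acceptable stand-in for MySQL; the table name `demo_table` and the sample rows are hypothetical.

```python
import sqlite3
import pandas as pd

conn = sqlite3.connect(":memory:")  # stand-in for the MySQL warehouse
batch_1 = pd.DataFrame({"demo_key": [1, 2], "label": ["a", "b"]})
batch_2 = pd.DataFrame({"demo_key": [3], "label": ["c"]})
count_sql = "SELECT COUNT(*) AS n FROM demo_table"

# 'replace' drops any existing table and recreates it from the DataFrame.
batch_1.to_sql("demo_table", con=conn, index=False, if_exists="replace")
rows_after_replace = int(pd.read_sql(count_sql, conn)["n"][0])

# 'append' keeps the existing rows and adds the new ones.
batch_2.to_sql("demo_table", con=conn, index=False, if_exists="append")
rows_after_append = int(pd.read_sql(count_sql, conn)["n"][0])

# Running 'replace' again discards everything loaded so far.
batch_2.to_sql("demo_table", con=conn, index=False, if_exists="replace")
rows_after_second_replace = int(pd.read_sql(count_sql, conn)["n"][0])
conn.close()

print(rows_after_replace, rows_after_append, rows_after_second_replace)
```

In other words, re-running an "insert" cell rebuilds the table from scratch, while re-running an "update" cell adds the same rows a second time.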

#### Populate MongoDB with Source Data
Be certain you run this cell **ONLY ONCE!**  Otherwise, you will fill your MongoDB database with duplicate records which will cause duplicate key errors when you attempt to create and populate the MySQL data warehouse dimension and fact tables.

In [4]:
# Load the JSON source files into MongoDB, one collection per file.

# Connect to the MongoDB server and select the source database
port = ports["mongo"]
conn_str = f"mongodb://{host_name}:{port}/"
client = pymongo.MongoClient(conn_str)
db = client[src_dbname]

data_dir = os.path.join(os.getcwd(), 'lab_data')

# Map each collection name to the JSON file that populates it
json_files = {"suppliers" : 'northwind_suppliers.json',
              "invoices" : 'northwind_invoices.json',
              "purchase_orders" : 'northwind_purchase_orders.json',
              "inventory_transactions" : 'northwind_inventory_transactions.json'
             }

# For each collection: open its file, parse the JSON, and insert the documents
for collection_name, file_name in json_files.items():
    json_file = os.path.join(data_dir, file_name)
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)  # parse the file into a list of documents
        result = db[collection_name].insert_many(json_object)
        #print(f"{collection_name} was successfully loaded.")  # to verify, open MongoDB Compass and check the collections in northwind_purchasing

client.close()
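
The "run only once" restriction could be relaxed by making the load idempotent, for example by inserting only when the target collection is still empty. The sketch below is one possible approach, not part of the lab: `load_collection_once` is a hypothetical helper that relies on the `count_documents()` and `insert_many()` methods of a PyMongo collection, and `FakeCollection` is a hypothetical in-memory stand-in so the example runs without a MongoDB server.

```python
def load_collection_once(collection, documents):
    """Insert documents only when the collection is still empty.

    `collection` is expected to behave like a pymongo Collection, i.e. to offer
    count_documents() and insert_many(). Returns the number of documents inserted.
    """
    if collection.count_documents({}) > 0:
        return 0  # already populated; skip to avoid duplicate records
    collection.insert_many(documents)
    return len(documents)


class FakeCollection:
    """Hypothetical in-memory stand-in so the sketch runs without a MongoDB server."""

    def __init__(self):
        self.docs = []

    def count_documents(self, query):
        return len(self.docs)  # the query filter is ignored in this stand-in

    def insert_many(self, documents):
        self.docs.extend(documents)


suppliers = FakeCollection()
sample = [{"id": 1, "company": "Supplier A"}, {"id": 2, "company": "Supplier B"}]

first_run = load_collection_once(suppliers, sample)
second_run = load_collection_once(suppliers, sample)
print(first_run, second_run)  # 2 0
```

With a real connection, the call would presumably look like `load_collection_once(db["suppliers"], json_object)` inside the loading loop.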

## 1.0. Create and Populate the New Dimension Tables
### 1.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [5]:
query = {}
port = ports["mongo"]
collection = "suppliers"

df_suppliers = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_suppliers.head(2)

Unnamed: 0,id,company,last_name,first_name,job_title
0,1,Supplier A,Andersen,Elizabeth A.,Sales Manager
1,2,Supplier B,Weiler,Cornelia,Sales Manager


#### TO DO: Extract data from the "Invoices" collection

In [24]:
query = {}
port = ports["mongo"]
collection = "invoices"

df_invoices = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_invoices.head(2)

Unnamed: 0,id,order_id,invoice_date,due_date,tax,shipping,amount_due
0,5,31,2006-03-22 16:08:59,,0.0,0.0,0.0
1,6,32,2006-03-22 16:10:27,,0.0,0.0,0.0


### 1.2. Perform Any Necessary Transformations to the DataFrames

In [7]:
df_suppliers.rename(columns={"id":"supplier_key"}, inplace=True)
df_suppliers.head(2)

Unnamed: 0,supplier_key,company,last_name,first_name,job_title
0,1,Supplier A,Andersen,Elizabeth A.,Sales Manager
1,2,Supplier B,Weiler,Cornelia,Sales Manager


#### TO DO: Perform any required transformations to the "Invoices" dataframe

In [25]:
# Rename "id" to "invoice_key" so the table follows the warehouse naming convention
# and can take part in star joins in SQL once it is loaded with set_dataframe().
df_invoices.rename(columns={"id":"invoice_key"}, inplace=True)

# Drop the columns that carry no information (every value is either 0.0 or None).
# To check which columns hold useful values: df_invoices.sort_values(by="due_date", ascending=True)
drop_cols = ['due_date','tax', 'shipping', 'amount_due']
df_invoices.drop(drop_cols, axis=1, inplace=True)

df_invoices.head(2)

Unnamed: 0,invoice_key,order_id,invoice_date
0,5,31,2006-03-22 16:08:59
1,6,32,2006-03-22 16:10:27
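
The list of columns to drop above was chosen by inspection. A programmatic check is also possible; the following sketch flags columns in which every value is missing, or every numeric value is zero. The sample DataFrame is hypothetical and only mirrors the shape of the first invoice rows, and treating an all-zero column as "no information" is an assumption that may not hold for every dataset.

```python
import pandas as pd

# Hypothetical sample shaped like the first invoice rows in this lab.
df = pd.DataFrame({
    "invoice_key": [5, 6],
    "order_id": [31, 32],
    "invoice_date": ["2006-03-22 16:08:59", "2006-03-22 16:10:27"],
    "due_date": [None, None],
    "tax": [0.0, 0.0],
    "shipping": [0.0, 0.0],
    "amount_due": [0.0, 0.0],
})

# Columns in which every value is missing.
all_missing = df.isna().all()

# Numeric columns in which every value is zero (missing values count as zero).
numeric = df.select_dtypes(include="number")
all_zero = (numeric.fillna(0) == 0).all()

# Keep the original column order when collecting the candidates.
drop_cols = [col for col in df.columns
             if all_missing[col] or all_zero.get(col, False)]

df_trimmed = df.drop(columns=drop_cols)
print(drop_cols)
```

It is worth reviewing the flagged columns before dropping them, since a column that is empty in the current extract may be populated in later data.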


### 1.3. Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

Here we will call our **set_dataframe( )** function to create each dimension table. This function expects a number of parameters including the usual connection information (e.g., user_id, password, MySQL server name and database), the *table_name* we need to assign to the table, the *pandas DataFrame* we crafted to define & populate the table, the *name* we need to assign to the *primary_key* column, and finally, the database operation (insert or update). 

In [10]:
dataframe = df_suppliers
table_name = 'dim_suppliers'
primary_key = 'supplier_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### TO DO: Upload the "Invoices" dataframe to create the new "dim_invoices" dimension table


In [11]:
dataframe = df_invoices
table_name = 'dim_invoices'
primary_key = 'invoice_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### 1.4. Validate that the New Dimension Tables were Created

In [12]:
sql_suppliers = "SELECT * FROM northwind_dw2.dim_suppliers;"
df_dim_suppliers = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_suppliers)
df_dim_suppliers.head(2)

Unnamed: 0,supplier_key,company,last_name,first_name,job_title
0,1,Supplier A,Andersen,Elizabeth A.,Sales Manager
1,2,Supplier B,Weiler,Cornelia,Sales Manager


#### TO DO: Validate the new "dim_invoices" table in the northwind_dw2 data warehouse.

In [13]:
sql_invoices = "SELECT * FROM northwind_dw2.dim_invoices;"
df_dim_invoices = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_invoices)
df_dim_invoices.head(2)

Unnamed: 0,invoice_key,order_id,invoice_date
0,5,31,2006-03-22 16:08:59
1,6,32,2006-03-22 16:10:27
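
Looking at the first two rows confirms that a table exists, but not that every row arrived. A slightly stronger validation compares the loaded table against the DataFrame it came from. The sketch below assumes an in-memory SQLite database in place of MySQL, a hypothetical table name `dim_demo`, and hypothetical sample rows.

```python
import sqlite3
import pandas as pd

# Hypothetical source DataFrame standing in for a transformed dimension.
df_source = pd.DataFrame({"invoice_key": [5, 6, 7], "order_id": [31, 32, 40]})

conn = sqlite3.connect(":memory:")  # stand-in for the MySQL warehouse
df_source.to_sql("dim_demo", con=conn, index=False, if_exists="replace")
df_loaded = pd.read_sql("SELECT * FROM dim_demo", conn)
conn.close()

# Three simple checks: same row count, same columns, and a unique key column.
row_counts_match = len(df_loaded) == len(df_source)
columns_match = list(df_loaded.columns) == list(df_source.columns)
key_is_unique = bool(df_loaded["invoice_key"].is_unique)

print(row_counts_match, columns_match, key_is_unique)
```

The same three checks could be applied to each dimension and fact table after it is loaded.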


## 2.0. Create and Populate the New Fact Tables
### 2.1. Extract Data from the Source MongoDB Collections Into DataFrames

In [14]:
query = {} # Select all elements (columns), and all documents (rows).

port = ports["mongo"]
collection = "purchase_orders"

df_pos = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_pos.head(2)

Unnamed: 0,purchase_order_id,supplier_id,created_by,submitted_date,creation_date,status,expected_date,shipping_fee,taxes,payment_date,...,approved_by,approved_date,submitted_by,purchase_order_detail_id,product_id,quantity,unit_cost,date_received,posted_to_inventory,inventory_id
0,90,1,2.0,2006-01-14 00:00:00,2006-01-22 00:00:00,Approved,,0.0,0.0,,...,2.0,2006-01-22 00:00:00,2,238,1,40.0,14.0,2006-01-22 00:00:00,1,59.0
1,91,3,2.0,2006-01-14 00:00:00,2006-01-22 00:00:00,Approved,,0.0,0.0,,...,2.0,2006-01-22 00:00:00,2,239,3,100.0,8.0,2006-01-22 00:00:00,1,54.0


#### TO DO: Extract data for your new "Inventory Transactions" Fact Table

In [18]:
query = {} # Select all elements (columns), and all documents (rows).

port = ports["mongo"]
collection = "inventory_transactions"

df_invt = get_mongo_dataframe(None, None, host_name, port, src_dbname, collection, query)
df_invt.head(2)

Unnamed: 0,id,transaction_type,transaction_created_date,transaction_modified_date,product_id,quantity,purchase_order_id,customer_order_id
0,35,Purchased,2006-03-22 16:02:28,2006-03-22 16:02:28,80,75,,
1,36,Purchased,2006-03-22 16:02:48,2006-03-22 16:02:48,72,40,,


### 2.2. Perform Any Necessary Transformations to the DataFrames

In [16]:
# Rename the source "_id" columns to warehouse-style "_key" columns
column_name_map = {"purchase_order_id" : "purchase_order_key",
                   "supplier_id" : "supplier_key",
                   "purchase_order_detail_id" : "purchase_order_detail_key",
                   "product_id" : "product_key",
                   "inventory_id" : "inventory_key"
                  }

df_pos.rename(columns=column_name_map, inplace=True)
df_pos.insert(0, "fact_purchase_order_key", range(1, df_pos.shape[0]+1))
df_pos.head(2)

Unnamed: 0,fact_purchase_order_key,purchase_order_key,supplier_key,created_by,submitted_date,creation_date,status,expected_date,shipping_fee,taxes,...,approved_by,approved_date,submitted_by,purchase_order_detail_key,product_key,quantity,unit_cost,date_received,posted_to_inventory,inventory_key
0,1,90,1,2.0,2006-01-14 00:00:00,2006-01-22 00:00:00,Approved,,0.0,0.0,...,2.0,2006-01-22 00:00:00,2,238,1,40.0,14.0,2006-01-22 00:00:00,1,59.0
1,2,91,3,2.0,2006-01-14 00:00:00,2006-01-22 00:00:00,Approved,,0.0,0.0,...,2.0,2006-01-22 00:00:00,2,239,3,100.0,8.0,2006-01-22 00:00:00,1,54.0
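
The inserted `fact_purchase_order_key` column acts as a surrogate key: a sequential number per row that is unique even when the business keys are not. The sketch below illustrates why that matters, assuming (as the presence of both order and detail identifiers suggests, though the source does not state it) that one purchase order can span several detail rows. The sample rows and the column name `fact_key` are hypothetical.

```python
import pandas as pd

# Hypothetical rows: purchase order 90 appears on two detail lines.
df = pd.DataFrame({
    "purchase_order_key": [90, 90, 91],
    "product_key": [1, 3, 3],
})

# The business key repeats, so it cannot serve as the table's primary key.
business_key_is_unique = bool(df["purchase_order_key"].is_unique)

# Add a sequential surrogate key starting at 1, one value per row.
df.insert(0, "fact_key", range(1, len(df) + 1))
surrogate_key_is_unique = bool(df["fact_key"].is_unique)

print(business_key_is_unique, surrogate_key_is_unique)
```

Checking `is_unique` on the intended primary key column before loading can catch a duplicate key error on the client rather than in MySQL.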


#### TO DO: Perform any required transformations to the inventory transactions dataframe

In [19]:
column_name_map = {"id" : "inventory_tran_key",
                   "product_id" : "product_key"
                  }

df_invt.rename(columns=column_name_map, inplace=True)
df_invt.insert(0, "fact_inventory_tran_key", range(1, df_invt.shape[0]+1))


drop_cols = ['purchase_order_id','customer_order_id']
df_invt.drop(drop_cols, axis=1, inplace=True)

df_invt.head(2)

Unnamed: 0,fact_inventory_tran_key,inventory_tran_key,transaction_type,transaction_created_date,transaction_modified_date,product_key,quantity
0,1,35,Purchased,2006-03-22 16:02:28,2006-03-22 16:02:28,80,75
1,2,36,Purchased,2006-03-22 16:02:48,2006-03-22 16:02:48,72,40


### 2.3. Load Newly Transformed MongoDB Data into the Northwind_DW2 Data Warehouse

In [20]:
dataframe = df_pos
table_name = 'fact_purchase_orders'
primary_key = 'fact_purchase_order_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

#### TO DO: Upload the "Inventory Transactions" dataframe to create the new "fact_inventory_tran" fact table

In [21]:
dataframe = df_invt
table_name = 'fact_inventory_tran'
primary_key = 'fact_inventory_tran_key'
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

### 2.4. Validate that the New Fact Tables were Created

In [22]:
sql_purchase_orders = "SELECT * FROM northwind_dw2.fact_purchase_orders;"
df_fact_purchase_orders = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_purchase_orders)
df_fact_purchase_orders.head(2)

Unnamed: 0,fact_purchase_order_key,purchase_order_key,supplier_key,created_by,submitted_date,creation_date,status,expected_date,shipping_fee,taxes,...,approved_by,approved_date,submitted_by,purchase_order_detail_key,product_key,quantity,unit_cost,date_received,posted_to_inventory,inventory_key
0,1,90,1,2.0,2006-01-14 00:00:00,2006-01-22 00:00:00,Approved,,0.0,0.0,...,2.0,2006-01-22 00:00:00,2,238,1,40.0,14.0,2006-01-22 00:00:00,1,59.0
1,2,91,3,2.0,2006-01-14 00:00:00,2006-01-22 00:00:00,Approved,,0.0,0.0,...,2.0,2006-01-22 00:00:00,2,239,3,100.0,8.0,2006-01-22 00:00:00,1,54.0


#### TO DO: Validate the correctness of the new "Inventory Transactions" fact table.

In [23]:
sql_inventory_tran = "SELECT * FROM northwind_dw2.fact_inventory_tran;"
df_fact_inventory_tran = get_sql_dataframe(user_id, pwd, host_name, dst_dbname, sql_inventory_tran)
df_fact_inventory_tran.head(2)

Unnamed: 0,fact_inventory_tran_key,inventory_tran_key,transaction_type,transaction_created_date,transaction_modified_date,product_key,quantity
0,1,35,Purchased,2006-03-22 16:02:28,2006-03-22 16:02:28,80,75
1,2,36,Purchased,2006-03-22 16:02:48,2006-03-22 16:02:48,72,40
