# AdventureWorks ETL Pipeline
## By Sonika Modur
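This notebook builds a small data warehouse (`adventureworks_dw`) from the AdventureWorks sample database. It extracts data from MySQL with SQLAlchemy and pandas, transforms it, loads a customer dimension table into the warehouse, and exports a denormalized employee data set to JSON for the MongoDB part of the pipeline.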

### Import Necessary Libraries

In [18]:
import os
import json
import numpy
import pandas as pd
import datetime
import certifi

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [2]:
# Show which library versions this notebook is running against.
sqlalchemy_version, pymongo_version = sqlalchemy.__version__, pymongo.__version__
print(f"Running SQL Alchemy Version: {sqlalchemy_version}")
print(f"Running PyMongo Version: {pymongo_version}")

Running SQL Alchemy Version: 1.4.7
Running PyMongo Version: 4.10.1


### Declare and Assign Connection Variables for MySQL Server and Databases

In [3]:
# Connection settings shared by the source and destination MySQL databases.
# Each value can be overridden through an environment variable so that real
# credentials do not have to be committed with the notebook; the fallbacks are
# the original local-development values.
_mysql_base_args = {
    "uid": os.environ.get("MYSQL_USER", "root"),
    "pwd": os.environ.get("MYSQL_PASSWORD", "PASSWORD123!"),
    "hostname": os.environ.get("MYSQL_HOST", "localhost"),
}

# Source: the operational AdventureWorks database the data is extracted from.
src_mysql_args = {**_mysql_base_args, "dbname": "adventureworks"}

# Destination: the data warehouse the dimension tables are loaded into.
dst_mysql_args = {**_mysql_base_args, "dbname": "adventureworks_dw"}

### Declare Functions for Getting Data From and Setting Data Into Databases (MySQL)

In [4]:
def get_sql_dataframe(sql_query, **args):
    """Run a SQL query against a MySQL database and return the rows as a DataFrame.

    Args:
        sql_query: SQL text to execute; it is wrapped in ``sqlalchemy.text()``.
        **args: Connection settings. The keys ``uid``, ``pwd``, ``hostname``
            and ``dbname`` are required. They are interpolated verbatim into
            the connection URL, so characters that are special in a URL must
            already be escaped by the caller.

    Returns:
        A pandas DataFrame filled by ``pd.read_sql()``.

    Raises:
        KeyError: If one of the required connection keys is missing.
    """
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when the query fails.
        with sqlEngine.connect() as connection:
            return pd.read_sql(text(sql_query), connection)
    finally:
        # A fresh engine is created on every call, so release its connection
        # pool here instead of leaving pooled connections open.
        sqlEngine.dispose()

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    """Write a DataFrame to a MySQL table.

    Args:
        df: DataFrame to write (its index is not written).
        table_name: Name of the destination table.
        pk_column: Column declared as PRIMARY KEY after an ``"insert"``.
        db_operation: ``"insert"`` replaces the table with the DataFrame and
            then adds the primary key; ``"update"`` appends the rows to the
            table.
        **args: Connection settings. The keys ``uid``, ``pwd``, ``hostname``
            and ``dbname`` are required and are interpolated verbatim into
            the connection URL.

    Raises:
        ValueError: If ``db_operation`` is neither ``"insert"`` nor ``"update"``.
        KeyError: If one of the required connection keys is missing.
    """
    # Reject a mistyped operation before connecting, instead of silently
    # writing nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"db_operation must be 'insert' or 'update', got {db_operation!r}"
        )

    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when a write fails.
        with sqlEngine.connect() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # table_name and pk_column are placed into the statement
                # as-is, so they must come from trusted code.
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # A fresh engine is created on every call; release its pool.
        sqlEngine.dispose()

### Declare Functions for Getting Data From and Setting Data Into Databases (MongoDB)

In [14]:
def get_mongo_client(**args):
    """Create a PyMongo client for a MongoDB Atlas cluster or a local server.

    Args:
        **args: ``cluster_location`` is required and must be ``"atlas"`` or
            ``"local"``. For ``"atlas"`` the keys ``user_name``, ``password``,
            ``cluster_name`` and ``cluster_subnet`` are also required; they
            are interpolated verbatim into the ``mongodb+srv://`` URI.

    Returns:
        A ``pymongo.MongoClient``. ``"local"`` connects to
        ``mongodb://localhost:27017/``.

    Raises:
        ValueError: If ``cluster_location`` is missing or is not one of the
            two supported values.
        KeyError: If an Atlas setting is missing.
    """
    # .get() lets a missing key produce the same helpful message as a bad value.
    cluster_location = args.get("cluster_location")
    if cluster_location not in ('atlas', 'local'):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if cluster_location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    connect_str = (
        f"mongodb+srv://{args['user_name']}:{args['password']}@"
        f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    )
    # Atlas uses TLS; certifi supplies the CA bundle used to verify it.
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    """Query a MongoDB collection and return the matching documents as a DataFrame.

    Args:
        mongo_client: Client used for the query. It is closed before this
            function returns, whether or not the query succeeds.
        db_name: Name of the database to read from.
        collection: Name of the collection to query.
        query: Filter passed to ``find()``.

    Returns:
        A pandas DataFrame with one row per document and the ``_id`` column
        removed. The DataFrame is empty when no document matches.
    """
    try:
        documents = list(mongo_client[db_name][collection].find(query))
    finally:
        mongo_client.close()

    dframe = pd.DataFrame(documents)
    # With no matching documents there is no '_id' column to drop, so a
    # missing column must not raise.
    return dframe.drop(columns=['_id'], errors='ignore')


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    """Replace MongoDB collections with the contents of JSON files.

    Args:
        mongo_client: Client used for the writes. It is closed before this
            function returns, whether or not the load succeeds.
        db_name: Name of the database that holds the collections.
        data_directory: Directory containing the JSON files.
        json_files: Mapping of collection name to JSON file name (relative to
            ``data_directory``).
    """
    db = mongo_client[db_name]

    try:
        for collection_name, file_name in json_files.items():
            # Read and parse the file first, so that a missing or invalid
            # file does not leave the existing collection already dropped.
            json_path = os.path.join(data_directory, file_name)
            with open(json_path, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)

            # A file holding a single JSON object is treated as one document.
            if isinstance(documents, dict):
                documents = [documents]

            db.drop_collection(collection_name)

            # insert_many() rejects an empty list; an empty file simply
            # leaves the collection dropped.
            if documents:
                db[collection_name].insert_many(documents)
    finally:
        mongo_client.close()

### Create the AdventureWorks Data Warehouse

In [5]:
# Connect to the MySQL server without naming a database, because the
# warehouse database is (re)created by this cell.
conn_str = f"mysql+pymysql://{dst_mysql_args['uid']}:{dst_mysql_args['pwd']}@{dst_mysql_args['hostname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

try:
    # The context manager closes the connection even if a statement fails.
    with sqlEngine.connect() as connection:
        # Destructive: dropping the database removes every table already
        # loaded into the warehouse.
        connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_mysql_args['dbname']}`;"))
        connection.execute(text(f"CREATE DATABASE `{dst_mysql_args['dbname']}`;"))
        # Backtick-quoted like the two statements above.
        connection.execute(text(f"USE `{dst_mysql_args['dbname']}`;"))
finally:
    # Release the pooled connection held by this one-off engine.
    sqlEngine.dispose()

### ETL Process for MySQL

#### Extract step: Create & populate customer dimension table

In [6]:
# Extract: read every column of the source customer table into a DataFrame,
# then preview the first two rows.
sql_customers = "SELECT * FROM adventureworks.customer;"
df_customers = get_sql_dataframe(sql_query=sql_customers, **src_mysql_args)
df_customers.head(n=2)

Unnamed: 0,CustomerID,TerritoryID,AccountNumber,CustomerType,rowguid,ModifiedDate
0,1,1,AW00000001,S,b'^\xe9Z?}\xb8\xedJ\x95\xb4\xc3yz\xfc\xb7O',2004-10-13 11:15:07
1,2,1,AW00000002,S,b'W\xf6R\xe5\xaf\xa9}J\xa6E\xc4)\xd6\xe0$\x91',2004-10-13 11:15:07


#### Transform step: drop/rename columns in customer dimension table

In [7]:
# The rowguid column is not carried into the dimension table.
drop_cols = ['rowguid']
df_customers.drop(columns=drop_cols, inplace=True)

# Source column name -> snake_case name used in the warehouse.
customer_column_map = {
    'CustomerID': 'customer_key',
    'TerritoryID': 'territory_id',
    'AccountNumber': 'account_number',
    'CustomerType': 'customer_type',
    'ModifiedDate': 'modified_date',
}
df_customers.rename(columns=customer_column_map, inplace=True)

# Preview the transformed frame.
df_customers.head(n=2)

Unnamed: 0,customer_key,territory_id,account_number,customer_type,modified_date
0,1,1,AW00000001,S,2004-10-13 11:15:07
1,2,1,AW00000002,S,2004-10-13 11:15:07


#### Load step: create and populate date dimension 

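Execute the `Create_Populate_Dim_Date.sql` script to create and populate the date dimension table (`dim_date`) in the AdventureWorks data warehouse. The SQL file must be located in the working directory for this step. Run the script after the warehouse database has been created, because re-creating the database drops every table in it.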

#### Load step: populate customer dimension

In [8]:
# With "insert", set_dataframe replaces any existing dim_customers table with
# the DataFrame and then declares customer_key as its primary key.
db_operation = "insert"
set_dataframe(
    df=df_customers,
    table_name='dim_customers',
    pk_column='customer_key',
    db_operation=db_operation,
    **dst_mysql_args,
)

#### Verify success of ETL operations for date dimension

In [12]:
# Read the date dimension back from the data warehouse and preview the first
# two rows to confirm that it was loaded.
sql_date = "SELECT * FROM dim_date;"
df_date = get_sql_dataframe(sql_query=sql_date, **dst_mysql_args)
df_date.head(n=2)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


#### Verify success of ETL operations for customer dimension

In [13]:
# Read the customer dimension back from the data warehouse and preview the
# first two rows to confirm that it was loaded.
sql_customers = "SELECT * FROM dim_customers;"
df_verify_customers = get_sql_dataframe(sql_query=sql_customers, **dst_mysql_args)
df_verify_customers.head(n=2)

Unnamed: 0,customer_key,territory_id,account_number,customer_type,modified_date
0,1,1,AW00000001,S,2004-10-13 11:15:07
1,2,1,AW00000002,S,2004-10-13 11:15:07


### ETL Process for MongoDB

#### Write an SQL query for the employee dimension

Note: `rowguid` is not selected from the Employee table because it is not relevant here. `CurrentFlag` is also left out because its value is 1 for every employee.

In [32]:
# Query for the employee data set: starts from `employee` and LEFT JOINs
# contact, employeeaddress/address, employeedepartmenthistory (with its
# department and shift) and employeepayhistory, so employees without a match
# in a joined table are still returned. Columns whose names repeat across
# tables (Title, Name, ModifiedDate, ...) are aliased to keep them distinct.
# NOTE(review): employeedepartmenthistory and employeepayhistory presumably
# hold several rows per employee; if so, the result has one row per
# combination rather than one row per employee - confirm this is intended.
sql_dim_employees = """
SELECT
    e.EmployeeID,
    e.NationalIDNumber,
    e.ContactID,
    e.LoginID,
    e.ManagerID,
    e.Title AS EmployeeTitle,
    e.BirthDate,
    e.MaritalStatus,
    e.Gender,
    e.HireDate,
    e.SalariedFlag,
    e.VacationHours,
    e.SickLeaveHours,
    e.ModifiedDate AS EmployeeModifiedDate,

    c.FirstName,
    c.MiddleName,
    c.LastName,
    c.EmailAddress,
    c.Phone,
    c.ModifiedDate AS ContactModifiedDate,

    ea.AddressID,
    ea.ModifiedDate AS EmployeeAddressModifiedDate,

    a.AddressLine1,
    a.AddressLine2,
    a.City,
    a.PostalCode,
    a.ModifiedDate AS AddressModifiedDate,

    edh.DepartmentID,
    edh.ShiftID,
    edh.StartDate AS DeptStartDate,
    edh.EndDate AS DeptEndDate,
    edh.ModifiedDate AS DeptHistModifiedDate,

    d.Name AS DeptName,
    d.GroupName,
    d.ModifiedDate AS DeptModifiedDate,
    
    s.Name AS ShiftName,
    s.StartTime AS ShiftStartTime,
    s.EndTime AS ShiftEndTime,
    s.ModifiedDate AS ShiftModifiedDate,

    eph.RateChangeDate,
    eph.Rate,
    eph.PayFrequency,
    eph.ModifiedDate AS PayHistModifiedDate

FROM employee e
LEFT JOIN contact c
    ON e.ContactID = c.ContactID
LEFT JOIN employeeaddress ea
    ON e.EmployeeID = ea.EmployeeID
LEFT JOIN address a
    ON ea.AddressID = a.AddressID
LEFT JOIN employeedepartmenthistory edh
    ON e.EmployeeID = edh.EmployeeID
LEFT JOIN department d
    ON edh.DepartmentID = d.DepartmentID
LEFT JOIN shift s
    ON edh.ShiftID = s.ShiftID
LEFT JOIN employeepayhistory eph
    ON e.EmployeeID = eph.EmployeeID
"""

#### Extract step: get employee data from MySQL

In [33]:
# Run the employee query against the source database and preview the first
# two rows of the result.
df_employee = get_sql_dataframe(sql_query=sql_dim_employees, **src_mysql_args)
df_employee.head(n=2)

Unnamed: 0,EmployeeID,NationalIDNumber,ContactID,LoginID,ManagerID,EmployeeTitle,BirthDate,MaritalStatus,Gender,HireDate,...,GroupName,DeptModifiedDate,ShiftName,ShiftStartTime,ShiftEndTime,ShiftModifiedDate,RateChangeDate,Rate,PayFrequency,PayHistModifiedDate
0,1,14417807,1209,adventure-works\guy1,16.0,Production Technician - WC60,1972-05-15,M,M,1996-07-31,...,Manufacturing,1998-06-01,Day,1900-01-01 07:00:00,1900-01-01 15:00:00,1998-06-01,1996-07-31,12.45,1,2004-07-31
1,2,253022876,1030,adventure-works\kevin0,6.0,Marketing Assistant,1977-06-03,S,M,1997-02-26,...,Sales and Marketing,1998-06-01,Day,1900-01-01 07:00:00,1900-01-01 15:00:00,1998-06-01,1997-02-26,13.4615,2,2004-07-31


#### Validate dataframe columns for JSON conversion 
This was a troubleshooting step: converting the DataFrame to JSON initially failed with an error saying that some column(s) could not be encoded as UTF-8. A DataFrame containing values that cannot be UTF-8 encoded cannot be written to a JSON file, so this code iterates through the object-typed columns to find the problematic ones.

In [34]:
# Scan every object-dtype column and report the first value in each column
# that cannot be encoded as UTF-8 text.
for col in df_employee.select_dtypes(include=['object']).columns:
    for value in df_employee[col]:
        # Missing values are not encoding problems: skip None and float NaN
        # (NaN is the only float that is not equal to itself).
        if value is None or (isinstance(value, float) and value != value):
            continue
        try:
            value.encode('utf-8')
        except (AttributeError, UnicodeError):
            # AttributeError: the value is not text at all (bytes, for
            # example, has no .encode()). UnicodeError: the value is text
            # that cannot be represented in UTF-8.
            print(f"Error in column '{col}' for value '{value}'")
            break  # one report per column is enough; check the next column

Error in column 'SalariedFlag' for value 'b'\x00''


#### Transform step: convert byte values in SalariedFlag into Integers
The problematic column was found to be `SalariedFlag`, which had at least one value of type `bytes`. This code applies a conversion to every value in `SalariedFlag`, turning `bytes` values into integers so that they are compatible with JSON.

In [37]:
def byte_to_int(val):
    """Return ``val`` as a big-endian integer when it is ``bytes``, else unchanged."""
    return int.from_bytes(val, 'big') if isinstance(val, bytes) else val
# Replace the bytes values in SalariedFlag with integers; values that are not
# bytes pass through byte_to_int unchanged.
df_employee['SalariedFlag'] = df_employee['SalariedFlag'].apply(func=byte_to_int)

#### Export dataframe resulting from SQL Query to JSON 

In [38]:
# Write the employee DataFrame to adventureworks_employee.json in the current
# working directory, as a JSON array with one object per row.
file = os.path.join(os.getcwd(), 'adventureworks_employee.json')
df_employee.to_json(path_or_buf=file, orient='records')