In [None]:
<div style="color:blue">   
    <h1> ETL for Engage3 technical exercise in Data Load (using AWS RDS (MySQL)) </h1>
</div>

### Sanjay Mamidi 9/20/2019


### Summary

This ETL was implemented by __extracting__ sample data from a CSV file generated on macOS. After conversion,
the data was __transformed__ using Pandas and then __loaded__
into a MySQL database on the AWS RDS platform. The database design implemented is an __ER__ schema.

### Data Source
[Engage3]()

### ER Diagram
[ER Pic](https://www.draw.io/#G1m_bw4aO4XUG47VeHgVVghjDs93w2kwFN)
[Shareable link](https://drive.google.com/file/d/1m_bw4aO4XUG47VeHgVVghjDs93w2kwFN/view?usp=sharing)

### Repository
Implemented on an AWS RDS MySQL instance.
AWS RDS endpoint: etldb.cnbpyehug8lo.us-east-2.rds.amazonaws.com

### Model
'PriceGroup', 'Size' and 'Category Level' are the three lookup tables. The central entity
is 'Items', implemented as the table 'Items', whose primary key is the UPC code itself.
The Size, Price Group and Category Level tables are each modeled with a single-column primary key plus attributes.
The Items table has a foreign key constraint to each of the lookup tables.
Cascading delete and update constraints are enabled on the tables as well.
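
As a rough illustration of the model described above, here is a minimal sketch of the four tables. It uses Python's built-in `sqlite3` as a local stand-in for the MySQL RDS instance. The table and key names come from this notebook; the column types, the `UNIQUE` constraints and the `build_schema` helper are assumptions, not the actual DDL used.

```python
import sqlite3

# Column types and UNIQUE constraints are assumptions; the notebook only
# names the tables, their keys and the cascade behaviour.
DDL = """
CREATE TABLE SIZE (
    SIZE_ID INTEGER PRIMARY KEY AUTOINCREMENT,
    SIZE    TEXT NOT NULL UNIQUE
);
CREATE TABLE PRICE_GROUP (
    PRICE_GROUP_ID INTEGER PRIMARY KEY AUTOINCREMENT,
    PRICE_GROUP    TEXT NOT NULL UNIQUE
);
CREATE TABLE CATEGORY_LEVELS (
    CATEGORY_LEVEL_ID INTEGER PRIMARY KEY AUTOINCREMENT,
    CL_01_DESC TEXT, CL_02_DESC TEXT, CL_03_DESC TEXT, CL_04_DESC TEXT
);
CREATE TABLE ITEMS (
    UPC               TEXT PRIMARY KEY,
    ITEM_DESCRIPTION  TEXT,
    IN_SERVICE_DATE   TEXT,
    SIZE_ID           INTEGER REFERENCES SIZE(SIZE_ID)
                      ON DELETE CASCADE ON UPDATE CASCADE,
    PRICE_GROUP_ID    INTEGER REFERENCES PRICE_GROUP(PRICE_GROUP_ID)
                      ON DELETE CASCADE ON UPDATE CASCADE,
    CATEGORY_LEVEL_ID INTEGER REFERENCES CATEGORY_LEVELS(CATEGORY_LEVEL_ID)
                      ON DELETE CASCADE ON UPDATE CASCADE
);
"""


def build_schema():
    """Create the sketch schema in an in-memory SQLite database."""
    conn = sqlite3.connect(":memory:")
    # SQLite only enforces foreign keys when this pragma is switched on.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(DDL)
    return conn
```

With this sketch, deleting a row from a lookup table also removes the Items rows that reference it, and inserting an Items row with an unknown lookup key is rejected.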

### Data Cleansing Strategy
The data was fairly clean, but there were the usual null values and spurious string values. These were handled
by custom functions that clean the data on the fly, keeping in mind the data type and length of the target database table fields.
There were some less obvious data errors as well. The lowest granularity, category level 4, was
unexpectedly not enough to model the category levels hierarchically. So I went with a flat-table approach, i.e. one
table for all 4 categories rather than 4 tables linked in a hierarchy. I did lose some data, about
75 rows, because they fell out as duplicates; I would need to troubleshoot further to fix the issue.
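
One way to read the category problem is that a level-4 label on its own does not identify a unique path through levels 1 to 3. The sketch below shows that reading with made-up rows (only the `MEAT / PKG MEAT / PKG MEAT / LUNCHMEAT` path appears in this notebook; the `DELI` path and both helper functions are hypothetical). It uses plain Python rather than Pandas so it runs anywhere.

```python
# Hypothetical sample: the same level-4 label sits under two different parents.
rows = [
    ("MEAT", "PKG MEAT", "PKG MEAT", "LUNCHMEAT"),
    ("DELI", "DELI MEAT", "SLICED", "LUNCHMEAT"),   # invented for illustration
    ("MEAT", "PKG MEAT", "PKG MEAT", "LUNCHMEAT"),  # exact repeat of row 1
]


def ambiguous_leaves(paths):
    """Return level-4 labels that occur under more than one (L1, L2, L3) parent."""
    parents = {}
    for l1, l2, l3, l4 in paths:
        parents.setdefault(l4, set()).add((l1, l2, l3))
    return {leaf for leaf, seen in parents.items() if len(seen) > 1}


def flat_category_rows(paths):
    """Distinct 4-level combinations in first-seen order: one flat-table row each."""
    return list(dict.fromkeys(paths))
```

If `ambiguous_leaves` returns anything, level 4 cannot serve as the sole key, and the flat table needs one row per distinct four-level combination, which is what `flat_category_rows` produces.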

### Loading Strategy
The lookup tables were loaded first, with auto-incremented primary keys. This data was then read back into a local DataFrame
so that the keys were available for a local join during the Items table load. That turned out not to be required;
instead, the cleaned data was loaded into a StagingTable in the database. The MySQL client was then used to insert the data
into the Items table with referential integrity turned on.
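
The "load the lookup first, then read the generated keys back" step can be sketched as follows. This again uses an in-memory `sqlite3` database as a stand-in for MySQL; the `load_lookup` helper and the sample values other than `48 CT` are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE SIZE (SIZE_ID INTEGER PRIMARY KEY AUTOINCREMENT, SIZE TEXT UNIQUE)"
)


def load_lookup(conn, values):
    """Insert the distinct values, then read back {value: generated key}."""
    distinct = list(dict.fromkeys(values))  # drop duplicates, keep first-seen order
    conn.executemany("INSERT INTO SIZE (SIZE) VALUES (?)", [(v,) for v in distinct])
    return {size: size_id
            for size_id, size in conn.execute("SELECT SIZE_ID, SIZE FROM SIZE")}


# '12 OZ' is a made-up value; '48 CT' appears in the sample data.
size_keys = load_lookup(conn, ["48 CT", "12 OZ", "48 CT"])
```

The resulting mapping is what a local join would have used to attach `SIZE_ID` to each item; the staging-table route does the same lookup inside the database instead.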



         
         

### The commented-out pip installs are required to connect to the database if the packages are not already installed.

In [None]:
#!pip install mysql-connector-python
# !pip install mysqlclient
import sqlalchemy
import pymysql as psql
from sqlalchemy import create_engine
from sqlalchemy.sql import text
import pandas as pd
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Numeric
from sqlalchemy.orm import sessionmaker
import turbodbc
from datetime import datetime

In [188]:
## The password will be sent separately.
engine = create_engine("mysql+mysqldb://admin:replaceme@etldb.cnbpyehug8lo.us-east-2.rds.amazonaws.com:3306/Engage3")
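
Since the password is sent separately, one option is to keep it out of the notebook and read it from an environment variable when building the connection URL. This is a minimal sketch: the variable name `ETL_DB_PASSWORD` and the `build_db_url` helper are assumptions, while the host, user, port and database name are taken from the cell above.

```python
import os
from urllib.parse import quote_plus

HOST = "etldb.cnbpyehug8lo.us-east-2.rds.amazonaws.com"


def build_db_url(user="admin", host=HOST, port=3306, database="Engage3",
                 env_var="ETL_DB_PASSWORD"):
    """Build the SQLAlchemy URL, reading the password from the environment."""
    password = os.environ[env_var]  # raises KeyError if the variable is not set
    # quote_plus escapes characters such as '@' or '/' that would break the URL.
    return f"mysql+mysqldb://{user}:{quote_plus(password)}@{host}:{port}/{database}"
```

The result could then be passed to `create_engine(build_db_url())` in place of the hard-coded string.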

In [143]:
items_df = pd.read_csv('c:/users/mangoes/item_list1.csv',encoding='latin1')
items_df.tail(1)

Unnamed: 0,PRICE_GROUP,UPC,ITEM_DESCRIPTION,SIZE,CATEGORY_LEVEL_01,CATEGORY_LEVEL_02,CATEGORY_LEVEL_03,CATEGORY_LEVEL_04,IN_SERVICE_DATE
10167,(null),8005240000000.0,GG SUPER GLUE CS,48 CT,DRUG/GM,GM,PAINT/DECOR,PAINT SUNDRIES,8/29/2015


In [144]:
items_df.count()

PRICE_GROUP          10168
UPC                  10168
ITEM_DESCRIPTION     10168
SIZE                 10168
CATEGORY_LEVEL_01    10168
CATEGORY_LEVEL_02    10168
CATEGORY_LEVEL_03    10168
CATEGORY_LEVEL_04    10168
IN_SERVICE_DATE      10167
dtype: int64

### All data cleansing functions upfront.

In [145]:
def cleanUpSpaces(item):
    """Strip trailing whitespace from a single DataFrame element.

    Called for each item in the DataFrame; returns the stripped value.
    """
    return item.rstrip()

In [147]:
def handleNonNumericData(item):
    """Replace non-integer values with a noticeable string for later cleanup.

    This lets the data load continue. Accepts one element of the DataFrame and
    returns it unchanged, or the string "BAD DATA".
    """
    try:
        if item.isnumeric():  # digits only: return unchanged
            return item
        else:
            return "BAD DATA"
    except AttributeError:
        # Non-string values (e.g. NaN floats) have no .isnumeric() method
        print("Bad data here")
        return "BAD DATA"
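
To make the rule concrete, the sketch below applies the same digits-only test to a few sample values. The `classify` helper and the sample list are for illustration only; `(null)` and `2204` are values that appear in the data, the rest are made up.

```python
samples = ["2204", "(null)", "12.5", "-7", " 42", float("nan")]


def classify(value):
    """Mirror the rule above: digit-only strings pass, everything else is flagged."""
    if isinstance(value, str) and value.isnumeric():
        return value
    return "BAD DATA"


results = [classify(v) for v in samples]
```

Only `"2204"` passes. `str.isnumeric()` is false for strings containing a decimal point, a sign or a space, and a value that is not a string at all (such as a NaN float) is flagged as well.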

In [151]:
def handleDates(item):
    """Convert a date string to a pandas datetime for the MySQL load.

    If conversion fails, return a noticeable sentinel string instead. In the
    database this is a nullable datetime column, so the load can continue.
    """
    try:
        return pd.to_datetime(item)
    except Exception:
        return '1/1/9999'
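
Because the database column is nullable, an alternative to the sentinel string would be to return `None` for unparseable values so that they load as NULL. The sketch below shows that alternative using the standard library's `datetime.strptime` instead of `pd.to_datetime`. The `parse_service_date` helper is hypothetical, and the `%m/%d/%Y` format is an assumption based on sample values such as `8/29/2015`.

```python
from datetime import datetime


def parse_service_date(value, fmt="%m/%d/%Y"):
    """Return a datetime for strings like '8/29/2015', or None if parsing fails."""
    try:
        return datetime.strptime(value.strip(), fmt)
    except (AttributeError, ValueError):
        # AttributeError: value is not a string (e.g. a NaN float)
        # ValueError: the string does not match the expected format
        return None
```

This trades the visible `'1/1/9999'` marker for a real NULL, which keeps the column a true date type but makes bad rows less conspicuous.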

### Start of loading. Comments mark where data cleaning takes place.
### The data is then loaded, relying on the table's auto-increment feature
### to create the index (primary key) at load time in the database.

In [146]:
## Insert SIZE Table
size_df = items_df[['SIZE']].copy()
print("Before DB Insert {0}".format(size_df.count()))

### Cleanup required
size_df['SIZE'] = size_df['SIZE'].apply(cleanUpSpaces)
upc_df = size_df['SIZE'] 
size_df = size_df.drop_duplicates()

### Load
size_df.to_sql('SIZE',con=engine,if_exists ='append',index=False)
size_df = pd.read_sql('SELECT * FROM SIZE',con=engine)


Before DB Insert SIZE    10168
dtype: int64


In [148]:
#Insert PRICE_GROUP Table
#Clean up required for Text data (Null) in numeric data
pg_df = items_df[['PRICE_GROUP']].copy()
pg_df =  pg_df.applymap(handleNonNumericData)
upc_df = pd.concat([upc_df,pg_df], axis = 1) 
pg_df = pg_df.drop_duplicates()
pg_df.to_sql('PRICE_GROUP',con=engine,if_exists ='append',index=False)
pg_df = pd.read_sql('SELECT * FROM PRICE_GROUP ',con=engine)
print("After DB Insert {0}".format(pg_df.count()))

After DB Insert PRICE_GROUP_ID    1188
PRICE_GROUP       1188
dtype: int64


In [149]:
#Insert CATEGORY_LEVELS Table Data
#Clean up required for extra spaces in the data causing the data load to fail.
cl_df = items_df[['CATEGORY_LEVEL_01','CATEGORY_LEVEL_02','CATEGORY_LEVEL_03','CATEGORY_LEVEL_04']].copy()
cl_df = cl_df.rename(columns={'CATEGORY_LEVEL_01':'CL_01_DESC','CATEGORY_LEVEL_02':'CL_02_DESC','CATEGORY_LEVEL_03':'CL_03_DESC','CATEGORY_LEVEL_04':'CL_04_DESC'})
cl_df = cl_df.applymap(cleanUpSpaces)
upc_df = pd.concat([upc_df,cl_df], axis = 1) 
cl_df = cl_df.drop_duplicates()
cl_df.to_sql('CATEGORY_LEVELS',con=engine,if_exists ='append',index=False)
cl_df = pd.read_sql('SELECT * FROM CATEGORY_LEVELS',con=engine)



## Now clean the Items Data 

In [150]:
#Clean the UPC data since it has a few non-numeric values and this field is the primary key in our Items table
UPC_df = items_df[['UPC']].copy()
# # # UPC_df = UPC_df.drop_duplicates()
print(UPC_df.count())
UPC_df = UPC_df.applymap(handleNonNumericData)


UPC    10168
dtype: int64


### Cleaning the Date fields and the Item Description

In [153]:
ISD_df = items_df[['IN_SERVICE_DATE']].copy()
ISD_df.count()
ISD_df = ISD_df.applymap(handleDates)


In [156]:
IDesc_df = items_df[['ITEM_DESCRIPTION']].copy()
IDesc_df =IDesc_df.applymap(cleanUpSpaces)


### Concatenating all the data to upload to the StagingTable table in the database. It is much easier
### to manipulate complicated inserts in the database.

In [157]:
insert_df = pd.concat([ISD_df,IDesc_df,upc_df,UPC_df], axis = 1)
insert_df = insert_df.drop_duplicates()
insert_df.to_sql('StagingTable',con=engine,if_exists ='append',index=False)


#####  After data load the following SQL was used to load up the Items table. 
#####  Insert into ITEMS
#####  select a.UPC, a.ITEM_DESCRIPTION,a.IN_SERVICE_DATE, b.SIZE_ID, 
#####  c.PRICE_GROUP_ID, d.CATEGORY_LEVEL_ID
#####  from CATEGORY_LEVELS d, PRICE_GROUP c,SIZE b, StagingTable a 
#####  where a.CL_04_DESC= d.CL_04_DESC and b.SIZE = a.SIZE and c.PRICE_GROUP = a.PRICE_GROUP
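
The insert above matches categories on `CL_04_DESC` alone. If the same level-4 label exists under more than one category path, that join returns several rows for a single staged item, which could collide with the UPC primary key. Whether this explains the rows that fell out as duplicates has not been confirmed. The sketch below, run against an in-memory `sqlite3` stand-in with made-up rows (the `DELI` path and the UPC are invented), compares the level-4-only join with a join on all four levels.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE CATEGORY_LEVELS (
    CATEGORY_LEVEL_ID INTEGER PRIMARY KEY AUTOINCREMENT,
    CL_01_DESC TEXT, CL_02_DESC TEXT, CL_03_DESC TEXT, CL_04_DESC TEXT);
CREATE TABLE StagingTable (
    UPC TEXT, CL_01_DESC TEXT, CL_02_DESC TEXT, CL_03_DESC TEXT, CL_04_DESC TEXT);
-- Hypothetical data: two category paths share the same level-4 label.
INSERT INTO CATEGORY_LEVELS (CL_01_DESC, CL_02_DESC, CL_03_DESC, CL_04_DESC) VALUES
    ('MEAT', 'PKG MEAT', 'PKG MEAT', 'LUNCHMEAT'),
    ('DELI', 'DELI MEAT', 'SLICED', 'LUNCHMEAT');
INSERT INTO StagingTable VALUES
    ('000000000001', 'MEAT', 'PKG MEAT', 'PKG MEAT', 'LUNCHMEAT');
""")

# Join on the level-4 description only, as in the insert statement above.
LEAF_ONLY = """
SELECT a.UPC, d.CATEGORY_LEVEL_ID
FROM StagingTable a
JOIN CATEGORY_LEVELS d ON a.CL_04_DESC = d.CL_04_DESC
"""

# Same join, extended to match all four category levels.
ALL_LEVELS = LEAF_ONLY + """
 AND a.CL_03_DESC = d.CL_03_DESC
 AND a.CL_02_DESC = d.CL_02_DESC
 AND a.CL_01_DESC = d.CL_01_DESC
"""

leaf_rows = conn.execute(LEAF_ONLY).fetchall()
all_rows = conn.execute(ALL_LEVELS).fetchall()
```

Here `leaf_rows` contains two rows for the one staged item, while `all_rows` contains exactly one.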

### And finally, the queries to get data.

In [179]:
sql = text( ' Select count(UPC) as ITEMS_CNT, PRICE_GROUP' \
            ' from ITEMS a,PRICE_GROUP b where'  \
            ' a.PRICE_GROUP_ID = b.PRICE_GROUP_ID' \
            ' group by a.PRICE_GROUP_ID' \
            ' order by ITEMS_CNT desc' )
        
query_df = pd.read_sql_query(sql, con=engine, params={})

query_df.head(10)

Unnamed: 0,ITEMS_CNT,PRICE_GROUP
0,1051,BAD DATA
1,186,2204
2,140,2964
3,136,2096
4,99,743
5,88,36105
6,63,786
7,61,4871
8,58,23301
9,58,2099


In [177]:
sql = text( ' Select count(UPC) as ITEMS' \
            ' from ITEMS a , CATEGORY_LEVELS b' \
            ' where a.CATEGORY_LEVEL_ID = b.CATEGORY_LEVEL_ID' \
            ' and b.CL_04_DESC like "LUNCHMEAT"' \
            ' and b.CL_03_DESC like "PKG MEAT"' \
            ' and b.CL_02_DESC like "PKG MEAT" and b.CL_01_DESC = "MEAT"')

query_df = pd.read_sql_query(sql, con=engine, params={})

query_df

Unnamed: 0,ITEMS
0,75


In [186]:
sql = text( ' Select count(UPC) as ITEMS  from ITEMS a ,' \
            ' CATEGORY_LEVELS b ' \
            ' where a.CATEGORY_LEVEL_ID = b.CATEGORY_LEVEL_ID' \
            ' and b.CL_03_DESC = "HEALTH"' \
            ' and b.CL_02_DESC = "HBC"' \
            ' and b.CL_01_DESC = "DRUG/GM"')

query_df = pd.read_sql_query(sql, con=engine, params={})

query_df

Unnamed: 0,ITEMS
0,528


In [187]:
sql = text( ' select a.ITEM_DESCRIPTION, a.UPC from ITEMS a,' \
            ' PRICE_GROUP b where  a.PRICE_GROUP_ID = b.PRICE_GROUP_ID ' \
            ' and b.PRICE_GROUP = "29327"' )

query_df = pd.read_sql_query(sql, con=engine, params={})

query_df

Unnamed: 0,ITEM_DESCRIPTION,UPC
0,SLMFST 321 CHOC ROYAL 4PK,834612203
1,SLMFST 321 MLK CHO SHK4PK,834612204
2,SLMFST RTD MLKCHOC 6PKBNS,834628710
3,SLMFST RTD CHC RYL 6PKBNS,834628711
4,SLMFST RTD CAPPDLG 6PKBNS,834628713
5,SLMFST 321 MLK CH SHK 4PK,834674001
6,SLMFST CAPPCNO DLGHT 4PK,834674005
7,SLMFST 321 CHOC ROYAL 4PK,834674007
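
The queries above embed their filter values directly in the SQL string and pass an empty `params={}`. A variation is to pass the value as a bind parameter instead. The sketch below shows the idea against an in-memory `sqlite3` stand-in. The table contents are a tiny sample (one row from the output above plus one made-up row), and `items_in_price_group` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE PRICE_GROUP (PRICE_GROUP_ID INTEGER PRIMARY KEY, PRICE_GROUP TEXT);
CREATE TABLE ITEMS (UPC TEXT PRIMARY KEY, ITEM_DESCRIPTION TEXT, PRICE_GROUP_ID INTEGER);
INSERT INTO PRICE_GROUP VALUES (1, '29327'), (2, '2204');
INSERT INTO ITEMS VALUES
    ('834612203', 'SLMFST 321 CHOC ROYAL 4PK', 1),
    ('000000000001', 'EXAMPLE ITEM', 2);  -- made-up row
""")

QUERY = """
SELECT a.ITEM_DESCRIPTION, a.UPC
FROM ITEMS a
JOIN PRICE_GROUP b ON a.PRICE_GROUP_ID = b.PRICE_GROUP_ID
WHERE b.PRICE_GROUP = :price_group
"""


def items_in_price_group(conn, price_group):
    """Run the query with the price group supplied as a named bind parameter."""
    return conn.execute(QUERY, {"price_group": price_group}).fetchall()
```

SQLAlchemy's `text()` uses the same `:name` placeholder style, so in the notebook the value could be supplied through `params={"price_group": "29327"}` in `pd.read_sql_query`.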
