# ETL Pipeline

I used a cloud-based solution (AWS) for the ETL work.

Step 1: Upload the file to S3.

Step 2: Create staging tables in Redshift.

Step 3: Copy the data from S3 into the staging tables with the Redshift COPY command, which takes advantage of Redshift's massively parallel processing (MPP).

Step 4: Apply transformations to the staged data by loading it into a pandas DataFrame.

Step 5: Design the ER model and create the final tables in Redshift.

Step 6: Load the data into the new tables.

Finally, answer the questions with SQL queries.



The following three tables were created in Redshift:

- Table 1: `grocery_item`
  1. upc
  2. price_group
- Table 2: `grocery_size`
  1. upc
  2. size
- Table 3: `grocery_desc`
  1. upc
  2. item_description
  3. category_level_01
  4. category_level_02
  5. category_level_03
  6. category_level_04
  7. in_service_date

## Import libraries

```python
# Data handling
import pandas as pd
import pandas.io.sql as psql

# Redshift and S3 connectivity
import psycopg2
import s3fs
import sqlalchemy as sa
```

## Read the CSV file from the local directory

```python
df = pd.read_csv('item_list.csv')
df.head(2)
```

| | PRICE_GROUP | UPC | ITEM_DESCRIPTION | SIZE | CATEGORY_LEVEL_01 | CATEGORY_LEVEL_02 | CATEGORY_LEVEL_03 | CATEGORY_LEVEL_04 | IN_SERVICE_DATE |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 50089 | 1329 | PRIVATE LABEL PURIFIED WATER 35PK | 35/16.9 FO | GROCERY | GROCERY | GROC-ALL OTHER | WATER | 2/15/2016 |
| 1 | 1500 | 2591 | PRIVATE LABEL PURIFIED WATER 24PK | 24/16.9 FO | GROCERY | GROCERY | GROC-ALL OTHER | WATER | 2/15/2016 |


## Check the data for issues to plan the cleaning strategy

### Summary of the issues identified by the commands below
1. Duplicate rows: drop them.
2. Null values: fill them. Nulls in the `price_group` attribute are replaced with 0 to match the integer data type of the final tables.
3. Null-like string values, for example `"(null)"`: replace them.
4. Multiple values in the same cell, for example `(value,value)`: keep only the first value.

These anomalies are identified here; the fixes are implemented later, in the transformation stage of the ETL.
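The four rules above can be sketched without pandas as a plain-Python pass over rows. This is a minimal illustration, not the notebook's implementation: the helper `clean_rows`, the lowercase column names, the set of null markers, and the sample values are all assumptions.

```python
import re

NULL_MARKERS = {"", "(null)"}            # assumed null-like strings
MULTI_VALUE = re.compile(r", [0-9]+")    # ", <digits>" following the first value
MULTI_VALUE_COLUMNS = {"price_group", "upc"}


def clean_rows(rows):
    """Apply the four cleaning rules to a list of dict rows (hypothetical helper)."""
    seen = set()
    cleaned = []
    for row in rows:
        key = tuple(sorted(row.items()))
        if key in seen:                  # rule 1: drop exact duplicate rows
            continue
        seen.add(key)
        out = {}
        for column, value in row.items():
            if value is None or value.strip() in NULL_MARKERS:
                value = None             # rule 3: null-like strings become real nulls
            elif column in MULTI_VALUE_COLUMNS:
                value = MULTI_VALUE.sub("", value)  # rule 4: keep the first value
            out[column] = value
        if out.get("price_group") is None:
            out["price_group"] = "0"     # rule 2: missing price_group becomes 0
        cleaned.append(out)
    return cleaned


sample = [
    {"upc": "111", "price_group": "10, 20"},
    {"upc": "111", "price_group": "10, 20"},  # exact duplicate
    {"upc": "222", "price_group": "(null)"},
]
print(clean_rows(sample))
# [{'upc': '111', 'price_group': '10'}, {'upc': '222', 'price_group': '0'}]
```

Rule 4 is limited to `price_group` and `upc` so that a description containing ", <digits>" is not truncated.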


```python
df.info()
# Number of distinct UPCs, to compare with the total row count
print(len(df['UPC'].unique()))
```

```text
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10168 entries, 0 to 10167
Data columns (total 9 columns):
PRICE_GROUP          10168 non-null object
UPC                  10168 non-null object
ITEM_DESCRIPTION     10168 non-null object
SIZE                 10168 non-null object
CATEGORY_LEVEL_01    10168 non-null object
CATEGORY_LEVEL_02    10168 non-null object
CATEGORY_LEVEL_03    10168 non-null object
CATEGORY_LEVEL_04    10168 non-null object
IN_SERVICE_DATE      10167 non-null object
dtypes: object(9)
memory usage: 715.1+ KB
10106
```


```python
# Rows that repeat an earlier row in every column (first occurrence excluded)
duplicateRowsDFF = df[df.duplicated()]
print("Duplicate rows except first occurrence, based on all columns:")
print(duplicateRowsDFF)
```

```text
     PRICE_GROUP         UPC                          ITEM_DESCRIPTION  \
1319       39518  1111038127  PRIVATE LABEL OIL FREE LOTION SPF30        
1320       39518  1111038127  PRIVATE LABEL OIL FREE LOTION SPF30        
1342       39518  1111038461  PRIVATE LABEL SPRT SNSCRN LTN SPF30        
1343       39518  1111038461  PRIVATE LABEL SPRT SNSCRN LTN SPF30        
4088       43645  1111083908  PRIVATE LABEL CHILI NO BEANS               
...          ...         ...                                       ...   
9001       39518  4126037914  PRIVATE LABEL SUNSCREEN LTN SPF50          
9008       39518  4126037930      PRIVATE LABEL KD TRGR SPF50            
9009       39518  4126037930      PRIVATE LABEL KD TRGR SPF50            
9096       39518  4126038475  PRIVATE LABEL SUNSCREEN LTN SPF50          
9097       39518  4126038475  PRIVATE LABEL SUNSCREEN LTN SPF50          

            SIZE CATEGORY_LEVEL_01 CATEGORY_LEVEL_02 CATEGORY_LEVEL_03  \
1319        8 FO           DRUG/GM   
```

```python
df = df.drop_duplicates()
df_check = df[df.duplicated()]
print(df_check)
```

```text
Empty DataFrame
Columns: [PRICE_GROUP, UPC, ITEM_DESCRIPTION, SIZE, CATEGORY_LEVEL_01, CATEGORY_LEVEL_02, CATEGORY_LEVEL_03, CATEGORY_LEVEL_04, IN_SERVICE_DATE]
Index: []
```


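```python
from IPython.display import display  # built in under Jupyter; explicit import for scripts

df["PRICE_GROUP"] = df["PRICE_GROUP"].fillna("(null)")
# Keep only the first value in cells like "<value>, <digits>"; regex=True is needed
# because pandas >= 2.0 treats the pattern as a literal string by default
df["PRICE_GROUP"] = df["PRICE_GROUP"].str.replace(r"((,) [0-9]+)", "", regex=True)
df["UPC"] = df["UPC"].str.replace(r"((,) [0-9]+)", "", regex=True)
# Both checks should return empty tables
display(df[df["PRICE_GROUP"].str.contains(",")])
display(df[df["UPC"].str.contains(",")])
```

Both `display` calls return empty tables (header row only: PRICE_GROUP, UPC, ITEM_DESCRIPTION, SIZE, CATEGORY_LEVEL_01, CATEGORY_LEVEL_02, CATEGORY_LEVEL_03, CATEGORY_LEVEL_04, IN_SERVICE_DATE), so no PRICE_GROUP or UPC value still contains a comma.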
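The multi-value fix relies on `str.replace` interpreting `((,) [0-9]+)` as a regular expression. The sketch below uses Python's `re` module to show what the pattern matches, what a literal (non-regex) replacement does with the same pattern, and that the pattern needs a space after the comma, so a cell written exactly as `value,value` would be left unchanged. Whether such cells occur in the data is not checked here. `TOLERANT` is a hypothetical alternative, not part of the notebook, and the sample values are made up.

```python
import re

ORIGINAL = r"((,) [0-9]+)"   # pattern used in the notebook
TOLERANT = r",\s*[0-9]+"     # hypothetical variant: whitespace after the comma is optional

samples = ["111, 222", "111,222", "111"]  # made-up cell values

for value in samples:
    regex_result = re.sub(ORIGINAL, "", value)     # pattern treated as a regex
    literal_result = value.replace(ORIGINAL, "")   # pattern treated as plain text
    tolerant_result = re.sub(TOLERANT, "", value)
    print(f"{value!r}: regex={regex_result!r} literal={literal_result!r} tolerant={tolerant_result!r}")
```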


```python
print(df.isnull().sum())
# One null value remains (IN_SERVICE_DATE); it is dealt with in the transformation stage
```

```text
PRICE_GROUP          0
UPC                  0
ITEM_DESCRIPTION     0
SIZE                 0
CATEGORY_LEVEL_01    0
CATEGORY_LEVEL_02    0
CATEGORY_LEVEL_03    0
CATEGORY_LEVEL_04    0
IN_SERVICE_DATE      1
dtype: int64
```


## Other anomalies: not fixed, because they do not affect the queries asked later

In each pair below, the columns other than UPC and ITEM_DESCRIPTION are the same, and the two descriptions refer to the same product. Most likely the same product was given two UPCs because of a misspelled description.
Example 1:
- UPC 834674001, ITEM_DESCRIPTION: `SLMFST 321 MLK CH SHK 4PK`
- UPC 834612204, ITEM_DESCRIPTION: `SLMFST 321 MLK CHO SHK4PK`

Example 2:
- UPC 1111000892, ITEM_DESCRIPTION: `HAWAI HOT DG BNS 8CT`
- UPC 1111000899, ITEM_DESCRIPTION: `HAWAI HT DOG BNS 8CT`

Example 3:

| Description 1 | Description 2 | UPC 1 | Size 1 | UPC 2 | Size 2 |
|---|---|---|---|---|---|
| WHL MILK | WHOLE MILK | 1111042851 | 1/2 GAL | 1111042850 | 1/2 GAL |
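Spelling variants like these could be flagged automatically. A minimal sketch, assuming the standard library's `difflib.SequenceMatcher` similarity ratio is an acceptable heuristic. The helper `similar_descriptions` and the 0.8 threshold are hypothetical choices. The three sample rows come from the price group 29327 query result later in this notebook.

```python
from difflib import SequenceMatcher
from itertools import combinations


def similar_descriptions(items, threshold=0.8):
    """Return (upc_a, upc_b, ratio) for description pairs at or above the threshold."""
    pairs = []
    for (upc_a, desc_a), (upc_b, desc_b) in combinations(items, 2):
        ratio = SequenceMatcher(None, desc_a, desc_b).ratio()
        if ratio >= threshold:
            pairs.append((upc_a, upc_b, round(ratio, 2)))
    return pairs


items = [
    (834674001, "SLMFST 321 MLK CH SHK 4PK"),
    (834612204, "SLMFST 321 MLK CHO SHK4PK"),
    (834674005, "SLMFST CAPPCNO DLGHT 4PK"),
]
for pair in similar_descriptions(items):
    print(pair)
```

Comparing every pair is quadratic, so on the full table one would likely restrict comparisons to candidate groups, for example items within the same price group.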

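Another anomaly, shown below, comes from white space: again the same product appears under different UPCs.
- ITEM_DESCRIPTION: `HAMBURGER BUNS 8CT`
- ITEM_DESCRIPTION: ` HAMBURGER BUNS 8CT` (with a leading space)

Inconsistent spacing within the description:

| Description 1 | Description 2 | UPC 1 | Size 1 | UPC 2 | Size 2 |
|---|---|---|---|---|---|
| 1%LF MILK | 1% LF MILK | 1111041754 | 1/2 GAL | 1111042855 | 1/2 GAL |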
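Both white-space anomalies disappear if descriptions are compared after removing all white space. The sketch below groups UPCs on that basis. The helper names are hypothetical and the UPCs are placeholders. Removing *all* white space is an assumption: it could merge descriptions whose spacing differs for a real reason.

```python
import re
from collections import defaultdict


def whitespace_key(description):
    """Comparison key: the description with all white space removed."""
    return re.sub(r"\s+", "", description)


def upcs_sharing_description(rows):
    """Map each key to its UPCs, keeping only keys that have more than one UPC."""
    groups = defaultdict(set)
    for upc, description in rows:
        groups[whitespace_key(description)].add(upc)
    return {key: sorted(upcs) for key, upcs in groups.items() if len(upcs) > 1}


rows = [
    (101, "HAMBURGER BUNS 8CT"),
    (102, " HAMBURGER BUNS 8CT"),   # leading space
    (201, "1%LF MILK"),
    (202, "1% LF MILK"),            # extra inner space
    (301, "WHOLE MILK"),
]
print(upcs_sharing_description(rows))
# {'HAMBURGERBUNS8CT': [101, 102], '1%LFMILK': [201, 202]}
```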



# Step 1: Copy the initial data file into S3

This step was done through the S3 web console (GUI).


# Step 2: Create staging tables in Redshift

```python
# Connection details (host, password) hidden
conn = psycopg2.connect(dbname='dev', host='', port='5439', user='nikhil', password='')
cur = conn.cursor()
```


```python
# Every column is VARCHAR, so raw values such as "(null)" load without type errors
create_staging_grocery_table = """CREATE TABLE IF NOT EXISTS staging_grocery(
    price_group         varchar,
    upc                 varchar,
    item_description    varchar,
    size                varchar,
    category_level_01   varchar,
    category_level_02   varchar,
    category_level_03   varchar,
    category_level_04   varchar,
    in_service_date     varchar)
"""
```

# Step 3: Copy data into staging tables

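```python
# Options: skip the header row, load empty or whitespace-only fields as NULL,
# strip surrounding quotes, and treat backslash as an escape character
copy_to_staging_grocery = """copy staging_grocery
from 's3://engage3/item_list_new.csv'
iam_role 'arn:aws:iam::525333178651:role/new_redshift'
delimiter ','
IGNOREHEADER 1
emptyasnull
blanksasnull
removequotes
escape ;"""
```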
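`EMPTYASNULL` and `BLANKSASNULL` only catch empty or white-space-only VARCHAR fields. A literal string such as `(null)` still arrives as text, which is why the transformation step handles it separately. The function below is a simplified plain-Python model of those two options, written for illustration. It is not Redshift code and it ignores quoting and escaping.

```python
def copy_null_model(field, emptyasnull=True, blanksasnull=True):
    """Simplified model: return None where COPY would load NULL, else the text."""
    if emptyasnull and field == "":
        return None                      # empty field -> NULL
    if blanksasnull and field and not field.strip():
        return None                      # white space only -> NULL
    return field                         # anything else loads as text


for raw in ["", "   ", "(null)", "2/15/2016"]:
    print(repr(raw), "->", repr(copy_null_model(raw)))
```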

```python
cur.execute(create_staging_grocery_table)
cur.execute(copy_to_staging_grocery)
conn.commit()
```

# Step 4: Load data into dataframe for transformation

```python
dataframe = psql.read_sql("SELECT * FROM staging_grocery", conn)
```

```python
dataframe.head()
```

| | price_group | upc | item_description | size | category_level_01 | category_level_02 | category_level_03 | category_level_04 | in_service_date |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1500 | 2591 | PRIVATE LABEL PURIFIED WATER 24PK | 24/16.9 FO | GROCERY | GROCERY | GROC-ALL OTHER | WATER | 2/15/2016 |
| 1 | 17302 | 4077 | CORN WHITE IN HUSK BULK | 1 RW | PRODUCE | FRESH PRODUCE | VEGETABLES | CORN | 5/7/2016 |
| 2 | 17302 | 4590 | CORN BICOLOR IN HUSK BULK | 1 EA | PRODUCE | FRESH PRODUCE | VEGETABLES | CORN | 5/7/2016 |
| 3 | 30517 | 64895 | PRIVATE LABEL PURIFIED WATER 32PK | 32/16.9 FO | GROCERY | GROCERY | GROC-ALL OTHER | WATER | 2/15/2016 |
| 4 | 15168 | 100000839 | PRIVATE LABEL APPLE CIDER VINEGAR | 128 FO | GROCERY | GROCERY | GROC-ALL OTHER | VINEGAR&COOKING WINES | 2/15/2016 |


```python
dataframe = dataframe.drop_duplicates()
```

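```python
# Keep only the first value in multi-value cells (regex=True is required in pandas >= 2.0)
dataframe['price_group'] = dataframe['price_group'].str.replace(r'((,) [0-9]+)', '', regex=True)
dataframe['upc'] = dataframe['upc'].str.replace(r'((,) [0-9]+)', '', regex=True)
```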


```python
dataframe.head()
```

| | price_group | upc | item_description | size | category_level_01 | category_level_02 | category_level_03 | category_level_04 | in_service_date |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1500 | 2591 | PRIVATE LABEL PURIFIED WATER 24PK | 24/16.9 FO | GROCERY | GROCERY | GROC-ALL OTHER | WATER | 2/15/2016 |
| 1 | 17302 | 4077 | CORN WHITE IN HUSK BULK | 1 RW | PRODUCE | FRESH PRODUCE | VEGETABLES | CORN | 5/7/2016 |
| 2 | 17302 | 4590 | CORN BICOLOR IN HUSK BULK | 1 EA | PRODUCE | FRESH PRODUCE | VEGETABLES | CORN | 5/7/2016 |
| 3 | 30517 | 64895 | PRIVATE LABEL PURIFIED WATER 32PK | 32/16.9 FO | GROCERY | GROCERY | GROC-ALL OTHER | WATER | 2/15/2016 |
| 4 | 15168 | 100000839 | PRIVATE LABEL APPLE CIDER VINEGAR | 128 FO | GROCERY | GROCERY | GROC-ALL OTHER | VINEGAR&COOKING WINES | 2/15/2016 |


```python
# Series.replace matches whole cell values, so only cells exactly equal to "(null)" change
dataframe['price_group'] = dataframe['price_group'].replace('(null)', '0')
dataframe['in_service_date'] = dataframe['in_service_date'].replace('(null)', '0')

# Fill the remaining nulls, assigning back to each column to avoid chained assignment
dataframe['size'] = dataframe['size'].fillna('Not Known')
dataframe['price_group'] = dataframe['price_group'].fillna(0)
dataframe['upc'] = dataframe['upc'].fillna(0)

# Show any rows that still contain a null value
print(dataframe[dataframe.isnull().any(axis=1)])
```

```text
Empty DataFrame
Columns: [price_group, upc, item_description, size, category_level_01, category_level_02, category_level_03, category_level_04, in_service_date]
Index: []
```
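A pattern such as `r'[\(null)]'` is a regular-expression character class. It matches any single `(`, `n`, `u`, `l` or `)` rather than the word `(null)`. The short check below uses Python's `re` module on made-up values to show how this differs from an exact-string replacement. The class version only happens to yield a numeric-looking `000000` for `(null)`, and it also alters ordinary text.

```python
import re

CHAR_CLASS = r"[\(null)]"   # matches one of: ( n u l )

print(re.sub(CHAR_CLASS, "0", "(null)"))   # 000000
print("(null)".replace("(null)", "0"))      # 0
print(re.sub(CHAR_CLASS, "0", "full"))      # f000  (ordinary text is altered too)
```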


```python
# Keep upc and price_group for grocery_item; .copy() gives an independent frame
dataframe_grocery_fact = dataframe[['upc', 'price_group']].dropna().copy()

# Casting via float64 lets strings such as "1329.0" convert cleanly to int64
dataframe_grocery_fact['upc'] = dataframe_grocery_fact['upc'].astype(str).astype('float64').astype('int64')
dataframe_grocery_fact['price_group'] = dataframe_grocery_fact['price_group'].astype(str).astype('float64').astype('int64')
```
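The casts above go through `float64` before `int64`. That round trip is lossless only for integers up to 2^53 (about 9.0 × 10^15), which covers 12-digit UPC-A codes and even 14-digit GTINs. A quick plain-Python check follows. Python's `float` is an IEEE 754 double, the same format as `float64`.

```python
LIMIT = 2 ** 53                  # 9007199254740992; every integer up to this is exact in a double
largest_14_digit = 99_999_999_999_999

# Any 14-digit code survives str -> float -> int unchanged
print(int(float(str(largest_14_digit))) == largest_14_digit)   # True
print(largest_14_digit < LIMIT)                                 # True
# Just past 2**53 the round trip starts losing information
print(int(float(LIMIT + 1)) == LIMIT + 1)                       # False
```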

```python
# Pair each UPC with its size by index, so rows stay aligned even if dropna() removed some
size_dataframe = dataframe_grocery_fact[['upc']].join(dataframe['size'])
```

```python
# .copy() makes an independent frame, so assigning to it raises no SettingWithCopyWarning
dataframe_grocery_desc = dataframe[['upc', 'item_description', 'category_level_01', 'category_level_02', 'category_level_03', 'category_level_04', 'in_service_date']].copy()
dataframe_grocery_desc['upc'] = dataframe_grocery_desc['upc'].astype(str).astype('float64').astype('int64')
```


# Loading data faster into the final tables

Loading the data into the tables through psycopg2 takes a long time. A faster technique is to write the transformed data to S3 and then use COPY commands to load it into the Redshift tables.

```python
# Connection string and AWS credentials hidden
engine = sa.create_engine('')
s3 = s3fs.S3FileSystem(anon=False, key='', secret='')
```


```python
# Target objects in the engage3 bucket; files are written without header or index,
# so COPY can load every line as data
filename = 'engage3/fact.csv'
filename1 = 'engage3/fact_size.csv'
filename2 = 'engage3/fact_desc.csv'

with s3.open(filename2, 'w') as f2:
    dataframe_grocery_desc.to_csv(f2, index=False, header=False)

with s3.open(filename, 'w') as f:
    dataframe_grocery_fact.to_csv(f, index=False, header=False)

with s3.open(filename1, 'w') as f1:
    size_dataframe.to_csv(f1, index=False, header=False)
```
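pandas' `to_csv` quotes any field that contains the delimiter (its default is `csv.QUOTE_MINIMAL`). The staging COPY used `removequotes`, but the COPY commands in Step 6 specify only `delimiter ','`. If any description contained a comma (not checked in this notebook), that row would be split into too many columns. Redshift's `CSV` option, or `REMOVEQUOTES`, is the usual remedy. The standard-library sketch below uses a made-up row to show the quoting and why a plain split on commas breaks it.

```python
import csv
import io

row = (101, "PRIVATE LABEL WATER, 24PK", "GROCERY")   # made-up row with a comma

buffer = io.StringIO()
csv.writer(buffer).writerow(row)                  # QUOTE_MINIMAL, also pandas' to_csv default
line = buffer.getvalue()
print(repr(line))                                 # '101,"PRIVATE LABEL WATER, 24PK",GROCERY\r\n'

# Splitting only on the delimiter (no quote handling) yields 4 fields instead of 3
print(line.rstrip("\r\n").split(","))
# A quote-aware parser recovers the original 3 fields
print(next(csv.reader(io.StringIO(line))))
```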

# Step 5: Create tables in Redshift

```python
# Note: Redshift treats PRIMARY KEY as informational; it is not enforced on load
grocery_item = """CREATE TABLE grocery_item(
  upc int8 PRIMARY KEY NOT NULL,
  price_group int )"""

grocery_size = """CREATE TABLE grocery_size(
  upc int8 PRIMARY KEY NOT NULL,
  size varchar )"""

grocery_desc = """CREATE TABLE grocery_desc(
  upc int8 NOT NULL PRIMARY KEY,
  item_description VARCHAR,
  category_level_01 VARCHAR,
  category_level_02 VARCHAR,
  category_level_03 VARCHAR,
  category_level_04 VARCHAR,
  in_service_date VARCHAR)"""
```

```python
cur.execute(grocery_item)
cur.execute(grocery_size)
cur.execute(grocery_desc)
conn.commit()
```
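Amazon Redshift does not enforce PRIMARY KEY constraints; they are informational, so COPY will load duplicate UPCs without complaint. The profiling earlier found 10168 rows but only 10106 distinct UPCs. `drop_duplicates()` removes only rows that are identical in every column, so a UPC can still appear twice with different values. The sketch below is a minimal check to run before loading. The helper name is hypothetical and the sample values are made up.

```python
from collections import Counter


def duplicate_keys(keys):
    """Return {key: count} for every key that occurs more than once."""
    return {key: count for key, count in Counter(keys).items() if count > 1}


# Made-up sample; in the notebook, a column such as dataframe_grocery_fact['upc']
# could be passed instead, since Counter accepts any iterable
upcs = [1329, 2591, 4077, 2591]
print(duplicate_keys(upcs))   # {2591: 2}
```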

# Step 6: Load data in tables

```python
# engine.begin() opens a transaction and commits it when the block exits
# (Engine.execute() was removed in SQLAlchemy 2.0)
with engine.begin() as connection:
    connection.execute(sa.text("""
        COPY grocery_size
        from 's3://engage3/fact_size.csv'
        iam_role 'arn:aws:iam::525333178651:role/new_redshift'
        delimiter ',';"""))
    connection.execute(sa.text("""
        COPY grocery_desc
        from 's3://engage3/fact_desc.csv'
        iam_role 'arn:aws:iam::525333178651:role/new_redshift'
        delimiter ',';"""))
    connection.execute(sa.text("""
        COPY grocery_item
        from 's3://engage3/fact.csv'
        iam_role 'arn:aws:iam::525333178651:role/new_redshift'
        delimiter ',';"""))
```
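The three COPY statements differ only in table name and S3 path, so they could be generated from one template. A minimal sketch follows. `build_copy_sql` is a hypothetical helper that does no quoting or escaping, so it suits only trusted, hard-coded values like these. The resulting strings could then be executed through the same `engine`.

```python
def build_copy_sql(table, s3_path, iam_role, options=("DELIMITER ','",)):
    """Build a Redshift COPY statement (hypothetical helper; values are not escaped)."""
    lines = [f"COPY {table}", f"FROM '{s3_path}'", f"IAM_ROLE '{iam_role}'", *options]
    return "\n".join(lines) + ";"


ROLE = "arn:aws:iam::525333178651:role/new_redshift"
LOADS = {
    "grocery_size": "s3://engage3/fact_size.csv",
    "grocery_desc": "s3://engage3/fact_desc.csv",
    "grocery_item": "s3://engage3/fact.csv",
}
statements = [build_copy_sql(table, path, ROLE) for table, path in LOADS.items()]
print(statements[0])
```

Extra options such as `"CSV"` can be passed through `options` without touching the template.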

# SQL Queries

```python
q1 = """
SELECT COUNT(upc) AS item_count, price_group
FROM grocery_item
GROUP BY price_group
ORDER BY item_count DESC
"""
```

```python
q1_df = pd.read_sql_query(q1, con=conn)
q1_df.head(10)
# Price group 0 means the value was unknown or bad data
```

| | item_count | price_group |
|---|---|---|
| 0 | 1060 | 0 |
| 1 | 186 | 2204 |
| 2 | 140 | 2964 |
| 3 | 136 | 2096 |
| 4 | 99 | 743 |
| 5 | 88 | 36105 |
| 6 | 63 | 786 |
| 7 | 61 | 4871 |
| 8 | 58 | 23301 |
| 9 | 58 | 2099 |
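The grouping logic of `q1` can be tried locally before running it on Redshift. This sketch uses Python's built-in `sqlite3` with an in-memory table and made-up rows. SQLite is only a stand-in here; results on Redshift depend on the real data.

```python
import sqlite3

demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE grocery_item (upc INTEGER PRIMARY KEY, price_group INTEGER)")
demo.executemany(
    "INSERT INTO grocery_item VALUES (?, ?)",
    [(1, 0), (2, 0), (3, 2204), (4, 0), (5, 2204), (6, 743)],   # made-up rows
)

q1_demo = """
SELECT COUNT(upc) AS item_count, price_group
FROM grocery_item
GROUP BY price_group
ORDER BY item_count DESC
"""
print(demo.execute(q1_demo).fetchall())   # [(3, 0), (2, 2204), (1, 743)]
```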


```python
q2_1 = """
SELECT COUNT(x.upc) AS item_count
FROM grocery_item AS x
JOIN grocery_desc AS y ON x.upc = y.upc
WHERE y.category_level_01 = 'MEAT'
  AND y.category_level_02 = 'PKG MEAT'
  AND y.category_level_03 = 'PKG MEAT'
  AND y.category_level_04 = 'LUNCHMEAT'
"""
```

```python
q2_1_df = pd.read_sql_query(q2_1, con=conn)
q2_1_df.head()
```

| | item_count |
|---|---|
| 0 | 75 |


```python
q2_2 = """
SELECT COUNT(x.upc) AS item_count
FROM grocery_item AS x
JOIN grocery_desc AS y ON x.upc = y.upc
WHERE y.category_level_01 = 'DRUG/GM'
  AND y.category_level_02 = 'HBC'
  AND y.category_level_03 = 'HEALTH'
"""
```

```python
q2_2_df = pd.read_sql_query(q2_2, con=conn)
q2_2_df.head()
```

| | item_count |
|---|---|
| 0 | 528 |
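`q2_1` and `q2_2` share one shape: count the items under a category path, optionally down to level 4. The sketch below shows a single parameterized helper with the hypothetical name `count_items`, exercised on an in-memory SQLite database with made-up rows. SQLite's placeholder is `?`; with psycopg2 against Redshift the placeholder would be `%s`.

```python
import sqlite3


def count_items(conn, level_01, level_02, level_03, level_04=None):
    """Count items whose category path matches; level_04 is optional."""
    sql = (
        "SELECT COUNT(x.upc) FROM grocery_item AS x "
        "JOIN grocery_desc AS y ON x.upc = y.upc "
        "WHERE y.category_level_01 = ? AND y.category_level_02 = ? "
        "AND y.category_level_03 = ?"
    )
    params = [level_01, level_02, level_03]
    if level_04 is not None:
        sql += " AND y.category_level_04 = ?"
        params.append(level_04)
    return conn.execute(sql, params).fetchone()[0]


db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE grocery_item (upc INTEGER, price_group INTEGER)")
db.execute(
    "CREATE TABLE grocery_desc (upc INTEGER, category_level_01 TEXT, "
    "category_level_02 TEXT, category_level_03 TEXT, category_level_04 TEXT)"
)
db.executemany("INSERT INTO grocery_item VALUES (?, ?)", [(1, 10), (2, 10), (3, 20)])
db.executemany(
    "INSERT INTO grocery_desc VALUES (?, ?, ?, ?, ?)",
    [
        (1, "MEAT", "PKG MEAT", "PKG MEAT", "LUNCHMEAT"),
        (2, "DRUG/GM", "HBC", "HEALTH", "LEVEL4_A"),   # made-up level-4 values
        (3, "DRUG/GM", "HBC", "HEALTH", "LEVEL4_B"),
    ],
)
print(count_items(db, "MEAT", "PKG MEAT", "PKG MEAT", "LUNCHMEAT"))  # 1
print(count_items(db, "DRUG/GM", "HBC", "HEALTH"))                   # 2
```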


```python
q3 = """
SELECT y.item_description, x.upc
FROM grocery_item x
JOIN grocery_desc y ON x.upc = y.upc
WHERE x.price_group = 29327
"""
```

```python
q3_df = pd.read_sql_query(q3, con=conn)
q3_df
```

| | item_description | upc |
|---|---|---|
| 0 | SLMFST RTD CHC RYL 6PKBNS | 834628711 |
| 1 | SLMFST 321 CHOC ROYAL 4PK | 834674007 |
| 2 | SLMFST RTD MLKCHOC 6PKBNS | 834628710 |
| 3 | SLMFST CAPPCNO DLGHT 4PK | 834674005 |
| 4 | SLMFST 321 MLK CHO SHK4PK | 834612204 |
| 5 | SLMFST 321 MLK CH SHK 4PK | 834674001 |
| 6 | SLMFST 321 CHOC ROYAL 4PK | 834612203 |
| 7 | SLMFST RTD CAPPDLG 6PKBNS | 834628713 |
