## Fetch Rewards Data Analyst Take Home

The sample Fetch Rewards data come in three files in the JSON Lines format. Those files are:
1. Brands
2. Users
3. Receipts

The JSON files cannot be loaded into relational tables directly: the records are nested several levels deep, and the data are not normalized, so the same values are repeated across records, which is a data quality issue in itself.
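To make the nesting concrete, here is a minimal sketch of flattening one record. The sample line is shaped like the first users record shown later in this notebook; the helper name `flatten_record` is my own illustration and is not part of the submission code.

```python
import json

# One line from a JSON Lines file, shaped like the first users record in this notebook.
raw_line = (
    '{"_id": {"$oid": "5ff1e194b6a9d73a3a9f1052"}, "active": true, '
    '"createdDate": {"$date": 1609687444800}, "role": "consumer", "state": "WI"}'
)

def flatten_record(record):
    """Unwrap single-key {'$oid': ...} / {'$date': ...} wrappers (hypothetical helper)."""
    flat = {}
    for key, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            wrapper = next(iter(value))
            if wrapper in ('$oid', '$date'):
                value = value[wrapper]
        flat[key] = value
    return flat

flat_user = flatten_record(json.loads(raw_line))
print(flat_user)
```

Deeper structures, such as the list of items inside each receipt, need more than this one-level unwrapping, which is why they end up in tables of their own.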

In this notebook, I design a relational structure and implement it as a single schema; the ER diagram is attached in the submission. After the transformations below, the three source files become a total of 9 tables.

As a last step, I imported the resulting tables into MySQL Server to run the queries. I was not able to generate the ER diagram in the server itself because of data quality issues, which are described in a separate file.

## Import necessary packages

This cell imports the packages used for file operations on the JSON files, the core Python data packages, and SQLAlchemy, which gives easy access to the data when running queries in MySQL Server.

In [1]:
import os
import gzip
import shutil
import pyodbc
import urllib

import pandas as pd
import numpy as np

from datetime import datetime

from sqlalchemy import create_engine
import pymysql

## Part II: (a) Relational structure creation

### (i) Unzip the 3 files and load 

The JSON files are gzip-compressed, so the code below first decompresses each one next to its archive and then loads the three data files.

In [2]:
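for i in os.listdir('./Data_gzip'):
    # Only decompress gzip archives; skip anything else in the folder
    if i.endswith('.gz'):
        files = f'./Data_gzip/{i}'
        # Write the decompressed copy next to the archive, without the .gz suffix
        with gzip.open(files, 'rb') as input_file:
            with open(files[:-len('.gz')], 'wb') as output_file:
                shutil.copyfileobj(input_file, output_file)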


In [3]:
receipts_file = pd.read_json('./Data_gzip/receipts.json',lines=True)
brands_file = pd.read_json('./Data_gzip/brands.json',lines=True)
users_file = pd.read_json('./Data_gzip/users.json',lines=True)
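As an aside, the decompression step can be skipped by reading the gzipped JSON Lines file directly. The sketch below uses only the standard library and runs on a throwaway demo file with made-up rows, not on the real data; `read_jsonl_gz` is a hypothetical helper name.

```python
import gzip
import json
import os
import tempfile

def read_jsonl_gz(path):
    """Return a list of dicts, one per non-empty line of a gzipped JSON Lines file."""
    with gzip.open(path, 'rt', encoding='utf-8') as handle:
        return [json.loads(line) for line in handle if line.strip()]

# Demo on a temporary file so the sketch runs without the real data.
demo_path = os.path.join(tempfile.mkdtemp(), 'demo.json.gz')
with gzip.open(demo_path, 'wt', encoding='utf-8') as handle:
    handle.write('{"role": "consumer", "state": "WI"}\n')
    handle.write('{"role": "consumer", "state": "AL"}\n')

demo_records = read_jsonl_gz(demo_path)
print(len(demo_records), demo_records[0]['state'])
```

`pd.read_json` also accepts a `compression` argument (default `'infer'`), so passing the `.gz` path with `lines=True` should work as well; I kept the explicit unzip step above so the plain JSON files stay available for inspection.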

All dates are stored as Unix timestamps in milliseconds (UTC). Since millisecond precision is not relevant here, I drop the milliseconds and convert each value to a formatted datetime string using the timestamp_datetime function defined below.

In [4]:
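def timestamp_datetime(t):
    # t is epoch time in milliseconds; integer division drops the millisecond part
    try:
        return (datetime.utcfromtimestamp(t // 1000).strftime('%Y-%m-%d %H:%M:%S'))
    except (TypeError, ValueError, OverflowError, OSError):
        # Missing or malformed timestamps are stored as NULL
        return None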
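A quick worked example of the conversion, using the createdDate value of the first users record (1609687444800). This sketch uses `datetime.fromtimestamp` with an explicit UTC timezone instead of `utcfromtimestamp`; the function name `epoch_ms_to_string` is hypothetical and is only here to avoid clashing with the function above.

```python
from datetime import datetime, timezone

def epoch_ms_to_string(ms):
    """Convert epoch milliseconds to a 'YYYY-MM-DD HH:MM:SS' UTC string, or None."""
    try:
        seconds = ms // 1000  # drop the millisecond part
        return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
    except (TypeError, ValueError, OverflowError, OSError):
        return None

created = epoch_ms_to_string(1609687444800)
missing = epoch_ms_to_string(None)
print(created)  # 2021-01-03 15:24:04
print(missing)  # None
```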

### (ii) Users Table

In [5]:
# Inspect the structure of the raw users data
users_file.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 495 entries, 0 to 494
Data columns (total 7 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   _id           495 non-null    object
 1   active        495 non-null    bool  
 2   createdDate   495 non-null    object
 3   lastLogin     433 non-null    object
 4   role          495 non-null    object
 5   signUpSource  447 non-null    object
 6   state         439 non-null    object
dtypes: bool(1), object(6)
memory usage: 23.8+ KB


In [6]:
# contents of users data as such
users_file.head()

Unnamed: 0,_id,active,createdDate,lastLogin,role,signUpSource,state
0,{'$oid': '5ff1e194b6a9d73a3a9f1052'},True,{'$date': 1609687444800},{'$date': 1609687537858},consumer,Email,WI
1,{'$oid': '5ff1e194b6a9d73a3a9f1052'},True,{'$date': 1609687444800},{'$date': 1609687537858},consumer,Email,WI
2,{'$oid': '5ff1e194b6a9d73a3a9f1052'},True,{'$date': 1609687444800},{'$date': 1609687537858},consumer,Email,WI
3,{'$oid': '5ff1e1eacfcf6c399c274ae6'},True,{'$date': 1609687530554},{'$date': 1609687530597},consumer,Email,WI
4,{'$oid': '5ff1e194b6a9d73a3a9f1052'},True,{'$date': 1609687444800},{'$date': 1609687537858},consumer,Email,WI


In [7]:
# Users Table Transformation

users_data = users_file.copy()
users_data['id'] = users_data['_id'].apply(lambda x: x['$oid'] if x else np.nan)
users_data['active'] = users_data['active'].astype(bool).astype('Int32').fillna(0)

# Using the timestamp function created above to change the UTC timestamp to datetime timestamp  
users_data['createdDate'] = users_data['createdDate'].apply(lambda x: timestamp_datetime(x['$date']) if type(x) is dict else np.nan)
users_data['lastLogin'] = users_data['lastLogin'].apply(lambda x: timestamp_datetime(x['$date']) if type(x) is dict else np.nan)

# Dropping the _id column, as its value has been moved to the separate id column in the final table structure
users_data.drop('_id', axis=1, inplace=True)

In [8]:
# Contents of final users table
users_data.head()

Unnamed: 0,active,createdDate,lastLogin,role,signUpSource,state,id
0,1,2021-01-03 15:24:04,2021-01-03 15:25:37,consumer,Email,WI,5ff1e194b6a9d73a3a9f1052
1,1,2021-01-03 15:24:04,2021-01-03 15:25:37,consumer,Email,WI,5ff1e194b6a9d73a3a9f1052
2,1,2021-01-03 15:24:04,2021-01-03 15:25:37,consumer,Email,WI,5ff1e194b6a9d73a3a9f1052
3,1,2021-01-03 15:25:30,2021-01-03 15:25:30,consumer,Email,WI,5ff1e1eacfcf6c399c274ae6
4,1,2021-01-03 15:24:04,2021-01-03 15:25:37,consumer,Email,WI,5ff1e194b6a9d73a3a9f1052


### (iii) Items, Category and Brands Tables

Next, I worked on the brands data. Each brand is identified by a brandCode.

Looking at the brands table, I found a column named "barcode", which appears to be a unique identifier for each product.

Overall, the brands table has been divided into three tables:
1. Items Table
2. Category Table
3. Brands Table
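The split can be sketched on a toy table. The rows below are invented for illustration, and the sketch applies `drop_duplicates` on each table's key, which is an assumption on my part: in the cells that follow, the de-duplication lines are left commented out.

```python
import pandas as pd

# Toy stand-in for the raw brands file (values are illustrative, not from the data set).
wide = pd.DataFrame({
    'barcode':      ['111', '222', '333'],
    'categoryCode': ['BAKING', 'BEVERAGES', 'BAKING'],
    'category':     ['Baking', 'Beverages', 'Baking'],
    'brandCode':    ['BRAND_A', 'BRAND_B', 'BRAND_A'],
    'name':         ['Brand A', 'Brand B', 'Brand A'],
})

# Items: one row per barcode, holding only the keys that point at the other tables.
items = wide[['barcode', 'categoryCode', 'brandCode']].drop_duplicates('barcode')

# Categories: one row per categoryCode.
categories = wide[['categoryCode', 'category']].drop_duplicates('categoryCode').reset_index(drop=True)

# Brands: one row per brandCode.
brands = wide[['brandCode', 'name']].drop_duplicates('brandCode').reset_index(drop=True)

print(len(items), len(categories), len(brands))
```

With one row per key, categoryCode and brandCode in the items table can act as foreign keys into the other two tables.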

In [9]:
# Raw brands file without transformations
brands_file.head()

Unnamed: 0,_id,barcode,category,categoryCode,cpg,name,topBrand,brandCode
0,{'$oid': '601ac115be37ce2ead437551'},511111019862,Baking,BAKING,"{'$id': {'$oid': '601ac114be37ce2ead437550'}, ...",test brand @1612366101024,0.0,
1,{'$oid': '601c5460be37ce2ead43755f'},511111519928,Beverages,BEVERAGES,"{'$id': {'$oid': '5332f5fbe4b03c9a25efd0ba'}, ...",Starbucks,0.0,STARBUCKS
2,{'$oid': '601ac142be37ce2ead43755d'},511111819905,Baking,BAKING,"{'$id': {'$oid': '601ac142be37ce2ead437559'}, ...",test brand @1612366146176,0.0,TEST BRANDCODE @1612366146176
3,{'$oid': '601ac142be37ce2ead43755a'},511111519874,Baking,BAKING,"{'$id': {'$oid': '601ac142be37ce2ead437559'}, ...",test brand @1612366146051,0.0,TEST BRANDCODE @1612366146051
4,{'$oid': '601ac142be37ce2ead43755e'},511111319917,Candy & Sweets,CANDY_AND_SWEETS,"{'$id': {'$oid': '5332fa12e4b03c9a25efd1e7'}, ...",test brand @1612366146827,0.0,TEST BRANDCODE @1612366146827


In [10]:
#Brands Table Transformation

brands_data = brands_file.copy()
brands_data['id'] = brands_data['_id'].apply(lambda x: x['$oid'] if x else np.nan)
# Fill missing values before casting: astype(bool) would turn NaN into True
brands_data['topBrand'] = brands_data['topBrand'].fillna(0).astype(bool).astype('Int32')
brands_data['cpg_id'] = brands_data['cpg'].apply(lambda x: x['$id'].get('$oid') if type(x) is dict else np.nan)
brands_data['cpg_ref'] = brands_data['cpg'].apply(lambda x: x['$ref'] if type(x) is dict else np.nan)

In [11]:
# The items table is created from the brands table: the item-level columns listed below
# are separated out from the rest of the brands data

items_data = brands_data.loc[:,['barcode','categoryCode','brandCode','cpg_id','cpg_ref']]
#items.drop_duplicates('barcode', inplace=True)

# Final Items Table
items_data.head()

Unnamed: 0,barcode,categoryCode,brandCode,cpg_id,cpg_ref
0,511111019862,BAKING,,601ac114be37ce2ead437550,Cogs
1,511111519928,BEVERAGES,STARBUCKS,5332f5fbe4b03c9a25efd0ba,Cogs
2,511111819905,BAKING,TEST BRANDCODE @1612366146176,601ac142be37ce2ead437559,Cogs
3,511111519874,BAKING,TEST BRANDCODE @1612366146051,601ac142be37ce2ead437559,Cogs
4,511111319917,CANDY_AND_SWEETS,TEST BRANDCODE @1612366146827,5332fa12e4b03c9a25efd1e7,Cogs


In [12]:
# Separating the category details from the brands table using categoryCode and category
categories_data = brands_data.loc[:,['categoryCode','category']]
#categories.drop_duplicates('categoryCode', inplace=True)

# Final Categories Table
categories_data.head()

Unnamed: 0,categoryCode,category
0,BAKING,Baking
1,BEVERAGES,Beverages
2,BAKING,Baking
3,BAKING,Baking
4,CANDY_AND_SWEETS,Candy & Sweets


In [13]:
# Dropping the irrelevant columns for the Brands Table
brands_data.drop(['_id','cpg','barcode','category','categoryCode','cpg_id','cpg_ref'], axis=1, inplace=True)
#brands_data.drop_duplicates('brandCode', inplace=True)

# Final Brands Table
brands_data.head()

Unnamed: 0,name,topBrand,brandCode,id
0,test brand @1612366101024,0,,601ac115be37ce2ead437551
1,Starbucks,0,STARBUCKS,601c5460be37ce2ead43755f
2,test brand @1612366146176,0,TEST BRANDCODE @1612366146176,601ac142be37ce2ead43755d
3,test brand @1612366146051,0,TEST BRANDCODE @1612366146051,601ac142be37ce2ead43755a
4,test brand @1612366146827,0,TEST BRANDCODE @1612366146827,601ac142be37ce2ead43755e


### (iv) Transactions Table

The transactions data came next. This table is created from the receipts table. The purchase details were split out of the receipts table because transactions are important to the company in their own right.

In [14]:
transactions_data = receipts_file.loc[:, ['_id','purchaseDate','userId']]
transactions_data.reset_index(inplace=True)

#Renaming the columns for the Transactions table
transactions_data.rename(columns = {'index':'id','_id':'receiptId','purchaseDate':'transactionDate'}, inplace=True)
transactions_data['receiptId'] = transactions_data['receiptId'].apply(lambda x: x['$oid'] if x else np.nan)

# Using the timestamp function created above to change the UTC timestamp to datetime timestamp 
transactions_data['transactionDate'] = transactions_data['transactionDate'].apply(lambda x: timestamp_datetime(x['$date']) if type(x) is dict else np.nan)

#transactions_data.drop_duplicates(inplace=True)

# Final Transaction Table
transactions_data.head()

Unnamed: 0,id,receiptId,transactionDate,userId
0,0,5ff1e1eb0a720f0523000575,2021-01-03 00:00:00,5ff1e1eacfcf6c399c274ae6
1,1,5ff1e1bb0a720f052300056b,2021-01-02 15:24:43,5ff1e194b6a9d73a3a9f1052
2,2,5ff1e1f10a720f052300057a,2021-01-03 00:00:00,5ff1e1f1cfcf6c399c274b0b
3,3,5ff1e1ee0a7214ada100056f,2021-01-03 00:00:00,5ff1e1eacfcf6c399c274ae6
4,4,5ff1e1d20a7214ada1000561,2021-01-02 15:25:06,5ff1e194b6a9d73a3a9f1052


### (v) TransactionItems Table

Each transaction consists of a list of items, so the purchased items are separated from the transactions into their own table. The cells above already create an items table, but this one is different: it contains the items from the transactions that users have made, whereas the earlier one is just a list of items with barcode, categoryCode, brandCode, cpg_id and cpg_ref.

After all transformations for the TransactionItems table, each transaction item is a separate record (stored in its own row) linked to its parent transaction. The TransactionItems table holds various details about each item, as can be seen in the final table.
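The "one row per item" idea can be sketched on a toy frame before looking at the real cells. The column name `items` and the receipt IDs are placeholders of mine; the barcodes and quantities only imitate values seen in the data.

```python
import pandas as pd

receipts = pd.DataFrame({
    'receiptId': ['r1', 'r2', 'r3'],
    'items': [
        [{'barcode': '4011', 'quantityPurchased': 5}],
        [{'barcode': '4011', 'quantityPurchased': 1},
         {'barcode': '28400642255', 'quantityPurchased': 1}],
        None,  # a receipt with no item list
    ],
})

# explode() turns each list element into its own row and repeats the receiptId.
exploded = receipts.explode('items').reset_index(drop=True)

# Replace missing entries with empty dicts so every row can be expanded into columns.
item_dicts = [d if isinstance(d, dict) else {} for d in exploded['items']]
item_rows = pd.DataFrame(item_dicts)
item_rows.insert(0, 'receiptId', exploded['receiptId'])
print(item_rows)
```

The receipt without items still yields one row, with every item column empty, so the link back to the receipt is not lost.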

In [15]:
# To map receipt IDs to transaction IDs, I zip the two columns together into a dictionary
transaction_receipt = dict(zip(transactions_data.receiptId, transactions_data.id))

In [16]:
receiptItems_base = receipts_file[['_id','rewardsReceiptItemList']].explode('rewardsReceiptItemList').reset_index(drop=True)
# Receipts without an item list explode to NaN; replace those with empty dicts
receiptItems_base['rewardsReceiptItemList'] = receiptItems_base['rewardsReceiptItemList'].apply(lambda x: x if isinstance(x, dict) else dict())
receiptItems_data = pd.DataFrame.from_dict(list(receiptItems_base.rewardsReceiptItemList))
receiptItems_data['receiptId'] = receiptItems_base['_id'].apply(lambda x: x['$oid'] if x else np.nan)
receiptItems_data['transactionId'] = receiptItems_data['receiptId'].apply(lambda x: transaction_receipt.get(x))

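# Caution: astype(bool) treats NaN as True, so a missing flag is stored as 1 and the trailing fillna(0) never applies
for col in ['needsFetchReview','userFlaggedNewItem','preventTargetGapPoints','deleted','competitiveProduct']:
    receiptItems_data[col] = receiptItems_data[col].astype(bool).astype('Int32').fillna(0)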
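The order of the cast and the fill in the loop above matters, because `NaN` is truthy in Python. The sketch below contrasts the two orders on a three-element toy series; the "fill first" variant assumes that a missing flag should mean False, which is my assumption rather than something the data dictionary states.

```python
import numpy as np
import pandas as pd

flags = pd.Series([True, False, np.nan], dtype=object)

# Casting first: NaN is truthy, so the missing value becomes 1 and fillna has nothing to fill.
cast_first = flags.astype(bool).astype('Int32').fillna(0)

# Handling missing values first: the missing value becomes 0.
fill_first = flags.map(lambda v: False if pd.isna(v) else bool(v)).astype('Int32')

print(cast_first.tolist())  # [1, 0, 1]
print(fill_first.tolist())  # [1, 0, 0]
```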

In [17]:
transactionItems_data = receiptItems_data.loc[:,['transactionId','barcode','deleted','finalPrice','itemPrice','priceAfterCoupon','discountedItemPrice','originalFinalPrice','quantityPurchased',
                        'originalMetaBriteItemPrice','partnerItemId','metabriteCampaignId',
                        'userFlaggedBarcode','userFlaggedNewItem','userFlaggedPrice','userFlaggedQuantity',
                        'needsFetchReview','needsFetchReviewReason','originalMetaBriteBarcode','originalMetaBriteQuantityPurchased']]
transactionItems_data = transactionItems_data.reset_index().rename(columns={'index':'id'})

# Final TransactionItems Table
transactionItems_data.head()

Unnamed: 0,id,transactionId,barcode,deleted,finalPrice,itemPrice,priceAfterCoupon,discountedItemPrice,originalFinalPrice,quantityPurchased,...,partnerItemId,metabriteCampaignId,userFlaggedBarcode,userFlaggedNewItem,userFlaggedPrice,userFlaggedQuantity,needsFetchReview,needsFetchReviewReason,originalMetaBriteBarcode,originalMetaBriteQuantityPurchased
0,0,0,4011.0,1,26.0,26.0,,,,5.0,...,1,,4011.0,1,26.0,5.0,0,,,
1,1,1,4011.0,1,1.0,1.0,,,,1.0,...,1,,,1,,,1,,,
2,2,1,28400642255.0,1,10.0,10.0,,,,1.0,...,2,,28400642255.0,1,10.0,1.0,1,USER_FLAGGED,,
3,3,2,,1,,,,,,,...,1,,4011.0,1,26.0,3.0,0,,,
4,4,3,4011.0,1,28.0,28.0,,,,4.0,...,1,,4011.0,1,28.0,4.0,0,,,


After running queries in MySQL Server, I found that certain entries were missing. I suspected that some items were absent from the items table, so I merged in additional items taken from the receiptItems table. After adding these items, the missing entries issue was resolved.

In [18]:
items_data = pd.concat([items_data, receiptItems_data.loc[~receiptItems_data.brandCode.isna(),['barcode','brandCode']]],ignore_index=True)

items_data.drop_duplicates('barcode',inplace=True)
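One way to spot such gaps before loading the tables is to look for transaction items whose barcode has no match in the items table. This is a minimal sketch on toy frames; the barcode `'999'` is invented to play the role of an item missing from the items table.

```python
import pandas as pd

items = pd.DataFrame({'barcode': ['4011', '28400642255']})
transaction_items = pd.DataFrame({
    'id': [0, 1, 2, 3],
    'barcode': ['4011', '28400642255', '999', None],
})

# Rows that carry a barcode which does not exist in the items table.
has_barcode = transaction_items['barcode'].notna()
is_known = transaction_items['barcode'].isin(items['barcode'])
orphans = transaction_items[has_barcode & ~is_known]
print(orphans)
```

Note that `isin` compares values exactly, so both barcode columns need the same dtype (for example, both strings) for the check to be meaningful.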

### (vi) Receipts Table

Now I design the Receipts table. Receipts are central to Fetch as a company: when a customer scans a receipt, a new event is created, the points are calculated, and the rewards are credited to the user's account. In this design, each receipt is linked to a transaction.

In [19]:
# Raw Receipts Table
receipts_file.head()

Unnamed: 0,_id,bonusPointsEarned,bonusPointsEarnedReason,createDate,dateScanned,finishedDate,modifyDate,pointsAwardedDate,pointsEarned,purchaseDate,purchasedItemCount,rewardsReceiptItemList,rewardsReceiptStatus,totalSpent,userId
0,{'$oid': '5ff1e1eb0a720f0523000575'},500.0,"Receipt number 2 completed, bonus point schedu...",{'$date': 1609687531000},{'$date': 1609687531000},{'$date': 1609687531000},{'$date': 1609687536000},{'$date': 1609687531000},500.0,{'$date': 1609632000000},5.0,"[{'barcode': '4011', 'description': 'ITEM NOT ...",FINISHED,26.0,5ff1e1eacfcf6c399c274ae6
1,{'$oid': '5ff1e1bb0a720f052300056b'},150.0,"Receipt number 5 completed, bonus point schedu...",{'$date': 1609687483000},{'$date': 1609687483000},{'$date': 1609687483000},{'$date': 1609687488000},{'$date': 1609687483000},150.0,{'$date': 1609601083000},2.0,"[{'barcode': '4011', 'description': 'ITEM NOT ...",FINISHED,11.0,5ff1e194b6a9d73a3a9f1052
2,{'$oid': '5ff1e1f10a720f052300057a'},5.0,All-receipts receipt bonus,{'$date': 1609687537000},{'$date': 1609687537000},,{'$date': 1609687542000},,5.0,{'$date': 1609632000000},1.0,"[{'needsFetchReview': False, 'partnerItemId': ...",REJECTED,10.0,5ff1e1f1cfcf6c399c274b0b
3,{'$oid': '5ff1e1ee0a7214ada100056f'},5.0,All-receipts receipt bonus,{'$date': 1609687534000},{'$date': 1609687534000},{'$date': 1609687534000},{'$date': 1609687539000},{'$date': 1609687534000},5.0,{'$date': 1609632000000},4.0,"[{'barcode': '4011', 'description': 'ITEM NOT ...",FINISHED,28.0,5ff1e1eacfcf6c399c274ae6
4,{'$oid': '5ff1e1d20a7214ada1000561'},5.0,All-receipts receipt bonus,{'$date': 1609687506000},{'$date': 1609687506000},{'$date': 1609687511000},{'$date': 1609687511000},{'$date': 1609687506000},5.0,{'$date': 1609601106000},2.0,"[{'barcode': '4011', 'description': 'ITEM NOT ...",FINISHED,1.0,5ff1e194b6a9d73a3a9f1052


In [20]:
receipts_data = receipts_file.loc[:,['_id','createDate','dateScanned','finishedDate','modifyDate','userId']]
receipts_data['id'] = receipts_data['_id'].apply(lambda x: x['$oid'] if x else np.nan)

# Using the timestamp function created above to change the UTC timestamp to datetime timestamp 
for col in ['createDate','dateScanned','finishedDate','modifyDate']:
    receipts_data[col] = receipts_data[col].apply(lambda x: timestamp_datetime(x['$date']) if type(x) is dict else np.nan)

receipts_data['transactionId'] = receipts_data['id'].apply(lambda x: transaction_receipt.get(x))

#receipts.drop_duplicates('id',inplace=True)

# Dropping the irrelevant column
receipts_data.drop('_id',axis=1,inplace=True)

In [21]:
# Final Receipts Table
receipts_data.head()

Unnamed: 0,createDate,dateScanned,finishedDate,modifyDate,userId,id,transactionId
0,2021-01-03 15:25:31,2021-01-03 15:25:31,2021-01-03 15:25:31,2021-01-03 15:25:36,5ff1e1eacfcf6c399c274ae6,5ff1e1eb0a720f0523000575,0
1,2021-01-03 15:24:43,2021-01-03 15:24:43,2021-01-03 15:24:43,2021-01-03 15:24:48,5ff1e194b6a9d73a3a9f1052,5ff1e1bb0a720f052300056b,1
2,2021-01-03 15:25:37,2021-01-03 15:25:37,,2021-01-03 15:25:42,5ff1e1f1cfcf6c399c274b0b,5ff1e1f10a720f052300057a,2
3,2021-01-03 15:25:34,2021-01-03 15:25:34,2021-01-03 15:25:34,2021-01-03 15:25:39,5ff1e1eacfcf6c399c274ae6,5ff1e1ee0a7214ada100056f,3
4,2021-01-03 15:25:06,2021-01-03 15:25:06,2021-01-03 15:25:11,2021-01-03 15:25:11,5ff1e194b6a9d73a3a9f1052,5ff1e1d20a7214ada1000561,4


### (vii) Rewards Table

Rewards keep customers engaged: they are an incentive for customers to keep scanning and saving the receipts of their transactions in order to earn points. The rewards data is therefore handled in its own table.

In [22]:
rewards_data = receipts_file.loc[:,['_id','bonusPointsEarned','bonusPointsEarnedReason','pointsAwardedDate','pointsEarned',
                            'rewardsReceiptStatus']]
rewards_data['receiptId'] = rewards_data['_id'].apply(lambda x: x['$oid'] if x else np.nan)
rewards_data['pointsAwardedDate'] = rewards_data['pointsAwardedDate'].apply(lambda x: timestamp_datetime(x['$date']) if type(x) is dict else np.nan)

#rewards.drop_duplicates('receiptId',inplace=True)

# Drop irrelevant columns 
rewards_data.drop('_id',axis=1,inplace=True)
rewards_data = rewards_data.reset_index().rename(columns={'index':'id'})

# Final Rewards Table 
rewards_data.head()

Unnamed: 0,id,bonusPointsEarned,bonusPointsEarnedReason,pointsAwardedDate,pointsEarned,rewardsReceiptStatus,receiptId
0,0,500.0,"Receipt number 2 completed, bonus point schedu...",2021-01-03 15:25:31,500.0,FINISHED,5ff1e1eb0a720f0523000575
1,1,150.0,"Receipt number 5 completed, bonus point schedu...",2021-01-03 15:24:43,150.0,FINISHED,5ff1e1bb0a720f052300056b
2,2,5.0,All-receipts receipt bonus,,5.0,REJECTED,5ff1e1f10a720f052300057a
3,3,5.0,All-receipts receipt bonus,2021-01-03 15:25:34,5.0,FINISHED,5ff1e1ee0a7214ada100056f
4,4,5.0,All-receipts receipt bonus,2021-01-03 15:25:06,5.0,FINISHED,5ff1e1d20a7214ada1000561


### (viii) RewardItems Table

The last table holds the reward items. It is created by taking the reward-related fields from the receipts data, since a receipt lists each item purchased during a transaction.

In [23]:
rewardItems_data = receiptItems_data.loc[:,['receiptId','originalReceiptItemText','itemNumber','pointsNotAwardedReason',
                                                  'pointsPayerId','rewardsGroup','rewardsProductPartnerId',
                                                  'competitorRewardsGroup','pointsEarned']]
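# Two index resets give two running counters: 'index' (becomes transactionItemId) and 'level_0' (becomes id)
rewardItems_data = rewardItems_data.reset_index().reset_index()
rewardItems_data.rename(columns={'level_0':'id','index':'transactionItemId'}, inplace=True)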

# Final Reward Items Table
rewardItems_data.head()

Unnamed: 0,id,transactionItemId,receiptId,originalReceiptItemText,itemNumber,pointsNotAwardedReason,pointsPayerId,rewardsGroup,rewardsProductPartnerId,competitorRewardsGroup,pointsEarned
0,0,0,5ff1e1eb0a720f0523000575,,,,,,,,
1,1,1,5ff1e1bb0a720f052300056b,,,,,,,,
2,2,2,5ff1e1bb0a720f052300056b,,,Action not allowed for user and CPG,5332f5fbe4b03c9a25efd0ba,DORITOS SPICY SWEET CHILI SINGLE SERVE,5332f5fbe4b03c9a25efd0ba,,
3,3,3,5ff1e1f10a720f052300057a,,,,,,,,
4,4,4,5ff1e1ee0a7214ada100056f,,,,,,,,


## Part II: (b) Generating CSV files of the tables

In [24]:
users_data.to_csv('./Data_csv/users.csv',index=False)
items_data.to_csv('./Data_csv/items.csv',index=False)
brands_data.to_csv('./Data_csv/brands.csv',index=False)
categories_data.to_csv('./Data_csv/categories.csv',index=False)
transactions_data.to_csv('./Data_csv/transactions.csv',index=False)
transactionItems_data.to_csv('./Data_csv/transactionItems.csv',index=False)
receipts_data.to_csv('./Data_csv/receipts.csv',index=False) 
rewards_data.to_csv('./Data_csv/rewards.csv',index=False)  
rewardItems_data.to_csv('./Data_csv/rewardItems.csv',index=False)  

The data has now been transformed and exported to CSV; all remaining queries are run in MySQL Server.
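For readers who want to try the CSV-to-database step without a server, here is a rough sketch that loads two users rows into an in-memory SQLite database. SQLite is only a stand-in I chose so the example is self-contained; it is not the database used for the actual queries, and the column types in the `CREATE TABLE` statement are my assumption.

```python
import csv
import io
import sqlite3

# Stand-in for one exported file; in the notebook this would be ./Data_csv/users.csv.
users_csv = io.StringIO(
    "active,createdDate,lastLogin,role,signUpSource,state,id\n"
    "1,2021-01-03 15:24:04,2021-01-03 15:25:37,consumer,Email,WI,5ff1e194b6a9d73a3a9f1052\n"
    "1,2021-01-03 15:25:30,2021-01-03 15:25:30,consumer,Email,WI,5ff1e1eacfcf6c399c274ae6\n"
)
rows = list(csv.DictReader(users_csv))

conn = sqlite3.connect(':memory:')  # assumption: SQLite in place of the real server
conn.execute(
    'CREATE TABLE users (id TEXT, active INTEGER, createdDate TEXT, '
    'lastLogin TEXT, role TEXT, signUpSource TEXT, state TEXT)'
)
# Named placeholders are filled from the dict keys produced by csv.DictReader.
conn.executemany(
    'INSERT INTO users VALUES '
    '(:id, :active, :createdDate, :lastLogin, :role, :signUpSource, :state)',
    rows,
)

user_count = conn.execute('SELECT COUNT(*) FROM users').fetchone()[0]
states = [r[0] for r in conn.execute('SELECT DISTINCT state FROM users')]
print(user_count, states)
```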