#### Reading files and writing to delta tables

In [0]:
from pyspark.sql.functions import *

In [0]:
# Reading Files
receiptsDF = spark.read.json("/Volumes/workspace/fetch/json_files/receipts.json")
usersDF = spark.read.json("/Volumes/workspace/fetch/json_files/users.json")
brandsDF = spark.read.json("/Volumes/workspace/fetch/json_files/brands.json")

# Creating dictionary of dataframes
jsonDFs = {'receipts': receiptsDF, 'users': usersDF, 'brands': brandsDF}

# Writing results to table
for file in jsonDFs:
  jsonDFs[file].write.format("delta").saveAsTable(f"fetch.{file}_raw")

#### Cleaning Users
* Following commands clean the users raw table and perform data quality checks against the data

In [0]:
# Applying some light transformations and writing results
df = (spark.table("fetch.users_raw")
             .withColumn('_id', col("_id.$oid"))
             .withColumn('createdDate', col("createdDate.$date"))
             .withColumn('lastLogin', col("lastLogin.$date"))
             .selectExpr(
                "_id",
                "cast(active as boolean) as active",
                "cast(from_unixtime(createdDate/1000) as timestamp) as createdDate",
                "cast(from_unixtime(lastLogin/1000) as timestamp) as lastLogin",
                "role",
                "signUpSource",
                "state")
             .distinct()
             )

df.write.format("delta").mode('overwrite').saveAsTable(f"fetch.users_processed")

In [0]:
%sql
-- Examining active but never login users
-- 40 users appear to be inappropriately tagged as active although they never had a single login
-- Potential recommendations: 
-- 1. Have system set lastLogin to createdDate and active flag to false upon initial creation (similar to _id 6008622ebe5fc9247bab4eb9 and records in the below query)
-- 2. Have system set active to false if no LastLogin has been recorded yet
select  
  _id,
  active,
  createdDate,
  lastLogin
from fetch.users_processed
where lastLogin is null 
and active is true

_id,active,createdDate,lastLogin
600a03e17d983a124e9adb0f,True,2021-01-21T22:44:49.000Z,
6006f786fb296c7f688530f8,True,2021-01-19T15:15:18.000Z,
6000b75bbe5fc96dfee1d4d3,True,2021-01-14T21:27:55.000Z,
600f19914329897eac238fef,True,2021-01-25T19:18:41.000Z,
60025fe0e257124ec6b99a87,True,2021-01-16T03:39:13.000Z,
60186237c8b50e11d8454d5f,True,2021-02-01T20:19:03.000Z,
6008d447b6310511daa4f281,True,2021-01-21T01:09:27.000Z,
600f20c15edb787dce060911,True,2021-01-25T19:49:21.000Z,
60145ff384231211ce796d51,True,2021-01-29T19:20:19.000Z,
6002541ae257124ec6b99a3a,True,2021-01-16T02:48:58.000Z,


In [0]:
%sql
-- Displaying the records that seemingly had lastLogin set to createdDate (39% of the population)
-- This is an indication that the system has sporadic issues when creating a user record and setting the lastLogin to createdDate upon creation
select 
  *
from fetch.users_processed
where createdDate = lastLogin

_id,active,createdDate,lastLogin,role,signUpSource,state
5ff8c241b3348b11c93379fc,True,2021-01-08T20:36:17.000Z,2021-01-08T20:36:17.000Z,consumer,Email,WI
5ff7268eeb7c7d12096da2a9,True,2021-01-07T15:19:42.000Z,2021-01-07T15:19:42.000Z,consumer,Email,WI
602558adb5459313e1e9b7ce,True,2021-02-11T16:17:49.000Z,2021-02-11T16:17:49.000Z,consumer,Email,WI
6008622ebe5fc9247bab4eb9,False,2021-01-20T17:02:38.000Z,2021-01-20T17:02:38.000Z,consumer,Email,WI
601ac1da591789121574dc07,True,2021-02-03T15:31:38.000Z,2021-02-03T15:31:38.000Z,consumer,Email,WI
60132b8b73c60b3ca7f3baee,True,2021-01-28T21:24:27.000Z,2021-01-28T21:24:27.000Z,consumer,Email,WI
60118c0fa4b74c18d3a8c151,True,2021-01-27T15:51:43.000Z,2021-01-27T15:51:43.000Z,consumer,Email,WI
6025388fb54593795bf69234,True,2021-02-11T14:00:47.000Z,2021-02-11T14:00:47.000Z,consumer,Email,WI
60147001c8b50e11d8453c36,True,2021-01-29T20:28:49.000Z,2021-01-29T20:28:49.000Z,consumer,Email,WI
60229990b57b8a12187fe9e0,True,2021-02-09T14:17:52.000Z,2021-02-09T14:17:52.000Z,consumer,Email,WI


In [0]:
%sql
-- Examining fetch-staff users
-- Data quality issues: 
-- 1. Out of 8 staff, 38% of them are missing a singUpSource
-- 2. Out of 8 staff, 38% of them are missing a value for state
select  
  sum(case when signUpSource is null then 1 else 0 end) as signUpSourceNulls,
  round(sum(case when signUpSource is null then 1 else 0 end)/count(*), 2)  as signUpNullsPercentage,
  sum(case when state is null then 1 else 0 end) as stateNulls,
  round(sum(case when state is null then 1 else 0 end)/count(*), 2) as stateNullsPercentage
from fetch.users_processed
where role = 'fetch-staff'


signUpSourceNulls,signUpNullsPercentage,stateNulls,stateNullsPercentage
3,0.38,3,0.38


In [0]:
%sql
-- Query to display data quality issues for each column
-- Only the lastLogin, SignUpSource, and state columns have issues
select 
  round(sum(case when _id is null then 1 else 0 end)/ count(*), 2) as id_nulls_percentage,
  round(sum(case when active is null then 1 else 0 end)/ count(*), 2) as active_nulls_percentage,
  round(sum(case when createdDate is null then 1 else 0 end)/ count(*), 2) as createdDate_nulls_percentage,
  round(sum(case when lastLogin is null then 1 else 0 end)/ count(*), 2) as lastLogin_nulls_percentage,
  round(sum(case when role is null then 1 else 0 end)/ count(*), 2) as role_nulls_percentage,
  round(sum(case when signUpSource is null then 1 else 0 end)/ count(*), 2) as signUpSource_nulls_percentage,
  round(sum(case when state is null then 1 else 0 end)/ count(*), 2) as state_nulls_percentage
from fetch.users_processed  

id_nulls_percentage,active_nulls_percentage,createdDate_nulls_percentage,lastLogin_nulls_percentage,role_nulls_percentage,signUpSource_nulls_percentage,state_nulls_percentage
0.0,0.0,0.0,0.19,0.0,0.02,0.03


#### Cleaning Brands
* Following commands clean the brands raw table and perform data quality checks against the data

In [0]:
# Accessing elements from structs and creating a temp view for further processing
df = (spark.table("fetch.brands_raw")
             .withColumn('_id', col("_id.$oid"))
             .withColumn('cpg_id', col("cpg.$id.$oid"))
             .withColumn('cpg_reference', col("cpg.$ref"))
             .selectExpr(
                "_id",
                "barcode",
                "brandCode",
                "category",
                "categoryCode",
                "cpg_id",
                "cpg_reference",
                "name",
                "topBrand")
             .distinct()
             )

df.createOrReplaceTempView("temp_brands")

In [0]:
%sql
-- Brandcode and Name appear to be the same minus the fact that brandcode is generally uppercased and name is typically just capitalized
-- Exploring the possiblity of backfilling the brandcode with name in the case of nulls
with cleaned_strings as (
  select 
    lower(regexp_replace(brandcode, '[^a-zA-Z0-9]', '')) AS cleaned_brandcode,
    lower(regexp_replace(name, '[^a-zA-Z0-9]', '')) AS cleaned_name
  from temp_brands
  where brandcode is not null -- Removing nulls
  and length(brandcode) > 0 -- Removing blanks
  and try_cast(brandCode as DOUBLE) is  null -- Removing Numeric 
  and lower(brandcode) not like "%test%" -- Removing test records
)

-- 81% of the records match the pattern so we can try to improve data quality by backfilling the brandcode with name
select 
  sum(case when lower(cleaned_brandcode) = lower(cleaned_name) then 1 else 0 end) as brandcode_equals_name,
  round(sum(case when lower(cleaned_brandcode) = lower(cleaned_name) then 1 else 0 end)/count(*),2) * 100 as brandcode_equals_name_percentage
from cleaned_strings

brandcode_equals_name,brandcode_equals_name_percentage
389,81.0


In [0]:
%sql
-- Investigating categoryCode nulls and if it could potentially be backfilled with category with some adjustments
-- Transforming categoryCode sligthly and checking the match rate between the adjusted value and the category field
-- Query shows that the two fields matched 100% with 1 case condition needed for Health and Wellness
with categories as (
select distinct
  category,
  categoryCode as rawCategoryCode,
  case 
  when categoryCode like "HEALTHY%" then "Health & Wellness"
  when categoryCode not like "HEALTHY%" then initcap((regexp_replace(replace(categoryCode, '_', ' '), '\\bAND\\b', '&')))
  end as newCategoryCode
from temp_brands
where categoryCode is not null)

select 
  sum(case when newCategoryCode = category then 1 else 0 end) as match_count,
  round(sum(case when newCategoryCode = category then 1 else 0 end)/count(*),2) * 100  as match_percentage
from categories

match_count,match_percentage
14,100.0


In [0]:
%sql
-- Creating table using temp view and cte
Create Or Replace Table fetch.brands_processed as
with cleaned as (
select
  _id,
  barcode,
  case 
  when brandcode is null or length(brandcode) = 0 or try_cast(brandCode as DOUBLE) is not null then upper(name)
  else brandcode end as brandCode,
  category,
  case 
  when category like "Health%" then "HEALTH_AND_WELLNESS"
  when category not like "Health%" then upper((regexp_replace(replace(category, ' ', '_'), '\\b&\\b', 'AND')))
  end as categoryCode,
  cpg_id as cpgID,
  cpg_reference as cpgReference,
  initcap(name) as name,
  topBrand
from temp_brands)

-- Removing test records for now
select 
  *
from cleaned
where lower(brandCode) not like "%test%"

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Query to display data quality issues for each column
-- More than half the brands are missing a value for the topBrand field. Potentially recommend removal of this field or update this field based on query with reciepts
select 
  round(sum(case when _id is null then 1 else 0 end)/ count(*), 2) as id_nulls_percentage,
  round(sum(case when barcode is null then 1 else 0 end)/ count(*), 2) as barcode_nulls_percentage,
  round(sum(case when brandCode is null then 1 else 0 end)/ count(*), 2) as brandCode_nulls_percentage,
  round(sum(case when category is null then 1 else 0 end)/ count(*), 2) as category_nulls_percentage,
  round(sum(case when categoryCode is null then 1 else 0 end)/ count(*), 2) as categoryCode_nulls_percentage,
  round(sum(case when cpgID is null then 1 else 0 end)/ count(*), 2) as cpgID_nulls_percentage,
  round(sum(case when cpgReference is null then 1 else 0 end)/ count(*), 2) as cpgReference_nulls_percentage,
  round(sum(case when name is null then 1 else 0 end)/ count(*), 2) as name_nulls_percentage,
  round(sum(case when topBrand is null then 1 else 0 end)/ count(*), 2) as topBrand_nulls_percentage
from fetch.brands_processed  

id_nulls_percentage,barcode_nulls_percentage,brandCode_nulls_percentage,category_nulls_percentage,categoryCode_nulls_percentage,cpgID_nulls_percentage,cpgReference_nulls_percentage,name_nulls_percentage,topBrand_nulls_percentage
0.0,0.0,0.0,0.21,0.21,0.0,0.0,0.0,0.55


In [0]:
%sql
-- There appears to be a loose correlation between the cpgReference and the nulls in the category and categoryCode fields.
-- Potentially further research on why certain records are being tagged with Cpgs and others with Cogs
select
  cpgReference,
  sum(case when category is null then 1 else 0 end)/count(*) as category_nulls_percentage,
  sum(case when categoryCode is null then 1 else 0 end)/count(*) as categoryCode_nulls_percentage
from fetch.brands_processed
group by 
  1

cpgReference,category_nulls_percentage,categoryCode_nulls_percentage
Cpgs,0.9251700680272108,0.9251700680272108
Cogs,0.032312925170068,0.032312925170068


In [0]:
%sql
-- There appears to be some correlation between the first 6 characters in the id_ fields for some records that dont have a category or category code
-- This appears to work in some instances but not a fullproof method to fill the category and category code fields
select distinct 
  left(_id, 6) as first_part_of_id,
  *
from fetch.brands_processed
where _id = '53e10d6368abd3c7065097cc' or left(_id, 6) = '5d658f'
order by 
  category asc

first_part_of_id,_id,barcode,brandCode,category,categoryCode,cpg_id,cpg_reference,name,topBrand
5d658f,5d658fca6d5f3b23d1bc7912,511111405184,DO IT YOURSELF,,,53e10d6368abd3c7065097cc,Cogs,Do It Yourself,
5d658f,5d658fd1a3a018514994f431,511111905196,BETTER HOMES AND GARDENS SPECIAL EDITION MAGAZINE - FOOD,,,53e10d6368abd3c7065097cc,Cogs,Better Homes And Gardens Special Edition Magazine - Food,
5d658f,5d658fc0a3a018514994f430,511111205173,ALLRECIPES SPECIAL EDITION MAGAZINE,,,53e10d6368abd3c7065097cc,Cogs,Allrecipes Special Edition Magazine,
5d658f,5d658fde6d5f3b23d1bc7913,511111505204,BETTY CROCKER SPECIAL EDITION MAGAZINE,,,53e10d6368abd3c7065097cc,Cogs,Betty Crocker Special Edition Magazine,
5d658f,5d658ffa6d5f3b23d1bc7914,511111205227,DIABETIC LIVING MAGAZINE,,,53e10d6368abd3c7065097cc,Cogs,Diabetic Living Magazine,
5d658f,5d658ff3a3a018514994f432,511111005216,COOKING LIGHT MAGAZINE,Magazines,MAGAZINES,53e10d6368abd3c7065097cc,Cogs,Cooking Light Magazine,


#### Cleaning Receipts
* Following commands clean the receipts raw table and perform data quality checks against the data

In [0]:
# Applying some light transforms and creating a temporary view
# createDate and dateScanned are always the same so could consider dropping one
df = (spark.table("fetch.receipts_raw")
             .withColumn('_id', col("_id.$oid"))
             .withColumn('createDate', col("createDate.$date"))
             .withColumn('dateScanned', col("dateScanned.$date"))
             .withColumn('finishedDate', col("finishedDate.$date"))
             .withColumn('modifyDate', col("modifyDate.$date"))
             .withColumn('pointsAwardedDate', col("pointsAwardedDate.$date"))
             .withColumn('purchaseDate', col("purchaseDate.$date"))
             .selectExpr(
                "_id",
                "from_unixtime(createDate/1000) as createDate",
                "from_unixtime(dateScanned/1000) as dateScanned",
                "from_unixtime(finishedDate/1000) as finishedDate",
                "from_unixtime(modifyDate/1000) as modifyDate",
                "from_unixtime(purchaseDate/1000) as purchaseDate",
                "from_unixtime(pointsAwardedDate/1000) as pointsAwardedDate",
                "pointsEarned",
                "bonusPointsEarned",
                "bonusPointsEarnedReason",
                "purchasedItemCount",
                "rewardsReceiptItemList",
                "rewardsReceiptStatus",
                "totalSpent",
                "userId")
             .distinct()
             )

df.createOrReplaceTempView("temp_receipts")

In [0]:
%sql
-- Checking the distinct values of bonusPointsEarnedReason
-- Writing some code to standardize the case and remove some added strings (DEFAULT (5cefdcacf3693e0b50e83a36))
select distinct 
  bonusPointsEarnedReason,
  case
    when bonusPointsEarnedReason like "%(5cefdcacf3693e0b50e83a36)" then initcap(replace(bonusPointsEarnedReason, "DEFAULT (5cefdcacf3693e0b50e83a36)", ""))
    when bonusPointsEarnedReason like "%\\_%" then initcap(replace(bonusPointsEarnedReason, "_", " "))
    else initcap(bonusPointsEarnedReason)
  end as bonusPointsEarnedReason
from temp_receipts


bonusPointsEarnedReason,bonusPointsEarnedReason.1
,
"Receipt number 1 completed, bonus point schedule DEFAULT (5cefdcacf3693e0b50e83a36)","Receipt Number 1 Completed, Bonus Point Schedule"
COMPLETE_NONPARTNER_RECEIPT,Complete Nonpartner Receipt
"Receipt number 5 completed, bonus point schedule DEFAULT (5cefdcacf3693e0b50e83a36)","Receipt Number 5 Completed, Bonus Point Schedule"
COMPLETE_PARTNER_RECEIPT,Complete Partner Receipt
"Receipt number 6 completed, bonus point schedule DEFAULT (5cefdcacf3693e0b50e83a36)","Receipt Number 6 Completed, Bonus Point Schedule"
"Receipt number 3 completed, bonus point schedule DEFAULT (5cefdcacf3693e0b50e83a36)","Receipt Number 3 Completed, Bonus Point Schedule"
All-receipts receipt bonus,All-receipts Receipt Bonus
"Receipt number 4 completed, bonus point schedule DEFAULT (5cefdcacf3693e0b50e83a36)","Receipt Number 4 Completed, Bonus Point Schedule"
"Receipt number 2 completed, bonus point schedule DEFAULT (5cefdcacf3693e0b50e83a36)","Receipt Number 2 Completed, Bonus Point Schedule"


In [0]:
%sql
-- Noticed potential inconsistencies in type for the pointsEarned column
select distinct
  pointsEarned as pointsEarned_raw,
  cast(pointsEarned as decimal(32,2)) as newPointsEarned
from fetch.receipts_raw
order by 
  1 desc

pointsEarned_raw,newPointsEarned
986.5,986.5
9850.0,9850.0
9449.8,9449.8
940.0,940.0
94.6,94.6
934.4,934.4
922.1,922.1
9200.0,9200.0
91.2,91.2
8950.0,8950.0


In [0]:
%sql
-- Applying light transformations and creating a new table
-- Casting fields to ensure consistency with eachother
-- Cleaning formatting and values in the bonusPointsEarnedReason
Create or replace table fetch.receipts_processed as
select 
  _id,
  cast(createDate as timestamp) as createDate,
  cast(dateScanned as timestamp) as scannedDate,
  cast(finishedDate as timestamp) as finishedDate,
  cast(modifyDate as timestamp) as modifyDate,
  cast(purchaseDate as timestamp) as purchaseDate,
  cast(pointsAwardedDate as timestamp) as pointsAwardedDate,
  cast(pointsEarned as decimal(32,2)) as pointsEarned,
  cast(bonusPointsEarned as decimal(32,2)) as bonusPointsEarned,
  case
    when bonusPointsEarnedReason like "%(5cefdcacf3693e0b50e83a36)" then initcap(replace(bonusPointsEarnedReason, "DEFAULT (5cefdcacf3693e0b50e83a36)", ""))
    when bonusPointsEarnedReason like "%\\_%" then initcap(replace(bonusPointsEarnedReason, "_", " "))
    else initcap(bonusPointsEarnedReason)
  end as bonusPointsEarnedReason,
  rewardsReceiptStatus,
  cast(purchasedItemCount as Integer) as purchasedItemCount,
  cast(totalSpent as decimal(32,2)) as totalSpent,
  userId
from temp_receipts

num_affected_rows,num_inserted_rows


In [0]:
%sql
-- Query to display data quality issues for each column
-- PointsAwardedDate is null more than 50% of the time but the pointsEarned field is only null 46% of the time
select 
  round(sum(case when _id is null then 1 else 0 end)/ count(*), 2) as id_nulls_percentage,
  round(sum(case when createDate is null then 1 else 0 end)/ count(*), 2) as createDate_nulls_percentage,
  round(sum(case when scannedDate is null then 1 else 0 end)/ count(*), 2) as scannedDate_nulls_percentage,
  round(sum(case when finishedDate is null then 1 else 0 end)/ count(*), 2) as finishedDate_nulls_percentage,
  round(sum(case when modifyDate is null then 1 else 0 end)/ count(*), 2) as modifyDate_nulls_percentage,
  round(sum(case when purchaseDate is null then 1 else 0 end)/ count(*), 2) as purchaseDate_nulls_percentage,
  round(sum(case when pointsAwardedDate is null then 1 else 0 end)/ count(*), 2) as pointsAwardedDate_nulls_percentage,
  round(sum(case when pointsEarned is null then 1 else 0 end)/ count(*), 2) as pointsEarned_nulls_percentage,
  round(sum(case when bonusPointsEarned is null then 1 else 0 end)/ count(*), 2) as bonusPointsEarned_nulls_percentage,
  round(sum(case when bonusPointsEarnedReason is null then 1 else 0 end)/ count(*), 2) as bonusPointsEarnedReason_nulls_percentage,
  round(sum(case when rewardsReceiptStatus is null then 1 else 0 end)/ count(*), 2) as rewardsReceiptStatus_nulls_percentage,
  round(sum(case when purchasedItemCount is null then 1 else 0 end)/ count(*), 2) as purchasedItemCount_nulls_percentage,
  round(sum(case when totalSpent is null then 1 else 0 end)/ count(*), 2) as totalSpent_nulls_percentage,
  round(sum(case when userId is null then 1 else 0 end)/ count(*), 2) as userId_nulls_percentage
from fetch.receipts_processed  

id_nulls_percentage,createDate_nulls_percentage,scannedDate_nulls_percentage,finishedDate_nulls_percentage,modifyDate_nulls_percentage,purchaseDate_nulls_percentage,pointsAwardedDate_nulls_percentage,pointsEarned_nulls_percentage,bonusPointsEarned_nulls_percentage,bonusPointsEarnedReason_nulls_percentage,rewardsReceiptStatus_nulls_percentage,purchasedItemCount_nulls_percentage,totalSpent_nulls_percentage,userId_nulls_percentage
0.0,0.0,0.0,0.49,0.0,0.4,0.52,0.46,0.51,0.51,0.0,0.43,0.39,0.0


In [0]:
%sql
-- Investigating the pointsAwarded situation
-- There appears to be statusCodes that might revoke the issue of points to the user
select distinct
  *
from fetch.receipts_processed  
where pointsAwardedDate is null

_id,createDate,scannedDate,finishedDate,modifyDate,purchaseDate,pointsAwardedDate,pointsEarned,bonusPointsEarned,bonusPointsEarnedReason,rewardsReceiptStatus,purchasedItemCount,totalSpent,userId
6025c21e0a7214d8e90002ad,2021-02-11T23:47:42.000Z,2021-02-11T23:47:42.000Z,,2021-02-11T23:47:42.000Z,,,,,,SUBMITTED,,,5fc961c3b8cfca11a077dd33
601e911a0a720f053c000070,2021-02-06T12:52:42.000Z,2021-02-06T12:52:42.000Z,,2021-02-06T12:52:42.000Z,,,,,,SUBMITTED,,,5fc961c3b8cfca11a077dd33
603afc0e0a720fde10000243,2021-02-28T02:12:30.000Z,2021-02-28T02:12:30.000Z,,2021-02-28T02:12:31.000Z,2020-08-17T00:00:00.000Z,,25.0,25.0,Complete Nonpartner Receipt,REJECTED,2.0,34.96,5fc961c3b8cfca11a077dd33
60263d290a7214d8e90002e3,2021-02-12T08:32:41.000Z,2021-02-12T08:32:41.000Z,,2021-02-12T08:32:41.000Z,,,,,,SUBMITTED,,,5fc961c3b8cfca11a077dd33
603c6adf0a720fde1000039a,2021-03-01T04:17:35.000Z,2021-03-01T04:17:35.000Z,,2021-03-01T04:17:35.000Z,,,,,,SUBMITTED,,,5fc961c3b8cfca11a077dd33
601d7b8e0a720f0554000079,2021-02-05T17:08:30.000Z,2021-02-05T17:08:30.000Z,2021-02-05T17:08:30.000Z,2021-02-05T17:08:30.000Z,2021-02-05T00:00:00.000Z,,,,,PENDING,,28.57,59c124bae4b0299e55b0f330
603c7c6c0a7217c72c0003b3,2021-03-01T05:32:28.000Z,2021-03-01T05:32:28.000Z,,2021-03-01T05:32:29.000Z,2020-08-17T00:00:00.000Z,,25.0,25.0,Complete Nonpartner Receipt,REJECTED,2.0,34.96,5fc961c3b8cfca11a077dd33
5ff1e1b60a7214ada100055c,2021-01-03T15:24:38.000Z,2021-01-03T15:24:38.000Z,,2021-01-03T15:24:38.000Z,2021-02-03T15:24:38.000Z,,8850.0,150.0,"Receipt Number 5 Completed, Bonus Point Schedule",FLAGGED,10.0,290.0,5ff1e194b6a9d73a3a9f1052
5ff873f10a720f052300064f,2021-01-08T15:02:09.000Z,2021-01-08T15:02:09.000Z,,2021-01-08T15:02:10.000Z,2021-02-08T15:02:10.000Z,,9200.0,500.0,"Receipt Number 2 Completed, Bonus Point Schedule",FLAGGED,10.0,290.0,5ff873d1b3348b11c9337716
603943720a7217c72c0000ed,2021-02-26T18:52:34.000Z,2021-02-26T18:52:34.000Z,,2021-02-26T18:52:34.000Z,,,,,,SUBMITTED,,,5fc961c3b8cfca11a077dd33


#### Items Dimension

In [0]:
# Exploding the items from the receipts rewardsReceiptItemList and writing to a table
df = (spark.table("fetch.receipts_raw")
             .withColumn('receiptId', col("_id.$oid"))
             .selectExpr(
                "receiptId",
                "explode(rewardsReceiptItemList) as item"
             )
             .select("receiptId", "item.*")
             )

df.write.format("delta").mode('overwrite').saveAsTable(f"fetch.items_raw")


In [0]:
%sql
-- Checking the count of records items that had a barcode that existed in the brands table
-- only 82 total records had a valid barcode (~1.2%)
select 
  count(*)
from `fetch`.items_raw
where barcode in (select distinct barcode from fetch.brands_processed)

receiptId,barcode,brandCode,competitiveProduct,competitorRewardsGroup,deleted,description,discountedItemPrice,finalPrice,itemNumber,itemPrice,metabriteCampaignId,needsFetchReview,needsFetchReviewReason,originalFinalPrice,originalMetaBriteBarcode,originalMetaBriteDescription,originalMetaBriteItemPrice,originalMetaBriteQuantityPurchased,originalReceiptItemText,partnerItemId,pointsEarned,pointsNotAwardedReason,pointsPayerId,preventTargetGapPoints,priceAfterCoupon,quantityPurchased,rewardsGroup,rewardsProductPartnerId,targetPrice,userFlaggedBarcode,userFlaggedDescription,userFlaggedNewItem,userFlaggedPrice,userFlaggedQuantity
600206000a720f05f3000087,511111204206,SWANSON,False,,,SWANSON,5.58,5.58,,5.58,,,,,,,,,SWANSON UNSLTD CKN,1083,55.8,,5a734034e4b0d58f376be874,,,2,,5a734034e4b0d58f376be874,,,,,,
600208270a720f05f3000088,511111204206,SWANSON,False,,,SWANSON,5.58,5.58,,5.58,,,,,,,,,SWANSON UNSLTD CKN,1083,55.8,,5a734034e4b0d58f376be874,,,2,,5a734034e4b0d58f376be874,,,,,,
600208270a720f05f3000088,511111001485,TOSTITOS,False,,,TOSTITOS,2.98,2.98,,2.98,,,,,,,,,TOSTITOS MED SALSA,1178,,Action not allowed for user and CPG,5332f5fbe4b03c9a25efd0ba,,,1,,5332f5fbe4b03c9a25efd0ba,,,,,,
600208270a720f05f3000088,511111001485,TOSTITOS,False,,,TOSTITOS,3.99,3.99,,3.99,,,,,,,,,TOSTITOS MED SALSA,1191,,Action not allowed for user and CPG,5332f5fbe4b03c9a25efd0ba,,,1,,5332f5fbe4b03c9a25efd0ba,,,,,,
600373700a720f05f3000091,511111204206,SWANSON,False,,,SWANSON,5.58,5.58,,5.58,,,,,,,,,SWANSON UNSLTD CKN,1090,55.8,,5a734034e4b0d58f376be874,,,2,,5a734034e4b0d58f376be874,,,,,,
600373700a720f05f3000091,511111104537,KETTLE BRAND,False,,,KETTLE BRAND,3.69,3.69,,3.69,,,,,,,,,HYV KETTLE CHIP ORIG,1143,36.9,,5a734034e4b0d58f376be874,,,1,,5a734034e4b0d58f376be874,,,,,,
600373700a720f05f3000091,511111001485,TOSTITOS,False,,,TOSTITOS,2.98,2.98,,2.98,,,,,,,,,TOSTITOS MED SALSA,1185,,Action not allowed for user and CPG,5332f5fbe4b03c9a25efd0ba,,,1,,5332f5fbe4b03c9a25efd0ba,,,,,,
600373700a720f05f3000091,511111001485,TOSTITOS,False,,,TOSTITOS,3.99,3.99,,3.99,,,,,,,,,TOSTITOS MED SALSA,1198,,Action not allowed for user and CPG,5332f5fbe4b03c9a25efd0ba,,,1,,5332f5fbe4b03c9a25efd0ba,,,,,,
60023e8f0a720f05f300008b,511111204206,SWANSON,False,,,SWANSON,5.58,5.58,,5.58,,,,,,,,,SWANSON UNSLTD CKN,1082,55.8,,5a734034e4b0d58f376be874,,,2,,5a734034e4b0d58f376be874,,,,,,
60023e8f0a720f05f300008b,511111001485,TOSTITOS,False,,,TOSTITOS,2.98,2.98,,2.98,,,,,,,,,TOSTITOS MED SALSA,1177,,Action not allowed for user and CPG,5332f5fbe4b03c9a25efd0ba,,,1,,5332f5fbe4b03c9a25efd0ba,,,,,,


In [0]:
%sql
-- Checking the records with invalid barcodes and their counts
-- Recommendation to investigate why so many barcodes are not covered in the brands table
with aggregated as (
select 
  description, 
  count(*) as null_record_count
from fetch.items_raw
where barcode not in (select distinct barcode from fetch.brands_processed)
group by 
  1)

select 
  description,
  null_record_count,
  sum(null_record_count) over () as total_records,
  round(null_record_count/sum(null_record_count) over () *100 , 2) as percentage
from aggregated
order by 2 desc

description,null_record_count,total_records,percentage
,231,3008,7.68
ITEM NOT FOUND,173,3008,5.75
HUGGIES SIMPLY CLEAN PREMOISTENED WIPE FRAGRANCE FREE BAG 216 COUNT,92,3008,3.06
MILLER LITE 24 PACK 12OZ CAN,90,3008,2.99
KLEENEX POP UP RECTANGLE BOX FACIAL TISSUE 2 PLY 8PK 160 CT,87,3008,2.89
REGULAR PEPSI SODA COLA CAN 12 CT 144 OZ,76,3008,2.53
Ben & Jerry's Chunky Monkey Non-Dairy Frozen Dessert 16 oz,60,3008,1.99
KNORR RICE SIDES CREAMY CHICKEN REGULAR RICE AND PASTA MIX LONG GRAIN 5.7 OZ - 0041000022624,54,3008,1.8
BEN & JERRYS FROZEN CHUNKY MONKEY ICE CREAM REGULAR 16 OZ - 0076840100351,53,3008,1.76
DORITOS TORTILLA CHIP NACHO CHEESE BAG 7.625 OZ,45,3008,1.5


In [0]:
%sql
-- looking at the null percentages of the fields
-- A ton of fields have nulls and would need to dig further to see why there are so many and potential imputation methods
-- Investigate whether all these fields are needed in the items table. Potentially expand the brands table to include certain fields
select 
  round(sum(case when receiptID is null then 1 else 0 end)/ count(*), 2) as receiptID_nulls_percentage,
  round(sum(case when barcode is null then 1 else 0 end)/ count(*), 2) as barcode_nulls_percentage,
  round(sum(case when brandCode is null then 1 else 0 end)/ count(*), 2) as brandCode_nulls_percentage,
  round(sum(case when competitiveProduct is null then 1 else 0 end)/ count(*), 2) as competitiveProduct_nulls_percentage,
  round(sum(case when competitorRewardsGroup is null then 1 else 0 end)/ count(*), 2) as competitorRewardsGroup_nulls_percentage,
  round(sum(case when deleted is null then 1 else 0 end)/ count(*), 2) as deleted_nulls_percentage,
  round(sum(case when description is null then 1 else 0 end)/ count(*), 2) as description_nulls_percentage,
  round(sum(case when discountedItemPrice is null then 1 else 0 end)/ count(*), 2) as discountedItemPrice_nulls_percentage,
  round(sum(case when finalPrice is null then 1 else 0 end)/ count(*), 2) as finalPrice_nulls_percentage,
  round(sum(case when itemNumber is null then 1 else 0 end)/ count(*), 2) as itemNumber_nulls_percentage,
  round(sum(case when metabriteCampaignId is null then 1 else 0 end)/ count(*), 2) as metabriteCampaignId_nulls_percentage,
  round(sum(case when needsFetchReviewReason is null then 1 else 0 end)/ count(*), 2) as needsFetchReviewReason_nulls_percentage,
  round(sum(case when originalFinalPrice is null then 1 else 0 end)/ count(*), 2) as originalFinalPrice_nulls_percentage,
  round(sum(case when originalMetaBriteBarcode is null then 1 else 0 end)/ count(*), 2) as originalMetaBriteBarcode_nulls_percentage,
  round(sum(case when originalMetaBriteDescription is null then 1 else 0 end)/ count(*), 2) as originalMetaBriteDescription_nulls_percentage,
  round(sum(case when originalMetaBriteItemPrice is null then 1 else 0 end)/ count(*), 2) as originalMetaBriteItemPrice_nulls_percentage,
  round(sum(case when originalMetaBriteQuantityPurchased is null then 1 else 0 end)/ count(*), 2) as originalMetaBriteQuantityPurchased_nulls_percentage,
  round(sum(case when originalReceiptItemText is null then 1 else 0 end)/ count(*), 2) as originalReceiptItemText_nulls_percentage,
  round(sum(case when partnerItemId is null then 1 else 0 end)/ count(*), 2) as partnerItemId_nulls_percentage,
  round(sum(case when pointsEarned is null then 1 else 0 end)/ count(*), 2) as pointsEarned_nulls_percentage,
  round(sum(case when pointsNotAwardedReason is null then 1 else 0 end)/ count(*), 2) as pointsNotAwardedReason_nulls_percentage,
  round(sum(case when pointsPayerId is null then 1 else 0 end)/ count(*), 2) as pointsPayerId_nulls_percentage,
  round(sum(case when preventTargetGapPoints is null then 1 else 0 end)/ count(*), 2) as preventTargetGapPoints_nulls_percentage,
  round(sum(case when priceAfterCoupon is null then 1 else 0 end)/ count(*), 2) as priceAfterCoupon_nulls_percentage,
  round(sum(case when quantityPurchased is null then 1 else 0 end)/ count(*), 2) as quantityPurchased_nulls_percentage,
  round(sum(case when rewardsGroup is null then 1 else 0 end)/ count(*), 2) as rewardsGroup_nulls_percentage,
  round(sum(case when rewardsProductPartnerId is null then 1 else 0 end)/ count(*), 2) as rewardsProductPartnerId_nulls_percentage,
  round(sum(case when targetPrice is null then 1 else 0 end)/ count(*), 2) as targetPrice_nulls_percentage,
  round(sum(case when userFlaggedBarcode is null then 1 else 0 end)/ count(*), 2) as userFlaggedBarcode_nulls_percentage,
  round(sum(case when userFlaggedDescription is null then 1 else 0 end)/ count(*), 2) as userFlaggedDescription_nulls_percentage,
  round(sum(case when userFlaggedNewItem is null then 1 else 0 end)/ count(*), 2) as userFlaggedNewItem_nulls_percentage,
  round(sum(case when userFlaggedPrice is null then 1 else 0 end)/ count(*), 2) as userFlaggedPrice_nulls_percentage,
  round(sum(case when userFlaggedQuantity is null then 1 else 0 end)/ count(*), 2) as userFlaggedQuantity_nulls_percentage
from fetch.items_raw  

receiptID_nulls_percentage,barcode_nulls_percentage,brandCode_nulls_percentage,competitiveProduct_nulls_percentage,competitorRewardsGroup_nulls_percentage,deleted_nulls_percentage,description_nulls_percentage,discountedItemPrice_nulls_percentage,finalPrice_nulls_percentage,itemNumber_nulls_percentage,metabriteCampaignId_nulls_percentage,needsFetchReviewReason_nulls_percentage,originalFinalPrice_nulls_percentage,originalMetaBriteBarcode_nulls_percentage,originalMetaBriteDescription_nulls_percentage,originalMetaBriteItemPrice_nulls_percentage,originalMetaBriteQuantityPurchased_nulls_percentage,originalReceiptItemText_nulls_percentage,partnerItemId_nulls_percentage,pointsEarned_nulls_percentage,pointsNotAwardedReason_nulls_percentage,pointsPayerId_nulls_percentage,preventTargetGapPoints_nulls_percentage,priceAfterCoupon_nulls_percentage,quantityPurchased_nulls_percentage,rewardsGroup_nulls_percentage,rewardsProductPartnerId_nulls_percentage,targetPrice_nulls_percentage,userFlaggedBarcode_nulls_percentage,userFlaggedDescription_nulls_percentage,userFlaggedNewItem_nulls_percentage,userFlaggedPrice_nulls_percentage,userFlaggedQuantity_nulls_percentage
0.0,0.55,0.63,0.91,0.96,1.0,0.05,0.17,0.03,0.98,0.88,0.97,1.0,0.99,1.0,1.0,1.0,0.17,0.0,0.87,0.95,0.82,0.95,0.86,0.03,0.75,0.67,0.95,0.95,0.97,0.95,0.96,0.96


In [0]:
%sql
-- Writing results and creating a new PK
create or replace table fetch.items_processed 
select 
  monotonically_increasing_id() as _id,
  receiptID,
  barcode,
  competitiveProduct,
  competitorRewardsGroup,
  deleted,
  description,
  discountedItemPrice,
  finalPrice,
  itemPrice,
  itemNumber, 
  metabriteCampaignId,
  needsFetchReviewReason,
  originalFinalPrice,
  originalMetaBriteBarcode,
  originalMetaBriteDescription,
  originalMetaBriteItemPrice,
  originalMetaBriteQuantityPurchased,
  originalReceiptItemText,
  partnerItemId,
  pointsEarned,
  pointsNotAwardedReason,
  pointsPayerId,
  preventTargetGapPoints,
  priceAfterCoupon,
  quantityPurchased,
  rewardsGroup,
  rewardsProductPartnerId,
  targetPrice,
  userFlaggedBarcode,
  userFlaggedDescription,
  userFlaggedNewItem,
  userFlaggedPrice,
  userFlaggedQuantity
from fetch.items_raw

num_affected_rows,num_inserted_rows


#### Write queries that directly answer predetermined questions from business stakeholder
1. What are the top 5 brands by receipts scanned for most recent month?
2. How does the ranking of the top 5 brands by receipts scanned for the recent month compare to the ranking for the previous month?
3. When considering average spend from receipts with 'rewardsReceiptStatus’ of ‘Accepted’ or ‘Rejected’, which is greater?
4. When considering total number of items purchased from receipts with 'rewardsReceiptStatus’ of ‘Accepted’ or ‘Rejected’, which is greater?
5. Which brand has the most spend among users who were created within the past 6 months?
6. Which brand has the most transactions among users who were created within the past 6 months?


In [0]:
%sql
-- Question 1
-- Because of lack of relationships between brands and items.barcode I used some other fields to extract some more brand information
with most_recent_month as (
select 
  max(date_format(scannedDate, 'yyyy-MM')) AS year_month
from fetch.receipts_processed), 

joined as (
select
  coalesce(coalesce(c.name, b.description), b.userFlaggedDescription) as brand,
  count(distinct a._id) as receipt_count
from fetch.receipts_processed a
left join fetch.items_processed b on a._id = b.receiptID
left join fetch.brands_processed c on b.barcode = c.barcode
where date_format(scannedDate, 'yyyy-MM') = (select year_month from most_recent_month)
group by 
  c.name,
  b.description,
  b.userFlaggedDescription
order by 
  2 desc)

select 
  initcap(brand) as brand,
  receipt_count
from joined 
where brand is not null 
and brand not in ('ITEM NOT FOUND', '' )
order by  
  receipt_count desc
limit 5

brand,receipt_count
"Mueller Austria Hypergrind Precision Electric Spice/coffee Grinder Millwith Large Grinding Capacity And Hd Motor Also For Spices, Herbs, Nuts,grains, White",13
Thindust Summer Face Mask - Sun Protection Neck Gaiter For Outdooractivities,13


In [0]:
%sql
-- Question 2
-- Similar issues to question 1. Collected the results for the most recent months and displayed them side-by-side
with year_months as (
select distinct
  date_format(scannedDate, 'yyyy-MM') AS year_month
from fetch.receipts_processed),

ranked_months as (
  select 
    *,
    row_number() over (order by year_month desc) as year_month_rank
  from year_months
  limit 2
),

joined as (
select
  d.year_month,
  d.year_month_rank,
  coalesce(coalesce(c.name, b.description), b.userFlaggedDescription) as brand,
  count(distinct a._id) as receipt_count
from fetch.receipts_processed a
left join fetch.items_processed b on a._id = b.receiptID
left join fetch.brands_processed c on b.barcode = c.barcode
inner join ranked_months d on date_format(scannedDate, 'yyyy-MM') = d.year_month
group by 
  d.year_month,
  d.year_month_rank,
  c.name,
  b.description,
  b.userFlaggedDescription
order by 
  2 desc)

select 
  initcap(brand) as brand,
  sum(case
    when year_month_rank = 1 then receipt_count
    else 0 
    end) as most_recent_month_receipts,
    sum(case
    when year_month_rank = 2 then receipt_count
    else 0 
    end) as second_most_recent_month_receipts
from joined 
where brand is not null 
and brand not in ('ITEM NOT FOUND', '' )
group by 
  brand
order by  
  most_recent_month_receipts desc
limit 5


brand,most_recent_month_receipts,second_most_recent_month_receipts
Thindust Summer Face Mask - Sun Protection Neck Gaiter For Outdooractivities,13,27
"Mueller Austria Hypergrind Precision Electric Spice/coffee Grinder Millwith Large Grinding Capacity And Hd Motor Also For Spices, Herbs, Nuts,grains, White",13,27
Brownie Britle,0,1
Jif Crmy Pnt Btr Jar 40 Oz,0,2
Mssn Tortlla,0,2


In [0]:
%sql
-- Question 3
-- Im assuming the instructions are a lil outdated since Accepted is not a value for rewardsReceiptStatus
-- Finished is higher
select 
  rewardsReceiptStatus,
  avg(totalSpent) as avg_spend
from fetch.receipts_processed
where rewardsReceiptStatus in ('FINISHED', 'REJECTED')
group by 
  1
order by 2 desc

rewardsReceiptStatus,avg_spend
FINISHED,80.854305
REJECTED,23.326056


In [0]:
%sql
-- Question 4
-- Im assuming the instructions are a lil outdated since Accepted is not a value for rewardsReceiptStatus
-- Finished is higher
select 
  rewardsReceiptStatus,
  sum(purchasedItemCount) as total_number_of_items_purchased
from fetch.receipts_processed
where rewardsReceiptStatus in ('FINISHED', 'REJECTED')
group by 
  1
order by 2 desc

rewardsReceiptStatus,total_number_of_items_purchased
FINISHED,8184
REJECTED,173


In [0]:
%sql
-- Question 5
with receipts as (
select _id, userId, totalSpent from fetch.receipts_processed),

recently_created_users as (
select  
  _id
from fetch.users_processed
where date_format(createdDate, 'yyyy-MM') >= (select date_format(add_months(max(to_date(createdDate, 'yyyy-MM')), -6), 'yyyy-MM') from fetch.users_processed))

select 
  coalesce(coalesce(d.name, c.description), c.userFlaggedDescription) as brand,
  sum(distinct totalSpent) as totalSpent
from receipts a
inner join recently_created_users b on a.userId = b._id
left join fetch.items_processed c on a._id = c.receiptID
left join fetch.brands_processed d on c.barcode = d.barcode
group by 
  d.name, 
  c.description,
  c.userFlaggedDescription
order by 2 desc
limit 1



brand,totalSpent
1% Milk,19405.89


In [0]:
%sql
-- Question 6
with receipts as (
select _id, userId, purchasedItemCount
 from fetch.receipts_processed),

recently_created_users as (
select  
  _id
from fetch.users_processed
where date_format(createdDate, 'yyyy-MM') >= (select date_format(add_months(max(to_date(createdDate, 'yyyy-MM')), -6), 'yyyy-MM') from fetch.users_processed))

select 
  coalesce(coalesce(d.name, c.description), c.userFlaggedDescription) as brand,
  sum(distinct purchasedItemCount) as purchasedItemCount
from receipts a
inner join recently_created_users b on a.userId = b._id
left join fetch.items_processed c on a._id = c.receiptID
left join fetch.brands_processed d on c.barcode = d.barcode
group by 
  d.name, 
  c.description,
  c.userFlaggedDescription
order by 2 desc
limit 1



brand,purchasedItemCount
1% Milk,3843


#### Communication with stakeholders


**To**: leadership@fetch.com

**Subject**: Data Analysis Update + Request for Feedback

Hi Everyone,

I hope you're doing well! My name is Jose, and I’ve recently started diving into the company data provided to me a few days ago. I wanted to reach out with a few updates based on my initial analysis, as well as a few questions where your input would be really helpful.

Below are five things I'd appreciate your feedback on:


1. Which user-related metrics are most important to you (e.g., MAU, acquisition source, user location)? Given current data quality issues, would it be helpful to flag potentially impacted metrics or temporarily pause reporting while we work on resolving them?

2. Did you anticipate data quality challenges in these datasets? I've found significant gaps—like missing values in key reporting fields—and would appreciate your input before moving forward with any data-cleaning approach.

3. Could you connect me with the data engineering team(s) so that I can better understand how the brand dataset is maintained? From my analysis, product details and barcodes are essential for tracking user behavior. However, due to current data quality issues, it might be challenging to accurately identify opportunities that could drive more value for the company.

4. Could you help point me to the right POC who can walk me through our current data architecture? Understanding whether we’re working with a Data Warehouse, Data Lake, or Data Lakehouse will help me design data assets that are aligned with both our existing setup and future business needs.

5. Based on my analysis and the current structure of the datasets, I anticipate that real-time processing and scaling could become challenges as we move into production. To ensure we’re balancing performance, cost, and efficiency, I’d like to better understand how quickly the you need access to data for decision-making? With your input, I can recommend a more tailored solution. That said, given the unstructured nature of our data and expected growth in user activity, a Data Lake or Data Lakehouse paired with a scalable streaming tool like Amazon Kinesis could be a strong fit. 


Your insights will help me ensure that the data solutions I propose are aligned with your goals and priorities. If it would be easier to discuss any of this over a call, I’d be more than happy to schedule a follow-up meeting at your convenience.

Looking forward to hearing your thoughts!


Best,

Jose Hercules
