In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, array_contains, explode, exists
from pyspark.sql.types import StringType

In [2]:
# Build the Spark session used by every cell below (getOrCreate reuses an
# already-running session instead of starting a second one).
spark = (
    SparkSession.builder
    .appName("Fetch Rewards Exercise")
    .getOrCreate()
)

25/05/14 22:29:30 WARN Utils: Your hostname, Johns-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.130 instead (on interface en0)
25/05/14 22:29:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/14 22:29:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the receipts file through Spark's JSON data source.
receiptsDF = spark.read.format("json").load("receipts.jsonl")

25/05/14 22:29:32 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [4]:
# Show the schema of the freshly loaded receipts before any reshaping.
receiptsDF.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- bonusPointsEarned: long (nullable = true)
 |-- bonusPointsEarnedReason: string (nullable = true)
 |-- createDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- dateScanned: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- finishedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- modifyDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- pointsAwardedDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- pointsEarned: string (nullable = true)
 |-- purchaseDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- purchasedItemCount: long (nullable = true)
 |-- rewardsReceiptItemList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- barcode: string (nullable = true)
 |    |    |-- brandCode: string (nullable = true)
 |    |    |-- compe

#### Print the number of lines of JSON objects, i.e. number of distinct receipts

In [5]:
# Row count of receiptsDF; as the cell's last expression it is displayed as the output.
receiptsDF.count()

1119

#### Map all date structs to the actual `long` value to flatten Data Frame

In [6]:
# Flatten the single-field wrapper structs: `_id` becomes its `$oid` string and
# every date column becomes the `$date` long it wraps. Columns are replaced in
# place, so the column order of receiptsDF is preserved.
remappedDF = receiptsDF.withColumn("_id", col("_id.$oid"))

for dateField in (
    "createDate",
    "dateScanned",
    "finishedDate",
    "modifyDate",
    "pointsAwardedDate",
    "purchaseDate",
):
    remappedDF = remappedDF.withColumn(dateField, col(dateField + ".$date"))

In [7]:
# Confirm that `_id` and the date columns are now plain scalars rather than structs.
remappedDF.printSchema()

root
 |-- _id: string (nullable = true)
 |-- bonusPointsEarned: long (nullable = true)
 |-- bonusPointsEarnedReason: string (nullable = true)
 |-- createDate: long (nullable = true)
 |-- dateScanned: long (nullable = true)
 |-- finishedDate: long (nullable = true)
 |-- modifyDate: long (nullable = true)
 |-- pointsAwardedDate: long (nullable = true)
 |-- pointsEarned: string (nullable = true)
 |-- purchaseDate: long (nullable = true)
 |-- purchasedItemCount: long (nullable = true)
 |-- rewardsReceiptItemList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- barcode: string (nullable = true)
 |    |    |-- brandCode: string (nullable = true)
 |    |    |-- competitiveProduct: boolean (nullable = true)
 |    |    |-- competitorRewardsGroup: string (nullable = true)
 |    |    |-- deleted: boolean (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- discountedItemPrice: string (nullable = true)
 |    |    |-- finalPric

#### Explode the `rewardsReceiptItemList` array so that the total number of new rows represents the number of items scanned in total
This makes it much easier to filter for receipts that have at least one item with specific criteria

In [8]:
# One output row per element of `rewardsReceiptItemList`: keep every receipt
# column, expose the element as the struct column `receipt_item`, then drop the
# original array column.
explodedDF = (
    remappedDF
    .select("*", explode(col("rewardsReceiptItemList")).alias("receipt_item"))
    .drop("rewardsReceiptItemList")
)

In [9]:
# Check the exploded layout: receipt-level columns plus the `receipt_item` struct.
explodedDF.printSchema()

root
 |-- _id: string (nullable = true)
 |-- bonusPointsEarned: long (nullable = true)
 |-- bonusPointsEarnedReason: string (nullable = true)
 |-- createDate: long (nullable = true)
 |-- dateScanned: long (nullable = true)
 |-- finishedDate: long (nullable = true)
 |-- modifyDate: long (nullable = true)
 |-- pointsAwardedDate: long (nullable = true)
 |-- pointsEarned: string (nullable = true)
 |-- purchaseDate: long (nullable = true)
 |-- purchasedItemCount: long (nullable = true)
 |-- rewardsReceiptStatus: string (nullable = true)
 |-- totalSpent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- receipt_item: struct (nullable = true)
 |    |-- barcode: string (nullable = true)
 |    |-- brandCode: string (nullable = true)
 |    |-- competitiveProduct: boolean (nullable = true)
 |    |-- competitorRewardsGroup: string (nullable = true)
 |    |-- deleted: boolean (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- discountedItemPrice: string (n

#### Receipts that have an item with the description `ITEM NOT FOUND`
* There are a large number of receipt items that have this description
* This could be due to a process that calculates reward points at a certain workflow transition

In [10]:
# Items whose description contains the placeholder text.
isPlaceholderItem = col("receipt_item.description").contains("ITEM NOT FOUND")

# One row per receipt `_id`, with the number of placeholder items on that receipt.
notFoundAgg = (
    explodedDF
    .where(isPlaceholderItem)
    .groupBy("_id")
    .count()
)

# Number of receipts that contain at least one placeholder item.
notFoundAgg.count()

165

#### Receipt Items that have a valid description but no brand code
* There are many receipt items that do not have a brand code to match what appears to be a valid item
* This prevents using the data to calculate metrics for brands for items that do exist in the system

For reference, prefixing a condition with the `~` operator negates it.

In [11]:
itemDescription = col("receipt_item.description")

# Successive filters are AND-ed together: the item has a description, the
# description is not the placeholder text, and the brand code is missing.
noBrandCodeAgg = (
    explodedDF
    .filter(itemDescription.isNotNull())
    .filter(~itemDescription.contains("ITEM NOT FOUND"))
    .filter(col("receipt_item.brandCode").isNull())
    .groupBy("_id")
    .count()
)

# Number of receipts with at least one such item (one aggregate row per `_id`).
noBrandCodeAgg.count()

401

#### Valid Receipt Items that do not indicate if they are a competitive product
* There are receipt items that appear to be valid items with a `description` and `brandCode` but no `competitiveProduct`
* This prevents using the data to calculate metrics for brands for items that should indicate if they are competitive
* Many items have an explicit value of `false` for the `competitiveProduct` field, so we can't assume that a missing field means the item isn't competitive

In [12]:
# Predicates on the exploded item struct, combined below with `&`.
hasRealDescription = (
    col("receipt_item.description").isNotNull()
    & ~col("receipt_item.description").contains("ITEM NOT FOUND")
)
hasBrandCode = col("receipt_item.brandCode").isNotNull()
lacksCompetitiveFlag = col("receipt_item.competitiveProduct").isNull()

# One row per receipt `_id` holding described, branded items whose
# `competitiveProduct` field is null.
noCompetitiveItemAgg = (
    explodedDF
    .where(hasRealDescription & hasBrandCode & lacksCompetitiveFlag)
    .groupBy("_id")
    .count()
)

# Number of receipts with at least one such item.
noCompetitiveItemAgg.count()

84

This shows items that do have a non-null `competitiveProduct` field that is `false`

In [13]:
# Receipts containing described, branded items whose `competitiveProduct` flag
# is explicitly `false`.
#
# The result gets its own name instead of rebinding `noCompetitiveItemAgg`:
# that name describes items whose flag is *missing*, and overwriting it made
# the variable's contents depend on which cell happened to run last.
notCompetitiveItemAgg = explodedDF.filter(
    ~col("receipt_item.description").contains("ITEM NOT FOUND") &
    col("receipt_item.description").isNotNull() &
    col("receipt_item.brandCode").isNotNull() &
    # Negating a null flag yields null, which filter() drops, so only rows
    # with an explicit `false` survive.
    ~col("receipt_item.competitiveProduct")
).groupBy("_id").count()

# Number of receipts with at least one such item (one aggregate row per `_id`).
notCompetitiveItemAgg.count()

31

This shows items that do have a non-null `competitiveProduct` field that is `true`

In [14]:
# Receipts containing described, branded items whose `competitiveProduct` flag
# is explicitly `true`.
#
# The result gets its own name instead of rebinding `noCompetitiveItemAgg`:
# storing the competitive items under a "no competitive" name was misleading
# and made the variable's contents depend on which cell happened to run last.
competitiveItemAgg = explodedDF.filter(
    ~col("receipt_item.description").contains("ITEM NOT FOUND") &
    col("receipt_item.description").isNotNull() &
    col("receipt_item.brandCode").isNotNull() &
    # A null flag evaluates to null here, which filter() drops, so only rows
    # with an explicit `true` survive.
    col("receipt_item.competitiveProduct")
).groupBy("_id").count()

# Number of receipts with at least one such item (one aggregate row per `_id`).
competitiveItemAgg.count()

29