In [0]:
from pyspark.sql import SparkSession
import re
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [0]:
# Initialize a SparkSession with the name "Fetch JSON Data"
spark = SparkSession.builder \
    .appName("Fetch JSON Data") \
    .getOrCreate()

In [0]:
# Define the paths to the JSON files for brands, users, and receipts data
brands_volume_path = "/Volumes/fetch_rewards/default/fetch_data_volume/brands.json"
user_volume_path = "/Volumes/fetch_rewards/default/fetch_data_volume/users.json"
receipts_volume_path = "/Volumes/fetch_rewards/default/fetch_data_volume/receipts.json"

### Data Extraction and Flattening

In [0]:
from pyspark.sql.functions import col

def flatten_df(nested_df):
    # Get the list of columns that are not nested (not of struct type)
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    # Get the list of columns that are nested (of struct type)
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    # Loop until there are no more nested columns
    while nested_cols:
        # Select flat columns and expand nested columns
        flat_df = nested_df.select(
            flat_cols + 
            [col(nc + '.' + c).alias(nc + '_' + c) for nc in nested_cols for c in nested_df.select(nc + '.*').columns]
        )
        # Update nested_df to the newly flattened DataFrame
        nested_df = flat_df
        # Update the list of flat columns
        flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
        # Update the list of nested columns
        nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    return nested_df

In [0]:
# Read the JSON file into a DataFrame
raw_brands_df = spark.read.json(brands_volume_path)

# Flatten the nested DataFrame
brands_df = flatten_df(raw_brands_df)

# Rename columns for consistency and readability
brands_df = brands_df.withColumnRenamed('brandCode','brand_code')\
                        .withColumnRenamed('category','category_name')\
                        .withColumnRenamed('categoryCode','category_code')\
                        .withColumnRenamed('name','brand_name')\
                        .withColumnRenamed('topBrand','is_top_brand')\
                        .withColumnRenamed('_id_$oid','brand_id') \
                        .withColumnRenamed('cpg_$ref','cpg_ref')\
                        .withColumnRenamed('cpg_$id_$oid','cpg_id')

In [0]:
from pyspark.sql.functions import to_date, col

# Read the JSON file into a DataFrame
raw_receipts_df = spark.read.json(receipts_volume_path)

# Flatten the nested DataFrame
receipts_df = flatten_df(raw_receipts_df)

# Rename columns for consistency and readability
receipts_df = receipts_df.withColumnRenamed('bonusPointsEarned','bonus_points_earned')\
                            .withColumnRenamed('bonusPointsEarnedReason','bonus_points_earned_reason')\
                            .withColumnRenamed('createDate_$date','create_date')\
                            .withColumnRenamed('dateScanned_$date','date_scanned')\
                            .withColumnRenamed('finishedDate_$date','finished_date')\
                            .withColumnRenamed('modifyDate_$date','modify_date')\
                            .withColumnRenamed('pointsAwardedDate_$date','points_awarded_date')\
                            .withColumnRenamed('pointsEarned','points_earned')\
                            .withColumnRenamed('purchaseDate_$date','purchase_date')\
                            .withColumnRenamed('purchasedItemCount','purchased_item_count')\
                            .withColumnRenamed('rewardsReceiptItemList','rewards_receipt_item_list')\
                            .withColumnRenamed('rewardsReceiptStatus','rewards_receipt_status')\
                            .withColumnRenamed('totalSpent','total_spent')\
                            .withColumnRenamed('userId','user_id')\
                            .withColumnRenamed('_id_$oid','receipt_id')

# Convert columns from unix to date type
receipts_df = receipts_df.withColumn('create_date', to_date((col('create_date') / 1000).cast('timestamp')))\
                         .withColumn('date_scanned', to_date((col('date_scanned') / 1000).cast('timestamp')))\
                         .withColumn('finished_date', to_date((col('finished_date') / 1000).cast('timestamp')))\
                         .withColumn('modify_date', to_date((col('modify_date') / 1000).cast('timestamp')))\
                         .withColumn('points_awarded_date', to_date((col('points_awarded_date') / 1000).cast('timestamp')))\
                         .withColumn('purchase_date', to_date((col('purchase_date') / 1000).cast('timestamp')))


In [0]:
from pyspark.sql.functions import explode

# Explode the rewards_receipt_item_list array column
exploded_receipts_df = receipts_df.withColumn("rewards_receipt_item", explode("rewards_receipt_item_list"))

# Create a separate DataFrame with receipt_id and all the exploded columns
receipt_items_df = exploded_receipts_df.select("receipt_id", "rewards_receipt_item.*")

# Rename columns for consistency and readability
for col_name in receipt_items_df.columns:
    new_col_name = ''.join(['_' + i.lower() if i.isupper() else i for i in col_name]).lstrip('_')
    receipt_items_df = receipt_items_df.withColumnRenamed(col_name, new_col_name)

In [0]:
# Read the JSON file into a DataFrame
raw_users_df = spark.read.json(user_volume_path)

# Flatten the nested DataFrame and drop any corrupt records
users_df = flatten_df(raw_users_df).drop('_corrupt_record')

# Rename columns for consistency and readability
users_df = users_df.withColumnRenamed('signUpSource','sign_up_source')\
                   .withColumnRenamed('active','is_active')\
                   .withColumnRenamed('createdDate_$date','created_date')\
                   .withColumnRenamed('lastLogin_$date','last_login_date')\
                   .withColumnRenamed('_id_$oid','user_id')

users_df = users_df.withColumn('created_date', to_date((col('created_date') / 1000).cast('timestamp')))\
                         .withColumn('last_login_date', to_date((col('last_login_date') / 1000).cast('timestamp')))


### Data Quality Checks

In [0]:
#Data Quality check for Receipts table

from pyspark.sql.functions import count, when, col

# Check for unique values in primary key column
unique_receipt_ids = receipts_df.select("receipt_id").distinct().count()

# Check missing value in primary key column receipt_id
missing_receipt_ids = receipts_df.filter(col("receipt_id").isNull()).count()
print(f"Missing receipt IDs: {missing_receipt_ids}")
total_receipt_ids = receipts_df.count()
print(f"Unique receipt IDs: {unique_receipt_ids}, Total receipt IDs: {total_receipt_ids}")

# Count nulls for specified columns
missing_values_receipts = receipts_df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["user_id", "receipt_id", "create_date", "purchase_date"]])
display(missing_values_receipts)

Missing receipt IDs: 0
Unique receipt IDs: 1119, Total receipt IDs: 1119


user_id,receipt_id,create_date,purchase_date
0,0,0,448


In [0]:
# Data Quality check for Receipt Items table

from pyspark.sql.functions import count, when, col, min, max

# Count nulls for specified columns
missing_values_receipt_items = receipt_items_df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["receipt_id", "barcode", "quantity_purchased", "item_price","points_earned"]])
display(missing_values_receipt_items)

# Range checks for numerical columns
numerical_columns_stats = receipt_items_df.select(
    min(col("quantity_purchased")).alias("min_quantity"),
    max(col("quantity_purchased")).alias("max_quantity"),
    min(col("item_price")).alias("min_price"),
    max(col("item_price")).alias("max_price"),
    min(col("points_earned")).alias("min_points"),
    max(col("points_earned")).alias("max_points")
)
display(numerical_columns_stats)

receipt_id,barcode,quantity_purchased,item_price,points_earned
0,3851,174,174,6014


min_quantity,max_quantity,min_price,max_price,min_points,max_points
1,17,0.0,95.84,10.0,99.9


In [0]:
#Data Quality check for Users table

from pyspark.sql.functions import count, when, col

# Check for unique values in primary key column
unique_user_ids = users_df.select("user_id").distinct().count()

# Check missing value in primary key column user_id
missing_user_ids = users_df.filter(col("user_id").isNull()).count()
print(f"Missing user IDs: {missing_user_ids}")
total_user_ids = users_df.count()
print(f"Unique user IDs: {unique_user_ids}, Total user IDs: {total_user_ids}")

# Check for duplicate user IDs
duplicate_user_ids = total_user_ids - unique_user_ids
print(f"Duplicate user IDs: {duplicate_user_ids}")

# Count nulls for specified columns
missing_values_users = users_df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["user_id", "sign_up_source", "is_active", "created_date", "last_login_date"]])
display(missing_values_users)

Missing user IDs: 1
Unique user IDs: 213, Total user IDs: 495
Duplicate user IDs: 282


user_id,sign_up_source,is_active,created_date,last_login_date
1,49,1,1,63


In [0]:
# Data Quality check for Brands table

from pyspark.sql.functions import count, when, col

# Check for unique values in primary key column
unique_brand_ids = brands_df.select("brand_id").distinct().count()

# Check missing value in primary key column brand_id
missing_brand_ids = brands_df.filter(col("brand_id").isNull()).count()
print(f"Missing brand IDs: {missing_brand_ids}")
total_brand_ids = brands_df.count()
print(f"Unique brand IDs: {unique_brand_ids}, Total brand IDs: {total_brand_ids}")

# Check for duplicate brand IDs
duplicate_brand_ids = total_brand_ids - unique_brand_ids
print(f"Duplicate brand IDs: {duplicate_brand_ids}")

# Count nulls for specified columns
missing_values_brands = brands_df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["brand_id","barcode","brand_code","brand_name", "category_code","category_name"]])
display(missing_values_brands)

Missing brand IDs: 0
Unique brand IDs: 1167, Total brand IDs: 1167
Duplicate brand IDs: 0


brand_id,barcode,brand_code,brand_name,category_code,category_name
0,0,234,0,650,155


### Response to questions from a business stakeholder

In [0]:
# Create temporary views for the dataframes
receipts_df.createOrReplaceTempView("receipts")
users_df.createOrReplaceTempView("users")
brands_df.createOrReplaceTempView("brands")
receipt_items_df.createOrReplaceTempView("receipt_items")

In [0]:
#When considering average spend from receipts with 'rewardsReceiptStatus’ of ‘Accepted’ or ‘Rejected’, which is greater?

query = """
SELECT 
  CASE 
    WHEN AVG(CASE WHEN r.rewards_receipt_status = 'Accepted' THEN r.total_spent ELSE NULL END) > 
         AVG(CASE WHEN r.rewards_receipt_status = 'Rejected' THEN r.total_spent ELSE NULL END) 
    THEN 'Accepted' 
    ELSE 'Rejected' 
  END AS greater_average_spend
FROM receipts r
"""

# Execute the query and display the result
result_df = spark.sql(query)
display(result_df)

greater_average_spend
Rejected


In [0]:
#When considering total number of items purchased from receipts with 'rewardsReceiptStatus’ of ‘Accepted’ or ‘Rejected’, which is greater?

query = """
SELECT 
  CASE 
    WHEN SUM(CASE WHEN r.rewards_receipt_status = 'Accepted' THEN ri.quantity_purchased ELSE 0 END) > 
         SUM(CASE WHEN r.rewards_receipt_status = 'Rejected' THEN ri.quantity_purchased ELSE 0 END) 
    THEN 'Accepted' 
    ELSE 'Rejected' 
  END AS greater_total_items_purchased
FROM receipts r
JOIN receipt_items ri
ON r.receipt_id = ri.receipt_id
"""

# Execute the query and display the result
result_df = spark.sql(query)
display(result_df)

greater_total_items_purchased
Rejected


### Communication with Business Stakeholders 

In [0]:
"""
Hello Team,

I hope this email finds you well.

As I continue analyzing and optimizing our data assets, I have a few questions and concerns that I would like to discuss:

Master Dataset Availability – I am looking for a master dataset that includes key item details such as original price, original description, and other metadata attributes. Access to this data would help add context and improve the quality of our datasets.

Missing Values & Data Accuracy – I have noticed a significant number of missing values in the dataset, particularly for brand_code, category_code, and barcode. This could pose challenges in ensuring data accuracy. Additionally, to support accurate database design, could you clarify the relationship between item_number, brand_code, and barcode?

Brand Mapping Issues – I also found receipts containing brand_code values that do not exist in the brands table. It would be helpful to have the correct mapping between these brand codes and the brands table.

Data Processing Efficiency – Processing unstructured JSON data into a structured format can be compute-intensive and time-consuming, especially at scale. I would like to explore the possibility of using a NoSQL database and compare its efficiency with our current approach.

Your input will be invaluable in delivering high-quality and optimized data solutions. I appreciate your time and look forward to your insights and any additional details you can provide.

Thank you for your time and cooperation.

Best regards,
DE team

"""