## First: Create tables from the provided JSON files

In [0]:
# Raw brands export, read as JSON from the `brandjsonfile` storage container.
brand_data = spark.read.json(
    "abfss://brandjsonfile@kartheekuntdl.dfs.core.windows.net/brands.json"
)

In [0]:
# Raw users export, read as JSON from the `fetchjsonfiles` storage container.
users_data = spark.read.json(
    "abfss://fetchjsonfiles@kartheekuntdl.dfs.core.windows.net/users.json"
)

In [0]:
from pyspark.sql.functions import col

# Flatten the nested brand documents into plain, snake_case columns.
# Example of the nested reference being unpacked:
#   "cpg": {"$id": {"$oid": "601ac114be37ce2ead437550"}, "$ref": "Cogs"}
df_transformed = (
    brand_data
    # Pull the nested identifiers up to top-level columns ...
    .withColumn("id", col("_id.$oid"))
    .withColumn("cpg_id", col("cpg.$id.$oid"))
    .withColumn("cpg_ref", col("cpg.$ref"))
    # ... then discard the structs they came from.
    .drop("_id", "cpg")
    # camelCase -> snake_case; `barcode` and `category` already have their final names.
    .withColumnRenamed("brandCode", "brand_code")
    .withColumnRenamed("categoryCode", "category_code")
    .withColumnRenamed("name", "brand_name")
    .withColumnRenamed("topBrand", "top_brand")
)




In [0]:
%sql
-- Target Delta table for the flattened brands data.
-- IF NOT EXISTS keeps this cell idempotent: re-running the notebook no longer
-- fails with "table already exists" once the table has been created.
CREATE TABLE IF NOT EXISTS brands_table (
    id STRING,
    barcode STRING,
    brand_code STRING,
    category STRING,
    category_code STRING,
    cpg_id STRING,
    cpg_ref STRING,
    brand_name STRING,
    top_brand BOOLEAN
)
USING DELTA;


In [0]:
# Store the flattened brands DataFrame in the Delta table `brands_table`;
# "overwrite" mode replaces whatever the table held before.
(
    df_transformed.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("brands_table")
)

In [0]:
%sql
-- Preview the rows stored in brands_table.
SELECT *
FROM brands_table

In [0]:
from pyspark.sql.functions import col

# Flatten the raw users export into snake_case columns that line up with the
# users_table definition (id, active, created_date, last_login, role,
# sign_up_source, state).
df_transformed = (
    users_data
    # Promote the nested identifier (_id.$oid) to a plain string column.
    .withColumn("id", col("_id.$oid"))
    # `$date` holds Unix epoch milliseconds. Dividing by 1000 and casting gives a
    # real TIMESTAMP that keeps the sub-second part; from_unixtime() would instead
    # return a formatted STRING, which does not match the TIMESTAMP columns
    # declared for users_table.
    .withColumn("created_date", (col("createdDate.$date") / 1000).cast("timestamp"))
    .withColumn("last_login", (col("lastLogin.$date") / 1000).cast("timestamp"))
    .drop("_id", "createdDate", "lastLogin")
    # `role` and `state` already have their final names; only this one changes.
    .withColumnRenamed("signUpSource", "sign_up_source")
)

display(df_transformed)

# NOTE(review): the write below is still disabled, so none of the cells shown in
# this notebook populate users_table. Enable it only after the users_table DDL
# cell has run (a plain CREATE TABLE fails if the table already exists).
#df_transformed.write.format("delta").mode("overwrite").saveAsTable("users_table")


In [0]:
%sql
-- Target Delta table for the flattened users data.
-- IF NOT EXISTS keeps this cell idempotent: re-running the notebook no longer
-- fails with "table already exists" once the table has been created.
CREATE TABLE IF NOT EXISTS users_table (
    id STRING,
    active BOOLEAN,
    created_date TIMESTAMP,
    last_login TIMESTAMP,
    role STRING,
    sign_up_source STRING,
    state STRING
)
USING DELTA;


In [0]:
from pyspark.sql.functions import col, explode_outer

# Load the raw receipts export.
df = spark.read.json("abfss://recepientjsonfile@kartheekuntdl.dfs.core.windows.net/receipts.json")

# Receipt-level flattening: identifiers, dates and snake_case names.
df_transformed = (
    df
    # Promote the nested identifier (_id.$oid) to a plain string column.
    .withColumn("id", col("_id.$oid"))

    # `$date` holds Unix epoch milliseconds. Dividing by 1000 and casting gives a
    # real TIMESTAMP (sub-second part kept) instead of the formatted STRING that
    # from_unixtime() returns, so date functions need no implicit string parsing.
    .withColumn("created_date", (col("createDate.$date") / 1000).cast("timestamp"))
    .withColumn("date_scanned", (col("dateScanned.$date") / 1000).cast("timestamp"))
    .withColumn("finished_date", (col("finishedDate.$date") / 1000).cast("timestamp"))
    .withColumn("modify_date", (col("modifyDate.$date") / 1000).cast("timestamp"))
    .withColumn("points_awarded_date", (col("pointsAwardedDate.$date") / 1000).cast("timestamp"))
    .withColumn("purchase_date", (col("purchaseDate.$date") / 1000).cast("timestamp"))
    .drop("_id", "createDate", "dateScanned", "finishedDate", "modifyDate", "pointsAwardedDate", "purchaseDate")

    # camelCase -> snake_case
    .withColumnRenamed("bonusPointsEarned", "bonus_points_earned")
    .withColumnRenamed("bonusPointsEarnedReason", "bonus_points_reason")
    .withColumnRenamed("pointsEarned", "points_earned")
    .withColumnRenamed("purchasedItemCount", "purchased_item_count")
    .withColumnRenamed("rewardsReceiptStatus", "receipt_status")
    .withColumnRenamed("totalSpent", "total_spent")
    .withColumnRenamed("userId", "user_id")
)

# One row per receipt item. explode_outer (rather than explode) keeps receipts
# whose rewardsReceiptItemList is null or empty as a single row with null item
# columns; plain explode would drop those receipts from receipts_table and from
# every receipt-level aggregate computed on it.
df_items = (
    df_transformed
    .withColumn("items", explode_outer(col("rewardsReceiptItemList")))
    .drop("rewardsReceiptItemList")
)

# Receipt-level columns followed by the item attributes pulled out of the struct.
# NOTE: receipt-level values repeat on every item row of the same receipt.
df_final = df_items.select(
    col("id"),
    col("created_date"),
    col("date_scanned"),
    col("finished_date"),
    col("modify_date"),
    col("points_awarded_date"),
    col("purchase_date"),
    col("bonus_points_earned"),
    col("bonus_points_reason"),
    col("points_earned"),
    col("purchased_item_count"),
    col("receipt_status"),
    col("total_spent"),
    col("user_id"),
    col("items.barcode").alias("item_barcode"),
    col("items.description").alias("item_description"),
    col("items.finalPrice").alias("item_final_price"),
    col("items.itemPrice").alias("item_price"),
    col("items.quantityPurchased").alias("item_quantity_purchased"),
    col("items.userFlaggedBarcode").alias("item_user_flagged_barcode"),
    col("items.userFlaggedPrice").alias("item_user_flagged_price"),
    col("items.userFlaggedQuantity").alias("item_user_flagged_quantity")
)

# Display the transformed DataFrame
display(df_final)

# Save as a Delta table. overwriteSchema lets the overwrite succeed when
# receipts_table already exists from an earlier run that stored the date
# columns as strings.
(
    df_final.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("receipts_table")
)


In [0]:
%sql
-- Preview the rows stored in receipts_table.
SELECT *
FROM receipts_table

## Second: Write queries that directly answer predetermined questions from a business stakeholder

In [0]:
%sql
-- The JSON data provided contains only these statuses: FLAGGED, FINISHED, REJECTED, PENDING.
SELECT DISTINCT receipt_status
FROM receipts_table

In [0]:
%sql
-- 5. When considering total number of items purchased from receipts with
--    'rewardsReceiptStatus' of 'Accepted' or 'Rejected', which is greater?
--
-- receipts_table holds one row per receipt ITEM, so purchased_item_count (a
-- receipt-level value) repeats on every item row. Summing it directly counts
-- each receipt once per item line; the first CTE reduces the data back to one
-- row per receipt before aggregating.
--
-- NOTE(review): the statuses present in the data are FLAGGED, FINISHED, REJECTED
-- and PENDING; there is no literal 'ACCEPTED'. FINISHED is assumed here to be
-- the accepted state -- confirm with the stakeholder.
WITH receipts AS (
    SELECT DISTINCT id, receipt_status, purchased_item_count
    FROM receipts_table
    WHERE receipt_status IN ('ACCEPTED', 'FINISHED', 'REJECTED')
),
cte AS (
    SELECT receipt_status, SUM(purchased_item_count) AS total_items_purchased
    FROM receipts
    GROUP BY receipt_status
)
-- Keep only the status with the larger total.
SELECT *
FROM cte
WHERE total_items_purchased = (SELECT MAX(total_items_purchased) FROM cte)





In [0]:
%sql
-- 6. When considering average spend from receipts with 'rewardsReceiptStatus'
--    of 'Accepted' or 'Rejected', which is greater?
--
-- receipts_table holds one row per receipt ITEM, so total_spent (a receipt-level
-- value) repeats on every item row. Averaging it directly weights each receipt
-- by its number of item lines; the first CTE reduces the data back to one row
-- per receipt before aggregating.
--
-- NOTE(review): the statuses present in the data are FLAGGED, FINISHED, REJECTED
-- and PENDING; there is no literal 'ACCEPTED'. FINISHED is assumed here to be
-- the accepted state -- confirm with the stakeholder.
WITH receipts AS (
    SELECT DISTINCT id, receipt_status, total_spent
    FROM receipts_table
    WHERE receipt_status IN ('ACCEPTED', 'FINISHED', 'REJECTED')
),
cte AS (
    SELECT receipt_status, AVG(total_spent) AS avg_spent
    FROM receipts
    GROUP BY receipt_status
)
-- Keep only the status with the larger average.
SELECT *
FROM cte
WHERE avg_spent = (SELECT MAX(avg_spent) FROM cte)


In [0]:
%sql
-- 1. What are the top 5 brands by receipts scanned for most recent month?
--
-- Assumption: the provided files define no explicit link between brands and
-- receipts, so the item barcode on a receipt is assumed to match the brand
-- barcode. More information on how these tables relate is needed to confirm.
--
-- Author's observations on the data: the most recent scan month in
-- receipts_table is 2021-03, but no receipt item from that month matches a brand
-- barcode; after joining on barcode, the latest month with matches is 2021-01
-- (89 records). This query may therefore return no rows for the most recent month.

-- Most recent scan month, taken from the single latest date_scanned value.
-- (Taking MAX(YEAR) and MAX(MONTH) separately can combine into a year/month
-- that never occurs in the data, e.g. 2021 + 12 when scans span 2020-12..2021-03.)
WITH recent_month AS (
    SELECT date_trunc('MONTH', MAX(CAST(date_scanned AS TIMESTAMP))) AS month_start
    FROM receipts_table
)

-- Top 5 brands by number of distinct receipts scanned in that month.
-- COUNT(DISTINCT r.id) is required because receipts_table has one row per
-- receipt item; COUNT(r.id) would count item lines instead of receipts.
SELECT
    b.brand_name,
    COUNT(DISTINCT r.id) AS receipts_count
FROM
    receipts_table r
INNER JOIN
    brands_table b
ON
    r.item_barcode = b.barcode
INNER JOIN
    recent_month rm
ON
    date_trunc('MONTH', CAST(r.date_scanned AS TIMESTAMP)) = rm.month_start
GROUP BY
    b.brand_name
ORDER BY
    receipts_count DESC,
    b.brand_name   -- deterministic order when counts tie
LIMIT 5


## Third: Evaluate Data Quality Issues in the Data Provided

In [0]:
%sql
-- Data-quality checks. Statements are separated by ';' so the cell parses as
-- a sequence of independent queries.

-- users_table
-- 1. id is assumed to be the primary key, so no id should appear more than once.
--    Duplicates were found; this query lists the affected ids
--    (example: '5fc961c3b8cfca11a077dd33').
SELECT id, COUNT(*) AS occurrences
FROM users_table
GROUP BY id
HAVING COUNT(*) > 1
ORDER BY occurrences DESC;

-- 2. Primary-key values must not be null (checked here on brands_table).
SELECT * FROM brands_table WHERE id IS NULL;

-- 3. Receipts with a null item_barcode; a barcode is expected for every receipt item.
SELECT DISTINCT id FROM receipts_table WHERE item_barcode IS NULL;

-- 4. Negative values: total_spent and purchased_item_count are checked here; the
--    same pattern can be applied to other numeric columns as required.
SELECT
    COUNT(*) AS negative_values
FROM receipts_table
WHERE total_spent < 0 OR purchased_item_count < 0;

-- 5. Dates in the future. The full timestamp is compared (not just the year) so
--    that future dates within the current year are also caught.
SELECT COUNT(*) AS future_dates
FROM users_table
WHERE created_date > current_timestamp()

-- Note: the source date fields are Unix epoch milliseconds; they are converted to
-- readable dates when the tables are built, e.g.
-- ("created_date", from_unixtime(col("createDate.$date") / 1000))



## Fourth: Communicate with Stakeholders


Hello Team,

We've been analyzing our data quality and have identified several issues that may impact analytics and decision-making.


1. Duplicate User IDs 🔄

   Some user IDs appear multiple times, which could lead to incorrect user analytics. (I am assuming the ID is the primary key of the users table.)

2. Missing & Null Data

   Some receipt items have no barcode, which makes it difficult to track item purchases accurately.
  
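3. Relationship Between Tables

   I don't see any explicit relationship between the brands and receipts tables in the JSON files provided. For the analysis I assumed that the item barcode on a receipt matches the brand barcode.
   I would like more clarity on how these tables depend on each other.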

I am also running further quality checks on the provided data against these tables.

Proposed next steps:

- Confirm the expected relationships between brands, receipts, and users.
- Fix duplicate user IDs and enforce a uniqueness constraint.
- Investigate missing barcodes — should they be mandatory?
- Standardize timestamps and validate them to prevent future-date issues.

I would love your input on which issues are most critical to the business so we can prioritize our fixes.
I am happy to discuss this further on a call.

Thanks & Regards
Kartheek Gurijala