#Scenario 1: Filter Events Based on Business Logic

#Insert Data into Volume

In [0]:
%sql
-- Point the session at the catalog and schema that the unqualified volume name below resolves against.
USE CATALOG company;
USE SCHEMA unit;

-- Landing volume for the generated sales JSON files.
-- IF NOT EXISTS makes this cell safe to re-run.
CREATE VOLUME IF NOT EXISTS sales_input_today
    COMMENT 'Volume to store todays sales data: 2025-06-27';

In [0]:
import json
from datetime import datetime

# Every generated record is stamped with the current local date at 10:00:00.
today = datetime.today().strftime("%Y-%m-%d")

# Sample sales: ten transactions spread over customers 1-3 and products 101-104.
transactions = [
    {"transaction_id": 2001, "customer_id": 1, "product_id": 101, "quantity": 2},
    {"transaction_id": 2002, "customer_id": 2, "product_id": 102, "quantity": 1},
    {"transaction_id": 2003, "customer_id": 3, "product_id": 103, "quantity": 5},
    {"transaction_id": 2004, "customer_id": 1, "product_id": 104, "quantity": 3},
    {"transaction_id": 2005, "customer_id": 2, "product_id": 101, "quantity": 4},
    {"transaction_id": 2006, "customer_id": 3, "product_id": 102, "quantity": 2},
    {"transaction_id": 2007, "customer_id": 1, "product_id": 103, "quantity": 1},
    {"transaction_id": 2008, "customer_id": 2, "product_id": 104, "quantity": 2},
    {"transaction_id": 2009, "customer_id": 3, "product_id": 101, "quantity": 6},
    {"transaction_id": 2010, "customer_id": 1, "product_id": 102, "quantity": 3},
]

# Write one JSON file per transaction into the landing volume.
for txn in transactions:
    # Build the record as a dict and let json.dumps handle quoting/escaping
    # instead of hand-assembling JSON text in an f-string.
    record = {"transaction_time": f"{today}T10:00:00", **txn}
    # Emit compact single-line JSON: Spark's JSON source parses one record per
    # line unless multiLine is enabled, so a one-line object is readable under
    # either setting, whereas a pretty-printed object is not.
    content = json.dumps(record)
    path = f"/Volumes/company/unit/sales_input_today/txn_{txn['transaction_id']}.json"
    # overwrite=True makes re-running the cell replace the files in place.
    dbutils.fs.put(path, content, overwrite=True)


#Create Static Demographics Data

In [0]:
%python
from pyspark.sql import Row

# Static dimension data
user_data = [
    Row(user_id=1, age=25, location="India"),
    Row(user_id=2, age=40, location="Germany"),
    Row(user_id=3, age=31, location="India")
]

user_df = spark.createDataFrame(user_data)

#Read Streaming Sales Data from Volume

In [0]:

from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType
from pyspark.sql.functions import col

# Explicit schema for the sales JSON records; every field is nullable
# (the default when no nullability flag is passed).
sales_schema = (
    StructType()
    .add("transaction_time", TimestampType())
    .add("transaction_id", IntegerType())
    .add("customer_id", IntegerType())
    .add("product_id", IntegerType())
    .add("quantity", IntegerType())
)

# Read from Unity Catalog volume (JSON) as a stream via Auto Loader ("cloudFiles").
# Each file in the volume holds a single JSON object that may span several
# lines. By default the JSON source treats every physical line as its own
# record, so a pretty-printed object would be parsed as a series of malformed
# lines; multiLine makes the reader parse the file as one record instead.
sales_stream_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("multiLine", "true") \
    .schema(sales_schema) \
    .load("/Volumes/company/unit/sales_input_today/")


#Debug and Validate Data in Volume

In [0]:
# Materialise the raw stream into a Delta table so it can be inspected with SQL.
# availableNow processes the files currently in the volume and then stops.
raw_sales_query = sales_stream_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", "/Volumes/company/unit/chk/debug_sales_input_today") \
    .toTable("company.unit.sales_input_today")

# toTable() starts the query and returns immediately; wait for it to finish so
# that a following cell querying the table does not race the stream.
raw_sales_query.awaitTermination()

In [0]:
%sql

-- Inspect the raw sales rows landed by the debug stream.
SELECT *
FROM company.unit.sales_input_today

#Add Product Price Info (Simulated Static Table)

In [0]:
from pyspark.sql import Row

# Static product prices, keyed by product_id.
price_by_product = {
    101: 500.0,
    102: 600.0,
    103: 200.0,
    104: 300.0,
}

# One Row per product, in the insertion order of the mapping above.
product_data = [
    Row(product_id=product_id, price=price)
    for product_id, price in price_by_product.items()
]

product_df = spark.createDataFrame(product_data)


#Enrich Stream with Price and Demographics

In [0]:
# Attach the unit price to each sale. The left join keeps sales whose
# product_id has no match in product_df (price, and therefore spend, is null).
priced_sales = sales_stream_df.join(product_df, on="product_id", how="left")
sales_enriched_product = priced_sales.withColumn("spend", col("quantity") * col("price"))

# Attach user demographics, treating customer_id as the user_id of user_df.
# Again a left join, so sales without a matching user are retained.
customer_is_user = sales_enriched_product.customer_id == user_df.user_id
sales_enriched_prod_user = sales_enriched_product.join(user_df, on=customer_is_user, how="left")


In [0]:
%sql

-- Reset the enriched output table before it is rebuilt by the stream below.
-- IF EXISTS keeps this cell from failing on a first run, when the table has
-- not been created yet.
-- NOTE(review): dropping the table does not touch the stream's checkpoint
-- directory; presumably that must be cleared as well for already-processed
-- files to be reloaded -- confirm.
DROP TABLE IF EXISTS company.unit.sales_enriched_prod_user

In [0]:
# Persist the enriched sales (price + demographics) to a Delta table.
# availableNow processes the files currently available and then stops.
enriched_query = sales_enriched_prod_user.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", "/Volumes/company/unit/chk/sales_enriched_prod_user") \
    .toTable("company.unit.sales_enriched_prod_user")

# toTable() returns as soon as the query has started; block until it has
# finished so the table is fully written before the next cell reads it.
enriched_query.awaitTermination()

In [0]:
%sql

-- Inspect the enriched sales rows (sale + price/spend + user demographics).
SELECT *
FROM company.unit.sales_enriched_prod_user

#Apply Business Filters: Spend > 1000 or Quantity > 3, and Young Indian Users

In [0]:
# Business logic filters, split into named conditions.
# A sale is "large" when its spend exceeds 1000 or its quantity exceeds 3.
is_large_sale = (col("spend") > 1000) | (col("quantity") > 3)
# Only sales whose user is located in India are kept.
is_india_user = col("location") == "India"

# NOTE(review): the section heading mentions "Young" users, but no age
# condition is applied here -- confirm whether an age filter is intended.
flagged_sales = sales_enriched_prod_user.filter(is_large_sale & is_india_user)


In [0]:
# Persist the flagged sales to a Delta table.
# availableNow processes the files currently available and then stops.
flagged_query = flagged_sales.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", "/Volumes/company/unit/chk/flagged_transactions") \
    .toTable("company.unit.flagged_transactions1")

# toTable() only starts the query; wait for completion so the SELECT in the
# next cell sees the written rows rather than racing the stream.
flagged_query.awaitTermination()


In [0]:
%sql

-- Inspect the sales that passed the business filters.
SELECT *
FROM company.unit.flagged_transactions1

# Aggregation by Category and Country in 10-Minute Windows

In [0]:
# Drop a single hand-written transaction file for debugging.
# NOTE(review): this targets the sales_input_volume volume, whereas the stream
# in this notebook loads from sales_input_today -- confirm this is intended.
debug_txn_path = "/Volumes/company/unit/sales_input_volume/debug_transaction.json"
debug_txn_body = """
{
  "transaction_time": "2025-06-28T10:00:00",
  "transaction_id": 1006,
  "customer_id": 1,
  "product_id": 101,
  "quantity": 2
}
"""
# overwrite=True replaces any previous copy of the debug file.
dbutils.fs.put(debug_txn_path, debug_txn_body, overwrite=True)

In [0]:
# Write the enriched stream to a separate debug table, using its own
# checkpoint directory so it tracks progress independently of the main table.
enriched_debug_query = sales_enriched_prod_user.writeStream \
    .format("delta") \
    .outputMode("append") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", "/Volumes/company/unit/chk/sales_enriched_prod_user_debug11") \
    .toTable("company.unit.sales_enriched_prod_user_debug")

# toTable() returns once the query has started; wait for the availableNow run
# to finish before later cells touch the table.
enriched_debug_query.awaitTermination()

In [0]:
%sql

-- Remove the debug copy of the enriched table.
-- IF EXISTS keeps the cell from failing when the table is already gone.
-- NOTE(review): the next cell selects from this same table, so running the
-- cells strictly top to bottom makes that SELECT fail -- confirm the intended
-- order.
DROP TABLE IF EXISTS company.unit.sales_enriched_prod_user_debug

In [0]:
%sql

-- Inspect the debug copy of the enriched sales.
SELECT *
FROM company.unit.sales_enriched_prod_user_debug

In [0]:
from pyspark.sql.functions import window, count

# Count of transactions per user location, with no time window.
# NOTE(review): the following cell rebinds category_country_agg to a windowed
# version, so this definition is superseded when the notebook runs in order.
transactions_per_group = count("transaction_id").alias("transaction_count")
category_country_agg = sales_enriched_prod_user.groupBy(col("location")).agg(transactions_per_group)


In [0]:
from pyspark.sql.functions import window, count

# Event-time bucketing: 10-minute tumbling windows on transaction_time.
ten_minute_window = window("transaction_time", "10 minutes")

# Declare a 10-minute watermark on the event-time column before aggregating.
watermarked_sales = sales_enriched_prod_user.withWatermark("transaction_time", "10 minutes")

# Transactions counted per (window, location) pair.
category_country_agg = (
    watermarked_sales
    .groupBy(ten_minute_window, col("location"))
    .agg(count("transaction_id").alias("transaction_count"))
)


In [0]:
# Persist the aggregated counts to a Delta table. Complete mode rewrites the
# full aggregation result on every trigger instead of appending new rows.
aggregate_query = category_country_agg.writeStream \
    .format("delta") \
    .outputMode("Complete") \
    .trigger(availableNow=True) \
    .option("checkpointLocation", "/Volumes/company/unit/chk/category_country_windowed11111") \
    .toTable("company.unit.category_country_event_count")

# toTable() starts the query asynchronously; block until the availableNow run
# has finished so later cells read the completed result.
aggregate_query.awaitTermination()


In [0]:
%sql

-- Clean up the "...event_count1" table.
-- NOTE(review): the name carries a trailing 1 and is not the table written by
-- the aggregation stream (category_country_event_count); no visible cell
-- creates it, hence IF EXISTS so the cell does not fail when it is absent.
DROP TABLE IF EXISTS company.unit.category_country_event_count1

In [0]:
%sql

-- Inspect the per-window, per-location transaction counts.
SELECT *
FROM company.unit.category_country_event_count

#Debug

In [0]:
# List the contents of the sales_input_volume volume and render the listing.
volume_dir = "/Volumes/company/unit/sales_input_volume/"
files = dbutils.fs.ls(volume_dir)
display(files)

In [0]:
%sql

-- Inspect the debug landing table.
-- NOTE(review): no visible cell in this notebook creates
-- company.unit.debug_sales_landing -- confirm it exists.
SELECT *
FROM company.unit.debug_sales_landing;

In [0]:
# Debug preview of the product/user joins.
# .show() is a batch action and raises an AnalysisException on a streaming
# DataFrame, so the same joins are run on a one-off batch read of the landing
# volume, reusing the stream's schema.
sales_batch_df = spark.read \
    .schema(sales_stream_df.schema) \
    .option("multiLine", "true") \
    .json("/Volumes/company/unit/sales_input_today/")

# multiLine lets the batch reader accept files whose JSON object spans
# several lines as well as single-line files.
test = sales_batch_df \
    .join(product_df, "product_id", "left") \
    .join(user_df, sales_batch_df.customer_id == user_df.user_id, "left")

# location is null for any sale whose customer_id has no match in user_df.
test.select("transaction_id", "product_id", "customer_id", "location").show()