In [0]:
spark

In [0]:
#Load the data
from pyspark.sql.functions import col
subscriptions=spark.read.option("header", True).option("inferSchema", True).csv("file:/Workspace/Shared/subscriptions.csv")
activity=spark.read.option("header", True).option("inferSchema", True).csv("file:/Workspace/Shared/user_activity.csv")

subscriptions.printSchema()
activity.printSchema()

subscriptions=subscriptions.withColumn("StartDate", col("StartDate").cast("date")) \
                             .withColumn("EndDate", col("EndDate").cast("date")) \
                             .withColumn("PriceUSD", col("PriceUSD").cast("double"))

activity=activity.withColumn("EventTime", col("EventTime").cast("timestamp"))

subscriptions.createOrReplaceTempView("subscriptions")
activity.createOrReplaceTempView("activity")
     

root
 |-- SubscriptionID: string (nullable = true)
 |-- UserID: string (nullable = true)
 |-- PlanType: string (nullable = true)
 |-- StartDate: date (nullable = true)
 |-- EndDate: date (nullable = true)
 |-- PriceUSD: integer (nullable = true)
 |-- IsActive: boolean (nullable = true)
 |-- AutoRenew: boolean (nullable = true)

root
 |-- UserID: string (nullable = true)
 |-- EventTime: string (nullable = true)
 |-- EventType: string (nullable = true)
 |-- FeatureUsed: string (nullable = true)



In [0]:
from pyspark.sql.functions import datediff, count, col
# Calculate:
# active_days = EndDate - StartDate
subscriptions_with_days = subscriptions.withColumn(
    "active_days", datediff("EndDate", "StartDate")
)
subscriptions_with_days.show()

# events_per_user = count(EventType) grouped by UserID
events_per_user = activity.groupBy("UserID").agg(
    count("EventType").alias("events_per_user")
)
events_per_user.show()

# Create a score: engagement_score = (events_per_user / active_days) * PriceUSD
combined_df = subscriptions_with_days.join(events_per_user, on="UserID", how="inner")

combined_df = combined_df.withColumn(
    "engagement_score", (col("events_per_user") / col("active_days")) * col("PriceUSD")
)
combined_df.select("UserID", "PlanType", "PriceUSD", "active_days", "events_per_user", "engagement_score").show()

+--------------+------+--------+----------+----------+--------+--------+---------+-----------+
|SubscriptionID|UserID|PlanType| StartDate|   EndDate|PriceUSD|IsActive|AutoRenew|active_days|
+--------------+------+--------+----------+----------+--------+--------+---------+-----------+
|        SUB001|  U001|   Basic|2024-01-01|2024-04-01|    30.0|    true|     true|         91|
|        SUB002|  U002|     Pro|2024-02-15|2024-05-15|    90.0|    true|    false|         90|
|        SUB003|  U003|     Pro|2024-03-10|2024-06-10|    90.0|   false|    false|         92|
|        SUB004|  U001| Premium|2024-04-05|2024-07-05|   120.0|    true|     true|         91|
|        SUB005|  U004|   Basic|2024-01-20|2024-04-20|    30.0|   false|    false|         91|
+--------------+------+--------+----------+----------+--------+--------+---------+-----------+

+------+---------------+
|UserID|events_per_user|
+------+---------------+
|  U004|              1|
|  U002|              1|
|  U003|           

In [0]:
# B. Anomaly Detection via SQL
# Identify users with:
# Subscription inactive but recent activity
#Inactive but recently active
print("Inactive but recently active:")
spark.sql("""
SELECT s.UserID, s.IsActive, MAX(a.EventTime) AS LastEvent
FROM subscriptions s
LEFT JOIN activity a ON s.UserID = a.UserID
GROUP BY s.UserID, s.IsActive
HAVING s.IsActive = false AND LastEvent > current_date() - INTERVAL 7 DAYS
""").show()

#AutoRenew true but no activity in last 30 days
print("AutoRenew true but no activity in last 30 days:")
spark.sql("""
SELECT s.UserID, MAX(a.EventTime) AS LastEvent
FROM subscriptions s
LEFT JOIN activity a ON s.UserID = a.UserID
WHERE s.AutoRenew = true
GROUP BY s.UserID
HAVING LastEvent < current_date() - INTERVAL 30 DAYS OR LastEvent IS NULL
""").show()

Inactive but recently active:
+------+--------+---------+
|UserID|IsActive|LastEvent|
+------+--------+---------+
+------+--------+---------+

AutoRenew true but no activity in last 30 days:
+------+---------+
|UserID|LastEvent|
+------+---------+
|  U001|     NULL|
+------+---------+



In [0]:
# C. Delta Lake + Merge Simulation
# Imagine a billing fix needs to be applied:
# For all Pro plans in March, increase price by $5 retroactively.
# Use MERGE INTO on Delta table to apply the change.
subscriptions.write.format("delta").mode("overwrite").save("file:/Workspace/Shared/subscriptions")
spark.sql("""
MERGE INTO delta.`file:/Workspace/Shared/subscriptions` target
USING (
  SELECT * FROM delta.`file:/Workspace/Shared/subscriptions` 
  WHERE PlanType = 'Pro' AND month(StartDate) = 3
) src
ON target.SubscriptionID = src.SubscriptionID
WHEN MATCHED THEN
  UPDATE SET target.PriceUSD = target.PriceUSD + 5
""").show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                1|               1|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [0]:
# D. Time Travel Debugging
# Show describe history of the table before and after the billing fix.
# Query using VERSION AS OF to prove the issue existed.
spark.sql("DESCRIBE HISTORY delta.`file:/Workspace/Shared/subscriptions`").show()

old=spark.read.format("delta").option("versionAsOf", 0).load("file:/Workspace/Shared/subscriptions")
old.filter(col("PlanType") == "Pro").show()

+-------+--------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|          userId|            userName|operation| operationParameters| job|          notebook|           clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+----------------+--------------------+---------+--------------------+----+------------------+--------------------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|      1|2025-06-16 11:29:...|3955481281677681|azuser3561_mml.lo...|    MERGE|{predicate -> ["(...|NULL|{2045471936929044}|0611-041854-oedbfkos|          0|WriteSerializable|        false|{numTargetRowsCop...|        NULL|Databricks-Runtim...|
|      0|2025-06-16 11:2

In [0]:
# E. Build Tier Migration Table
# Identify users who upgraded:
# From Basic → Pro → Premium
# Use PySpark with lag() function to model this.

from pyspark.sql.window import Window
from pyspark.sql.functions import lag
w = Window.partitionBy("UserID").orderBy("StartDate")
migration = subscriptions.withColumn("PrevPlan", lag("PlanType").over(w))
migration.filter(
    (col("PrevPlan") == "Basic") & (col("PlanType") == "Pro")
).show()
     

+--------------+------+--------+---------+-------+--------+--------+---------+--------+
|SubscriptionID|UserID|PlanType|StartDate|EndDate|PriceUSD|IsActive|AutoRenew|PrevPlan|
+--------------+------+--------+---------+-------+--------+--------+---------+--------+
+--------------+------+--------+---------+-------+--------+--------+---------+--------+



In [0]:
# F. Power Users Detection
# Define a power user as:
# Used ≥ 2 features
# Logged in ≥ 3 times
# Create a separate Delta table power_users

from pyspark.sql.functions import countDistinct, col
feature = activity.groupBy("UserID").agg(countDistinct("FeatureUsed").alias("feature_count"))
feature.show()
login = activity.filter(col("EventType") == "login").groupBy("UserID").count().withColumnRenamed("count", "login_count")
login.show()
power_users = feature.join(login, "UserID") \
    .filter((col("feature_count") >= 2) & (col("login_count") >= 3))
power_users.write.format("delta").mode("overwrite").save("file:/Workspace/Shared/power_users")

+------+-------------+
|UserID|feature_count|
+------+-------------+
|  U004|            1|
|  U002|            1|
|  U003|            1|
|  U001|            1|
+------+-------------+

+------+-----------+
|UserID|login_count|
+------+-----------+
|  U004|          1|
|  U001|          1|
+------+-----------+



In [0]:
# G. Session Replay View
# Build a user session trace table using:
# Window.partitionBy("UserID").orderBy("EventTime")
# Show how long each user spent between login and logout events.
from pyspark.sql.functions import lag, unix_timestamp, when
window=Window.partitionBy("UserID").orderBy("EventTime")
sessions=activity.withColumn("PrevEvent", lag("EventType").over(window)) \
                   .withColumn("PrevTime", lag("EventTime").over(window)) \
                   .withColumn("SessionDuration", 
                        when((col("PrevEvent") == "login") & (col("EventType") == "logout"),
                             unix_timestamp("EventTime") - unix_timestamp("PrevTime"))
                    )
print("Sessions:")
sessions.select("UserID", "EventTime", "EventType", "SessionDuration").show()

Sessions:
+------+---------+---------+---------------+
|UserID|EventTime|EventType|SessionDuration|
+------+---------+---------+---------------+
|  U001|     NULL|    login|           NULL|
|  U001|     NULL|   logout|           NULL|
|  U002|     NULL|   upload|           NULL|
|  U003|     NULL| download|           NULL|
|  U004|     NULL|    login|           NULL|
+------+---------+---------+---------------+

