In [81]:
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.sql.types import (
    DateType, StructType, StructField, StringType, IntegerType, BooleanType, DoubleType, MapType
)
from pyspark.sql.window import Window as W
from pyspark.sql import functions as F

import glob
import os,shutil
from datetime import datetime
from functools import reduce

# Point Spark at the local Hadoop install under C:\hadoop (Windows-style paths).
os.environ["HADOOP_HOME"] = "C:\\hadoop"

# Append the Hadoop bin directory to PATH only when it is not already present,
# so re-running this cell does not keep growing PATH with duplicate entries.
# A missing PATH variable is treated as empty instead of raising KeyError.
_hadoop_bin = "C:\\hadoop\\bin"
_current_path = os.environ.get("PATH", "")
if _hadoop_bin not in _current_path.split(";"):
    os.environ["PATH"] = _current_path + ";" + _hadoop_bin

from delta import configure_spark_with_delta_pip

from typing import Union, List
from delta.tables import DeltaTable

In [82]:
# Session settings that register the Delta Lake SQL extension and catalog.
delta_settings = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

# Local-mode session builder; each Delta setting is applied in turn.
builder = SparkSession.builder.appName("RevenueCatLTVApp").master("local[*]")
for conf_key, conf_value in delta_settings.items():
    builder = builder.config(conf_key, conf_value)

# configure_spark_with_delta_pip decorates the builder before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [83]:
# Location of the gzipped events export. The original workstation path stays
# the default; set the EVENTS_CSV_PATH environment variable to run the notebook
# against a file stored elsewhere without editing this cell.
file_path = os.environ.get(
    "EVENTS_CSV_PATH", "C:/Users/Ronald/Downloads/full_events.csv.gz"
)

# Load the CSV with a header row. inferSchema=False keeps every column as a
# string; typing and cleaning are applied explicitly in later cells.
raw_df = spark.read.csv(
    path=file_path,
    header=True,
    inferSchema=False,
    multiLine=True,   # important since event_data contains JSON with commas
    escape='"',       # handle quotes inside JSON
)

# Show a few rows (untruncated so the full event_data JSON is visible)
raw_df.show(truncate=False)

# Print schema
raw_df.printSchema()

+------------------------------------+----------------+------------------------------------+---------------------------------------+--------+-------+---------------------------------------------------------------------------------+
|user_id                             |event_type      |event_date                          |tx_id                                  |platform|country|event_data                                                                       |
+------------------------------------+----------------+------------------------------------+---------------------------------------+--------+-------+---------------------------------------------------------------------------------+
|17db231c-b614-4741-aa3d-63bb573704ce|initial_purchase|2025-01-21 12:06:21                 |tx-fd4f3a96-6a34-4101-8a5d-b9ccf6a72ea7|iOS     |FR     |{"plan": "basic_weekly", "phone_number": "+12012239534"}                         |
|17db231c-b614-4741-aa3d-63bb573704ce|price_info      |2025-01-21 12:06:

In [84]:
# Total number of rows read from the events file.
raw_df.count()

1125379

In [85]:
## Clean the raw columns and flatten the useful event_data fields

# --- Column normalisation --------------------------------------------------

# Some event_date values have extra text glued onto the timestamp, e.g.
# "2025-01-21 12:06:2112:06:2102:00:00Z". Only the leading
# "YYYY-MM-DD HH:MM:SS" part is kept and parsed.
leading_ts_pattern = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}"
clean_ts = F.to_timestamp(
    F.regexp_extract(F.col("event_date"), leading_ts_pattern, 0),
    "yyyy-MM-dd HH:mm:ss",
)

# Platform spelling varies by case ("IOS" vs "iOS"); map the two known values
# to a canonical spelling and pass anything else through unchanged.
platform_lower = F.lower("platform")
norm_platform = (
    F.when(platform_lower == F.lit("ios"), F.lit("iOS"))
    .when(platform_lower == F.lit("android"), F.lit("Android"))
    .otherwise(F.col("platform"))
)

# A tx_id holding the text "null" (any case) becomes a real null.
tx_is_literal_null = F.lower(F.col("tx_id")) == F.lit("null")
norm_tx_id = F.when(tx_is_literal_null, F.lit(None)).otherwise(F.col("tx_id"))

df = raw_df.select(
    F.col("user_id"),
    F.col("event_type"),
    clean_ts.alias("event_ts"),
    norm_tx_id.alias("tx_id"),
    norm_platform.alias("platform"),
    F.col("country"),
    F.col("event_data"),
)

# --- event_data schema -----------------------------------------------------

# (field name, Spark type) pairs expected inside the event_data JSON; every
# field is nullable.
event_data_fields = [
    ("plan", StringType()),
    ("price", DoubleType()),
    ("attempt", IntegerType()),
    ("success", BooleanType()),
    ("refunded_pct", IntegerType()),
    ("type", StringType()),
    ("effective_date", StringType()),  # parsed to a date when flattened below
]
event_data_schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in event_data_fields]
)

# --- Parse the JSON string into a struct column ----------------------------
parsed = F.from_json(F.col("event_data"), event_data_schema)
df_parsed = df.withColumn("edata", parsed)

# --- Flatten the struct and add the derived event date ---------------------
# One projection: the cleaned base columns, then each JSON field as its own
# column (the JSON key "type" is exposed as event_type_name), then the calendar
# date of event_ts. The helper struct column "edata" is not carried over.
flat_df = df_parsed.select(
    "user_id",
    "event_type",
    "event_ts",
    "tx_id",
    "platform",
    "country",
    "event_data",
    F.col("edata.plan").alias("plan"),
    F.col("edata.price").alias("price"),
    F.col("edata.attempt").alias("attempt"),
    F.col("edata.success").alias("success"),
    F.col("edata.refunded_pct").alias("refunded_pct"),
    F.col("edata.type").alias("event_type_name"),
    F.to_date("edata.effective_date").alias("effective_date"),
    F.to_date("event_ts").alias("event_date"),
)

# --- Inspect ---------------------------------------------------------------
flat_df.printSchema()
flat_df.show(n=5, truncate=False)

root
 |-- user_id: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_ts: timestamp (nullable = true)
 |-- tx_id: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- country: string (nullable = true)
 |-- event_data: string (nullable = true)
 |-- plan: string (nullable = true)
 |-- price: double (nullable = true)
 |-- attempt: integer (nullable = true)
 |-- success: boolean (nullable = true)
 |-- refunded_pct: integer (nullable = true)
 |-- event_type_name: string (nullable = true)
 |-- effective_date: date (nullable = true)
 |-- event_date: date (nullable = true)

+------------------------------------+----------------+-------------------+---------------------------------------+--------+-------+---------------------------------------------------------------------------------+------------+-----+-------+-------+------------+---------------+--------------+----------+
|user_id                             |event_type      |event_ts           |tx_

In [86]:
## Initial purchase per user, with cohort attributes and the purchase price

# --- Initial purchase events, renamed to the reporting column names --------
is_initial_purchase = F.col("event_type") == "initial_purchase"
initial_df = flat_df.where(is_initial_purchase).select(
    F.col("user_id").alias("UserID"),
    F.col("plan").alias("Plan"),
    F.col("country").alias("Country"),
    F.col("platform").alias("Platform"),
    F.col("tx_id").alias("InitialTxnID"),
    F.to_date("event_ts").alias("InitialPurchaseDate"),
    F.date_format("event_ts", "yyyy-MM").alias("InitialPurchaseMonth"),
)

# --- price_info events carry the amount for a (user_id, tx_id) pair --------
is_price_info = F.col("event_type") == "price_info"
price_info_df = flat_df.where(is_price_info).select(
    "user_id",
    "tx_id",
    "event_ts",
    F.col("price").cast("decimal(10,2)").alias("price"),
)

# --- Attach the price to each initial purchase -----------------------------
# Left join so purchases without a matching price_info row are kept (their
# amount is null). The projection keeps every initial-purchase column plus the
# price event's timestamp and amount; the price-side join keys are left out.
same_user_and_txn = (
    (F.col("i.UserID") == F.col("p.user_id"))
    & (F.col("i.InitialTxnID") == F.col("p.tx_id"))
)
initial_with_price_df = (
    initial_df.alias("i")
    .join(price_info_df.alias("p"), on=same_user_and_txn, how="left")
    .select(
        "i.*",
        F.col("p.event_ts"),
        F.col("p.price").alias("InitialPurchaseAmount"),
    )
)

# --- Inspect ---------------------------------------------------------------
initial_with_price_df.show()
initial_with_price_df.printSchema()

+--------------------+------------+-------+--------+--------------------+-------------------+--------------------+-------------------+---------------------+
|              UserID|        Plan|Country|Platform|        InitialTxnID|InitialPurchaseDate|InitialPurchaseMonth|           event_ts|InitialPurchaseAmount|
+--------------------+------------+-------+--------+--------------------+-------------------+--------------------+-------------------+---------------------+
|a760a69e-029a-42f...|basic_weekly|    GBR| Android|tx-bd076894-2a0b-...|         2024-07-11|             2024-07|2024-07-11 07:41:17|                 1.99|
|a153c93f-ccf2-420...|basic_weekly|     GB|     iOS|tx-2f36412a-24f4-...|         2025-06-14|             2025-06|2025-06-14 01:53:33|                 1.99|
|e0f09b5e-b923-45b...| monthly_pro|     US|     iOS|tx-9830136f-5815-...|         2024-09-05|             2024-09|2024-09-05 09:56:37|                10.00|
|5589e996-d4dc-4e8...|basic_weekly|     US|     iOS|tx-fa2

In [87]:
# Row count next to the distinct-user count: if the two differ, at least one
# user has more than one row in initial_with_price_df.
row_total = F.count("*").alias("total_rows")
user_total = F.countDistinct("UserID").alias("distinct_initial_UserID")

summary_df = initial_with_price_df.agg(row_total, user_total)

summary_df.show()

+----------+-----------------------+
|total_rows|distinct_initial_UserID|
+----------+-----------------------+
|     25000|                  25000|
+----------+-----------------------+



In [88]:
# Count initial_purchase events directly in the raw data, for comparison with
# the row count of the joined frame.
raw_initial_events = raw_df.where(F.col("event_type") == "initial_purchase")
initial_purchase_count = raw_initial_events.agg(
    F.count("*").alias("initial_purchase_events")
)

initial_purchase_count.show()

+-----------------------+
|initial_purchase_events|
+-----------------------+
|                  25000|
+-----------------------+



In [89]:
# Display the first rows of the joined initial-purchase frame again.
initial_with_price_df.show()

+--------------------+------------+-------+--------+--------------------+-------------------+--------------------+-------------------+---------------------+
|              UserID|        Plan|Country|Platform|        InitialTxnID|InitialPurchaseDate|InitialPurchaseMonth|           event_ts|InitialPurchaseAmount|
+--------------------+------------+-------+--------+--------------------+-------------------+--------------------+-------------------+---------------------+
|a760a69e-029a-42f...|basic_weekly|    GBR| Android|tx-bd076894-2a0b-...|         2024-07-11|             2024-07|2024-07-11 07:41:17|                 1.99|
|a153c93f-ccf2-420...|basic_weekly|     GB|     iOS|tx-2f36412a-24f4-...|         2025-06-14|             2025-06|2025-06-14 01:53:33|                 1.99|
|e0f09b5e-b923-45b...| monthly_pro|     US|     iOS|tx-9830136f-5815-...|         2024-09-05|             2024-09|2024-09-05 09:56:37|                10.00|
|5589e996-d4dc-4e8...|basic_weekly|     US|     iOS|tx-fa2

In [92]:
## Renewal counts and successful-renewal revenue per user

# --- Renewal events --------------------------------------------------------
renewals_df = flat_df.where(F.col("event_type") == "renewal")

# --- Successful vs failed attempts -----------------------------------------
# The explicit comparisons build Column predicates; a renewal whose success
# flag is null matches neither of them.
renewal_succeeded = F.col("success") == True
renewal_failed = F.col("success") == False

success_df = renewals_df.where(renewal_succeeded).select("user_id", "tx_id")
fail_df = renewals_df.where(renewal_failed).select("user_id", "tx_id")

# --- Price of each successful renewal --------------------------------------
# Reuses price_info_df from the initial-purchase cell; the left join keeps
# renewals that have no price_info row (their price is null).
same_user_and_tx = (
    (F.col("s.user_id") == F.col("p.user_id"))
    & (F.col("s.tx_id") == F.col("p.tx_id"))
)
success_with_price = (
    success_df.alias("s")
    .join(price_info_df.alias("p"), on=same_user_and_tx, how="left")
    .select("s.user_id", "s.tx_id", "p.price")
)

# --- Per-user counts --------------------------------------------------------
agg_df = renewals_df.groupBy("user_id").agg(
    F.sum(F.when(renewal_succeeded, 1).otherwise(0)).alias("SuccessfulRenewalCount"),
    F.sum(F.when(renewal_failed, 1).otherwise(0)).alias("FailedRenewalCount"),
)

# --- Per-user amounts (successful renewals only) ----------------------------
amounts_df = success_with_price.groupBy("user_id").agg(
    F.sum("price").alias("TotalSuccessfulRenewalAmount"),
    F.avg("price").alias("AverageSuccessfulRenewalAmount"),
)

# --- Counts + amounts --------------------------------------------------------
# Users with renewals but no successful priced one get 0 for both amounts.
missing_amount_defaults = {
    "TotalSuccessfulRenewalAmount": 0,
    "AverageSuccessfulRenewalAmount": 0,
}
renewal_summary_df = agg_df.join(amounts_df, on="user_id", how="left").fillna(
    missing_amount_defaults
)

In [96]:
# Inspect the per-user renewal summary: first rows, then column types.
renewal_summary_df.show()
renewal_summary_df.printSchema()

+--------------------+----------------------+------------------+----------------------------+------------------------------+
|             user_id|SuccessfulRenewalCount|FailedRenewalCount|TotalSuccessfulRenewalAmount|AverageSuccessfulRenewalAmount|
+--------------------+----------------------+------------------+----------------------------+------------------------------+
|8d112e06-9b63-438...|                     9|                 2|                       90.00|                     10.000000|
|bf8f6f74-3952-4d1...|                    39|                18|                       77.61|                      1.990000|
|7f2ae295-14dc-491...|                     7|                 5|                       13.93|                      1.990000|
|2fdd60f4-3b89-4c8...|                     5|                 5|                        9.95|                      1.990000|
|bc8d6fbc-fcf9-430...|                    43|                28|                       85.57|                      1.990000|


In [99]:
# Per-user refund count and refund amount.
# The refund amount is derived as refunded_pct percent of the user's initial
# purchase amount.

# 1) Refund events (event_type == 'refund')
refund_events = (
    flat_df.filter(F.col("event_type") == "refund")
           .select(
               F.col("user_id"),
               F.col("event_ts").alias("refund_ts"),
               F.col("refunded_pct").cast("decimal(7,4)").alias("refunded_pct")
           )
)

# 2) Use initial_with_price_df as-is (no dedupe).
#    This must be initial_with_price_df, the frame built in the initial-purchase
#    cell; no variable named `initial_with_price` is defined in this notebook,
#    so that name raises NameError on a fresh kernel.
initial_base = (
    initial_with_price_df
      .select(
          F.col("UserID").alias("user_id"),
          F.col("InitialPurchaseAmount").cast("decimal(20,2)").alias("initial_amount")
      )
)

# 3) Join refunds to initial amount and compute refund $
#    Left join: a refund without an initial purchase row yields a null amount.
#    bround rounds half-to-even at 2 decimal places.
refunds_priced = (
    refund_events.alias("r")
      .join(initial_base.alias("i"), on="user_id", how="left")
      .withColumn(
          "RefundAmount",
          F.bround(F.col("i.initial_amount") * (F.col("r.refunded_pct") / F.lit(100)), 2)
            .cast("decimal(20,2)")
      )
)

# 4) Aggregate per user
refunds_df = (
    refunds_priced.groupBy("user_id")
      .agg(
          F.count("*").alias("RefundCount"),
          F.sum("RefundAmount").cast("decimal(20,2)").alias("TotalRefundAmount")
      )
)


In [100]:
# Inspect the per-user refund summary: untruncated rows, then column types.
refunds_df.show(truncate=False)
refunds_df.printSchema()

+------------------------------------+-----------+-----------------+
|user_id                             |RefundCount|TotalRefundAmount|
+------------------------------------+-----------+-----------------+
|3b968431-66cf-468c-a7b6-b2bbb326e031|1          |1.00             |
|9a57791c-5aae-4893-8c13-26fbf81ef500|1          |0.28             |
|de3378c3-44ae-4f5d-8cc5-2e66b2805675|1          |0.38             |
|89f22e8c-9677-419c-becf-65aeb9e9b5af|1          |1.07             |
|cbc2d108-40c4-4428-92b6-40778ba496ea|1          |1.65             |
|0da9e5f6-abde-40e8-8c5e-95b248a21314|1          |4.50             |
|7e019bdc-b209-414c-af21-66fb28c5b1ce|1          |1.53             |
|917a0347-86b2-43a0-bd65-351452b54b16|1          |1.25             |
|d295e98c-ff16-4e92-b896-904f647c63ed|1          |0.20             |
|d575768a-3d9b-4f12-b21b-1f69be829064|1          |0.84             |
|5c9df208-1898-46bc-aadb-233f01520f98|1          |1.37             |
|969a33d6-3593-43dc-8490-3ae6e21e5

In [106]:
# Bring both per-user summaries onto the "UserID" key used by the
# initial-purchase frame. The average renewal amount is not carried forward.
ren_df = (
    renewal_summary_df
    .withColumnRenamed("user_id", "UserID")
    .select(
        "UserID",
        "SuccessfulRenewalCount",
        "FailedRenewalCount",
        "TotalSuccessfulRenewalAmount",
    )
)

ref_df = (
    refunds_df
    .withColumnRenamed("user_id", "UserID")
    .select("UserID", "RefundCount", "TotalRefundAmount")
)

# Left joins keep every initial purchaser; users with no renewals or no
# refunds get 0 for the corresponding counts and amounts.
zero_defaults = dict.fromkeys(
    [
        "SuccessfulRenewalCount",
        "FailedRenewalCount",
        "TotalSuccessfulRenewalAmount",
        "RefundCount",
        "TotalRefundAmount",
    ],
    0,
)
merged_df = (
    initial_with_price_df
    .join(ren_df, on="UserID", how="left")
    .join(ref_df, on="UserID", how="left")
    .fillna(zero_defaults)
)

# Final column set and order for the per-user table.
final_columns = [
    "UserID",
    "Plan",
    "Country",
    "Platform",
    "InitialTxnID",
    "InitialPurchaseDate",   # date of the initial purchase
    "InitialPurchaseMonth",  # month (yyyy-MM) of the initial purchase
    "InitialPurchaseAmount",
    "SuccessfulRenewalCount",
    "FailedRenewalCount",
    "TotalSuccessfulRenewalAmount",
    "RefundCount",
    "TotalRefundAmount",
]
final_df = merged_df.select(*final_columns)



In [107]:
# TotalLTVAmount = initial purchase + successful renewals - refunds.
# Each term is cast to decimal(20,2) and a null term counts as zero.
money_type = "decimal(20,2)"
zero = F.lit(0).cast(money_type)

initial_amount = F.coalesce(F.col("InitialPurchaseAmount").cast(money_type), zero)
renewal_amount = F.coalesce(F.col("TotalSuccessfulRenewalAmount").cast(money_type), zero)
refund_amount = F.coalesce(F.col("TotalRefundAmount").cast(money_type), zero)
ltv_amount = (initial_amount + renewal_amount - refund_amount).cast(money_type)

# Explicit column order, with the LTV column last.
ltv_column_order = [
    "UserID",
    "Plan",
    "Country",
    "Platform",
    "InitialTxnID",
    "InitialPurchaseDate",
    "InitialPurchaseMonth",
    "InitialPurchaseAmount",
    "SuccessfulRenewalCount",
    "FailedRenewalCount",
    "TotalSuccessfulRenewalAmount",
    "RefundCount",
    "TotalRefundAmount",
    "TotalLTVAmount",
]

final_df = final_df.withColumn("TotalLTVAmount", ltv_amount).select(*ltv_column_order)

In [108]:
# Inspect
final_df.show(truncate=False)
final_df.printSchema()

+------------------------------------+------------+-------+--------+---------------------------------------+-------------------+--------------------+---------------------+----------------------+------------------+----------------------------+-----------+-----------------+--------------+
|UserID                              |Plan        |Country|Platform|InitialTxnID                           |InitialPurchaseDate|InitialPurchaseMonth|InitialPurchaseAmount|SuccessfulRenewalCount|FailedRenewalCount|TotalSuccessfulRenewalAmount|RefundCount|TotalRefundAmount|TotalLTVAmount|
+------------------------------------+------------+-------+--------+---------------------------------------+-------------------+--------------------+---------------------+----------------------+------------------+----------------------------+-----------+-----------------+--------------+
|000667f8-3edc-4e5d-bc53-97a6691c8da1|basic_weekly|FR     |iOS     |tx-42e6c20a-7002-43ab-b018-4cbe68bbfaeb|2024-11-27         |2024-11 

In [110]:
## Average LTV amount across all users
from pyspark.sql import functions as F

# Per-user LTV with nulls treated as zero, kept next to the user key.
ltv_or_zero = F.coalesce(
    F.col("TotalLTVAmount").cast("decimal(20,2)"),
    F.lit(0).cast("decimal(20,2)"),
)
per_user_ltv = final_df.select(ltv_or_zero.alias("ltv"), F.col("UserID"))

# Sum of LTV divided by the number of non-null UserID values.
mean_ltv = (F.sum("ltv") / F.count("UserID")).cast("decimal(20,2)")
avg_ltv_df = per_user_ltv.agg(mean_ltv.alias("AverageLTVAmount"))

avg_ltv_df.show()

+----------------+
|AverageLTVAmount|
+----------------+
|           47.32|
+----------------+



In [114]:
# Average LTV per user for each initial-purchase month (cohort).
zero = F.lit(0).cast("decimal(20,2)")

# Per cohort: distinct users and total LTV (null LTV counted as zero).
cohort_totals = final_df.groupBy("InitialPurchaseMonth").agg(
    F.countDistinct("UserID").alias("UserCount"),
    F.sum(F.coalesce(F.col("TotalLTVAmount").cast("decimal(20,2)"), zero)).alias("TotalLTV"),
)

average_per_user = (F.col("TotalLTV") / F.col("UserCount")).cast("decimal(20,2)")
monthly_avg_ltv_df = (
    cohort_totals
    .select("InitialPurchaseMonth", average_per_user.alias("AverageLTVPerUser"))
    .orderBy("InitialPurchaseMonth")
)

monthly_avg_ltv_df.show(truncate=False)

+--------------------+-----------------+
|InitialPurchaseMonth|AverageLTVPerUser|
+--------------------+-----------------+
|2024-06             |71.31            |
|2024-07             |68.16            |
|2024-08             |67.97            |
|2024-09             |64.26            |
|2024-10             |61.00            |
|2024-11             |57.02            |
|2024-12             |54.51            |
|2025-01             |48.33            |
|2025-02             |41.95            |
|2025-03             |34.92            |
|2025-04             |27.39            |
|2025-05             |18.76            |
|2025-06             |10.77            |
+--------------------+-----------------+



In [115]:
### VALIDATION: recompute revenue totals directly from the flattened events

# 1) Price records, reduced to the earliest one per (user_id, tx_id)
price_info = flat_df.where(F.col("event_type") == "price_info").select(
    "user_id",
    "tx_id",
    "event_ts",
    F.col("price").cast("decimal(20,2)").alias("price"),
)
w = W.partitionBy("user_id", "tx_id").orderBy(F.col("event_ts").asc())
ranked_prices = price_info.withColumn("rn", F.row_number().over(w))
price_info_dedup = ranked_prices.where(F.col("rn") == 1).drop("rn", "event_ts")

# Zero of the money type, used when a total has nothing to sum.
zero_money = F.lit(0).cast("decimal(20,2)")

# 2) Total of all initial purchases
initials = flat_df.where(F.col("event_type") == "initial_purchase").select(
    "user_id", "tx_id"
)
initial_join = initials.join(price_info_dedup, ["user_id", "tx_id"], "left")
initial_sum = initial_join.agg(
    F.coalesce(F.sum("price"), zero_money).alias("InitialPurchaseTotal")
)

# 3) Total of all successful renewals
is_successful_renewal = (F.col("event_type") == "renewal") & (F.col("success") == True)
success = flat_df.where(is_successful_renewal).select("user_id", "tx_id")
success_join = success.join(price_info_dedup, ["user_id", "tx_id"], "left")
renewal_sum = success_join.agg(
    F.coalesce(F.sum("price"), zero_money).alias("SuccessfulRenewalTotal")
)

# 4) Both single-row totals side by side, plus their sum
combined_total = (
    F.col("InitialPurchaseTotal") + F.col("SuccessfulRenewalTotal")
).cast("decimal(20,2)")
totals_df = initial_sum.crossJoin(renewal_sum).withColumn("CombinedTotal", combined_total)

totals_df.show(truncate=False)

+--------------------+----------------------+-------------+
|InitialPurchaseTotal|SuccessfulRenewalTotal|CombinedTotal|
+--------------------+----------------------+-------------+
|109168.18           |1081050.39            |1190218.57   |
+--------------------+----------------------+-------------+



In [116]:
# Grand total of refunded money across all users (0 when there are no refunds).
refund_total = F.sum(F.col("TotalRefundAmount").cast("decimal(20,2)"))
total_refunds_df = refunds_df.agg(
    F.coalesce(refund_total, F.lit(0).cast("decimal(20,2)")).alias("TotalRefundsAmount")
)

total_refunds_df.show()

+------------------+
|TotalRefundsAmount|
+------------------+
|           7191.65|
+------------------+



In [117]:
# Number of distinct users present anywhere in the raw events.
distinct_users = F.countDistinct("user_id").alias("TotalUsers")
total_users_df = raw_df.select(distinct_users)
total_users_df.show()

+----------+
|TotalUsers|
+----------+
|     25000|
+----------+



In [118]:
# Cross-check of the average LTV: (combined revenue - refunds) / users.
# The inputs are read from the validation frames instead of being re-typed
# from earlier cell outputs, so the check stays correct when the data changes.
combined_total = totals_df.first()["CombinedTotal"]
total_refunds = total_refunds_df.first()["TotalRefundsAmount"]
total_users = total_users_df.first()["TotalUsers"]

# Decimal subtraction first, then float division so the result displays as a
# plain float, as the hand-typed expression did.
float(combined_total - total_refunds) / total_users

47.32107680000001