In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.window import Window

In [None]:
# Define a helper function to convert string to date
def to_date(date_str):
    """Parse a ``YYYY-MM-DD`` string into a ``datetime.date``.

    Args:
        date_str: Date text in ``%Y-%m-%d`` format, or a falsy value
            (``None`` / empty string).

    Returns:
        The parsed ``datetime.date``, or ``None`` when ``date_str`` is falsy.

    Raises:
        ValueError: If a non-empty ``date_str`` does not match ``%Y-%m-%d``.
    """
    if not date_str:
        return None
    parsed = datetime.strptime(date_str, "%Y-%m-%d")
    return parsed.date()

# Schema for the customers table: customer_id is the only non-nullable
# column; both subscription dates are nullable DateType columns.
customers_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("customer_name", StringType(), True),
    StructField("subscription_start", DateType(), True),
    StructField("subscription_end", DateType(), True)
])

# Sample rows for the customers table. Date strings are converted with
# to_date so the values are datetime.date objects, matching DateType.
# A subscription_end of None is stored as a null end date.
customers_data = [
    (1, 'Alice', to_date('2024-01-15'), to_date('2024-07-10')),
    (2, 'Bob', to_date('2024-02-01'), None),
    (3, 'Carol', to_date('2024-01-25'), to_date('2024-03-01')),
    (4, 'David', to_date('2024-03-15'), None),
    (5, 'Eva', to_date('2024-04-12'), to_date('2024-06-30')),
    (6, 'Frank', to_date('2024-04-25'), to_date('2024-05-15')),
    (7, 'Gina', to_date('2024-05-05'), None),
    (8, 'Harry', to_date('2024-05-20'), to_date('2024-07-20')),
    (9, 'Ivy', to_date('2024-06-01'), None),
    (10, 'John', to_date('2024-06-15'), to_date('2024-06-30'))
]

# The notebook never defines `spark` itself. getOrCreate() returns the
# session that is already running when the platform provides one, and
# starts a new one otherwise, so this cell no longer raises NameError on a
# fresh kernel.
spark = SparkSession.builder.getOrCreate()

# Convert to DataFrame using the explicit schema (no type inference).
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)

# Write the data into a Delta table; mode("overwrite") replaces any
# existing contents of the `customers` table, so re-running is idempotent.
customers_df.write.format("delta").mode("overwrite").saveAsTable("customers")

## Identify Active Customers at the Start of Each Month

# Count distinct customers per month among rows whose subscription_active
# flag equals 1.
# NOTE(review): `monthly_subscription_df` is not defined in the visible
# cells; this assumes it carries `month`, `customer_id` and
# `subscription_active` columns -- confirm against the cell that builds it.
active_customers = (
    monthly_subscription_df
    .where(F.col("subscription_active") == 1)
    .groupBy("month")
    .agg(F.countDistinct("customer_id").alias("active_customers_count"))
)

active_customers.show()

## Calculate the Churn Rate

# Churned customers: rows of previous_month_customers that have no
# (customer, month) match in current_month_customers, counted as distinct
# customers per previous_month.
# NOTE(review): both input frames are defined outside the visible cells;
# verify how `previous_month` is aligned with `month` before trusting counts.
churned_customers = (
    previous_month_customers
    .join(
        current_month_customers,
        on=(
            (previous_month_customers["prev_customer_id"] == current_month_customers["customer_id"])
            & (previous_month_customers["previous_month"] == current_month_customers["month"])
        ),
        how="left_anti",  # anti join: keep only left-side rows with no match on the right
    )
    .groupBy("previous_month")
    .agg(F.countDistinct("prev_customer_id").alias("churned_customers_count"))
)

# Debug step: inspect the per-month churned counts
churned_customers.show()


In [None]:
# Churn rate per month: churned_customers_count as a percentage of
# active_customers_count, rounded to two decimals.
# NOTE(review): the denominator is the row's own active_customers_count;
# confirm whether the previous month's active count was intended instead.
churn_rate = (
    active_customers_with_lag
    .join(
        churned_customers,
        on=active_customers_with_lag["previous_month"] == churned_customers["previous_month"],
        how="left",
    )
    # Months without a churn row come back null from the left join;
    # fillna(0) turns those (and any other null numeric values) into 0.
    .fillna(0)
    .withColumn(
        "churn_rate_percentage",
        F.round(
            (F.col("churned_customers_count") / F.col("active_customers_count")) * 100,
            2,
        ),
    )
    .select(
        "month",
        "active_customers_count",
        "churned_customers_count",
        "churn_rate_percentage",
    )
)

# Debug step: inspect the final churn rate results
churn_rate.show()
