In [1]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 3, Finished, Available, Finished)

In [2]:
# Raw bronze-layer inputs for the feature pipeline (reads are lazy until an action runs).
orders, customers, order_items, reviews = (
    spark.read.table(table_name)
    for table_name in (
        "bronze_orders",
        "bronze_customers",
        "bronze_order_items",
        "bronze_order_reviews",
    )
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 4, Finished, Available, Finished)

In [3]:
# Inspect the raw orders schema (column names, types, nullability) before building features.
orders.printSchema()

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 5, Finished, Available, Finished)

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [4]:
from pyspark.sql import functions as F

# Latest purchase timestamp across all orders, collected to the driver as a Python datetime.
# The reference (cut-off) date below is derived from it.
max_date = (
    orders
    .agg(F.max("order_purchase_timestamp").alias("max_date"))
    .first()["max_date"]
)

max_date

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 6, Finished, Available, Finished)

datetime.datetime(2018, 10, 17, 17, 30, 18)

In [5]:
from datetime import timedelta

# Cut-off point 90 days before the latest purchase; later cells build features from orders
# placed before this timestamp.
reference_date = max_date + timedelta(days=-90)
reference_date

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 7, Finished, Available, Finished)

datetime.datetime(2018, 7, 19, 17, 30, 18)

In [6]:
from datetime import timedelta

from pyspark.sql import functions as F

# Spark literal for the cut-off timestamp, derived from `max_date` rather than a hand-typed string.
# The previous version hardcoded "2018-07-19 17:30:18", which silently goes stale if the
# source data changes. This cell derives it from `max_date` and not from `reference_date`,
# because `reference_date` is rebound to a Column below. That keeps the cell safe to re-run.
REFERENCE_LOOKBACK_DAYS = 90  # must match the offset used in the previous cell
reference_date = F.lit(max_date - timedelta(days=REFERENCE_LOOKBACK_DAYS)).cast("timestamp")

# Alias kept for downstream cells that reference `reference_date_col`.
reference_date_col = reference_date


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 8, Finished, Available, Finished)

In [7]:
# Observation window: keep only orders placed strictly before the cut-off.
orders_pre = orders.where(F.col("order_purchase_timestamp") < reference_date)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 9, Finished, Available, Finished)

In [8]:
# Left-join customers to obtain customer_unique_id, then keep only the columns used downstream.
# NOTE: this cell reassigns orders_pre from itself and drops customer_id in the select. Re-running
# it on its own therefore fails; re-run the previous cell first.
kept_order_columns = [
    "order_id",
    "customer_unique_id",
    "order_purchase_timestamp",
    "order_delivered_customer_date",
    "order_estimated_delivery_date",
]
orders_pre = orders_pre.join(customers, "customer_id", "left").select(*kept_order_columns)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 10, Finished, Available, Finished)

In [9]:
# Number of distinct customers with at least one order before the cut-off.
# (If any order found no matching customer in the left join, its null id counts as one value.)
orders_pre.select("customer_unique_id").distinct().count()

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 11, Finished, Available, Finished)

86924

In [10]:
# Per-customer first/last purchase timestamps and number of distinct orders before the cut-off.
purchase_ts = F.col("order_purchase_timestamp")
customer_order_dates = orders_pre.groupBy("customer_unique_id").agg(
    F.max(purchase_ts).alias("last_order_date"),
    F.min(purchase_ts).alias("first_order_date"),
    F.countDistinct("order_id").alias("total_order_count"),
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 12, Finished, Available, Finished)

In [11]:
# Whole-day distances from the cut-off to the last order (recency) and to the first order (tenure).
days_since_last = F.datediff(reference_date_col, F.col("last_order_date"))
tenure_days = F.datediff(reference_date_col, F.col("first_order_date"))

customer_order_dates = (
    customer_order_dates
    .withColumn("days_since_last_order", days_since_last)
    .withColumn("customer_tenure_days", tenure_days)
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 13, Finished, Available, Finished)

In [12]:
# Orders placed in the 90 days leading up to the cut-off, counted per customer.
# date_sub returns a DATE, so the window starts at midnight 90 days before the cut-off.
window_start_90d = F.date_sub(reference_date, 90)
orders_last_90d = orders_pre.where(F.col("order_purchase_timestamp") >= window_start_90d)

order_count_last_90d = orders_last_90d.groupBy("customer_unique_id").agg(
    F.countDistinct("order_id").alias("order_count_last_90d")
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 14, Finished, Available, Finished)

In [13]:
# Each customer's orders in purchase order; lag() fetches the previous purchase timestamp
# (null for the customer's first order), and datediff gives the gap in days.
window_spec = Window.partitionBy("customer_unique_id").orderBy("order_purchase_timestamp")
previous_purchase = F.lag("order_purchase_timestamp").over(window_spec)

orders_with_lag = (
    orders_pre
    .withColumn("prev_order_date", previous_purchase)
    .withColumn(
        "days_between_orders",
        F.datediff("order_purchase_timestamp", "prev_order_date"),
    )
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 15, Finished, Available, Finished)

In [14]:
# Mean gap between consecutive orders. avg ignores nulls, so single-order customers (whose only
# gap is null) get null here.
order_interval_mean = orders_with_lag.groupBy("customer_unique_id").agg(
    F.mean("days_between_orders").alias("order_interval_mean")
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 16, Finished, Available, Finished)

In [15]:
# Combine all time-based features; left joins keep every customer from customer_order_dates.
features_time = customer_order_dates
for time_feature_frame in (order_count_last_90d, order_interval_mean):
    features_time = features_time.join(time_feature_frame, on="customer_unique_id", how="left")


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 17, Finished, Available, Finished)

In [16]:
# Customers with no order in the last 90 days get a count of 0. Customers with fewer than two
# orders (null mean interval) get the sentinel 999, meaning "no repeat order yet".
features_time = features_time.na.fill({
    "order_interval_mean": 999,
    "order_count_last_90d": 0,
})


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 18, Finished, Available, Finished)

In [17]:
# Items belonging to pre-cut-off orders, tagged with customer and purchase time.
# Item revenue = price + freight (a null in either operand yields a null revenue).
pre_order_keys = orders_pre.select("order_id", "customer_unique_id", "order_purchase_timestamp")
order_items_pre = order_items.join(pre_order_keys, "order_id", "inner").withColumn(
    "item_revenue", F.col("price") + F.col("freight_value")
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 19, Finished, Available, Finished)

In [18]:
# Roll items up to one row per order: total revenue and number of item rows.
order_grain = ["customer_unique_id", "order_id", "order_purchase_timestamp"]
order_level_metrics = order_items_pre.groupBy(*order_grain).agg(
    F.sum("item_revenue").alias("order_revenue"),
    F.count("*").alias("items_in_order"),
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 20, Finished, Available, Finished)

In [19]:
# Per-customer spend statistics over pre-cut-off orders. stddev is the sample standard
# deviation, so it is null for customers with a single order (filled later).
order_revenue = F.col("order_revenue")
monetary_features = order_level_metrics.groupBy("customer_unique_id").agg(
    F.sum(order_revenue).alias("total_spent"),
    F.mean(order_revenue).alias("avg_order_value"),
    F.stddev(order_revenue).alias("spend_std"),
    F.mean("items_in_order").alias("avg_items_per_order"),
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 21, Finished, Available, Finished)

In [20]:
# Order-level revenue restricted to the 90 days before the cut-off (window starts at midnight).
order_last_90d = order_level_metrics.where(
    F.col("order_purchase_timestamp") >= F.date_sub(reference_date, 90)
)


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 22, Finished, Available, Finished)

In [21]:
# Recent spend per customer (only customers with orders in the last 90 days appear here).
recent_revenue_sum = F.sum("order_revenue").alias("total_spent_last_90d")
total_spent_last_90d = order_last_90d.groupBy("customer_unique_id").agg(recent_revenue_sum)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 23, Finished, Available, Finished)

In [22]:
# Lifetime spend statistics plus recent spend; left join keeps customers without recent orders.
features_monetary = monetary_features.join(total_spent_last_90d, "customer_unique_id", "left")

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 24, Finished, Available, Finished)

In [23]:
# No recent orders -> 0 recent spend; a single order (null sample stddev) -> 0 spread.
features_monetary = features_monetary.na.fill({
    "spend_std": 0,
    "total_spent_last_90d": 0,
})

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 25, Finished, Available, Finished)

In [24]:
# Time features for every pre-cut-off customer, enriched with monetary features where available.
features_step3 = features_time.join(features_monetary, "customer_unique_id", "left")

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 26, Finished, Available, Finished)

In [25]:
# Check the combined schema and preview the spend columns. Null rows presumably belong to
# customers whose orders matched no order_items rows (inner join upstream).
features_step3.printSchema()
spend_columns = ["total_spent", "avg_order_value", "spend_std", "avg_items_per_order"]
features_step3.select(*spend_columns).show(n=5)


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 27, Finished, Available, Finished)

root
 |-- customer_unique_id: string (nullable = true)
 |-- last_order_date: timestamp (nullable = true)
 |-- first_order_date: timestamp (nullable = true)
 |-- total_order_count: long (nullable = false)
 |-- days_since_last_order: integer (nullable = true)
 |-- customer_tenure_days: integer (nullable = true)
 |-- order_count_last_90d: long (nullable = false)
 |-- order_interval_mean: double (nullable = false)
 |-- total_spent: double (nullable = true)
 |-- avg_order_value: double (nullable = true)
 |-- spend_std: double (nullable = true)
 |-- avg_items_per_order: double (nullable = true)
 |-- total_spent_last_90d: double (nullable = true)

+-----------------+-----------------+---------+-------------------+
|      total_spent|  avg_order_value|spend_std|avg_items_per_order|
+-----------------+-----------------+---------+-------------------+
|             NULL|             NULL|     NULL|               NULL|
|            94.77|            94.77|      0.0|                1.0|
|          

In [26]:
# Customers absent from features_monetary (no matching order items) get zero spend features.
# spend_std was already filled in features_monetary; it is filled again here for rows the
# left join left null.
zero_fill_columns = ["total_spent", "avg_order_value", "avg_items_per_order", "spend_std"]
features_step3 = features_step3.na.fill({column: 0 for column in zero_fill_columns})

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 28, Finished, Available, Finished)

In [27]:
# Reviews created up to (and including) the cut-off.
# NOTE(review): orders use a strict `<` cut-off while reviews use `<=`; confirm the boundary is
# intended. This also assumes review_creation_date is a timestamp/date column.
reviews_pre = reviews.where(F.col("review_creation_date") <= reference_date)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 29, Finished, Available, Finished)

In [28]:
# Per-order complaint flag: 1 if any of the order's reviews scored 1 or 2, else 0.
is_low_score = F.col("review_score").isin([1, 2])
complaints = (
    reviews_pre
    .withColumn("is_complaint", F.when(is_low_score, 1).otherwise(0))
    .groupBy("order_id")
    .agg(F.max("is_complaint").alias("has_complaint_order"))
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 30, Finished, Available, Finished)

In [29]:
# Lift the per-order complaint flag to the customer: 1 if any pre-cut-off order had a complaint.
order_to_customer = orders_pre.select("order_id", "customer_unique_id")
complaints_customer = (
    complaints.join(order_to_customer, "order_id", "inner")
    .groupBy("customer_unique_id")
    .agg(F.max("has_complaint_order").alias("has_complaint"))
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 31, Finished, Available, Finished)

In [30]:
# Per-customer flag: 1 if any pre-cut-off order was delivered later than its estimated date.
# Only deliveries that happened on or before the cut-off are counted. An order bought just
# before reference_date may be delivered after it. That delivery date is not known at
# prediction time, and using it would leak future information into the features.
# Orders with a null delivery date fall through to 0.
delivered_late_by_cutoff = (
    (F.col("order_delivered_customer_date") <= reference_date)
    & (F.col("order_delivered_customer_date") > F.col("order_estimated_delivery_date"))
)

delivery_delay = (
    orders_pre
    .withColumn("is_delivery_delay", F.when(delivered_late_by_cutoff, 1).otherwise(0))
    .groupBy("customer_unique_id")
    .agg(F.max("is_delivery_delay").alias("has_delivery_delay"))
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 32, Finished, Available, Finished)

In [31]:
# Attach the customer-level complaint and delivery-delay flags to the numeric features.
final_features = features_step3
for customer_flags in (complaints_customer, delivery_delay):
    final_features = final_features.join(customer_flags, on="customer_unique_id", how="left")

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 33, Finished, Available, Finished)

In [32]:
# Customers absent from a flag table have no recorded complaint/delay -> 0.
final_features = final_features.na.fill({
    "has_delivery_delay": 0,
    "has_complaint": 0,
})


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 34, Finished, Available, Finished)

Churn Label and Final Model Dataset

In [33]:
# Rule-based churn label: repeat buyers (>= 2 orders) whose last order is more than 120 days
# before the cut-off.
# NOTE(review): the label is a deterministic function of two columns that remain in model_df
# (days_since_last_order, total_order_count). If those are used as model inputs, a model can
# reproduce the label exactly (target leakage). The 90 days after reference_date are never
# used by this notebook; consider labelling from orders placed in that window instead.
is_inactive = F.col("days_since_last_order") > 120
is_repeat_buyer = F.col("total_order_count") >= 2

model_df = final_features.withColumn(
    "churn",
    F.when(is_inactive & is_repeat_buyer, 1).otherwise(0),
)


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 35, Finished, Available, Finished)

In [34]:
# Class balance of the churn label (row count per label value).
model_df.groupBy("churn").count().show()


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 36, Finished, Available, Finished)

+-----+-----+
|churn|count|
+-----+-----+
|    1| 1829|
|    0|85095|
+-----+-----+



In [35]:
# Mean and upper percentiles of recency, used to sanity-check the 120-day churn threshold.
recency_summary = [
    F.mean("days_since_last_order"),
    F.expr("percentile(days_since_last_order, 0.75)"),
    F.expr("percentile(days_since_last_order, 0.9)"),
]
model_df.select(*recency_summary).show()


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 37, Finished, Available, Finished)

+--------------------------+------------------------------------------+-----------------------------------------+
|avg(days_since_last_order)|percentile(days_since_last_order, 0.75, 1)|percentile(days_since_last_order, 0.9, 1)|
+--------------------------+------------------------------------------+-----------------------------------------+
|         221.4948805853389|                                     324.0|                                    435.0|
+--------------------------+------------------------------------------+-----------------------------------------+



In [36]:
# Persist the model dataset as a Delta table, replacing any previous contents.
# NOTE(review): this is a derived feature table but is named "*_bronze"; confirm the intended layer.
(
    model_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("churn_model_dataset_bronze")
)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 38, Finished, Available, Finished)

RFM Table (Recency, Frequency, Monetary)

In [38]:
# Confirm the columns available in orders_pre before building the RFM table.
orders_pre.printSchema()

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 40, Finished, Available, Finished)

root
 |-- order_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [40]:
# Confirm monetary_features exposes customer_unique_id and total_spent for the RFM join.
monetary_features.printSchema()

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 42, Finished, Available, Finished)

root
 |-- customer_unique_id: string (nullable = true)
 |-- total_spent: double (nullable = true)
 |-- avg_order_value: double (nullable = true)
 |-- spend_std: double (nullable = true)
 |-- avg_items_per_order: double (nullable = true)



In [41]:
# RFM base: per-customer last order, frequency (distinct orders) and recency (whole days from the
# last order to the cut-off).
rfm = (
    orders_pre
    .groupBy("customer_unique_id")
    .agg(
        F.max("order_purchase_timestamp").alias("last_order_date"),
        F.countDistinct("order_id").alias("frequency"),
    )
    .withColumn("recency", F.datediff(F.lit(reference_date), "last_order_date"))
)


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 43, Finished, Available, Finished)

In [None]:
# Removed a malformed draft of the monetary join. It used `... alias monetary, on:customer_unique_id ...`,
# which is not valid Python and made Restart & Run All fail at this cell.
# The join is done in the next cell, and `total_spent` is renamed to `monetary` afterwards.

In [43]:

from pyspark.sql.functions import col

# Attach lifetime spend as the M of RFM (left join keeps customers with no order items).
# First drop any spend column left over from an earlier run; `drop` ignores names that are
# absent. Re-running this cell after the rename cell then no longer produces duplicate
# `total_spent`/`monetary` columns.
rfm = (
    rfm
    .drop("total_spent", "monetary")
    .join(
        monetary_features.select("customer_unique_id", "total_spent"),
        on="customer_unique_id",
        how="left",
    )
)


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 45, Finished, Available, Finished)

In [46]:
# Inspect the RFM schema.
# NOTE: the output saved below came from execution 46, i.e. after the rename cell (45) that
# appears further down. On a fresh top-to-bottom run this prints `total_spent`, not `monetary`.
rfm.printSchema()

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 48, Finished, Available, Finished)

root
 |-- customer_unique_id: string (nullable = true)
 |-- last_order_date: timestamp (nullable = true)
 |-- frequency: long (nullable = false)
 |-- recency: integer (nullable = true)
 |-- monetary: double (nullable = true)



In [45]:
# Use RFM naming: lifetime spend becomes `monetary` (no-op if `total_spent` is absent).
rfm = rfm.withColumnRenamed("total_spent", "monetary")

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 47, Finished, Available, Finished)

In [47]:
def _tiered_score(conditions):
    """Build a 5..1 score column from four ordered conditions.

    The first matching condition scores 5, the second 4, the third 3 and the fourth 2. Rows
    matching none of them (including null inputs) score 1.
    """
    score_expr = None
    for score, condition in zip((5, 4, 3, 2), conditions):
        score_expr = F.when(condition, score) if score_expr is None else score_expr.when(condition, score)
    return score_expr.otherwise(1)


# Recency: fewer days since the last order scores higher.
# Frequency and monetary: larger values score higher.
rfm = (
    rfm
    .withColumn("r_score", _tiered_score([F.col("recency") <= days for days in (90, 180, 270, 360)]))
    .withColumn("f_score", _tiered_score([F.col("frequency") >= n for n in (10, 7, 4, 2)]))
    .withColumn("m_score", _tiered_score([F.col("monetary") >= v for v in (1000, 500, 250, 100)]))
)


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 49, Finished, Available, Finished)

In [51]:
# Segment rules are evaluated top to bottom and the FIRST match wins, so list order decides
# overlapping rules. For example, r_score == 2 with f_score in 2..3 lands in "Need Attention"
# before the "At Risk" / "Hibernating" rules are checked.
r_col, f_col, m_col = F.col("r_score"), F.col("f_score"), F.col("m_score")
segment_rules = [
    ((r_col >= 4) & (f_col >= 4) & (m_col >= 4), "Champions"),
    ((r_col >= 3) & (f_col >= 4), "Loyal Customers"),
    ((r_col >= 4) & f_col.between(2, 3), "Potential Loyalists"),
    ((r_col == 5) & (f_col == 1), "New Customers"),
    (r_col.between(2, 3) & f_col.between(2, 3), "Need Attention"),
    ((r_col <= 2) & (f_col >= 3), "At Risk"),
    ((r_col <= 2) & (f_col <= 2), "Hibernating"),
]

segment_expr = None
for rule_condition, segment_label in segment_rules:
    if segment_expr is None:
        segment_expr = F.when(rule_condition, segment_label)
    else:
        segment_expr = segment_expr.when(rule_condition, segment_label)

rfm = rfm.withColumn("rfm_segment", segment_expr.otherwise("Others"))


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 53, Finished, Available, Finished)

In [52]:
# Render the RFM table in the notebook's interactive DataFrame viewer.
display(rfm)

StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 54, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1c4da6d3-d32b-41ac-a7fb-bb6aee27b1fe)

In [54]:
# Persist the RFM table as a Delta table, replacing any previous contents.
rfm.write.format("delta").mode("overwrite").saveAsTable("rfm_customer")


StatementMeta(, cdcd495d-8aa0-4095-bbf1-e9c372ff4de2, 56, Finished, Available, Finished)