# My Workflow
### Overall Pipeline

**Source → Read → Inspect → Clean → Integrate → Filter → Feature Engineering → Modeling-Ready Dataset**

---

### Data Sources (Raw Layer)

* Multiple **CSV files** from the **Olist Brazilian E-Commerce dataset**
* Files:

  1. Orders
  2. Customers
  3. Order items
  4. Payments
  5. Reviews

 *Goal:* Work with realistic multi-table transactional data

---

### Data Ingestion (Read Layer)

* Initialized **SparkSession**
* Loaded all CSV files into **PySpark DataFrames**
* Enabled schema inference & headers

 *Goal:* Bring raw data into Spark for scalable processing

---

### Data Understanding & Validation

* Printed **schemas**
* Checked **row/column counts** using a custom `checkShape()` function
* Verified data consistency across tables

 *Goal:* Ensure data quality before transformations

---

### Data Cleaning (Trusted Raw Layer)

* Selected **relevant columns only**
* Removed unnecessary or noisy attributes
* Feature aggregation
* Feature Engineering (Basic)

 *Goal:* Reduce noise and keep business-relevant attributes

---

### Data Integration (Core Transformation)

* Performed **multiple joins**:

  * Orders ↔ Customers
  * Orders ↔ Order Items
  * Orders ↔ Payments
  * Orders ↔ Reviews
* Built a **master transactional DataFrame**

 *Goal:* Create a single source of truth per customer

---

### Time Window Filtering

* Defined a **snapshot date**
* Filtered data of **flast 180 days**

 *Goal:* Simulate real ML conditions

---

### Feature Engineering (RFM-Based)

Created **customer-level features**:

* **Recency** → Days since last purchase
* **Frequency** → Number of orders
* **Monetary** → Total spending
* **Total Items Volume**
* **Customer Location** (city, state, zip)
* **Last Purchase Date**

 *Goal:* Convert transactional data into behavioral signals

---

### Level Engineering (Aggregation)

* Converted **order-level data → customer-level data**
* Used `groupBy(customer_unique_id)`
* Aggregated metrics using `sum`, `count`, `max`

 *Goal:* One row = one customer (ML-ready)

---

### Feature Selection & Final Dataset

* Dropped IDs like `order_id`
* Kept only **predictive features**
* Produced `customer_training_df`

 *Goal:* Clean, compact dataset for churn modeling


In [145]:
from pyspark.sql.functions import count, countDistinct
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder \
.appName("Olist Customer Churn") \
.getOrCreate()

spark.sparkContext.setLogLevel("WARN")

In [146]:
# =========================
# Load CSV Files
# =========================
orders_path = "data/olist_orders_dataset.csv"
customers_path = "data/olist_customers_dataset.csv"
order_items_path = "data/olist_order_items_dataset.csv"
order_payment_path = "data/olist_order_payments_dataset.csv"
order_reviews_path = "data/olist_order_reviews_dataset.csv"

orders_df = spark.read.csv(
    orders_path,
    header= True,
    inferSchema= True,
    multiLine=True,
    escape='"',
    quote='"'
)

customers_df = spark.read.csv(
    customers_path,
    header = True,
    inferSchema = True,
    multiLine=True,
    escape='"',
    quote='"'
)

order_items_df = spark.read.csv(
    order_items_path,
    header= True,
    inferSchema= True,
    multiLine=True,
    escape='"',
    quote='"'
)

order_payment_df = spark.read.csv(
    order_payment_path,
    header= True,
    inferSchema= True,
    multiLine=True,
    escape='"',
    quote='"'
)

order_reviews_df = spark.read.csv(
    order_reviews_path,
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
    quote='"'
)

# =========================
# Check Schema
# =========================

print("Order Schema")
orders_df.printSchema()

print("Customer Schema")
customers_df.printSchema()


Order Schema
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: string (nullable = true)
 |-- order_delivered_carrier_date: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_estimated_delivery_date: string (nullable = true)

Customer Schema
root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



In [147]:
print("Order Item Schema")
order_items_df.printSchema()

print("Order Payment Schema")
order_payment_df.printSchema()

print("Order Reviews Schema")
order_reviews_df.printSchema()

Order Item Schema
root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: string (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

Order Payment Schema
root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: double (nullable = true)

Order Reviews Schema
root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: string (nullable = true)
 |-- review_answer_timestamp: string (nullable = true)



**Usefull Function**

In [148]:
# ==================================
# Check Shape --> Rows and Columns
# ==================================

def  checkShape(df, df_name):
    rows = df.count()
    cols = len(df.columns)
    print(f"The shape of {df_name} is: {rows} rows by {cols} columns")

# ==================================
# Check Unique --> Unique or Not
# ==================================

def check_uniqueness(df, column_name, df_name):
    total_count = df.count()
    unique_count = df.select(column_name).distinct().count()
    print(f"--- Checking Uniqueness for {df_name} ---")
    print(f"Total Rows: {total_count}")
    print(f"Unique {column_name}s: {unique_count}")

    if total_count == unique_count:
        print(f"SUCCESS: {column_name} is a Unique Key in {df_name}.")
    else:
        print(f"WARNING: {column_name} is NOT unique in {df_name}. Duplicates found!")
    print("-" * 40)

# ==================================
# Check Null
# ==================================
def CheckNull(df):
    df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# ==================================
# Check Duplicate
# ==================================

def CheckDuplicate(df, df_name):
    total_rows = df.count()
    unique_orders = df.select("order_id").distinct().count()
    print(f"--- Duplicate Audit for {df_name} ---")
    print(f"Total Rows: {total_rows}")
    print(f"Unique Order IDs: {unique_orders}")

    if total_rows == unique_orders:
        print("SUCCESS: Each order_id is now unique. No duplicates!")
    else:
        print(f"WARNING: You have {total_rows - unique_orders} duplicate rows. Check your groupBy logic.")

In [149]:
checkShape(orders_df, "orders_df")
checkShape(customers_df, "customers_df")
checkShape(order_items_df, "order_items_df")
checkShape(order_payment_df, "order_payment_df")
checkShape(order_reviews_df, "order_reviews_df")

The shape of orders_df is: 99441 rows by 8 columns
The shape of customers_df is: 99441 rows by 5 columns
The shape of order_items_df is: 112650 rows by 7 columns
The shape of order_payment_df is: 103886 rows by 5 columns
The shape of order_reviews_df is: 99224 rows by 7 columns


*Check Uniqueness*

In [150]:
check_uniqueness(customers_df, "customer_id", "Customers")
check_uniqueness(orders_df, "order_id", "Orders")
check_uniqueness(order_items_df, "order_id", "Order Items")
check_uniqueness(order_payment_df, "order_id", "Order Payments")
check_uniqueness(order_reviews_df, "order_id", "Order Reviews")

--- Checking Uniqueness for Customers ---
Total Rows: 99441
Unique customer_ids: 99441
SUCCESS: customer_id is a Unique Key in Customers.
----------------------------------------
--- Checking Uniqueness for Orders ---
Total Rows: 99441
Unique order_ids: 99441
SUCCESS: order_id is a Unique Key in Orders.
----------------------------------------
--- Checking Uniqueness for Order Items ---
Total Rows: 112650
Unique order_ids: 98666
----------------------------------------
--- Checking Uniqueness for Order Payments ---
Total Rows: 103886
Unique order_ids: 99440
----------------------------------------
--- Checking Uniqueness for Order Reviews ---
Total Rows: 99224
Unique order_ids: 98673
----------------------------------------


### Data Cleaning and Reducing Noise

In [151]:
# ==================================
# Order Items Table
# ==================================

#Step 1: Feature Aggregation
items_agg = order_items_df.groupBy('order_id').agg(
    F.sum("price").alias("total_price"),
    F.sum("freight_value").alias("total_freight"),
    F.count("product_id").alias("total_items"),
    F.first("shipping_limit_date").alias("shipping_limit_date")
)

# Step 2: Check for Null Values in each column
print("--- Null Value Audit for items_agg ---")
CheckNull(items_agg)

# Step 3: Check for Duplicate order_ids
CheckDuplicate(items_agg, "items_agg")


print("\nThe Final Order Item Dataset")
items_agg.show(5)

--- Null Value Audit for items_agg ---
+--------+-----------+-------------+-----------+-------------------+
|order_id|total_price|total_freight|total_items|shipping_limit_date|
+--------+-----------+-------------+-----------+-------------------+
|       0|          0|            0|          0|                  0|
+--------+-----------+-------------+-----------+-------------------+

--- Duplicate Audit for items_agg ---
Total Rows: 98666
Unique Order IDs: 98666
SUCCESS: Each order_id is now unique. No duplicates!

The Final Order Item Dataset
+--------------------+-----------+-------------+-----------+-------------------+
|            order_id|total_price|total_freight|total_items|shipping_limit_date|
+--------------------+-----------+-------------+-----------+-------------------+
|00018f77f2f0320c5...|      239.9|        19.93|          1|     5/3/2017 11:05|
|00042b26cf59d7ce6...|      199.9|        18.14|          1|    2/13/2017 13:57|
|00054e8431b9d7675...|       19.9|        11.85

In [152]:
# ==================================
# Order Payment Table
# ==================================

# Step 1: Feature Aggregation
payment_agg = order_payment_df.groupBy('order_id').agg(
    F.sum("payment_value").alias("total_order_payment"),
    F.max("payment_installments").alias("max_installments"),
    F.count("payment_sequential").alias("payment_method_count"),
    F.first("payment_type").alias("primary_payment_type")
)

# Step 2: Check for Null Values in each column
print("--- Null Value Audit for payment_agg ---")
CheckNull(payment_agg)

# Step 3: Check for Duplicate order_ids
CheckDuplicate(payment_agg, "payment_agg")

print("\nThe Final Order Payment Dataset")
payment_agg.show(5)

--- Null Value Audit for payment_agg ---
+--------+-------------------+----------------+--------------------+--------------------+
|order_id|total_order_payment|max_installments|payment_method_count|primary_payment_type|
+--------+-------------------+----------------+--------------------+--------------------+
|       0|                  0|               0|                   0|                   0|
+--------+-------------------+----------------+--------------------+--------------------+

--- Duplicate Audit for payment_agg ---
Total Rows: 99440
Unique Order IDs: 99440
SUCCESS: Each order_id is now unique. No duplicates!

The Final Order Payment Dataset
+--------------------+-------------------+----------------+--------------------+--------------------+
|            order_id|total_order_payment|max_installments|payment_method_count|primary_payment_type|
+--------------------+-------------------+----------------+--------------------+--------------------+
|00018f77f2f0320c5...|            

In [153]:
# ==================================
# Order Reviews Table
# ==================================

# Steps 1: Aggregating Reviews
reviews_agg = order_reviews_df.groupBy("order_id").agg(
    F.avg("review_score").alias("Avg_review_score"),
    F.min("review_score").alias("Min_review_score"),
    F.count("review_id").alias("Total_Review_Number")
)

# Step 2: Check for Null Values in each column
print("--- Null Value Audit for reviews_agg ---")
CheckNull(reviews_agg)

# Step 3: Check for Duplicate order_ids
CheckDuplicate(reviews_agg, "reviews_agg")

print("\nThe Final Order Payment Dataset")
reviews_agg.show(5)


--- Null Value Audit for reviews_agg ---
+--------+----------------+----------------+-------------------+
|order_id|Avg_review_score|Min_review_score|Total_Review_Number|
+--------+----------------+----------------+-------------------+
|       0|               0|               0|                  0|
+--------+----------------+----------------+-------------------+

--- Duplicate Audit for reviews_agg ---
Total Rows: 98673
Unique Order IDs: 98673
SUCCESS: Each order_id is now unique. No duplicates!

The Final Order Payment Dataset
+--------------------+----------------+----------------+-------------------+
|            order_id|Avg_review_score|Min_review_score|Total_Review_Number|
+--------------------+----------------+----------------+-------------------+
|e239d280236cdd3c4...|             5.0|               5|                  1|
|d451da9b109e1786f...|             5.0|               5|                  1|
|0de5dbc9f32267616...|             4.0|               4|                  1|
|25

In [154]:
checkShape(orders_df, "orders_df")
checkShape(customers_df, "customers_df")
checkShape(items_agg, "order_items_df")
checkShape(payment_agg, "payment_agg")
checkShape(reviews_agg, "reviews_agg")

The shape of orders_df is: 99441 rows by 8 columns
The shape of customers_df is: 99441 rows by 5 columns
The shape of order_items_df is: 98666 rows by 5 columns
The shape of payment_agg is: 99440 rows by 5 columns
The shape of reviews_agg is: 98673 rows by 4 columns


**Three Tables Schema Reviews**

In [155]:
print("Final Dataset of Order Item")
items_agg.printSchema()

print("Final Dataset of Order Payment")
payment_agg.printSchema()

print("Final Dataset of Order Review")
reviews_agg.printSchema()

Final Dataset of Order Item
root
 |-- order_id: string (nullable = true)
 |-- total_price: double (nullable = true)
 |-- total_freight: double (nullable = true)
 |-- total_items: long (nullable = false)
 |-- shipping_limit_date: string (nullable = true)

Final Dataset of Order Payment
root
 |-- order_id: string (nullable = true)
 |-- total_order_payment: double (nullable = true)
 |-- max_installments: integer (nullable = true)
 |-- payment_method_count: long (nullable = false)
 |-- primary_payment_type: string (nullable = true)

Final Dataset of Order Review
root
 |-- order_id: string (nullable = true)
 |-- Avg_review_score: double (nullable = true)
 |-- Min_review_score: integer (nullable = true)
 |-- Total_Review_Number: long (nullable = false)



### Data Integration

In [156]:
# ============================================================================
# Joining Tables --> Inner → left → left → lest → master dataframe
# ============================================================================
# Step 1: Join Five Tables
master_df = customers_df.join(orders_df, "customer_id", "inner")
master_df = master_df.join(items_agg, "order_id", "left")
master_df = master_df.join(payment_agg, "order_id", "left")
master_df = master_df.join(reviews_agg, "order_id", "left")

# Step 2: Check Rows and Columns
checkShape(master_df, "master_df")

# Step 3: DataFrame Schema
print("Final Dataset Schema")
master_df.printSchema()

The shape of master_df is: 99441 rows by 23 columns
Final Dataset Schema
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: string (nullable = true)
 |-- order_delivered_carrier_date: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_estimated_delivery_date: string (nullable = true)
 |-- total_price: double (nullable = true)
 |-- total_freight: double (nullable = true)
 |-- total_items: long (nullable = true)
 |-- shipping_limit_date: string (nullable = true)
 |-- total_order_payment: double (nullable = true)
 |-- max_installments: integer (nullable = true)
 |-- payment_method_count: lon

In [157]:
# Step 4: Final DataFrame View
print("Final Master Dataset Schema")
master_df.show(5)

Final Master Dataset Schema
+--------------------+--------------------+--------------------+------------------------+--------------------+--------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+-----------+-------------+-----------+-------------------+-------------------+----------------+--------------------+--------------------+----------------+----------------+-------------------+
|            order_id|         customer_id|  customer_unique_id|customer_zip_code_prefix|       customer_city|customer_state|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|total_price|total_freight|total_items|shipping_limit_date|total_order_payment|max_installments|payment_method_count|primary_payment_type|Avg_review_score|Min_review_score|Total_Review_Number|
+--------------------+--------------------+-----

In [158]:
# Step 5: Check Null Values
print("Null Value Audit for master_df")
CheckNull(master_df)

# Step 5: Check Duplicate
print("Check Duplicate")
CheckDuplicate(master_df, "master_df")

Null Value Audit for master_df
+--------+-----------+------------------+------------------------+-------------+--------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+-----------+-------------+-----------+-------------------+-------------------+----------------+--------------------+--------------------+----------------+----------------+-------------------+
|order_id|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|total_price|total_freight|total_items|shipping_limit_date|total_order_payment|max_installments|payment_method_count|primary_payment_type|Avg_review_score|Min_review_score|Total_Review_Number|
+--------+-----------+------------------+------------------------+-------------+--------------+----------

### Data Preprocessing

In [159]:
# ==========================
# Timestamp Casting
# ==========================
date_format = "M/d/yyyy H:mm"
date_cols = [
    "order_purchase_timestamp", "order_approved_at", 
    "order_delivered_carrier_date", "order_delivered_customer_date", 
    "order_estimated_delivery_date", "shipping_limit_date"
]

master_df_date = master_df
for col_name in date_cols:
    master_df_date = master_df_date.withColumn(col_name, F.to_timestamp(col_name, date_format))

print("After Casting to Tilestamp the Final Dataset")
master_df_date.printSchema()


After Casting to Tilestamp the Final Dataset
root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: integer (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)
 |-- total_price: double (nullable = true)
 |-- total_freight: double (nullable = true)
 |-- total_items: long (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- total_order_payment: double (nullable = true)
 |-- max_installments: integer (nullable = true)
 |-- payment_method_count: long (nullabl

### Handling Missing Values

**Why did I drop numm value of *total_items* and *total_order_payment* features?**
**Answer:**
A null value in *total_items* or *total_order_payment* indicates that the customer did not purchase anything. In my opinion, imputing values for these nulls would mislead the model by providing false information.

---

**Why did I choose to impute values for *Avg_review_score*, *Min_review_score*, and *Total_Review_Number*?**
**Answer:**
If a customer does not provide any review, it means they have no expressed opinion about their purchase. Assigning a value of 3 represents a neutral sentiment—neither positive nor negative.

---

**What about *order_delivered_customer_date*, *order_approved_at*, *order_estimated_delivery_date* and  *order_delivered_carrier_date*?**
**Answer:**
If the dates are missing, it make sence that the order was either not delivered or not approved. Therefore, we convert these missing values into meaningful indicator features and delete them;



In [160]:
# ==========================
# Timestamp Casting
# ==========================
# Step 1: DROP the logical impossibilities (Ghost Records)
master_df_drop = master_df_date.dropna(subset=["total_items", "total_order_payment"])

master_df_fillna = master_df_drop.fillna({
    "Avg_review_score": 3.0,
    "Min_review_score": 3.0,
    "Total_Review_Number": 0
})

# Step 2: Data Imputation
master_df_update = master_df_fillna.withColumn(
    "is_delivered",
    F.when(F.col("order_delivered_customer_date").isNotNull(), 1).otherwise(0)
).withColumn(
    "is_approved",
    F.when(F.col("order_approved_at").isNotNull(), 1).otherwise(0)
).withColumn(
    "delivery_delay_days",
    F.datediff("order_delivered_customer_date", "order_estimated_delivery_date")     # Calculate the raw difference
).withColumn(
    "delivery_delay_days",                                                           # Apply the clipping logic
    F.when(F.col("delivery_delay_days") > 30, 30)                                    # Cap late/nulls at 30
     .when(F.col("delivery_delay_days") < -30, -30)                                  # Cap early at -30
     .otherwise(F.col("delivery_delay_days"))
).fillna(
    {"delivery_delay_days": 31}                                                      # Assign a specific penalty for nulls (Non-delivered)
).drop("order_approved_at", "order_delivered_carrier_date", "order_estimated_delivery_date", "order_delivered_customer_date")

# Step 3: Check Null value
print("--- Null Value Audit for master_df ---")
CheckNull(master_df_update)

--- Null Value Audit for master_df ---
+--------+-----------+------------------+------------------------+-------------+--------------+------------+------------------------+-----------+-------------+-----------+-------------------+-------------------+----------------+--------------------+--------------------+----------------+----------------+-------------------+------------+-----------+-------------------+
|order_id|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|order_status|order_purchase_timestamp|total_price|total_freight|total_items|shipping_limit_date|total_order_payment|max_installments|payment_method_count|primary_payment_type|Avg_review_score|Min_review_score|Total_Review_Number|is_delivered|is_approved|delivery_delay_days|
+--------+-----------+------------------+------------------------+-------------+--------------+------------+------------------------+-----------+-------------+-----------+-------------------+-------------------+----------

### Choose Time Frame

---

This time frame, I choose, is strategically chosen to balance seasonal patterns with realistic customer behavior in an e-commerce context. By using a **12-month feature window**, the model captures a full annual cycle, ensuring that seasonal peaks—such as Black Friday, Christmas, and local Brazilian holidays—are reflected in the customer’s behavioral features like frequency and spending. This year-long history provides a robust baseline for establishing "normal" behavior. The subsequent **6-month label window** is critical for a "low-frequency" retail environment; because customers do not purchase every week, a shorter window might incorrectly label a slow-but-loyal buyer as "churned." A 6-month period provides enough time to realistically observe whether a customer intends to return, while the overall **18-month snapshot** fits perfectly within your two-year dataset, leaving the model with enough relevant, recent data to make accurate predictions without being diluted by outdated trends from years prior.

In [161]:
from datetime import timedelta

# ==========================================
# Define the 12-month and 6-month Windows
# ==========================================

# STEP 1: Get the absolute last date in your dataset
max_date = master_df_update.select(F.max("order_purchase_timestamp")).collect()[0][0]

# STEP 2: Split date is 6 months (180 days) before the end
split_date = max_date - timedelta(days=180)

# STEP 3: Start date for features is 12 months (365 days) before the split
start_date = split_date - timedelta(days=365)

print(f"Dataset End: {max_date}")
print(f"Split Point (Prediction Start): {split_date}")
print(f"Feature History Start: {start_date}")

Dataset End: 2018-09-03 09:06:00
Split Point (Prediction Start): 2018-03-07 09:06:00
Feature History Start: 2017-03-07 09:06:00


### Feature Engineering (RFM-Based)
Extracting *Recency*, *Frequency*, *Monetary* Features consider as Simple and interpretable for customer churn, which called as RFM-based churn modeling.

**Why each of the excluded features is not kept?**
* `order_id`: Transaction ID → replaced by `frequency`
* `customer_id`: Duplicate of `customer_unique_id`
* `order_purchase_timestamp`: Used to compute recency → keep aggregated only
* `order_approved_at`: Used to compute is_approved → aggregated feature sufficient
* `shipping_limit_date`, `order_estimated_delivery_date` and `order_delivered_customer_date`: Used for `delivery_delay_days`
* `total_price`, `total_freight` and `total_order_payment`: Captured by `Monetary`

In [162]:
# ==========================================
# Temporal Feature Engineering
# ==========================================

# STEP 1: Filter for Features (The 12-month "Past")
obs_df = master_df_update.filter(
    (F.col("order_purchase_timestamp") >= start_date) & 
    (F.col("order_purchase_timestamp") < split_date)
)

# STEP 2: Customer Roll-Up (Feature Engineering)
customer_features = obs_df.groupBy("customer_unique_id").agg(
    F.datediff(F.lit(split_date), F.max("order_purchase_timestamp")).alias("recency"),
    F.count("order_id").alias("frequency"),
    F.sum("total_order_payment").alias("monetary"),
    F.first("customer_zip_code_prefix", ignorenulls=True).alias("zip_code"),
    F.first("customer_city", ignorenulls=True).alias("city"),
    F.first("customer_state", ignorenulls=True).alias("state"),
    F.sum("total_items").alias("total_items_volume"),
    F.max("delivery_delay_days").alias("max_delivery_delay"),
    F.max("is_delivered").alias("is_delivered"),
    F.max("is_approved").alias("is_approved"),
    F.max("payment_method_count").alias("payment_method_count"),
    F.first("primary_payment_type", ignorenulls=True).alias("primary_payment_type"),
    F.max("max_installments").alias("max_installments"),
    F.avg("Avg_review_score").alias("avg_satisfaction"),
    F.min("Min_review_score").alias("min_review_score"),
    F.sum("Total_Review_Number").alias("total_reviews_given")
)

# STEP 3: Create Labels (The 6-month "Future")
future_df = master_df_update.filter(
    (F.col("order_purchase_timestamp") >= split_date) & 
    (F.col("order_purchase_timestamp") <= max_date)
)
returned_customers = future_df.select("customer_unique_id").distinct().withColumn("label", F.lit(0))

# STEP 4: Join and Handle Churners
customer_training_df = customer_features.join(returned_customers, on="customer_unique_id", how="left")
customer_training_df = customer_training_df.fillna({"label": 1})

# Step 5: Final Dataset Schema
customer_training_df.printSchema()


root
 |-- customer_unique_id: string (nullable = true)
 |-- recency: integer (nullable = true)
 |-- frequency: long (nullable = false)
 |-- monetary: double (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- total_items_volume: long (nullable = true)
 |-- max_delivery_delay: integer (nullable = true)
 |-- is_delivered: integer (nullable = true)
 |-- is_approved: integer (nullable = true)
 |-- payment_method_count: long (nullable = true)
 |-- primary_payment_type: string (nullable = true)
 |-- max_installments: integer (nullable = true)
 |-- avg_satisfaction: double (nullable = true)
 |-- min_review_score: integer (nullable = true)
 |-- total_reviews_given: long (nullable = true)
 |-- label: integer (nullable = false)



In [163]:
# Step 5: Final Dataset Audit
checkShape(customer_training_df, "Final Data Frame")
customer_training_df.show(5)

The shape of Final Data Frame is: 55245 rows by 18 columns
+--------------------+-------+---------+--------+--------+--------------------+-----+------------------+------------------+------------+-----------+--------------------+--------------------+----------------+----------------+----------------+-------------------+-----+
|  customer_unique_id|recency|frequency|monetary|zip_code|                city|state|total_items_volume|max_delivery_delay|is_delivered|is_approved|payment_method_count|primary_payment_type|max_installments|avg_satisfaction|min_review_score|total_reviews_given|label|
+--------------------+-------+---------+--------+--------+--------------------+-----+------------------+------------------+------------+-----------+--------------------+--------------------+----------------+----------------+----------------+-------------------+-----+
|0006fdc98a402fceb...|    232|        1|    29.0|   29400|       mimoso do sul|   ES|                 1|               -12|           1| 

**Final Features Explenation**

* **customer_unique_id** → Unique identifier representing an individual customer across all orders.
* **recency** → Number of days since the customer’s most recent purchase.
* **frequency** → Total number of orders placed by the customer.
* **monetary** → Total amount of money spent by the customer.
* **zip_code** → Postal code of the customer’s delivery address.
* **city** → City where the customer is located.
* **state** → State where the customer is located.
* **total_items_volume** → Total number of items purchased by the customer across all orders.
* **max_delivery_delay** → Maximum delivery delay experienced by the customer in days.
* **is_delivered** → Indicator showing whether the customer has at least one successfully delivered order.
* **is_approved** → Indicator showing whether the customer’s payment was approved.
* **payment_method_count** → Number of distinct payment methods used by the customer.
* **primary_payment_type** → Most frequently used payment method by the customer.
* **max_installments** → Maximum number of payment installments chosen by the customer.
* **avg_satisfaction** → Average review score given by the customer across all orders.
* **min_review_score** → Lowest review score ever given by the customer.
* **total_reviews_given** → Total number of reviews submitted by the customer.
* **label** → Target variable indicating whether the customer has churned (1) or not (0).


In [164]:
import os
# =========================
# Export the Final Dataset
# =========================

# Step 1: Define subfolder path
data_folder = os.path.join(os.getcwd(), "data/finalData")

# Step 2: Create folder if it doesn't exist
os.makedirs(data_folder, exist_ok=True)

# Step 3: Define full CSV path
output_csv = os.path.join(data_folder, "customer_training.csv")

# Step 4: Save DataFrame to CSV
customer_training_df.toPandas().to_csv(output_csv, index=False)

print(f"DataFrame successfully saved as '{output_csv}'")


DataFrame successfully saved as 'h:\OneDrive - etu.u-cergy.fr\M2 Classes\5. Big Data & AI__Project\Project 3 - Brazilian E-Commerce Public Dataset by Olist\data/finalData\customer_training.csv'
