# Feature Engineering

______
## 1. Chuẩn bị vấn đề

In [54]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import *
import pandas as pd

In [55]:
# Obtain the SparkSession shared by every cell below (an existing session is
# reused, otherwise a new one is created).
spark = SparkSession.builder.getOrCreate()

In [56]:
def read_csv(path):
    """Load the CSV file at ``path`` into a Spark DataFrame.

    The first line of the file is treated as the header row. Neither an
    explicit schema nor the ``inferSchema`` option is passed to the reader.
    """
    return spark.read.csv(path, header=True)

# Load every cleaned table; the unpacking order matches the path order below.
customers, orders, order_items, payments, products, reviews = [
    read_csv(csv_path)
    for csv_path in (
        "../data/2_clean/customers.csv",
        "../data/2_clean/orders.csv",
        "../data/2_clean/order_items.csv",
        "../data/2_clean/payments.csv",
        "../data/2_clean/products.csv",
        "../data/2_clean/reviews.csv",
    )
]

____
## 2. Featuring Data

### 2.1 Chỉ số chính của khách hàng và đơn hàng

```cmd
Tính Customer360 metrics
    
1. Vòng đời khách hàng: first_purchase, last_purchase, customer_age_days, customer_lifetime_days
    - tính trên tất cả orders
2. Đơn hàng: total_orders, delivered_orders, avg_num_of_payments
    - total_orders: tất cả orders
    - delivered_orders, avg_num_of_payments: tính trên delivered orders
3. Giao hàng: num_of_late_delivery, num_of_on_time_delivery, late_rate, avg_delivery_days
    - chỉ delivered orders
4. Địa lý: customer_city, customer_state
    - lấy first
5. RFM: recency_days, monetary
    - recency_days: tính từ last_purchase (tất cả orders)
    - monetary: chỉ delivered orders
```

In [57]:
def calculate_metrics_customer_order(df, last_date):
    """Aggregate customer/order rows into one metrics row per customer.

    Args:
        df: Rows keyed by ``customer_unique_id`` and ``order_id``. The columns
            read here are ``order_status``, ``order_purchase_timestamp``,
            ``order_estimated_delivery_date``,
            ``order_delivered_customer_date``, ``total_value``,
            ``num_of_payments``, ``customer_city`` and ``customer_state``.
        last_date: Reference date used for ``customer_age_days`` and
            ``recency_days``.

    Returns:
        One row per ``customer_unique_id``. Metrics derived from delivered
        orders come from a left join, so they are null for customers that
        have no order with status ``"delivered"``.
    """
    customer_key = "customer_unique_id"
    purchased_at = col("order_purchase_timestamp")
    estimated_at = col("order_estimated_delivery_date")
    delivered_at = col("order_delivered_customer_date")

    # Lifecycle, order count and geography use every order, whatever its status.
    lifecycle = df.groupBy(customer_key).agg(
        min(purchased_at).alias("first_purchase"),
        max(purchased_at).alias("last_purchase"),
        count_distinct("order_id").alias("total_orders"),
        first("customer_city").alias("customer_city"),
        first("customer_state").alias("customer_state"),
    )

    # Delivery, payment and monetary metrics only look at delivered orders.
    is_late = when(estimated_at < delivered_at, 1).otherwise(0)
    is_on_time = when(estimated_at >= delivered_at, 1).otherwise(0)
    days_to_deliver = date_diff(delivered_at, purchased_at)
    delivered_only = df.filter(col("order_status") == "delivered")
    delivered_metrics = delivered_only.groupBy(customer_key).agg(
        sum(is_late).alias("num_of_late_delivery"),
        sum(is_on_time).alias("num_of_on_time_delivery"),
        round(avg(days_to_deliver), 2).alias("avg_delivery_days"),
        round(sum("total_value"), 2).alias("monetary"),
        avg("num_of_payments").alias("avg_num_of_payments"),
        count_distinct("order_id").alias("delivered_orders"),
    )

    # Keep every customer, including those without a delivered order.
    result = lifecycle.join(delivered_metrics, on=customer_key, how="left")

    # Derived columns, added one at a time in the same order as before.
    reference_date = lit(last_date)
    result = result.withColumn(
        "customer_age_days", date_diff(reference_date, col("first_purchase"))
    )
    result = result.withColumn(
        "customer_lifetime_days",
        date_diff(col("last_purchase"), col("first_purchase")) + 1,
    )

    classified_deliveries = col("num_of_late_delivery") + col("num_of_on_time_delivery")
    result = result.withColumn(
        "late_rate",
        round(col("num_of_late_delivery") / classified_deliveries * 100, 2),
    )
    result = result.withColumn(
        "recency_days", date_diff(reference_date, col("last_purchase"))
    )
    result = result.withColumn(
        "avg_order_value", round(col("monetary") / col("delivered_orders"), 2)
    )

    # Customers known for 30 days or less get a fixed rate of 1.
    age_in_months = col("customer_age_days") / 30
    monthly_rate = round(col("total_orders") / age_in_months, 2)
    result = result.withColumn(
        "orders_per_month",
        when(col("customer_age_days") > 30, monthly_rate).otherwise(1),
    )

    return result

### 2.2 Chỉ số khách hàng và phương thức thanh toán (Customer-Payment)

- Các phương thức thanh toán người dùng sử dụng (preferred_payment_type)
- Tổng tiền khách đã chi (total_value)
- Số lần góp tiền trung bình của khách (num_of_payments)

In [58]:
def calculate_metrics_payment(df):
    """Summarise payment rows into one row per order.

    Output columns:
        - order_id
        - total_value     : sum of ``payment_value`` for the order, rounded
                            to 2 decimals
        - num_of_payments : number of payment rows recorded for the order
    """
    total_value = round(sum("payment_value"), 2).alias("total_value")
    num_of_payments = count("order_id").alias("num_of_payments")
    return df.groupBy("order_id").agg(total_value, num_of_payments)

In [59]:
def calculate_preferred_payment_type(customers, orders, payments):
    """List the payment types each customer has used.

    Customers are left-joined to their orders and the orders to their
    payments. The distinct ``payment_type`` values of each
    ``customer_unique_id`` are then joined with "/" into
    ``preferred_payment_type``. Every distinct type is kept, not only the
    most frequent one.
    """
    customer_orders = customers.join(orders, on="customer_id", how="left")
    customer_payments = customer_orders.join(payments, on="order_id", how="left")
    used_types = concat_ws("/", collect_set("payment_type"))
    return customer_payments.groupBy("customer_unique_id").agg(
        used_types.alias("preferred_payment_type")
    )

### 2.3 Chỉ số khách hàng và đánh giá đơn hàng (Customer-Review)

- Tính điểm đánh giá trung bình của khách (avg_review_score)

In [60]:
def calculate_avg_review_score(customers, orders, reviews):
    """Compute each customer's mean review score, rounded to 2 decimals.

    Customers are left-joined to their orders and the orders to their
    reviews; the result has one row per ``customer_unique_id`` with the
    column ``avg_review_score``.
    """
    customer_orders = customers.join(orders, on="customer_id", how="left")
    customer_reviews = customer_orders.join(reviews, on="order_id", how="left")
    mean_score = round(avg(col("review_score")), 2)
    return customer_reviews.groupBy("customer_unique_id").agg(
        mean_score.alias("avg_review_score")
    )

### 2.4 Chỉ số khách hàng và sản phẩm (Customer-Product)

- Top 3 loại sản phẩm được mua theo từng người dùng (top_3_categories)

In [61]:
def calculate_top_3_categories(customers, orders, order_items, products):
    """Find the (at most) three product categories each customer buys most.

    Categories are ranked per customer by the number of purchased order
    items, ties broken alphabetically so the result is deterministic. The
    previous implementation concatenated *every* distinct category in
    arbitrary order, which did not match the "top 3" name.

    Args:
        customers: Table with ``customer_id`` and ``customer_unique_id``.
        orders: Table with ``order_id`` and ``customer_id``.
        order_items: Table with ``order_id`` and ``product_id``.
        products: Table with ``product_id`` and ``product_category_name``.

    Returns:
        One row per ``customer_unique_id`` with ``top_3_categories``: the
        category names joined with "/", most purchased first. Customers with
        no categorised purchase get an empty string.
    """
    purchases = customers.join(orders, on="customer_id", how="left") \
                         .join(order_items, on="order_id", how="left") \
                         .join(products, on="product_id", how="left")

    # Purchased items per (customer, category); rows without a category
    # (no order, no item, or an uncategorised product) are not counted.
    category_counts = purchases.filter(col("product_category_name").isNotNull()) \
        .groupBy("customer_unique_id", "product_category_name") \
        .agg(count("*").alias("num_items"))

    by_popularity = Window.partitionBy("customer_unique_id") \
        .orderBy(col("num_items").desc(), col("product_category_name").asc())
    top_categories = category_counts \
        .withColumn("category_rank", row_number().over(by_popularity)) \
        .filter(col("category_rank") <= 3)

    # Collect (rank, name) pairs and sort them so the joined string follows
    # the ranking; collect_list alone gives no ordering guarantee.
    ranked = top_categories.groupBy("customer_unique_id").agg(
        sort_array(
            collect_list(struct("category_rank", "product_category_name"))
        ).alias("ranked_categories")
    )
    top_3 = ranked.select(
        "customer_unique_id",
        concat_ws("/", col("ranked_categories.product_category_name"))
        .alias("top_3_categories"),
    )

    # Keep one row for every customer, as before, with "" when nothing ranks.
    all_customers = customers.select("customer_unique_id").distinct()
    return all_customers.join(top_3, on="customer_unique_id", how="left") \
                        .fillna({"top_3_categories": ""})

### 2.5 Tính điểm cho mô hình RFM 

In [62]:
def _quintile_score(df, ordering, percentile_col, score_col):
    """Add a percent-rank column and the 1-5 score derived from it.

    The window has no partition, so the rank is computed over the whole
    DataFrame. Rows in the first 20% of ``ordering`` score 1, the next 20%
    score 2, and so on up to 5.
    """
    df = df.withColumn(percentile_col, percent_rank().over(Window.orderBy(ordering)))
    percentile = col(percentile_col)
    return df.withColumn(
        score_col,
        when(percentile <= 0.2, 1)
        .when(percentile <= 0.4, 2)
        .when(percentile <= 0.6, 3)
        .when(percentile <= 0.8, 4)
        .otherwise(5),
    )


def calculate_RFM_scores(df):
    """Add recency, frequency and monetary scores plus the combined code.

    In all three dimensions a score of 1 is the best bucket (smallest
    ``recency_days``, largest ``delivered_orders``, largest ``monetary``)
    and 5 is the worst.

    Null values are ranked last in every dimension so they land in the worst
    bucket. Recency previously used a plain ascending sort, which puts nulls
    first in Spark and therefore gave customers with an unknown recency the
    best score.

    Args:
        df: DataFrame with ``recency_days``, ``delivered_orders`` and
            ``monetary`` columns.

    Returns:
        ``df`` with ``recency_percentile``, ``r_score``,
        ``frequency_percentile``, ``f_score``, ``monetary_percentile``,
        ``m_score`` and ``rfm_score`` (the three scores concatenated) added.
    """
    df = _quintile_score(
        df, col("recency_days").asc_nulls_last(), "recency_percentile", "r_score"
    )
    df = _quintile_score(
        df, col("delivered_orders").desc_nulls_last(), "frequency_percentile", "f_score"
    )
    df = _quintile_score(
        df, col("monetary").desc_nulls_last(), "monetary_percentile", "m_score"
    )

    return df.withColumn(
        "rfm_score", concat(col("r_score"), col("f_score"), col("m_score"))
    )

### 2.6 Tổng hợp và xử lý dữ liệu

#### Xử lý giá trị null

In [63]:
def fill_null_values(df):
    """Replace nulls with 0 in the delivered-order metric columns.

    The listed columns are the ones that
    ``calculate_metrics_customer_order`` derives from delivered orders.
    """
    zero_filled_columns = (
        "delivered_orders",
        "monetary",
        "avg_order_value",
        "avg_num_of_payments",
        "num_of_on_time_delivery",
        "num_of_late_delivery",
        "late_rate",
        "avg_delivery_days",
    )
    return df.fillna(dict.fromkeys(zero_filled_columns, 0))

#### Sắp xếp cột

In [64]:
def select_customer360(df):
    """Project ``df`` onto the Customer360 columns in their final order."""
    identity = ["customer_unique_id"]
    lifecycle = [
        "first_purchase", "last_purchase", "customer_age_days",
        "customer_lifetime_days", "recency_days",
    ]
    order_metrics = [
        "total_orders", "delivered_orders", "monetary",
        "avg_order_value", "orders_per_month",
    ]
    behaviour = [
        "avg_num_of_payments", "preferred_payment_type",
        "avg_review_score", "top_3_categories",
    ]
    rfm = ["r_score", "f_score", "m_score", "rfm_score"]
    delivery = [
        "num_of_on_time_delivery", "num_of_late_delivery",
        "late_rate", "avg_delivery_days",
    ]
    geography = ["customer_city", "customer_state"]

    return df.select(
        *identity, *lifecycle, *order_metrics, *behaviour, *rfm, *delivery, *geography
    )

#### Ghi dữ liệu

In [65]:
def write_to_disk(df, path="../data/3_model/featured_data.csv"):
    """Write ``df`` to a single CSV file via pandas.

    The whole DataFrame is converted with ``toPandas()`` before writing, so
    it has to fit in the driver's memory.

    Args:
        df: DataFrame to export; only its ``toPandas()`` method is used.
        path: Destination file. Defaults to the location this function
            always wrote to before the parameter existed.
    """
    import os

    # Create the target directory up front so a missing folder does not make
    # the write fail.
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    df.toPandas().to_csv(path, index=False)

#### Main

In [66]:
def generate_customer360_df(customers, orders, order_items, payments, products, reviews, last_date):
    """Build the Customer360 table from the cleaned source tables.

    Steps, in order: compute the per-customer payment-type, review and
    category features; attach per-order payment totals to the orders;
    aggregate customer/order metrics relative to ``last_date``; join the
    feature tables; add RFM scores; fill nulls; select the final columns.

    Returns:
        The DataFrame produced by ``select_customer360``.
    """
    # Per-customer feature tables computed straight from the raw inputs.
    payment_types = calculate_preferred_payment_type(customers, orders, payments)
    review_scores = calculate_avg_review_score(customers, orders, reviews)
    categories = calculate_top_3_categories(customers, orders, order_items, products)

    # Attach per-order payment totals, then expand to customer/order rows.
    order_payments = calculate_metrics_payment(payments)
    orders_with_payments = orders.join(order_payments, on="order_id", how="left")
    customer_orders = customers.join(orders_with_payments, on="customer_id", how="left")

    customer360 = calculate_metrics_customer_order(customer_orders, last_date)
    for feature_table in (payment_types, review_scores, categories):
        customer360 = customer360.join(feature_table, on="customer_unique_id", how="left")

    # RFM scoring runs before the null fill, so it sees the original nulls.
    customer360 = calculate_RFM_scores(customer360)
    customer360 = fill_null_values(customer360)
    return select_customer360(customer360)


_____
## 3. Kết quả

In [67]:
if __name__ == "__main__":
    # The latest purchase timestamp in the orders table is the reference
    # date for the age and recency metrics.
    last_date = orders.agg(max("order_purchase_timestamp")).first()[0]
    customer360_df = generate_customer360_df(
        customers, orders, order_items, payments, products, reviews,
        last_date,
    )
    write_to_disk(customer360_df)
    customer360_df.show()
    # To check the number of customers: print(customer360_df.count())

+--------------------+-------------------+-------------------+-----------------+----------------------+------------+------------+----------------+--------+---------------+----------------+-------------------+----------------------+----------------+--------------------+-------+-------+-------+---------+-----------------------+--------------------+---------+-----------------+------------------+--------------+
|  customer_unique_id|     first_purchase|      last_purchase|customer_age_days|customer_lifetime_days|recency_days|total_orders|delivered_orders|monetary|avg_order_value|orders_per_month|avg_num_of_payments|preferred_payment_type|avg_review_score|    top_3_categories|r_score|f_score|m_score|rfm_score|num_of_on_time_delivery|num_of_late_delivery|late_rate|avg_delivery_days|     customer_city|customer_state|
+--------------------+-------------------+-------------------+-----------------+----------------------+------------+------------+----------------+--------+---------------+-------