## Building features to prepare the Customer Master Table

In [1]:
import os
import numpy as np
import pandas as pd
from pathlib import Path

### Checking Overlaps between different tables before joining.

In [2]:
# Join plan: product_translations enriches olist_products, and geolocations enriches the seller data.
# Before joining, measure how many left-side keys find a match on the right side.

products = pd.read_csv("../../Data/raw/olist_products_dataset.csv")
product_translations = pd.read_csv("../../Data/raw/product_category_name_translation.csv")

sellers = pd.read_csv("../../Data/raw/olist_sellers_dataset.csv")
geolocations = pd.read_csv("../../Data/raw/olist_geolocation_dataset.csv")

# Share of product rows whose category name appears in the translation table.
translated_category_names = product_translations['product_category_name']
has_translation = products['product_category_name'].isin(translated_category_names)
prod_category_match_prop = has_translation.mean()
print(f"Product Category Name coverage: {prod_category_match_prop:.2f}")

# Share of seller rows whose zip code prefix appears in the geolocation table.
known_zip_prefixes = geolocations["geolocation_zip_code_prefix"]
has_geolocation = sellers["seller_zip_code_prefix"].isin(known_zip_prefixes)
geo_match_prop = has_geolocation.mean()
print(f"Goelocations Zip Code coverage: {geo_match_prop}")

Product Category Name coverage: 0.98
Goelocations Zip Code coverage: 0.9977382875605816


In [3]:
# Join plan: olist_products and olist_sellers both attach to the olist_order_items dataset.

order_items = pd.read_csv("../../Data/raw/olist_order_items_dataset.csv")

# Fraction of order-item rows whose product_id exists in the products table.
item_has_product = order_items['product_id'].isin(products['product_id'])
prod_match_prop = item_has_product.mean()

# Fraction of order-item rows whose seller_id exists in the sellers table.
item_has_seller = order_items['seller_id'].isin(sellers['seller_id'])
seller_match_prop = item_has_seller.mean()

print(f"Product ID coverage: {prod_match_prop:.2f}")
print(f"Seller ID coverage:  {seller_match_prop:.2f}")

Product ID coverage: 1.00
Seller ID coverage:  1.00


In [4]:
# Join plan: reviews, payments and order items all attach to the orders data.

orders = pd.read_csv("../../Data/raw/olist_orders_dataset.csv", parse_dates=["order_purchase_timestamp"])
reviews = pd.read_csv("../../Data/raw/olist_order_reviews_dataset.csv", parse_dates=["review_creation_date"])
payments = pd.read_csv("../../Data/raw/olist_order_payments_dataset.csv")

# Fraction of orders that have at least one row in the reviews / payments tables.
order_ids = orders["order_id"]
rev_match_prop = order_ids.isin(reviews["order_id"]).mean()
payment_match_prop = order_ids.isin(payments["order_id"]).mean()

print(f"Review ID coverage: {rev_match_prop:.2f}")
print(f"Payment ID coverage:  {payment_match_prop:.2f}")

Review ID coverage: 0.99
Payment ID coverage:  1.00


In [5]:
# Join plan: the orders data attaches to the customers data through customer_id.

customers = pd.read_csv("../../Data/raw/olist_customers_dataset.csv")

# Fraction of customer rows whose customer_id appears in the orders table.
customer_has_order = customers["customer_id"].isin(orders["customer_id"])
order_match_prop = customer_has_order.mean()

print(f"Customer ID coverage: {order_match_prop:.2f}")

Customer ID coverage: 1.00


### Key Observations:

- 98% of the products have a category name with an English translation.
- 99% of the orders have an entry in the reviews dataset, which is expected because not every order receives a review.
- About 99.8% of the sellers' zip code prefixes are present in the geolocations dataset; only about 0.2% are missing.
- Apart from the cases above, the remaining key checks (product_id, seller_id, payments by order_id, and customer_id) show full coverage (1.00 when rounded to two decimals).

### Joining the tables to prepare the master table.

1. Preparing the order level details- this will act as the intermediate output.
2. This output will then be joined with customer dataset and aggregated from order_id to customer_id

In [6]:
def safe_mode(series):
    """Return the most frequent non-null value of ``series``, or ``None`` if there is none.

    When several values tie, the first entry of ``Series.mode`` is returned.
    """
    modes = series.mode(dropna=True)
    if modes.empty:
        return None
    return modes.iloc[0]

In [7]:
# Collapse the geolocation table to one row per zip code prefix:
# coordinates are averaged, city/state take the first value found in each group.
geo_by_zip = geolocations.groupby("geolocation_zip_code_prefix", as_index=False)
geo_agg_df = geo_by_zip.agg(
    geolocation_lat=("geolocation_lat", "mean"),
    geolocation_lng=("geolocation_lng", "mean"),
    geolocation_city=("geolocation_city", "first"),
    geolocation_state=("geolocation_state", "first"),
)

# Attach the translation columns to every product row (left join keeps all products).
products_enriched_df = products.merge(product_translations, on="product_category_name", how="left")

# Attach the per-zip geolocation to every seller row, then drop the duplicated join key.
sellers_with_geo = sellers.merge(
    geo_agg_df,
    left_on="seller_zip_code_prefix",
    right_on="geolocation_zip_code_prefix",
    how="left",
)
sellers_enriched_df = sellers_with_geo.drop(columns=["geolocation_zip_code_prefix"])

In [8]:
# Step 1: order items + product attributes (left join keeps every order-item row).
order_items_products_df = order_items.merge(products_enriched_df, on="product_id", how="left")

# Step 2: + seller attributes.
order_items_super_df = order_items_products_df.merge(sellers_enriched_df, on="seller_id", how="left")

# Step 3: + purchase timestamp of the parent order, then derive two columns:
#   total_value          -> item price plus freight (the total order value contribution of the row)
#   shipping_limit_date  -> parsed to datetime in place
purchase_ts = orders[["order_id", "order_purchase_timestamp"]]
order_items_with_ts_df = (
    order_items_super_df
    .merge(purchase_ts, on="order_id", how="left")
    .assign(
        total_value=lambda items: items["price"] + items["freight_value"],
        shipping_limit_date=lambda items: pd.to_datetime(items["shipping_limit_date"]),
    )
)

In [9]:
# Roll the order items up to one row per (order_id, product_id, seller_id),
# which is the granularity this dataset is treated at from here on.

# Columns that are simply carried over by taking the first value in each group.
first_value_columns = [
    # product attributes
    "product_category_name",
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm",
    "product_category_name_english",
    # seller attributes
    "seller_zip_code_prefix",
    "seller_city",
    "seller_state",
    "geolocation_lat",
    "geolocation_lng",
    "geolocation_city",
    "geolocation_state",
]

# Output column -> (source column, aggregation); insertion order is the output column order.
item_group_spec = {
    # timestamps
    "order_purchase_timestamp": ("order_purchase_timestamp", "first"),
    "last_shipping_limit_date": ("shipping_limit_date", "max"),
    # item count
    "order_item_cnt": ("order_item_id", "nunique"),
    # monetary
    "price": ("price", "sum"),
    "freight_value": ("freight_value", "sum"),
    "total_value": ("total_value", "sum"),
}
item_group_spec.update({column: (column, "first") for column in first_value_columns})

order_product_seller_agg_df = (
    order_items_with_ts_df
    .groupby(["order_id", "product_id", "seller_id"], as_index=False)
    .agg(**item_group_spec)
)

In [10]:
# Feature: whole-day difference between the purchase timestamp and the latest shipping limit date.
# NOTE(review): computed as purchase minus shipping limit, so the value is negative whenever the
# shipping limit falls after the purchase - confirm this sign is intended.
purchase_minus_ship_limit = (
    order_product_seller_agg_df["order_purchase_timestamp"]
    - order_product_seller_agg_df["last_shipping_limit_date"]
)
order_product_seller_agg_df["days_since_last_shipped"] = purchase_minus_ship_limit.dt.days

In [11]:
# sanity checks on the (order_id, product_id, seller_id) table

# Each key combination must appear exactly once at the target grain.
grain_keys = ["order_id", "product_id", "seller_id"]
assert not order_product_seller_agg_df.duplicated(subset=grain_keys).any()

# Monetary totals must never be negative.
assert order_product_seller_agg_df["total_value"].ge(0).all()

In [12]:
# Roll the (order, product, seller) rows up to one row per order_id.

# Output column -> (source column, aggregation); insertion order is the output column order.
order_level_spec = {
    # counts
    "num_products": ("product_id", "nunique"),
    "num_sellers": ("seller_id", "nunique"),
    # timestamps
    "order_purchase_timestamp": ("order_purchase_timestamp", "first"),
    "last_shipping_limit_date": ("last_shipping_limit_date", "max"),
    # order size
    "avg_order_size": ("order_item_cnt", "mean"),
    # price
    "tot_order_price": ("price", "sum"),
    "avg_order_price": ("price", "mean"),
    # freight
    "tot_order_freight_value": ("freight_value", "sum"),
    "avg_order_freight_value": ("freight_value", "mean"),
    # total
    "tot_order_value": ("total_value", "sum"),
    "avg_order_value": ("total_value", "mean"),
    # product preferences (most frequent category, averaged product attributes)
    "pref_prod_category": ("product_category_name", safe_mode),
    "avg_prod_length": ("product_name_lenght", "mean"),
    "avg_prod_desc_length": ("product_description_lenght", "mean"),
    "avg_prod_photo_qty": ("product_photos_qty", "mean"),
    "avg_prod_weight_g": ("product_weight_g", "mean"),
    "avg_prod_length_cm": ("product_length_cm", "mean"),
    "avg_prod_height_cm": ("product_height_cm", "mean"),
    "avg_prod_width_cm": ("product_width_cm", "mean"),
    "pref_prod_category_english": ("product_category_name_english", safe_mode),
    # seller preferences (most frequent value per order)
    "pref_seller_zip_code": ("seller_zip_code_prefix", safe_mode),
    "pref_seller_city": ("seller_city", safe_mode),
    "pref_seller_state": ("seller_state", safe_mode),
    "pref_seller_lat": ("geolocation_lat", safe_mode),
    "pref_seller_lng": ("geolocation_lng", safe_mode),
    "pref_seller_geolocation_city": ("geolocation_city", safe_mode),
    "pref_seller_geolocation_state": ("geolocation_state", safe_mode),
}

orders_grouped = order_product_seller_agg_df.groupby("order_id", as_index=False)
order_level_df = orders_grouped.agg(**order_level_spec)

In [13]:
# Remove "days_since_last_shipped" from the order-level table if it is present;
# errors="ignore" makes this a no-op when the column does not exist.
order_level_df = order_level_df.drop(columns=["days_since_last_shipped"], errors="ignore")

In [14]:
# Persist the order-level intermediate output. The target folder is created first
# (including missing parents) so the write does not fail on a fresh checkout.
order_level_output_path = Path("../../Data/processed/intermediate_output_orders.csv")
order_level_output_path.parent.mkdir(parents=True, exist_ok=True)
order_level_df.to_csv(order_level_output_path, index=False)

In [18]:
# sanity checks on the order-level table

# Exactly one row per order_id.
assert not order_level_df["order_id"].duplicated().any()

# Order totals must never be negative.
assert order_level_df["tot_order_value"].ge(0).all()

# Every order row must carry a purchase timestamp.
assert not order_level_df["order_purchase_timestamp"].isna().any()

In [21]:
# Fixed column templates for the pivoted features, so every output has the same
# columns regardless of which values occur in the data being pivoted.

# Order statuses used as pivot columns.
ALL_STATUSES = [
    'approved',
    'canceled',
    'created',
    'delivered',
    'invoiced',
    'processing',
    'shipped',
    'unavailable',
]

# Payment types used as pivot columns.
ALL_PAYMENTS = [
    'boleto',
    'credit_card',
    'debit_card',
    'not_defined',
    'voucher',
]

# Metrics that are broken down per order status.
STATUS_METRICS = [
    'num_orders',
    'tot_pymt_val',
    'num_rev',
    'num_products',
    'avg_order_size',
    'tot_order_price',
    'tot_order_freight_value',
    'tot_order_value',
]

def aggregate_payments(df):
    """Summarise payments per order with one value column per payment type.

    Parameters
    ----------
    df : DataFrame with the columns ``order_id``, ``payment_sequential``,
        ``payment_installments``, ``payment_type`` and ``payment_value``.

    Returns
    -------
    DataFrame with one row per ``order_id`` holding the number of distinct
    payment sequences, the mean number of installments, the summed payment
    value, and a ``pymt_<type>`` column (summed value, 0 when absent) for
    every entry of ``ALL_PAYMENTS``.
    """
    per_order_totals = df.groupby("order_id", as_index=False).agg(
        tot_pymt_sqntl=("payment_sequential", "nunique"),
        avg_pymt_instllmnt=("payment_installments", "mean"),
        tot_pymt_val=("payment_value", "sum"),
    )

    value_by_type = pd.pivot_table(
        df,
        index="order_id",
        columns="payment_type",
        values="payment_value",
        aggfunc="sum",
        fill_value=0,
    )

    # Force the full payment-type template: missing types become all-zero columns,
    # types outside the template are dropped.
    value_by_type = value_by_type.reindex(columns=ALL_PAYMENTS, fill_value=0)
    prefixed_names = []
    for payment_type in value_by_type.columns:
        prefixed_names.append(f"pymt_{payment_type}")
    value_by_type.columns = prefixed_names

    return per_order_totals.merge(value_by_type.reset_index(), on="order_id", how="left")

def aggregate_reviews(df, cutoff):
    """Summarise, per order, the reviews created on or before ``cutoff``.

    Parameters
    ----------
    df : DataFrame with the columns ``order_id``, ``review_id``, ``review_score``,
        ``review_title_len``, ``review_msg_len`` and ``review_creation_date``.
    cutoff : timestamp; reviews created after it are ignored.

    Returns
    -------
    DataFrame with one row per ``order_id``: review count, mean score, mean title
    and message length, and the whole days between ``cutoff`` and the latest
    review. An empty frame with the same columns is returned when no review
    qualifies.
    """
    output_columns = [
        "order_id",
        "num_rev",
        "avg_rev_score",
        "avg_rev_title_length",
        "avg_rev_length",
        "days_since_lst_rev_creation",
    ]

    eligible = df.loc[df["review_creation_date"] <= cutoff]
    if eligible.empty:
        return pd.DataFrame(columns=output_columns)

    per_order = eligible.groupby("order_id", as_index=False).agg(
        num_rev=("review_id", "count"),
        avg_rev_score=("review_score", "mean"),
        avg_rev_title_length=("review_title_len", "mean"),
        avg_rev_length=("review_msg_len", "mean"),
        last_rev_date=("review_creation_date", "max"),
    )

    # The latest review date is only needed to derive the recency feature.
    recency = cutoff - per_order["last_rev_date"]
    per_order["days_since_lst_rev_creation"] = recency.dt.days
    return per_order.drop(columns="last_rev_date")

# ---------------------------------------------------------------------------
# Build one customer snapshot per month, each covering activity up to that month's end.
# ---------------------------------------------------------------------------

# Read the raw inputs for the snapshot build. `reviews` is reassigned below, so reading it
# from the raw file here keeps the pre-processing repeatable.
payments = pd.read_csv("../../Data/raw/olist_order_payments_dataset.csv")
reviews = pd.read_csv("../../Data/raw/olist_order_reviews_dataset.csv", parse_dates=["review_creation_date"])
orders_df = pd.read_csv("../../Data/raw/olist_orders_dataset.csv", parse_dates=["order_purchase_timestamp", "order_approved_at", 
"order_delivered_carrier_date","order_delivered_customer_date", "order_estimated_delivery_date"])

# Pre-process Reviews: attach the purchase timestamp of the reviewed order, keep one row per
# review_id (the row tied to the earliest purchase), and derive comment lengths
# (a missing title/message counts as length 0).
reviews = reviews.merge(orders_df[["order_id", "order_purchase_timestamp"]], on="order_id", how="left")
reviews = reviews.sort_values("order_purchase_timestamp").drop_duplicates("review_id", keep="first")
reviews["review_title_len"] = reviews["review_comment_title"].fillna("").str.len()
reviews["review_msg_len"] = reviews["review_comment_message"].fillna("").str.len()

# Payments are aggregated once, outside the loop, because the result does not depend on the cutoff.
payments_agg_df = aggregate_payments(payments)

# Snapshot window (inclusive on both ends), one snapshot per calendar month.
START_YEAR, START_MONTH = 2016, 9
END_YEAR, END_MONTH = 2018, 10
num_months = ((END_YEAR - START_YEAR) * 12 + (END_MONTH - START_MONTH) + 1)
output_root = Path("../../Data/processed/customer_snapshots")
# parents=True so the folder can be created even when ../../Data/processed does not exist yet.
output_root.mkdir(parents=True, exist_ok=True)

for run_seq in range(1, num_months + 1):
    # Translate the 1-based run number into the calendar year/month of the snapshot.
    year = START_YEAR + (START_MONTH - 1 + run_seq - 1) // 12
    month = (START_MONTH - 1 + run_seq - 1) % 12 + 1
    # cutoff    -> last day of the month at 00:00:00
    # next_month -> first day of the following month at 00:00:00 (reference point for "days since")
    cutoff = pd.Timestamp(year, month, 1) + pd.offsets.MonthEnd(0)
    next_month = cutoff + pd.offsets.Day(1)

    # Keep every order purchased before the next month starts. Comparing against `cutoff`
    # (midnight of the last day) would drop all orders placed during the last day of the month.
    orders_f = orders_df[orders_df["order_purchase_timestamp"] < next_month].copy()
    if orders_f.empty:
        continue

    # NOTE(review): reviews are still filtered with `<= cutoff` inside aggregate_reviews; this only
    # matches the order filter if review_creation_date carries no time-of-day - confirm.
    reviews_agg = aggregate_reviews(reviews, cutoff)

    # Master Join at Order Level
    df = (
        orders_f
        .merge(payments_agg_df, on="order_id", how="left")
        .merge(reviews_agg, on="order_id", how="left")
        .merge(order_level_df, on="order_id", how="left")
    )

    # Fix Timestamps: both orders_f and order_level_df carry order_purchase_timestamp, so the
    # merge suffixes them; keep the copy coming from the orders table.
    df = df.drop(columns=["order_purchase_timestamp_y"], errors="ignore")
    df = df.rename(columns={"order_purchase_timestamp_x": "order_purchase_timestamp"})

    # Time Features: whole days between each event and the start of the following month.
    # NOTE(review): event dates later than the cutoff are not masked, so these can be negative
    # and order_status is taken as stored in the file - confirm this is acceptable.
    df["days_since_last_shipped"] = (next_month - df["last_shipping_limit_date"]).dt.days
    df["days_since_lst_order_purchased"] = (next_month - df["order_purchase_timestamp"]).dt.days
    df["days_since_lst_order_approved"] = (next_month - df["order_approved_at"]).dt.days
    df["days_since_lst_order_delivered_carrier"] = (next_month - df["order_delivered_carrier_date"]).dt.days
    df["days_since_lst_order_delivered_cust"] = (next_month - df["order_delivered_customer_date"]).dt.days
    df["days_since_lst_shipping_llimit_date"] = df["days_since_last_shipped"]

    # 1. Customer Scalar Features: counts/totals are summed, averages are averaged,
    #    recency features take the minimum, preferences take the most frequent value.
    cust_scalar = df.groupby("customer_id", as_index=False).agg(
        num_orders=("order_id", "nunique"),
        tot_pymt_sqntl=("tot_pymt_sqntl", "sum"),
        avg_pymt_instllmnt=("avg_pymt_instllmnt", "mean"),
        tot_pymt_val=("tot_pymt_val", "sum"),
        **{f"tot_pymt_{p}": (f"pymt_{p}", "sum") for p in ALL_PAYMENTS},
        num_rev=("num_rev", "sum"),
        avg_rev_score=("avg_rev_score", "mean"),
        avg_rev_title_length=("avg_rev_title_length", "mean"),
        avg_rev_length=("avg_rev_length", "mean"),
        days_since_lst_rev_creation=("days_since_lst_rev_creation", "min"),
        num_products=("num_products", "sum"),
        num_sellers=("num_sellers", "sum"),
        avg_order_size=("avg_order_size", "mean"),
        tot_order_price=("tot_order_price", "sum"),
        avg_order_price=("avg_order_price", "mean"),
        tot_order_freight_value=("tot_order_freight_value", "sum"),
        avg_order_freight_value=("avg_order_freight_value", "mean"),
        tot_order_value=("tot_order_value", "sum"),
        avg_order_value=("avg_order_value", "mean"),
        days_since_last_shipped=("days_since_last_shipped", "min"),
        days_since_lst_order_purchased=("days_since_lst_order_purchased", "min"),
        days_since_lst_order_approved=("days_since_lst_order_approved", "min"),
        days_since_lst_order_delivered_carrier=("days_since_lst_order_delivered_carrier", "min"),
        days_since_lst_order_delivered_cust=("days_since_lst_order_delivered_cust", "min"),
        days_since_lst_shipping_llimit_date=("days_since_lst_shipping_llimit_date", "min"),
        pref_prod_category=("pref_prod_category", safe_mode),
        pref_prod_category_english=("pref_prod_category_english", safe_mode),
    )

    # 2. Consistent Status Pivot: the same metrics broken down per order status.
    status_base = df.groupby(["customer_id", "order_status"], as_index=False).agg(
        num_orders=("order_id", "nunique"),
        tot_pymt_val=("tot_pymt_val", "sum"),
        num_rev=("num_rev", "sum"),
        num_products=("num_products", "sum"),
        avg_order_size=("avg_order_size", "mean"),
        tot_order_price=("tot_order_price", "sum"),
        tot_order_freight_value=("tot_order_freight_value", "sum"),
        tot_order_value=("tot_order_value", "sum"),
    )

    status_pivot = status_base.pivot(index="customer_id", columns="order_status")

    # Enforce global column template for status pivot so every snapshot has the same columns.
    # NOTE(review): statuses absent from this snapshot are filled with 0, while statuses present
    # for other customers stay NaN for customers without them - confirm the mix is intended.
    full_columns = pd.MultiIndex.from_product([STATUS_METRICS, ALL_STATUSES])
    status_pivot = status_pivot.reindex(columns=full_columns, fill_value=0)
    status_pivot.columns = [f"{metric}_{status}" for metric, status in status_pivot.columns]
    status_pivot = status_pivot.reset_index()

    # 3. Join & Aggregate to Unique ID: attach customer attributes and keep only customers
    #    that have at least one order in this snapshot.
    customer_final = cust_scalar.merge(status_pivot, on="customer_id", how="left")
    cust_joined = customers.merge(customer_final, on="customer_id", how="left")
    cust_joined = cust_joined[cust_joined["num_orders"].notna()].copy()

    # Define aggregation logic for rolling up to unique_id; the rule is picked from the column
    # name prefix. Columns matching no prefix are left out of the output.
    agg_dict = {
        'customer_city': safe_mode, 
        'customer_state': safe_mode,
        'customer_zip_code_prefix': 'first'
    }
    for col in cust_joined.columns:
        if col in agg_dict or col in ["customer_id", "customer_unique_id"]:
            continue
        if col.startswith("num_") or col.startswith("tot_"):
            agg_dict[col] = "sum"
        elif col.startswith("days_since_"):
            agg_dict[col] = "min"
        elif col.startswith("avg_"):
            agg_dict[col] = "mean"
        elif col.startswith("pref_"):
            agg_dict[col] = safe_mode

    customer_unique_final = cust_joined.groupby("customer_unique_id", as_index=False).agg(agg_dict)

    # 4. Write Output: one folder per snapshot month, named MM-YYYY.
    run_folder = output_root / f"{month:02d}-{year}"
    run_folder.mkdir(exist_ok=True)
    customer_unique_final.to_csv(run_folder / "customer_unique_snapshot.csv", index=False)
    print(f"{month:02d}-{year} written.")

print("ALL SNAPSHOTS COMPLETE")

09-2016 written.
10-2016 written.
11-2016 written.
12-2016 written.
01-2017 written.
02-2017 written.
03-2017 written.
04-2017 written.
05-2017 written.
06-2017 written.
07-2017 written.
08-2017 written.
09-2017 written.
10-2017 written.
11-2017 written.
12-2017 written.
01-2018 written.
02-2018 written.
03-2018 written.
04-2018 written.
05-2018 written.
06-2018 written.
07-2018 written.
08-2018 written.
09-2018 written.
10-2018 written.
ALL SNAPSHOTS COMPLETE


### Customer snapshots have been written. The tables are at customer_unique_id and year-month level.

This means that each table contains the details of customers up to that point of interaction.