In [0]:
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
offers = spark.table("default.offers")
profiles = spark.table("default.profile")
transactions = spark.table("default.transactions")

In [0]:
print("Data loaded successfully:")
print(f"Profiles: {profiles.count()} records")
print(f"Offers: {offers.count()} records") 
print(f"Transactions: {transactions.count()} records")

Data loaded successfully:
Profiles: 17000 records
Offers: 10 records
Transactions: 306534 records


In [0]:
profiles_processed = profiles.withColumn(
    "registered_on", 
    to_date(col("registered_on").cast("string"), "yyyyMMdd")
).withColumn(
    "today", 
    lit("2018-07-27").cast("date")
).withColumn(
    "days_since_register",
    greatest(
        datediff(col("today"), col("registered_on")),
        lit(0)
    ).cast("long")
).withColumn(
    "months_since_register",
    col("days_since_register") / 30.0
)

profiles_processed = profiles_processed.withColumn(
    "is_new_customer", 
    (col("months_since_register") <= 8).cast("boolean")
).withColumn(
    "is_continuous_customer",
    ((col("months_since_register") > 8) & (col("months_since_register") <= 12)).cast("boolean")
).withColumn(
    "is_tenured_customer",
    ((col("months_since_register") > 12) & (col("months_since_register") <= 26)).cast("boolean")
).withColumn(
    "is_high_tenured_customer",
    ((col("months_since_register") > 26) & (col("months_since_register") <= 46)).cast("boolean")
).withColumn(
    "is_extreme_tenured_customer",
    ((col("months_since_register") > 46) & (col("months_since_register") <= 57)).cast("boolean")
)

profiles_processed = profiles_processed.drop("today")

print("Profile processing completed")
profiles_processed.show(5)

Profile processing completed
+---+-----------------+------+--------------------+-------------+-------------------+---------------------+---------------+----------------------+-------------------+------------------------+---------------------------+
|age|credit_card_limit|gender|                  id|registered_on|days_since_register|months_since_register|is_new_customer|is_continuous_customer|is_tenured_customer|is_high_tenured_customer|is_extreme_tenured_customer|
+---+-----------------+------+--------------------+-------------+-------------------+---------------------+---------------+----------------------+-------------------+------------------------+---------------------------+
|118|             NULL|  NULL|68be06ca386d4c319...|   2017-02-12|                530|   17.666666666666668|          false|                 false|               true|                   false|                      false|
| 55|         112000.0|     F|0610b486422d4921a...|   2017-07-15|                377|   12.

In [0]:
offers_processed = offers.withColumn("category", monotonically_increasing_id())

channel_types = ["email", "mobile", "social", "web"]

for channel in channel_types:
    offers_processed = offers_processed.withColumn(
        f"has_{channel}",
        array_contains(col("channels"), channel)
    )

print("Offer processing completed")
offers_processed.show(5)


Offer processing completed
+--------------------+--------------+--------+--------------------+---------+-------------+--------+---------+----------+----------+-------+
|            channels|discount_value|duration|                  id|min_value|   offer_type|category|has_email|has_mobile|has_social|has_web|
+--------------------+--------------+--------+--------------------+---------+-------------+--------+---------+----------+----------+-------+
|[email, mobile, s...|            10|     7.0|ae264e3637204a6fb...|       10|         bogo|       0|     true|      true|      true|  false|
|[web, email, mobi...|            10|     5.0|4d5c57ea9a6940dd8...|       10|         bogo|       1|     true|      true|      true|   true|
|[web, email, mobile]|             0|     4.0|3f207df678b143eea...|        0|informational|       2|     true|      true|     false|   true|
|[web, email, mobile]|             5|     7.0|9b98b8c7a33c4b65b...|        5|         bogo|       3|     true|      true|     f

In [0]:
from pyspark.sql.functions import get_json_object, col

transactions_processed = transactions.withColumn(
    "offer_id",
    coalesce(
        get_json_object(col("value"), "$.offer_id"),
        get_json_object(col("value"), "$.offer id")
    )
).withColumn(
    "amount",
    get_json_object(col("value"), "$.amount")
).drop("value")

print("Transaction processing completed")
transactions_processed.show(5)

Transaction processing completed
+--------------------+--------------+---------------------+--------------------+------+
|          account_id|         event|time_since_test_start|            offer_id|amount|
+--------------------+--------------+---------------------+--------------------+------+
|78afa995795e4d85b...|offer received|                  0.0|9b98b8c7a33c4b65b...|  NULL|
|a03223e636434f42a...|offer received|                  0.0|0b1e1539f2cc45b7b...|  NULL|
|e2127556f4f64592b...|offer received|                  0.0|2906b810c7d441179...|  NULL|
|8ec6ce2a7e7949b1b...|offer received|                  0.0|fafdcd668e3743c1b...|  NULL|
|68617ca6246f4fbc8...|offer received|                  0.0|4d5c57ea9a6940dd8...|  NULL|
+--------------------+--------------+---------------------+--------------------+------+
only showing top 5 rows


In [0]:
offers_processed_renamed = offers_processed.withColumnRenamed("id", "offer_id")

data = transactions_processed.alias("t").join(
    profiles_processed.alias("p").select([col(c).alias(f"profile_{c}") for c in profiles_processed.columns]),
    col("t.account_id") == col("profile_id"),
    "left"
).join(
    offers_processed_renamed.alias("o").select([col(c).alias(f"offer_{c}") for c in offers_processed_renamed.columns]),
    col("t.offer_id") == col("offer_offer_id"),
    "left"
)

print("Data merging completed")
print(f"Merged dataset: {data.count()} records")

Data merging completed
Merged dataset: 306534 records


In [0]:
offers_received = data.filter(col("event") == "offer received")
offers_completed = transactions_processed.filter(col("event") == "offer completed").withColumnRenamed("time_since_test_start", "completion_time")

merged_offers = offers_received.alias("r").join(
    offers_completed.alias("c").select("account_id", "offer_id", "completion_time"),
    ["account_id", "offer_id"],
    "left"
).withColumn(
    "label",
    when(
        (col("completion_time").isNotNull()) &
        (col("completion_time") >= col("time_since_test_start")) &
        (col("completion_time") <= (col("time_since_test_start") + col("offer_duration"))),
        1
    ).otherwise(0)
)

# Group by to get max label per offer instance
offers_received_labeled = merged_offers.groupBy(
    "account_id", "offer_id", "time_since_test_start", "offer_duration"
).agg(
    max("label").alias("label")
)

print("Offer labeling completed")
offers_received_labeled.show(5)

Offer labeling completed
+--------------------+--------------------+---------------------+--------------+-----+
|          account_id|            offer_id|time_since_test_start|offer_duration|label|
+--------------------+--------------------+---------------------+--------------+-----+
|7eb85a133c7e4a7d9...|2298d6c36e964ae4a...|                  0.0|           7.0|    1|
|8d49501a3a90477da...|f19421c1d4aa40978...|                  0.0|           5.0|    1|
|00d791e20c564add8...|2298d6c36e964ae4a...|                  0.0|           7.0|    1|
|4de18cbefc364c788...|2298d6c36e964ae4a...|                  0.0|           7.0|    0|
|166a75c9081b47c29...|4d5c57ea9a6940dd8...|                  0.0|           5.0|    1|
+--------------------+--------------------+---------------------+--------------+-----+
only showing top 5 rows


In [0]:
tx = transactions_processed.filter(col("event") == "transaction")

daily_tx = tx.groupBy("account_id", "time_since_test_start").agg(
    sum("amount").alias("daily_spent"),
    count("amount").alias("daily_count")
)

# Calculate cumulative features using window functions
window_spec = Window.partitionBy("account_id").orderBy("time_since_test_start")

daily_tx_with_cum = daily_tx.withColumn(
    "cum_spent",
    sum("daily_spent").over(window_spec)
).withColumn(
    "cum_count",
    sum("daily_count").over(window_spec)
)


In [0]:
offers_received_w_daily = offers_received.join(
    daily_tx_with_cum,
    ["account_id", "time_since_test_start"],
    "left"
).fillna(0, ["daily_spent", "daily_count", "cum_spent", "cum_count"]).withColumn(
    "hist_spent",
    col("cum_spent") - col("daily_spent")
).withColumn(
    "hist_count", 
    col("cum_count") - col("daily_count")
)

print("Historical transaction features completed")
offers_received_w_daily.show(5)

Historical transaction features completed
+--------------------+---------------------+--------------+--------------------+------+-----------+-------------------------+--------------+--------------------+---------------------+---------------------------+-----------------------------+-----------------------+------------------------------+---------------------------+--------------------------------+-----------------------------------+--------------------+--------------------+--------------+--------------------+---------------+----------------+--------------+---------------+----------------+----------------+-------------+-----------+-----------+---------+---------+----------+----------+
|          account_id|time_since_test_start|         event|            offer_id|amount|profile_age|profile_credit_card_limit|profile_gender|          profile_id|profile_registered_on|profile_days_since_register|profile_months_since_register|profile_is_new_customer|profile_is_continuous_customer|profile_is_t

In [0]:
window_30d = Window.partitionBy("account_id").orderBy("time_since_test_start").rowsBetween(-30, -1)

daily_tx_with_rolling = daily_tx.withColumn(
    "rolling_spent_30d",
    coalesce(sum("daily_spent").over(window_30d), lit(0))
).withColumn(
    "rolling_count_30d",
    coalesce(sum("daily_count").over(window_30d), lit(0))
)

print("Rolling window features completed")
daily_tx_with_rolling.show(5)

Rolling window features completed
+--------------------+---------------------+-----------+-----------+------------------+-----------------+
|          account_id|time_since_test_start|daily_spent|daily_count| rolling_spent_30d|rolling_count_30d|
+--------------------+---------------------+-----------+-----------+------------------+-----------------+
|0020c2b971eb4e918...|                 2.25|      17.63|          1|               0.0|                0|
|0020c2b971eb4e918...|                 2.75|       32.0|          1|             17.63|                1|
|0020c2b971eb4e918...|                  3.0|      24.39|          1|49.629999999999995|                2|
|0020c2b971eb4e918...|                  6.0|      24.31|          1|             74.02|                3|
|0020c2b971eb4e918...|                21.25|      17.24|          1|             98.33|                4|
+--------------------+---------------------+-----------+-----------+------------------+-----------------+
only showing

In [0]:
offers_received_all = data.filter(col("event") == "offer received").withColumn("offer_count", lit(1))

# Calculate cumulative offers received
window_offers = Window.partitionBy("account_id").orderBy("time_since_test_start")
offers_received_all = offers_received_all.withColumn(
    "cum_offers_received",
    sum("offer_count").over(window_offers)
)

# Join with completion data
offers_completed_all = data.filter(col("event") == "offer completed").select(
    "account_id", "offer_id", col("time_since_test_start").alias("completion_time")
)

offers_with_completion = offers_received_all.join(
    offers_completed_all,
    ["account_id", "offer_id"],
    "left"
).withColumn(
    "completed_before",
    when(
        col("completion_time").isNotNull() &
        (col("completion_time") < col("time_since_test_start")),
        1
    ).otherwise(0)
)

In [0]:
offers_with_completion = offers_with_completion.withColumn(
    "cum_completed_before",
    sum("completed_before").over(window_offers)
).withColumn(
    "hist_offer_completion_rate",
    when(
        col("cum_offers_received") > 1,
        (col("cum_completed_before") - col("completed_before")) / (col("cum_offers_received") - 1)
    ).otherwise(0)
)

hist_completion = offers_with_completion.select(
    "account_id", "offer_id", "time_since_test_start", "hist_offer_completion_rate"
)

print("Historical offer completion rate completed")
hist_completion.show(5)

Historical offer completion rate completed
+--------------------+--------------------+---------------------+--------------------------+
|          account_id|            offer_id|time_since_test_start|hist_offer_completion_rate|
+--------------------+--------------------+---------------------+--------------------------+
|004b041fbfe448599...|3f207df678b143eea...|                  7.0|                       0.0|
|004b041fbfe448599...|f19421c1d4aa40978...|                 21.0|                       0.0|
|004b041fbfe448599...|fafdcd668e3743c1b...|                 24.0|                       0.0|
|00840a2ca5d2408e9...|2906b810c7d441179...|                  0.0|                       0.0|
|00840a2ca5d2408e9...|fafdcd668e3743c1b...|                  7.0|                       0.0|
+--------------------+--------------------+---------------------+--------------------------+
only showing top 5 rows


In [0]:
dataset = offers_received_labeled.join(
    offers_received_w_daily.select("account_id", "offer_id", "time_since_test_start", "hist_spent", "hist_count"),
    ["account_id", "offer_id", "time_since_test_start"],
    "left"
)

# Add offer features
dataset = dataset.join(
    offers_processed.select(
        "id", "duration", "min_value", "offer_type", "discount_value", 
        "has_email", "has_mobile", "has_social", "has_web"
    ),
    col("offer_id") == col("id"),
    "left"
).drop("id")

# Add profile features  
dataset = dataset.join(
    profiles_processed.select(
        "id", "age", "gender", "months_since_register", "is_new_customer",
        "is_continuous_customer", "is_tenured_customer", "is_high_tenured_customer",
        "is_extreme_tenured_customer", "credit_card_limit"
    ),
    col("account_id") == col("id"),
    "left"
).drop("id")

# Add rolling features
dataset = dataset.join(
    daily_tx_with_rolling.select("account_id", "time_since_test_start", "rolling_spent_30d", "rolling_count_30d"),
    ["account_id", "time_since_test_start"],
    "left"
).fillna(0, ["rolling_spent_30d", "rolling_count_30d"])

# Add historical completion rate
dataset = dataset.join(
    hist_completion,
    ["account_id", "offer_id", "time_since_test_start"],
    "left"
).fillna(0, ["hist_offer_completion_rate"])

print("Final dataset assembly completed")
print(f"Final dataset shape: {dataset.count()} rows, {len(dataset.columns)} columns")
dataset.show(5)

Final dataset assembly completed
Final dataset shape: 86432 rows, 27 columns
+--------------------+--------------------+---------------------+--------------+-----+----------+----------+--------+---------+----------+--------------+---------+----------+----------+-------+---+------+---------------------+---------------+----------------------+-------------------+------------------------+---------------------------+-----------------+-----------------+-----------------+--------------------------+
|          account_id|            offer_id|time_since_test_start|offer_duration|label|hist_spent|hist_count|duration|min_value|offer_type|discount_value|has_email|has_mobile|has_social|has_web|age|gender|months_since_register|is_new_customer|is_continuous_customer|is_tenured_customer|is_high_tenured_customer|is_extreme_tenured_customer|credit_card_limit|rolling_spent_30d|rolling_count_30d|hist_offer_completion_rate|
+--------------------+--------------------+---------------------+--------------+---

In [0]:
print("Dataset Schema:")
dataset.printSchema()

print("\nDataset Summary:")
dataset.describe().show()

print("\nNull Value Counts:")
null_counts = dataset.select([count(when(col(c).isNull(), c)).alias(c) for c in dataset.columns])
null_counts.show()

Dataset Schema:
root
 |-- account_id: string (nullable = true)
 |-- offer_id: string (nullable = true)
 |-- time_since_test_start: double (nullable = true)
 |-- offer_duration: double (nullable = true)
 |-- label: integer (nullable = true)
 |-- hist_spent: double (nullable = true)
 |-- hist_count: long (nullable = true)
 |-- duration: double (nullable = true)
 |-- min_value: long (nullable = true)
 |-- offer_type: string (nullable = true)
 |-- discount_value: long (nullable = true)
 |-- has_email: boolean (nullable = true)
 |-- has_mobile: boolean (nullable = true)
 |-- has_social: boolean (nullable = true)
 |-- has_web: boolean (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- months_since_register: double (nullable = true)
 |-- is_new_customer: boolean (nullable = true)
 |-- is_continuous_customer: boolean (nullable = true)
 |-- is_tenured_customer: boolean (nullable = true)
 |-- is_high_tenured_customer: boolean (nullable = true)
 |-- is_e

In [0]:
print("\nFinal dataset sample:")
dataset.show(10, truncate=False)


Final dataset sample:
+--------------------------------+--------------------------------+---------------------+--------------+-----+----------+----------+--------+---------+----------+--------------+---------+----------+----------+-------+---+------+---------------------+---------------+----------------------+-------------------+------------------------+---------------------------+-----------------+-----------------+-----------------+--------------------------+
|account_id                      |offer_id                        |time_since_test_start|offer_duration|label|hist_spent|hist_count|duration|min_value|offer_type|discount_value|has_email|has_mobile|has_social|has_web|age|gender|months_since_register|is_new_customer|is_continuous_customer|is_tenured_customer|is_high_tenured_customer|is_extreme_tenured_customer|credit_card_limit|rolling_spent_30d|rolling_count_30d|hist_offer_completion_rate|
+--------------------------------+--------------------------------+---------------------+

In [0]:
dataset.write.mode("overwrite").saveAsTable("dataset")