In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, count, sum, avg, from_unixtime, to_timestamp, get_json_object, trim, explode, first, from_json
from pyspark.sql.types import StructType, StringType, MapType, StringType
from pyspark.sql.window import Window

spark = SparkSession.builder \
    .appName("CafeRewardsDataPipeline") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()


customers_file = "/Users/rafaelcamara/dev/ballastlane/archive/customers.csv"
offers_file = "/Users/rafaelcamara/dev/ballastlane/archive/offers.csv"
events_file = "/Users/rafaelcamara/dev/ballastlane/archive/events.csv"

print("Ingesting raw data...")
customers_df = spark.read.csv(customers_file, header=True, inferSchema=True)
offers_df = spark.read.csv(offers_file, header=True, inferSchema=True)
raw_events_df = spark.read.csv(events_file, header=True, inferSchema=True)


# Identify if the offer was completed
events_df = raw_events_df.withColumn(
    "offer_completed",
    when(trim(col("event")) == "offer completed", lit(1)).otherwise(lit(0))
)

# flatten the JSON structure in the "value" column
# Explode the map to key-value pairs

# Parse the JSON string into a Map
value_map_df = events_df.withColumn("value_map", from_json(col("value"), MapType(StringType(), StringType())))

exploded_df = value_map_df.select(
    "customer_id", 
    "event", 
    explode("value_map").alias("key", "val"),
    "time",
    "offer_completed"
)

pivoted_df = exploded_df.groupBy("customer_id", "event", "time","offer_completed") \
    .pivot("key", ["offer_id", "reward", "amount"]) \
    .agg(first("val"))


events_df = pivoted_df.withColumn("event_timestamp_readable", 
                                             from_unixtime(col("time")))
events_df = events_df.withColumn("event_datetime", 
                                             to_timestamp(col("event_timestamp_readable")))

# Join dataframes
# Join events with offers to get offer details for each event
events_with_offers = offers_df.join(events_df, "offer_id", "left")


customer_with_events = customers_df.join(events_df, "customer_id", "left")

# Join with customers to get customer demographics
full_df = customers_df.join(events_with_offers, "customer_id", "left")

# Analytical Questions
print("\n--- Analytical Results ---")

# 1. Total number of customers
total_customers = customers_df.count()
print(f"Total number of customers: {total_customers}")

# 2. Total number of offers
total_offers = offers_df.count()
print(f"Total number of offers: {total_offers}")

# 3. Total number of events
total_events = events_df.count()
print(f"Total number of events: {total_events}")

# 4. Number of unique customers who made a purchase
unique_purchasing_customers = full_df.filter(col("event") == "transaction") \
                                     .select("customer_id").distinct().count()
print(f"Number of unique customers who made a purchase: {unique_purchasing_customers}")

# 5. Top 5 most popular offers (by number of views or purchases)
# Assuming popularity is based on purchases within the offer period
top_5_offers = full_df.filter(col("offer_completed") == 1) \
                      .groupBy("offer_id", "offer_type") \
                      .agg(count("offer_id").alias("purchase_count")) \
                      .orderBy(col("purchase_count").desc()) \
                      .limit(5)
print("\nTop 5 most popular offers (by number of completed offers):")
top_5_offers.show(truncate=False)

# 6. Average transaction amount
# Filter for purchase events only, extract "amount" from JSON in "value"
average_transaction_amount = events_df.filter(col("event") == "transaction") \
    .agg(avg(col("amount").cast("double"))).collect()[0][0]
    #.agg(avg(get_json_object(col("value"), "$.amount").cast("double"))).collect()[0][0]
print(f"Average transaction amount: {average_transaction_amount:.2f}")

# 7. Number of offers completed vs. not completed
completion_window = Window.partitionBy("offer_completed")

offers_completion_status= events_df.select(
    "offer_completed",
    count("*").over(completion_window).alias("completion_count")
).dropDuplicates()
"""
offers_completion_status = events_df.groupBy("offer_completed") \
                                   .agg(count("offer_id").alias("count"))
"""                                   
print("\nNumber of offers completed vs. not completed:")
offers_completion_status.show()

# 8. Distribution of customers by age group
# Assuming age is available in customers_df and is numerical
# For simplicity, let\'s define some age groups
customers_df = customers_df.withColumn("age_group", 
                                     when(col("age") < 25, "<25") \
                                     .when((col("age") >= 25) & (col("age") < 35), "25-34") \
                                     .when((col("age") >= 35) & (col("age") < 45), "35-44") \
                                     .when((col("age") >= 45) & (col("age") < 55), "45-54") \
                                     .when((col("age") >= 55) & (col("age") < 65), "55-64") \
                                     .otherwise("65+"))

age_distribution = customers_df.groupBy("age_group").agg(count("customer_id").alias("customer_count")) \
                               .orderBy("age_group")
print("\nCustomer distribution by age group:")
age_distribution.show()

# 9. Offers with highest completion rate
# Calculate total offers viewed/received and completed offers per offer_id
offer_summary = full_df.groupBy("offer_id", "offer_type") \
                         .agg(count("offer_id").alias("total_events"), 
                              sum(col("offer_completed")).alias("completed_offers"))

offer_completion_rate = offer_summary.withColumn(
    "completion_rate",
    when(col("total_events") != 0, (col("completed_offers") / col("total_events")) * 100).otherwise(None)
)

highest_completion_rate_offers = offer_completion_rate.orderBy(col("completion_rate").desc()) \
                                                       .limit(5)
print("\nOffers with highest completion rate:")
highest_completion_rate_offers.show(truncate=False)

# 10. Customer lifetime value (CLV) - simplified: total transaction amount per customer
customer_clv = customer_with_events.filter(col("event") == "transaction") \
                       .groupBy("customer_id") \
                       .agg(sum(col("amount").cast("double")).alias("total_spent")) \
                       .orderBy(col("total_spent").desc())

print("\nTop 10 Customers by Lifetime Value (Total Spent):")
customer_clv.limit(10).show()

spark.stop()

Ingesting raw data...

--- Analytical Results ---
Total number of customers: 17000
Total number of offers: 10


                                                                                

Total number of events: 303572


                                                                                

Number of unique customers who made a purchase: 0

Top 5 most popular offers (by number of completed offers):


                                                                                

+--------------------------------+----------+--------------+
|offer_id                        |offer_type|purchase_count|
+--------------------------------+----------+--------------+
|fafdcd668e3743c1bb461111dcafc2a4|discount  |4927          |
|2298d6c36e964ae4a3e7e9706d1fb8c2|discount  |4698          |
|9b98b8c7a33c4b65b9aebfe6a799e6d9|bogo      |3926          |
|f19421c1d4aa40978ebb69ca19b0e20d|bogo      |3877          |
|2906b810c7d4411798c6938adc9daaa5|discount  |3642          |
+--------------------------------+----------+--------------+





Average transaction amount: 12.78


                                                                                


Number of offers completed vs. not completed:


                                                                                

+---------------+----------------+
|offer_completed|completion_count|
+---------------+----------------+
|              0|          272955|
|              1|           30617|
+---------------+----------------+


Customer distribution by age group:
+---------+--------------+
|age_group|customer_count|
+---------+--------------+
|    25-34|          1380|
|    35-44|          1869|
|    45-54|          3013|
|    55-64|          3421|
|      65+|          6441|
|      <25|           876|
+---------+--------------+


Offers with highest completion rate:


                                                                                

+--------------------------------+----------+------------+----------------+---------------+
|offer_id                        |offer_type|total_events|completed_offers|completion_rate|
+--------------------------------+----------+------------+----------------+---------------+
|4d5c57ea9a6940dd891ad53e9dbe8da0|bogo      |2986        |2986            |100.0          |
|9b98b8c7a33c4b65b9aebfe6a799e6d9|bogo      |3926        |3926            |100.0          |
|2298d6c36e964ae4a3e7e9706d1fb8c2|discount  |4698        |4698            |100.0          |
|2906b810c7d4411798c6938adc9daaa5|discount  |3642        |3642            |100.0          |
|fafdcd668e3743c1bb461111dcafc2a4|discount  |4927        |4927            |100.0          |
+--------------------------------+----------+------------+----------------+---------------+



[Stage 73:>                                                         (0 + 8) / 8]

+--------------------+----------------+------+---+------+--------------------+----------+----------+------+--------+--------------------+---------------+----+---------------+------+------+------------------------+-------------------+
|         customer_id|became_member_on|gender|age|income|            offer_id|offer_type|difficulty|reward|duration|            channels|          event|time|offer_completed|reward|amount|event_timestamp_readable|     event_datetime|
+--------------------+----------------+------+---+------+--------------------+----------+----------+------+--------+--------------------+---------------+----+---------------+------+------+------------------------+-------------------+
|38fe809add3b4fcf9...|        20180712|  NULL|118|  NULL|                NULL|      NULL|      NULL|  NULL|    NULL|                NULL|           NULL|NULL|           NULL|  NULL|  NULL|                    NULL|               NULL|
|8ec6ce2a7e7949b1b...|        20170925|  NULL|118|  NULL|       



+--------------------+------+
|         customer_id|amount|
+--------------------+------+
|0610b486422d4921a...|  NULL|
|148adfcaa27d485b8...|  NULL|
|2eeac8d8feae4a8ca...|  NULL|
|31dda685af34476ca...|  NULL|
|31dda685af34476ca...|  NULL|
|31dda685af34476ca...|  NULL|
|31dda685af34476ca...|  NULL|
|389bc3fa690240e79...|  NULL|
|389bc3fa690240e79...|  NULL|
|389bc3fa690240e79...|  NULL|
|38fe809add3b4fcf9...|  NULL|
|440cf1fd7580490c9...|  NULL|
|440cf1fd7580490c9...|  NULL|
|440cf1fd7580490c9...|  NULL|
|440cf1fd7580490c9...|  NULL|
|440cf1fd7580490c9...|  NULL|
|62cf5e10845442329...|  NULL|
|6445de3b47274c759...|  NULL|
|6445de3b47274c759...|  NULL|
|68617ca6246f4fbc8...|  NULL|
+--------------------+------+
only showing top 20 rows


                                                                                

+-----------+----------------+------+---+------+--------+----------+----------+------+--------+--------+-----+----+---------------+------+------+------------------------+--------------+
|customer_id|became_member_on|gender|age|income|offer_id|offer_type|difficulty|reward|duration|channels|event|time|offer_completed|reward|amount|event_timestamp_readable|event_datetime|
+-----------+----------------+------+---+------+--------+----------+----------+------+--------+--------+-----+----+---------------+------+------+------------------------+--------------+
+-----------+----------------+------+---+------+--------+----------+----------+------+--------+--------+-----+----+---------------+------+------+------------------------+--------------+



[Stage 104:>                                                        (0 + 8) / 8]

+--------------------+----------------+------+---+------+--------------------+----------+----------+------+--------+--------------------+---------------+----+---------------+------+------+------------------------+-------------------+
|         customer_id|became_member_on|gender|age|income|            offer_id|offer_type|difficulty|reward|duration|            channels|          event|time|offer_completed|reward|amount|event_timestamp_readable|     event_datetime|
+--------------------+----------------+------+---+------+--------------------+----------+----------+------+--------+--------------------+---------------+----+---------------+------+------+------------------------+-------------------+
|c05652fa9d2c4fac9...|        20180127|     M| 62| 73000|9b98b8c7a33c4b65b...|      bogo|         5|     5|       7|['web', 'email', ...|offer completed|  72|              1|     5|  NULL|     1969-12-31 21:01:12|1969-12-31 21:01:12|
|6be0513109fe4d738...|        20180613|     M| 42| 57000|2906b81

[Stage 118:>                                                        (0 + 7) / 7]

+--------------------+----------+----------+------+--------+----------------+--------------------+---------------+----+---------------+------+------+------------------------+-------------------+
|            offer_id|offer_type|difficulty|reward|duration|        channels|         customer_id|          event|time|offer_completed|reward|amount|event_timestamp_readable|     event_datetime|
+--------------------+----------+----------+------+--------+----------------+--------------------+---------------+----+---------------+------+------+------------------------+-------------------+
|0b1e1539f2cc45b7b...|  discount|        20|     5|      10|['web', 'email']|ffa81afacde642a3a...|offer completed|  42|              1|     5|  NULL|     1969-12-31 21:00:42|1969-12-31 21:00:42|
|0b1e1539f2cc45b7b...|  discount|        20|     5|      10|['web', 'email']|fe66b9029bf34fdba...|offer completed| 540|              1|     5|  NULL|     1969-12-31 21:09:00|1969-12-31 21:09:00|
|0b1e1539f2cc45b7b...|  d

25/06/23 16:15:39 ERROR Executor: Exception in task 0.0 in stage 129.0 (TID 249)
org.apache.spark.SparkNumberFormatException: [CAST_INVALID_INPUT] The value '13.49' of the type "STRING" cannot be cast to "BIGINT" because it is malformed. Correct the value as per the syntax, or change its target type. Use `try_cast` to tolerate malformed input and return NULL instead. SQLSTATE: 22018
== SQL (line 1, position 1) ==
amount > 0
^^^^^^^^^^

	at org.apache.spark.sql.errors.QueryExecutionErrors$.invalidInputInCastToNumberError(QueryExecutionErrors.scala:145)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.withException(UTF8StringUtils.scala:51)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils$.toLongExact(UTF8StringUtils.scala:31)
	at org.apache.spark.sql.catalyst.util.UTF8StringUtils.toLongExact(UTF8StringUtils.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage4.processNext(Unknown Source)
	at org.apache.spark.sql.execution.Buff

+--------+----------+----------+------+--------+--------+-----------+-----+----+---------------+------+------+------------------------+--------------+
|offer_id|offer_type|difficulty|reward|duration|channels|customer_id|event|time|offer_completed|reward|amount|event_timestamp_readable|event_datetime|
+--------+----------+----------+------+--------+--------+-----------+-----+----+---------------+------+------+------------------------+--------------+
+--------+----------+----------+------+--------+--------+-----------+-----+----+---------------+------+------+------------------------+--------------+



[Stage 138:>                                                        (0 + 7) / 7]

+--------------------+---------------+----+---------------+--------------------+------+------+------------------------+-------------------+
|         customer_id|          event|time|offer_completed|            offer_id|reward|amount|event_timestamp_readable|     event_datetime|
+--------------------+---------------+----+---------------+--------------------+------+------+------------------------+-------------------+
|0009655768c64bdeb...|   offer viewed| 192|              0|                NULL|  NULL|  NULL|     1969-12-31 21:03:12|1969-12-31 21:03:12|
|0009655768c64bdeb...|   offer viewed| 372|              0|                NULL|  NULL|  NULL|     1969-12-31 21:06:12|1969-12-31 21:06:12|
|0011e0d4e6b944f99...|   offer viewed| 354|              0|                NULL|  NULL|  NULL|     1969-12-31 21:05:54|1969-12-31 21:05:54|
|0011e0d4e6b944f99...|    transaction| 132|              0|                NULL|  NULL| 13.49|     1969-12-31 21:02:12|1969-12-31 21:02:12|
|0020c2b971eb4e918..

[Stage 141:>                                                        (0 + 7) / 7]

root
 |-- offer_id: string (nullable = true)
 |-- offer_type: string (nullable = true)
 |-- difficulty: integer (nullable = true)
 |-- reward: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- channels: string (nullable = true)

root
 |-- customer_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- time: integer (nullable = true)
 |-- offer_completed: integer (nullable = false)
 |-- offer_id: string (nullable = true)
 |-- reward: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- event_timestamp_readable: string (nullable = true)
 |-- event_datetime: timestamp (nullable = true)



[Stage 142:>                                                        (0 + 7) / 7]

+--------+------+
|offer_id|amount|
+--------+------+
|    NULL| 13.49|
|    NULL|  1.48|
|    NULL|  5.94|
|    NULL|  0.91|
|    NULL|  4.74|
|    NULL| 19.48|
|    NULL|  2.06|
|    NULL| 22.33|
|    NULL| 24.88|
|    NULL|  5.11|
|    NULL|  0.91|
|    NULL|  1.06|
|    NULL| 32.45|
|    NULL|  0.48|
|    NULL|  3.24|
|    NULL|  5.07|
|    NULL|  6.92|
|    NULL|  2.51|
|    NULL|  5.96|
|    NULL| 11.27|
+--------+------+
only showing top 20 rows


                                                                                

+--------+------+
|offer_id|amount|
+--------+------+
+--------+------+


Top 10 Customers by Lifetime Value (Total Spent):


[Stage 152:>                                                        (0 + 7) / 7]

+-----------+-----------+
|customer_id|total_spent|
+-----------+-----------+
+-----------+-----------+

