In [None]:
# 1-01 Set up PySpark session

!pip install pyspark

from google.colab import files
uploaded = files.upload()

spark = SparkSession.builder \
    .appName("Engagement_AB_Test_Processing") \
    .getOrCreate()



Saving conservation_dataset.csv to conservation_dataset (1).csv


In [None]:
# 1-02 Set up schema and Data Loading

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, countDistinct, sum as _sum
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType

# Explicit schema, listed in file column order, so Spark does not infer types.
# Every column is declared nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        ("row_index", IntegerType()),
        ("user_id", StringType()),
        ("message_type", StringType()),
        ("engaged", BooleanType()),
        ("total_messages_seen", IntegerType()),
        ("most_engagement_weekday", StringType()),
        ("most_engagement_hour", IntegerType()),
    )
])

# The first CSV line is a header row, not data.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("conservation_dataset.csv")
)

df.printSchema()
df.show(5)

root
 |-- row_index: integer (nullable = true)
 |-- user_id: string (nullable = true)
 |-- message_type: string (nullable = true)
 |-- engaged: boolean (nullable = true)
 |-- total_messages_seen: integer (nullable = true)
 |-- most_engagement_weekday: string (nullable = true)
 |-- most_engagement_hour: integer (nullable = true)

+---------+-------+------------+-------+-------------------+-----------------------+--------------------+
|row_index|user_id|message_type|engaged|total_messages_seen|most_engagement_weekday|most_engagement_hour|
+---------+-------+------------+-------+-------------------+-----------------------+--------------------+
|        0|1069124|Personalized|  false|                130|                 Monday|                  20|
|        1|1119715|Personalized|  false|                 93|                Tuesday|                  22|
|        2|1144181|Personalized|  false|                 21|                Tuesday|                  18|
|        3|1435133|Personalized| 

In [None]:
# 1-03 Data Cleaning

from pyspark.sql import Window
from pyspark.sql.functions import row_number

# 3.1 Drop rows where either key column (user_id, message_type) is null.
df_clean = df.na.drop(subset=["user_id", "message_type"])

# 3.2 Remove users assigned to more than one experiment group.
users_in_multiple_groups = df_clean.groupBy("user_id") \
    .agg(countDistinct("message_type").alias("group_count")) \
    .filter(col("group_count") > 1)

# Count once: every .count() triggers a full Spark job.
n_users_in_multiple_groups = users_in_multiple_groups.count()
if n_users_in_multiple_groups > 0:
    print(f"Found {n_users_in_multiple_groups} users in multiple groups.")
    user_ids_to_remove = users_in_multiple_groups.select("user_id")
    df_clean = df_clean.join(user_ids_to_remove, on="user_id", how="left_anti")
else:
    print("No users found in multiple groups.")

# 3.3 Handle duplicates: if a user has several rows in the same group, keep the one
# with the smallest row_index. dropDuplicates() keeps an arbitrary row per key, so
# "first" is made explicit (and deterministic) with a window ordered by row_index.
first_row_in_group = Window.partitionBy("user_id", "message_type") \
    .orderBy(col("row_index").asc_nulls_last())
df_clean = df_clean \
    .withColumn("_dedup_rank", row_number().over(first_row_in_group)) \
    .filter(col("_dedup_rank") == 1) \
    .drop("_dedup_rank")


No users found in multiple groups.


In [None]:
# 1-04 Data Transformation and Aggregation

# Encode 'engaged' as 1/0 so it can be summed. NOTE: rows where `engaged` is null
# (missing or unparseable in the CSV) fall through to otherwise(0), so they are
# counted as NOT engaged.
df_transformed = df_clean.withColumn(
    "engaged_numeric", when(col("engaged"), 1).otherwise(0)
)

df_transformed.show(5)

# Per-group summary: number of users, engaged users, and engagement rate (n_engaged / n_users).
# Sorted by message_type so the printed table and the exported CSV have a stable row order.
summary = (
    df_transformed.groupBy("message_type")
    .agg(
        count("user_id").alias("n_users"),
        _sum("engaged_numeric").alias("n_engaged"),
        (_sum("engaged_numeric") / count("user_id")).alias("engagement_rate"),
    )
    .orderBy("message_type")
)

print("Aggregated Summary with Engagement Rate:")
summary.show()

+---------+-------+------------+-------+-------------------+-----------------------+--------------------+---------------+
|row_index|user_id|message_type|engaged|total_messages_seen|most_engagement_weekday|most_engagement_hour|engaged_numeric|
+---------+-------+------------+-------+-------------------+-----------------------+--------------------+---------------+
|   339935|1000006|Personalized|  false|                  4|               Saturday|                  17|              0|
|      879|1000007|Personalized|  false|                 65|                 Friday|                  11|              0|
|   377813|1000011|Personalized|  false|                 16|              Wednesday|                  10|              0|
|   161279|1000012|Personalized|  false|                 47|               Thursday|                  12|              0|
|   235124|1000017|Personalized|  false|                 25|               Saturday|                  23|              0|
+---------+-------+-----

In [None]:
# 1-06 Data Exporting
from google.colab import drive

# Destination for the exported summary on the user's Google Drive.
output_file_path = "/content/drive/MyDrive/engagement_summary.csv"

drive.mount('/content/drive')

# The summary has one row per message_type group, so collecting it to the driver
# as pandas is cheap and produces a single CSV file (a Spark write would produce a
# directory of part files).
summary_pd = summary.toPandas()
summary_pd.to_csv(output_file_path, index=False)
print(f"Summary data exported to: {output_file_path}")

# Shut down the Spark session; `spark` is unusable in any later cell.
spark.stop()

Mounted at /content/drive
Summary data exported to: /content/drive/MyDrive/engagement_summary.csv
