In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, avg, lit

# Khởi tạo SparkSession
spark = SparkSession.builder \
    .appName("AdsDataExploration") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

print("Spark Session is ready!")

Spark Session is ready!


In [3]:
data_path = "../data/ads/raw_data/" 

events_df = spark.read.csv(f"{data_path}events.csv", header=True, inferSchema=True)
clicks_train_df = spark.read.csv(f"{data_path}clicks_train.csv", header=True, inferSchema=True)

print("Dataframes loaded successfully.")

Dataframes loaded successfully.


In [4]:
events_df.printSchema()
events_df.show(5)

root
 |-- display_id: integer (nullable = true)
 |-- uuid: string (nullable = true)
 |-- document_id: integer (nullable = true)
 |-- timestamp: integer (nullable = true)
 |-- platform: string (nullable = true)
 |-- geo_location: string (nullable = true)

+----------+--------------+-----------+---------+--------+------------+
|display_id|          uuid|document_id|timestamp|platform|geo_location|
+----------+--------------+-----------+---------+--------+------------+
|         1|cb8c55702adb93|     379743|       61|       3|   US>SC>519|
|         2|79a85fa78311b9|    1794259|       81|       2|   US>CA>807|
|         3|822932ce3d8757|    1179111|      182|       2|   US>MI>505|
|         4|85281d0a49f7ac|    1777797|      234|       2|   US>WV>564|
|         5|8d0daef4bf5b56|     252458|      338|       2|       SG>00|
+----------+--------------+-----------+---------+--------+------------+
only showing top 5 rows



In [5]:
clicks_train_df.printSchema()
clicks_train_df.show(5)

root
 |-- display_id: integer (nullable = true)
 |-- ad_id: integer (nullable = true)
 |-- clicked: integer (nullable = true)

+----------+------+-------+
|display_id| ad_id|clicked|
+----------+------+-------+
|         1| 42337|      0|
|         1|139684|      0|
|         1|144739|      1|
|         1|156824|      0|
|         1|279295|      0|
+----------+------+-------+
only showing top 5 rows



In [6]:
events_subset_df = events_df.select("display_id", "uuid")
clicks_subset_df = clicks_train_df.select("display_id", "ad_id", "clicked")

In [7]:
merged_df = events_subset_df.join(clicks_subset_df, "display_id")


merged_df = merged_df.withColumn("clicked", col("clicked").cast("integer"))


user_features_df = merged_df.groupBy("uuid").agg(
    count("ad_id").alias("total_ads_seen"),
    sum("clicked").alias("total_clicks")
)

epsilon = 1e-6
user_features_df = user_features_df.withColumn(
    "click_through_rate",
    col("total_clicks") / (col("total_ads_seen") + lit(epsilon))
)

print("User Features DataFrame created:")


User Features DataFrame created:


In [8]:
user_features_df.show(10)

+--------------+--------------+------------+-------------------+
|          uuid|total_ads_seen|total_clicks| click_through_rate|
+--------------+--------------+------------+-------------------+
|87b5671eabff1f|             4|           1|0.24999993750001562|
|f83b1469b03770|            12|           2|0.16666665277777895|
|19ce800b90b035|             3|           1|0.33333322222225925|
|c20213d4ef495e|             9|           1|0.11111109876543349|
|54cf707ca615ee|             6|           1|0.16666663888889352|
|10f07c7adea6a5|             3|           1|0.33333322222225925|
|552b9b00f78202|             5|           1|  0.199999960000008|
|a423ff991e4080|             3|           1|0.33333322222225925|
|5693339264f548|             3|           1|0.33333322222225925|
|b84f6f0c53c72e|             4|           1|0.24999993750001562|
+--------------+--------------+------------+-------------------+
only showing top 10 rows



In [9]:
processed_path = "../data/processed/user_features.parquet"


user_features_df.write.mode("overwrite").parquet(processed_path)

print(f"User features data saved successfully to: {processed_path}")

print("\nVerifying the saved data by reading it back:")
spark.read.parquet(processed_path).show(10)

User features data saved successfully to: ../data/processed/user_features.parquet

Verifying the saved data by reading it back:
+--------------+--------------+------------+-------------------+
|          uuid|total_ads_seen|total_clicks| click_through_rate|
+--------------+--------------+------------+-------------------+
|10000a34905274|            12|           2|0.16666665277777895|
|10000a91d9899d|             4|           1|0.24999993750001562|
|10000e5327e96b|             6|           2|0.33333327777778704|
|10001d737d2983|             6|           1|0.16666663888889352|
|10001f505856f1|             4|           1|0.24999993750001562|
|1000206f97aeae|             4|           1|0.24999993750001562|
|10003257b458c4|             2|           1|  0.499999750000125|
|100039c3f24f11|             6|           1|0.16666663888889352|
|1000455ca42b79|             4|           1|0.24999993750001562|
|100058f344f2d5|             4|           1|0.24999993750001562|
+--------------+-----------