# Outbrain click prediction data processing

I downloaded the data files from https://www.kaggle.com/c/outbrain-click-prediction/data?select=page_views.csv.zip,

and uploaded them to our HDFS. This notebook runs on a PySpark kernel.

first, let's load each CSV into a dataframe:

In [39]:
from pyspark.sql.types import IntegerType

# Load the events table from HDFS; header=True takes the column names from the
# first row of the file.
events_df = spark.read.csv(
    "/user/ykarni/kaggle/events.csv",
    header=True,
)
# Replace the timestamp column with its integer cast so it is numeric rather
# than the string the CSV reader produces when no schema is given.
events_df = events_df.withColumn(
    'timestamp',
    events_df['timestamp'].cast(IntegerType()),
)

In [40]:
events_df.columns

['display_id', 'uuid', 'document_id', 'timestamp', 'platform', 'geo_location']

In [41]:
events_df.show(5)

+----------+--------------+-----------+---------+--------+------------+
|display_id|          uuid|document_id|timestamp|platform|geo_location|
+----------+--------------+-----------+---------+--------+------------+
|         1|cb8c55702adb93|     379743|       61|       3|   US>SC>519|
|         2|79a85fa78311b9|    1794259|       81|       2|   US>CA>807|
|         3|822932ce3d8757|    1179111|      182|       2|   US>MI>505|
|         4|85281d0a49f7ac|    1777797|      234|       2|   US>WV>564|
|         5|8d0daef4bf5b56|     252458|      338|       2|       SG>00|
+----------+--------------+-----------+---------+--------+------------+
only showing top 5 rows



In [42]:
# Load the document/category table (document_id, category_id,
# confidence_level); column names come from the header row.
document_categories_df = spark.read.csv(
    "/user/ykarni/kaggle/documents_categories.csv",
    header=True,
)

In [43]:
document_categories_df.columns

['document_id', 'category_id', 'confidence_level']

In [44]:
document_categories_df.select("category_id").distinct().count()

97

In [45]:
# Load the promoted-content table (ad_id, document_id, campaign_id,
# advertiser_id); column names come from the header row.
promoted_content_df = spark.read.csv(
    "/user/ykarni/kaggle/promoted_content.csv",
    header=True,
)

In [46]:
promoted_content_df.columns

['ad_id', 'document_id', 'campaign_id', 'advertiser_id']

In [47]:
# Load the document metadata table (document_id, source_id, publisher_id,
# publish_time); column names come from the header row.
documents_meta_df = spark.read.csv(
    "/user/ykarni/kaggle/documents_meta.csv",
    header=True,
)

In [48]:
documents_meta_df.columns

['document_id', 'source_id', 'publisher_id', 'publish_time']

In [49]:
# Load the labelled training clicks (display_id, ad_id, clicked); column names
# come from the header row.
clicks_train_df = spark.read.csv(
    "/user/ykarni/kaggle/clicks_train.csv",
    header=True,
)

In [50]:
clicks_train_df.columns

['display_id', 'ad_id', 'clicked']

In [51]:
clicks_train_df.count()

87141731

In [52]:
# Load the unlabelled test clicks (display_id, ad_id); column names come from
# the header row.
clicks_test_df = spark.read.csv(
    "/user/ykarni/kaggle/clicks_test.csv",
    header=True,
)

In [53]:
clicks_test_df.columns

['display_id', 'ad_id']

In [54]:
clicks_test_df.count()

32225162

In [55]:
# Load the page-views log from HDFS; header=True takes the column names from
# the first row of the file.
page_views_df = spark.read.csv(
    "/user/ykarni/kaggle/page_views.csv",
    header=True,
)
# Replace the timestamp column with its integer cast so it is numeric rather
# than the string the CSV reader produces when no schema is given.
page_views_df = page_views_df.withColumn(
    'timestamp',
    page_views_df['timestamp'].cast(IntegerType()),
)

In [56]:
page_views_df.columns

['uuid',
 'document_id',
 'timestamp',
 'platform',
 'geo_location',
 'traffic_source']

In [57]:
page_views_df.show(5)

+--------------+-----------+---------+--------+------------+--------------+
|          uuid|document_id|timestamp|platform|geo_location|traffic_source|
+--------------+-----------+---------+--------+------------+--------------+
|1fd5f051fba643|        120| 31905835|       1|          RS|             2|
|8557aa9004be3b|        120| 32053104|       1|       VN>44|             2|
|c351b277a358f0|        120| 54013023|       1|       KR>12|             1|
|8205775c5387f9|        120| 44196592|       1|       IN>16|             2|
|9cb0ccd8458371|        120| 65817371|       1|   US>CA>807|             2|
+--------------+-----------+---------+--------+------------+--------------+
only showing top 5 rows



In [58]:
from pyspark.sql.functions import col, when

convert the 0/1 format to the FW label format (1/-1):

In [59]:
# Derive the training label from the click flag: "0" becomes "-1", anything
# else becomes "1". `clicked` is compared against the string "0" because the
# CSV was read without a schema.
# Both branches are string literals so the resulting column type is explicit
# instead of depending on Spark's implicit coercion between "-1" and 1.
clicks_train_df = clicks_train_df.withColumn(
    "label",
    when(col("clicked") == "0", "-1").otherwise("1"),
)

next, we construct a "document-categories" mapping:

In [60]:
import pyspark.sql.functions as f

In [61]:
# Build one row per document holding a space-separated list of its category
# ids, keeping only category assignments with confidence_level above 0.9.
# The aggregate is named with .alias() rather than by renaming Spark's
# auto-generated column name afterwards: withColumnRenamed is a silent no-op
# when the generated name does not match exactly, which would leave the
# column without the expected "categories" name.
doc_categories_grouped = (
    document_categories_df
    .filter("confidence_level > 0.9")
    .groupBy("document_id")
    .agg(
        f.concat_ws(" ", f.collect_list(document_categories_df["category_id"]))
        .alias("categories")
    )
)

In [62]:
doc_categories_grouped.columns

['document_id', 'categories']

gather available data for the ads in our inventory:

In [63]:
# Attach document metadata and the grouped category list to every ad.
# The metadata join uses the default join type, so ads whose document has no
# metadata row are dropped; the category join is a left outer join, so ads
# without confident categories are kept with a null category list.
ads_with_meta = promoted_content_df.join(documents_meta_df, "document_id")
ads = ads_with_meta.join(
    doc_categories_grouped,
    "document_id",
    how='leftouter',
)

In [64]:
# Prefix the document-level columns with "ad_" so they can be told apart from
# the same-named columns of the page on which the ad is displayed.
ad_column_renames = [
    ("document_id", "ad_document_id"),
    ("source_id", "ad_source_id"),
    ("publisher_id", "ad_publisher_id"),
    ("publish_time", "ad_publish_time"),
    ("categories", "ad_categories"),
]
for old_name, new_name in ad_column_renames:
    ads = ads.withColumnRenamed(old_name, new_name)

In [65]:
ads.columns

['ad_document_id',
 'ad_id',
 'campaign_id',
 'advertiser_id',
 'ad_source_id',
 'ad_publisher_id',
 'ad_publish_time',
 'ad_categories']

we'll use the page views data, to understand what interests specific users.

first step - collect just the user id and document id pairs.

second step - join with the document categories dataframe to get the user categories:

In [66]:
# Unique (user, document) pairs taken from the page-views log.
user_docs_df = (
    page_views_df
    .select("uuid", "document_id")
    .distinct()
)

In [67]:
# Build one row per user holding a space-separated list of the distinct
# category ids (confidence_level above 0.9) of the documents that user viewed.
# The left outer join keeps users whose documents have no confident category;
# their category_id is null at that point.
# The aggregate is named with .alias() rather than by renaming Spark's
# auto-generated column name afterwards: withColumnRenamed is a silent no-op
# when the generated name does not match exactly, which would leave the
# column without the expected "user_categories" name.
confident_categories = document_categories_df.filter("confidence_level > 0.9")
user_categories = (
    user_docs_df
    .join(confident_categories, "document_id", how="leftouter")
    .select("uuid", "category_id")
    .distinct()
    .groupBy("uuid")
    .agg(
        f.concat_ws(" ", f.collect_list(document_categories_df["category_id"]))
        .alias("user_categories")
    )
)

In [68]:
user_categories.columns

['uuid', 'user_categories']

we want to count user page views for every event, but we must not simply aggregate over the entire dataset: for every event at time T, we want to count only the page views of that user occurring before time T:

In [69]:
from pyspark.sql import Window
# Per-user frame ordered by timestamp, running from the start of the user's
# partition up to the current row (Window.currentRow is the same boundary as
# the literal offset 0).
# NOTE(review): a range frame ending at the current row includes the current
# row itself and any rows sharing its timestamp - confirm that an inclusive
# count is what is wanted for a "previous" page-view count.
windowval = (
    Window.partitionBy('uuid')
    .orderBy('timestamp')
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

In [70]:
# Running per-user page-view count, evaluated over the frame defined by
# `windowval`, stored next to each page-view row.
pageviews_so_far = f.count('uuid').over(windowval)
page_views_df = page_views_df.withColumn(
    'previous_pageviews_count',
    pageviews_so_far,
)

finally, we join everything into a single dataframe with all relevant data concerning each event where an ad was presented to a user:
* the context data (which document the user was reading, the section and publisher, the content categories)
* the ad information - which advertiser and campaign - and who is the content publisher and what section the content belongs to etc.
* the user context - user id, where the user came from (search, social, internal traffic), the geolocation of the request etc.
* the supervision - click/no click

In [71]:
# Assemble the full training frame: one row per (display, ad) pair with the
# click label, the display context, the ad attributes and the user history.
# Joins without `how` use the default join type, so rows lacking a matching
# event, ad or document-metadata row are dropped; the left outer joins keep
# rows that have no categories, no matching page view or no user categories.
# NOTE(review): the page-view join matches on all five listed columns; if
# several page-view rows share those values the event row would be repeated -
# confirm against the data.
labelled_events_full = (
    clicks_train_df
    .join(events_df, "display_id")
    .join(ads, "ad_id")
    .join(documents_meta_df, "document_id")
    .join(doc_categories_grouped, "document_id", how='leftouter')
    .join(
        page_views_df,
        ["uuid", "document_id", "timestamp", "platform", "geo_location"],
        how="leftouter",
    )
    .join(user_categories, "uuid", how="leftouter")
    # Events with no matching page view get a count of 0 ...
    .fillna(0, subset=['previous_pageviews_count'])
    # ... and every remaining null string value becomes an empty string.
    .fillna("")
)

In [72]:
labelled_events_full.columns

['uuid',
 'document_id',
 'timestamp',
 'platform',
 'geo_location',
 'ad_id',
 'display_id',
 'clicked',
 'label',
 'ad_document_id',
 'campaign_id',
 'advertiser_id',
 'ad_source_id',
 'ad_publisher_id',
 'ad_publish_time',
 'ad_categories',
 'source_id',
 'publisher_id',
 'publish_time',
 'categories',
 'traffic_source',
 'previous_pageviews_count',
 'user_categories']

In [73]:
from pyspark.sql.functions import concat, col, lit

In [74]:
# (single-letter prefix, column) pairs, listed in output order.
train_feature_columns = [
    ("A", "uuid"),
    ("B", "platform"),
    ("C", "geo_location"),
    ("D", "traffic_source"),
    ("E", "document_id"),
    ("F", "source_id"),
    ("G", "publisher_id"),
    ("H", "categories"),
    ("I", "ad_id"),
    ("J", "campaign_id"),
    ("K", "advertiser_id"),
    ("L", "ad_document_id"),
    ("M", "ad_source_id"),
    ("N", "ad_publisher_id"),
    ("O", "ad_categories"),
    ("P", "user_categories"),
    ("Q", "previous_pageviews_count"),
    ("R", "display_id"),
]
# The first field is the bare label followed by a space; every other field is
# rendered as "<letter> <value> ".
train_fields = [concat(col("label"), lit(" "))] + [
    concat(lit(letter + " "), col(column_name), lit(" "))
    for letter, column_name in train_feature_columns
]
# Write the fields "|"-separated, with no header row and an empty quote
# setting, overwriting any previous output in the target directory.
labelled_events_full.select(*train_fields).write.csv(
    "/user/ykarni/kaggle/train",
    header=False,
    sep="|",
    quote="",
    mode="overwrite",
)

and similarly we generate the test set for the Outbrain Kaggle submission:

In [75]:
# Assemble the full test frame: one row per (display, ad) pair with the
# display context, the ad attributes and the user history (no label).
# Joins without `how` use the default join type, so rows lacking a matching
# event, ad or document-metadata row are dropped; the left outer joins keep
# rows that have no categories, no matching page view or no user categories.
# NOTE(review): rows dropped here will be missing from the exported test
# set - confirm that is acceptable for the submission.
# NOTE(review): null strings are filled with a single space here, whereas the
# training frame fills them with an empty string - confirm the difference is
# intentional.
test_set_full = (
    clicks_test_df
    .join(events_df, "display_id")
    .join(ads, "ad_id")
    .join(documents_meta_df, "document_id")
    .join(doc_categories_grouped, "document_id", how='leftouter')
    .join(
        page_views_df,
        ["uuid", "document_id", "timestamp", "platform", "geo_location"],
        how="leftouter",
    )
    .join(user_categories, "uuid", how="leftouter")
    # Events with no matching page view get a count of 0.
    .fillna(0, subset=['previous_pageviews_count'])
    .fillna(" ")
)

In [76]:
test_set_full.columns

['uuid',
 'document_id',
 'timestamp',
 'platform',
 'geo_location',
 'ad_id',
 'display_id',
 'ad_document_id',
 'campaign_id',
 'advertiser_id',
 'ad_source_id',
 'ad_publisher_id',
 'ad_publish_time',
 'ad_categories',
 'source_id',
 'publisher_id',
 'publish_time',
 'categories',
 'traffic_source',
 'previous_pageviews_count',
 'user_categories']

In [79]:
# (single-letter prefix, column) pairs, listed in output order.
test_feature_columns = [
    ("A", "uuid"),
    ("B", "platform"),
    ("C", "geo_location"),
    ("D", "traffic_source"),
    ("E", "document_id"),
    ("F", "source_id"),
    ("G", "publisher_id"),
    ("H", "categories"),
    ("I", "ad_id"),
    ("J", "campaign_id"),
    ("K", "advertiser_id"),
    ("L", "ad_document_id"),
    ("M", "ad_source_id"),
    ("N", "ad_publisher_id"),
    ("O", "ad_categories"),
    ("P", "user_categories"),
    ("Q", "previous_pageviews_count"),
    ("R", "display_id"),
]
# Every field is rendered as "<letter> <value> "; there is no label field in
# the test export.
test_fields = [
    concat(lit(letter + " "), col(column_name), lit(" "))
    for letter, column_name in test_feature_columns
]
# Write the fields "|"-separated, with no header row and an empty quote
# setting, overwriting any previous output in the target directory.
test_set_full.select(*test_fields).write.csv(
    "/user/ykarni/kaggle/test",
    header=False,
    sep="|",
    quote="",
    mode="overwrite",
)

once done, run the post_process_dataset.sh script to merge the results into a single file, perform last-minute cleanups, and do the train-dev split -
CONGRATULATIONS, you're ready to go.