In [None]:
import init

# import findspark
# findspark.init()

from pyrecdp.data_processor import *
from pyrecdp.utils import *

from pyspark.sql.types import IntegerType, StringType, StructType, StructField, TimestampType, FloatType, ArrayType, DoubleType
import datetime
import hashlib
import math
import time
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
import tensorflow as tf
from pyspark import TaskContext
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema
from tensorflow_transform.tf_metadata import metadata_io
from examples.notebooks.wnd.data.feature_description import LABEL_COLUMN, DISPLAY_ID_COLUMN, CATEGORICAL_COLUMNS, \
    DOC_CATEGORICAL_MULTIVALUED_COLUMNS, BOOL_COLUMNS, INT_COLUMNS, FLOAT_COLUMNS, \
    FLOAT_COLUMNS_LOG_BIN_TRANSFORM, FLOAT_COLUMNS_SIMPLE_BIN_TRANSFORM, FLOAT_COLUMNS_NO_TRANSFORM, HASH_BUCKET_SIZES
import os

In [None]:
# Storage locations used throughout the notebook.
OUTPUT_BUCKET_FOLDER = "/tmp/spark/preprocessed/recdp/"
DATA_BUCKET_FOLDER = "/outbrain/orig/"
SPARK_TEMP_FOLDER = "/tmp/spark/spark-temp/"

# Extra jars handed to Spark: one via `spark.jars`, the other on the
# driver/executor class paths.
TENSORFLOW_HADOOP = "data/tensorflow-hadoop-1.5.0.jar"
scala_udf_jars = "/root/ht/ML/recdp/ScalaProcessUtils/target/recdp-scala-extensions-0.1.0-jar-with-dependencies.jar"

# All Spark settings are applied in one fluent chain; each setter returns the conf.
conf = (
    SparkConf()
    .setMaster('spark://sr112:7077')
    .set('spark.executor.memory', '40g')
    .set('spark.driver.memory', '200g')
    .set('spark.executor.cores', '10')
    .set("spark.jars", TENSORFLOW_HADOOP)
    .set("spark.driver.extraClassPath", scala_udf_jars)
    .set("spark.executor.extraClassPath", scala_udf_jars)
)

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [None]:
#######################  split train/test set #######################
# Explicit schema for events.csv; every field is nullable.
events_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("display_id", IntegerType()),
        ("uuid_event", StringType()),
        ("document_id_event", IntegerType()),
        ("timestamp_event", IntegerType()),
        ("platform_event", IntegerType()),
        ("geo_location_event", StringType()),
    )
])

# Load the events and derive:
#   day_event           - timestamp_event divided down by 1000/60/60/24 and truncated to an int
#   event_country       - leading 2 characters of geo_location_event
#   event_country_state - leading 5 characters of geo_location_event
events_df = (
    spark.read.schema(events_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "events.csv")
    .withColumn('day_event', (F.col('timestamp_event') / 1000 / 60 / 60 / 24).cast(IntegerType()))
    .withColumn('event_country', F.substring('geo_location_event', 0, 2))
    .withColumn('event_country_state', F.substring('geo_location_event', 0, 5))
    .alias('events')
)

print('Drop rows with empty "geo_location"...')
events_df = events_df.dropna(subset=["geo_location_event"])

print('Drop rows with empty "platform"...')
events_df = events_df.dropna(subset=["platform_event"])

# promoted_content.csv: one row per ad with its landing document, campaign and advertiser.
promoted_content_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("ad_id", IntegerType()),
        ("document_id_promo", IntegerType()),
        ("campaign_id", IntegerType()),
        ("advertiser_id", IntegerType()),
    )
])

promoted_content_df = (
    spark.read.schema(promoted_content_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "promoted_content.csv")
    .alias('promoted_content')
)

# clicks_train.csv: (display_id, ad_id) pairs with the `clicked` column.
clicks_train_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("display_id", IntegerType()),
        ("ad_id", IntegerType()),
        ("clicked", IntegerType()),
    )
])

clicks_train_df = (
    spark.read.schema(clicks_train_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "clicks_train.csv")
    .alias('clicks_train')
)

# documents_meta.csv: source, publisher and publish time per document.
documents_meta_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("document_id_doc", IntegerType()),
        ("source_id", IntegerType()),
        ("publisher_id", IntegerType()),
        ("publish_time", TimestampType()),
    )
])

documents_meta_df = (
    spark.read.schema(documents_meta_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "documents_meta.csv")
    .alias('documents_meta')
)

# Drop rows with empty "source_id"
documents_meta_df = documents_meta_df.dropna(subset="source_id")

# Distinct (source_id, publisher_id) pairs seen in the metadata.
source_publishers_df = documents_meta_df.select(["source_id", "publisher_id"]).dropDuplicates()

# get list of source_ids without publisher_id
rows_no_pub = source_publishers_df.filter("publisher_id is NULL")
source_ids_without_publisher = [row['source_id'] for row in rows_no_pub.collect()]

# maximum value of publisher_id used so far; aggregated by Spark so that only a
# single row (instead of every publisher row) is brought back to the driver
max_pub = source_publishers_df \
    .agg(F.max('publisher_id').alias('max_publisher_id')) \
    .first()['max_publisher_id']
if max_pub is None:
    # F.max yields NULL when no row carries a publisher_id
    raise ValueError('documents_meta contains no non-null publisher_id')

# rows filled with new publisher_ids: each source lacking a publisher gets a
# fresh id numbered upwards from max_pub + 1
new_publishers = [(source, max_pub + 1 + nr) for nr, source in enumerate(source_ids_without_publisher)]
new_publishers_df = spark.createDataFrame(new_publishers, ("source_id", "publisher_id"))

# old and new publishers merged
fixed_source_publishers_df = source_publishers_df.dropna().union(new_publishers_df)

# update documents_meta with new publishers
documents_meta_df = documents_meta_df.drop('publisher_id').join(fixed_source_publishers_df, on='source_id')

# Number of documents after the publisher fix; read by the tf-idf similarity UDF.
documents_total = documents_meta_df.count()


# Document metadata re-labelled with *_doc_event names so that, once joined to the
# events, it cannot be confused with the metadata of the promoted document.
event_documents_df = (
    documents_meta_df
    .withColumnRenamed('source_id', 'source_id_doc_event')
    .withColumnRenamed('publisher_id', 'publisher_doc_event')
    .withColumnRenamed('publish_time', 'publish_time_doc_event')
    .withColumnRenamed('document_id_doc', 'document_id_doc_event')
)

# Events enriched (left join) with the metadata of the document they occurred on.
events_joined_df = events_df.join(
    event_documents_df,
    on=F.col("document_id_event") == F.col("document_id_doc_event"),
    how='left',
).alias('events')

# Clicks -> promoted content -> promoted document metadata -> event data, all left joins.
clicks_train_joined_df = (
    clicks_train_df
    .join(promoted_content_df, on='ad_id', how='left')
    .join(documents_meta_df,
          on=F.col("promoted_content.document_id_promo") == F.col("documents_meta.document_id_doc"),
          how='left')
    .join(events_joined_df, on='display_id', how='left')
)

# Exposed to Spark SQL for the train/validation split queries.
clicks_train_joined_df.createOrReplaceTempView('clicks_train_joined')


# Sample the display_ids that form the validation set, stratified by day_event:
# 20% of days 0-10 and every display of days 11 and 12.
validation_display_ids_df = clicks_train_joined_df.select('display_id', 'day_event').distinct() \
    .sampleBy("day_event", fractions={0: 0.2, 1: 0.2, 2: 0.2, 3: 0.2, 4: 0.2,
                                      5: 0.2, 6: 0.2, 7: 0.2, 8: 0.2, 9: 0.2, 10: 0.2, 11: 1.0, 12: 1.0}, seed=0)

# Materialise the sampled ids before they are used. The EXISTS and NOT EXISTS
# queries below each evaluate the "validation_display_ids" view on their own; if
# the view were the lazy sample itself, the two evaluations would not be
# guaranteed to pick the same ids (row order after distinct() is not fixed), and
# the train and validation sets could overlap or lose rows.
s_time = time.time()
validation_display_ids_df.select('display_id').distinct() \
    .write.format('parquet').mode('overwrite').save(OUTPUT_BUCKET_FOLDER + 'validation_display_ids')
spark.read.parquet(OUTPUT_BUCKET_FOLDER + 'validation_display_ids') \
    .createOrReplaceTempView("validation_display_ids")
print(f'validation_display_ids time: {time.time() - s_time}')

# Validation set: every joined click row whose display_id was sampled.
valid_set_df = spark.sql('''
SELECT * FROM clicks_train_joined t
WHERE EXISTS (SELECT display_id FROM validation_display_ids
WHERE display_id = t.display_id)''')

# Persist to parquet and read back so later cells start from the stored result.
s_time = time.time()
valid_set_df.write.format('parquet').mode('overwrite').save(OUTPUT_BUCKET_FOLDER + 'valid_set_df')
valid_set_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER + 'valid_set_df')
print(f'valid_set_df time: {time.time() - s_time}')

# Training set: the complement of the validation set.
train_set_df = spark.sql('''
SELECT * FROM clicks_train_joined t
WHERE NOT EXISTS (SELECT display_id FROM validation_display_ids
WHERE display_id = t.display_id)''')

s_time = time.time()
train_set_df.write.format('parquet').mode('overwrite').save(OUTPUT_BUCKET_FOLDER + 'train_set_df')
train_set_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER + 'train_set_df')
print(f'train_set_df time: {time.time() - s_time}')

In [None]:
# documents_categories.csv: (document, category, confidence) triples.
documents_categories_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("document_id_cat", IntegerType()),
        ("category_id", IntegerType()),
        ("confidence_level_cat", FloatType()),
    )
])

documents_categories_df = (
    spark.read.schema(documents_categories_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "documents_categories.csv")
    .alias('documents_categories')
)

# documents_topics.csv: (document, topic, confidence) triples.
documents_topics_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("document_id_top", IntegerType()),
        ("topic_id", IntegerType()),
        ("confidence_level_top", FloatType()),
    )
])

documents_topics_df = (
    spark.read.schema(documents_topics_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "documents_topics.csv")
    .alias('documents_topics')
)

# documents_entities.csv: (document, entity, confidence) triples; entity ids are strings.
documents_entities_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("document_id_ent", IntegerType()),
        ("entity_id", StringType()),
        ("confidence_level_ent", FloatType()),
    )
])

documents_entities_df = (
    spark.read.schema(documents_entities_schema)
    .options(header='true', inferschema='false', nullValue='\\N')
    .csv(DATA_BUCKET_FOLDER + "documents_entities.csv")
    .alias('documents_entities')
)

# One row per document: its category ids and the matching confidence levels,
# collected into two list columns. Cached because enrich_df joins it repeatedly.
documents_categories_grouped_df = (
    documents_categories_df
    .groupBy('document_id_cat')
    .agg(F.collect_list('category_id').alias('category_id_list'),
         F.collect_list('confidence_level_cat').alias('confidence_level_cat_list'))
    .alias('documents_categories_grouped')
    .cache()
)

# Same shape for topics.
documents_topics_grouped_df = (
    documents_topics_df
    .groupBy('document_id_top')
    .agg(F.collect_list('topic_id').alias('topic_id_list'),
         F.collect_list('confidence_level_top').alias('confidence_level_top_list'))
    .alias('documents_topics_grouped')
    .cache()
)

# Same shape for entities.
documents_entities_grouped_df = (
    documents_entities_df
    .groupBy('document_id_ent')
    .agg(F.collect_list('entity_id').alias('entity_id_list'),
         F.collect_list('confidence_level_ent').alias('confidence_level_ent_list'))
    .alias('documents_entities_grouped')
    .cache()
)

In [None]:
########################## generate dictionary ##################################
# Occurrence counts of each geographic value over all events; only non-null
# values seen at least 10 times are kept.
country_value_cat = (
    events_df
    .select('event_country')
    .groupBy('event_country')
    .count()
    .filter('event_country is not null and count >= 10')
)

state_value_cal = (
    events_df
    .select('event_country_state')
    .groupBy('event_country_state')
    .count()
    .filter('event_country_state is not null and count >= 10')
)

geo_location_value_cat = (
    events_df
    .select('geo_location_event')
    .groupBy('geo_location_event')
    .count()
    .filter('geo_location_event is not null and count >= 10')
)


def _ctr_by_key(df, key_col, row_filter):
    """Aggregate click-through statistics of `df` per value of `key_col`.

    Groups by `key_col`, sums `clicked` into `clicks`, counts rows into `views`
    and derives `ctr = clicks / views`. Groups are then restricted with the SQL
    predicate `row_filter`. Returns a DataFrame with columns
    (`key_col`, 'ctr', 'views').
    """
    return df \
        .groupby(key_col) \
        .agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views')) \
        .withColumn('ctr', F.col('clicks') / F.col('views')) \
        .filter(row_filter) \
        .select(key_col, 'ctr', 'views')


# ### Average CTR by ad_id
ad_id_popularity_df = _ctr_by_key(train_set_df, 'ad_id', 'views > 5')

# ### Average CTR by document_id (promoted_content)
document_id_popularity_df = _ctr_by_key(train_set_df, 'document_id_promo', 'views > 5')

# ### Average CTR by source_id
source_id_popularity_df = _ctr_by_key(
    train_set_df.select('clicked', 'source_id', 'ad_id'),
    'source_id', 'views > 10 and source_id is not null')

# ### Average CTR by publisher_id
publisher_popularity_df = _ctr_by_key(
    train_set_df.select('clicked', 'publisher_id', 'ad_id'),
    'publisher_id', 'views > 10 and publisher_id is not null')

# ### Average CTR by advertiser_id
advertiser_id_popularity_df = _ctr_by_key(
    train_set_df.select('clicked', 'advertiser_id', 'ad_id'),
    'advertiser_id', 'views > 10 and advertiser_id is not null')

# ### Average CTR by campaign_id
campaign_id_popularity_df = _ctr_by_key(
    train_set_df.select('clicked', 'campaign_id', 'ad_id'),
    'campaign_id', 'views > 10 and campaign_id is not null')


# Row counts per category / topic / entity id, collected to the driver as plain
# dicts {id: count}; they are passed to the similarity UDFs later on.
categories_docs_counts = (
    documents_categories_df
    .groupBy('category_id')
    .count()
    .rdd
    .collectAsMap()
)

topics_docs_counts = (
    documents_topics_df
    .groupBy('topic_id')
    .count()
    .rdd
    .collectAsMap()
)

entities_docs_counts = (
    documents_entities_df
    .groupBy('entity_id')
    .count()
    .rdd
    .collectAsMap()
)

In [None]:
##################### udf used for feature engineering ###############################
def cosine_similarity_dicts(dict1, dict2):
    """Cosine similarity between two sparse vectors stored as {key: weight} dicts.

    Args:
        dict1: first vector, mapping keys to numeric weights.
        dict2: second vector, mapping keys to numeric weights.

    Returns:
        Tuple ``(similarity, intersections)``: the dot product over the shared
        keys divided by the product of both Euclidean norms, and the number of
        keys present in both dicts. When either vector has zero norm (empty dict
        or all-zero weights) the similarity is 0.0 instead of raising
        ZeroDivisionError.
    """
    dict1_norm = math.sqrt(sum(v ** 2 for v in dict1.values()))
    dict2_norm = math.sqrt(sum(v ** 2 for v in dict2.values()))

    sum_common_aspects = 0.0
    intersections = 0
    for key, weight in dict1.items():
        if key in dict2:
            sum_common_aspects += weight * dict2[key]
            intersections += 1

    norm_product = dict1_norm * dict2_norm
    if norm_product == 0:
        # A zero-length vector has no direction; report "no similarity".
        return 0.0, intersections

    return sum_common_aspects / norm_product, intersections


def cosine_similarity_doc_event_doc_ad_aspects(doc_event_aspect_ids, doc_event_aspects_confidence,
                                               doc_ad_aspect_ids, doc_ad_aspects_confidence,
                                               aspect_docs_counts):
    """Weighted cosine similarity between the aspects of an event document and an ad document.

    Each aspect id is weighted by ``1.0 * idf * confidence`` where
    ``idf = log(log(documents_total / aspect_docs_counts[id]))``;
    ``documents_total`` is read from module scope.

    Args:
        doc_event_aspect_ids: aspect ids of the event document (or None).
        doc_event_aspects_confidence: confidences aligned with doc_event_aspect_ids.
        doc_ad_aspect_ids: aspect ids of the ad document (or None).
        doc_ad_aspects_confidence: confidences aligned with doc_ad_aspect_ids.
        aspect_docs_counts: mapping aspect id -> count, indexed by every aspect id.

    Returns:
        ``(similarity, confidence)``, or ``(None, None)`` when either id list is
        None or empty.

    Raises:
        KeyError: if an aspect id is missing from aspect_docs_counts.
        ValueError: from math.log when the inner ratio or its log is not positive.
    """
    if doc_event_aspect_ids is None or len(doc_event_aspect_ids) == 0:
        return None, None
    if doc_ad_aspect_ids is None or len(doc_ad_aspect_ids) == 0:
        return None, None

    def weigh_aspects(aspect_ids, confidences):
        # Later duplicates of an id win in the dict, as with any dict(zip(...)).
        confidence_by_id = dict(zip(aspect_ids, confidences))
        weighted = {}
        for aspect_id in aspect_ids:
            term_frequency = 1.0
            inverse_doc_frequency = math.log(math.log(documents_total / float(aspect_docs_counts[aspect_id])))
            weighted[aspect_id] = term_frequency * inverse_doc_frequency * confidence_by_id[aspect_id]
        return weighted

    event_weights = weigh_aspects(doc_event_aspect_ids, doc_event_aspects_confidence)
    ad_weights = weigh_aspects(doc_ad_aspect_ids, doc_ad_aspects_confidence)

    similarity, intersections = cosine_similarity_dicts(event_weights, ad_weights)

    # Share of all known aspects that each document carries.
    known_aspects = float(len(aspect_docs_counts))
    event_share = len(doc_event_aspect_ids) / known_aspects
    ad_share = len(doc_ad_aspect_ids) / known_aspects

    if intersections > 0:
        # P(A intersect B)_intersections = P(A)^intersections * P(B)^intersections
        random_error = math.pow(event_share, intersections) * math.pow(ad_share, intersections)
    else:
        # P(A not intersect B) = 1 - P(A intersect B)
        random_error = 1 - (event_share * ad_share)

    confidence = 1.0 - random_error

    return similarity, confidence

def convert_odd_timestamp(timestamp_ms_relative):
    """Turn a relative millisecond timestamp into a datetime.

    The value is coerced with int(), shifted by the fixed offset 1465876799998,
    floor-divided by 1000 and passed to ``datetime.datetime.fromtimestamp``
    (which yields a naive local-time datetime).
    """
    TIMESTAMP_DELTA = 1465876799998
    absolute_ms = int(timestamp_ms_relative) + TIMESTAMP_DELTA
    absolute_seconds = absolute_ms // 1000
    return datetime.datetime.fromtimestamp(absolute_seconds)

def timestamp_delta(df, publish_time, timestamp):
    """Add a ``<publish_time>_delta`` column: whole days between publication and the event.

    Args:
        df: input Spark DataFrame.
        publish_time: name of the column holding the publication datetime.
        timestamp: name of the column holding the relative event timestamp
            (converted with convert_odd_timestamp).

    Returns:
        ``df`` with an extra DoubleType column named ``publish_time + '_delta'``.
        The value is null when the timestamp is not greater than -1, the publish
        time is null, or the delta falls outside 0..3650 days.
    """
    def days_since_publication(published_at, event_timestamp):
        if not (event_timestamp > -1):
            return None
        event_datetime = convert_odd_timestamp(event_timestamp)
        if published_at is None:
            return None
        delta_days = (event_datetime - published_at).days
        if 0 <= delta_days <= 365 * 10:  # 10 years
            return float(delta_days)
        return None

    delta_udf = F.udf(days_since_publication, DoubleType())
    return df.withColumn(publish_time + '_delta', delta_udf(publish_time, timestamp))

# Setting Doc_event-doc_ad CB Similarity fields
def get_doc_event_doc_ad_cb_similarity_score_fn(df, doc_event_ids, doc_event_levels, doc_ad_ids, doc_ad_levels, cnt):
    """Add a ``<doc_event_ids>_sim`` column with the event/ad aspect similarity.

    Args:
        df: input Spark DataFrame.
        doc_event_ids: column name of the event document's aspect id list.
        doc_event_levels: column name of the matching confidence list.
        doc_ad_ids: column name of the ad document's aspect id list.
        doc_ad_levels: column name of the matching confidence list.
        cnt: aspect id -> count mapping forwarded to
            cosine_similarity_doc_event_doc_ad_aspects.

    Returns:
        ``df`` with an extra DoubleType column named ``doc_event_ids + '_sim'``
        holding only the similarity part of the (similarity, confidence) pair.
    """
    def similarity_only(event_ids, event_levels, ad_ids, ad_levels):
        similarity, _confidence = cosine_similarity_doc_event_doc_ad_aspects(
            event_ids, event_levels, ad_ids, ad_levels, cnt)
        return similarity

    similarity_udf = F.udf(similarity_only, DoubleType())
    return df.withColumn(
        doc_event_ids + '_sim',
        similarity_udf(doc_event_ids, doc_event_levels, doc_ad_ids, doc_ad_levels))

In [None]:
def enrich_df(df):
    """Attach category/topic/entity lists for both the promoted and the event document.

    Left-joins the module-level grouped aspect frames twice: once on
    ``document_id_promo`` (plain column names) and once on ``document_id_event``
    (columns prefixed ``doc_event_``). The result keeps a fixed set of columns;
    null list columns become empty arrays and null ``source_id`` /
    ``timestamp_event`` become -1.

    Args:
        df: a train/validation split DataFrame as produced by the split step.

    Returns:
        The enriched Spark DataFrame.
    """
    # Event-side copies of the grouped frames, renamed so their list columns do
    # not clash with the promoted-document ones.
    event_categories_df = documents_categories_grouped_df \
        .withColumnRenamed('category_id_list', 'doc_event_category_id_list') \
        .withColumnRenamed('confidence_level_cat_list', 'doc_event_confidence_level_cat_list') \
        .alias('documents_event_categories_grouped')
    event_topics_df = documents_topics_grouped_df \
        .withColumnRenamed('topic_id_list', 'doc_event_topic_id_list') \
        .withColumnRenamed('confidence_level_top_list', 'doc_event_confidence_level_top_list') \
        .alias('documents_event_topics_grouped')
    event_entities_df = documents_entities_grouped_df \
        .withColumnRenamed('entity_id_list', 'doc_event_entity_id_list') \
        .withColumnRenamed('confidence_level_ent_list', 'doc_event_confidence_level_ent_list') \
        .alias('documents_event_entities_grouped')

    # Promoted-document aspects.
    joined_df = df.join(
        documents_categories_grouped_df,
        on=F.col("document_id_promo") == F.col("documents_categories_grouped.document_id_cat"),
        how='left')
    joined_df = joined_df.join(
        documents_topics_grouped_df,
        on=F.col("document_id_promo") == F.col("documents_topics_grouped.document_id_top"),
        how='left')
    joined_df = joined_df.join(
        documents_entities_grouped_df,
        on=F.col("document_id_promo") == F.col("documents_entities_grouped.document_id_ent"),
        how='left')

    # Event-document aspects.
    joined_df = joined_df.join(
        event_categories_df,
        on=F.col("document_id_event") == F.col("documents_event_categories_grouped.document_id_cat"),
        how='left')
    joined_df = joined_df.join(
        event_topics_df,
        on=F.col("document_id_event") == F.col("documents_event_topics_grouped.document_id_top"),
        how='left')
    joined_df = joined_df.join(
        event_entities_df,
        on=F.col("document_id_event") == F.col("documents_event_entities_grouped.document_id_ent"),
        how='left')

    passthrough_cols = [
        'display_id', 'uuid_event', 'event_country', 'event_country_state', 'platform_event',
        'source_id_doc_event', 'publisher_doc_event', 'publish_time_doc_event',
        'publish_time', 'ad_id', 'document_id_promo', 'clicked',
        'geo_location_event', 'advertiser_id', 'publisher_id',
        'campaign_id', 'document_id_event',
    ]
    event_list_cols = [
        'doc_event_category_id_list', 'doc_event_confidence_level_cat_list',
        'doc_event_topic_id_list', 'doc_event_confidence_level_top_list',
        'doc_event_entity_id_list', 'doc_event_confidence_level_ent_list',
    ]
    promo_list_cols = [
        'category_id_list', 'confidence_level_cat_list',
        'topic_id_list', 'confidence_level_top_list',
        'entity_id_list', 'confidence_level_ent_list',
    ]

    def empty_array_if_null(col_name):
        # Keep the column name; substitute an empty array for nulls.
        return F.coalesce(col_name, F.array()).alias(col_name)

    df_enriched = joined_df.select(
        *passthrough_cols,
        *[empty_array_if_null(col_name) for col_name in event_list_cols],
        F.coalesce("source_id", F.lit(-1)).alias('source_id'),
        F.coalesce("timestamp_event", F.lit(-1)).alias('timestamp_event'),
        *[empty_array_if_null(col_name) for col_name in promo_list_cols])

    return df_enriched.fillna(-1, subset=['source_id', 'timestamp_event'])

# Enrich the training split, store it as parquet and continue from the stored copy.
s_time = time.time()
enrich_df(train_set_df) \
    .write.format('parquet').mode('overwrite') \
    .save(OUTPUT_BUCKET_FOLDER + 'train_set_enriched_df')
train_set_enriched_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER + 'train_set_enriched_df')
print(f'train_set_enriched_df time: {time.time() - s_time}')

# Same for the validation split (stored under the name test_set_enriched_df).
s_time = time.time()
enrich_df(valid_set_df) \
    .write.format('parquet').mode('overwrite') \
    .save(OUTPUT_BUCKET_FOLDER + 'test_set_enriched_df')
test_set_enriched_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER + 'test_set_enriched_df')
print(f'test_set_enriched_df time: {time.time() - s_time}')

# The grouped aspect frames were cached for enrich_df; release them now.
documents_categories_grouped_df.unpersist()
documents_topics_grouped_df.unpersist()
documents_entities_grouped_df.unpersist()

In [None]:
def format_number(element, name):
    """Cast `element` to int when `name` is a boolean or categorical feature.

    Args:
        element: column expression to format.
        name: feature name looked up in BOOL_COLUMNS + CATEGORICAL_COLUMNS.

    Returns:
        ``element.cast("int")`` for listed names, otherwise `element` unchanged.
    """
    integer_features = BOOL_COLUMNS + CATEGORICAL_COLUMNS
    return element.cast("int") if name in integer_features else element

# Output feature names, in output order. Entry i names the column that is
# computed from feature_vector_labels[i] below, so both lists must stay aligned.
FEAT_CSV_ORDERED_COLUMNS = [
    # views / ids
    'ad_views', 'campaign_id', 'doc_views',
    # days since publication
    'doc_event_days_since_published', 'doc_ad_days_since_published',
    # popularity (CTR)
    'pop_ad_id', 'pop_document_id', 'pop_publisher_id',
    'pop_advertiser_id', 'pop_campain_id', 'pop_source_id',
    # event document / ad document similarity
    'doc_event_doc_ad_sim_categories', 'doc_event_doc_ad_sim_topics',
    'doc_event_doc_ad_sim_entities',
    # ids
    'ad_advertiser', 'doc_ad_publisher_id', 'doc_ad_source_id',
    'doc_event_publisher_id', 'doc_event_source_id',
    # event context
    'event_country', 'event_country_state', 'event_geo_location', 'event_platform',
    'traffic_source',
]

# Names of the engineered DataFrame columns, position-aligned with
# FEAT_CSV_ORDERED_COLUMNS.
feature_vector_labels = [
    # views / ids
    'ad_id_views', 'campaign_id', 'document_id_promo_views',
    # days since publication
    'publish_time_doc_event_delta', 'publish_time_delta',
    # popularity (CTR)
    'ad_id_ctr', 'document_id_promo_ctr', 'publisher_id_ctr',
    'advertiser_id_ctr', 'campaign_id_ctr', 'source_id_ctr',
    # event document / ad document similarity
    'doc_event_category_id_list_sim', 'doc_event_topic_id_list_sim',
    'doc_event_entity_id_list_sim',
    # ids
    'advertiser_id', 'publisher_id', 'source_id',
    'publisher_doc_event', 'source_id_doc_event',
    # event context
    'event_country_count', 'event_country_state_count', 'geo_location_event_count', 'platform_event',
    'traffic_source',
]

In [None]:
def categorifyFeatures(df, proc, output_name="categorify", gen_dict=False):
    """Map id/geo columns to their views, CTR or count value through RecDP.

    For every lookup a new column is first created as a copy of its source
    column (Categorify overwrites the columns it processes), then Categorify
    replaces it using a dictionary frame with columns ``dict_col`` (key) and
    ``dict_col_id`` (value). Afterwards the three geo count columns are
    null-filled with 0 and ``platform_event`` is decremented by 1.

    Args:
        df: enriched Spark DataFrame.
        proc: RecDP DataProcessor; its op list is reset by this call.
        output_name: name handed to ``proc.transform``.
        gen_dict: accepted but not used by this function.

    Returns:
        The transformed DataFrame returned by ``proc.transform``.
    """
    # (new column, source column == dictionary key column, dictionary frame, dictionary value column)
    lookups = [
        ('ad_id_views', 'ad_id', ad_id_popularity_df, 'views'),
        ('document_id_promo_views', 'document_id_promo', document_id_popularity_df, 'views'),
        ('ad_id_ctr', 'ad_id', ad_id_popularity_df, 'ctr'),
        ('document_id_promo_ctr', 'document_id_promo', document_id_popularity_df, 'ctr'),
        ('publisher_id_ctr', 'publisher_id', publisher_popularity_df, 'ctr'),
        ('advertiser_id_ctr', 'advertiser_id', advertiser_id_popularity_df, 'ctr'),
        ('campaign_id_ctr', 'campaign_id', campaign_id_popularity_df, 'ctr'),
        ('source_id_ctr', 'source_id', source_id_popularity_df, 'ctr'),
        ('event_country_count', 'event_country', country_value_cat, 'count'),
        ('event_country_state_count', 'event_country_state', state_value_cal, 'count'),
        ('geo_location_event_count', 'geo_location_event', geo_location_value_cat, 'count'),
    ]
    categorify_cols = [new_col for new_col, _, _, _ in lookups]

    # transform dict column name to match recdp
    dict_dfs = [
        {'col_name': new_col,
         'dict': dict_df.select(F.col(source_col).alias('dict_col'),
                                F.col(value_col).alias('dict_col_id'))}
        for new_col, source_col, dict_df, value_col in lookups
    ]

    # add new columns since recdp will overwrite cat columns
    for new_col, source_col, _, _ in lookups:
        df = df.withColumn(new_col, F.col(source_col))

    proc.reset_ops([
        Categorify(categorify_cols, dict_dfs=dict_dfs),
        FillNA(['event_country_count', 'event_country_state_count', 'geo_location_event_count'], 0),
        FeatureModification(cols={"platform_event": "f.col('platform_event') - 1"}, op='inline'),
    ])

    started = timer()
    categorified_df = proc.transform(df, name=output_name)
    elapsed = timer() - started
    print("Categorify took %.3f" % elapsed)

    return categorified_df

def convertType(df, proc, output_name="convertType"):
    """Cast the id feature columns to DoubleType through a RecDP inline op.

    Args:
        df: Spark DataFrame containing the six id columns listed below.
        proc: RecDP DataProcessor; its op list is reset by this call.
        output_name: name handed to ``proc.transform``.

    Returns:
        The transformed DataFrame returned by ``proc.transform``.
    """
    double_cols = (
        "campaign_id", "advertiser_id", "source_id",
        "publisher_id", "source_id_doc_event", "publisher_doc_event",
    )
    # Expression strings are evaluated by RecDP, hence the `f.` / `spk_type.` names.
    cast_exprs = {name: f"f.col('{name}').cast(spk_type.DoubleType())" for name in double_cols}
    proc.reset_ops([FeatureModification(cols=cast_exprs, op='inline')])

    started = timer()
    converted_df = proc.transform(df, name=output_name)
    elapsed = timer() - started
    print("Convert type took %.3f" % elapsed)

    return converted_df
    

In [None]:
################################## feature engineer with RecDP ################################
path_prefix = "hdfs://"
current_path = "/wnd/"
proc = DataProcessor(spark, path_prefix, current_path=current_path, shuffle_disk_capacity="1200GB")


def _engineer_features(enriched_df, proc, name_prefix):
    """Run the shared feature-engineering pipeline on one enriched split.

    Steps, in order: categorify lookups, days-since-publication deltas for the
    ad and event documents, category/topic/entity similarity columns, id casts
    to double, a constant ``traffic_source`` column, renames of
    ``document_id_promo`` -> ``document_id`` and ``clicked`` -> ``label``, and a
    final null fill with 0 over ``feature_vector_labels``.

    Args:
        enriched_df: output of enrich_df for the split.
        proc: RecDP DataProcessor shared by the steps.
        name_prefix: prefix of the RecDP output names
            (``<prefix>_categorified`` and ``<prefix>_convert_type``).

    Returns:
        The engineered Spark DataFrame.
    """
    features_df = categorifyFeatures(
        enriched_df, proc, output_name=name_prefix + "_categorified", gen_dict=False)
    features_df = timestamp_delta(features_df, 'publish_time', 'timestamp_event')
    features_df = timestamp_delta(features_df, 'publish_time_doc_event', 'timestamp_event')

    # (event id list, event confidence list, ad id list, ad confidence list, id -> count map)
    similarity_inputs = (
        ('doc_event_category_id_list', 'doc_event_confidence_level_cat_list',
         'category_id_list', 'confidence_level_cat_list', categories_docs_counts),
        ('doc_event_topic_id_list', 'doc_event_confidence_level_top_list',
         'topic_id_list', 'confidence_level_top_list', topics_docs_counts),
        ('doc_event_entity_id_list', 'doc_event_confidence_level_ent_list',
         'entity_id_list', 'confidence_level_ent_list', entities_docs_counts),
    )
    for event_ids, event_levels, ad_ids, ad_levels, docs_counts in similarity_inputs:
        features_df = get_doc_event_doc_ad_cb_similarity_score_fn(
            features_df, event_ids, event_levels, ad_ids, ad_levels, docs_counts)

    features_df = convertType(features_df, proc, output_name=name_prefix + "_convert_type")

    features_df = features_df \
        .withColumn('traffic_source', F.lit(0).cast(DoubleType())) \
        .withColumnRenamed('document_id_promo', 'document_id') \
        .withColumnRenamed('clicked', 'label')

    return features_df.fillna(0, subset=feature_vector_labels)


def _select_feature_vectors(features_df):
    """Project an engineered split onto the final, ordered output columns.

    Keeps label, display_id, ad_id, doc_id and doc_event_id, followed by every
    column of ``feature_vector_labels`` renamed to the name at the same position
    in ``FEAT_CSV_ORDERED_COLUMNS`` (integer-cast where format_number says so).
    NaN values are replaced by 0.
    """
    named_features = [
        format_number(F.col(label), csv_name).alias(csv_name)
        for csv_name, label in zip(FEAT_CSV_ORDERED_COLUMNS, feature_vector_labels)
    ]
    leading_cols = ['label', 'display_id', 'ad_id',
                    F.col('document_id').alias('doc_id'),
                    F.col('document_id_event').alias('doc_event_id')]
    return features_df.select(leading_cols + named_features).replace(float('nan'), 0)


######################### trainset feature engineer ###############################
train_set_features_df = _engineer_features(train_set_enriched_df, proc, "train")
train_feature_vectors_integral_csv_rdd_df = _select_feature_vectors(train_set_features_df)


######################### testset feature engineer ###############################
test_set_features_df = _engineer_features(test_set_enriched_df, proc, "test")
# The validation rows are additionally repartitioned and ordered by display_id.
test_validation_feature_vectors_integral_csv_rdd_df = _select_feature_vectors(
    test_set_features_df.repartition(40, 'display_id').orderBy('display_id'))

In [None]:
def log2_1p(x):
    """Return log base 2 of (1 + x), computed as ``log1p(x) / ln(2)``."""
    natural_log_1p = np.log1p(x)
    return natural_log_1p / np.log(2.0)


# calculate min and max stats for the given dataframes all in one go
def compute_min_max_logs(df):
    """Compute log2(1 + x) of the min and max of every log-scaled feature.

    All minima and maxima are gathered in a single aggregation over `df`.

    Args:
        df: Spark DataFrame holding every column named in
            FLOAT_COLUMNS_LOG_BIN_TRANSFORM and INT_COLUMNS.

    Returns:
        ``(min_logs, max_logs)``: two dicts keyed by ``<column>_log_01scaled``.
        Float columns are multiplied by 1000 before the log; int columns are not.
    """
    print(str(datetime.datetime.now()) + '\tComputing min and max')
    min_logs = {}
    max_logs = {}
    float_expr = []
    for name in FLOAT_COLUMNS_LOG_BIN_TRANSFORM + INT_COLUMNS:
        float_expr.append(F.min(name))
        float_expr.append(F.max(name))
    # Aggregate the frame that was passed in, not a module-level variable.
    floatDf = df.agg(*float_expr).collect()
    print(floatDf)
    stats = floatDf[0]
    for name in FLOAT_COLUMNS_LOG_BIN_TRANSFORM:
        minAgg = stats["min(" + name + ")"]
        maxAgg = stats["max(" + name + ")"]
        min_logs[name + '_log_01scaled'] = log2_1p(minAgg * 1000)
        max_logs[name + '_log_01scaled'] = log2_1p(maxAgg * 1000)
    for name in INT_COLUMNS:
        minAgg = stats["min(" + name + ")"]
        maxAgg = stats["max(" + name + ")"]
        min_logs[name + '_log_01scaled'] = log2_1p(minAgg)
        max_logs[name + '_log_01scaled'] = log2_1p(maxAgg)

    return min_logs, max_logs

def log_and_norm(df, proc, output_name="log_norm", gen_dict=False):
    """Add scaled-log features for INT_COLUMNS and bucket the hashed columns.

    For every column in INT_COLUMNS this adds ``<col>_log_01`` (log2(1 + x))
    and ``<col>_log_01scaled`` (that value scaled with the module-level
    ``min_logs`` / ``max_logs``, which must already be populated by
    ``compute_min_max_logs``). CATEGORICAL_COLUMNS get nulls filled with 0,
    and every column in HASH_BUCKET_SIZES is reduced modulo its bucket size
    and shifted into the non-negative range.

    Args:
        df: Spark DataFrame to transform.
        proc: pyrecdp data processor; its op list is reset by this call.
        output_name: name passed to ``proc.transform``.
        gen_dict: unused; kept so existing callers keep working.

    Returns:
        The transformed Spark DataFrame.
    """
    # min_logs / max_logs are computed with log2_1p (base 2), so the per-row
    # log must use the same base; plain log1p (natural log) made the
    # "01scaled" values off by a factor of ln(2).
    ln2 = math.log(2.0)
    # The expression strings reference `f`; presumably pyrecdp evaluates them
    # with `f` bound to pyspark.sql.functions — TODO confirm.
    log_cols = {col + '_log_01': f'f.log1p("{col}") / {ln2}' for col in INT_COLUMNS}
    norm_cols = {}
    for log_col in log_cols:
        scaled_name = log_col + 'scaled'
        low = min_logs[scaled_name]
        span = max_logs[scaled_name] - low
        norm_cols[scaled_name] = f'(f.col("{log_col}")-{low}) / ({span})'
    op_log = FeatureAdd(cols=log_cols, op='inline')
    op_norm = FeatureAdd(cols=norm_cols, op='inline')
    proc.reset_ops([op_log, op_norm])

    op_fillna = FillNA(CATEGORICAL_COLUMNS, 0)
    proc.append_ops([op_fillna])

    hash_bucket_cols = {col: f'f.col("{col}") % {size}' for col, size in HASH_BUCKET_SIZES.items()}
    op_hash_bucket = FeatureModification(cols=hash_bucket_cols, op='inline')
    proc.append_ops([op_hash_bucket])

    t1 = timer()
    df = proc.transform(df, name=output_name)
    t2 = timer()
    print("Log and norm took %.3f" % (t2 - t1))

    # The modulo above can leave negative remainders; shift them into [0, size).
    for name, size in HASH_BUCKET_SIZES.items():
        df = df.withColumn(name, F.when(F.col(name)<0, F.col(name)+size).otherwise(F.col(name)))
    return df

In [None]:
# Compute min/max statistics over test+train together so both sets are scaled
# with the same bounds. Spark's union matches columns by position, so both
# frames are assumed to share the same column order — TODO confirm.
all_df = test_validation_feature_vectors_integral_csv_rdd_df.union(train_feature_vectors_integral_csv_rdd_df)
# min_logs / max_logs are module-level names: log_and_norm reads them as
# globals, so this assignment must run before the calls below.
min_logs, max_logs = compute_min_max_logs(all_df)

# Apply the log / scaling / hash-bucket transform to each set separately.
train_feature_norm = log_and_norm(train_feature_vectors_integral_csv_rdd_df, proc, output_name="log_norm_train")
test_feature_norm = log_and_norm(test_validation_feature_vectors_integral_csv_rdd_df, proc, output_name="log_norm_test")

In [None]:
def make_spec(output_dir, batch_size=None):
    """Build the TFRecord feature spec and write it as dataset metadata.

    Every feature is a ``tf.io.FixedLenFeature`` with shape ``[batch_size, 1]``
    (or ``[]`` when ``batch_size`` is None). Label, display id, boolean and
    categorical columns are int64; ``<col>_log_01scaled`` for each INT_COLUMNS
    entry is float32.

    Args:
        output_dir: directory passed to ``metadata_io.write_metadata``.
        batch_size: number of rows packed into one serialized example, or None.
    """
    if batch_size is None:
        fixed_shape = []
    else:
        fixed_shape = [batch_size, 1]

    def _fixed(dtype):
        # All features share the same shape and have no default value.
        return tf.io.FixedLenFeature(shape=fixed_shape, dtype=dtype, default_value=None)

    spec = {}
    spec[LABEL_COLUMN] = _fixed(tf.int64)
    spec[DISPLAY_ID_COLUMN] = _fixed(tf.int64)
    spec.update({bool_name: _fixed(tf.int64) for bool_name in BOOL_COLUMNS})
    spec.update({int_name + '_log_01scaled': _fixed(tf.float32) for int_name in INT_COLUMNS})
    # BOOL_COLUMNS are visited a second time here; this re-assigns an
    # equivalent entry and leaves their position in the dict unchanged.
    spec.update({cat_name: _fixed(tf.int64) for cat_name in BOOL_COLUMNS + CATEGORICAL_COLUMNS})

    schema = dataset_schema.from_feature_spec(spec)
    metadata_io.write_metadata(dataset_metadata.DatasetMetadata(schema), output_dir)


def create_tf_example_spark(df):
    """Pack one pandas DataFrame slice into a single ``tf.train.Example``.

    Each feature holds the whole column of the slice as a list:
    label, display id, BOOL_COLUMNS and CATEGORICAL_COLUMNS as int64 lists;
    FLOAT_COLUMNS and ``<col>_log_01scaled`` (for INT_COLUMNS) as float lists.

    Args:
        df: pandas DataFrame containing all of the columns listed above.

    Returns:
        The assembled ``tf.train.Example``.
    """
    def _int64_feature(column):
        return tf.train.Feature(int64_list=tf.train.Int64List(value=df[column].to_list()))

    def _float_feature(column):
        return tf.train.Feature(float_list=tf.train.FloatList(value=df[column].to_list()))

    features = {}
    features[LABEL_COLUMN] = _int64_feature(LABEL_COLUMN)
    features[DISPLAY_ID_COLUMN] = _int64_feature(DISPLAY_ID_COLUMN)
    for float_name in FLOAT_COLUMNS:
        features[float_name] = _float_feature(float_name)
    for int_name in INT_COLUMNS:
        scaled_name = int_name + '_log_01scaled'
        features[scaled_name] = _float_feature(scaled_name)
    for int64_name in BOOL_COLUMNS + CATEGORICAL_COLUMNS:
        features[int64_name] = _int64_feature(int64_name)
    return tf.train.Example(features=tf.train.Features(feature=features))


def _transform_to_slices(rdds):
    """Cut one partition's rows into consecutive pandas slices of batch_size rows.

    Reads the module-level ``columns`` (column names for the DataFrame) and
    ``batch_size`` (rows per slice; None is treated as 1).

    Args:
        rdds: iterator over the rows of a single partition.

    Returns:
        List of ``(slice_dataframe, row_count)`` tuples. Only the last slice
        can be shorter than the batch size.
    """
    partitionid = TaskContext.get().partitionId()
    csv = pd.DataFrame(list(rdds), columns=columns)
    num_rows = len(csv.index)
    # Resolve the step once. The previous code guarded against None in the
    # range()/iloc calls but not in the boundary comparison, which raised
    # TypeError when batch_size was None.
    step = batch_size if batch_size is not None else 1
    examples = []
    for start_ind in range(0, num_rows, step):  # for each batch
        # iloc clamps at the end of the frame, so the final slice is simply shorter.
        csv_slice = csv.iloc[start_ind:start_ind + step]
        if len(csv_slice) < step:
            print("last Example has: ", len(csv_slice), partitionid)
        examples.append((csv_slice, len(csv_slice)))
    return examples


def _transform_to_tfrecords_from_slices(rdds):
    """Serialize full-size slices of a partition into TFRecord key/value pairs.

    Args:
        rdds: iterator of tuples whose first element is a pandas DataFrame slice.

    Returns:
        List of ``(bytearray, None)`` pairs, one per slice whose length equals
        the module-level ``batch_size``; other slices are reported and skipped.
    """
    records = []
    for entry in rdds:
        frame = entry[0]
        if len(frame) == batch_size:
            serialized = create_tf_example_spark(frame).SerializeToString()
            records.append((bytearray(serialized), None))
            continue
        print("slice size is not correct, dropping: ", len(frame))
    return records


def _transform_to_tfrecords_from_reslice(rdds):
    """Merge leftover short slices, re-cut them into full batches and serialize.

    Reads the module-level ``batch_size`` (None is treated as 1) and
    ``TEST_SET_MODE``. A final remainder shorter than one batch is dropped,
    or, when ``TEST_SET_MODE`` is true, padded to exactly one batch by
    repeating its own rows.

    Args:
        rdds: iterator of tuples whose first element is a pandas DataFrame slice.

    Returns:
        List of ``(bytearray, None)`` pairs, each holding one serialized
        example of exactly ``batch_size`` rows.
    """
    def _serialize(frame):
        return (bytearray((create_tf_example_spark(frame)).SerializeToString()), None)

    frames = [entry[0] for entry in rdds]
    if not frames:
        # pd.concat rejects an empty list; nothing to re-slice.
        return []
    # One concat instead of repeated DataFrame.append (removed in pandas 2.0
    # and quadratic when used in a loop).
    all_dataframes = pd.concat(frames)
    num_rows = len(all_dataframes.index)
    step = batch_size if batch_size is not None else 1
    examples = []
    for start_ind in range(0, num_rows, step):  # for each batch
        csv_slice = all_dataframes.iloc[start_ind:start_ind + step]
        if len(csv_slice) == step:
            examples.append(_serialize(csv_slice))
            continue
        # Short remainder (always the last iteration). Testing the actual
        # slice length avoids the old off-by-one, where a remainder of
        # batch_size - 1 rows was written out as a short record.
        if TEST_SET_MODE:
            remain_len = step - len(csv_slice)
            (m, n) = divmod(remain_len, len(csv_slice))
            print("remainder: ", len(csv_slice), remain_len, m, n)
            # Original rows + m whole copies + the first n rows == step rows.
            # The old loop appended the slice to itself, doubling it on every
            # pass and overshooting the batch size.
            csv_slice = pd.concat([csv_slice] * (m + 1) + [csv_slice.iloc[:n]])
            print("after fill remainder: ", len(csv_slice))
            examples.append(_serialize(csv_slice))
        else:
            # drop the remainder
            print("dropping remainder: ", len(csv_slice))
    return examples

In [None]:
########################### convert data to TFRecords ######################
pd.set_option('display.max_columns', 1000)
# NOTE(review): `evaluation` and `evaluation_verbose` are not read anywhere in
# the visible part of this notebook.
evaluation = True
evaluation_verbose = False
# Output root; the train/eval suffixes below are appended to it.
LOCAL_DATA_TFRECORDS_DIR = "/outbrain/tfrecords-test/recdp"
train_output_string = '/train'
eval_output_string = '/eval'

# Read as a global by _transform_to_tfrecords_from_reslice: when False, a
# final remainder shorter than one batch is dropped instead of padded.
TEST_SET_MODE = False

# NOTE(review): num_train_partitions is defined but the train pipeline below
# hard-codes coalesce(30); only num_valid_partitions is actually used.
num_train_partitions = 40
num_valid_partitions = 40
# Rows packed into each serialized example; read as a global by the
# _transform_* partition functions.
batch_size = 4096
# write out tfrecords meta
make_spec(LOCAL_DATA_TFRECORDS_DIR + '/transformed_metadata', batch_size=batch_size)

# Column names used by _transform_to_slices to rebuild a pandas DataFrame from
# partition rows (read there as a global).
columns = train_feature_norm.columns

# Training set: remainders that cannot fill a batch are dropped, not padded.
TEST_SET_MODE = False
# Cut every partition into (pandas slice, row count) tuples.
# NOTE(review): 30 is hard-coded here while num_train_partitions is 40 — confirm which is intended.
train_features = train_feature_norm.coalesce(30).rdd.mapPartitions(_transform_to_slices)
# Cached because the sliced RDD is filtered twice below.
cached_train_features = train_features.cache()
train_full = cached_train_features.filter(lambda x: x[1] == batch_size)
# split out slices where we don't have a full batch so that we can reslice them so we only drop minimal rows
train_not_full = cached_train_features.filter(lambda x: x[1] < batch_size)
train_examples_full = train_full.mapPartitions(_transform_to_tfrecords_from_slices)
# coalesce(1) gathers all short slices into one partition so they can be re-cut together.
train_left = train_not_full.coalesce(1).mapPartitions(_transform_to_tfrecords_from_reslice)
all_train = train_examples_full.union(train_left)

# Write (BytesWritable, NullWritable) pairs with the TFRecord Hadoop output format.
all_train.saveAsNewAPIHadoopFile(LOCAL_DATA_TFRECORDS_DIR + train_output_string,
                                 "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                                 keyClass="org.apache.hadoop.io.BytesWritable",
                                 valueClass="org.apache.hadoop.io.NullWritable")
# Release the cached slices once the output has been written.
train_features.unpersist()


# Validation set: same slice -> split full/short -> re-slice -> write pipeline
# as the training set, using num_valid_partitions partitions.
# NOTE(review): TEST_SET_MODE is still False at this point, so a final
# remainder shorter than one batch is dropped — confirm that losing those
# evaluation rows is intended.
valid_features = test_feature_norm.coalesce(num_valid_partitions).rdd.mapPartitions(_transform_to_slices)
# Cached because the sliced RDD is filtered twice below.
cached_valid_features = valid_features.cache()
valid_full = cached_valid_features.filter(lambda x: x[1] == batch_size)
valid_not_full = cached_valid_features.filter(lambda x: x[1] < batch_size)
valid_examples_full = valid_full.mapPartitions(_transform_to_tfrecords_from_slices)
# coalesce(1) gathers all short slices into one partition so they can be re-cut together.
valid_left = valid_not_full.coalesce(1).mapPartitions(_transform_to_tfrecords_from_reslice)
all_valid = valid_examples_full.union(valid_left)

# Write (BytesWritable, NullWritable) pairs with the TFRecord Hadoop output format.
all_valid.saveAsNewAPIHadoopFile(LOCAL_DATA_TFRECORDS_DIR + eval_output_string,
                                 "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                                 keyClass="org.apache.hadoop.io.BytesWritable",
                                 valueClass="org.apache.hadoop.io.NullWritable")
# Release the cached slices once the output has been written.
valid_features.unpersist()

In [None]:
# Shut down the Spark session created in the configuration cell.
spark.stop()