### **Ads Click-Through-Rate Prediction with Spark MLlib**

Data: https://tianchi.aliyun.com/dataset/dataDetail?dataId=56

In [2]:
# import library
from functools import reduce
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, countDistinct, explode, array, lit, isnan, when, count
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
import matplotlib.pyplot as plt
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
!pip install mlflow
import mlflow

In [3]:
# read data
data_0 = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",") \
  .load("/FileStore/tables/merge_data_10k_4.csv")

data_1 = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",") \
  .load("/FileStore/tables/merge_data_10k_4.csv")

data_2 = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",") \
  .load("/FileStore/tables/merge_data_10k_4.csv")

data_3 = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",") \
  .load("/FileStore/tables/merge_data_10k_4.csv")

data_4 = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",") \
  .load("/FileStore/tables/merge_data_10k_4.csv")

display(data_0)

user,cate,btag,num,adgroup_id,userid,cms_segid,cms_group_id,final_gender_code,age_level,pvalue_level,shopping_level,occupation,new_user_class_level_,ad_time_stamp,pid,nonclk,clk,ad_cate_id,campaign_id,customer,ad_brand,price
897805,6591,pv,25,515153,897805,96,12,1,6,3,3,0,4.0,1494164903,430548_1007,1,0,4863,25135,103298,221007.0,108.0
739640,6432,pv,1,608534,739640,96,12,1,6,2,3,0,4.0,1494149435,430548_1007,1,0,7146,60067,142527,224985.0,109.0
739640,4520,pv,5,779953,739640,96,12,1,6,2,3,0,4.0,1494409300,430548_1007,1,0,6554,113993,146889,196783.0,158.0
750820,562,pv,103,551830,750820,96,12,1,6,3,3,0,3.0,1494604380,430548_1007,1,0,562,243690,28833,116097.0,439.0
750820,4565,pv,29,745756,750820,96,12,1,6,3,3,0,3.0,1494555962,430548_1007,1,0,6261,154448,65421,449570.0,665.0
750820,6261,pv,59,77760,750820,96,12,1,6,3,3,0,3.0,1494212844,430548_1007,1,0,4753,139312,15971,96398.0,39.0
750820,10875,pv,1,567767,750820,96,12,1,6,3,3,0,3.0,1494672368,430548_1007,1,0,4280,405787,48047,336831.0,582.0
750820,562,pv,103,766278,750820,96,12,1,6,3,3,0,3.0,1494212844,430548_1007,1,0,6261,200045,161334,,615.0
859569,4595,pv,1,687894,859569,96,12,1,6,2,3,0,1.0,1494083201,430539_1007,1,0,6427,96543,166329,,198.0
750820,5190,pv,4,464113,750820,96,12,1,6,3,3,0,3.0,1494414308,430548_1007,1,0,562,70968,89864,39274.0,79.0


In [4]:
# merge data
dfs = [data_0, data_1, data_2, data_3, data_4]
data = reduce(DataFrame.unionAll, dfs)
print((data.count(), len(data.columns)))

In [5]:
# 'userid' overlaps with 'user', drop it
data = data.drop('userid')

# if only link ad to CTR, drop all the variables in behavior table: btag, cate
data = data.drop(data.cate)
data = data.drop(data.num)
data = data.drop(data.btag)

# drop useless variables
data = data.drop(data.cms_segid)
data = data.drop(data.cms_group_id)
data = data.drop(data.customer)
data = data.drop(data.ad_brand)
data = data.drop(data.nonclk)

In [6]:
# drop campaign_id - too many categories
data.agg(countDistinct(col("campaign_id")).alias("count")).show()
data = data.drop(data.campaign_id)

In [7]:
# drop adgroup_id - too many categories
data.agg(countDistinct(col("adgroup_id")).alias("count")).show()
data = data.drop(data.adgroup_id)

In [8]:
# too many nulls in pvalue_level & new_user_class_level_, drop tehm
display(data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]))
data = data.drop(data.pvalue_level)
data = data.drop(data.new_user_class_level_)

user,final_gender_code,age_level,pvalue_level,shopping_level,occupation,new_user_class_level_,ad_time_stamp,pid,clk,ad_cate_id,price
0,0,0,6713775,0,0,2690645,0,0,0,0,0


In [9]:
# save to csv, create sql table
data.createOrReplaceTempView('data')
dbutils.fs.rm("taobao_data.csv", True)
data.write.format('com.databricks.spark.csv').save('taobao_data.csv')

In [10]:
# data = spark.read.format("csv") \
#   .option("inferSchema", True) \
#   .option("header", True) \
#   .option("sep", ",") \
#   .load("taobao_data.csv")
# display(data)

In [11]:
# imbalanced
data.groupBy("clk").count().show()

In [12]:
# oversampling
major_df = data.filter(col("clk") == 0)
minor_df = data.filter(col("clk") == 1)
ratio = int(major_df.count()/minor_df.count())
print("ratio: {}".format(ratio))

# duplicate the minority rows
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in range(ratio)]))).drop('dummy')

# combine both oversampled minority rows and previous majority rows 
combined_df = major_df.unionAll(oversampled_df)
combined_df.groupBy("clk").count().show()

In [13]:
combined_df.createOrReplaceTempView('oversampled')

In [14]:
# select first 7 days as train data
train = spark.sql(
    '''
    SELECT *
    FROM oversampled
    WHERE FROM_UNIXTIME(ad_time_stamp) <= "2017-05-12"
    ''')
train = train.drop(train.ad_time_stamp)
print((train.count(), len(train.columns)))
train.createOrReplaceTempView('train')

In [15]:
%sql
-- caculate average price and aggregate data
DROP TABLE IF EXISTS average;
CREATE TABLE average
AS SELECT user, ad_cate_id, AVG(price) AS avg_price
    FROM train
    GROUP BY user, ad_cate_id
    ORDER BY user, ad_cate_id

In [16]:
# append aggregated price 
# train = train.drop(train.price)
train = train.drop_duplicates()
train.createOrReplaceTempView('train')
print((train.count(), len(train.columns)))
train_merged = spark.sql(
    '''
    SELECT train.*, average.avg_price
    FROM train
    INNER JOIN average
    ON train.user = average.user AND train.ad_cate_id = average.ad_cate_id
    ORDER BY user, ad_cate_id
    ''')
display(train_merged)

user,final_gender_code,age_level,shopping_level,occupation,pid,clk,ad_cate_id,avg_price
37,2,2,3,0,430539_1007,0,1535,220.17391304347825
37,2,2,3,0,430539_1007,0,1665,114.1851851851852
37,2,2,3,0,430539_1007,0,2842,1000.0
37,2,2,3,0,430539_1007,0,4280,149.0
37,2,2,3,0,430539_1007,1,4281,120.3061224489796
37,2,2,3,0,430539_1007,0,4281,120.3061224489796
37,2,2,3,0,430539_1007,0,4282,598.5
37,2,2,3,0,430539_1007,0,4283,81.0
37,2,2,3,0,430539_1007,0,4520,99.24137931034484
37,2,2,3,0,430539_1007,0,4534,3980.0


**Data Preprocessing**

In [18]:
categoricalColumns = ['final_gender_code', 'age_level', 'shopping_level', 'occupation']
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index').setHandleInvalid("keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol = 'clk', outputCol = 'label')
stages += [label_stringIdx]

numericCols = ['avg_price']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [19]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(train_merged)
train_merged_processed = pipelineModel.transform(train_merged)
cols = train_merged.columns
selectedCols = ['label', 'features'] + cols
train_merged_processed = train_merged_processed.select(selectedCols)
train_merged_processed.printSchema()

In [20]:
test = spark.sql(
    '''
    SELECT *
    FROM oversampled
    WHERE FROM_UNIXTIME(ad_time_stamp) > "2017-05-12"
    ''')
test = test.drop(test.ad_time_stamp)
print((test.count(), len(test.columns)))
test.createOrReplaceTempView('test')

In [21]:
%sql
DROP TABLE IF EXISTS average;
CREATE TABLE average
AS SELECT user, ad_cate_id, AVG(price) AS avg_price
    FROM test
    GROUP BY user, ad_cate_id
    ORDER BY user, ad_cate_id

In [22]:
# append aggregated price 
test = test.drop(test.price)
test = test.drop_duplicates()
test.createOrReplaceTempView('test')
print((test.count(), len(test.columns)))
test_merged = spark.sql(
    '''
    SELECT test.*, average.avg_price
    FROM test
    INNER JOIN average
    ON test.user = average.user AND test.ad_cate_id = average.ad_cate_id
    ORDER BY user, ad_cate_id
    ''')
display(test_merged)

user,final_gender_code,age_level,shopping_level,occupation,pid,clk,ad_cate_id,avg_price
37,2,2,3,0,430539_1007,0,748,72.0
37,2,2,3,0,430539_1007,0,822,545.0
37,2,2,3,0,430539_1007,0,839,278.0
37,2,2,3,0,430539_1007,0,1028,127.26116504854366
37,2,2,3,0,430539_1007,0,1101,11.800000000000002
37,2,2,3,0,430539_1007,0,1535,220.17391304347825
37,2,2,3,0,430539_1007,0,1665,114.1851851851852
37,2,2,3,0,430539_1007,0,2842,1000.0
37,2,2,3,0,430539_1007,0,3787,203.64414414414412
37,2,2,3,0,430539_1007,0,4267,88.0


In [23]:
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(test_merged)
test_merged_processed = pipelineModel.transform(test_merged)
cols = test_merged.columns
selectedCols = ['label', 'features'] + cols
test_merged_processed = test_merged_processed.select(selectedCols)
test_merged_processed.printSchema()

**Logistic Regression**

In [25]:
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=10)
lrModel = lr.fit(train_merged_processed)

In [26]:
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

In [27]:
predictions = lrModel.transform(test_merged_processed)
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

In [28]:
print("Multinomial coefficients: " + str(lrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(lrModel.interceptVector))

**Decision Tree**

In [30]:
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 3)
dtModel = dt.fit(train_merged_processed)
predictions = dtModel.transform(test_merged_processed)
predictions.groupBy("clk").count().show()

In [31]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

**Random Forest**

In [33]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label')
rfModel = rf.fit(train_merged_processed)

In [34]:
predictions = rfModel.transform(test_merged_processed)
predictions.agg(countDistinct(col("prediction")).alias("count")).show()

In [35]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

**Gradient Boosting Trees**

In [37]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=10)
gbtModel = gbt.fit(train_merged_processed)
predictions = gbtModel.transform(test_merged_processed)
predictions.agg(countDistinct(col("prediction")).alias("count")).show()

In [38]:
evaluator = BinaryClassificationEvaluator()
print("Test Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

In [39]:
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 60])
             .addGrid(gbt.maxIter, [10, 20])
             .build())
evaluator = BinaryClassificationEvaluator()

cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = cv.fit(train_merged_processed)
predictions = cvModel.transform(test_merged_processed)
evaluator.evaluate(predictions)

**Auto ML**

In [41]:
import mlflow.mleap
with mlflow.start_run():
  cvModel = cv.fit(train_merged_processed)
  test_metric = evaluator.evaluate(cvModel.transform(test_merged_processed))
  mlflow.log_metric('test_' + evaluator.getMetricName(), test_metric) # Logs additional metrics
  mlflow.mleap.log_model(spark_model=cvModel.bestModel, sample_input=test, artifact_path='best-model')