# Advertising Analytics Click Prediction: ML
#### [Ad impressions with clicks dataset](https://www.kaggle.com/c/avazu-ctr-prediction/data)

<img src="/files/img/fraud_ml_pipeline.png" alt="workflow" width="500">

This is the ML notebook in the Advertising Analytics Click Prediction series. In this stage we engineer features from the raw impressions, then train several classifiers and evaluate how well they predict clicks.

<img src="https://s3-us-west-2.amazonaws.com/pub-tc/ML-workflow.png" width="800">

In [2]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

# Column layout of the impression parquet files. It follows the raw CSV minus its
# leading `id` column (the parser in the next cell starts at field 1). Every field
# is nullable.
_schema_columns = [
    ("click", IntegerType()),
    ("hour", IntegerType()),
    ("C1", IntegerType()),
    ("banner_pos", IntegerType()),
    ("site_id", StringType()),
    ("site_domain", StringType()),
    ("site_category", StringType()),
    ("app_id", StringType()),
    ("app_domain", StringType()),
    ("app_category", StringType()),
    ("device_id", StringType()),
    ("device_ip", StringType()),
    ("device_model", StringType()),
    ("device_type", IntegerType()),
    ("device_conn_type", IntegerType()),
    ("C14", IntegerType()),
    ("C15", IntegerType()),
    ("C16", IntegerType()),
    ("C17", IntegerType()),
    ("C18", IntegerType()),
    ("C19", IntegerType()),
    ("C20", IntegerType()),
    ("C21", IntegerType()),
]
schema = StructType([StructField(col_name, col_type, True) for col_name, col_type in _schema_columns])

In [3]:
from itertools import islice
# (raw CSV path, parquet output path) pairs to convert.
par_files = [("/FileStore/tables/train.400k",'/FileStore/tables/train.400k.parquet'),("/FileStore/tables/train.4m",'/FileStore/tables/train.4m.parquet')]

# Raw CSV field positions that hold integers. Position 0 (`id`) is dropped, positions
# 5-13 stay strings. Must stay in step with `schema`.
_INT_POSITIONS = frozenset([1, 2, 3, 4, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23])


def _skip_header(partition_index, lines):
    """Drop the CSV header line, which can only be the first line of partition 0."""
    if partition_index != 0:
        return lines
    return islice(lines, 1, None)


def _parse_impression(line):
    """Split one CSV line and return fields 1..23 as a tuple, converting integer columns."""
    fields = line.split(",")
    return tuple(int(fields[pos]) if pos in _INT_POSITIONS else fields[pos]
                 for pos in range(1, 24))


for csv_path, parquet_path in par_files:
    rdd = (sc.textFile(csv_path)
             .mapPartitionsWithIndex(_skip_header)
             .map(_parse_impression))
    df = sqlContext.createDataFrame(rdd, schema)
    df.write.parquet(parquet_path, mode='overwrite')

In [4]:
# Input: the parquet written from /FileStore/tables/train.4m by the conversion cell.
# Other sizes used here before: train.400k.parquet (also written above) and
# train.12m.parquet (not produced by this notebook).
train_file = "/FileStore/tables/train.4m.parquet"

# Add `hr`: `hour` as a string from character 7 onward (its last two digits for an
# 8-digit value).
# NOTE(review): presumably hour-of-day, given the Avazu YYMMDDHH `hour` format and the
# 24 distinct `hr` values shown below. Confirm.
impression = (
    spark.read
    .parquet(train_file)
    .selectExpr("*", "substr(hour, 7) as hr")
    .repartition(64)
)

In [5]:
from pyspark.sql.functions import *

# Exact distinct-value count of every column, split by Spark type and sorted from most
# to fewest distinct values. Each entry is a (column_name, distinct_count) tuple. The
# counts decide later which columns are treated as categorical.
#
# Comprehensions are used instead of the `map`/`filter` builtins because the wildcard
# `pyspark.sql.functions` import above shadows builtins. It exports `sum`, `max`,
# `count`, ..., and on newer Spark releases also a `filter` function. Plain lists (not
# lazy iterators) also keep strCols/intCols reusable if the cell is re-run or inspected.
strCols = [name for name, dtype in impression.dtypes if dtype == 'string']
intCols = [name for name, dtype in impression.dtypes if dtype == 'int']


def _distinct_counts(columns):
    """Return [(column, exact distinct count)] for `columns`, largest count first.

    Runs one Spark job per column. `.collect()[0][0]` unpacks the single-row,
    single-column aggregation result.
    """
    counts = [(c, impression.select(countDistinct(c)).collect()[0][0]) for c in columns]
    return sorted(counts, key=lambda pair: pair[1], reverse=True)


strColsCount = _distinct_counts(strCols)
intColsCount = _distinct_counts(intCols)

In [6]:
# Distinct-value counts for the string columns (including the derived `hr`), as
# (column, count) pairs, largest first. `display` is the notebook runtime's renderer.
display(strColsCount)

_1,_2
device_ip,1623197
device_id,499268
device_model,6362
app_id,4874
site_domain,4268
site_id,3450
app_domain,304
app_category,27
hr,24
site_category,23


In [7]:
# Distinct-value counts for the integer columns (including the `click` label), as
# (column, count) pairs, largest first. `display` is the notebook runtime's renderer.
display(intColsCount)

_1,_2
C14,2459
C17,427
hour,240
C20,166
C19,66
C21,60
C16,9
C15,8
C1,7
banner_pos,7


In [8]:
# Include PySpark Feature Engineering methods
from pyspark.ml.feature import StringIndexer, VectorAssembler,QuantileDiscretizer

# Build the feature-engineering stages. Low-cardinality columns become categorical
# indices, high-cardinality integer columns are bucketed, and everything is assembled
# into a single `features` vector. `click` becomes the `label`.
stages = []

# Any column (string or integer) with at most maxBins distinct values is treated as
# categorical. maxBins is reused as the tree learners' maxBins, which must be at least
# the number of categories of any categorical feature.
maxBins = 1000

# Order matters: it fixes the slot order in the `features` vector. The click label is
# excluded because it is the target, not a feature.
# TODO: string columns with more than maxBins distinct values (device_ip, device_id, ...)
#       are currently dropped entirely.
categorical = [name for name, n_distinct in strColsCount + intColsCount
               if n_distinct <= maxBins and name != 'click']

# One StringIndexer per categorical column; "<col>_idx" holds the category index.
stringIndexers = [StringIndexer(inputCol=name, outputCol="%s_idx" % name) for name in categorical]
tobe_assm = ["%s_idx" % name for name in categorical]
stages += stringIndexers

# Integer columns with more than maxBins distinct values are bucketed into maxBins
# quantile buckets ("<col>_qd") instead of being dropped. The loop variable is not
# called `count`, so pyspark.sql.functions.count (wildcard-imported earlier) stays intact.
for name, n_distinct in intColsCount:
    if n_distinct > maxBins:
        tobe_assm.append("%s_qd" % name)
        stages.append(QuantileDiscretizer(numBuckets=maxBins, inputCol=name, outputCol="%s_qd" % name))

# Assemble all indexed / bucketed columns into the model input vector.
stages.append(VectorAssembler(inputCols=tobe_assm, outputCol="features"))

# The 0/1 `click` column becomes the label. Alphabetical ordering pins "0" -> 0.0 and
# "1" -> 1.0. The default (frequency-descending) ordering would silently invert the label
# if clicks ever outnumbered non-clicks.
stages.append(StringIndexer(inputCol="click", outputCol="label", stringOrderType="alphabetAsc"))

In [9]:
from pyspark.ml import Pipeline

# Chain all feature stages and fit them. `featurizer` is the fitted PipelineModel
# that appends the feature and label columns.
# NOTE(review): the pipeline is fit on all impressions before the train/test split, so
# test rows influence the category index order and quantile boundaries.
pipeline = Pipeline(stages=stages)
featurizer = pipeline.fit(dataset=impression)

In [10]:
# Apply the fitted pipeline. The result keeps every original column and adds the
# intermediate *_idx / *_qd columns, `features`, and `label`.
featurizedImpressions = featurizer.transform(dataset=impression)

In [11]:
# Sample of assembled feature vectors next to their labels.
feature_preview = featurizedImpressions.select(["features", "label"])
display(feature_preview)

features,label
"List(1, 17, List(), List(2.0, 1.0, 3.0, 0.0, 1.0, 119.0, 9.0, 3.0, 4.0, 0.0, 0.0, 0.0, 0.0, 0.0, 2.0, 3.0, 168.0))",0.0
"List(0, 17, List(2, 4, 5, 7, 8, 11, 13, 15, 16), List(15.0, 57.0, 152.0, 19.0, 9.0, 1.0, 1.0, 2.0, 102.0))",0.0
"List(0, 17, List(2, 3, 5, 8, 16), List(21.0, 2.0, 183.0, 2.0, 36.0))",0.0
"List(0, 17, List(0, 1, 2, 4, 5, 6, 7, 15, 16), List(13.0, 1.0, 2.0, 117.0, 185.0, 39.0, 2.0, 1.0, 287.0))",0.0
"List(0, 17, List(2, 4, 5, 7, 8, 11, 13, 15, 16), List(4.0, 41.0, 50.0, 5.0, 9.0, 1.0, 1.0, 2.0, 344.0))",1.0
"List(0, 17, List(2, 3, 4, 5, 6, 7, 12, 15, 16), List(6.0, 1.0, 3.0, 32.0, 6.0, 2.0, 1.0, 1.0, 111.0))",0.0
"List(0, 17, List(2, 4, 5, 6, 7, 8, 11, 13, 15, 16), List(6.0, 4.0, 28.0, 2.0, 1.0, 7.0, 1.0, 1.0, 2.0, 11.0))",0.0
"List(0, 17, List(0, 1, 2, 4, 5, 6, 7, 8, 15, 16), List(9.0, 1.0, 23.0, 5.0, 204.0, 3.0, 6.0, 5.0, 1.0, 179.0))",0.0
"List(0, 17, List(2, 3, 5, 8, 16), List(23.0, 2.0, 204.0, 2.0, 36.0))",1.0
"List(1, 17, List(), List(0.0, 0.0, 2.0, 1.0, 191.0, 21.0, 0.0, 43.0, 5.0, 2.0, 2.0, 0.0, 1.0, 0.0, 0.0, 1.0, 28.0))",0.0


In [12]:
# Reproducible 70/30 split, keeping only the columns the models and later SQL need.
train, test = (
    featurizedImpressions
    .select("label", "features", "hr")
    .randomSplit([0.7, 0.3], seed=42)
)
# Both splits are reused by several fits and transforms, so keep them cached.
train.cache()
test.cache()

In [13]:
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, DecisionTreeClassifier, LogisticRegression

# Four candidate classifiers, all reading `label` / `features`.
# Tree learners use maxBins=maxBins because maxBins must be at least the number of
# categories of the widest categorical feature built in the feature cell.
# The randomized learners get an explicit seed (matching the split seed). PySpark's
# default seed is hash() of the class name, which Python randomizes per process, so
# without it results can change across kernel restarts.
# NOTE(review): LogisticRegression sees the *_idx category indices as plain numbers
# (no one-hot encoding), so its results are not directly comparable to the trees.
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=20, maxBins=maxBins, seed=42)
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=100, maxBins=maxBins, seed=42)
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxBins=maxBins, seed=42)
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=100)

In [14]:
# Gradient-boosted trees fitted on the training split.
model1 = gbt.fit(dataset=train)

In [15]:
# Random forest fitted on the training split.
model2 = rf.fit(dataset=train)

In [16]:
# Single decision tree fitted on the training split.
model3 = dt.fit(dataset=train)

In [17]:
# Logistic regression fitted on the training split.
model4 = lr.fit(dataset=train)

In [18]:
# Score the held-out split with each fitted model, in model order 1..4.
predictions1, predictions2, predictions3, predictions4 = [
    fitted.transform(test) for fitted in (model1, model2, model3, model4)
]

In [19]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve on the test split for every model, computed from the
# `rawPrediction` column each classifier emits.
ev = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", metricName="areaUnderROC")
for model_name, scored in [("GBDT", predictions1), ("RF", predictions2),
                           ("DT", predictions3), ("LR", predictions4)]:
    print(model_name, ev.evaluate(scored))

In [20]:
import json
# Pair each GBT feature importance with the name of the feature-vector slot it belongs
# to, and expose the pairs as the SQL view `wf`.
#
# VectorAssembler stores one attribute per slot under ml_attr.attrs, grouped by
# attribute type (e.g. "nominal", "numeric", "binary"), each carrying its vector index
# `idx`. Merge every group and order by `idx` so names line up with the importance
# vector. Taking only the first group would drop or misalign the other types.
attr_groups = predictions1.schema['features'].metadata['ml_attr']['attrs']
features = [str(attr['name'])
            for attr in sorted((a for group in attr_groups.values() for a in group),
                               key=lambda a: a['idx'])]

# Fixed-width strings avoid handing numpy.float64 to spark.createDataFrame(). Because
# importances are normalized into [0, 1], equal-width '%.10f' strings also sort in the
# same order as the numbers they encode when ordered in SQL.
weights = ['%.10f' % w for w in model1.featureImportances.toArray()]
assert len(weights) == len(features), "feature names do not cover every importance slot"

# Most important feature first (sort on the weight, not the feature name).
weightedFeatures = sorted(zip(weights, features), key=lambda pair: float(pair[0]), reverse=True)
spark.createDataFrame(weightedFeatures).toDF("weight", "feature").createOrReplaceTempView('wf')

In [21]:
%sql
-- GBT feature importances, most important first.
SELECT feature, weight
FROM wf
ORDER BY weight DESC

feature,weight
C17_idx,0.3716606485
C14_qd,0.2515332671
hour_idx,0.1295274768
app_domain_idx,0.0773955886
app_category_idx,0.0514595118
site_category_idx,0.0432886444
C20_idx,0.0391117927
C1_idx,0.0147643736
banner_pos_idx,0.0125064455
device_conn_type_idx,0.0051389437


In [22]:
# Register each model's scored test set as a SQL view for the %sql cells below.
for view_name, scored in [("predictions1", predictions1), ("predictions2", predictions2),
                          ("predictions3", predictions3), ("predictions4", predictions4)]:
    scored.createOrReplaceTempView(view_name)

In [23]:
%sql
-- Column layout of a scored test set (random forest).
DESCRIBE predictions2

col_name,data_type,comment
label,double,
features,vector,
hr,string,
rawPrediction,vector,
probability,vector,
prediction,double,


In [24]:
%sql
-- Accuracy: share of test rows whose predicted class equals the label.
SELECT
  "GBDT" AS model,
  SUM(CASE WHEN prediction = label THEN 1 ELSE 0 END) / (COUNT(1) * 1.0) AS accuracy
FROM predictions1

model,accuracy
GBDT,0.8332452488567079


In [25]:
%sql
-- Recall (true-positive rate): of the impressions actually clicked (label = 1), the
-- share the model also predicted as clicks, i.e. TP / (TP + FN). The denominator counts
-- actual positives, so this is recall, not precision.
select "GBDT" as model ,sum(case when prediction = label and label = 1 then 1 else 0 end) / (sum(case when label = 1 then 1 else 0 end)*1.0) as recall
from predictions1

model,precision
GBDT,0.0657539501054836


In [26]:
%sql
-- Precision: of the impressions predicted as clicks (prediction = 1), the share that
-- were actually clicked, i.e. TP / (TP + FP).
SELECT
  "GBDT" AS model,
  SUM(CASE WHEN prediction = label AND label = 1 THEN 1 ELSE 0 END)
    / (SUM(CASE WHEN prediction = 1 THEN 1 ELSE 0 END) * 1.0) AS precision
FROM predictions1

model,precision
GBDT,0.5706055562668032


In [27]:
%sql
-- Accuracy of all four models side by side, one row per model.
-- NOTE(review): accuracy is a weak signal here. GBDT catches only ~7% of actual clicks
-- (recall cell) yet scores ~0.83 accuracy, which suggests a heavily imbalanced label;
-- prefer the AUC values from the evaluator cell.
select "GBDT" as model, sum(case when prediction = label then 1 else 0 end) / (count(1) * 1.0) as accuracy from predictions1
union all
select "RF" as model, sum(case when prediction = label then 1 else 0 end) / (count(1) * 1.0) as accuracy from predictions2
union all
select "DT" as model, sum(case when prediction = label then 1 else 0 end) / (count(1) * 1.0) as accuracy from predictions3
union all
select "LR" as model, sum(case when prediction = label then 1 else 0 end) / (count(1) * 1.0) as accuracy from predictions4

In [28]:
%sql
-- Random forest accuracy on the test split.
SELECT
  "RF" AS model,
  SUM(CASE WHEN prediction = label THEN 1 ELSE 0 END) / (COUNT(1) * 1.0) AS accuracy
FROM predictions2

model,accuracy
RF,0.8304868431801775


In [29]:
%sql
-- Decision tree accuracy on the test split.
SELECT
  "DT" AS model,
  SUM(CASE WHEN prediction = label THEN 1 ELSE 0 END) / (COUNT(1) * 1.0) AS accuracy
FROM predictions3

model,accuracy
DT,0.8320323506217875


In [30]:
%sql
-- Logistic regression accuracy on the test split.
SELECT
  "LR" AS model,
  SUM(CASE WHEN prediction = label THEN 1 ELSE 0 END) / (COUNT(1) * 1.0) AS accuracy
FROM predictions4

model,accuracy
LR,0.8303242898084872
