# Advertising Analytics Click Prediction: ML
#### [Ad impressions with clicks dataset](https://www.kaggle.com/c/avazu-ctr-prediction/data)

<img src="/files/img/fraud_ml_pipeline.png" alt="workflow" width="500">

This is the ML notebook in the Advertising Analytics Click Prediction series. In this stage we create the features, then train and evaluate the ML model.

<img src="https://s3-us-west-2.amazonaws.com/pub-tc/ML-workflow.png" width="800">

In [2]:
# Read the raw impressions and derive `hr`, the substring of `hour` from its 7th character
# onward (a string column). Presumably the hour of day if `hour` is YYMMDDHH — confirm.
# The result is then spread across 64 partitions.
impression = (
    spark.read
    .parquet("/mnt/adtech/impression/parquet/train.csv/")
    .selectExpr("*", "substr(hour, 7) as hr")
    .repartition(64)
)

In [3]:
# Import only what is needed: `from pyspark.sql.functions import *` shadows Python builtins
# such as sum/max/min/abs/round (and, since PySpark 3.1, filter).
from pyspark.sql.functions import countDistinct


def _distinct_counts(df, col_names):
    """Return [(column_name, distinct_count)] for col_names, highest count first.

    Runs one countDistinct aggregation (one Spark job) per column.
    """
    counts = [(c, df.select(countDistinct(c)).collect()[0][0]) for c in col_names]
    return sorted(counts, key=lambda pair: pair[1], reverse=True)


# Column names grouped by the Spark SQL type name reported by DataFrame.dtypes.
# Only exact 'string' / 'int' types are picked up (e.g. 'bigint' columns are not).
strCols = [name for name, dtype in impression.dtypes if dtype == 'string']
intCols = [name for name, dtype in impression.dtypes if dtype == 'int']

strColsCount = _distinct_counts(impression, strCols)
intColsCount = _distinct_counts(impression, intCols)

In [4]:
# Distinct-value count per string column, sorted highest first (candidates for categorical features)
display(strColsCount)

_1,_2
device_ip,6729486
device_id,2686408
app_id,8552
device_model,8251
site_domain,7745
site_id,4737
app_domain,559
app_category,36
site_category,26
hr,24


In [5]:
# Distinct-value count per int column, sorted highest first (candidates for categorical features)
display(intColsCount)

_1,_2
C14,2626
C17,435
hour,240
C20,172
C19,68
C21,60
C16,9
C15,8
C1,7
banner_pos,7


In [6]:
# Include PySpark Feature Engineering methods
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Only low-cardinality columns (at most maxBins distinct values) become features, and each one
# is treated as categorical. maxBins is also passed to GBTClassifier: Spark's tree learners
# require maxBins to be at least the number of categories of every categorical feature.
# The [click] column is the label, so it is never used as a feature.
maxBins = 70
labelCol = "click"
categorical = [name for name, n_distinct in strColsCount + intColsCount
               if n_distinct <= maxBins and name != labelCol]

# Index every categorical column; the "_idx" suffix marks the indexed copy of a column.
stringIndexers = [StringIndexer(inputCol=c, outputCol=c + "_idx") for c in categorical]

# Assemble the indexed columns into the single "features" vector the classifier consumes.
assemblerInputs = [c + "_idx" for c in categorical]
vectorAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# Index [click] into "label". StringIndexer orders values by descending frequency, so the most
# frequent click value becomes 0.0. NOTE(review): label 1.0 therefore means "clicked" only
# while non-clicks are the majority class.
labelStringIndexer = StringIndexer(inputCol=labelCol, outputCol="label")

# The stages of our ML pipeline, in execution order.
stages = stringIndexers + [vectorAssembler, labelStringIndexer]

In [7]:
from pyspark.ml import Pipeline

# Chain the per-column indexers, the assembler and the label indexer, then fit all of them
# against the full impression data; the fitted PipelineModel is our featurizer.
# NOTE(review): fitting before the train/test split lets the indexers see test rows.
pipeline = Pipeline().setStages(stages)
featurizer = pipeline.fit(impression)

In [8]:
# Apply the fitted pipeline: keeps the original columns and appends each *_idx column,
# the assembled "features" vector and the indexed "label".
featurizedImpressions = featurizer.transform(dataset=impression)

In [9]:
# Inspect the assembled feature vectors next to their indexed labels.
display(featurizedImpressions.select(["features", "label"]))

features,label
"List(0, 12, List(1, 2, 3, 11), List(2.0, 1.0, 1.0, 1.0))",0.0
"List(0, 12, List(1, 2, 3, 11), List(2.0, 1.0, 1.0, 1.0))",0.0
"List(0, 12, List(1, 2, 3, 11), List(2.0, 1.0, 1.0, 1.0))",0.0
"List(0, 12, List(1, 2, 3, 11), List(2.0, 1.0, 1.0, 1.0))",0.0
"List(0, 12, List(0, 2, 5, 6, 10, 11), List(1.0, 8.0, 1.0, 1.0, 1.0, 2.0))",0.0
"List(0, 12, List(1, 2, 3, 5, 6, 11), List(3.0, 5.0, 1.0, 1.0, 1.0, 1.0))",1.0
"List(0, 12, List(1, 2, 3, 5, 6, 11), List(3.0, 5.0, 1.0, 1.0, 1.0, 1.0))",1.0
"List(0, 12, List(1, 2, 3, 5, 6, 11), List(3.0, 5.0, 1.0, 1.0, 1.0, 1.0))",1.0
"List(0, 12, List(0, 2, 3, 4, 10, 11), List(3.0, 11.0, 26.0, 11.0, 1.0, 3.0))",1.0
"List(0, 12, List(0, 2, 3, 4, 11), List(2.0, 16.0, 17.0, 12.0, 1.0))",1.0


In [10]:
# Keep only what the model and later analysis need (hr is carried along for inspection).
# Cache the input BEFORE splitting. randomSplit evaluates its source lineage once per output
# split, and that lineage contains repartition(64), which may not place rows identically on
# each recomputation. Without the cache, train and test could overlap or miss rows.
modelInput = featurizedImpressions.select(["label", "features", "hr"]).cache()
train, test = modelInput.randomSplit([0.7, 0.3], seed=42)
train.cache()
test.cache()

In [11]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees: 10 boosting iterations of trees up to depth 10. maxBins reuses the
# cardinality cutoff from the feature-engineering cell so every indexed categorical fits.
classifier = GBTClassifier(
    featuresCol="features",
    labelCol="label",
    maxBins=maxBins,
    maxDepth=10,
    maxIter=10,
)
model = classifier.fit(train)

In [12]:
# Score the held-out split; the model appends rawPrediction, probability and prediction.
predictions = model.transform(dataset=test)

In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Area under the ROC curve on the held-out split, scored from the model's rawPrediction column.
# print() with a single argument behaves the same on Python 2 and Python 3.
ev = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="label",
                                   metricName="areaUnderROC")
auc = ev.evaluate(predictions)
print(auc)

In [14]:
# Map each slot of the assembled feature vector back to its source column name, using the ML
# attribute metadata on the "features" column, and pair it with the GBT importance of that slot.
attrGroups = predictions.schema['features'].metadata['ml_attr']['attrs']
# attrs is a dict of attribute groups; gather every group (not an arbitrary first one) and
# order by 'idx' so each name lines up with its position in the importance vector.
featureAttrs = sorted((attr for group in attrGroups.values() for attr in group),
                      key=lambda attr: attr['idx'])
features = [str(attr.get('name', 'f%d' % attr['idx'])) for attr in featureAttrs]

# float() turns numpy.float64 into a plain float that spark.createDataFrame() accepts, and
# keeps the weight numeric so SQL ORDER BY sorts by value rather than as text.
weights = [float(w) for w in model.featureImportances.toArray()]
assert len(weights) == len(features), "feature metadata does not match importance vector"

# Highest-importance features first (sort on the weight, element 0 of each pair).
weightedFeatures = sorted(zip(weights, features), key=lambda x: x[0], reverse=True)
spark.createDataFrame(weightedFeatures).toDF("weight", "feature").createOrReplaceTempView('wf')

In [15]:
%sql
-- Feature importances from the GBT model, largest first
SELECT feature, weight
FROM wf
ORDER BY weight DESC

feature,weight
C21_idx,0.4421319127
C19_idx,0.1150903324
site_category_idx,0.0946446858
app_category_idx,0.0851794287
hr_idx,0.0747225775
banner_pos_idx,0.0494653844
C18_idx,0.0457166715
C16_idx,0.0370146512
device_conn_type_idx,0.0241333606
C1_idx,0.0206914316


In [16]:
# Expose the scored test split to the %sql cells below under the name "predictions".
predictions.createOrReplaceTempView(name="predictions")

In [17]:
%sql
-- Schema of the scored test split
DESCRIBE predictions

col_name,data_type,comment
label,double,
features,vector,
hr,string,
rawPrediction,vector,
probability,vector,
prediction,double,


In [18]:
%sql
-- Accuracy on the held-out split: rows whose predicted class equals the label, over all rows
SELECT
  SUM(CASE WHEN prediction = label THEN 1 ELSE 0 END) / (COUNT(1) * 1.0) AS accuracy
FROM predictions

accuracy
0.8328162140172459
