# Advertising Analytics Click Prediction: ML
Data: [Avazu Click-Through Rate Prediction (Kaggle)](https://www.kaggle.com/c/avazu-ctr-prediction/data)



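This is the machine-learning part of the Advertising Analytics Click Prediction notebooks. In this stage we build categorical features from the impression data, then train and evaluate a gradient-boosted tree (GBT) classifier that predicts whether an impression is clicked.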



In [0]:
# Load the impression data and add `hr` = substr(hour, 7), i.e. the characters of
# `hour` from position 7 onward (Spark SQL substr is 1-based). Presumably `hour` is
# YYMMDDHH, making `hr` the hour of day; confirm against the data.
# The result is spread over 64 partitions.
impression = (
  spark.read
    .parquet("/mnt/adtech/impression/parquet/train.csv/")
    .selectExpr("*", "substr(hour, 7) as hr")
    .repartition(64)
)

In [0]:
# NOTE: this wildcard import can shadow Python builtins (e.g. `sum`, `max`, `min`, and
# `filter` on Spark >= 3.1). The code below therefore avoids calling those builtins.
from pyspark.sql.functions import *

def _cols_of_type(df, dtype):
  """Return the names of the columns of `df` whose Spark type string equals `dtype`."""
  return [name for name, colType in df.dtypes if colType == dtype]

def _distinct_counts(df, cols):
  """Return [(column, exact number of distinct non-null values)], highest count first.

  All counts are computed in ONE aggregation. `impression` is not cached and ends in a
  repartition, so every Spark action re-reads and re-shuffles it; a job per column
  would repeat that work once per column. Ties keep the order of `cols`.
  """
  if not cols:
    return []  # DataFrame.agg() rejects an empty expression list
  countsRow = df.agg(*[countDistinct(c) for c in cols]).first()
  # The Row's fields are in the same order as the aggregate expressions
  return sorted(zip(cols, countsRow), key=lambda x: x[1], reverse=True)

strCols = _cols_of_type(impression, 'string')
intCols = _cols_of_type(impression, 'int')

# Lists of (column name, distinct count), most distinct values first
strColsCount = _distinct_counts(impression, strCols)
intColsCount = _distinct_counts(impression, intCols)

In [0]:
# Distinct-value counts for the string columns, as (column, count) pairs with the
# highest count first. Columns with at most maxBins distinct values become
# categorical features in the feature-engineering cell.
display(strColsCount)

In [0]:
# Distinct-value counts for the int columns, as (column, count) pairs with the
# highest count first. Columns with at most maxBins distinct values become
# categorical features in the feature-engineering cell.
display(intColsCount)

In [0]:
# Include PySpark Feature Engineering methods
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Every column (string or integer) with at most `maxBins` distinct values is used as a
# categorical feature, except the [click] column, which is the label.
# The same `maxBins` is passed to the GBT classifier. For categorical features, Spark's
# tree learners require maxBins >= the largest number of categories of any feature.
maxBins = 70

# Why list comprehensions instead of map/filter:
#  - the result is a real list on both Python 2 and 3;
#  - the builtin `filter` is never called (a pyspark.sql.functions wildcard import can
#    shadow it).
# Excluding 'click' in the condition (instead of list.remove) also means a missing
# 'click' entry no longer raises ValueError.
categorical = [name for name, distinct in strColsCount + intColsCount
               if distinct <= maxBins and name != 'click']

# Apply a StringIndexer to every categorical column.
# The output column is named <column>_idx and holds the index of the categorical value.
stringIndexers = [StringIndexer(inputCol=c, outputCol=c + "_idx") for c in categorical]

# Assemble the indexed columns into the single `features` vector used for training
assemblerInputs = [c + "_idx" for c in categorical]
vectorAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

# The [click] column is our label.
# NOTE(review): by default StringIndexer assigns indices by descending frequency, so the
# more common click value becomes 0.0. Presumably that is "no click", making clicks
# label 1.0; confirm on the data.
labelStringIndexer = StringIndexer(inputCol="click", outputCol="label")

# The stages of our ML pipeline, in execution order
stages = stringIndexers + [vectorAssembler, labelStringIndexer]

In [0]:
from pyspark.ml import Pipeline

# Chain the indexer, assembler and label stages into one Pipeline.
# Fitting it on the impression data yields `featurizer`, a PipelineModel that appends
# the *_idx, `features` and `label` columns.
pipeline = Pipeline(stages=stages)
featurizer = pipeline.fit(impression)

In [0]:
# Apply the fitted feature pipeline. The result keeps the original impression columns
# and appends the intermediate *_idx columns, the assembled `features` vector and the
# `label` column.
featurizedImpressions = featurizer.transform(impression)

In [0]:
# Peek at the model inputs: the assembled feature vector and the indexed label
display(featurizedImpressions.select(['features', 'label']))

In [0]:
# Keep the model columns (label, features) plus the raw `hr` column.
# Split 70/30 into train and test with seed 42 so the split is reproducible.
modelInputs = featurizedImpressions.select("label", "features", "hr")
train, test = modelInputs.randomSplit([0.7, 0.3], seed=42)
# Cache both splits so later cells do not recompute the featurization on every action
train.cache()
test.cache()

In [0]:
from pyspark.ml.classification import GBTClassifier

# Gradient-boosted trees: 10 boosting iterations, trees up to depth 10.
# maxBins reuses the distinct-value threshold that selected the categorical columns.
classifier = GBTClassifier(
  labelCol="label",
  featuresCol="features",
  maxBins=maxBins,
  maxDepth=10,
  maxIter=10,
)
model = classifier.fit(train)

In [0]:
# Score the held-out test split. The model's transform appends prediction columns;
# `rawPrediction` (evaluation) and `prediction` (accuracy query) are used below.
predictions = model.transform(test)

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate the GBT model on the test predictions.
# Metric: area under the ROC curve, from the `rawPrediction` scores against `label`.
ev = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
areaUnderROC = ev.evaluate(predictions)
# print(x) with a single argument behaves the same as a Python 2 statement and as the
# Python 3 function (`print x` is a SyntaxError on Python 3).
print(areaUnderROC)

In [0]:
# Pair every feature name with its GBT importance and expose the pairs as temp view `wf`.
#
# The assembled `features` column carries `ml_attr` metadata. Its `attrs` entry maps an
# attribute type (e.g. "nominal", "numeric") to a list of attribute dicts, each holding
# the slot index `idx` and the `name`.
# Flatten every type group (not just the first) and order by `idx`, so each name lines
# up with its position in model.featureImportances.
attrGroups = predictions.schema['features'].metadata.get('ml_attr').get('attrs')
featureAttrs = sorted((attr for group in attrGroups.values() for attr in group),
                      key=lambda attr: attr['idx'])
features = [str(attr['name']) for attr in featureAttrs]

# toArray() yields numpy.float64 values for dense and sparse importance vectors alike.
# float() turns them into plain Python floats, which spark.createDataFrame() infers as
# DOUBLE, so `weight` sorts numerically in SQL.
weights = [float(w) for w in model.featureImportances.toArray()]
# Most important feature first
weightedFeatures = sorted(zip(weights, features), key=lambda x: x[0], reverse=True)
spark.createDataFrame(weightedFeatures).toDF("weight", "feature").createOrReplaceTempView('wf')

In [0]:
%sql
-- Feature importances from the GBT model, most important first
SELECT feature, weight
FROM wf
ORDER BY weight DESC

In [0]:
# Register the scored test set as temp view `predictions` so the %sql cells can query it.
predictions.createOrReplaceTempView("predictions")

In [0]:
%sql
-- Schema of the scored test set: label, features, hr plus the model's output columns
DESCRIBE predictions

In [0]:
%sql
-- Accuracy: fraction of test rows whose predicted class equals the label.
-- NOTE(review): clicks are likely the minority class, so compare this against the
-- majority-class rate; areaUnderROC above is the more informative metric.
SELECT
  SUM(CASE WHEN prediction = label THEN 1 ELSE 0 END) / (COUNT(1) * 1.0) AS accuracy
FROM predictions