In [75]:
t0_df = spark.read.csv("sample-data/train_sample.csv",header=True,inferSchema=True)
t0_df.printSchema()

root
 |-- ip: integer (nullable = true)
 |-- app: integer (nullable = true)
 |-- device: integer (nullable = true)
 |-- os: integer (nullable = true)
 |-- channel: integer (nullable = true)
 |-- click_time: timestamp (nullable = true)
 |-- attributed_time: timestamp (nullable = true)
 |-- is_attributed: integer (nullable = true)



### Original Schema
- ip - IP address of click
- app - app id for marketing
- device - device **type** id of user mobile phone (e.g. iphone 6, iphone 7, etc.)
- os - os version id of user mobile phone
- channel - channel id of mobile ad publisher
- click_time - timestamp of click (UTC)
- attributed_time - if user downloaded the app after clicking an ad, this is the time of the app download
- is_attributed - the target that is to be predicted

In [76]:
from pyspark.sql.functions import *
from pyspark.sql import functions as F

#Extract day and hour from click time
t0_df = t0_df.withColumn("day",dayofmonth(t0_df["click_time"]))
t0_df = t0_df.withColumn("hour",hour(t0_df["click_time"]))

t0_df.printSchema()

root
 |-- ip: integer (nullable = true)
 |-- app: integer (nullable = true)
 |-- device: integer (nullable = true)
 |-- os: integer (nullable = true)
 |-- channel: integer (nullable = true)
 |-- click_time: timestamp (nullable = true)
 |-- attributed_time: timestamp (nullable = true)
 |-- is_attributed: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)



### Features added
- day - day of month of click
- hour - hour of day of click

In [77]:
#Encode categorical features (app, device, os, channel)
from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(inputCols=["ip","app","device","os","channel"],
                                outputCols=["ipVec","appVec","deviceVec","osVec","channelVec"])

model = encoder.fit(t0_df)
t0_df = model.transform(t0_df)
t0_df.show()

+------+---+------+---+-------+-------------------+---------------+-------------+---+----+----------------+----------------+----------------+-----------------+--------------------+
|    ip|app|device| os|channel|         click_time|attributed_time|is_attributed|day|hour|          appVec|           osVec|       deviceVec|       channelVec|               ipVec|
+------+---+------+---+-------+-------------------+---------------+-------------+---+----+----------------+----------------+----------------+-----------------+--------------------+
| 87540| 12|     1| 13|    497|2017-11-07 09:30:38|           null|            0|  7|   9|(551,[12],[1.0])|(866,[13],[1.0])|(3867,[1],[1.0])|(498,[497],[1.0])|(364757,[87540],[...|
|105560| 25|     1| 17|    259|2017-11-07 13:40:27|           null|            0|  7|  13|(551,[25],[1.0])|(866,[17],[1.0])|(3867,[1],[1.0])|(498,[259],[1.0])|(364757,[105560],...|
|101424| 12|     1| 19|    212|2017-11-07 18:05:24|           null|            0|  7|  18|(551,

In [78]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

#Select features to actually use in training
vectorAssembler = VectorAssembler(inputCols=[
    "appVec",
    "deviceVec",
    "osVec",
    "channelVec",
    "day",
    "hour"
], outputCol="features")

v_t0_df = vectorAssembler.transform(t0_df)
v_t0_df.select("features","is_attributed").show()

+--------------------+-------------+
|            features|is_attributed|
+--------------------+-------------+
|(5784,[12,552,443...|            0|
|(5784,[25,552,443...|            0|
|(5784,[12,552,443...|            0|
|(5784,[13,552,443...|            0|
|(5784,[12,552,441...|            0|
|(5784,[3,552,4435...|            0|
|(5784,[1,552,4435...|            0|
|(5784,[9,552,4443...|            0|
|(5784,[2,553,4440...|            0|
|(5784,[3,552,4437...|            0|
|(5784,[3,552,4440...|            0|
|(5784,[3,552,4431...|            0|
|(5784,[3,552,4440...|            0|
|(5784,[6,552,4438...|            0|
|(5784,[2,552,4431...|            0|
|(5784,[25,553,443...|            0|
|(5784,[2,552,4420...|            0|
|(5784,[3,552,4438...|            0|
|(5784,[20,553,450...|            0|
|(5784,[14,552,443...|            0|
+--------------------+-------------+
only showing top 20 rows



In [79]:
splits = v_final_df.randomSplit([0.6,0.4],1)

train_df = splits[0]
test_df = splits[1]

print(train_df.count(),test_df.count())

60059 39941


In [80]:
from pyspark.ml.classification import DecisionTreeClassifier

dt = DecisionTreeClassifier(labelCol="is_attributed",featuresCol="features")
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

In [81]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(
    labelCol="is_attributed",
    rawPredictionCol="prediction")

dt_accuracy = evaluator.evaluate(dt_predictions)
print("DecisionTree:",dt_accuracy)

#lr_accuracy = evaluator.evaluate(lr_predictions)
#print("Logistic Regression:",lr_accuracy)

DecisionTree: 0.5156890871962319


In [82]:
#test_df = spark.read.csv("sample-data/test.csv",header=True,inferSchema=True)
