In [1]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark import SparkContext
import pyspark.sql.functions as f
from pyspark.ml.classification import RandomForestClassifier

In [2]:
sc

## Load Data

In [3]:
data = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri","mongodb://34.210.118.215/gameofspark.train").load()

In [4]:
data.printSchema()

root
 |-- C1: integer (nullable = true)
 |-- C14: integer (nullable = true)
 |-- C15: integer (nullable = true)
 |-- C16: integer (nullable = true)
 |-- C17: integer (nullable = true)
 |-- C18: integer (nullable = true)
 |-- C19: integer (nullable = true)
 |-- C20: integer (nullable = true)
 |-- C21: integer (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- app_category: string (nullable = true)
 |-- app_domain: string (nullable = true)
 |-- app_id: string (nullable = true)
 |-- banner_pos: integer (nullable = true)
 |-- click: integer (nullable = true)
 |-- device_conn_type: integer (nullable = true)
 |-- device_id: string (nullable = true)
 |-- device_ip: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- device_type: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- id: double (nullable = true)
 |-- site_category: string (nullable = true)
 |-- site_domain: string (nullable = true)
 |-- site_id: 

## Preprocess

In [5]:
def indexStringCols(df, cols):
    newdf = df
    for c in cols:
        si = StringIndexer(inputCol=c, outputCol=c+"-num")
        sm = si.fit(newdf)
        newdf = sm.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-num", c)
    return newdf

def oneHotEncodeCols(df, cols):
    newdf = df
    for c in cols:
        onehotenc = OneHotEncoder(inputCol=c, outputCol=c+"-onehot", dropLast=False)
        newdf = onehotenc.transform(newdf).drop(c)
        newdf = newdf.withColumnRenamed(c+"-onehot", c)
    return newdf

In [6]:
# extract hour
data = data.withColumn('hh', (f.col('hour')%100))
# extract day of week
data = data.withColumn('dow', (f.col('hour')/100-141019).cast('int')%7)
# extract first 3 digits of ip address
data = data.withColumn('ip', f.conv(data.device_ip.substr(0,2),16,10).alias('deci').cast('int'))

In [8]:
stringCols_noIds = ['app_category','site_category']

In [9]:
dfnumeric = indexStringCols(data, stringCols_noIds)

In [10]:
feat_selected = ["C1", "banner_pos", "site_category", "app_category", "device_type", 
                 "device_conn_type", "C15", "C16", "C18", "C19", "C21","ip","hh","dow"]

In [12]:
dfonehot = oneHotEncodeCols(dfnumeric, feat_selected)

In [13]:
va = VectorAssembler(inputCols=feat_selected, outputCol="features")
dfassembled = va.transform(dfonehot).select("features", "click").withColumnRenamed("click", "label")

### split data

In [16]:
splits = dfassembled.randomSplit([0.8, 0.2])
df_train = splits[0].cache()
df_test = splits[1].cache()

## Logistic Regression

In [19]:
from pyspark.ml.classification import LogisticRegression

In [27]:
lr_base = LogisticRegression()
lr_basemodel = lr_base.fit(df_train)

In [28]:
lr_basemodel_eval = lr_basemodel.evaluate(df_test)

In [73]:
lr_base.extractParamMap()

{Param(parent=u'LogisticRegression_48e5b260f31a5188ea07', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent=u'LogisticRegression_48e5b260f31a5188ea07', name='labelCol', doc='label column name.'): 'label',
 Param(parent=u'LogisticRegression_48e5b260f31a5188ea07', name='rawPredictionCol', doc='raw prediction (a.k.a. confidence) column name.'): 'rawPrediction',
 Param(parent=u'LogisticRegression_48e5b260f31a5188ea07', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent=u'LogisticRegression_48e5b260f31a5188ea07', name='standardization', doc='whether to standardize the training features before fitting the model.'): True,
 Param(parent=u'LogisticRegression_48e5b260f31a5188ea07', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent=u'LogisticRegression_48e5b260f31a5188ea07', name='featuresCol

In [41]:
base_predictions = lr_basemodel_eval.predictions

In [45]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

In [49]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")




In [51]:
print 'accuracy:', evaluator.evaluate(base_predictions, {evaluator.metricName: "accuracy"})
print 'f1:', evaluator.evaluate(base_predictions, {evaluator.metricName: "f1"})
print 'AUC:', lr_basemodel_eval.areaUnderROC

 accuracy: 0.830603398635
f1: 0.75916575695
AUC: 0.683051308799


In [52]:
# n-fold cross validation
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [74]:
lr = LogisticRegression(maxIter=10000)
bceval = BinaryClassificationEvaluator(metricName='areaUnderROC')
cv = CrossValidator().setEstimator(lr).setEvaluator(bceval).setNumFolds(5)
paramGrid = ParamGridBuilder().addGrid(lr.elasticNetParam, [0.0, 0.01, 0.05, 0.1, 0.2, 0.5, 0.8, 1.0]).build()
cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(df_train)

In [78]:
print cvmodel.bestModel._java_obj.getElasticNetParam()

0.2


In [79]:
predictions = cvmodel.bestModel.transform(df_test)
print bceval.evaluate(predictions)

0.683070801184
