In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, DoubleType, StringType, ArrayType, StructType, StructField

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import StringIndexer, VectorIndexer, VectorAssembler, Normalizer
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier, DecisionTreeClassifier, GBTClassifier

from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.tree import RandomForest, RandomForestModel

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import os
from math import log

In [2]:
sqlContext = SQLContext(sc)

## Fonctions

In [314]:
# Log Loss metric
def logloss(df):
    loglossRed1 =           df.map(lambda r: (r.indexedLabel, r.proba)) #probability[1]
    
    # Check if some proba are < 0
    neg = loglossRed1.filter(lambda (y,p): p <= 0.0 or p >= 1.0)
    negCount = neg.count()
    if not negCount == 0:
        print "!!! There so non-valid probability !!! " + str(negCount)
        loglossRed1 = loglossRed1.filter(lambda (y,p): p > 0.0 and p < 1.0)

    
    loglossRed2 =  loglossRed1.map(lambda (y,p): y*log(p) + (1.0-y)*log(1.0-p))
    loglossRed  =  loglossRed2.reduce(lambda a, b: a+b)
    
    return -1.0 * loglossRed / float(trainPredictions.count())

#print "Logloss on Training: " + str(logloss(trainPredictions))

In [399]:
# Shrink dow extrem proba
def shrink(val, factor = 0.2, trunc = 0.1):
    if val < 0.0 + trunc:
        val = 0.0 + trunc
    elif val > 1.0 - trunc:
        val = 1.0 - trunc
        
    if factor == None:
        return val
    
    return val * (1.0-factor) + factor/2.0
#print shrink(0.5)
#print shrink(0)
#print shrink(1)
#print shrink(0.95)
#print shrink(0.05)

def shrinkDf(df, factor = 0.2, trunc = 0.0):
    # proba=u'[1,null,null,[0.9413866396761132,0.05861336032388664]]
    shrinkUdf = udf(lambda probability: shrink(float(probability.split(',')[4][:-2]), factor, trunc), DoubleType())
    
    dfShrink1 = df.withColumn('proba', (df.probability.cast(StringType())))
    #print dfShrink1.take(1)
    dfShrink = dfShrink1.withColumn('proba', shrinkUdf(dfShrink1.proba))
    
    return dfShrink

In [400]:
trainPredictionsShrink = shrinkDf(trainPredictions, 0.3)

## Load

In [4]:
kaggleTrain = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('kaggle/train.csv')
kaggleTrain.cache()
print ""




In [5]:
kaggleTest = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('kaggle/test.csv')
kaggleTest.cache()
print ""




In [6]:
print "Kaggle Train count: " + str(kaggleTrain.count())
print "Kaggle Test count:  " + str(kaggleTest.count())

Kaggle Train count: 114321
Kaggle Test count:  114393


In [7]:
#print train.schema.fields
columnsDict = {}
for col in kaggleTrain.schema.fields:
    typeKey = str(col.dataType)
    colName = col.name
    
    if colName == 'ID':
        print "We have the ID columns, type: " + typeKey
        continue
    if colName == 'target':
        print "We have the target columns, type: " + typeKey
        continue
    
    if typeKey not in columnsDict:
        columnsDict[typeKey] = [col.name]
    else:
        columnsDict[typeKey].append(col.name)

print ""
for ct, cl in columnsDict.iteritems():
    print ct + " " + str(len(cl))

We have the ID columns, type: IntegerType
We have the target columns, type: IntegerType

StringType 19
DoubleType 108
IntegerType 4


In [8]:
kaggleTrain.stat.freqItems(["target"]).collect()

[Row(target_freqItems=[1, 0])]

In [9]:
kaggleTrain.stat.crosstab("target", "target").show()

+-------------+-----+-----+
|target_target|    0|    1|
+-------------+-----+-----+
|            1|    0|87021|
|            0|27300|    0|
+-------------+-----+-----+



## Split

In [10]:
# Split the data into train and test
splits = kaggleTrain.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

## Deal with missing values

In [143]:
def replacementFunction(df):
    description = df.describe()
    
    descriptionCol = description.collect()
    
    replacementDict = {}
    for col in columnsDict['DoubleType']:
        replacementDict[col] = descriptionCol[1][col]
        
    for col in columnsDict['IntegerType']:
        replacementDict[col] = descriptionCol[1][col]

    #print replacementDict
    print "Replacing!"
    return df.na.fill(replacementDict)

### Experimentation

In [144]:
description = train.describe()

In [145]:
description.select("v10", "target", "summary").show()

+------------------+-------------------+-------+
|               v10|             target|summary|
+------------------+-------------------+-------+
|             68428|              68479|  count|
|1.8805752686729722| 0.7608609938813359|   mean|
|1.3981410306820936|0.42656089711962514| stddev|
| -9.87531659989E-7|                  0|    min|
|     18.5339164478|                  1|    max|
+------------------+-------------------+-------+



In [85]:
descriptionCol = description.collect()

In [86]:
print "Mean V12 example:" + descriptionCol[1]['v12']

Mean V12 example:6.879828782575871


In [87]:
columnsDict['StringType']
print "Mean V3 example:" + descriptionCol[1]['v4']

Mean V3 example:4.142397975739431


### Replace Now!

In [146]:
trainWithoutNull = replacementFunction(train)
testWithoutNull = replacementFunction(test)
kaggleTestWithoutNull = replacementFunction(kaggleTest)

Replacing!
Replacing!
Replacing!


In [147]:
# Repartition
#trainWithoutNull = trainWithoutNull.repartition(20)
#testWithoutNull = testWithoutNull.repartition(20)

In [148]:
trainWithoutNull.rdd.getNumPartitions()

4

In [149]:
trainWithoutNull.cache()
testWithoutNull.cache()
kaggleTestWithoutNull.cache()

DataFrame[ID: int, v1: double, v2: double, v3: string, v4: double, v5: double, v6: double, v7: double, v8: double, v9: double, v10: double, v11: double, v12: double, v13: double, v14: double, v15: double, v16: double, v17: double, v18: double, v19: double, v20: double, v21: double, v22: string, v23: double, v24: string, v25: double, v26: double, v27: double, v28: double, v29: double, v30: string, v31: string, v32: double, v33: double, v34: double, v35: double, v36: double, v37: double, v38: int, v39: double, v40: double, v41: double, v42: double, v43: double, v44: double, v45: double, v46: double, v47: string, v48: double, v49: double, v50: double, v51: double, v52: string, v53: double, v54: double, v55: double, v56: string, v57: double, v58: double, v59: double, v60: double, v61: double, v62: int, v63: double, v64: double, v65: double, v66: string, v67: double, v68: double, v69: double, v70: double, v71: string, v72: int, v73: double, v74: string, v75: string, v76: double, v77: double

## Pipeline

In [150]:
# Create Label
labelIndexer = StringIndexer(inputCol="target", outputCol="indexedLabel")

In [164]:
# Create Feature vector
assembler = VectorAssembler(
    inputCols=columnsDict["IntegerType"] + columnsDict["DoubleType"],
    outputCol="features")

#output = assembler.transform(trainWithoutNull)
#output.schema
#trainFeat = trainWithoutNull.withColumn("label", trainWithoutNull.target.cast("Double"))

In [165]:
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
#featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4)

In [166]:
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)

In [419]:
# Train a GBT model.
gbt = RandomForestClassifier(featuresCol="normFeatures", labelCol="indexedLabel", numTrees=10, maxDepth=10)
#gbt = DecisionTreeClassifier(featuresCol="normFeatures", labelCol="indexedLabel")
#gbt = GBTClassifier(featuresCol="normFeatures", labelCol="indexedLabel", maxIter=10)

In [421]:
# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[assembler, labelIndexer, normalizer, gbt])

In [422]:
# Train model.  This also runs the indexer.
model = pipeline.fit(trainWithoutNull)

## Evaluation

In [170]:
print model.stages[-1] # summary only

RandomForestClassificationModel (uid=rfc_81c2a7dfc011) with 20 trees


In [171]:
model.stages[-1]

RandomForestClassificationModel (uid=rfc_81c2a7dfc011) with 20 trees

In [425]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="precision")

def evaluation(df):
    df.stat.crosstab("indexedLabel", "prediction").show()
    
    print df.select("prediction", "indexedLabel", "probability").take(3) # "rawPrediction"
    #print rainPredictions.select("prediction", "indexedLabel", "normFeatures").take(3)
    print ""
    
    precision = evaluator.evaluate(df)
    print "Precision = %g" % (precision)


## Crossvalidation

In [426]:
# Grid search
# 5->30 * 2 with 5 folds, it takes 3 days -> precision 0.851998
#numTrees = 29
grid = ParamGridBuilder().addGrid(gbt.numTrees, range(5, 30)) \
                         .addGrid(gbt.maxDepth, range(5, 30)) \
                         .build()

In [429]:
# Cross validation
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator, numFolds = 5)

In [None]:
cvModel = cv.fit(trainWithoutNull)
cvPredictions = cvModel.transform(trainWithoutNull)

In [440]:
evaluation(cvPredictions)

+-----------------------+----+-----+
|indexedLabel_prediction| 1.0|  0.0|
+-----------------------+----+-----+
|                    1.0|6435| 9941|
|                    0.0| 194|51909|
+-----------------------+----+-----+

[Row(prediction=0.0, indexedLabel=0.0, probability=DenseVector([0.9682, 0.0318])), Row(prediction=0.0, indexedLabel=0.0, probability=DenseVector([0.785, 0.215])), Row(prediction=0.0, indexedLabel=1.0, probability=DenseVector([0.7418, 0.2582]))]

Precision = 0.851998


In [431]:
cvPredictions

DataFrame[ID: int, target: int, v1: double, v2: double, v3: string, v4: double, v5: double, v6: double, v7: double, v8: double, v9: double, v10: double, v11: double, v12: double, v13: double, v14: double, v15: double, v16: double, v17: double, v18: double, v19: double, v20: double, v21: double, v22: string, v23: double, v24: string, v25: double, v26: double, v27: double, v28: double, v29: double, v30: string, v31: string, v32: double, v33: double, v34: double, v35: double, v36: double, v37: double, v38: int, v39: double, v40: double, v41: double, v42: double, v43: double, v44: double, v45: double, v46: double, v47: string, v48: double, v49: double, v50: double, v51: double, v52: string, v53: double, v54: double, v55: double, v56: string, v57: double, v58: double, v59: double, v60: double, v61: double, v62: int, v63: double, v64: double, v65: double, v66: string, v67: double, v68: double, v69: double, v70: double, v71: string, v72: int, v73: double, v74: string, v75: string, v76: double

In [444]:
cvModel.bestModel.params

[]

In [451]:
from pprint import pprint
pprint (vars(cvModel.bestModel.stages))

TypeError: vars() argument must have __dict__ attribute

In [455]:
testRF = cvModel.bestModel.stages[3]

In [482]:
cvModel.bestModel._java_obj

AttributeError: 'PipelineModel' object has no attribute '_java_obj'

### Training

In [173]:
# Make predictions.
trainPredictions = model.transform(trainWithoutNull)

In [174]:
evaluation(trainPredictions)

+-----------------------+-----+-----+
|indexedLabel_prediction|  1.0|  0.0|
+-----------------------+-----+-----+
|                    1.0|11014| 5362|
|                    0.0|  267|51836|
+-----------------------+-----+-----+

[Row(prediction=0.0, indexedLabel=0.0, probability=DenseVector([0.9414, 0.0586])), Row(prediction=0.0, indexedLabel=0.0, probability=DenseVector([0.8093, 0.1907])), Row(prediction=0.0, indexedLabel=1.0, probability=DenseVector([0.564, 0.436]))]

Precision = 0.9178


In [401]:
trainPredictionsShrink = shrinkDf(trainPredictions, 0.3)

In [402]:
trainPredictionsShrink.select('probability', 'proba').take(5) #.select('proba')

[Row(probability=DenseVector([0.9414, 0.0586]), proba=0.19102935222672063),
 Row(probability=DenseVector([0.8093, 0.1907]), proba=0.2834656654502874),
 Row(probability=DenseVector([0.564, 0.436]), proba=0.4552321196246967),
 Row(probability=DenseVector([0.25, 0.75]), proba=0.6749999999999999),
 Row(probability=DenseVector([0.8599, 0.1401]), proba=0.24807094041696903)]

In [403]:
print "Logloss on Training: " + str(logloss(trainPredictionsShrink))

Logloss on Training: 0.368739157473


### Testing

In [177]:
# Make predictions.
testPredictions = model.transform(testWithoutNull)

In [178]:
evaluation(testPredictions)

+-----------------------+----+-----+
|indexedLabel_prediction| 1.0|  0.0|
+-----------------------+----+-----+
|                    1.0|1331| 9593|
|                    0.0|1331|33587|
+-----------------------+----+-----+

[Row(prediction=0.0, indexedLabel=0.0, probability=DenseVector([0.771, 0.229])), Row(prediction=0.0, indexedLabel=0.0, probability=DenseVector([0.6226, 0.3774])), Row(prediction=0.0, indexedLabel=0.0, probability=DenseVector([0.65, 0.35]))]

Precision = 0.761703


In [404]:
testPredictionsShrink = shrinkDf(testPredictions, 0.3)

In [405]:
print "Logloss on Testing: " + str(logloss(testPredictionsShrink))

Logloss on Testing: 0.356206026244


## Make prediction and save

In [38]:
predictions = model.transform(kaggleTestWithoutNull)

In [39]:
predictions.stat.crosstab("prediction", "prediction").show()

+---------------------+----+------+
|prediction_prediction| 1.0|   0.0|
+---------------------+----+------+
|                  1.0|7648|     0|
|                  0.0|   0|106745|
+---------------------+----+------+



In [406]:
predictionsShrink = shrinkDf(predictions, 0.3)

In [407]:
predictionsShrink.select("ID", "proba").take(3) # "probability", "rawPrediction"

[Row(ID=0, proba=0.51834375),
 Row(ID=1, proba=0.21741697655700676),
 Row(ID=2, proba=0.6111387804549737)]

In [410]:
outputFile = "results/prediction.csv"
os.system("rm -rf " + outputFile)
predictionsShrink.select("ID", "proba").withColumnRenamed("proba", "PredictedProb").repartition(1).write.format('com.databricks.spark.csv').option("header", "true").save(outputFile)