In [1]:
# Import packages
import time
import pyspark
import os
import csv
from numpy import array
from pyspark.mllib.regression import LabeledPoint
from pyspark import SparkContext, SparkConf

In [2]:
sc = SparkContext.getOrCreate()
conf.getAll()

[('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.ui.showConsoleProgress', 'true')]

## Load Data

In [3]:
trainTitanic = sc.textFile("/Users/vikrant/GoogleDrive/CSE 587 Data Intesive Computing/Lab 3/kaggle-titanic/data//train.csv")

In [4]:
trainHeader = trainTitanic.first()

In [5]:
trainHeader

'PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked'

In [6]:
trainTitanic = trainTitanic.filter(lambda line: line != trainHeader).mapPartitions(lambda x: csv.reader(x))

In [9]:
trainTitanic.first()

['1',
 '0',
 '3',
 'Braund, Mr. Owen Harris',
 'male',
 '22',
 '1',
 '0',
 'A/5 21171',
 '7.25',
 '',
 'S']

## Data Preprocessing

In [10]:
# Data preprocessing
def sexTransformMapper(elem):
    '''Function which transform "male" into 1 and else things into 0
    - elem : string
    - return : vector
    '''
     
    if elem == 'male' :
        return [0]
    else :
        return [1]
 

In [11]:
# Data Transformations and filter lines with empty strings
trainTitanic=trainTitanic.map(lambda line: line[1:3]+sexTransformMapper(line[4])+line[5:11])
trainTitanic=trainTitanic.filter(lambda line: line[3] != '' ).filter(lambda line: line[4] != '' )
trainTitanic.take(10)

[['0', '3', 0, '22', '1', '0', 'A/5 21171', '7.25', ''],
 ['1', '1', 1, '38', '1', '0', 'PC 17599', '71.2833', 'C85'],
 ['1', '3', 1, '26', '0', '0', 'STON/O2. 3101282', '7.925', ''],
 ['1', '1', 1, '35', '1', '0', '113803', '53.1', 'C123'],
 ['0', '3', 0, '35', '0', '0', '373450', '8.05', ''],
 ['0', '1', 0, '54', '0', '0', '17463', '51.8625', 'E46'],
 ['0', '3', 0, '2', '3', '1', '349909', '21.075', ''],
 ['1', '3', 1, '27', '0', '2', '347742', '11.1333', ''],
 ['1', '2', 1, '14', '1', '0', '237736', '30.0708', ''],
 ['1', '3', 1, '4', '1', '1', 'PP 9549', '16.7', 'G6']]

In [12]:
# creating "labeled point" rdds specific to MLlib "(label (v1, v2...vp])"
trainTitanicLP=trainTitanic.map(lambda line: LabeledPoint(line[0],[line[1:5]]))
trainTitanicLP.first()

LabeledPoint(0.0, [3.0,0.0,22.0,1.0])

In [14]:
# splitting dataset into train and test set
(trainData, testData) = trainTitanicLP.randomSplit([0.7, 0.3])

## Applying Random Forest

In [15]:
# Random forest : same parameters as sklearn (?)
from pyspark.mllib.tree import RandomForest
 
time_start=time.time()
model_rf = RandomForest.trainClassifier(trainData, numClasses = 2,
        categoricalFeaturesInfo = {}, numTrees = 100,
        featureSubsetStrategy='auto', impurity='gini', maxDepth=12,
        maxBins=32, seed=None)
 

In [16]:
model_rf.numTrees()
model_rf.totalNumNodes()
time_end=time.time()
time_rf=(time_end - time_start)
print("RF takes %d s" %(time_rf))
 

RF takes 18 s


In [17]:
# Predictions on test set
predictions = model_rf.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
 
# first metrics
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metrics = BinaryClassificationMetrics(labelsAndPredictions)
 
# Area under precision-recall curve
print("Area under PR = %s" % metrics.areaUnderPR)
 
# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under PR = 0.5688159879336351
Area under ROC = 0.7947850318471338
