In [11]:
#########################################
#   UBITNAME
#   varunjai, dilipred
############################################
import time
import pyspark
import os
import csv
from numpy import array
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest
from pyspark import SparkContext, SparkConf

In [12]:

# read the training data file
#trainData1 = sc.textFile('C:/Users/Dilip Reddy/Downloads/train.csv')
trainData1 = sc.textFile('train.csv')
trainHeader = trainData1.first()
trainData1 = trainData1.filter(lambda line: line != trainHeader).mapPartitions(lambda x: csv.reader(x))
trainData1.first()

['1',
 '0',
 '3',
 'Braund, Mr. Owen Harris',
 'male',
 '22',
 '1',
 '0',
 'A/5 21171',
 '7.25',
 '',
 'S']

In [13]:
def sexTransformMapper(elem):
    '''Function which transform "male" into 1 and else things into 0
    - elem : string
    - return : vector
    '''
    if(elem == 'male'):
       return[0]
    else:
       return[1]

# Data Transformations and filter lines with empty strings
trainData1=trainData1.map(lambda line: line[1:3]+sexTransformMapper(line[4])+line[5:11])
trainData1=trainData1.filter(lambda line: line[3] != '' ).filter(lambda line: line[4] != '' )
trainData1.take(10)

# creating 'labeled point' rdds specific to MLlib "(label (v1,v2...vp)"
trainDataLP = trainData1.map(lambda line: LabeledPoint(line[0],[line[1:5]]))
trainDataLP.first()

# splitting dataset into train and test set
(trainData, testData) = trainDataLP.randomSplit([0.7,0.3])

In [14]:
from pyspark.mllib.tree import RandomForest

time_start = time.time()
model_rf = RandomForest.trainClassifier(trainData, numClasses = 2,
                                        categoricalFeaturesInfo = {}, numTrees = 100,
                                        featureSubsetStrategy='auto', impurity='gini',
                                        maxDepth=12, maxBins=32, seed=None)

model_rf.numTrees()
model_rf.totalNumNodes()
time_end = time.time()
time_rf = (time_end - time_start)
print('RF takes %d s' %(time_rf))

# predictions on test set
predictions = model_rf.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

RF takes 4 s


In [15]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
metric = BinaryClassificationMetrics(labelsAndPredictions)

# Area under  precision-recall curve
print('Area under PR = %s' % metric.areaUnderPR)

# area under ROC curve
print('Area under ROC = %s' % metric.areaUnderROC)

Area under PR = 0.6613779082900384
Area under ROC = 0.845457593027154


In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer, OneHotEncoder, VectorAssembler, IndexToString
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import *
 
# Creatingt Spark SQL environment
from pyspark.sql import SparkSession, HiveContext
SparkContext.setSystemProperty("hive.metastore.uris", "thrift://nn1:9083")
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

In [17]:
#train = spark.read.csv("C:/Users/Dilip Reddy/Downloads/train.csv", header = True)
train = spark.read.csv("train.csv", header = True)
# Displays the content of the DataFrame to stdout
train.show(10)
 
# String to float on some columns of the dataset : creates a new dataset
train = train.select(col("Survived"),col("Sex"),col("Embarked"),col("Pclass").cast("float"),col("Age").cast("float"),col("SibSp").cast("float"),col("Fare").cast("float"))
 
# dropping null values
train = train.dropna()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [18]:
# Spliting in train and test set. Beware : It sorts the dataset
(traindf, testdf) = train.randomSplit([0.7,0.3])

# with pipeline
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
genderIndexer = StringIndexer(inputCol="Sex", outputCol="indexedSex")
embarkIndexer = StringIndexer(inputCol="Embarked", outputCol="indexedEmbarked")
 
surviveIndexer = StringIndexer(inputCol="Survived", outputCol="indexedSurvived")
 
# One Hot Encoder on indexed features
genderEncoder = OneHotEncoder(inputCol="indexedSex", outputCol="sexVec")
embarkEncoder = OneHotEncoder(inputCol="indexedEmbarked", outputCol="embarkedVec")

In [19]:
# Create the vector structured data (label,features(vector))
assembler = VectorAssembler(inputCols=["Pclass","sexVec","Age","SibSp","Fare","embarkedVec"],outputCol="features")
 
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedSurvived", featuresCol="features")
 
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[surviveIndexer, genderIndexer, embarkIndexer, genderEncoder,embarkEncoder, assembler, rf]) # genderIndexer,embarkIndexer,genderEncoder,embarkEncoder,
 
# Train model.  This also runs the indexers.
model = pipeline.fit(traindf)
 
# Predictions
predictions = model.transform(testdf)
 
# Select example rows to display.
predictions.columns 
 
# Select example rows to display.
predictions.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[3.0,0.0,17.0,0.0...|
|       1.0|       0|[1.0,0.0,2.0,1.0,...|
|       1.0|       0|[2.0,0.0,38.0,0.0...|
|       1.0|       0|[2.0,0.0,57.0,0.0...|
|       0.0|       0|[3.0,0.0,2.0,4.0,...|
+----------+--------+--------------------+
only showing top 5 rows



In [20]:
# Select (prediction, true label) and compute test error
predictions = predictions.select(col("Survived").cast("Float"),col("prediction"))
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
 
rfModel = model.stages[6]
print(rfModel)  # summary only
 
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)
 
evaluatorf1 = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="f1")
f1 = evaluatorf1.evaluate(predictions)
print("f1 = %g" % f1)
 
evaluatorwp = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedPrecision")
wp = evaluatorwp.evaluate(predictions)
print("weightedPrecision = %g" % wp)
 
evaluatorwr = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedRecall")
wr = evaluatorwr.evaluate(predictions)
print("weightedRecall = %g" % wr)
 

Test Error = 0.182692
RandomForestClassificationModel (uid=RandomForestClassifier_47928c890cb4e4a8c1f9) with 20 trees
Accuracy = 0.817308
f1 = 0.810675
weightedPrecision = 0.82878
weightedRecall = 0.817308
