In [10]:
import os
import sys

sys.path.append('/usr/spark-2.1.1/python') # spark path
import pyspark

from pyspark.sql import Row, DataFrameReader
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.sql.functions import *

from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.regression import LabeledPoint

In [2]:
# Definition
TRAIN_DATA_PATH = '/root/workspace/kkchuchu.mushroom/data/titanic/train.csv'
TEST_DATA_PATH = '/root/workspace/kkchuchu.mushroom/data/titanic/test.csv'

conf = pyspark.SparkConf()
conf.setMaster("spark://master:7077")
conf.setAppName("Titanic")

In [3]:
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [4]:
# read data
# the resource should be accessible from all workers as well.
passengers = spark.read.csv(TRAIN_DATA_PATH, header=True)

|Variable Name | Description|
|-------------|-------------|
|Survived|Survived (1) or died (0)|
|Pclass|	Passenger’s class|
|Name|	Passenger’s name|
|Sex|	Passenger’s sex|
|Age|	Passenger’s age|
|SibSp|	Number of siblings/spouses aboard|
|Parch|	Number of parents/children aboard|
|Ticket|	Ticket number|
|Fare|	Fare|
|Cabin|	Cabin|
|Embarked|	Port of embarkation|

### Feature Details

- Pclass is the Ticket-class: first (1), second (2), and third (3) class tickets were used. This is an ordinal integer feature.
- Name is the name of the passenger. The names also contain titles and some persons might share the same surname; indicating family relations. We know that some titles can indicate a certain age group. For instance Master is a boy while Mr is a man. This feature is a character string of variable length but similar format.
- Sex is an indicator whether the passenger was female or male. This is a categorical text string feature.
- Age is the integer age of the passenger. There are NaN values in this column.
- SibSp is another ordinal integer feature describing the number of siblings or spouses travelling with each passenger.
- Parch is another ordinal integer features that gives the number of parents or children travelling with each passenger.
- Ticket is a character string of variable length that gives the ticket number.
- Fare is a float feature showing how much each passenger paid for their rather memorable journey.
- Cabin gives the cabin number of each passenger. There are NaN in this column. This is another string feature.
Embarked shows the port of embarkation as a categorical character value.

In [15]:
passengers = passengers.withColumn('age', 
                                   passengers['Age'].cast(DoubleType())) \
                       .withColumn('sibsp', 
                                   passengers['SibSp'].cast(DoubleType())) \
                       .withColumn('parch', 
                                   passengers['Parch'].cast(DoubleType())) \
                       .withColumn('fare', 
                                   passengers['Fare'].cast(DoubleType())) \
                       .withColumn("sex",
                                   when(passengers.Sex.isin('male'), "1")
                                   .otherwise('0')) \
                       .withColumn("deck", 
                                   when(passengers.Cabin.isNull(), "null")
                                   .otherwise(passengers.Cabin.substr(0, 2))) \
                       .withColumn("embarked", 
                                   when(passengers.Embarked.isin("Q"), 0)
                                   .when(passengers.Embarked.isin("C"), 1)
                                   .when(passengers.Embarked.isin("S"), 2)) \
                       .withColumn("mother", 
                                   when( ( (passengers.sex == 0) & 
                                           (passengers.age > 18) & 
                                           (passengers.Name.like('%Miss%')) ), 1)
                                   .otherwise(0))

In [10]:
# Fill Embarked Null 
passengers = passengers.withColumn("Embarked",
                                   when(passengers.PassengerId.isin('62'), "C")
                                   .otherwise(passengers.Embarked)) \
                       .withColumn("Embarked",
                                   when(passengers.PassengerId.isin('830'), "C")
                                   .otherwise(passengers.Embarked))

In [24]:
# mice imputation
# 多重插值法
# 'PassengerId','Name','Ticket','Cabin','Family','Surname','Survived'
# Reference: https://www.rdocumentation.org/packages/mice/versions/2.30/topics/mice.impute.rf
mice_data = passengers.select(passengers.age, passengers.Ticket, passengers.Cabin, passengers.Survived)
def parse(l):
    target = l.Age
    features = [l.Ticket, l.Cabin, l.Survived]
    return LabeledPoint(target, features)

mice_target = mice_data.filter(mice_data.age.isNull())
mice_train = mice_data.filter(mice_data.age.isNotNull())
mice_lab = mice_train.rdd.map(lambda x: parse(x))

In [None]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = mice_train.randomSplit([0.7, 0.3])
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={0:2, 1:4, 5:3, 6:2},
                                     numTrees=10, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

In [None]:
passengers.describe(['age', 'sibsp', 'parch', 'fare']).show()

In [12]:
passengers.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: string (nullable = true)
 |-- Pclass: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: double (nullable = true)
 |-- parch: double (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [25]:
mice_train.show()

+----+----------------+-----+--------+
| age|          Ticket|Cabin|Survived|
+----+----------------+-----+--------+
|22.0|       A/5 21171| null|       0|
|38.0|        PC 17599|  C85|       1|
|26.0|STON/O2. 3101282| null|       1|
|35.0|          113803| C123|       1|
|35.0|          373450| null|       0|
|54.0|           17463|  E46|       0|
| 2.0|          349909| null|       0|
|27.0|          347742| null|       1|
|14.0|          237736| null|       1|
| 4.0|         PP 9549|   G6|       1|
|58.0|          113783| C103|       1|
|20.0|       A/5. 2151| null|       0|
|39.0|          347082| null|       0|
|14.0|          350406| null|       0|
|55.0|          248706| null|       1|
| 2.0|          382652| null|       0|
|31.0|          345763| null|       0|
|35.0|          239865| null|       0|
|34.0|          248698|  D56|       1|
|15.0|          330923| null|       1|
+----+----------------+-----+--------+
only showing top 20 rows



In [78]:
passengers.select(passengers.PassengerId, passengers.Embarked).filter((passengers.PassengerId == '830') | (passengers.PassengerId == '62')).collect()

[Row(PassengerId='62', Embarked='C'), Row(PassengerId='830', Embarked='C')]

In [83]:
lab = passengers
lab = lab.drop('Name')
lab = lab.drop('Ticket')
lab = lab.drop('Cabin')

In [84]:
def parse(l):
    target = l.Survived
    features = [l.sex, l.Pclass, l.sibsp, l.parch, l.fare, l.embarked, l.mother]
    return LabeledPoint(target, features)

lab = lab.rdd.map(lambda x: parse(x))

In [85]:
lab.take(num=10)

[LabeledPoint(0.0, [1.0,3.0,1.0,0.0,7.25,2.0,0.0]),
 LabeledPoint(1.0, [0.0,1.0,1.0,0.0,71.2833,1.0,0.0]),
 LabeledPoint(1.0, [0.0,3.0,0.0,0.0,7.925,2.0,1.0]),
 LabeledPoint(1.0, [0.0,1.0,1.0,0.0,53.1,2.0,0.0]),
 LabeledPoint(0.0, [1.0,3.0,0.0,0.0,8.05,2.0,0.0]),
 LabeledPoint(0.0, [1.0,3.0,0.0,0.0,8.4583,0.0,0.0]),
 LabeledPoint(0.0, [1.0,1.0,0.0,0.0,51.8625,2.0,0.0]),
 LabeledPoint(0.0, [1.0,3.0,3.0,1.0,21.075,2.0,0.0]),
 LabeledPoint(1.0, [0.0,3.0,0.0,2.0,11.1333,2.0,0.0]),
 LabeledPoint(1.0, [0.0,2.0,1.0,0.0,30.0708,1.0,0.0])]

In [86]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = lab.randomSplit([0.7, 0.3])

In [87]:
model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={0:2, 1:4, 5:3, 6:2},
                                     numTrees=10, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

In [88]:
def f(x):
    (v, p) = x
    if v != p:
        return True
    else:
        return False
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(lambda x: f(x)).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification forest model:')
print(model.toDebugString())

# Save and load model
# model.save(sc, "target/tmp/myRandomForestClassificationModel")
# sameModel = RandomForestModel.load(sc, "target/tmp/myRandomForestClassificationModel")

Test Error = 0.20588235294117646
Learned classification forest model:
TreeEnsembleModel classifier with 10 trees

  Tree 0:
    If (feature 4 <= 10.1708)
     If (feature 4 <= 7.5208)
      If (feature 0 in {0.0})
       If (feature 4 <= 7.0458)
        Predict: 0.0
       Else (feature 4 > 7.0458)
        Predict: 1.0
      Else (feature 0 not in {0.0})
       If (feature 5 in {0.0,2.0})
        Predict: 0.0
       Else (feature 5 not in {0.0,2.0})
        Predict: 0.0
     Else (feature 4 > 7.5208)
      If (feature 3 <= 1.0)
       If (feature 0 in {0.0})
        Predict: 1.0
       Else (feature 0 not in {0.0})
        Predict: 0.0
      Else (feature 3 > 1.0)
       Predict: 1.0
    Else (feature 4 > 10.1708)
     If (feature 6 in {1.0})
      If (feature 2 <= 1.0)
       If (feature 1 in {3.0})
        Predict: 0.0
       Else (feature 1 not in {3.0})
        Predict: 1.0
      Else (feature 2 > 1.0)
       Predict: 0.0
     Else (feature 6 not in {1.0})
      If (feature 4 <= 52