#  Spark MlLib : case study  of Titanic dataset using DecisionTree and Randmom forest

    Tuhin Mahmud
    ACM SIGKDD Austin Meetup presentation
    September 20th, 2017


![titanic](titanic/titanic.jpg "Title")

    Data Source:
    https://www.kaggle.com/c/titanic

In [1]:
# Configure the necessary Spark environment
import os
import sys

# Spark home
spark_home = os.environ.get("SPARK_HOME")

# If Spark V1.4.x is detected, then add ' pyspark-shell' to
# the end of the 'PYSPARK_SUBMIT_ARGS' environment variable
spark_release_file = spark_home + "/RELEASE"
if os.path.exists(spark_release_file) and "Spark 1.4" in open(spark_release_file).read():
    pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
    if not "pyspark-shell" in pyspark_submit_args: pyspark_submit_args += " pyspark-shell"
    os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

# Add the spark python sub-directory to the path
sys.path.insert(0, spark_home + "/python")

# Add the py4j to the path.
# You may need to change the version number to match your install
sys.path.insert(0, os.path.join(spark_home, "python/lib/py4j-0.8.2.1-src.zip"))

# Initialize PySpark to predefine the SparkContext variable 'sc'
execfile(os.path.join(spark_home, "python/pyspark/shell.py"))

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.2.0
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:05:08)
SparkSession available as 'spark'.


In [2]:
#https://github.com/6chaoran/DataStory/blob/master/Titanic-Spark/pyspark-script.py
train_path='titanic/train.csv'
test_path='titanic/test.csv'

# Load csv file as RDD
#train_rdd = sc.textFile(train_path)
#test_rdd = sc.textFile(test_path)

train_df = sqlContext.read.format('com.databricks.spark.csv') \
     .options(header='true', inferschema='true').load(train_path)
test_df = sqlContext.read.format('com.databricks.spark.csv') \
     .options(header='true', inferschema='true').load(test_path)

In [3]:
## Add Survived column to test
## And append train/test data
from pyspark.sql.functions import lit, col
train_df = train_df.withColumn('Mark',lit('train'))
test_df = (test_df.withColumn('Survived',lit(0))
				  .withColumn('Mark',lit('test')))

test_df = test_df[train_df.columns]
df = train_df.unionAll(test_df)

In [4]:
df.toPandas().head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Mark
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S,train
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C,train
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S,train
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S,train
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S,train


In [5]:
## Data Cleaning/Manipulation
## Convert Age, SibSp, Parch, Fare to Numeric
df = (df.withColumn('Age',df['Age'].cast("double"))
			.withColumn('SibSp',df['SibSp'].cast("double"))
			.withColumn('Parch',df['Parch'].cast("double"))
			.withColumn('Fare',df['Fare'].cast("double"))
			.withColumn('Survived',df['Survived'].cast("double"))
			)

df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: double (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Mark: string (nullable = false)



In [6]:
## Impute missing Age and Fare
numVars = ['Survived','Age','SibSp','Parch','Fare']
def countNull(df,var):
	return df.where(df[var].isNull()).count()

missing = {var: countNull(df,var) for var in numVars}
missing

{'Age': 263, 'Fare': 1, 'Parch': 0, 'SibSp': 0, 'Survived': 0}

In [7]:
age_mean = df.groupBy().mean('Age').first()[0]
fare_mean = df.groupBy().mean('Fare').first()[0]
df = df.na.fill({'Age':age_mean,'Fare':fare_mean})

In [8]:
# Impute missing Age and Fare
numVars = ['Survived','Age','SibSp','Parch','Fare']
def countNull(df,var):
	return df.where(df[var].isNull()).count()

missing = {var: countNull(df,var) for var in numVars}
missing

{'Age': 0, 'Fare': 0, 'Parch': 0, 'SibSp': 0, 'Survived': 0}

In [9]:
df.toPandas().head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Mark
0,1,0.0,3,"Braund, Mr. Owen Harris",male,22.0,1.0,0.0,A/5 21171,7.25,,S,train
1,2,1.0,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1.0,0.0,PC 17599,71.2833,C85,C,train
2,3,1.0,3,"Heikkinen, Miss. Laina",female,26.0,0.0,0.0,STON/O2. 3101282,7.925,,S,train
3,4,1.0,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1.0,0.0,113803,53.1,C123,S,train
4,5,0.0,3,"Allen, Mr. William Henry",male,35.0,0.0,0.0,373450,8.05,,S,train


In [10]:
# Feature Enginnering
## 1. Extract Title from Name
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import DoubleType

## created user defined function to extract title
getTitle = udf(lambda name: name.split(',')[1].split('.')[0].strip(),StringType())
df = df.withColumn('Title', getTitle(df['Name']))

## created userd defined function to extract Cabin
#getCabin = udf(lambda name: 1.0 if name!=None else 0.0,DoubleType())
#df = df.withColumn('hasCabin', getCabin(df['Cabin']))

In [11]:
df.toPandas().head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Mark,Title
0,1,0.0,3,"Braund, Mr. Owen Harris",male,22.0,1.0,0.0,A/5 21171,7.25,,S,train,Mr
1,2,1.0,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1.0,0.0,PC 17599,71.2833,C85,C,train,Mrs
2,3,1.0,3,"Heikkinen, Miss. Laina",female,26.0,0.0,0.0,STON/O2. 3101282,7.925,,S,train,Miss
3,4,1.0,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1.0,0.0,113803,53.1,C123,S,train,Mrs
4,5,0.0,3,"Allen, Mr. William Henry",male,35.0,0.0,0.0,373450,8.05,,S,train,Mr


In [12]:
##  hadling categorical variable using StringIndexer
from pyspark.ml.feature import StringIndexer
si = StringIndexer(inputCol = 'Sex', outputCol = 'Sex_indexed')
df = si.fit(df).transform(df).drop('Sex').withColumnRenamed('Sex_indexed','Sex')

si = StringIndexer(inputCol = 'Title', outputCol = 'Title_indexed')
df = si.fit(df).transform(df).drop('Title').withColumnRenamed('Title_indexed','Title')

In [13]:
df.toPandas().head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Mark,Sex,Title
0,1,0.0,3,"Braund, Mr. Owen Harris",22.0,1.0,0.0,A/5 21171,7.25,,S,train,0.0,0.0
1,2,1.0,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",38.0,1.0,0.0,PC 17599,71.2833,C85,C,train,1.0,2.0
2,3,1.0,3,"Heikkinen, Miss. Laina",26.0,0.0,0.0,STON/O2. 3101282,7.925,,S,train,1.0,1.0
3,4,1.0,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",35.0,1.0,0.0,113803,53.1,C123,S,train,1.0,2.0
4,5,0.0,3,"Allen, Mr. William Henry",35.0,0.0,0.0,373450,8.05,,S,train,0.0,0.0


In [14]:
numVars = ['Survived','Age','SibSp','Parch','Fare','Embarked']
missing = {var: countNull(df,var) for var in numVars}
missing

{'Age': 0, 'Embarked': 2, 'Fare': 0, 'Parch': 0, 'SibSp': 0, 'Survived': 0}

In [15]:
df.count()

1309

In [16]:
df=df.na.drop(subset=["Embarked"])

In [17]:
df.count()

1307

In [18]:
si = StringIndexer(inputCol = 'Embarked', outputCol = 'Embarked_indexed')
#df = si.fit(df).transform(df).drop('Embarked').withColumnRenamed('Embarked_indexed','Embarked')
df = si.fit(df).transform(df).drop('Embarked').withColumnRenamed('Embarked_indexed','Embarked')

In [19]:
df.toPandas().head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Age,SibSp,Parch,Ticket,Fare,Cabin,Mark,Sex,Title,Embarked
0,1,0.0,3,"Braund, Mr. Owen Harris",22.0,1.0,0.0,A/5 21171,7.25,,train,0.0,0.0,0.0
1,2,1.0,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",38.0,1.0,0.0,PC 17599,71.2833,C85,train,1.0,2.0,1.0
2,3,1.0,3,"Heikkinen, Miss. Laina",26.0,0.0,0.0,STON/O2. 3101282,7.925,,train,1.0,1.0,0.0
3,4,1.0,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",35.0,1.0,0.0,113803,53.1,C123,train,1.0,2.0,0.0
4,5,0.0,3,"Allen, Mr. William Henry",35.0,0.0,0.0,373450,8.05,,train,0.0,0.0,0.0


In [20]:
df=df.select('Survived','Pclass','Age','SibSp','Parch','Fare','Sex','Title','Mark')

In [21]:
df.toPandas().head()

Unnamed: 0,Survived,Pclass,Age,SibSp,Parch,Fare,Sex,Title,Mark
0,0.0,3,22.0,1.0,0.0,7.25,0.0,0.0,train
1,1.0,1,38.0,1.0,0.0,71.2833,1.0,2.0,train
2,1.0,3,26.0,0.0,0.0,7.925,1.0,1.0,train
3,1.0,1,35.0,1.0,0.0,53.1,1.0,2.0,train
4,0.0,3,35.0,0.0,0.0,8.05,0.0,0.0,train


In [22]:
import pyspark.mllib.regression as reg
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['Pclass','Age','SibSp','Parch','Fare','Sex','Title'],
    outputCol="features")

df_n = assembler.transform(df)

In [23]:
df_n.toPandas().head()

Unnamed: 0,Survived,Pclass,Age,SibSp,Parch,Fare,Sex,Title,Mark,features
0,0.0,3,22.0,1.0,0.0,7.25,0.0,0.0,train,"[3.0, 22.0, 1.0, 0.0, 7.25, 0.0, 0.0]"
1,1.0,1,38.0,1.0,0.0,71.2833,1.0,2.0,train,"[1.0, 38.0, 1.0, 0.0, 71.2833, 1.0, 2.0]"
2,1.0,3,26.0,0.0,0.0,7.925,1.0,1.0,train,"[3.0, 26.0, 0.0, 0.0, 7.925, 1.0, 1.0]"
3,1.0,1,35.0,1.0,0.0,53.1,1.0,2.0,train,"[1.0, 35.0, 1.0, 0.0, 53.1, 1.0, 2.0]"
4,0.0,3,35.0,0.0,0.0,8.05,0.0,0.0,train,"(3.0, 35.0, 0.0, 0.0, 8.05, 0.0, 0.0)"


In [24]:


# split back train/test data
train = df_n.where(df_n.Mark =='train').drop('Mark')
test = df_n.where(df_n.Mark =='test').drop('Mark')

# random split further to get train/validate
train,validate = train.randomSplit([0.7,0.3],seed =121)

print 'Train Data Number of Row: '+ str(train.count())
print 'Validate Data Number of Row: '+ str(validate.count())
print 'Test Data Number of Row: '+ str(test.count())


Train Data Number of Row: 636
Validate Data Number of Row: 253
Test Data Number of Row: 418


In [25]:
train.toPandas().head()

Unnamed: 0,Survived,Pclass,Age,SibSp,Parch,Fare,Sex,Title,features
0,0.0,1,2.0,1.0,2.0,151.55,1.0,1.0,"[1.0, 2.0, 1.0, 2.0, 151.55, 1.0, 1.0]"
1,0.0,1,18.0,1.0,0.0,108.9,0.0,0.0,"[1.0, 18.0, 1.0, 0.0, 108.9, 0.0, 0.0]"
2,0.0,1,19.0,1.0,0.0,53.1,0.0,0.0,"[1.0, 19.0, 1.0, 0.0, 53.1, 0.0, 0.0]"
3,0.0,1,21.0,0.0,1.0,77.2875,0.0,0.0,"[1.0, 21.0, 0.0, 1.0, 77.2875, 0.0, 0.0]"
4,0.0,1,24.0,0.0,0.0,79.2,0.0,0.0,"(1.0, 24.0, 0.0, 0.0, 79.2, 0.0, 0.0)"


In [26]:
train.count()

636

In [27]:
train.printSchema()

root
 |-- Survived: double (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Age: double (nullable = false)
 |-- SibSp: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Fare: double (nullable = false)
 |-- Sex: double (nullable = true)
 |-- Title: double (nullable = true)
 |-- features: vector (nullable = true)



In [28]:
train = train.select(col("Survived").alias("label"), col("features"))
test = test.select(col("Survived").alias("label"), col("features"))

In [29]:
train.toPandas().head()

Unnamed: 0,label,features
0,0.0,"[1.0, 2.0, 1.0, 2.0, 151.55, 1.0, 1.0]"
1,0.0,"[1.0, 18.0, 1.0, 0.0, 108.9, 0.0, 0.0]"
2,0.0,"[1.0, 19.0, 1.0, 0.0, 53.1, 0.0, 0.0]"
3,0.0,"[1.0, 21.0, 0.0, 1.0, 77.2875, 0.0, 0.0]"
4,0.0,"(1.0, 24.0, 0.0, 0.0, 79.2, 0.0, 0.0)"


In [31]:
test.toPandas().head()

Unnamed: 0,label,features
0,0.0,"(3.0, 34.5, 0.0, 0.0, 7.8292, 0.0, 0.0)"
1,0.0,"[3.0, 47.0, 1.0, 0.0, 7.0, 1.0, 2.0]"
2,0.0,"(2.0, 62.0, 0.0, 0.0, 9.6875, 0.0, 0.0)"
3,0.0,"(3.0, 27.0, 0.0, 0.0, 8.6625, 0.0, 0.0)"
4,0.0,"[3.0, 22.0, 1.0, 1.0, 12.2875, 1.0, 2.0]"


In [32]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(maxDepth=2, labelCol="label",featuresCol="features")
model = dt.fit(train)

In [33]:
print "numNodes = ", model.numNodes
print "depth = ", model.depth

numNodes =  7
depth =  2


In [34]:
predictions=model.transform(test)

In [335]:
predictions.toPandas().head(100)

Unnamed: 0,label,features,rawPrediction,probability,prediction
0,0.0,"(3.0, 34.5, 0.0, 0.0, 7.8292, 0.0, 0.0)","[324.0, 59.0]","[0.845953002611, 0.154046997389]",0.0
1,0.0,"[3.0, 47.0, 1.0, 0.0, 7.0, 1.0, 2.0]","[58.0, 51.0]","[0.532110091743, 0.467889908257]",0.0
2,0.0,"(2.0, 62.0, 0.0, 0.0, 9.6875, 0.0, 0.0)","[324.0, 59.0]","[0.845953002611, 0.154046997389]",0.0
3,0.0,"(3.0, 27.0, 0.0, 0.0, 8.6625, 0.0, 0.0)","[324.0, 59.0]","[0.845953002611, 0.154046997389]",0.0
4,0.0,"[3.0, 22.0, 1.0, 1.0, 12.2875, 1.0, 2.0]","[58.0, 51.0]","[0.532110091743, 0.467889908257]",0.0
5,0.0,"(3.0, 14.0, 0.0, 0.0, 9.225, 0.0, 0.0)","[11.0, 18.0]","[0.379310344828, 0.620689655172]",1.0
6,0.0,"[3.0, 30.0, 0.0, 0.0, 7.6292, 1.0, 1.0]","[58.0, 51.0]","[0.532110091743, 0.467889908257]",0.0
7,0.0,"[2.0, 26.0, 1.0, 1.0, 29.0, 0.0, 0.0]","[324.0, 59.0]","[0.845953002611, 0.154046997389]",0.0
8,0.0,"[3.0, 18.0, 0.0, 0.0, 7.2292, 1.0, 2.0]","[58.0, 51.0]","[0.532110091743, 0.467889908257]",0.0
9,0.0,"[3.0, 21.0, 2.0, 0.0, 24.15, 0.0, 0.0]","[324.0, 59.0]","[0.845953002611, 0.154046997389]",0.0


In [35]:
df=predictions
tp = df[(df.label == 1) & (df.prediction == 1)].count()
tn = df[(df.label == 0) & (df.prediction == 0)].count()
fp = df[(df.label == 0) & (df.prediction == 1)].count()
fn = df[(df.label == 1) & (df.prediction == 0)].count()
print "True Positives:", tp
print "True Negatives:", tn
print "False Positives:", fp
print "False Negatives:", fn
print "Total", df.count()
print "Accuracy:", (tp + tn)*1.0 /( tp + tn + fp + fn)

True Positives: 0
True Negatives: 319
False Positives: 99
False Negatives: 0
Total 418
Accuracy: 0.763157894737


## Random Forest

In [36]:
train.toPandas().head()

Unnamed: 0,label,features
0,0.0,"[1.0, 2.0, 1.0, 2.0, 151.55, 1.0, 1.0]"
1,0.0,"[1.0, 18.0, 1.0, 0.0, 108.9, 0.0, 0.0]"
2,0.0,"[1.0, 19.0, 1.0, 0.0, 53.1, 0.0, 0.0]"
3,0.0,"[1.0, 21.0, 0.0, 1.0, 77.2875, 0.0, 0.0]"
4,0.0,"(1.0, 24.0, 0.0, 0.0, 79.2, 0.0, 0.0)"


In [37]:
test.toPandas().head()

Unnamed: 0,label,features
0,0.0,"(3.0, 34.5, 0.0, 0.0, 7.8292, 0.0, 0.0)"
1,0.0,"[3.0, 47.0, 1.0, 0.0, 7.0, 1.0, 2.0]"
2,0.0,"(2.0, 62.0, 0.0, 0.0, 9.6875, 0.0, 0.0)"
3,0.0,"(3.0, 27.0, 0.0, 0.0, 8.6625, 0.0, 0.0)"
4,0.0,"[3.0, 22.0, 1.0, 1.0, 12.2875, 1.0, 2.0]"


In [38]:
from pyspark.ml.classification import RandomForestClassifier

# Create an initial RandomForest model.
rf = RandomForestClassifier(labelCol="label", featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(train)

In [39]:
#Make predictions on test data using the Transformer.transform() method.
predictions = rfModel.transform(test)

In [40]:
predictions.toPandas().head()

Unnamed: 0,label,features,rawPrediction,probability,prediction
0,0.0,"(3.0, 34.5, 0.0, 0.0, 7.8292, 0.0, 0.0)","[18.0316819173, 1.96831808269]","[0.901584095865, 0.0984159041347]",0.0
1,0.0,"[3.0, 47.0, 1.0, 0.0, 7.0, 1.0, 2.0]","[12.0421131795, 7.95788682046]","[0.602105658977, 0.397894341023]",0.0
2,0.0,"(2.0, 62.0, 0.0, 0.0, 9.6875, 0.0, 0.0)","[16.6551829518, 3.3448170482]","[0.83275914759, 0.16724085241]",0.0
3,0.0,"(3.0, 27.0, 0.0, 0.0, 8.6625, 0.0, 0.0)","[17.5291038444, 2.47089615559]","[0.87645519222, 0.12354480778]",0.0
4,0.0,"[3.0, 22.0, 1.0, 1.0, 12.2875, 1.0, 2.0]","[10.2317417201, 9.76825827988]","[0.511587086006, 0.488412913994]",0.0


In [41]:
df=predictions
tp = df[(df.label == 1) & (df.prediction == 1)].count()
tn = df[(df.label == 0) & (df.prediction == 0)].count()
fp = df[(df.label == 0) & (df.prediction == 1)].count()
fn = df[(df.label == 1) & (df.prediction == 0)].count()
print "True Positives:", tp
print "True Negatives:", tn
print "False Positives:", fp
print "False Negatives:", fn
print "Total", df.count()
print "Accuracy:", (tp + tn)*1.0 /( tp + tn + fp + fn)

True Positives: 0
True Negatives: 278
False Positives: 140
False Negatives: 0
Total 418
Accuracy: 0.665071770335
