In [1]:
import os
import sys

In [2]:
os.environ["JAVA_HOME"] = '/Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home/'
os.environ["PYSPARK_SUBMIT_ARGS"] = 'pyspark-shell'

os.environ["SPARK_HOME"] = '/usr/local/Cellar/apache-spark/2.2.0/libexec'

In [3]:
sys.path.append(os.environ['SPARK_HOME']+"/python")
sys.path.append(os.environ['SPARK_HOME']+"/python/lib/py4j-0.10.4-src.zip")

In [4]:
import numpy as np
import py4j
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.regression import LabeledPoint, LinearRegressionModel, LinearRegressionWithSGD

In [5]:
conf = (SparkConf().setMaster("local[8]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "4g"))

In [6]:
sc = SparkContext(conf=conf)
sql = SQLContext(sc)

In [7]:
df = sql.read.format('com.databricks.spark.csv').options(header='true').load('train.csv')
df.show(5)

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male| 22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female| 38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female| 26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female| 35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male| 35|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+---+-----+-----+---------------

In [8]:
from pyspark.sql.functions import udf, array
from pyspark.sql import types

def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''

embarked_udf = udf(Embarked_transform, types.StringType())
df = df.withColumn('Embarked', embarked_udf(df['Embarked']))

In [9]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndexed")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndexed", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)
df_t = df_t.drop('Embarked')
df_t = df_t.drop('EmbarkedIndexed')

In [10]:
df_t = df_t.withColumn('Survived', df_t['Survived'].cast('int'))
df_t = df_t.withColumn('Pclass', df_t['Pclass'].cast('int'))
df_t = df_t.withColumn('Sex', df_t['Sex'] == 'male')
df_t = df_t.withColumn('Parch', df_t['Parch'].cast('int'))
df_t = df_t.withColumn('SibSp', df_t['SibSp'].cast('int'))
df_t = df_t.withColumn('Fare', df_t['Fare'].cast('float'))

In [11]:
def ticket_is_odd(x):
    last_sym = x[-1]
    if last_sym >= '0' and last_sym <= '9':
        return int(last_sym) % 2
    return -1

def cabin_letter(x):
    if x is None:
        return ' '
    else:
        return x.split(' ')[0][0]

udf_cabin_letter = udf(cabin_letter, types.StringType())
udf_has_relatives = udf(lambda arr: arr[0]+arr[1]>0, types.BooleanType())
udf_is_missis = udf(lambda x: 'Mrs.' in x, types.BooleanType())
udf_is_miss = udf(lambda x: 'Ms.' in x, types.BooleanType())
udf_ticket_is_odd = udf(ticket_is_odd, types.IntegerType())
# udf_parse_age = udf(parse_age, types.FloatType())

In [12]:
df_t = df_t.withColumn('IsMissis', udf_is_missis(df['Name']))
df_t = df_t.withColumn('IsMiss', udf_is_miss(df['Name']))
df_t = df_t.withColumn('HasRelatives', udf_has_relatives(array('Parch', 'SibSp')))
df_t = df_t.withColumn('CabinLetter', udf_cabin_letter(df['Cabin']))
df_t = df_t.withColumn('TicketIsOdd', udf_ticket_is_odd(df['Ticket']))

In [13]:
df_t.show(5)

+-----------+--------+------+--------------------+-----+---+-----+-----+----------------+-------+-----+-------------+--------+------+------------+-----------+-----------+
|PassengerId|Survived|Pclass|                Name|  Sex|Age|SibSp|Parch|          Ticket|   Fare|Cabin|  EmbarkedVec|IsMissis|IsMiss|HasRelatives|CabinLetter|TicketIsOdd|
+-----------+--------+------+--------------------+-----+---+-----+-----+----------------+-------+-----+-------------+--------+------+------------+-----------+-----------+
|          1|       0|     3|Braund, Mr. Owen ...| true| 22|    1|    0|       A/5 21171|   7.25| null|(3,[0],[1.0])|   false| false|        true|           |          1|
|          2|       1|     1|Cumings, Mrs. Joh...|false| 38|    1|    0|        PC 17599|71.2833|  C85|(3,[1],[1.0])|    true| false|        true|          C|          1|
|          3|       1|     3|Heikkinen, Miss. ...|false| 26|    0|    0|STON/O2. 3101282|  7.925| null|(3,[0],[1.0])|   false| false|       false

In [14]:
df_t.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: boolean (nullable = true)
 |-- Age: string (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- EmbarkedVec: vector (nullable = true)
 |-- IsMissis: boolean (nullable = true)
 |-- IsMiss: boolean (nullable = true)
 |-- HasRelatives: boolean (nullable = true)
 |-- CabinLetter: string (nullable = true)
 |-- TicketIsOdd: integer (nullable = true)



In [15]:
stringIndexer = StringIndexer(inputCol="CabinLetter", outputCol="CabinLetterIndexed")
model = stringIndexer.fit(df_t)
indexed = model.transform(df_t)
encoder = OneHotEncoder(inputCol="CabinLetterIndexed", outputCol="CabinLetterVec")
df_t = encoder.transform(indexed)
df_t = df_t.drop('CabinLetter')
df_t = df_t.drop('CabinLetterIndexed')

In [16]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1

In [17]:
def transf(r):
    return LabeledPoint(
        r['Survived'],
        [
            r['Pclass'],
            r['Sex'],
            r['Fare'],
            r['SibSp'],
            r['Parch'],
            parse_age(r['Age']),
            r['IsMissis'],
            r['IsMiss'],
            r['HasRelatives'],
            r['TicketIsOdd']
        ] + list(r['CabinLetterVec'].toArray()) + list(r['EmbarkedVec'].toArray())
    )

In [18]:
data = df_t.rdd.map(transf)
data.take(5)

[LabeledPoint(0.0, [3.0,1.0,7.25,1.0,0.0,22.0,0.0,0.0,1.0,1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0]),
 LabeledPoint(1.0, [1.0,0.0,71.2833023071,1.0,0.0,38.0,1.0,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0]),
 LabeledPoint(1.0, [3.0,0.0,7.92500019073,0.0,0.0,26.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0]),
 LabeledPoint(1.0, [1.0,0.0,53.0999984741,1.0,0.0,35.0,1.0,0.0,1.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0]),
 LabeledPoint(0.0, [3.0,1.0,8.05000019073,0.0,0.0,35.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0])]

In [19]:
train, test = data.randomSplit([0.7, 0.3])
train.cache()

PythonRDD[53] at RDD at PythonRDD.scala:48

In [20]:
def acc(model, test):
    values = test.map(lambda x: x.features)
    yhat = model.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    errors = comp.map(lambda x: abs(x[0]-x[1]))
    return 1-errors.sum()/errors.count()

def f1(model, test):
    values = test.map(lambda x: x.features)
    yhat = model.predict(values)
    y = test.map(lambda x: x.label)
    comp = yhat.zip(y)
    fn = comp.map(lambda x: 1 if (x[1] == 1) & (x[0] == 0) else 0).sum()
    fp = comp.map(lambda x: 1 if (x[1] == 0) & (x[0] == 1) else 0).sum()
    tp = comp.map(lambda x: 1 if (x[1] == 1) & (x[0] == 1) else 0).sum()
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    f1 = 2 * precision * recall / (precision + recall)
    return f1

In [21]:
from pyspark.mllib.tree import RandomForestModel, RandomForest, GradientBoostedTreesModel, GradientBoostedTrees
from pyspark.mllib.classification import SVMModel, SVMWithSGD, LogisticRegressionWithLBFGS, LogisticRegressionModel, LogisticRegressionWithLBFGS

rfc = RandomForest.trainClassifier(train, numClasses=2, categoricalFeaturesInfo={}, numTrees=100)
lreg = LogisticRegressionWithLBFGS.train(train)
svm = SVMWithSGD.train(train)
gbt = GradientBoostedTrees.trainClassifier(train, categoricalFeaturesInfo={})

In [22]:
print('Random forest accuracy: {:.3f}, f1: {:.3f}'.format(acc(rfc, test), f1(rfc, test)))
print('Logistic regression accuracy: {:.3f}, f1: {:.3f}'.format(acc(lreg, test), f1(lreg, test)))
print('SVM accuracy: {:.3f}, f1: {:.3f}'.format(acc(svm, test), f1(svm, test)))
print('Gradient boosting accuracy: {:.3f}, f1: {:.3f}'.format(acc(gbt, test), f1(gbt, test)))

Random forest accuracy: 0.801, f1: 0.683
Logistic regression accuracy: 0.789, f1: 0.696
SVM accuracy: 0.658, f1: 0.616
Gradient boosting accuracy: 0.793, f1: 0.689
