## Homework 2.

In [1]:
import py4j
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.mllib.tree import RandomForest, RandomForestModel, GradientBoostedTrees
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.mllib.regression import *
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import *
import math
import pandas as pd
import numpy as np
from pyspark.sql.functions import udf
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import types
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [2]:
#create spark context and read dataset
conf = (SparkConf().setMaster("local[4]")
        .setAppName("ML demo")
        .set("spark.executor.memory", "1g"))

sc = SparkContext(conf=conf)

sqlcontext = SQLContext(sc)

df = sqlcontext.read.format(
     'com.databricks.spark.csv').options(
     header='true').load('train.csv')

In [3]:
df.head()

Row(PassengerId=u'1', Survived=u'0', Pclass=u'3', Name=u'Braund, Mr. Owen Harris', Sex=u'male', Age=u'22', SibSp=u'1', Parch=u'0', Ticket=u'A/5 21171', Fare=u'7.25', Cabin=None, Embarked=u'S')

## Generate features

In [4]:
# 1) number of relatives

def get_relatives(name, parch, sibsp):
    last_name = name.split(",")[0]
    if last_name:
        family_size = 1 + int(parch) + int(sibsp)
        return family_size
    else:
        return 0
udf_get_relatives = udf(get_relatives, StringType())
df = df.withColumn('relatives', udf_get_relatives(df['Name'], df['Parch'], df['SibSp']))

In [5]:
# 2) age

def age_nominal(y):
    try:
        y = int(y)
    except:
        return -1
    return y // 20

udf_age = udf(age_nominal, returnType=IntegerType())
df = df.withColumn('age_nominal', udf_age(df['Age']))

In [6]:
# 3) title

def get_title(name):
    if pd.isnull(name):
        return "Null"

    title_search = re.search(' ([A-Za-z]+)\.', name)
    if title_search:
        return title_search.group(1).lower()
    else:
        return "None"
udf_get_title = udf(get_title, StringType())
df = df.withColumn('title', udf_get_title(df['Name']))

In [7]:
# 4) alone

def not_alone(sibsp, parch):
    return 0 if int(sibsp) + int(parch) == 0 else 1
udf_alone = udf(not_alone, returnType=IntegerType())
df = df.withColumn('not_alone', udf_alone(df['Sibsp'], df['Parch']))

In [8]:
# 5) fare

def fare_nominal(p):
    try:
        p = float(p)
    except:
        return -1
    return int(p // 5)

udf_price = udf(fare_nominal, returnType=IntegerType())
df = df.withColumn('fare_nominal', udf_price(df['Fare']))

filling the gaps in 'Embarked'

In [9]:
def Embarked_transform(x):
    if x != None:
        return x
    else:
        return ''

my_udf =udf(Embarked_transform, StringType())
df = df.withColumn('Embarked', my_udf(df['Embarked']))

In [10]:
stringIndexer = StringIndexer(inputCol="Embarked", outputCol="EmbarkedIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(inputCol="EmbarkedIndex", outputCol="EmbarkedVec")
df_t = encoder.transform(indexed)

In [11]:
df_t

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string, relatives: string, age_nominal: int, title: string, not_alone: int, fare_nominal: int, EmbarkedIndex: double, EmbarkedVec: vector]

In [12]:
def parse_age(str_age):
    try:
        return float(str_age)
    except:
        return -1

In [13]:
def transf(r):
    return LabeledPoint(
        int(r.Survived),
        [
            int(r.Pclass),
            r.Sex == 'male',
            float(r.Fare),
            int(r.SibSp),
            int(r.Parch),
            parse_age(r.Age),
        ] + list(r.EmbarkedVec.toArray())
    )

In [14]:
data = df_t.rdd.map(transf)

In [15]:
train, test = data.randomSplit([0.8, 0.2])

In [30]:
rfc = RandomForest.trainClassifier(train, numClasses=2, categoricalFeaturesInfo={},numTrees=100)

### Validation

In [73]:
def acc(model, test):
    values = test.map(lambda x: x.features)
    yhat = model.predict(values)
    y = test.map(lambda x: x.label)
    predictionAndLabels = yhat.zip(y)
    errors = predictionAndLabels.map(lambda x: np.absolute(x[0] - x[1]))
    return 1 - errors.sum() / errors.count()

In [74]:
acc(rfc, test)

0.82000000000000006

### f1Score

In [172]:
def f1(model, test):
    values = test.map(lambda x: x.features)
    yhat = model.predict(values)
    yhat = yhat.map(lambda x: float(x))
    y = test.map(lambda x: float(x.label))
    predictions_and_labels = yhat.zip(y)
    metrics = MulticlassMetrics(predictions_and_labels)
    f1Score = metrics.fMeasure()
    
    return f1Score

In [173]:
f1(rfc,test)



0.82

### 1) LogisticRegressionWithLBFGS

In [174]:
log_regression = LogisticRegressionWithLBFGS.train(train, numClasses=2)

In [175]:
acc(log_regression,test)

0.79200000000000004

In [176]:
f1(log_regression,test)



0.792

### 2) SVMWithSGD

In [177]:
svm_sgd = SVMWithSGD.train(train, iterations=10)

In [178]:
acc(log_regression,test)

0.79200000000000004

In [179]:
f1(log_regression,test)



0.792

### 3) DecisionTree

In [180]:
decision_tree = DecisionTree.trainClassifier(train, numClasses=2, maxDepth=2, categoricalFeaturesInfo={})

In [181]:
acc(log_regression,test)

0.79200000000000004

In [182]:
f1(log_regression,test)



0.792