### Author: Vincent Pham

### Intro

This is a tutorial on Spark using the Titanic data from Kaggle. To get the data, go to https://www.kaggle.com/c/titanic/data and download train.csv and test.csv.

Next, to spin up a Spark cluster easily, we will use AWS EMR (note: Amazon charges for EMR by the time your cluster runs). Since this is a small dataset, however, you can also run everything on your local computer.

### Reading in the Data

Now that we have the data, we need to read it into Spark. For now, we will run PySpark from the terminal. Locate the folder where you installed Spark (my version is spark-1.5.2). From inside this folder, enter <font color = 'teal'>./bin/pyspark</font> to open the PySpark shell.

```python
# Paths to the Kaggle files; adjust these to wherever you saved them.
train_file = "./train.csv"
test_file = "./test.csv"

# `sc` is the SparkContext that the pyspark shell creates for you.
# textFile returns an RDD with one element per line of the file.
train = sc.textFile(train_file)
test = sc.textFile(test_file)
```

### Creating the Map Function

In PySpark, you can use LabeledPoint objects to hold your dependent variable (the label) and your features. Before we run the model, though, we need to clean the data. First, we need to remove the header from our dataset before running it through the map function. Next, we need to decide which features to use. We will use passenger socio-economic class, sex, age, number of siblings/spouses on board, number of parents/children on board, and passenger fare. Our variable of interest is whether someone survived or not. We can now create a map function that runs through each line of the data and returns a LabeledPoint object holding the survival indicator and an array of our features. This is done in parse_training.

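```python
from numpy import array
from pyspark.mllib.regression import LabeledPoint

# Remove the header: grab the first line of each file and filter it out.
header_train = train.first()
header_test = test.first()
train = train.filter(lambda line: line != header_train)
test = test.filter(lambda line: line != header_test)

def parse_training(line):
    # NOTE: splitting on "," also splits the quoted Name field
    # ("Braund, Mr. Owen Harris") in two, so every column after Name is
    # shifted right by one: Sex is at index 5, Age at 6, SibSp at 7,
    # Parch at 8 and Fare at 10. This assumes each Name has exactly one comma.
    line_split = line.split(",")
    survived = float(line_split[1])

    ismale = 1.0 if line_split[5] == "male" else 0.0
    pclass = float(line_split[2])
    try:
        age = float(line_split[6])
    except ValueError:
        age = -1.0  # Age is blank for some passengers; flag it with -1.0
    sib = float(line_split[7])
    parch = float(line_split[8])
    fare = float(line_split[10])
    features = array([ismale, pclass, age, sib, parch, fare])
    return LabeledPoint(survived, features)

parsed_data = train.map(parse_training)
```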
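The column indices in `parse_training` only work because of a quirk: the Name field is quoted and contains a comma, so `line.split(",")` breaks it into two pieces and shifts every later column right by one. Python's standard `csv` module honours the quotes instead. Below is a small check on one row in the train.csv layout; `parse_fields` is a hypothetical helper name, not something from the original code.

```python
import csv

def parse_fields(line):
    """Split one CSV line into fields, keeping quoted commas inside their field."""
    return next(csv.reader([line]))

# A row in the layout of Kaggle's train.csv; the quoted Name contains a comma.
sample = '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S'

naive = sample.split(",")      # also splits inside the quoted Name
fields = parse_fields(sample)  # respects the double quotes

print(len(naive), len(fields))  # 13 12
print(naive[5], fields[4])      # male male
```

With `parse_fields`, the train.csv indices line up with the header (Survived 1, Pclass 2, Sex 4, Age 5, SibSp 6, Parch 7, Fare 9), and because it is a plain module-level function it could be used inside `train.map(...)` in place of the manual split.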

### Dealing with Missing Data

Age has some missing values. I chose to remove the observations that are missing age, but other methods can be applied, such as replacing a missing age with the mean or median age across all passengers. Another idea is to use the mean age of passengers who share the same title in their name (for example, if a passenger's name includes "Dr.", take the mean age of all doctors whose age is known and assign it to the doctors whose age is missing), but I will not try it here. We will use the filter function to remove the observations whose age we marked as missing with -1.0.
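The title-based idea can be sketched in plain Python on a small, made-up list of `(name, age)` pairs. `title_of` and `impute_age_by_title` are hypothetical helpers, not part of Spark or of this tutorial's pipeline; they assume names follow the "Last, Title. First" pattern of the Kaggle file. On an RDD, the per-title sums and counts could be computed with `reduceByKey` instead of the loops below.

```python
from collections import defaultdict

def title_of(name):
    """'Braund, Mr. Owen Harris' -> 'Mr' (assumes 'Last, Title. First')."""
    return name.split(",", 1)[1].strip().split(".", 1)[0]

def impute_age_by_title(rows):
    """rows: list of (name, age or None). Fill each missing age with the
    mean age of passengers sharing its title, or the overall mean if no
    passenger with that title has a known age."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for name, age in rows:
        if age is not None:
            t = title_of(name)
            sums[t] += age
            counts[t] += 1
    overall = sum(sums.values()) / sum(counts.values())

    filled = []
    for name, age in rows:
        if age is None:
            t = title_of(name)
            age = sums[t] / counts[t] if counts[t] else overall
        filled.append(age)
    return filled

rows = [("Smith, Dr. John", 50.0), ("Jones, Dr. Ann", 40.0),
        ("Brown, Dr. Bob", None), ("Lee, Mr. Tom", 20.0),
        ("Kim, Master. Sam", None)]
print(impute_age_by_title(rows))
```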

```python
# features[2] is age; -1.0 marks a missing value.
parsed_data = parsed_data.filter(lambda lp: lp.features[2] != -1.0)
```

### Training and Validation Set

Since the test file does not include a column for whether a passenger survived, we cannot use it to evaluate our model. Instead, we have to split our training dataset into a training set and a validation set. I will split it 70%/30%.

```python
# Randomly assign roughly 70% of the rows to training and 30% to validation.
train_data, valid_data = parsed_data.randomSplit([0.7, 0.3], seed=1)
```
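Note that `randomSplit` gives approximately, not exactly, 70%/30%: each row is assigned to a split independently at random. The plain-Python sketch below illustrates that idea by drawing one uniform number per item and checking which cumulative weight interval it falls in. `random_split` is a hypothetical helper and uses Python's `random` module, so it is not Spark's implementation and will not reproduce Spark's split for the same seed.

```python
import random

def random_split(items, weights=(0.7, 0.3), seed=1):
    """Assign each item to one split; split sizes are random, not exact."""
    total = float(sum(weights))
    # Cumulative upper bounds, e.g. (0.7, 0.3) -> [0.7, 1.0]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for item in items:
        u = rng.random()
        for i, b in enumerate(bounds):
            # The last split also catches any floating-point rounding at 1.0.
            if u < b or i == len(bounds) - 1:
                splits[i].append(item)
                break
    return splits

train_part, valid_part = random_split(range(1000))
print(len(train_part), len(valid_part))
```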

### Logistic Regression

Spark MLlib provides two algorithms for training logistic regression: (1) mini-batch stochastic gradient descent (LogisticRegressionWithSGD) and (2) L-BFGS (LogisticRegressionWithLBFGS). L-BFGS is recommended for faster convergence, and it is the one we will use in this tutorial.

```python
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

# Fit a binary logistic regression on the training split using L-BFGS.
logit_model = LogisticRegressionWithLBFGS.train(train_data)
```
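For reference, both optimizers minimize the same objective and differ only in how they search for the weights. Written for 0/1 labels $y_i$ and feature vectors $\mathbf{x}_i$, the standard logistic regression model and its average log-loss are:

$$P(y = 1 \mid \mathbf{x}) = \sigma(\mathbf{w}^\top \mathbf{x}) = \frac{1}{1 + e^{-\mathbf{w}^\top \mathbf{x}}}$$

$$L(\mathbf{w}) = \frac{1}{n} \sum_{i=1}^{n} \left[ \log\left(1 + e^{\mathbf{w}^\top \mathbf{x}_i}\right) - y_i \, \mathbf{w}^\top \mathbf{x}_i \right]$$

A regularization term such as $\frac{\lambda}{2}\lVert \mathbf{w} \rVert_2^2$ can be added to $L$. The trained model predicts 1 when $\sigma(\mathbf{w}^\top \mathbf{x})$ exceeds its threshold (0.5 by default) and 0 otherwise. In PySpark's `LogisticRegressionWithLBFGS.train`, `intercept` defaults to `False`, so, as in the formula above, no separate bias term is fitted unless `intercept=True` is passed.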


To measure accuracy on the validation data, we map over the valid_data RDD and use the model to predict the class of each point. The results are (actual label, predicted label) pairs. We then keep only the pairs where the two match using filter, count them, and divide by the number of validation points.

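```python
# Pair each validation point's true label with the model's prediction.
labels_and_preds = valid_data.map(lambda p: (p.label, logit_model.predict(p.features)))
# Accuracy = fraction of pairs where the prediction equals the label.
test_accuracy = labels_and_preds.filter(lambda lp: lp[0] == lp[1]).count() / float(valid_data.count())
print("Test accuracy is %1.4f" % test_accuracy)
```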
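The filter/count step is simply the fraction of matching pairs. The plain-Python sketch below computes the same quantity on a short, made-up list of `(label, prediction)` pairs; `accuracy` is a hypothetical helper, and the pairs could come from `labels_and_preds.collect()` when the validation set is small. PySpark also ships `pyspark.mllib.evaluation.MulticlassMetrics`, which expects `(prediction, label)` pairs, note the order.

```python
def accuracy(labels_and_preds):
    """Fraction of (label, prediction) pairs where the two agree."""
    pairs = list(labels_and_preds)
    correct = sum(1 for label, pred in pairs if label == pred)
    return correct / float(len(pairs))

# Labels are floats in LabeledPoint; predict() returns ints, and 1.0 == 1.
pairs = [(1.0, 1), (0.0, 0), (1.0, 0), (0.0, 0)]
print(accuracy(pairs))  # 0.75
```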


Running a logistic regression with age, gender, class, siblings/spouses, parents/children, and ticket price gave 79% accuracy on the validation set.


### Alternative Model

There are many other combinations of features we could test. Intuitively, it may make more sense to reduce the features to gender (women were more likely to get onto lifeboats), class (wealthier passengers may have had more influence in reaching safety), and age (children and older people may have been given priority). For age, we will create dummy variables for whether someone is a child (13 years old or younger) and whether someone is an elder (60 years old or older). Below we run this model.
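Before running the model, the age encoding can be checked on its own. The hypothetical helper `age_dummies` below takes the raw Age field as a string, uses the same thresholds as the code that follows (13 or younger is a child, 60 or older is an elder), and codes each dummy as 0 or 1; it returns `None` when the field is blank.

```python
def age_dummies(age_field):
    """Return (ischild, iselder) as 0.0/1.0, or None if age is missing."""
    try:
        age = float(age_field)
    except ValueError:
        return None  # blank Age field in the CSV
    ischild = 1.0 if age <= 13 else 0.0
    iselder = 1.0 if age >= 60 else 0.0
    return ischild, iselder

print(age_dummies("5"), age_dummies("65"), age_dummies("30"), age_dummies(""))
```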

```python
def parse_training_reduced(line):
    # Same indices as parse_training: splitting on "," splits the quoted
    # Name field in two, shifting the later columns right by one.
    line_split = line.split(",")
    survived = float(line_split[1])

    ismale = 1.0 if line_split[5] == "male" else 0.0
    pclass = float(line_split[2])

    # Dummy variables: 1 if the passenger is in the group, 0 otherwise.
    ischild = 0.0
    iselder = 0.0
    try:
        age = float(line_split[6])
        if age <= 13:
            ischild = 1.0
        elif age >= 60:
            iselder = 1.0
    except ValueError:
        # Missing age: flag with -1 so the row can be filtered out below.
        ischild = -1.0
        iselder = -1.0
    features = array([ismale, pclass, ischild, iselder])
    return LabeledPoint(survived, features)

parsed_data = train.map(parse_training_reduced)
# Remove observations that are missing age
parsed_data = parsed_data.filter(lambda lp: lp.features[2] != -1.0)
train_data, valid_data = parsed_data.randomSplit([0.7, 0.3], seed=1)

logit_model = LogisticRegressionWithLBFGS.train(train_data)

labels_and_preds = valid_data.map(lambda p: (p.label, logit_model.predict(p.features)))
test_accuracy = labels_and_preds.filter(lambda lp: lp[0] == lp[1]).count() / float(valid_data.count())
print("Test accuracy is %1.4f" % test_accuracy)
```

The accuracy for the reduced model is 79.91%. This is only a small gain over the full model, and with a validation set this small it may not be meaningful, but the reduced model reaches it with fewer features, which makes it worth considering.

### Submitting to Kaggle

After all, this is a Kaggle competition, so we need to export our predictions on the test data that Kaggle provides. I had to create a separate map function because the test file lacks the survival indicator, so it has one fewer column than the training file and every column index shifts. To write the predictions out as CSV lines, I created another function (toCSVLine) that joins each (passenger ID, survival prediction) tuple with a comma. The saveAsTextFile function then saves the predictions.

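```python
def parse_testing(line):
    # test.csv has no Survived column, so each index is one lower than in
    # train.csv. As before, the comma inside the quoted Name shifts the
    # columns after it right by one: Sex is at index 4 and Age at index 5.
    line_split = line.split(",")
    person = line_split[0]
    ismale = 1.0 if line_split[4] == "male" else 0.0
    pclass = float(line_split[1])

    ischild = 0.0
    iselder = 0.0
    try:
        age = float(line_split[5])
        if age <= 13:
            ischild = 1.0
        elif age >= 60:
            iselder = 1.0
    except ValueError:
        # Missing age: every test passenger needs a prediction, so keep the
        # row and leave both dummies at 0.
        pass
    features = array([ismale, pclass, ischild, iselder])
    return (person, features)

test_data = test.map(parse_testing)

def toCSVLine(data):
    return ','.join(str(d) for d in data)

preds = test_data.map(lambda p: (p[0], logit_model.predict(p[1])))

header = [("PassengerId", "Survived")]
# Prepend the header; the predictions are collected to the driver first.
preds = sc.parallelize(header + preds.collect())
lines = preds.map(toCSVLine)
# coalesce(1) avoids a shuffle, so the lines stay in order with the header first.
# saveAsTextFile creates a directory named preds.csv containing the part file(s).
lines.coalesce(1).saveAsTextFile('./preds.csv')
```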
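Because `saveAsTextFile` writes a directory of part files rather than a single CSV, and the test set is small enough to collect to the driver, another option is to write the file with Python's standard `csv` module. A minimal sketch, assuming `pairs` is a list of `(PassengerId, prediction)` tuples such as `test_data.map(lambda p: (p[0], logit_model.predict(p[1]))).collect()`; `write_submission` is a hypothetical helper, demonstrated here on an in-memory buffer with made-up rows.

```python
import csv
import io

def write_submission(pairs, out):
    """Write a Kaggle-style CSV with a header row to the file object `out`."""
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["PassengerId", "Survived"])
    for passenger_id, survived in pairs:
        writer.writerow([passenger_id, int(survived)])

buf = io.StringIO()
write_submission([("892", 0), ("893", 1)], buf)
print(buf.getvalue())
```

To write a real file on Python 3, open it with `open("submission.csv", "w", newline="")` and pass the file object to `write_submission`.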

Submitting this to Kaggle, I received a score of 0.76555. Not great, but a good effort for my first attempt at Spark.