![Lifeboat](http://titanicwiki.com/wp-content/uploads/2012/03/lifeboat-around-Titanic.jpg)

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages com.databricks:spark-csv_2.10:1.3.0 pyspark-shell"

from pyspark import SparkContext, SparkConf, StorageLevel
from pyspark.sql import SQLContext, HiveContext
import py4j

conf = SparkConf().setAppName("SparkJupyterTitanic") \
        .setMaster("yarn-client") \
        .set("spark.executor.memory", "512m") \
        .set("spark.executor.cores", "1") \
        .set("spark.executor.instances", "5") 
        
sc = SparkContext(conf=conf)

try:
    # Try to access HiveConf, it will raise exception if Hive is not added
    sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
    sqlContext = HiveContext(sc)
except py4j.protocol.Py4JError:
    sqlContext = SQLContext(sc)
except TypeError:
    sqlContext = SQLContext(sc)
sc

<pyspark.context.SparkContext at 0x7fda7c488090>

In [2]:
csv = "s3n://ltsai/smu-talk-8mar2016/csv/1.csv"

In [3]:
from pyspark.sql.types import *
customSchema = StructType([StructField('pclass', StringType(), True),
                           StructField('survived', IntegerType(), True),
                           StructField('name', StringType(), True),
                           StructField('sex', StringType(), True),
                           StructField('age', StringType(), True),
                           StructField('sibsp', IntegerType(), True),
                           StructField('parch', IntegerType(), True),
                           StructField('ticket', StringType(), True),
                           StructField('fare', FloatType(), True),
                           StructField('cabin', StringType(), True),
                           StructField('embarked', StringType(), True),
                           StructField('boat', StringType(), True),
                           StructField('body', StringType(), True),
                           StructField('home.dest', StringType(), True)])
df = sqlContext.read.format('com.databricks.spark.csv'). \
        options(header='true', treatEmptyValuesAsNulls='true'). \
        load(csv, schema = customSchema)

In [4]:
train_df, test_df = df.select("survived", "pclass", "sibsp", "parch").randomSplit([0.8, 0.2])
#train_df.count(),test_df.count()

In [None]:
from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors

ntrain = train_df.map(lambda x: Row(label = float(x[0]) \
         ,features = Vectors.dense(x[1:]))).toDF().persist(StorageLevel.MEMORY_AND_DISK) 
ntest = test_df.map(lambda x: Row(features = Vectors.dense(x[1:]))).toDF()
ntrain.count()

In [7]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=50)

In [None]:
from time import time
t0 = time()
model = lr.fit(ntrain)
t1 = time()

print 'Time Taken ' + str(round(((t1 - t0)/60),2)) + 'mins'

In [8]:
pred = model.transform(ntest)



In [9]:
pred.where("prediction = 1.0").count()/float(test_df.where("survived=1").count())

0.7070707070707071

In [10]:
sc.stop()