In [2]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

# Attach to an existing Spark session if there is one; otherwise create it.
spark = (
    SparkSession.builder
    .getOrCreate()
)



In [56]:
# Load the Titanic training CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
# NOTE(review): this is an absolute, machine-specific path.
titanic_train_path = "/Users/swkim/Data/data_engineering_school/titanic/train.csv"
df = spark.read.csv(titanic_train_path, header=True, inferSchema=True)
df.printSchema()


root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [57]:
from pyspark.sql.functions import *

# Replace missing ages with the average age of the whole frame.
# NOTE(review): the average is computed on `df` before the train/test split
# that happens in a later cell, so held-out rows contribute to the fill value.
age_stats = df.agg(mean(df["Age"]).alias("mean")).first()
mean_value = age_stats["mean"]
fixed_df = df.fillna(mean_value, subset=["Age"])


In [58]:
# Hold out roughly 30% of the rows for evaluation. The split is seeded so that
# re-running the notebook produces the same partition and therefore comparable
# accuracy figures; without a seed every run draws a different split.
SPLIT_SEED = 42
dfs = fixed_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Only the training split gets its target renamed to "label"; the test split
# keeps the original "Survived" column, which the evaluation cell passes
# explicitly as its label column.
train_df = dfs[0].withColumnRenamed("Survived", "label")
test_df = dfs[1]


In [59]:
from pyspark.ml.feature import *

def handleCategorical(column):
    """Build the two encoding stages for one categorical column.

    Args:
        column: Name of the input column to encode.

    Returns:
        A ``(StringIndexer, OneHotEncoder)`` tuple. The indexer reads
        ``column``, writes ``<column>_index`` and is configured with
        ``handleInvalid="skip"``; the encoder reads ``<column>_index`` and
        writes ``<column>_onehot``.
    """
    index_col = column + "_index"
    indexer = StringIndexer(
        inputCol=column,
        outputCol=index_col,
        handleInvalid="skip",
    )
    encoder = OneHotEncoder(inputCol=index_col, outputCol=column + "_onehot")
    return indexer, encoder


In [60]:
# One (StringIndexer, OneHotEncoder) pair per categorical feature column.
genderStages, embarkedStages, pClassStages = (
    handleCategorical(name) for name in ("Sex", "Embarked", "Pclass")
)


In [61]:
# Columns packed into the single "features" vector: the three one-hot outputs
# followed by the numeric columns.
cols = [
    "Sex_onehot",
    "Embarked_onehot",
    "Pclass_onehot",
    "SibSp",
    "Parch",
    "Age",
    "Fare",
]
vectorAssembler = VectorAssembler(inputCols=cols, outputCol="features")

from pyspark.ml.classification import *
from pyspark.ml.pipeline import *


# Stage order: index + one-hot encode each categorical column, assemble the
# feature vector, then the logistic regression estimator.
lr = LogisticRegression()
stages = [
    *genderStages,
    *embarkedStages,
    *pClassStages,
    vectorAssembler,
    lr,
]
pipeline = Pipeline(stages=stages)


In [74]:
# Fit the whole pipeline on the training split; `model` is the fitted result
# used below to transform both splits.
model = pipeline.fit(train_df)

In [75]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def accuracyScore(predictions_df, label, predictCol):
    """Compute the accuracy metric for a frame of predictions.

    Args:
        predictions_df: DataFrame holding both the label and prediction columns.
        label: Name of the column with the true class.
        predictCol: Name of the column with the predicted class.

    Returns:
        The value returned by ``MulticlassClassificationEvaluator.evaluate``
        with ``metricName="accuracy"``.
    """
    scorer = (
        MulticlassClassificationEvaluator()
        .setLabelCol(label)
        .setPredictionCol(predictCol)
        .setMetricName("accuracy")
    )
    return scorer.evaluate(predictions_df)

# Score the fitted pipeline on both splits. The label column name differs:
# the training split was renamed to "label", the test split still has "Survived".
train_accuracy = accuracyScore(model.transform(train_df), "label", "prediction")
test_accuracy = accuracyScore(model.transform(test_df), "Survived", "prediction")

print("train accuracy with pipeline %.2f" % train_accuracy)
print("test accuracy with pipeline %.2f" % test_accuracy)


train accuracy with pipeline 0.81
test accuracy with pipeline 0.81
