In [1]:
from pyspark.sql import SparkSession
from random import randint
from pyspark.sql.types import IntegerType, StructField, StructType
import pyspark.sql.functions as F
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from keras.layers import Dense
import numpy as np
#from sklearn.model_selection import train_test_split

In [2]:
import os
import sys

# Point PySpark at the interpreter running this notebook by exporting
# PYSPARK_PYTHON before the Spark session is created.
os.environ.update(PYSPARK_PYTHON=sys.executable)

In [3]:
# Reuse an already-running Spark session if there is one, otherwise start a new
# one under this application name.
spark = (
    SparkSession.builder
    .appName('SparkTitanic.com')
    .getOrCreate()
)

Reading the data

In [4]:
# Load the training CSV: the first line is the header and column types are
# inferred from the data.
# For production use '../input/titanic/train.csv' instead.
train_df = spark.read.csv(
    './input/titanic/train.csv',
    header=True,
    inferSchema=True,
)

Cleaning the data

In [5]:
# Keep only the three features and the label, drop every row where any of them
# is null, then encode Sex numerically: "male" -> 0.0, anything else -> 1.0.
# NOTE: train_df and its Sex column are overwritten in place, so this cell is
# meant to run once per fresh load of the CSV.
train_df = (
    train_df
    .select("Sex", "SibSp", "Parch", "Survived")
    .where(F.col("SibSp").isNotNull())
    .where(F.col("Parch").isNotNull())
    .where(F.col("Sex").isNotNull())
    .where(F.col("Survived").isNotNull())
    .withColumn(
        "Sex",
        F.when(F.col("Sex") == F.lit("male"), 0.).otherwise(1.),
    )
)

Training the models

In [6]:
# Collect features and label in ONE Spark action. Pairing rows from two separate
# collect() calls relies on both actions returning rows in the same order;
# a single collect keeps every label attached to its own feature row and runs
# only one Spark job.
train_rows = train_df.select("Sex", "SibSp", "Parch", "Survived").collect()

# Feature vector is (Sex, SibSp, Parch / 5); Parch is the only scaled column.
x_train = np.array([[r[0], r[1], r[2] / 5] for r in train_rows], dtype="float32")
# One-hot encode the binary label into two columns for the softmax output.
y_train = to_categorical([r[3] for r in train_rows], 2)

# No hold-out split is made here: the network is fit on every cleaned row.
# 3 inputs -> 16 tanh -> 12 tanh -> 2-way softmax.
model1 = Sequential()
model1.add(Dense(16, input_dim=3, activation='tanh'))
model1.add(Dense(12, activation='tanh'))
model1.add(Dense(2, activation='softmax'))
model1.compile(loss='binary_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
model1.fit(x_train, y_train, batch_size=5, epochs=12, shuffle=True, verbose=0)



In [7]:
# Collect features and label in ONE Spark action. Pairing rows from two separate
# collect() calls relies on both actions returning rows in the same order;
# a single collect keeps every label attached to its own feature row and runs
# only one Spark job.
train_rows = train_df.select("Sex", "SibSp", "Parch", "Survived").collect()

# Feature vector is (Sex, SibSp, Parch / 5); Parch is the only scaled column.
x_train = np.array([[r[0], r[1], r[2] / 5] for r in train_rows], dtype="float32")
# One-hot encode the binary label into two columns for the softmax output.
y_train = to_categorical([r[3] for r in train_rows], 2)

# Second ensemble member: same architecture and data as the other models, so it
# differs only through random weight initialisation and batch shuffling.
model2 = Sequential()
model2.add(Dense(16, input_dim=3, activation='tanh'))
model2.add(Dense(12, activation='tanh'))
model2.add(Dense(2, activation='softmax'))
model2.compile(loss='binary_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
model2.fit(x_train, y_train, batch_size=5, epochs=12, shuffle=True, verbose=0)

<keras.callbacks.History at 0x7fb0a43e7750>

In [8]:
# Collect features and label in ONE Spark action. Pairing rows from two separate
# collect() calls relies on both actions returning rows in the same order;
# a single collect keeps every label attached to its own feature row and runs
# only one Spark job.
train_rows = train_df.select("Sex", "SibSp", "Parch", "Survived").collect()

# Feature vector is (Sex, SibSp, Parch / 5); Parch is the only scaled column.
x_train = np.array([[r[0], r[1], r[2] / 5] for r in train_rows], dtype="float32")
# One-hot encode the binary label into two columns for the softmax output.
y_train = to_categorical([r[3] for r in train_rows], 2)

# Third ensemble member: same architecture and data as the other models, so it
# differs only through random weight initialisation and batch shuffling.
model3 = Sequential()
model3.add(Dense(16, input_dim=3, activation='tanh'))
model3.add(Dense(12, activation='tanh'))
model3.add(Dense(2, activation='softmax'))
model3.compile(loss='binary_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
model3.fit(x_train, y_train, batch_size=5, epochs=12, shuffle=True, verbose=0)

<keras.callbacks.History at 0x7fb08c208890>

Prediction

In [9]:
def partialPrediction(row, model):
    """Predict the survival class (0 or 1) for one passenger with one model.

    Parameters
    ----------
    row : object exposing ``Sex``, ``SibSp`` and ``Parch`` attributes
        ``Sex`` is expected as the raw string ('male' / 'female'); any of the
        three attributes may be missing (``None``).
    model : object with a ``predict`` method
        Called with a (1, 3) float32 array; its first output row is read as
        the per-class scores.

    Returns
    -------
    int
        Index of the highest-scoring class.

    Notes
    -----
    The features are encoded exactly as in the training cells:
    ``(Sex, SibSp, Parch / 5)`` with male -> 0.0 and female -> 1.0. Missing
    values are imputed with a random draw, so results for incomplete rows are
    not deterministic unless the ``random`` module is seeded.
    """
    if row.Sex == 'female':
        Sex = 1.
    elif row.Sex == 'male':
        Sex = 0.
    else:
        # Unknown or missing sex: pick one of the two encodings at random.
        Sex = float(randint(0, 1))

    SibSp = row.SibSp
    if SibSp is None:
        SibSp = randint(0, 1)

    Parch = row.Parch
    if Parch is None:
        Parch = randint(0, 2)

    # Must match the training-time scaling (Parch / 5); a different transform
    # here would feed the network values on a scale it was never trained on.
    features = np.array([[Sex, SibSp, Parch / 5]], dtype="float32")
    p = model.predict(features)[0]

    return int(np.argmax(p))

In [10]:
def prediction(row):
    """Majority vote of the three models for one passenger row.

    Each model contributes a 0/1 vote through ``partialPrediction``; the result
    is 1 when more than one model votes 1, otherwise 0.
    """
    votes = sum(partialPrediction(row, member) for member in (model1, model2, model3))
    return 1 if votes > 1 else 0

In [11]:
# Load the test CSV: the first line is the header and column types are
# inferred from the data.
test_df = spark.read.csv(
    './input/titanic/test.csv',
    header=True,
    inferSchema=True,
)

In [12]:
# Empty result frame with two nullable integer columns: PassengerId and Survived.
output_df = spark.createDataFrame(
    spark.sparkContext.parallelize([]),
    StructType([
        StructField(column_name, IntegerType(), True)
        for column_name in ('PassengerId', 'Survived')
    ]),
)

In [13]:
# Score every test passenger on the driver, then build the result frame in a
# single call. Appending one single-row DataFrame per passenger with union()
# creates one RDD per row and a query plan as deep as the row count, and it
# also appends duplicates if the cell is run again.
predicted_rows = [(row.PassengerId, prediction(row)) for row in test_df.collect()]

# Keep the existing (empty) frame untouched when there is nothing to score;
# otherwise rebuild it with the same PassengerId/Survived schema.
if predicted_rows:
    output_df = spark.createDataFrame(predicted_rows, schema=output_df.schema)

Output

In [14]:
# Bring the predictions to the driver as a pandas frame and write them to CSV
# without the pandas index column.
(
    output_df
    .toPandas()
    .to_csv('./componentC.csv', index=False)
)