In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session named "titanic" used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("titanic")
    .getOrCreate()
)

In [4]:
# Load both splits from the working directory; the first line of each file is
# treated as the header and column types are inferred from the data.
train = spark.read.csv(
    path='./train.csv',
    header="true",
    inferSchema="true",
)
test = spark.read.csv(
    path='./test.csv',
    header="true",
    inferSchema="true",
)

In [5]:
from pyspark.sql.functions import col

# Find columns with missing values
def findNullColumns(df):
    """Return the names of the columns of ``df`` that contain at least one null.

    The total row count and every per-column non-null count are computed in a
    single aggregation, so the data is scanned once instead of once per column.

    Args:
        df: a Spark DataFrame.

    Returns:
        A list of column names, in ``df.columns`` order, whose non-null count
        differs from the total number of rows. Empty when ``df`` has no columns.
    """
    # Imported locally because the notebook-level import only brings in `col`.
    from pyspark.sql.functions import count, lit

    columns = df.columns
    if not columns:
        return []

    # count(lit(1)) counts every row, while count(<column>) skips nulls.
    counts = df.agg(count(lit(1)), *[count(col(k)) for k in columns]).first()
    numRows = counts[0]
    return [k for k, nonNull in zip(columns, counts[1:]) if nonNull != numRows]

# List the training columns whose non-null count is lower than the row count.
findNullColumns(train)

['Age', 'Cabin', 'Embarked']

In [6]:
# Fraction (0-1) of training rows that have no Age value
train.filter(train['Age'].isNull()).count() / train.count()


0.19865319865319866

In [7]:
# Keep only the rows with a recorded Age, then show how many remain.
train = train.filter(train['Age'].isNotNull())
train.count()

714

In [8]:
# Report the share of rows without an Embarked value before removing them.
# Only the last expression of a cell is displayed, so the ratio is printed
# explicitly instead of being computed and thrown away.
print('Fraction of rows missing Embarked:',
      train.filter(train.Embarked.isNull()).count() / train.count())
train = train.filter(train.Embarked.isNotNull())
train.count()

712

In [9]:
# Report the share of rows without a Cabin value before dropping the column.
# Only the last expression of a cell is displayed, so the ratio is printed
# explicitly instead of being computed and thrown away.
print('Fraction of rows missing Cabin:',
      train.filter(train.Cabin.isNull()).count() / train.count())
train = train.drop('Cabin')
train.count()

712

In [10]:
# Print the column names and types of the training frame at this point.
train.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)



In [11]:
# Remove both columns in one call, then display the resulting frame summary.
train = train.drop('Cabin', 'Embarked')
train

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double]

In [12]:
from pyspark.ml.feature import Bucketizer

# Bucket edges for the two continuous features.
ageSplits = [0, 16, 32, 48, 64, 200]
fareSplits = [float('-inf'), 7.91, 14.454, 31, float('inf')]

ageBucketizer = Bucketizer(inputCol='Age', outputCol='BucketedAge', splits=ageSplits)
fareBucketizer = Bucketizer(inputCol='Fare', outputCol='BucketedFare', splits=fareSplits)

# Add both bucket-index columns, then remove the raw columns they replace.
train_cleaned = (
    fareBucketizer
    .transform(ageBucketizer.transform(train))
    .drop('Age', 'Fare')
)
train_cleaned.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- BucketedAge: double (nullable = true)
 |-- BucketedFare: double (nullable = true)



In [13]:
from pyspark.ml.feature import StringIndexer, VectorIndexer

# Fit a string-to-index mapping for Sex on the training frame and keep the
# fitted model in `sexFeatureIndexer`.
sexFeatureIndexer = StringIndexer(
    inputCol='Sex',
    outputCol='IndexedSex',
).fit(train_cleaned)

# Replace the raw Sex column with its numeric index and preview the result.
train_cleaned = (
    sexFeatureIndexer
    .transform(train_cleaned)
    .drop('Sex')
)
train_cleaned.show()


+-----------+--------+------+--------------------+-----+-----+----------------+-----------+------------+----------+
|PassengerId|Survived|Pclass|                Name|SibSp|Parch|          Ticket|BucketedAge|BucketedFare|IndexedSex|
+-----------+--------+------+--------------------+-----+-----+----------------+-----------+------------+----------+
|          1|       0|     3|Braund, Mr. Owen ...|    1|    0|       A/5 21171|        1.0|         0.0|       0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|    1|    0|        PC 17599|        2.0|         3.0|       1.0|
|          3|       1|     3|Heikkinen, Miss. ...|    0|    0|STON/O2. 3101282|        1.0|         1.0|       1.0|
|          4|       1|     1|Futrelle, Mrs. Ja...|    1|    0|          113803|        2.0|         3.0|       1.0|
|          5|       0|     3|Allen, Mr. Willia...|    0|    0|          373450|        2.0|         1.0|       0.0|
|          7|       0|     1|McCarthy, Mr. Tim...|    0|    0|          

In [14]:
# Remove the three columns that are not passed on to the feature vector.
train_cleaned = train_cleaned.drop('Ticket', 'PassengerId', 'Name')
train_cleaned.show()

+--------+------+-----+-----+-----------+------------+----------+
|Survived|Pclass|SibSp|Parch|BucketedAge|BucketedFare|IndexedSex|
+--------+------+-----+-----+-----------+------------+----------+
|       0|     3|    1|    0|        1.0|         0.0|       0.0|
|       1|     1|    1|    0|        2.0|         3.0|       1.0|
|       1|     3|    0|    0|        1.0|         1.0|       1.0|
|       1|     1|    1|    0|        2.0|         3.0|       1.0|
|       0|     3|    0|    0|        2.0|         1.0|       0.0|
|       0|     1|    0|    0|        3.0|         3.0|       0.0|
|       0|     3|    3|    1|        0.0|         2.0|       0.0|
|       1|     3|    0|    2|        1.0|         1.0|       1.0|
|       1|     2|    1|    0|        0.0|         2.0|       1.0|
|       1|     3|    1|    1|        0.0|         2.0|       1.0|
|       1|     1|    0|    0|        3.0|         2.0|       1.0|
|       0|     3|    0|    0|        1.0|         1.0|       0.0|
|       0|

In [15]:
from pyspark.ml.feature import VectorAssembler

# Columns packed into the single `features` vector, and removed afterwards.
cols = ['Pclass', 'SibSp', 'Parch', 'BucketedAge', 'BucketedFare', 'IndexedSex']

assembler = VectorAssembler(inputCols=cols, outputCol='features')
train_cleaned = assembler.transform(train_cleaned).drop(*cols)
train_cleaned.show()

+--------+--------------------+
|Survived|            features|
+--------+--------------------+
|       0|[3.0,1.0,0.0,1.0,...|
|       1|[1.0,1.0,0.0,2.0,...|
|       1|[3.0,0.0,0.0,1.0,...|
|       1|[1.0,1.0,0.0,2.0,...|
|       0|[3.0,0.0,0.0,2.0,...|
|       0|[1.0,0.0,0.0,3.0,...|
|       0|[3.0,3.0,1.0,0.0,...|
|       1|[3.0,0.0,2.0,1.0,...|
|       1|[2.0,1.0,0.0,0.0,...|
|       1|[3.0,1.0,1.0,0.0,...|
|       1|[1.0,0.0,0.0,3.0,...|
|       0|[3.0,0.0,0.0,1.0,...|
|       0|[3.0,1.0,5.0,2.0,...|
|       0| (6,[0,5],[3.0,1.0])|
|       1|[2.0,0.0,0.0,3.0,...|
|       0|[3.0,4.0,1.0,0.0,...|
|       0|[3.0,1.0,0.0,1.0,...|
|       0|[2.0,0.0,0.0,2.0,...|
|       1|[2.0,0.0,0.0,2.0,...|
|       1|[3.0,0.0,0.0,0.0,...|
+--------+--------------------+
only showing top 20 rows



In [18]:
from pyspark.ml.classification import RandomForestClassifier

# A fixed seed makes the trained forest reproducible across runs.
rf = RandomForestClassifier(labelCol='Survived', featuresCol='features', numTrees=10, seed=42)

# fit() returns a separate fitted model object; bind it to a name so it can be
# used afterwards instead of being discarded once its repr is displayed.
rf_model = rf.fit(train_cleaned)
rf_model

RandomForestClassificationModel (uid=RandomForestClassifier_414890e91c229ddf998d) with 10 trees

In [20]:
# Inspect the estimator's `numTrees` parameter descriptor.
rf.numTrees

Param(parent='RandomForestClassifier_414890e91c229ddf998d', name='numTrees', doc='Number of trees to train (>= 1).')

In [16]:
%%script false

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def sexToInt(value):
    """Encode 'male' as 0 and 'female' as 1; any other value yields None."""
    return 0 if value == 'male' else (1 if value == 'female' else None)

def ageToInt(value):
    """Map an age to an integer category.

    Categories: 0 for ages up to 16, 1 for (16, 32], 2 for (32, 48],
    3 for (48, 64] and 4 for anything above 64.

    Args:
        value: the age as a number, or None when the age is missing.

    Returns:
        The category index, or None when ``value`` is None.
    """
    # A missing age must stay missing: comparing None with a number raises
    # TypeError, and this function is also applied to the unfiltered test split.
    if value is None:
        return None
    # Each branch is reached only when the previous upper bound was exceeded,
    # so the lower bounds do not need to be re-checked.
    if value <= 16:
        return 0
    if value <= 32:
        return 1
    if value <= 48:
        return 2
    if value <= 64:
        return 3
    return 4

def fareToInt(value):
    """Map a fare to an integer category.

    Categories: 0 for fares up to 7.91, 1 for (7.91, 14.454],
    2 for (14.454, 31] and 3 for anything above 31.

    Args:
        value: the fare as a number, or None when the fare is missing.

    Returns:
        The category index, or None when ``value`` is None.
    """
    # A missing fare must stay missing: comparing None with a number raises
    # TypeError, and this function is also applied to the unfiltered test split.
    if value is None:
        return None
    # Each branch is reached only when the previous upper bound was exceeded,
    # so the lower bounds do not need to be re-checked.
    if value <= 7.91:
        return 0
    if value <= 14.454:
        return 1
    if value <= 31:
        return 2
    return 3

# Wrap each plain-Python encoder as a Spark UDF that returns an integer column.
udfSexValueToInt, udfAgeValueToInt, udfFareValueToInt = (
    udf(encoder, IntegerType())
    for encoder in (sexToInt, ageToInt, fareToInt)
)

# Apply the three category UDFs to both splits, replacing each raw column with
# its integer-coded counterpart. Results are stored in [train, test] order.
full_data_cleaned = []

for dataset in [train, test]:
    dataset_cleaned = (
        dataset
        .withColumn('SexCategory', udfSexValueToInt('Sex'))
        .drop('Sex')
        .withColumn('AgeCategory', udfAgeValueToInt('Age'))
        .drop('Age')
        .withColumn('FareCategory', udfFareValueToInt('Fare'))
        .drop('Fare')
    )
    full_data_cleaned.append(dataset_cleaned)
    

In [17]:
# Peek at the first three rows of the first UDF-cleaned split.
# NOTE(review): `full_data_cleaned` is only assigned in the cell that begins
# with `%%script false`; the recorded NameError suggests that cell did not run.
# Confirm whether that cell should be re-enabled or this preview removed.
full_data_cleaned[0].head(3)

NameError: name 'full_data_cleaned' is not defined