In [1]:
import findspark
findspark.init()

In [2]:
# Loading the data into Spark using DataFrames

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Titanic Data').getOrCreate()

spark # prints what's in the object

In [3]:
# Read the training data and create a DataFrames
training_df = (spark.read
          .format("csv")
          .option('header', 'true')
          .load("./data/train.csv"))

training_df # prints the DF structure

DataFrame[PassengerId: string, Survived: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string]

In [4]:
# print some training data records and count from the DF
training_df.show()
training_df.count()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

891

In [5]:
# Read the test data and create a DataFrames
test_df = (spark.read
          .format("csv")
          .option('header', 'true')
          .load("./data/test.csv"))

test_df # prints the DF structure

DataFrame[PassengerId: string, Pclass: string, Name: string, Sex: string, Age: string, SibSp: string, Parch: string, Ticket: string, Fare: string, Cabin: string, Embarked: string]

In [6]:
# print some test data records and count from the DF

test_df.show()
test_df.count()

+-----------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|        892|     3|    Kelly, Mr. James|  male|34.5|    0|    0|          330911| 7.8292| null|       Q|
|        893|     3|Wilkes, Mrs. Jame...|female|  47|    1|    0|          363272|      7| null|       S|
|        894|     2|Myles, Mr. Thomas...|  male|  62|    0|    0|          240276| 9.6875| null|       Q|
|        895|     3|    Wirz, Mr. Albert|  male|  27|    0|    0|          315154| 8.6625| null|       S|
|        896|     3|Hirvonen, Mrs. Al...|female|  22|    1|    1|         3101298|12.2875| null|       S|
|        897|     3|Svensson, Mr. Joh...|  male|  14|    0|    0|            7538|  9.225| null|       S|
|        898|     3|Connolly, Miss. Kate|femal

418

In [7]:
# Start Data prep!

# Prepare a training dataset by casting some of columns to required type!

from pyspark.sql.functions import col
training_ds = training_df.select(col('Survived').cast('float'),
                                 col('PassengerId').cast('int'),
                                 col('Pclass').cast('float'),
                                 col('Sex'),
                                 col('Age').cast('float'),
                                 col('Fare').cast('float'),
                                 col('Embarked')
                                )
training_ds.show()

+--------+-----------+------+------+----+-------+--------+
|Survived|PassengerId|Pclass|   Sex| Age|   Fare|Embarked|
+--------+-----------+------+------+----+-------+--------+
|     0.0|          1|   3.0|  male|22.0|   7.25|       S|
|     1.0|          2|   1.0|female|38.0|71.2833|       C|
|     1.0|          3|   3.0|female|26.0|  7.925|       S|
|     1.0|          4|   1.0|female|35.0|   53.1|       S|
|     0.0|          5|   3.0|  male|35.0|   8.05|       S|
|     0.0|          6|   3.0|  male|null| 8.4583|       Q|
|     0.0|          7|   1.0|  male|54.0|51.8625|       S|
|     0.0|          8|   3.0|  male| 2.0| 21.075|       S|
|     1.0|          9|   3.0|female|27.0|11.1333|       S|
|     1.0|         10|   2.0|female|14.0|30.0708|       C|
|     1.0|         11|   3.0|female| 4.0|   16.7|       S|
|     1.0|         12|   1.0|female|58.0|  26.55|       S|
|     0.0|         13|   3.0|  male|20.0|   8.05|       S|
|     0.0|         14|   3.0|  male|39.0| 31.275|       

In [8]:
# Prepare a test dataset by casting some of columns to required type!
test_ds = test_df.select(
                         col('PassengerId').cast('int'),
                         col('Pclass').cast('float'),
                         col('Sex'),
                         col('Age').cast('float'),
                         col('Fare').cast('float'),
                         col('Embarked')
                        )
test_ds.show()

+-----------+------+------+----+-------+--------+
|PassengerId|Pclass|   Sex| Age|   Fare|Embarked|
+-----------+------+------+----+-------+--------+
|        892|   3.0|  male|34.5| 7.8292|       Q|
|        893|   3.0|female|47.0|    7.0|       S|
|        894|   2.0|  male|62.0| 9.6875|       Q|
|        895|   3.0|  male|27.0| 8.6625|       S|
|        896|   3.0|female|22.0|12.2875|       S|
|        897|   3.0|  male|14.0|  9.225|       S|
|        898|   3.0|female|30.0| 7.6292|       Q|
|        899|   2.0|  male|26.0|   29.0|       S|
|        900|   3.0|female|18.0| 7.2292|       C|
|        901|   3.0|  male|21.0|  24.15|       S|
|        902|   3.0|  male|null| 7.8958|       S|
|        903|   1.0|  male|46.0|   26.0|       S|
|        904|   1.0|female|23.0|82.2667|       S|
|        905|   2.0|  male|63.0|   26.0|       S|
|        906|   1.0|female|47.0| 61.175|       S|
|        907|   2.0|female|24.0|27.7208|       C|
|        908|   2.0|  male|35.0|  12.35|       Q|


In [9]:
from pyspark.sql.functions import isnull, when, count, col
training_ds.select([count(when(isnull(c), c)).alias(c) for c in training_ds.columns]).show() # todo: learn to query
test_ds.select([count(when(isnull(c), c)).alias(c) for c in test_ds.columns]).show() # todo: learn to query

+--------+-----------+------+---+---+----+--------+
|Survived|PassengerId|Pclass|Sex|Age|Fare|Embarked|
+--------+-----------+------+---+---+----+--------+
|       0|          0|     0|  0|177|   0|       2|
+--------+-----------+------+---+---+----+--------+

+-----------+------+---+---+----+--------+
|PassengerId|Pclass|Sex|Age|Fare|Embarked|
+-----------+------+---+---+----+--------+
|          0|     0|  0| 86|   1|       0|
+-----------+------+---+---+----+--------+



In [10]:
# Eliminate rows with null values in columns from the training dataset
# training_ds = training_ds.replace('?', None).dropna(how='any') # todo: learn the Dataset API

# Replacing null values with 0
training_ds = training_ds.na.fill(0)
test_ds = test_ds.na.fill(0)

training_ds.show()
training_ds.count()

+--------+-----------+------+------+----+-------+--------+
|Survived|PassengerId|Pclass|   Sex| Age|   Fare|Embarked|
+--------+-----------+------+------+----+-------+--------+
|     0.0|          1|   3.0|  male|22.0|   7.25|       S|
|     1.0|          2|   1.0|female|38.0|71.2833|       C|
|     1.0|          3|   3.0|female|26.0|  7.925|       S|
|     1.0|          4|   1.0|female|35.0|   53.1|       S|
|     0.0|          5|   3.0|  male|35.0|   8.05|       S|
|     0.0|          6|   3.0|  male| 0.0| 8.4583|       Q|
|     0.0|          7|   1.0|  male|54.0|51.8625|       S|
|     0.0|          8|   3.0|  male| 2.0| 21.075|       S|
|     1.0|          9|   3.0|female|27.0|11.1333|       S|
|     1.0|         10|   2.0|female|14.0|30.0708|       C|
|     1.0|         11|   3.0|female| 4.0|   16.7|       S|
|     1.0|         12|   1.0|female|58.0|  26.55|       S|
|     0.0|         13|   3.0|  male|20.0|   8.05|       S|
|     0.0|         14|   3.0|  male|39.0| 31.275|       

891

In [11]:
# Spark ML library only works with numeric data. 
# But we still want to use the Sex and the Embarked column. 
# For that, we will need to encode (transform) them. Sex -> Gender; Embarked -> Boarded

from pyspark.ml.feature import StringIndexer # todo: learn Spark ML API

# for training ds
training_ds = StringIndexer(
    inputCol='Sex', 
    outputCol='Gender', 
    handleInvalid='keep').fit(training_ds).transform(training_ds)
training_ds = StringIndexer(
    inputCol='Embarked', 
    outputCol='Boarded', 
    handleInvalid='keep').fit(training_ds).transform(training_ds)

# for test ds
test_ds = StringIndexer(
    inputCol='Sex', 
    outputCol='Gender', 
    handleInvalid='skip').fit(test_ds).transform(test_ds)
test_ds = StringIndexer(
    inputCol='Embarked', 
    outputCol='Boarded', 
    handleInvalid='keep').fit(test_ds).transform(test_ds)

training_ds.show()
test_ds.show()
test_ds.count()

+--------+-----------+------+------+----+-------+--------+------+-------+
|Survived|PassengerId|Pclass|   Sex| Age|   Fare|Embarked|Gender|Boarded|
+--------+-----------+------+------+----+-------+--------+------+-------+
|     0.0|          1|   3.0|  male|22.0|   7.25|       S|   0.0|    0.0|
|     1.0|          2|   1.0|female|38.0|71.2833|       C|   1.0|    1.0|
|     1.0|          3|   3.0|female|26.0|  7.925|       S|   1.0|    0.0|
|     1.0|          4|   1.0|female|35.0|   53.1|       S|   1.0|    0.0|
|     0.0|          5|   3.0|  male|35.0|   8.05|       S|   0.0|    0.0|
|     0.0|          6|   3.0|  male| 0.0| 8.4583|       Q|   0.0|    2.0|
|     0.0|          7|   1.0|  male|54.0|51.8625|       S|   0.0|    0.0|
|     0.0|          8|   3.0|  male| 2.0| 21.075|       S|   0.0|    0.0|
|     1.0|          9|   3.0|female|27.0|11.1333|       S|   1.0|    0.0|
|     1.0|         10|   2.0|female|14.0|30.0708|       C|   1.0|    1.0|
|     1.0|         11|   3.0|female| 4

418

In [12]:
training_ds.dtypes

[('Survived', 'float'),
 ('PassengerId', 'int'),
 ('Pclass', 'float'),
 ('Sex', 'string'),
 ('Age', 'float'),
 ('Fare', 'float'),
 ('Embarked', 'string'),
 ('Gender', 'double'),
 ('Boarded', 'double')]

In [13]:
test_ds.dtypes

[('PassengerId', 'int'),
 ('Pclass', 'float'),
 ('Sex', 'string'),
 ('Age', 'float'),
 ('Fare', 'float'),
 ('Embarked', 'string'),
 ('Gender', 'double'),
 ('Boarded', 'double')]

In [14]:
# Drop unnecessary columns from training ds
training_ds = training_ds.drop('Sex')
training_ds = training_ds.drop('Embarked')
training_ds.show()

# Drop unnecessary columns from test ds
test_ds = test_ds.drop('Sex')
test_ds = test_ds.drop('Embarked')
test_ds.show()

+--------+-----------+------+----+-------+------+-------+
|Survived|PassengerId|Pclass| Age|   Fare|Gender|Boarded|
+--------+-----------+------+----+-------+------+-------+
|     0.0|          1|   3.0|22.0|   7.25|   0.0|    0.0|
|     1.0|          2|   1.0|38.0|71.2833|   1.0|    1.0|
|     1.0|          3|   3.0|26.0|  7.925|   1.0|    0.0|
|     1.0|          4|   1.0|35.0|   53.1|   1.0|    0.0|
|     0.0|          5|   3.0|35.0|   8.05|   0.0|    0.0|
|     0.0|          6|   3.0| 0.0| 8.4583|   0.0|    2.0|
|     0.0|          7|   1.0|54.0|51.8625|   0.0|    0.0|
|     0.0|          8|   3.0| 2.0| 21.075|   0.0|    0.0|
|     1.0|          9|   3.0|27.0|11.1333|   1.0|    0.0|
|     1.0|         10|   2.0|14.0|30.0708|   1.0|    1.0|
|     1.0|         11|   3.0| 4.0|   16.7|   1.0|    0.0|
|     1.0|         12|   1.0|58.0|  26.55|   1.0|    0.0|
|     0.0|         13|   3.0|20.0|   8.05|   0.0|    0.0|
|     0.0|         14|   3.0|39.0| 31.275|   0.0|    0.0|
|     0.0|    

In [15]:
# Spark works to predict with a column with all the features smashed together into a list-like structure.
# I want to predict “Survived”, I need to combine the information other columns into one column.
# That column is called "features" and it's value should look like say [3.0, 22.0, 7.25, 0, 0]

# Assemble all the features with VectorAssembler
required_features = [
                    'Pclass',
                    'Age',
                    'Fare',
                    'Gender',
                    'Boarded'
                   ]
from pyspark.ml.feature import VectorAssembler # todo
assembler = VectorAssembler(inputCols=required_features, outputCol='features')

transformed_training_data = assembler.transform(training_ds)
transformed_test_data = assembler.transform(test_ds)

transformed_training_data
transformed_test_data

DataFrame[PassengerId: int, Pclass: float, Age: float, Fare: float, Gender: double, Boarded: double, features: vector]

In [16]:
transformed_training_data.show()
transformed_test_data.show()

+--------+-----------+------+----+-------+------+-------+--------------------+
|Survived|PassengerId|Pclass| Age|   Fare|Gender|Boarded|            features|
+--------+-----------+------+----+-------+------+-------+--------------------+
|     0.0|          1|   3.0|22.0|   7.25|   0.0|    0.0|[3.0,22.0,7.25,0....|
|     1.0|          2|   1.0|38.0|71.2833|   1.0|    1.0|[1.0,38.0,71.2833...|
|     1.0|          3|   3.0|26.0|  7.925|   1.0|    0.0|[3.0,26.0,7.92500...|
|     1.0|          4|   1.0|35.0|   53.1|   1.0|    0.0|[1.0,35.0,53.0999...|
|     0.0|          5|   3.0|35.0|   8.05|   0.0|    0.0|[3.0,35.0,8.05000...|
|     0.0|          6|   3.0| 0.0| 8.4583|   0.0|    2.0|[3.0,0.0,8.458299...|
|     0.0|          7|   1.0|54.0|51.8625|   0.0|    0.0|[1.0,54.0,51.8624...|
|     0.0|          8|   3.0| 2.0| 21.075|   0.0|    0.0|[3.0,2.0,21.07500...|
|     1.0|          9|   3.0|27.0|11.1333|   1.0|    0.0|[3.0,27.0,11.1332...|
|     1.0|         10|   2.0|14.0|30.0708|   1.0|   

In [17]:
# At this point our data prep is done
# We will start Modeling now..

# As we have seperate training and testing data, we will use that as it is!
training_data = transformed_training_data
test_data = transformed_test_data

In [18]:
# Build and fit an ML model to our dataset to predict the “Survived” columns with all the other ones. 
# We will be using a Random Forest Classifier. This is actually an estimator that we have to fit.

from pyspark.ml.classification import RandomForestClassifier # todo
rf = RandomForestClassifier(labelCol='Survived', 
                            featuresCol='features',
                            maxDepth=5)

In [19]:
# Now we fit (train) the model with training data
model = rf.fit(training_data)

# This will give us something called a transformer.

model

RandomForestClassificationModel (uid=RandomForestClassifier_b1a82afd3c0f) with 20 trees

In [20]:
# And finally, we predict using the test dataset
predictions = model.transform(test_data)

predictions

DataFrame[PassengerId: int, Pclass: float, Age: float, Fare: float, Gender: double, Boarded: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [21]:
# Done! My first Spark ML model
predictions.show()

+-----------+------+----+-------+------+-------+--------------------+--------------------+--------------------+----------+
|PassengerId|Pclass| Age|   Fare|Gender|Boarded|            features|       rawPrediction|         probability|prediction|
+-----------+------+----+-------+------+-------+--------------------+--------------------+--------------------+----------+
|        892|   3.0|34.5| 7.8292|   0.0|    2.0|[3.0,34.5,7.82919...|[18.2556167583500...|[0.91278083791750...|       0.0|
|        893|   3.0|47.0|    7.0|   1.0|    0.0|[3.0,47.0,7.0,1.0...|[13.9587964748052...|[0.69793982374026...|       0.0|
|        894|   2.0|62.0| 9.6875|   0.0|    2.0|[2.0,62.0,9.6875,...|[17.6730740705420...|[0.88365370352710...|       0.0|
|        895|   3.0|27.0| 8.6625|   0.0|    0.0|[3.0,27.0,8.66250...|[17.2658843674402...|[0.86329421837201...|       0.0|
|        896|   3.0|22.0|12.2875|   1.0|    0.0|[3.0,22.0,12.2875...|[9.48490656769818...|[0.47424532838490...|       1.0|
|        897|   

In [22]:
# Finally saving the prediction to a local folder

output_predictions = predictions.select(col('PassengerId').cast('int'),
                                        col('prediction').cast('int').alias('Survived')
                                       )

output_predictions.show()
output_predictions.count()

+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
|        892|       0|
|        893|       0|
|        894|       0|
|        895|       0|
|        896|       1|
|        897|       0|
|        898|       1|
|        899|       0|
|        900|       1|
|        901|       0|
|        902|       0|
|        903|       0|
|        904|       1|
|        905|       0|
|        906|       1|
|        907|       1|
|        908|       0|
|        909|       0|
|        910|       0|
|        911|       0|
+-----------+--------+
only showing top 20 rows



418

In [24]:
output_predictions.write.format('csv').option('header', True).mode('overwrite').option('sep',',').save('./build/')
# Submit the prediction to https://www.kaggle.com/c/titanic/submit