In [1]:
# import spark, let the system find it easily with findspark
import findspark

In [2]:
findspark.init('/home/hadoop/spark-2.4.0-bin-hadoop2.7')

In [3]:
# create a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('titanic_spark').getOrCreate()

In [4]:
# read data source. format accepts 'json', 'csv', 'txt'
df = spark.read.format('csv').option('header', 'True').option('inferSchema', 'True').load('data/titanic.csv')
df.show(10)

# example if dataset is on an S3 bucket
# df = spark.read.csv('s3n://bigd-hadoop/titanic.csv')

+---------+------+--------+--------------------+-------+-----------+--------------------+-----+----------+-----+------+
|row_names|pclass|survived|                name|    age|   embarked|           home_dest| room|    ticket| boat|   sex|
+---------+------+--------+--------------------+-------+-----------+--------------------+-----+----------+-----+------+
|        1|   1st|       1|Allen, Miss Elisa...|29.0000|Southampton|        St Louis, MO|  B-5|24160 L221|    2|female|
|        2|   1st|       0|Allison, Miss Hel...| 2.0000|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|
|        3|   1st|       0|Allison, Mr Hudso...|30.0000|Southampton|Montreal, PQ / Ch...|  C26|      null|(135)|  male|
|        4|   1st|       0|Allison, Mrs Huds...|25.0000|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|
|        5|   1st|       1|Allison, Master H...| 0.9167|Southampton|Montreal, PQ / Ch...|  C22|      null|   11|  male|
|        6|   1st|       1|  Anderson, M

In [5]:
# we can take a quick look at the data with describe()
df.describe().show()

+-------+-----------------+------+-------------------+--------------------+------------------+-----------+-------------------+------+------------------+-----------------+------+
|summary|        row_names|pclass|           survived|                name|               age|   embarked|          home_dest|  room|            ticket|             boat|   sex|
+-------+-----------------+------+-------------------+--------------------+------------------+-----------+-------------------+------+------------------+-----------------+------+
|  count|             1313|  1313|               1313|                1313|              1313|        821|                754|    77|                69|              347|  1313|
|   mean|            657.0|  null|  0.341964965727342|                null| 31.19418104265403|       null|               null|2131.0|          101216.5| 7.69620253164557|  null|
| stddev|379.1747618183468|  null|0.47454867068071604|                null|14.747525275652208|       null|    

In [6]:
# how many rows and columns?
print('dataframe rows: ', df.count())
print('columns: ', len(df.columns))

dataframe rows:  1313
columns:  11


In [7]:
# take a look at the schema
df.printSchema()

root
 |-- row_names: integer (nullable = true)
 |-- pclass: string (nullable = true)
 |-- survived: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- home_dest: string (nullable = true)
 |-- room: string (nullable = true)
 |-- ticket: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- sex: string (nullable = true)



Looking at the schema, the first thing we notice is that 'age' and 'class' are strings. We expected those to be numbers, so we need to transform them first.

#### We will be showing the resulting dataframe everytime we make a tranformation of the data

In [8]:
# find out the different classes
df.select('pclass').distinct().show()

+------+
|pclass|
+------+
|   2nd|
|   1st|
|   3rd|
+------+



We use UDF (User Defined Functions) to create class_index (string to number conversion)


An example of UDF using the Lambda Function:

age_udf = udf(lambda age: 'young' if age <= 30 else 'senior', StringType())

df = df.withColumn('age_group', age_udf(df.age)).show()

In [9]:
from pyspark.sql.functions import udf, when, col
from pyspark.sql.types import StringType, DoubleType, IntegerType, ByteType

class_udf = udf(lambda pclass: 1 if pclass == '1st' else (2 if pclass == '2nd' else 3), IntegerType())
df = df.withColumn('class_index', class_udf(df['pclass']))
df.show(10)

+---------+------+--------+--------------------+-------+-----------+--------------------+-----+----------+-----+------+-----------+
|row_names|pclass|survived|                name|    age|   embarked|           home_dest| room|    ticket| boat|   sex|class_index|
+---------+------+--------+--------------------+-------+-----------+--------------------+-----+----------+-----+------+-----------+
|        1|   1st|       1|Allen, Miss Elisa...|29.0000|Southampton|        St Louis, MO|  B-5|24160 L221|    2|female|          1|
|        2|   1st|       0|Allison, Miss Hel...| 2.0000|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|          1|
|        3|   1st|       0|Allison, Mr Hudso...|30.0000|Southampton|Montreal, PQ / Ch...|  C26|      null|(135)|  male|          1|
|        4|   1st|       0|Allison, Mrs Huds...|25.0000|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|          1|
|        5|   1st|       1|Allison, Master H...| 0.9167|Southampton|Montreal

In [10]:
# next column 'survived'. It looks good with only 0's and 1's
df.select('survived').distinct().show()

+--------+
|survived|
+--------+
|       1|
|       0|
+--------+



In [11]:
# groupby 'age' just to find out how many NULL values do we have on that column
df.groupBy('age').count().orderBy('count', ascending=False).show(5)

+-------+-----+
|    age|count|
+-------+-----+
|     NA|  680|
|30.0000|   28|
|18.0000|   25|
|22.0000|   23|
|36.0000|   23|
+-------+-----+
only showing top 5 rows



In [12]:
# transform 'age' from StringType to DoubleType
df = df.withColumn('age', df['age'].cast(DoubleType()))
df.printSchema()

root
 |-- row_names: integer (nullable = true)
 |-- pclass: string (nullable = true)
 |-- survived: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- embarked: string (nullable = true)
 |-- home_dest: string (nullable = true)
 |-- room: string (nullable = true)
 |-- ticket: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- class_index: integer (nullable = true)



In [13]:
# create age_imputed column before filling null values
from pyspark.sql import functions as F
df = df.select(col('*'), F.when(df['age'] > 0, 0).otherwise(1).alias('age_imputed'))
df.show(10)

+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+
|row_names|pclass|survived|                name|   age|   embarked|           home_dest| room|    ticket| boat|   sex|class_index|age_imputed|
+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+
|        1|   1st|       1|Allen, Miss Elisa...|  29.0|Southampton|        St Louis, MO|  B-5|24160 L221|    2|female|          1|          0|
|        2|   1st|       0|Allison, Miss Hel...|   2.0|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|          1|          0|
|        3|   1st|       0|Allison, Mr Hudso...|  30.0|Southampton|Montreal, PQ / Ch...|  C26|      null|(135)|  male|          1|          0|
|        4|   1st|       0|Allison, Mrs Huds...|  25.0|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|          1|          0|

In [14]:
# get the mean of 'age' to fill in the null values
from pyspark.sql.functions import mean, format_number

mean_age = df.select(format_number(mean(df['age']), 2)).collect()
#mean_age = df.select(mean(df['age'])).collect()
mean_age

[Row(format_number(avg(age), 2)='31.19')]

In [15]:
# mean_age as a float
mean_age = float(mean_age[0][0])
mean_age

31.19

In [16]:
# now we can fill the NULL values on 'age' with the mean_age
# df = df.na.fill(mean_age, 'age').show()

In [17]:
# or better use Spark's imputer feature
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['age'], outputCols=['new_age'])
model = imputer.fit(df)

df = model.transform(df)
df.show(10)

+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+
|row_names|pclass|survived|                name|   age|   embarked|           home_dest| room|    ticket| boat|   sex|class_index|age_imputed|new_age|
+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+
|        1|   1st|       1|Allen, Miss Elisa...|  29.0|Southampton|        St Louis, MO|  B-5|24160 L221|    2|female|          1|          0|   29.0|
|        2|   1st|       0|Allison, Miss Hel...|   2.0|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|          1|          0|    2.0|
|        3|   1st|       0|Allison, Mr Hudso...|  30.0|Southampton|Montreal, PQ / Ch...|  C26|      null|(135)|  male|          1|          0|   30.0|
|        4|   1st|       0|Allison, Mrs Huds...|  25.0|Southampton|Montreal, PQ / Ch...|  C26|

In [18]:
# take a look at 'embarked': 3 categories + null
df.groupBy('embarked').count().orderBy('count', ascending=False).show(5)

+-----------+-----+
|   embarked|count|
+-----------+-----+
|Southampton|  573|
|       null|  492|
|  Cherbourg|  203|
| Queenstown|   45|
+-----------+-----+



In [19]:
# import libraries for indexer creation
from pyspark.ml.feature import StringIndexer

# we can either impute the NULL values with whatever arbitrary value we want or let spark do it.
# here we use handleInvalid="keep" to account for those NULL values. It will automatically create a 4th category with all the Nulls
emb_indexer = StringIndexer(inputCol='embarked', outputCol='embarked_index', handleInvalid="keep")
df = emb_indexer.fit(df).transform(df)
df.show(10)

+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+--------------+
|row_names|pclass|survived|                name|   age|   embarked|           home_dest| room|    ticket| boat|   sex|class_index|age_imputed|new_age|embarked_index|
+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+--------------+
|        1|   1st|       1|Allen, Miss Elisa...|  29.0|Southampton|        St Louis, MO|  B-5|24160 L221|    2|female|          1|          0|   29.0|           0.0|
|        2|   1st|       0|Allison, Miss Hel...|   2.0|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|          1|          0|    2.0|           0.0|
|        3|   1st|       0|Allison, Mr Hudso...|  30.0|Southampton|Montreal, PQ / Ch...|  C26|      null|(135)|  male|          1|          0|   30.0|           0.0|
|   

In [20]:
# although less nulls than other columns, the problem is lack of uniformity on home_dest.
# this column will receive the same treatment as the 'name' column. we'll just drop it
df.groupBy('home_dest').count().orderBy('count', ascending=False).show()

+--------------------+-----+
|           home_dest|count|
+--------------------+-----+
|                null|  559|
|        New York, NY|   65|
|              London|   14|
|        Montreal, PQ|   10|
|Cornwall / Akron, OH|    9|
|       Paris, France|    9|
|Wiltshire, Englan...|    8|
|        Winnipeg, MB|    8|
|    Philadelphia, PA|    8|
|             Belfast|    7|
| Sweden Winnipeg, MN|    7|
|        Brooklyn, NY|    7|
|Rotherfield, Suss...|    5|
|Sweden Worcester, MA|    5|
|      Youngstown, OH|    5|
|          Ottawa, ON|    5|
|Haverford, PA / C...|    5|
|Bulgaria Chicago, IL|    5|
|Somerset / Bernar...|    5|
|Guernsey / Elizab...|    4|
+--------------------+-----+
only showing top 20 rows



In [21]:
# in 'room' we see a lot of null values, but we can still get some categories if we take only the first letter
df.groupBy('room').count().orderBy('count', ascending=False).show()

+--------+-----+
|    room|count|
+--------+-----+
|    null| 1236|
|    F-33|    4|
|   E-101|    3|
|     C26|    3|
|   C-101|    3|
|B-51/3/5|    2|
| B-58/60|    2|
|    C-83|    2|
|    C-93|    2|
|   C-126|    2|
|     B-5|    2|
|    B-18|    2|
|    C-87|    2|
|    C-85|    2|
|     D-?|    2|
|   C-125|    2|
|    D-35|    2|
|     C22|    2|
|    B-49|    2|
|     C-7|    2|
+--------+-----+
only showing top 20 rows



In [22]:
# create room_imputed column before filling nulls
df = df.select(col('*'), F.when(df['room'].isNull(), 1).otherwise(0).alias('room_imputed'))
df.show(10)

+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+--------------+------------+
|row_names|pclass|survived|                name|   age|   embarked|           home_dest| room|    ticket| boat|   sex|class_index|age_imputed|new_age|embarked_index|room_imputed|
+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+--------------+------------+
|        1|   1st|       1|Allen, Miss Elisa...|  29.0|Southampton|        St Louis, MO|  B-5|24160 L221|    2|female|          1|          0|   29.0|           0.0|           0|
|        2|   1st|       0|Allison, Miss Hel...|   2.0|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|          1|          0|    2.0|           0.0|           0|
|        3|   1st|       0|Allison, Mr Hudso...|  30.0|Southampton|Montreal, PQ / Ch...|  C26|      null|

In [23]:
# create room_category taking the first letter from room_number
from pyspark.sql.functions import col, substring
df = df.select(col('*'), substring(col('room'), 0, 1).alias('room_categ'))
df.show(10)

+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+--------------+------------+----------+
|row_names|pclass|survived|                name|   age|   embarked|           home_dest| room|    ticket| boat|   sex|class_index|age_imputed|new_age|embarked_index|room_imputed|room_categ|
+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+--------------+------------+----------+
|        1|   1st|       1|Allen, Miss Elisa...|  29.0|Southampton|        St Louis, MO|  B-5|24160 L221|    2|female|          1|          0|   29.0|           0.0|           0|         B|
|        2|   1st|       0|Allison, Miss Hel...|   2.0|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|          1|          0|    2.0|           0.0|           0|         C|
|        3|   1st|       0|Allison, Mr Hudso...|  

In [24]:
# create index for room
# we use again handleInvalid="keep" to include null values in the index
room_indexer = StringIndexer(inputCol='room_categ', outputCol='room_index', handleInvalid="keep")
df = room_indexer.fit(df).transform(df)
df.show(10)

+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+--------------+------------+----------+----------+
|row_names|pclass|survived|                name|   age|   embarked|           home_dest| room|    ticket| boat|   sex|class_index|age_imputed|new_age|embarked_index|room_imputed|room_categ|room_index|
+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+--------------+------------+----------+----------+
|        1|   1st|       1|Allen, Miss Elisa...|  29.0|Southampton|        St Louis, MO|  B-5|24160 L221|    2|female|          1|          0|   29.0|           0.0|           0|         B|       1.0|
|        2|   1st|       0|Allison, Miss Hel...|   2.0|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|          1|          0|    2.0|           0.0|           0|         C|       

In [25]:
# we'll drop 'ticket' too
df.groupBy('ticket').count().orderBy('count', ascending=False).show()

+-----------------+-----+
|           ticket|count|
+-----------------+-----+
|             null| 1244|
| 17608 L262 7s 6d|    5|
|       230136 L39|    4|
|     13529 L26 5s|    3|
|17754 L224 10s 6d|    3|
|    28220 L32 10s|    3|
|       230080 L26|    3|
|       24160 L221|    3|
|        13502 L77|    3|
| 17582 L153 9s 3d|    3|
|    17755 L512 6s|    2|
|            17754|    2|
|111361 L57 19s 7d|    2|
|  36973 L83 9s 6d|    2|
|17483 L221 15s 7d|    2|
|           392091|    2|
|            17483|    2|
|       229236 L13|    1|
|    17604 L39 12s|    1|
| 17591 L50 9s 11d|    1|
+-----------------+-----+
only showing top 20 rows



In [26]:
# extracting some data from 'boat' looks really challenging. we can try just to index it
df.groupBy('boat').count().orderBy('count', ascending=False).show()

+----+-----+
|boat|count|
+----+-----+
|null|  966|
|   4|   27|
|   5|   27|
|   7|   22|
|   3|   19|
|   8|   18|
|  11|   17|
|   6|   17|
|  13|   16|
|   9|   15|
|  14|   15|
|  12|   13|
|   2|   11|
|   D|   10|
|  10|    8|
|   B|    6|
|  15|    6|
|   C|    6|
|   A|    5|
| 5/7|    4|
+----+-----+
only showing top 20 rows



In [27]:
# create index for boat
boat_indexer = StringIndexer(inputCol='boat', outputCol='boat_index', handleInvalid="keep")
df = boat_indexer.fit(df).transform(df)
df.show(10)

+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+--------------+------------+----------+----------+----------+
|row_names|pclass|survived|                name|   age|   embarked|           home_dest| room|    ticket| boat|   sex|class_index|age_imputed|new_age|embarked_index|room_imputed|room_categ|room_index|boat_index|
+---------+------+--------+--------------------+------+-----------+--------------------+-----+----------+-----+------+-----------+-----------+-------+--------------+------------+----------+----------+----------+
|        1|   1st|       1|Allen, Miss Elisa...|  29.0|Southampton|        St Louis, MO|  B-5|24160 L221|    2|female|          1|          0|   29.0|           0.0|           0|         B|       1.0|      11.0|
|        2|   1st|       0|Allison, Miss Hel...|   2.0|Southampton|Montreal, PQ / Ch...|  C26|      null| null|female|          1|          0|    2.0|  

In [28]:
# final column to transform: 'sex'
df.groupBy('sex').count().orderBy('count', ascending=False).show()

+------+-----+
|   sex|count|
+------+-----+
|  male|  850|
|female|  463|
+------+-----+



In [29]:
# create index for sex
sex_indexer = StringIndexer(inputCol='sex', outputCol='sex_index')
df = sex_indexer.fit(df).transform(df)
df.show(5)

+---------+------+--------+--------------------+------+-----------+--------------------+----+----------+-----+------+-----------+-----------+-------+--------------+------------+----------+----------+----------+---------+
|row_names|pclass|survived|                name|   age|   embarked|           home_dest|room|    ticket| boat|   sex|class_index|age_imputed|new_age|embarked_index|room_imputed|room_categ|room_index|boat_index|sex_index|
+---------+------+--------+--------------------+------+-----------+--------------------+----+----------+-----+------+-----------+-----------+-------+--------------+------------+----------+----------+----------+---------+
|        1|   1st|       1|Allen, Miss Elisa...|  29.0|Southampton|        St Louis, MO| B-5|24160 L221|    2|female|          1|          0|   29.0|           0.0|           0|         B|       1.0|      11.0|      1.0|
|        2|   1st|       0|Allison, Miss Hel...|   2.0|Southampton|Montreal, PQ / Ch...| C26|      null| null|female

## ! IMPORTANT ! For all index columns created, now transform them to vectors

In [30]:
# recall all the columns on the dataframe
df.columns

['row_names',
 'pclass',
 'survived',
 'name',
 'age',
 'embarked',
 'home_dest',
 'room',
 'ticket',
 'boat',
 'sex',
 'class_index',
 'age_imputed',
 'new_age',
 'embarked_index',
 'room_imputed',
 'room_categ',
 'room_index',
 'boat_index',
 'sex_index']

In [31]:
# transform all indexes to one_hot_encode vectors
from pyspark.ml.feature import OneHotEncoderEstimator

class_encoder = OneHotEncoderEstimator(inputCols=['class_index'], outputCols=['class_vect'])
df = class_encoder.fit(df).transform(df)

In [32]:
emb_encoder = OneHotEncoderEstimator(inputCols=['embarked_index'], outputCols=['embarked_vect'])
df = emb_encoder.fit(df).transform(df)

In [33]:
room_encoder = OneHotEncoderEstimator(inputCols=['room_index'], outputCols=['room_vect'])
df = room_encoder.fit(df).transform(df)

In [34]:
boat_encoder = OneHotEncoderEstimator(inputCols=['boat_index'], outputCols=['boat_vect'])
df = boat_encoder.fit(df).transform(df)

In [35]:
sex_encoder = OneHotEncoderEstimator(inputCols=['sex_index'], outputCols=['sex_vect'])
df = sex_encoder.fit(df).transform(df)

In [36]:
# final step before creating any ML model: Single vector with all transformed columns and columns created when imputing values
df.columns

['row_names',
 'pclass',
 'survived',
 'name',
 'age',
 'embarked',
 'home_dest',
 'room',
 'ticket',
 'boat',
 'sex',
 'class_index',
 'age_imputed',
 'new_age',
 'embarked_index',
 'room_imputed',
 'room_categ',
 'room_index',
 'boat_index',
 'sex_index',
 'class_vect',
 'embarked_vect',
 'room_vect',
 'boat_vect',
 'sex_vect']

In [37]:
# first select only the labels and features we have created to this point
df = df.select(['survived', 'class_index', 'age_imputed', 'new_age', 'embarked_index'
               ,'room_imputed', 'room_categ', 'room_index', 'boat_index', 'sex_index'
               ,'class_vect', 'embarked_vect', 'room_vect', 'boat_vect', 'sex_vect'])
df.show()

+--------+-----------+-----------+-----------------+--------------+------------+----------+----------+----------+---------+-------------+-------------+-------------+---------------+-------------+
|survived|class_index|age_imputed|          new_age|embarked_index|room_imputed|room_categ|room_index|boat_index|sex_index|   class_vect|embarked_vect|    room_vect|      boat_vect|     sex_vect|
+--------+-----------+-----------+-----------------+--------------+------------+----------+----------+----------+---------+-------------+-------------+-------------+---------------+-------------+
|       1|          1|          0|             29.0|           0.0|           0|         B|       1.0|      11.0|      1.0|(3,[1],[1.0])|(3,[0],[1.0])|(7,[1],[1.0])|(99,[11],[1.0])|    (1,[],[])|
|       0|          1|          0|              2.0|           0.0|           0|         C|       0.0|      99.0|      1.0|(3,[1],[1.0])|(3,[0],[1.0])|(7,[0],[1.0])|     (99,[],[])|    (1,[],[])|
|       0|          

In [38]:
# create a single vector only with valid features (no indexes or labels)
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['class_vect', 'age_imputed', 'new_age', 'embarked_vect', 'room_imputed', 'room_vect', 'boat_vect', 'sex_vect'],
    outputCol='features')

final_df = assembler.transform(df)
final_df = final_df.select('features', 'survived')

print("Assembled features: ['class_vect', 'age_imputed', 'new_age', 'embarked_vect', 'room_imputed', 'room_vect', 'boat_vect', 'sex_vect']")
final_df.show()

Assembled features: ['class_vect', 'age_imputed', 'new_age', 'embarked_vect', 'room_imputed', 'room_vect', 'boat_vect', 'sex_vect']
+--------------------+--------+
|            features|survived|
+--------------------+--------+
|(116,[1,4,5,10,27...|       1|
|(116,[1,4,5,9],[1...|       0|
|(116,[1,4,5,9,64,...|       0|
|(116,[1,4,5,9],[1...|       0|
|(116,[1,4,5,9,21,...|       1|
|(116,[1,4,5,12,19...|       1|
|(116,[1,4,5,11,29...|       1|
|(116,[1,4,5,14,11...|       0|
|(116,[1,4,5,9,27]...|       1|
|(116,[1,4,6,8,67,...|       0|
|(116,[1,4,6,8,99,...|       0|
|(116,[1,4,6,8,16]...|       1|
|(116,[1,3,4,6,10,...|       1|
|(116,[1,3,4,5,14,...|       1|
|(116,[1,3,4,5,8,1...|       0|
|(116,[1,4,6,10,22...|       1|
|(116,[1,4,6,10,11...|       0|
|(116,[1,4,6,9,115...|       0|
|(116,[1,4,5,11,17...|       1|
|(116,[1,4,5,11,17...|       1|
+--------------------+--------+
only showing top 20 rows



## Finally!!!
Now we can create our regression model

In [39]:
# split the data with an 80/20 ratio
from pyspark.ml.classification import LogisticRegression
train_data, test_data = final_df.randomSplit([0.8, 0.2], seed=7)

In [40]:
# logistic regression model
lr_titanic = LogisticRegression(featuresCol='features', labelCol='survived')

In [41]:
# fit model with training_data
lr_model = lr_titanic.fit(train_data)

In [42]:
# get predictions with test_data
lr_results = lr_model.transform(test_data)

In [43]:
# import evaluator for our model
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [44]:
lr_eval = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='survived')

In [45]:
lr_results.select('survived', 'prediction').show(10)

+--------+----------+
|survived|prediction|
+--------+----------+
|       0|       0.0|
|       1|       1.0|
|       1|       1.0|
|       1|       1.0|
|       1|       1.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
|       0|       0.0|
+--------+----------+
only showing top 10 rows



In [46]:
AUC = lr_eval.evaluate(lr_results)

In [47]:
AUC

0.8607638888888888

### That's a pretty good model.

Now, let's create other different models and compare performance

In [48]:
# we want to evaluate performance of Decision Trees, Random Forest and Gradient Boosting
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, GBTClassifier

In [49]:
# create the 3 different models
dtc = DecisionTreeClassifier(labelCol='survived', featuresCol='features')
rfc = RandomForestClassifier(labelCol='survived', featuresCol='features')
gbc = GBTClassifier(labelCol='survived', featuresCol='features')

In [50]:
# train the models with training data
dtc_model = dtc.fit(train_data)
rfc_model = rfc.fit(train_data)
gbc_model = gbc.fit(train_data)

In [51]:
# get the predictions with test_data 
dtc_preds = dtc_model.transform(test_data)
rfc_preds = rfc_model.transform(test_data)
gbc_preds = gbc_model.transform(test_data)

In [52]:
# to evaluate our model we use again BinaryClassificationEvaluator
binary_eval = BinaryClassificationEvaluator(labelCol='survived')

In [53]:
# taking a look at the schema of predictions, we see the resulting vectors available for evaluation
dtc_preds.printSchema()

root
 |-- features: vector (nullable = true)
 |-- survived: integer (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [54]:
print('Decision Tree Classifier:', binary_eval.evaluate(dtc_preds))

Decision Tree Classifier: 0.3241666666666667


In [55]:
# surprisingly, random forest perfomed worst than our logistic regression model
print('Random Forest Classifier:', binary_eval.evaluate(rfc_preds))

Random Forest Classifier: 0.8504513888888889


In [56]:
print('Gradient Boosting Classifier:', binary_eval.evaluate(gbc_preds))

Gradient Boosting Classifier: 0.8889236111111111


#### Finally, Gradient Boosting Classifier turns out to be the best one among the 4 chosen models 
#### (Logistic Regression, Decision Trees, Random Forest, Gradient Boosting)

As a final note, we should remember that we used the default parameters when creating the classification models.
There are dozens of parameters we can start tuning in order to get a better score on each one of the models created