In [7]:
import numpy as np
import pandas as pd

In [8]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler, QuantileDiscretizer
from pyspark.ml.evaluation import *
from pyspark import SparkContext
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [9]:
spark = SparkSession.builder.appName("Spark on Titanic Data").getOrCreate()

24/07/18 10:49:17 WARN Utils: Your hostname, gimsehyeon-ui-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 172.30.1.79 instead (on interface en0)
24/07/18 10:49:17 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/18 10:49:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [10]:
train = spark.read.csv('./train.csv', header=True, inferSchema=True)
test = spark.read.csv('./train.csv', header=True, inferSchema=True)

                                                                                

In [11]:
train.show(10)
test.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [12]:
train_count = train.count()
print(train_count)

891


In [13]:
survived_groupped_df = train.groupBy("Survived").count()

In [14]:
survived_groupped_df.show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [15]:
getRatio = F.udf(lambda x: round(x/train_count,2), DoubleType())
survived_groupped_df = survived_groupped_df.withColumn("Ratio", getRatio('count'))

In [16]:
survived_groupped_df.show()

# 62% was survived, 38% died

+--------+-----+-----+
|Survived|count|Ratio|
+--------+-----+-----+
|       1|  342| 0.38|
|       0|  549| 0.62|
+--------+-----+-----+



                                                                                

In [17]:
train.groupBy("Sex").count().show()

+------+-----+
|   Sex|count|
+------+-----+
|female|  314|
|  male|  577|
+------+-----+



In [18]:
train.groupBy("Sex").agg(F.mean('Survived'), F.sum('Survived')).show()

# Most of women survived

+------+-------------------+-------------+
|   Sex|      avg(Survived)|sum(Survived)|
+------+-------------------+-------------+
|female| 0.7420382165605095|          233|
|  male|0.18890814558058924|          109|
+------+-------------------+-------------+



In [19]:
train.createOrReplaceTempView('train')

In [20]:
spark.sql("SELECT Sex, round(SUM(Survived)/count(1), 2) as ratio FROM train GROUP BY Sex").show()

# 74% of the female were survived, 19% of men were survived

+------+-----+
|   Sex|ratio|
+------+-----+
|female| 0.74|
|  male| 0.19|
+------+-----+



In [21]:
combined = train.union(test)

In [22]:
combined.count()

1782

In [23]:
combined.createOrReplaceTempView("combined")

In [24]:
null_columns = []

for col_name in combined.columns:
    null_values = combined.where(F.col(col_name).isNull()).count()
    if null_values > 0:
        null_columns.append([col_name, null_values])

print(null_columns)

[['Age', 354], ['Cabin', 1374], ['Embarked', 4]]


In [25]:
spark.createDataFrame(null_columns, ['column', 'missing_value']).show()

+--------+-------------+
|  column|missing_value|
+--------+-------------+
|     Age|          354|
|   Cabin|         1374|
|Embarked|            4|
+--------+-------------+



In [26]:
combined.select('Name').show()

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
|    Moran, Mr. James|
|McCarthy, Mr. Tim...|
|Palsson, Master. ...|
|Johnson, Mrs. Osc...|
|Nasser, Mrs. Nich...|
|Sandstrom, Miss. ...|
|Bonnell, Miss. El...|
|Saundercock, Mr. ...|
|Andersson, Mr. An...|
|Vestrom, Miss. Hu...|
|Hewlett, Mrs. (Ma...|
|Rice, Master. Eugene|
|Williams, Mr. Cha...|
|Vander Planke, Mr...|
|Masselmani, Mrs. ...|
+--------------------+
only showing top 20 rows



In [27]:
# Get the title from name
combined = combined.withColumn('Title',F.regexp_extract(F.col("Name"),"([A-Za-z]+)\.",1))

In [28]:
combined.createOrReplaceTempView('combined')
spark.sql("SELECT Title, count(1) FROM combined GROUP BY Title").show()

+--------+--------+
|   Title|count(1)|
+--------+--------+
|     Don|       2|
|    Miss|     364|
|Countess|       2|
|     Col|       4|
|     Rev|      12|
|    Lady|       2|
|  Master|      80|
|     Mme|       2|
|    Capt|       2|
|      Mr|    1034|
|      Dr|      14|
|     Mrs|     250|
|     Sir|       2|
|Jonkheer|       2|
|    Mlle|       4|
|   Major|       4|
|      Ms|       2|
+--------+--------+



In [29]:
# Dr, Rev, Major, Col, Mlle, Capt, Don ... are rare. 
titles_map = {
 'Capt' : 'Rare',
 'Col' : 'Rare',
 'Don': 'Rare',
 'Dona': 'Rare',
 'Dr' : 'Rare',
 'Jonkheer' :'Rare' ,
 'Lady': 'Rare',
 'Major': 'Rare',
 'Master': 'Master',
 'Miss' : 'Miss',
 'Mlle' : 'Rare',
 'Mme': 'Rare',
 'Mr': 'Mr',
 'Mrs': 'Mrs',
 'Ms': 'Rare',
 'Rev': 'Rare',
 'Sir': 'Rare',
 'Countess': 'Rare'
}
def impute_title(title):
    return titles_map[title]

title_map_func = F.udf(lambda x: impute_title(x), StringType())

combined = combined.withColumn('Title', title_map_func('Title'))
    

In [30]:
combined.createOrReplaceTempView('combined')
spark.sql("SELECT Title FROM combined GROUP BY Title").show()

+------+
| Title|
+------+
|  Miss|
|Master|
|    Mr|
|   Mrs|
|  Rare|
+------+



In [31]:
round(spark.sql("SELECT AVG(Age) FROM combined").collect()[0][0])

30

In [32]:
# Fill Average in NULL value

combined = combined.fillna(30, subset=['Age'])

In [33]:
groupped_embarked = spark.sql("SELECT Embarked, count(1) as count_it FROM combined GROUP BY Embarked ORDER BY count_it DESC")

In [34]:
groupped_embarked.show()

+--------+--------+
|Embarked|count_it|
+--------+--------+
|       S|    1288|
|       C|     336|
|       Q|     154|
|    NULL|       4|
+--------+--------+



In [35]:
embarked_mode = groupped_embarked.collect()[0][0]

In [36]:
print(embarked_mode)

S


In [37]:
combined = combined.fillna(embarked_mode, subset=["Embarked"])

In [38]:
combined.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|    Mr|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|   Mrs|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|  Miss|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|   Mrs|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|    Mr|
|          6|       0|  

In [39]:
combined = combined.withColumn("Cabin", combined.Cabin.substr(0, 1))

In [40]:
combined.createOrReplaceTempView('combined')
groupped_cabin = spark.sql("SELECT Cabin,count(1) as count_it FROM combined GROUP BY Cabin ORDER BY count_it DESC")
groupped_cabin.show()

+-----+--------+
|Cabin|count_it|
+-----+--------+
| NULL|    1374|
|    C|     118|
|    B|      94|
|    D|      66|
|    E|      64|
|    A|      30|
|    F|      26|
|    G|       8|
|    T|       2|
+-----+--------+



In [41]:
combined = combined.fillna('U', subset=['Cabin'])

In [42]:
combined = combined.withColumn("Faminly_size", F.col('SibSp') + F.col('Parch'))

In [43]:
combined.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|Faminly_size|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|    U|       S|    Mr|           1|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|           1|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|    U|       S|  Miss|           0|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1|    C|       S|   Mrs|           1|
|          5|       0|     3|Allen, Mr. Willia...|  mal

In [44]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index").fit(combined) for column in ["Sex", "Embarked", "Title", "Cabin"]]

                                                                                

In [45]:
pipeline = Pipeline(stages=indexers)
combined = pipeline.fit(combined).transform(combined)

combined.show(10)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+------------+---------+--------------+-----------+-----------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked| Title|Faminly_size|Sex_index|Embarked_index|Title_index|Cabin_index|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+------+------------+---------+--------------+-----------+-----------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|    U|       S|    Mr|           1|      0.0|           0.0|        0.0|        0.0|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|    C|       C|   Mrs|           1|      1.0|           1.0|        2.0|        1.0|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O

In [46]:
combined = combined.drop('Sex', 'PassengerId', 'Name', 'Title', 'SibSp', 'Parch', 'Ticket', 'Cabin', 'Embarked')

In [47]:
combined.show(10)

+--------+------+----+-------+------------+---------+--------------+-----------+-----------+
|Survived|Pclass| Age|   Fare|Faminly_size|Sex_index|Embarked_index|Title_index|Cabin_index|
+--------+------+----+-------+------------+---------+--------------+-----------+-----------+
|       0|     3|22.0|   7.25|           1|      0.0|           0.0|        0.0|        0.0|
|       1|     1|38.0|71.2833|           1|      1.0|           1.0|        2.0|        1.0|
|       1|     3|26.0|  7.925|           0|      1.0|           0.0|        1.0|        0.0|
|       1|     1|35.0|   53.1|           1|      1.0|           0.0|        2.0|        1.0|
|       0|     3|35.0|   8.05|           0|      0.0|           0.0|        0.0|        0.0|
|       0|     3|30.0| 8.4583|           0|      0.0|           2.0|        0.0|        0.0|
|       0|     1|54.0|51.8625|           0|      0.0|           0.0|        0.0|        4.0|
|       0|     3| 2.0| 21.075|           4|      0.0|           0.0|  

In [48]:
combined_pandas = combined.toPandas()

In [49]:
combined_pandas

Unnamed: 0,Survived,Pclass,Age,Fare,Faminly_size,Sex_index,Embarked_index,Title_index,Cabin_index
0,0,3,22.0,7.2500,1,0.0,0.0,0.0,0.0
1,1,1,38.0,71.2833,1,1.0,1.0,2.0,1.0
2,1,3,26.0,7.9250,0,1.0,0.0,1.0,0.0
3,1,1,35.0,53.1000,1,1.0,0.0,2.0,1.0
4,0,3,35.0,8.0500,0,0.0,0.0,0.0,0.0
...,...,...,...,...,...,...,...,...,...
1777,0,2,27.0,13.0000,0,0.0,0.0,4.0,0.0
1778,1,1,19.0,30.0000,0,1.0,0.0,1.0,2.0
1779,0,3,30.0,23.4500,3,1.0,0.0,1.0,0.0
1780,1,1,26.0,30.0000,0,0.0,1.0,0.0,1.0


In [50]:
train_pandas = combined_pandas[:train_count]
test_pandas = combined_pandas[train_count:]

In [51]:
train = spark.createDataFrame(train_pandas)
test = spark.createDataFrame(test_pandas)
test = test.drop('Survived')

In [52]:
train.show(10)
test.show(10)

+--------+------+----+-------+------------+---------+--------------+-----------+-----------+
|Survived|Pclass| Age|   Fare|Faminly_size|Sex_index|Embarked_index|Title_index|Cabin_index|
+--------+------+----+-------+------------+---------+--------------+-----------+-----------+
|       0|     3|22.0|   7.25|           1|      0.0|           0.0|        0.0|        0.0|
|       1|     1|38.0|71.2833|           1|      1.0|           1.0|        2.0|        1.0|
|       1|     3|26.0|  7.925|           0|      1.0|           0.0|        1.0|        0.0|
|       1|     1|35.0|   53.1|           1|      1.0|           0.0|        2.0|        1.0|
|       0|     3|35.0|   8.05|           0|      0.0|           0.0|        0.0|        0.0|
|       0|     3|30.0| 8.4583|           0|      0.0|           2.0|        0.0|        0.0|
|       0|     1|54.0|51.8625|           0|      0.0|           0.0|        0.0|        4.0|
|       0|     3| 2.0| 21.075|           4|      0.0|           0.0|  

In [69]:
test_assembler = VectorAssembler(inputCols=test.columns,outputCol="features")
test_assembler_vector = test_assembler.transform(test)
test_assembler_vector.show()

+------+----+-------+------------+---------+--------------+-----------+-----------+--------------------+
|Pclass| Age|   Fare|Faminly_size|Sex_index|Embarked_index|Title_index|Cabin_index|            features|
+------+----+-------+------------+---------+--------------+-----------+-----------+--------------------+
|     3|22.0|   7.25|           1|      0.0|           0.0|        0.0|        0.0|(8,[0,1,2,3],[3.0...|
|     1|38.0|71.2833|           1|      1.0|           1.0|        2.0|        1.0|[1.0,38.0,71.2833...|
|     3|26.0|  7.925|           0|      1.0|           0.0|        1.0|        0.0|[3.0,26.0,7.925,0...|
|     1|35.0|   53.1|           1|      1.0|           0.0|        2.0|        1.0|[1.0,35.0,53.1,1....|
|     3|35.0|   8.05|           0|      0.0|           0.0|        0.0|        0.0|(8,[0,1,2],[3.0,3...|
|     3|30.0| 8.4583|           0|      0.0|           2.0|        0.0|        0.0|(8,[0,1,2,5],[3.0...|
|     1|54.0|51.8625|           0|      0.0|           

In [70]:
# Split the data into train and test splits
X_train, X_test = train_assembler_vector.randomSplit([0.8, 0.2], seed=11)

In [71]:
# Put all columns into vectors. VectorAssembler is a transformer that combines a given list of columns into a single vector column.
# It is useful for combining raw features and features generated by different feature transformers into a single feature verctor, 
# in order to train ML models.
assembler = VectorAssembler(inputCols=train.columns[1:],outputCol="features")
train_assembler_vector = assembler.transform(train)
train_assembler_vector.show()

+--------+------+----+-------+------------+---------+--------------+-----------+-----------+--------------------+
|Survived|Pclass| Age|   Fare|Faminly_size|Sex_index|Embarked_index|Title_index|Cabin_index|            features|
+--------+------+----+-------+------------+---------+--------------+-----------+-----------+--------------------+
|       0|     3|22.0|   7.25|           1|      0.0|           0.0|        0.0|        0.0|(8,[0,1,2,3],[3.0...|
|       1|     1|38.0|71.2833|           1|      1.0|           1.0|        2.0|        1.0|[1.0,38.0,71.2833...|
|       1|     3|26.0|  7.925|           0|      1.0|           0.0|        1.0|        0.0|[3.0,26.0,7.925,0...|
|       1|     1|35.0|   53.1|           1|      1.0|           0.0|        2.0|        1.0|[1.0,35.0,53.1,1....|
|       0|     3|35.0|   8.05|           0|      0.0|           0.0|        0.0|        0.0|(8,[0,1,2],[3.0,3...|
|       0|     3|30.0| 8.4583|           0|      0.0|           2.0|        0.0|        

In [72]:
# Random Forest Classifier model
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'Survived')
rfModel = rf.fit(X_train)
predictions = rfModel.transform(X_test)
predictions.select("prediction", "Survived", "features").show()

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       0.0|       0|[1.0,45.0,83.475,...|
|       0.0|       0|(8,[0,1,2,7],[1.0...|
|       0.0|       0|(8,[0,1,2],[2.0,6...|
|       0.0|       0|[3.0,2.0,21.075,4...|
|       0.0|       0|[3.0,2.0,29.125,5...|
|       0.0|       0|[3.0,7.0,39.6875,...|
|       0.0|       0|[3.0,8.0,21.075,4...|
|       1.0|       0|[3.0,18.0,17.8,1....|
|       0.0|       0|(8,[0,1,2,5],[3.0...|
|       0.0|       0|(8,[0,1,2,3],[3.0...|
|       0.0|       0|(8,[0,1,2,5],[3.0...|
|       0.0|       0|[3.0,30.0,15.5,1....|
|       1.0|       0|[3.0,31.0,18.0,1....|
|       1.0|       1|[1.0,38.0,71.2833...|
|       1.0|       1|[2.0,3.0,41.5792,...|
|       1.0|       1|[3.0,30.0,7.225,0...|
|       1.0|       1|[3.0,30.0,7.8792,...|
|       1.0|       0|[1.0,24.0,79.2,0....|
|       0.0|       0|[1.0,54.0,77.2875...|
|       1.0|       0|[3.0,14.5,14.4542...|
+----------

In [73]:
# Evaluation
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")
print("Accuracy : " + str(evaluator.evaluate(predictions)))

Accuracy : 0.834319526627219


In [74]:
submission = pd.read_csv('./gender_submission.csv')

In [75]:
test = test.drop('Survived')

In [76]:
final_predictions = rfModel.transform(test_assembler_vector)
final_predictions = final_predictions.toPandas()

In [77]:
final_predictions

Unnamed: 0,Pclass,Age,Fare,Faminly_size,Sex_index,Embarked_index,Title_index,Cabin_index,features,rawPrediction,probability,prediction
0,3,22.0,7.2500,1,0.0,0.0,0.0,0.0,"(3.0, 22.0, 7.25, 1.0, 0.0, 0.0, 0.0, 0.0)","[18.139448416999365, 1.8605515830006347]","[0.9069724208499682, 0.09302757915003174]",0.0
1,1,38.0,71.2833,1,1.0,1.0,2.0,1.0,"[1.0, 38.0, 71.2833, 1.0, 1.0, 1.0, 2.0, 1.0]","[1.4350674702672797, 18.56493252973272]","[0.07175337351336399, 0.9282466264866359]",1.0
2,3,26.0,7.9250,0,1.0,0.0,1.0,0.0,"[3.0, 26.0, 7.925, 0.0, 1.0, 0.0, 1.0, 0.0]","[8.570949674326, 11.429050325674002]","[0.4285474837163, 0.5714525162837001]",1.0
3,1,35.0,53.1000,1,1.0,0.0,2.0,1.0,"[1.0, 35.0, 53.1, 1.0, 1.0, 0.0, 2.0, 1.0]","[1.4350674702672797, 18.56493252973272]","[0.07175337351336399, 0.9282466264866359]",1.0
4,3,35.0,8.0500,0,0.0,0.0,0.0,0.0,"(3.0, 35.0, 8.05, 0.0, 0.0, 0.0, 0.0, 0.0)","[18.182382026140175, 1.8176179738598253]","[0.9091191013070088, 0.09088089869299126]",0.0
...,...,...,...,...,...,...,...,...,...,...,...,...
886,2,27.0,13.0000,0,0.0,0.0,4.0,0.0,"(2.0, 27.0, 13.0, 0.0, 0.0, 0.0, 4.0, 0.0)","[15.018675305738828, 4.981324694261171]","[0.7509337652869414, 0.24906623471305855]",0.0
887,1,19.0,30.0000,0,1.0,0.0,1.0,2.0,"[1.0, 19.0, 30.0, 0.0, 1.0, 0.0, 1.0, 2.0]","[1.4323963870403835, 18.56760361295962]","[0.07161981935201917, 0.9283801806479808]",1.0
888,3,30.0,23.4500,3,1.0,0.0,1.0,0.0,"[3.0, 30.0, 23.45, 3.0, 1.0, 0.0, 1.0, 0.0]","[10.688253466593183, 9.31174653340682]","[0.534412673329659, 0.4655873266703409]",0.0
889,1,26.0,30.0000,0,0.0,1.0,0.0,1.0,"[1.0, 26.0, 30.0, 0.0, 0.0, 1.0, 0.0, 1.0]","[8.53199605526219, 11.46800394473781]","[0.42659980276310955, 0.5734001972368905]",1.0


In [78]:
submission['Survived'] = final_predictions['prediction']

In [79]:
submission['Survived'] = submission['Survived'].astype(int)

In [80]:
submission.to_csv("RF.csv", index=False)