<a href="https://colab.research.google.com/github/saktiworkstation/road-to-ai-developer/blob/main/22_11_4677_Sakti_Pyspark_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('classification').getOrCreate()

In [3]:
from itertools import chain
from pyspark.sql.functions import count, mean, when, lit, create_map, regexp_extract

In [4]:
df1 = spark.read.csv('/content/train.csv', header=True, inferSchema=True)
df2 = spark.read.csv('/content/test.csv',  header=True, inferSchema=True)

In [5]:
df1.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [6]:
df1.show(4)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
only showing top 4 rows



In [7]:
df1.limit(5).toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


In [8]:
df1.select('Survived', 'Pclass', 'Age', 'Fare').show(4)

+--------+------+----+-------+
|Survived|Pclass| Age|   Fare|
+--------+------+----+-------+
|       0|     3|22.0|   7.25|
|       1|     1|38.0|71.2833|
|       1|     3|26.0|  7.925|
|       1|     1|35.0|   53.1|
+--------+------+----+-------+
only showing top 4 rows



In [9]:
df1.select('Survived', 'Pclass', 'Age', 'Fare').summary().show()

+-------+-------------------+------------------+------------------+-----------------+
|summary|           Survived|            Pclass|               Age|             Fare|
+-------+-------------------+------------------+------------------+-----------------+
|  count|                891|               891|               714|              891|
|   mean| 0.3838383838383838| 2.308641975308642| 29.69911764705882| 32.2042079685746|
| stddev|0.48659245426485753|0.8360712409770491|14.526497332334035|49.69342859718089|
|    min|                  0|                 1|              0.42|              0.0|
|    25%|                  0|                 2|              20.0|           7.8958|
|    50%|                  0|                 3|              28.0|          14.4542|
|    75%|                  1|                 3|              38.0|             31.0|
|    max|                  1|                 3|              80.0|         512.3292|
+-------+-------------------+------------------+------

In [10]:
print('Number of rows: \t', df1.count())
print('Number of columns: \t', len(df1.columns))

Number of rows: 	 891
Number of columns: 	 12


# Exploratory Data analisis

In [11]:
df1.groupBy('Survived').count().show()

+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+



In [12]:
df1.groupBy('Survived').mean('Fare', 'Age').show()

+--------+------------------+------------------+
|Survived|         avg(Fare)|          avg(Age)|
+--------+------------------+------------------+
|       1| 48.39540760233917|28.343689655172415|
|       0|22.117886885245877| 30.62617924528302|
+--------+------------------+------------------+



In [13]:
df1.groupBy('Survived').pivot('Sex').count().show()

+--------+------+----+
|Survived|female|male|
+--------+------+----+
|       1|   233| 109|
|       0|    81| 468|
+--------+------+----+



In [14]:
df1.groupBy('Survived').pivot('Pclass').count().show()

+--------+---+---+---+
|Survived|  1|  2|  3|
+--------+---+---+---+
|       1|136| 87|119|
|       0| 80| 97|372|
+--------+---+---+---+



In [15]:
df1.groupBy('Survived').pivot('SibSp').count().show()

+--------+---+---+---+---+---+----+----+
|Survived|  0|  1|  2|  3|  4|   5|   8|
+--------+---+---+---+---+---+----+----+
|       1|210|112| 13|  4|  3|NULL|NULL|
|       0|398| 97| 15| 12| 15|   5|   7|
+--------+---+---+---+---+---+----+----+



In [16]:
df1.groupBy('Survived').pivot('Parch').count().show()

+--------+---+---+---+---+----+---+----+
|Survived|  0|  1|  2|  3|   4|  5|   6|
+--------+---+---+---+---+----+---+----+
|       1|233| 65| 40|  3|NULL|  1|NULL|
|       0|445| 53| 40|  2|   4|  4|   1|
+--------+---+---+---+---+----+---+----+



In [17]:
df1.groupBy('Survived').pivot('Embarked').count().show()

+--------+----+---+---+---+
|Survived|null|  C|  Q|  S|
+--------+----+---+---+---+
|       1|   2| 93| 30|217|
|       0|NULL| 75| 47|427|
+--------+----+---+---+---+



In [18]:
for col in df1.columns:
    print(col.ljust(20), df1.filter(df1[col].isNull()).count())

PassengerId          0
Survived             0
Pclass               0
Name                 0
Sex                  0
Age                  177
SibSp                0
Parch                0
Ticket               0
Fare                 0
Cabin                687
Embarked             2


In [19]:
df1.select('Fare', 'Embarked').summary('mean', '50%', 'max').show()

+-------+----------------+--------+
|summary|            Fare|Embarked|
+-------+----------------+--------+
|   mean|32.2042079685746|    NULL|
|    50%|         14.4542|    NULL|
|    max|        512.3292|       S|
+-------+----------------+--------+



In [20]:
df1 = df1.fillna({'Embarked': 'S', 'Fare':14.45})

In [21]:
df1 = df1.withColumn('Title', regexp_extract(df1['Name'],\
                '([A-Za-z]+)\.', 1))

df1.groupBy('Title').agg(count('Age'), mean('Age')).sort('count(Age)').show()

+--------+----------+------------------+
|   Title|count(Age)|          avg(Age)|
+--------+----------+------------------+
|     Don|         1|              40.0|
|Countess|         1|              33.0|
|    Lady|         1|              48.0|
|     Mme|         1|              24.0|
|    Capt|         1|              70.0|
|     Sir|         1|              49.0|
|Jonkheer|         1|              38.0|
|      Ms|         1|              28.0|
|     Col|         2|              58.0|
|    Mlle|         2|              24.0|
|   Major|         2|              48.5|
|     Rev|         6|43.166666666666664|
|      Dr|         6|              42.0|
|  Master|        36| 4.574166666666667|
|     Mrs|       108|35.898148148148145|
|    Miss|       146|21.773972602739725|
|      Mr|       398|32.368090452261306|
+--------+----------+------------------+



In [22]:
title_dic = {'Mr':'Mr', 'Miss':'Miss', 'Mrs':'Mrs', 'Master':'Master', \
             'Mlle': 'Miss', 'Major': 'Mr', 'Col': 'Mr', 'Sir': 'Mr',\
             'Don': 'Mr', 'Mme': 'Miss', 'Jonkheer': 'Mr', 'Lady': 'Mrs',\
             'Capt': 'Mr', 'Countess': 'Mrs', 'Ms': 'Miss', 'Dona': 'Mrs', \
             'Dr':'Mr', 'Rev':'Mr'}

mapping = create_map([lit(x) for x in chain(*title_dic.items())])

df1 = df1.withColumn('Title', mapping[df1['Title']])
df1.groupBy('Title').mean('Age').show()

+------+------------------+
| Title|          avg(Age)|
+------+------------------+
|  Miss|             21.86|
|Master| 4.574166666666667|
|    Mr| 33.02272727272727|
|   Mrs|35.981818181818184|
+------+------------------+



In [23]:
def age_imputer(df, title, age):

    '''This function search for the null in 'Age' column
    of the dataframe df. If there is null then it look
    for the title and fill the 'Age' with age argument.
    If 'Age' is not null, it will keep the same age.  '''

    return df.withColumn('Age', \
                         when((df['Age'].isNull()) & (df['Title']==title), \
                              age).otherwise(df['Age']))

In [24]:
df1 = age_imputer(df1, 'Mr', 33.02)
df1 = age_imputer(df1, 'Mrs', 35.98)
df1 = age_imputer(df1, 'Miss', 21.86)
df1 = age_imputer(df1, 'Master', 4.75)

In [25]:
df1 = df1.withColumn('FamilySize', df1['Parch'] + df1['SibSp']).\
            drop('Parch', 'SibSp')

In [26]:
df1 = df1.drop('PassengerID', 'Cabin', 'Name', 'Ticket', 'Title')

In [27]:
df1.show(4)

+--------+------+------+----+-------+--------+----------+
|Survived|Pclass|   Sex| Age|   Fare|Embarked|FamilySize|
+--------+------+------+----+-------+--------+----------+
|       0|     3|  male|22.0|   7.25|       S|         1|
|       1|     1|female|38.0|71.2833|       C|         1|
|       1|     3|female|26.0|  7.925|       S|         0|
|       1|     1|female|35.0|   53.1|       S|         1|
+--------+------+------+----+-------+--------+----------+
only showing top 4 rows



In [28]:
for col in df1.columns:
    print(col.ljust(20), df1.filter(df1[col].isNull()).count())

Survived             0
Pclass               0
Sex                  0
Age                  0
Fare                 0
Embarked             0
FamilySize           0


In [29]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression,\
                    RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [30]:
stringIndex = StringIndexer(inputCols=['Sex', 'Embarked'],
                       outputCols=['SexNum', 'EmbNum'])

stringIndex_model = stringIndex.fit(df1)

df1_ = stringIndex_model.transform(df1).drop('Sex', 'Embarked')
df1_.show(4)

+--------+------+----+-------+----------+------+------+
|Survived|Pclass| Age|   Fare|FamilySize|SexNum|EmbNum|
+--------+------+----+-------+----------+------+------+
|       0|     3|22.0|   7.25|         1|   0.0|   0.0|
|       1|     1|38.0|71.2833|         1|   1.0|   1.0|
|       1|     3|26.0|  7.925|         0|   1.0|   0.0|
|       1|     1|35.0|   53.1|         1|   1.0|   0.0|
+--------+------+----+-------+----------+------+------+
only showing top 4 rows



In [31]:
vec_asmbl = VectorAssembler(inputCols=df1_.columns[1:],
                           outputCol='features')

df1_ = vec_asmbl.transform(df1_).select('features', 'Survived')
df1_.show(4, truncate=False)

+------------------------------+--------+
|features                      |Survived|
+------------------------------+--------+
|[3.0,22.0,7.25,1.0,0.0,0.0]   |0       |
|[1.0,38.0,71.2833,1.0,1.0,1.0]|1       |
|[3.0,26.0,7.925,0.0,1.0,0.0]  |1       |
|[1.0,35.0,53.1,1.0,1.0,0.0]   |1       |
+------------------------------+--------+
only showing top 4 rows



In [72]:
train_df, valid_df = df1_.randomSplit([0.7, 0.3])

In [82]:
train_df, valid_df = df1_.randomSplit([0.8, 0.2])

In [83]:
train_df, valid_df = df1_.randomSplit([0.6, 0.4])

In [84]:
train_df, valid_df = df1_.randomSplit([0.85, 0.25])

In [73]:
train_df.show(10, truncate=False)

+-----------------------------+--------+
|features                     |Survived|
+-----------------------------+--------+
|(6,[0,1],[1.0,33.02])        |0       |
|(6,[0,1],[2.0,33.02])        |0       |
|(6,[0,1],[2.0,33.02])        |0       |
|(6,[0,1],[2.0,33.02])        |0       |
|(6,[0,1],[2.0,33.02])        |0       |
|(6,[0,1],[2.0,33.02])        |0       |
|(6,[0,1],[3.0,19.0])         |0       |
|(6,[0,1],[3.0,49.0])         |0       |
|[1.0,0.92,151.55,3.0,0.0,0.0]|1       |
|[1.0,2.0,151.55,3.0,1.0,0.0] |0       |
+-----------------------------+--------+
only showing top 10 rows



In [75]:
evaluator = MulticlassClassificationEvaluator(labelCol='Survived',
                                          metricName='accuracy')

In [76]:
ridge = LogisticRegression(labelCol='Survived',
                        maxIter=100,
                        elasticNetParam=0, # Ridge regression is choosen
                        regParam=0.03)

model = ridge.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.823076923076923

In [77]:
lasso = LogisticRegression(labelCol='Survived',
                           maxIter=100,
                           elasticNetParam=1, # Lasso
                           regParam=0.0003)

model = lasso.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.8307692307692308

In [78]:
rf = RandomForestClassifier(labelCol='Survived',
                           numTrees=100, maxDepth=3)

model = rf.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.8384615384615385

In [79]:
gb = GBTClassifier(labelCol='Survived', maxIter=75, maxDepth=3)

model = gb.fit(train_df)
pred = model.transform(valid_df)
evaluator.evaluate(pred)

0.8307692307692308

In [61]:
df2.show(4)

+-----------+------+------+----+------+--------+----------+
|PassengerId|Pclass|   Sex| Age|  Fare|Embarked|FamilySize|
+-----------+------+------+----+------+--------+----------+
|        892|     3|  male|34.5|7.8292|       Q|         0|
|        893|     3|female|47.0|   7.0|       S|         1|
|        894|     2|  male|62.0|9.6875|       Q|         0|
|        895|     3|  male|27.0|8.6625|       S|         0|
+-----------+------+------+----+------+--------+----------+
only showing top 4 rows



In [81]:
for col in df2.columns:
    print(col.ljust(20), df2.filter(df2[col].isNull()).count())

PassengerId          0
Pclass               0
Sex                  0
Age                  0
Fare                 0
Embarked             0
FamilySize           0


In [41]:
df2 = df2.fillna({'Embarked': 'S', 'Fare':14.45})
df2 = df2.withColumn('FamilySize', df2['Parch'] + df2['SibSp']).\
            drop('Parch', 'SibSp')

In [42]:
df2 = df2.withColumn('Title', regexp_extract(df2['Name'],\
                '([A-Za-z]+)\.', 1))

df2 = df2.withColumn('Title', mapping[df2['Title']])

df2.groupBy('Title').agg(count('Age'), mean('Age')).sort('count(Age)').show()

+------+----------+------------------+
| Title|count(Age)|          avg(Age)|
+------+----------+------------------+
|Master|        17| 7.406470588235294|
|   Mrs|        63|38.904761904761905|
|  Miss|        64|21.774843750000002|
|    Mr|       188|32.340425531914896|
+------+----------+------------------+



In [43]:
df2 = age_imputer(df2, 'Mr', 33.02)
df2 = age_imputer(df2, 'Mrs', 35.98)
df2 = age_imputer(df2, 'Miss', 21.86)
df2 = age_imputer(df2, 'Master', 4.75)

df2 = df2.drop('Cabin', 'Name', 'Ticket', 'Title') # keep PassengerId
df2.show(4)

+-----------+------+------+----+------+--------+----------+
|PassengerId|Pclass|   Sex| Age|  Fare|Embarked|FamilySize|
+-----------+------+------+----+------+--------+----------+
|        892|     3|  male|34.5|7.8292|       Q|         0|
|        893|     3|female|47.0|   7.0|       S|         1|
|        894|     2|  male|62.0|9.6875|       Q|         0|
|        895|     3|  male|27.0|8.6625|       S|         0|
+-----------+------+------+----+------+--------+----------+
only showing top 4 rows



In [44]:
for col in df2.columns:
    print(col.ljust(20), df2.filter(df2[col].isNull()).count())

PassengerId          0
Pclass               0
Sex                  0
Age                  0
Fare                 0
Embarked             0
FamilySize           0


In [45]:
pipeline_rf = Pipeline(stages=[stringIndex, vec_asmbl, rf])

paramGrid = ParamGridBuilder().\
            addGrid(rf.maxDepth, [3, 4, 5]).\
            addGrid(rf.minInfoGain, [0., 0.01, 0.1]).\
            addGrid(rf.numTrees, [1000]).\
            build()

selected_model = CrossValidator(estimator=pipeline_rf,
                                estimatorParamMaps=paramGrid,
                                evaluator=evaluator,
                                numFolds=5)

model_final = selected_model.fit(df1)
pred_train = model_final.transform(df1)
evaluator.evaluate(pred_train)

0.8484848484848485

In [46]:
pred_test = model_final.transform(df2)

predictions = pred_test.select('PassengerId', 'prediction')
predictions = predictions.\
                withColumn('Survived', predictions['prediction'].\
                cast('integer')).drop('prediction')
predictions.show(5)

+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
|        892|       0|
|        893|       0|
|        894|       0|
|        895|       0|
|        896|       1|
+-----------+--------+
only showing top 5 rows



In [47]:
# Writing csv file in Spark
predictions.coalesce(1).write.csv('submission_file.csv', header=True)

In [48]:
# Reading the saved file from spark
spark.read.csv('submission_file.csv', header=True).show(4)

+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
|        892|       0|
|        893|       0|
|        894|       0|
|        895|       0|
+-----------+--------+
only showing top 4 rows



In [49]:
# Writing csv file using Pandas
predictions.toPandas().to_csv('submission.csv', index=False)

In [50]:
# Inspecting csv file in pandas
import pandas as pd
pd.read_csv('submission.csv').head()

Unnamed: 0,PassengerId,Survived
0,892,0
1,893,0
2,894,0
3,895,0
4,896,1


In [51]:
model_final.write().save('titanic_classification.model')

In [52]:
! ls titanic_classification.model/*

titanic_classification.model/bestModel:
metadata  stages

titanic_classification.model/estimator:
metadata  stages

titanic_classification.model/evaluator:
metadata

titanic_classification.model/metadata:
part-00000  _SUCCESS
