Importing Libraries
       

In [42]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O 

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

import os

Creating a spark session

In [43]:
spark = SparkSession.builder.getOrCreate()
spark

Reading the data and printing the schema inorder to check the data types of the attributes.

In [44]:
#train_data=pd.read_csv('/Users/Likith/Downloads/t_train.csv' )
sdf_train=spark.read.csv('/Users/Likith/Downloads/t_train.csv' ,inferSchema=True,header=True)
print(sdf_train.printSchema())
pdf = sdf_train.limit(5).toPandas()
pdf.T

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

None


Unnamed: 0,0,1,2,3,4
PassengerId,1,2,3,4,5
Survived,0,1,1,1,0
Pclass,3,1,3,1,3
Name,"Braund, Mr. Owen Harris","Cumings, Mrs. John Bradley (Florence Briggs Th...","Heikkinen, Miss. Laina","Futrelle, Mrs. Jacques Heath (Lily May Peel)","Allen, Mr. William Henry"
Sex,male,female,female,female,male
Age,22,38,26,35,35
SibSp,1,1,0,1,0
Parch,0,0,0,0,0
Ticket,A/5 21171,PC 17599,STON/O2. 3101282,113803,373450
Fare,7.25,71.2833,7.925,53.1,8.05


# Data Cleaning

In [46]:

sdf_typecast = sdf_train.withColumn('Ticket', sdf_train['Ticket'].cast("double"))
sdf_typecast = sdf_typecast.fillna(0)
# pdf = sdf_typecast.limit(5).toPandas()
# pdf.T

# Feature Engineering and Transformation

In [47]:

numeric_cols = ['PassengerId','Survived', 'Pclass','Age', 'SibSp','Parch','Ticket','Fare'] 
numeric_features = ['Pclass','Age', 'SibSp','Parch','Fare'] 
# string_features = [ 'Cabin', 'Embarked', 'Sex','Ticket']
# 'Name',
sdf_train_subset = sdf_typecast #.select(numeric_cols)    

In [48]:
_stages = []

In [49]:
from pyspark.ml.feature import VectorAssembler
assemblerInput = numeric_features # [f + '_vect' for f in string_features] + 
print(assemblerInput)
vectAssembler = VectorAssembler(inputCols  = assemblerInput, outputCol = "vect_features") #.fit(sdf_train_subset)  
_stages += [vectAssembler]
# handleInvalid = "keep" or "skip"

['Pclass', 'Age', 'SibSp', 'Parch', 'Fare']


In [50]:
#ML model
from pyspark.ml.classification import DecisionTreeClassifier

# dt = DecisionTreeClassifier(labelCol = 'Survived', featuresCol = 'vect_features') # ,maxDepth=1
# _stages += [dt]

In [51]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol = 'Survived', featuresCol = 'vect_features', numTrees = 100, maxDepth = 4)
_stages += [rf]

In [52]:
_stages

[VectorAssembler_aaf4e6753734, RandomForestClassifier_769d1d61dcba]

# Pipelining

In [53]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = _stages)

In [54]:
model = pipeline.fit(sdf_train_subset)

In [55]:
numeric_cols_test = ['PassengerId', 'Pclass','Age', 'Sex', 'SibSp','Parch','Ticket','Fare'] 

sdf_test_subset = sdf_test.withColumn('Ticket', sdf_test['Ticket'].cast("double")). \
                        fillna(0). \
                        select(numeric_cols_test)

In [56]:
sdf_predict = model.transform(sdf_test_subset)

In [57]:
pdf = sdf_predict.limit(10).toPandas()
pdf.T

Unnamed: 0,0,1,2,3,4,5,6,7,8,9
PassengerId,892,893,894,895,896,897,898,899,900,901
Pclass,3,3,2,3,3,3,3,2,3,3
Age,34.5,47,62,27,22,14,30,26,18,21
Sex,male,female,male,male,female,male,female,male,female,male
SibSp,0,1,0,0,1,0,0,1,0,2
Parch,0,0,0,0,1,0,0,1,0,0
Ticket,330911,363272,240276,315154,3.1013e+06,7538,330972,248738,2657,0
Fare,7.8292,7,9.6875,8.6625,12.2875,9.225,7.6292,29,7.2292,24.15
vect_features,"[3.0, 34.5, 0.0, 0.0, 7.8292]","[3.0, 47.0, 1.0, 0.0, 7.0]","[2.0, 62.0, 0.0, 0.0, 9.6875]","[3.0, 27.0, 0.0, 0.0, 8.6625]","[3.0, 22.0, 1.0, 1.0, 12.2875]","[3.0, 14.0, 0.0, 0.0, 9.225]","[3.0, 30.0, 0.0, 0.0, 7.6292]","[2.0, 26.0, 1.0, 1.0, 29.0]","[3.0, 18.0, 0.0, 0.0, 7.2292]","[3.0, 21.0, 2.0, 0.0, 24.15]"
rawPrediction,"[82.0847023125501, 17.915297687449886]","[84.2343016652984, 15.765698334701614]","[67.99845286487059, 32.0015471351294]","[79.00334603781411, 20.99665396218588]","[55.71337873142116, 44.28662126857885]","[76.09651351482813, 23.903486485171875]","[79.04237285733028, 20.957627142669704]","[34.73938987182887, 65.26061012817108]","[77.73235704856025, 22.267642951439736]","[70.84566825991936, 29.15433174008062]"


# Performance Evaluation

In [58]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(labelCol="prediction")
print("Test Area Under ROC: " + str(evaluator.evaluate(sdf_predict, {evaluator.metricName: "areaUnderROC"})))

Test Area Under ROC: 1.0


In [60]:
sdf_submission = sdf_predict.select('PassengerId','Sex','prediction').withColumn('Survived',sdf_predict['prediction'].cast('integer')).select('PassengerId','Survived', 'Sex')
sdf_submission.show()

+-----------+--------+------+
|PassengerId|Survived|   Sex|
+-----------+--------+------+
|        892|       0|  male|
|        893|       0|female|
|        894|       0|  male|
|        895|       0|  male|
|        896|       0|female|
|        897|       0|  male|
|        898|       0|female|
|        899|       1|  male|
|        900|       0|female|
|        901|       0|  male|
|        902|       0|  male|
|        903|       0|  male|
|        904|       1|female|
|        905|       0|  male|
|        906|       1|female|
|        907|       1|female|
|        908|       0|  male|
|        909|       0|  male|
|        910|       0|female|
|        911|       0|female|
+-----------+--------+------+
only showing top 20 rows

