In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.sql.functions import col, when
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [4]:
# Reuse the active Spark session if one exists, otherwise start one.
spark = SparkSession.builder.getOrCreate()

# Column groups consumed by the VectorAssembler stages further down:
# the first group is min-max scaled, the second is passed through as-is.
cols_to_scale = ['Age','Fare']
cols_to_keep = ['Sex','Pclass']

# Load the CSV, keep only the label and the four feature columns, discard
# rows containing nulls, and discard rows whose Age is exactly 0.
df = (
    spark.read.csv('../data/titanic.csv', header=True, inferSchema=True)
    .select('Survived', 'Pclass', 'Sex', 'Age', 'Fare')
    .na.drop()
    .filter(col('Age') != 0)
)
print(df.count())
# Expected: 787369

787369


In [8]:
# Encode Sex as an integer (male -> 0, anything else -> 1).
# The result goes into a NEW frame instead of overwriting `df`: the original
# `df = df.withColumn(...)` made this cell non-idempotent, because on a second
# run the column is already numeric, the comparison with "male" no longer
# matches, and every row ends up encoded as 1.
encoded_df = df.withColumn("Sex", when(col("Sex") == "male", 0).otherwise(1))

# Columns that will be min-max scaled, packed into one vector.
assembler1 = VectorAssembler(
    inputCols=cols_to_scale,
    outputCol="features_to_scale")

# Columns that are used unscaled, packed into a second vector.
assembler2 = VectorAssembler(
    inputCols=cols_to_keep,
    outputCol="unscaled"
)

scaler = MinMaxScaler(inputCol="features_to_scale", outputCol="scaled")

# Final feature vector: scaled columns first, then the unscaled ones.
assembler3 = VectorAssembler(
    inputCols=["scaled", "unscaled"],
    outputCol="features"
)

# NOTE(review): the scaler is fitted on the full dataset here, before the
# train/test split performed in a later cell, so the test rows influence the
# scaling range. Consider splitting first and fitting on the train split only.
pipeline = Pipeline(stages=[assembler1, assembler2, scaler, assembler3])
pipeline_model = pipeline.fit(encoded_df)
scaled_df = pipeline_model.transform(encoded_df)
scaled_df = scaled_df.select('Survived', 'features')

scaled_df.show(10, truncate=False)


+--------+--------------------------------------------------+
|Survived|features                                          |
+--------+--------------------------------------------------+
|0       |[0.24778761061946902,0.015904754758092696,1.0,3.0]|
|0       |[0.16814159292035397,0.01877004833956327,1.0,3.0] |
|0       |[0.23008849557522124,0.02095458141404183,1.0,3.0] |
|0       |[0.2743362831858407,0.0075532738640296014,1.0,3.0]|
|1       |[0.23893805309734512,0.04361173759832368,1.0,2.0] |
|0       |[0.2743362831858407,0.0,1.0,2.0]                  |
|1       |[0.19469026548672566,0.032490370040029116,1.0,3.0]|
|1       |[0.45132743362831856,0.05748836566650268,1.0,1.0] |
|0       |[0.39823008849557523,0.0,1.0,3.0]                 |
|0       |[0.24778761061946902,0.019152017872940935,1.0,3.0]|
+--------+--------------------------------------------------+
only showing top 10 rows



In [10]:
# Fixed seed so the 80/20 split is requested identically on every run;
# without it the partition, and therefore the reported metrics, change
# each time the notebook is executed.
SPLIT_SEED = 42
train, test = scaled_df.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# Fit a decision tree on the training split; the label is `Survived` and the
# features are read from the classifier's default `features` column.
model = DecisionTreeClassifier(labelCol='Survived').fit(train)


+--------+----------------------------------+------------------+---------------------------------------+----------+
|Survived|features                          |rawPrediction     |probability                            |prediction|
+--------+----------------------------------+------------------+---------------------------------------+----------+
|0       |[0.0,0.008049327506103945,1.0,3.0]|[164878.0,60305.0]|[0.7321955920295937,0.2678044079704063]|0.0       |
|0       |[0.0,0.008180817014993144,1.0,3.0]|[164878.0,60305.0]|[0.7321955920295937,0.2678044079704063]|0.0       |
|0       |[0.0,0.01012364264862264,1.0,3.0] |[164878.0,60305.0]|[0.7321955920295937,0.2678044079704063]|0.0       |
|0       |[0.0,0.011354417905189346,1.0,3.0]|[164878.0,60305.0]|[0.7321955920295937,0.2678044079704063]|0.0       |
|0       |[0.0,0.011652218062277137,1.0,3.0]|[164878.0,60305.0]|[0.7321955920295937,0.2678044079704063]|0.0       |
|0       |[0.0,0.01359878901808654,1.0,3.0] |[164878.0,60305.0]|[0.73219

In [None]:
# Apply the fitted classifier to the test split.
predictions = model.transform(test)

# Preview the first ten scored rows without truncating the vector columns.
predictions.show(n=10, truncate=False)

In [11]:
# Evaluate each metric with its own evaluator, keyed by Spark's metric name.
metrics = {
    metric_name: MulticlassClassificationEvaluator(
        labelCol="Survived", metricName=metric_name
    ).evaluate(predictions)
    for metric_name in ("accuracy", "weightedPrecision")
}
accuracy = metrics["accuracy"]
# The value printed as "Precision" below is the weighted precision.
precision = metrics["weightedPrecision"]

print("Accuracy: {}, Precision: {}".format(accuracy, precision))

Accuracy: 0.7052136616362192, Precision: 0.6975178545056624
