In [17]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.dummy import DummyClassifier
from sklearn.linear_model import LogisticRegression
import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature

In [18]:
titanic_df = pd.read_csv('Titanic-Dataset.csv', usecols=['Pclass', 'Survived', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked'])

In [19]:
titanic_df

Unnamed: 0,Survived,Pclass,Sex,Age,SibSp,Parch,Fare,Embarked
0,0,3,male,22.0,1,0,7.2500,S
1,1,1,female,38.0,1,0,71.2833,C
2,1,3,female,26.0,0,0,7.9250,S
3,1,1,female,35.0,1,0,53.1000,S
4,0,3,male,35.0,0,0,8.0500,S
...,...,...,...,...,...,...,...,...
886,0,2,male,27.0,0,0,13.0000,S
887,1,1,female,19.0,0,0,30.0000,S
888,0,3,female,,1,2,23.4500,S
889,1,1,male,26.0,0,0,30.0000,C


In [20]:
titanic_df['Sex'] = pd.factorize(titanic_df['Sex'])[0]

In [21]:
titanic_df['Embarked'] = pd.factorize(titanic_df['Embarked'])[0]

In [22]:
titanic_df = titanic_df.dropna()

In [23]:
x = titanic_df.drop('Survived', axis = 1)

In [24]:
y = titanic_df['Survived']

In [25]:
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3)
x_test, y_test, x_val, y_val = train_test_split(x_test, y_test, test_size=0.5)

In [26]:
mdl = LogisticRegression(max_iter=500)

In [27]:
mdl.fit(x_train, y_train)

In [28]:
mdl.predict(x_test)

array([0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0,
       0, 0, 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 1, 0, 0, 1, 1, 0, 0,
       1, 0, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1,
       1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 1,
       0, 0, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0, 1, 1])

In [30]:
mlflow.set_tracking_uri("http://127.0.0.1:5001")

In [31]:
with mlflow.start_run(run_name="my_model_run"):
    # Log model parameters (optional but recommended)
    mlflow.log_param("penalty", mdl.penalty)
    mlflow.log_param("C", mdl.C)
    mlflow.sklearn.log_model(mdl, "mdl")



🏃 View run my_model_run at: http://127.0.0.1:5001/#/experiments/0/runs/b195e03545974e09a584facadaafc64f
🧪 View experiment at: http://127.0.0.1:5001/#/experiments/0


In [None]:
result = mlflow.register_model(
    "runs:/d16076a3ec534311817565e6527539c0/sklearn-model", "mdl"
)

In [6]:
sex_index = StringIndexer(inputCol='Sex', outputCol="Sex_index")
embarked_index = StringIndexer(inputCol='Embarked', outputCol="Embarked_index")

In [7]:
titanic_df = sex_index.fit(titanic_df).transform(titanic_df)

                                                                                

In [8]:
titanic_df = embarked_index.fit(titanic_df).transform(titanic_df)

In [9]:
titanic_df.show()

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+---------+--------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|Sex_index|Embarked_index|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+---------+--------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|          1|    0|      0.0|           0.0|
|       1|     1|female|38.0|    1|    0|71.2833|       C|          1|    0|      1.0|           1.0|
|       1|     3|female|26.0|    0|    0|  7.925|       S|          0|    1|      1.0|           0.0|
|       1|     1|female|35.0|    1|    0|   53.1|       S|          1|    0|      1.0|           0.0|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|          0|    1|      0.0|           0.0|
|       0|     3|  male|30.0|    0|    0| 8.4583|       Q|          0|    1|      0.0|           2.0|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|          0|    1|      

In [10]:
features = ['Pclass', 'Age', 'SibSp', 'SibSp', 'Parch', 'Fare', 'Alone', 'Sex_index', 'Embarked_index']

In [11]:
feature = VectorAssembler(inputCols=features, outputCol="features")
feature_vector= feature.transform(titanic_df)

In [12]:
feature_vector.show(5)

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+---------+--------------+--------------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|Sex_index|Embarked_index|            features|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+---------+--------------+--------------------+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|          1|    0|      0.0|           0.0|[3.0,22.0,1.0,1.0...|
|       1|     1|female|38.0|    1|    0|71.2833|       C|          1|    0|      1.0|           1.0|[1.0,38.0,1.0,1.0...|
|       1|     3|female|26.0|    0|    0|  7.925|       S|          0|    1|      1.0|           0.0|[3.0,26.0,0.0,0.0...|
|       1|     1|female|35.0|    1|    0|   53.1|       S|          1|    0|      1.0|           0.0|[1.0,35.0,1.0,1.0...|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|          0|    1|      0.0|           0.0|(9,[0,1,5,6],[3.0...|
+--------+------

In [13]:
(training_data, test_data) = feature_vector.randomSplit([0.8, 0.2],seed = 42)

In [14]:
training_data.show(5)

+--------+------+------+----+-----+-----+------+--------+-----------+-----+---------+--------------+--------------------+
|Survived|Pclass|   Sex| Age|SibSp|Parch|  Fare|Embarked|Family_Size|Alone|Sex_index|Embarked_index|            features|
+--------+------+------+----+-----+-----+------+--------+-----------+-----+---------+--------------+--------------------+
|       0|     1|female| 2.0|    1|    2|151.55|       S|          3|    0|      1.0|           0.0|[1.0,2.0,1.0,1.0,...|
|       0|     1|female|25.0|    1|    2|151.55|       S|          3|    0|      1.0|           0.0|[1.0,25.0,1.0,1.0...|
|       0|     1|  male|18.0|    1|    0| 108.9|       C|          1|    0|      0.0|           1.0|[1.0,18.0,1.0,1.0...|
|       0|     1|  male|19.0|    1|    0|  53.1|       S|          1|    0|      0.0|           0.0|[1.0,19.0,1.0,1.0...|
|       0|     1|  male|19.0|    3|    2| 263.0|       S|          5|    0|      0.0|           0.0|[1.0,19.0,3.0,3.0...|
+--------+------+------+

# ML models

# LogisticRegression

In [15]:
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

In [16]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Survived", featuresCol="features")

lrModel = lr.fit(training_data)
lr_prediction = lrModel.transform(test_data)
lr_prediction.select("prediction", "Survived", "features").show(5)

21/09/26 09:42:58 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/09/26 09:42:58 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS


+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,50.0,0.0,0.0...|
|       1.0|       0|(9,[0,1,4,5],[1.0...|
|       1.0|       0|[1.0,24.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows



In [17]:
lr_accuracy = evaluator.evaluate(lr_prediction)
print("LogisticRegression [Accuracy] = %g"% (lr_accuracy))
print("LogisticRegression [Error] = %g " % (1.0 - lr_accuracy))

LogisticRegression [Accuracy] = 0.813793
LogisticRegression [Error] = 0.186207 


# DecisionTreeClassifier

In [18]:
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(labelCol="Survived", featuresCol="features")
dt_model = dt.fit(training_data)
dt_prediction = dt_model.transform(test_data)

dt_prediction.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,50.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,4,5],[1.0...|
|       0.0|       0|[1.0,24.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows



In [19]:
dt_accuracy = evaluator.evaluate(dt_prediction)
print("DecisionTreeClassifier [Accuracy] = %g"% (dt_accuracy))
print("DecisionTreeClassifier [Error] = %g " % (1.0 - dt_accuracy))

DecisionTreeClassifier [Accuracy] = 0.82069
DecisionTreeClassifier [Error] = 0.17931 


# RandomForestClassifier

In [20]:
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features")
rf_model = rf.fit(training_data)
rf_prediction = rf_model.transform(test_data)
rf_prediction.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,50.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,4,5],[1.0...|
|       0.0|       0|[1.0,24.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
|       0.0|       0|(9,[0,1,5,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows



In [21]:
rf_accuracy = evaluator.evaluate(rf_prediction)
print("RandomForestClassifier [Accuracy] = %g"% (rf_accuracy))
print("RandomForestClassifier [Error] = %g" % (1.0 - rf_accuracy))

RandomForestClassifier [Accuracy] = 0.827586
RandomForestClassifier [Error] = 0.172414


# Gradient-boosted tree classifier

In [22]:
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="Survived", featuresCol="features",maxIter=10)
gbt_model = gbt.fit(training_data)
gbt_prediction = gbt_model.transform(test_data)
gbt_prediction.select("prediction", "Survived", "features").show(5)

+----------+--------+--------------------+
|prediction|Survived|            features|
+----------+--------+--------------------+
|       1.0|       0|[1.0,50.0,0.0,0.0...|
|       0.0|       0|(9,[0,1,4,5],[1.0...|
|       0.0|       0|[1.0,24.0,0.0,0.0...|
|       1.0|       0|(9,[0,1,5,6],[1.0...|
|       1.0|       0|(9,[0,1,5,6],[1.0...|
+----------+--------+--------------------+
only showing top 5 rows



In [23]:
gbt_accuracy = evaluator.evaluate(gbt_prediction)
print("Gradient-boosted [Accuracy] = %g"% (gbt_accuracy))
print("Gradient-boosted [Error] = %g"% (1.0 - gbt_accuracy))

Gradient-boosted [Accuracy] = 0.841379
Gradient-boosted [Error] = 0.158621


# Save & Load Model

In [24]:
rf_model.write().overwrite().save('rf_model')

In [25]:
from pyspark.ml.classification import RandomForestClassificationModel
type(RandomForestClassificationModel.load('rf_model'))

pyspark.ml.classification.RandomForestClassificationModel

# Pipeline

In [26]:
from pyspark.ml.pipeline import PipelineModel

In [27]:
titanic_df = spark.read.parquet('train.parquet')

In [28]:
titanic_df.show()

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|       0|     3|  male|22.0|    1|    0|   7.25|       S|          1|    0|
|       1|     1|female|38.0|    1|    0|71.2833|       C|          1|    0|
|       1|     3|female|26.0|    0|    0|  7.925|       S|          0|    1|
|       1|     1|female|35.0|    1|    0|   53.1|       S|          1|    0|
|       0|     3|  male|35.0|    0|    0|   8.05|       S|          0|    1|
|       0|     3|  male|30.0|    0|    0| 8.4583|       Q|          0|    1|
|       0|     1|  male|54.0|    0|    0|51.8625|       S|          0|    1|
|       0|     3|  male| 2.0|    3|    1| 21.075|       S|          4|    0|
|       1|     3|female|27.0|    0|    2|11.1333|       S|          2|    0|
|       1|     2|female|14.0|    1|    0|30.0708|       C|          1|    0|

In [29]:
train, test = titanic_df.randomSplit([0.8, 0.2])

In [30]:
train.show(5)

+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|Survived|Pclass|   Sex| Age|SibSp|Parch|   Fare|Embarked|Family_Size|Alone|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
|       0|     1|female| 2.0|    1|    2| 151.55|       S|          3|    0|
|       0|     1|female|25.0|    1|    2| 151.55|       S|          3|    0|
|       0|     1|female|50.0|    0|    0|28.7125|       C|          0|    1|
|       0|     1|  male|18.0|    1|    0|  108.9|       C|          1|    0|
|       0|     1|  male|19.0|    1|    0|   53.1|       S|          1|    0|
+--------+------+------+----+-----+-----+-------+--------+-----------+-----+
only showing top 5 rows



In [31]:
indexer_sex = StringIndexer(inputCol="Sex", outputCol="Sex_index")

In [32]:
indexer_embarked = StringIndexer(inputCol="Embarked", outputCol="Embarked_index")

In [33]:
feature = VectorAssembler(
    inputCols=["Pclass","Age","SibSp","Parch","Fare","Family_Size","Embarked_index","Sex_index"],
    outputCol="features")


In [34]:
rf_classifier = RandomForestClassifier(labelCol="Survived", featuresCol="features")

In [35]:
pipeline = Pipeline(stages=[indexer_sex, indexer_embarked, feature, rf_classifier])

In [36]:
p_model = pipeline.fit(train)

In [37]:
type(p_model)

pyspark.ml.pipeline.PipelineModel

In [38]:
p_model.write().overwrite().save('p_model')

In [39]:
model = PipelineModel.load('p_model')

In [40]:
prediction = p_model.transform(test)

In [41]:
test.show(5)

+--------+------+----+----+-----+-----+--------+--------+-----------+-----+
|Survived|Pclass| Sex| Age|SibSp|Parch|    Fare|Embarked|Family_Size|Alone|
+--------+------+----+----+-----+-----+--------+--------+-----------+-----+
|       0|     1|male|24.0|    0|    0|    79.2|       C|          0|    1|
|       0|     1|male|24.0|    0|    1|247.5208|       C|          1|    0|
|       0|     1|male|28.0|    0|    0|    47.1|       S|          0|    1|
|       0|     1|male|28.0|    1|    0| 82.1708|       C|          1|    0|
|       0|     1|male|30.0|    0|    0|     0.0|       S|          0|    1|
+--------+------+----+----+-----+-----+--------+--------+-----------+-----+
only showing top 5 rows



In [42]:
prediction.select(["Pclass","Age","SibSp","Parch","Fare","Family_Size","Embarked_index","Sex_index","prediction"]).show(5)

+------+----+-----+-----+--------+-----------+--------------+---------+----------+
|Pclass| Age|SibSp|Parch|    Fare|Family_Size|Embarked_index|Sex_index|prediction|
+------+----+-----+-----+--------+-----------+--------------+---------+----------+
|     1|24.0|    0|    0|    79.2|          0|           1.0|      0.0|       0.0|
|     1|24.0|    0|    1|247.5208|          1|           1.0|      0.0|       0.0|
|     1|28.0|    0|    0|    47.1|          0|           0.0|      0.0|       0.0|
|     1|28.0|    1|    0| 82.1708|          1|           1.0|      0.0|       0.0|
|     1|30.0|    0|    0|     0.0|          0|           0.0|      0.0|       0.0|
+------+----+-----+-----+--------+-----------+--------------+---------+----------+
only showing top 5 rows



In [43]:
prediction.printSchema()

root
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Family_Size: integer (nullable = true)
 |-- Alone: integer (nullable = true)
 |-- Sex_index: double (nullable = false)
 |-- Embarked_index: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [44]:
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

In [45]:
p_accuracy = evaluator.evaluate(prediction)
print("Pipeline model [Accuracy] = %g"% (p_accuracy))
print("Pipeline model [Error] = %g " % (1.0 - p_accuracy))

Pipeline model [Accuracy] = 0.834254
Pipeline model [Error] = 0.165746 


# Hyperparameter tuning

In [49]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [50]:
paramGrid = ParamGridBuilder()\
                  .addGrid(rf_classifier.maxDepth, [2, 3, 4])\
                  .addGrid(rf_classifier.maxBins, [4, 5, 6])\
                  .addGrid(rf_classifier.minInfoGain, [0.05, 0.1, 0.15])\
                  .build()

In [51]:
 tvs = TrainValidationSplit(estimator=pipeline,
                            estimatorParamMaps=paramGrid,
                            evaluator=evaluator,
                            trainRatio=0.8)

In [52]:
model = tvs.fit(train)

In [53]:
type(model)

pyspark.ml.tuning.TrainValidationSplitModel

In [54]:
model.bestModel

PipelineModel_fc494b8277eb

In [55]:
type(model.bestModel)

pyspark.ml.pipeline.PipelineModel

In [56]:
jo = model.bestModel.stages[-1]._java_obj
print('Max Depth: {}'.format(jo.getMaxDepth()))
print('Num Trees: {}'.format(jo.getMaxBins()))
print('Impurity: {}'.format(jo.getMinInfoGain()))

Max Depth: 2
Num Trees: 4
Impurity: 0.1
