### In this project, I develop a model to predict whether a passenger survives the Titanic shipwreck. The dataset `titanic_dataset_csci4521.csv` consists of the following features:
- ### Passenger ID,
- ### Ticket class (1 = first class, 2 = second class, 3 = third class),
- ### Passenger name,
- ### Sex,
- ### Age,
- ### Number of siblings or spouses aboard,
- ### Number of parents or children aboard,
- ### Ticket number,
- ### Fare,
- ### Cabin number, and
- ### Port of Embarkation (C = Cherbourg, Q = Queenstown, S = Southampton)
### and label:
- ### Survived ($y_i=1$) or
- ### Not survived ($y_i = 0$).

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [29]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score
from pyspark.ml.feature import StringIndexer, VectorAssembler, ChiSqSelector, StandardScaler
from pyspark.ml.classification import RandomForestClassifier, LinearSVC, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit

In [30]:
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv('./drive/MyDrive/titanic_dataset_csci4521.csv', header=True)
df.show()
print("Number of samples: ", df.count())
print("Number of features: ", len(df.columns))

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|  22|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|  38|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|  26|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|  35|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|  35|    0|    0|          373450|   8.05| NULL|       S|
|          6|       0|     3|    Moran, Mr. James|  male|NULL|    0|    0|      

In [31]:
df.describe().show()
df.select([F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-----------------+------------------+-----------------+-----+--------+
|summary|       PassengerId|           Survived|            Pclass|                Name|   Sex|              Age|             SibSp|            Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+------------------+-------------------+------------------+--------------------+------+-----------------+------------------+-----------------+------------------+-----------------+-----+--------+
|  count|               894|                894|               883|                 890|   889|              716|               893|              894|               894|              894|  204|     877|
|   mean| 445.6308724832215|  0.383668903803132|2.3080407701019254|                NULL|  NULL|30.48627094972067|0.8499440089585666|2.615212527964206| 260140.0888554217|32.18915838926171| 

In [32]:
# Treat negative values as sign errors and take the absolute value
df = df.withColumn("Age", F.abs(F.col("Age")))
df = df.withColumn("SibSp", F.abs(F.col("SibSp")))
df.describe().show()

+-------+------------------+-------------------+------------------+--------------------+------+------------------+------------------+-----------------+------------------+-----------------+-----+--------+
|summary|       PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|            Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+------------------+-------------------+------------------+--------------------+------+------------------+------------------+-----------------+------------------+-----------------+-----+--------+
|  count|               894|                894|               883|                 890|   889|               716|               893|              894|               894|              894|  204|     877|
|   mean| 445.6308724832215|  0.383668903803132|2.3080407701019254|                NULL|  NULL|30.542136871508376|0.8566629339305711|2.615212527964206| 260140.0888554217|32.18915838926

In [33]:
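def replace_outliers_with_median(df, column_name, lower_bound, upper_bound):
    """Replace values outside [lower_bound, upper_bound] with the column median."""
    # The median is taken over the column as it stands, before any replacement
    median_value = df.select(F.median(F.col(column_name))).collect()[0][0]
    is_outlier = (F.col(column_name) < lower_bound) | (F.col(column_name) > upper_bound)
    return df.withColumn(column_name, F.when(is_outlier, median_value).otherwise(F.col(column_name)))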

df = replace_outliers_with_median(df, "Age", 0, 100)
df = replace_outliers_with_median(df, "SibSp", 0, 10)
df = replace_outliers_with_median(df, "Parch", 0, 10)
df.describe().show()

+-------+------------------+-------------------+------------------+--------------------+------+------------------+------------------+-----------------+------------------+-----------------+-----+--------+
|summary|       PassengerId|           Survived|            Pclass|                Name|   Sex|               Age|             SibSp|            Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+------------------+-------------------+------------------+--------------------+------+------------------+------------------+-----------------+------------------+-----------------+-----+--------+
|  count|               894|                894|               883|                 890|   889|               716|               893|              894|               894|              894|  204|     877|
|   mean| 445.6308724832215|  0.383668903803132|2.3080407701019254|                NULL|  NULL|29.642695530726254|0.5207166853303471|0.378076062639821| 260140.0888554217|32.18915838926
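
The replacement rule above can be checked on a handful of values without a Spark session. The following is a minimal plain-Python sketch of the same idea; the helper name `replace_outliers_with_median_py` and the sample ages are made up for illustration, and Python's `statistics.median` may not match Spark's `F.median` in every edge case.

```python
from statistics import median

def replace_outliers_with_median_py(values, lower_bound, upper_bound):
    """Replace values outside [lower_bound, upper_bound] with the median.

    Missing entries (None) are ignored when computing the median and are
    left untouched, which mirrors how a null comparison falls through to
    the `otherwise` branch in the Spark version.
    """
    present = [v for v in values if v is not None]
    med = median(present)  # computed before replacement, outliers included
    return [
        med if v is not None and (v < lower_bound or v > upper_bound) else v
        for v in values
    ]

# Hypothetical sample: 250 is an implausible age
ages = [22.0, 38.0, None, 250.0, 35.0]
cleaned = replace_outliers_with_median_py(ages, 0, 100)
print(cleaned)
```

Because the median is computed with the outlier still present, a column with many extreme values would pull the replacement value toward them.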

In [34]:
pclass_mode = df.select(F.mode(F.col("Pclass")).alias("mode")).collect()[0]["mode"]
sex_mode = df.select(F.mode(F.col("Sex")).alias("mode")).collect()[0]["mode"]
embarked_mode = df.select(F.mode(F.col("Embarked")).alias("mode")).collect()[0]["mode"]

df = df.fillna({
    "Pclass": pclass_mode,
    "Name": "Unknown",
    "Sex": sex_mode,
    "SibSp": 0,
    "Embarked": embarked_mode
})

avg_age = df.groupBy("Pclass", "Sex").agg(F.avg("Age").alias("avg_age"))
df = df.join(avg_age, ["Pclass", "Sex"], "left")
df = df.withColumn("Age",
    F.when(F.col("Age").isNull(), F.col("avg_age")).otherwise(F.col("Age"))
)


df = df.withColumn("cabin_bool", F.when(F.col("Cabin").isNull(), 0).otherwise(1))
df = df.withColumn("total_family_members", F.col("SibSp") + F.col("Parch") + 1)

df = df.drop("avg_age", "Cabin")

df.show(20)

+------+------+-----------+--------+--------------------+-----------------+-----+-----+----------------+-------+--------+----------+--------------------+
|Pclass|   Sex|PassengerId|Survived|                Name|              Age|SibSp|Parch|          Ticket|   Fare|Embarked|cabin_bool|total_family_members|
+------+------+-----------+--------+--------------------+-----------------+-----+-----+----------------+-------+--------+----------+--------------------+
|     3|  male|          1|       0|Braund, Mr. Owen ...|             22.0|  1.0|    0|       A/5 21171|   7.25|       S|         0|                 2.0|
|     1|female|          2|       1|Cumings, Mrs. Joh...|             38.0|  1.0|    0|        PC 17599|71.2833|       C|         1|                 2.0|
|     3|female|          3|       1|Heikkinen, Miss. ...|             26.0|  0.0|    0|STON/O2. 3101282|  7.925|       S|         0|                 1.0|
|     1|female|          4|       1|Futrelle, Mrs. Ja...|             35.0| 
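
The age imputation above fills each missing `Age` with the average age of passengers in the same `Pclass` and `Sex` group. A rough plain-Python sketch of that logic follows; the function name `fill_age_with_group_mean` and the sample rows are hypothetical, not taken from the dataset.

```python
from collections import defaultdict

def fill_age_with_group_mean(rows):
    """Fill missing 'Age' with the mean age of rows sharing (Pclass, Sex)."""
    sums = defaultdict(float)
    counts = defaultdict(int)
    for r in rows:
        if r["Age"] is not None:
            key = (r["Pclass"], r["Sex"])
            sums[key] += r["Age"]
            counts[key] += 1

    filled = []
    for r in rows:
        key = (r["Pclass"], r["Sex"])
        age = r["Age"]
        if age is None and counts[key] > 0:
            age = sums[key] / counts[key]
        filled.append({**r, "Age": age})
    return filled

# Hypothetical rows for illustration
passengers = [
    {"Pclass": 3, "Sex": "male", "Age": 20.0},
    {"Pclass": 3, "Sex": "male", "Age": 30.0},
    {"Pclass": 3, "Sex": "male", "Age": None},
    {"Pclass": 1, "Sex": "female", "Age": None},  # no known age in this group
]
result = fill_age_with_group_mean(passengers)
print([r["Age"] for r in result])
```

The last row shows a limitation worth keeping in mind: if a group has no known ages at all, the value stays missing, so a null check after imputation is a sensible safeguard.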

In [35]:
from pyspark.sql.types import StringType, IntegerType, DoubleType, ArrayType

In [36]:
df_nodup = df.dropDuplicates()
dropped_rows = df.exceptAll(df_nodup)
dropped_ids = dropped_rows.select("PassengerId").distinct()
df = df_nodup

dropped_ids.show(truncate=False)

+-----------+
|PassengerId|
+-----------+
|334        |
|164        |
|510        |
+-----------+
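
The duplicate check above relies on a multiset difference: `exceptAll` keeps the extra copies that `dropDuplicates` removed. A small plain-Python sketch of that idea, using `collections.Counter` and made-up rows, could look like this:

```python
from collections import Counter

# Hypothetical rows as (PassengerId, Name) tuples; one row appears twice
rows = [(164, "a"), (334, "b"), (164, "a"), (510, "c")]

all_rows = Counter(rows)            # multiset of every row
distinct_rows = Counter(set(rows))  # each distinct row exactly once
extra_copies = all_rows - distinct_rows  # what deduplication dropped

dropped_ids = sorted({row[0] for row in extra_copies.elements()})
print(dropped_ids)
```

Only fully identical rows count as duplicates here, which matches calling `dropDuplicates()` with no column subset.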



In [37]:
get_title = F.udf(lambda name: name.split(',')[1].split('.')[0].strip() if name != "Unknown" else "Unknown", StringType())
df = df.withColumn("Title", get_title(df.Name))
mode_titles = df.filter(F.col("Title") != "Unknown").groupBy("Pclass", "Sex").agg(F.mode("Title").alias("ModeTitle"))

mode_titles.show()

df = df.join(mode_titles, ["Pclass", "Sex"], "left").withColumn("Title", F.when(F.col("Title") == "Unknown", F.col("ModeTitle")).otherwise(F.col("Title"))).drop("ModeTitle")
df = df.drop("Name", "Ticket", "PassengerId")
df.show()

+------+------+---------+
|Pclass|   Sex|ModeTitle|
+------+------+---------+
|     2|  male|       Mr|
|     2|female|      Mrs|
|     3|  male|       Mr|
|     1|  male|       Mr|
|     1|female|     Miss|
|     3|female|     Miss|
+------+------+---------+

+------+------+--------+-----------------+-----+-----+--------+--------+----------+--------------------+------+
|Pclass|   Sex|Survived|              Age|SibSp|Parch|    Fare|Embarked|cabin_bool|total_family_members| Title|
+------+------+--------+-----------------+-----+-----+--------+--------+----------+--------------------+------+
|     2|  male|       0|             59.0|  0.0|    0|    13.5|       S|         0|                 1.0|    Mr|
|     2|female|       1|28.71917808219178|  0.0|    0|   12.35|       Q|         1|                 1.0|  Miss|
|     3|female|       1|            21.75|  0.0|    0|  7.8792|       Q|         0|                 1.0|  Miss|
|     1|  male|       1|             49.0|  1.0|    0| 89.1042|    
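
The title extraction above takes the text between the comma and the first period of the name. As a hedged alternative, the sketch below uses a regular expression and falls back to "Unknown" when the name is missing or has no comma, a case in which the `split`-based lambda would raise an `IndexError`. The helper name `extract_title` and all names except "Moran, Mr. James" are invented for illustration.

```python
import re

# Text after the first comma, up to the next period
TITLE_PATTERN = re.compile(r",\s*([^.,]+)\.")

def extract_title(name):
    """Return the title in 'Surname, Title. Given names', or 'Unknown'."""
    if not name or name == "Unknown":
        return "Unknown"
    match = TITLE_PATTERN.search(name)
    return match.group(1).strip() if match else "Unknown"

print(extract_title("Moran, Mr. James"))
print(extract_title("Doe, Mrs. Jane (Mary Smith)"))
print(extract_title("NameWithoutComma"))
```

The function could be wrapped in `F.udf(extract_title, StringType())` in the same way as the lambda, since it takes and returns a plain string.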

---

In [38]:
df = df.withColumn("Sex", F.when(F.col("Sex") == "male", 0).otherwise(1))
df = df.withColumn("Embarked", F.when(F.col("Embarked") == "S", 0)
                              .when(F.col("Embarked") == "C", 1)
                              .when(F.col("Embarked") == "Q", 2)
                              .otherwise(F.col("Embarked").astype(IntegerType())))
indexer = StringIndexer(inputCol="Title", outputCol="Title_encoded")
df = indexer.fit(df).transform(df)
df.select("Sex", "Embarked", "Title").show(5)
df = df.drop("Title")

+---+--------+-----+
|Sex|Embarked|Title|
+---+--------+-----+
|  0|       0|   Mr|
|  1|       2| Miss|
|  1|       2| Miss|
|  0|       1|   Mr|
|  0|       0|   Mr|
+---+--------+-----+
only showing top 5 rows
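
By default `StringIndexer` orders labels by descending frequency, so the most common title receives index 0.0. The sketch below imitates that ordering in plain Python; the helper `frequency_index`, the sample titles, and the alphabetical tie-break are assumptions made for illustration.

```python
from collections import Counter

def frequency_index(labels):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; ties broken alphabetically (an assumption)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical title column
titles = ["Mr", "Miss", "Mr", "Mrs", "Mr", "Miss"]
mapping = frequency_index(titles)
print(mapping)
```

These indices are arbitrary codes rather than quantities, which matters more for linear models such as SVM and logistic regression than for tree-based models.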



In [39]:
df = df.withColumn("Pclass", F.col("Pclass").cast(IntegerType())) \
       .withColumn("Parch", F.col("Parch").cast(IntegerType())) \
       .withColumn("Fare", F.col("Fare").cast(DoubleType())).withColumn("Survived", F.col("Survived").cast(IntegerType()))

df.printSchema()

root
 |-- Pclass: integer (nullable = true)
 |-- Sex: integer (nullable = false)
 |-- Survived: integer (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: double (nullable = false)
 |-- Parch: integer (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: integer (nullable = true)
 |-- cabin_bool: integer (nullable = false)
 |-- total_family_members: double (nullable = true)
 |-- Title_encoded: double (nullable = false)



In [40]:
df.show(10)

+------+---+--------+-----------------+-----+-----+-------+--------+----------+--------------------+-------------+
|Pclass|Sex|Survived|              Age|SibSp|Parch|   Fare|Embarked|cabin_bool|total_family_members|Title_encoded|
+------+---+--------+-----------------+-----+-----+-------+--------+----------+--------------------+-------------+
|     2|  0|       0|             59.0|  0.0|    0|   13.5|       0|         0|                 1.0|          0.0|
|     2|  1|       1|28.71917808219178|  0.0|    0|  12.35|       2|         1|                 1.0|          1.0|
|     3|  1|       1|            21.75|  0.0|    0| 7.8792|       2|         0|                 1.0|          1.0|
|     1|  0|       1|             49.0|  1.0|    0|89.1042|       1|         1|                 2.0|          0.0|
|     2|  0|       0|             48.0|  0.0|    0|   13.0|       0|         0|                 1.0|          0.0|
|     2|  0|       0|             54.0|  0.0|    0|   26.0|       0|         0| 

In [41]:
feature_cols = ["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare", "Embarked", "cabin_bool", "total_family_members", "Title_encoded"]
target_col = "Survived"

features_to_scale = ["Age", "SibSp", "Parch", "Fare", "total_family_members"]

# Step 1: Assemble features into a vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_assembled = assembler.transform(df)

# Step 2: Select features using ChiSqSelector
selector = ChiSqSelector(numTopFeatures=10, featuresCol="features", outputCol="selected_features", labelCol=target_col)
selector_model = selector.fit(df_assembled)
df_selected = selector_model.transform(df_assembled)

selected_indices = selector_model.selectedFeatures
selected_features = [feature_cols[i] for i in selected_indices]
print("Selected features:", selected_features)

# Step 3: Scale only the specified features
scaler = StandardScaler(inputCol="selected_features", outputCol="scaled_features")
scaler_model = scaler.fit(df_selected)
df_scaled = scaler_model.transform(df_selected)

def extract_scaled_features(vector):
    return vector.toArray().tolist()

extract_udf = F.udf(extract_scaled_features, ArrayType(DoubleType()))

df_final = df_scaled.withColumn("scaled_array", extract_udf("scaled_features"))

# Step 4: Replace original columns with scaled values only for features_to_scale
for i, col_name in enumerate(selected_features):
    if col_name in features_to_scale:
        df_final = df_final.withColumn(col_name, F.col("scaled_array")[i])
    else:
        # For features not to be scaled, keep the original values
        df_final = df_final.withColumn(col_name, F.col(col_name))

# Step 5: Drop unnecessary columns
df_final = df_final.drop("selected_features", "scaled_features", "scaled_array")
df_final.show()

Selected features: ['Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked', 'cabin_bool', 'total_family_members', 'Title_encoded']
+------+---+--------+------------------+------------------+------------------+-------------------+--------+----------+--------------------+-------------+--------------------+
|Pclass|Sex|Survived|               Age|             SibSp|             Parch|               Fare|Embarked|cabin_bool|total_family_members|Title_encoded|            features|
+------+---+--------+------------------+------------------+------------------+-------------------+--------+----------+--------------------+-------------+--------------------+
|     2|  0|       0| 4.450573862039556|               0.0|               0.0| 0.2716656986868853|       0|         0|  0.6216652707259592|          0.0|(10,[0,2,5,8],[2....|
|     2|  1|       1|2.1663868357942717|               0.0|               0.0| 0.2485238058357802|       2|         1|  0.6216652707259592|          1.0|[2.0,1.0,2
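
With its default settings (`withMean=False`, `withStd=True`), `StandardScaler` divides each feature by its standard deviation without centering it, which is why zeros stay zero in the scaled output above. A minimal plain-Python sketch of that behavior, with the helper name `scale_to_unit_std` and the fare values invented for illustration:

```python
from statistics import stdev

def scale_to_unit_std(values):
    """Divide every value by the sample standard deviation (no centering)."""
    s = stdev(values)  # sample standard deviation, n - 1 in the denominator
    return [v / s for v in values]

# Hypothetical fares with a standard deviation of exactly 10
fares = [10.0, 20.0, 30.0]
scaled = scale_to_unit_std(fares)
print(scaled)
```

If centered features are wanted, `withMean=True` can be passed to `StandardScaler`, at the cost of turning sparse vectors into dense ones.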

In [42]:
from tqdm import tqdm

def run_cross_validation(model, param_grid, train_data, evaluator):
    """Cross-validate each parameter map separately and return the best CrossValidatorModel."""
    models = []
    metrics = []

    for param_map in tqdm(param_grid, desc="Cross-Validation Progress"):
        crossval = CrossValidator(
            estimator=model,
            estimatorParamMaps=[param_map],  # single config at a time, so tqdm can report progress
            evaluator=evaluator,
            numFolds=5
        )
        cv_model = crossval.fit(train_data)
        models.append(cv_model)
        metrics.append(cv_model.avgMetrics[0])  # average metric across folds for this param_map

    # Pick the configuration with the highest average metric
    # (this assumes a larger-is-better metric such as accuracy)
    best_index = metrics.index(max(metrics))
    return models[best_index]

def evaluate_model(model_name, cv_model, train_data, test_data, evaluator):
    """Print accuracy, F1, weighted precision, and weighted recall for both splits."""
    train_predictions = cv_model.transform(train_data)
    test_predictions = cv_model.transform(test_data)

    # setMetricName mutates the evaluator, so set each metric explicitly
    # and restore the caller's metric at the end.
    original_metric = evaluator.getMetricName()

    train_accuracy = evaluator.setMetricName("accuracy").evaluate(train_predictions)
    test_accuracy = evaluator.setMetricName("accuracy").evaluate(test_predictions)

    train_f1 = evaluator.setMetricName("f1").evaluate(train_predictions)
    test_f1 = evaluator.setMetricName("f1").evaluate(test_predictions)

    train_precision = evaluator.setMetricName("weightedPrecision").evaluate(train_predictions)
    test_precision = evaluator.setMetricName("weightedPrecision").evaluate(test_predictions)

    train_recall = evaluator.setMetricName("weightedRecall").evaluate(train_predictions)
    test_recall = evaluator.setMetricName("weightedRecall").evaluate(test_predictions)

    evaluator.setMetricName(original_metric)

    print(f"{model_name} Results:")
    print(f"Best Parameters: {cv_model.bestModel.extractParamMap()}")
    print(f"Training Accuracy: {train_accuracy:.4f}, F1 Score: {train_f1:.4f}, Precision: {train_precision:.4f}, Recall: {train_recall:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}, F1 Score: {test_f1:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}")
    print("-----------------------------")
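
The weighted metrics reported by `evaluate_model` average the per-class scores, weighting each class by its share of the true labels. The sketch below recomputes them in plain Python on made-up labels; the function name `weighted_metrics` is hypothetical, and the code is meant to illustrate the definitions rather than reproduce Spark's implementation.

```python
from collections import Counter

def weighted_metrics(y_true, y_pred):
    """Return accuracy plus support-weighted precision, recall, and F1."""
    n = len(y_true)
    support = Counter(y_true)
    pairs = list(zip(y_true, y_pred))
    accuracy = sum(t == p for t, p in pairs) / n

    w_precision = w_recall = w_f1 = 0.0
    for label, count in support.items():
        tp = sum(t == label and p == label for t, p in pairs)
        predicted = sum(p == label for p in y_pred)
        precision = tp / predicted if predicted else 0.0
        recall = tp / count
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        weight = count / n  # share of true labels in this class
        w_precision += weight * precision
        w_recall += weight * recall
        w_f1 += weight * f1

    return {
        "accuracy": accuracy,
        "weightedPrecision": w_precision,
        "weightedRecall": w_recall,
        "f1": w_f1,
    }

# Hypothetical labels and predictions
y_true = [1, 0, 1, 1, 0, 0]
y_pred = [1, 0, 0, 1, 0, 1]
scores = weighted_metrics(y_true, y_pred)
print(scores)
```

One consequence of the weighting is that weighted recall always equals accuracy, so the "Recall" figure printed by `evaluate_model` adds no information beyond the accuracy figure. Per-class recall for the survivors would be more informative.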

In [43]:
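# Note: df_final still carries the unscaled `features` vector assembled in In [41].
# The scaled columns written back in Step 4 are not re-assembled, so the models
# below train on unscaled inputs. To train on the scaled columns instead:
# assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
# df_final = assembler.transform(df_final.drop("features"))
df_model = df_final.select(['features', 'survived'])
# No seed is set, so the split and the counts below change from run to run
train_df, test_df = df_model.randomSplit([0.75, 0.25])
test_df.groupby('survived').count().show()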
#train_data, test_data = df_final.randomSplit([0.7, 0.3], seed=501)
#evaluator = MulticlassClassificationEvaluator(labelCol=target_col, predictionCol="prediction", metricName="accuracy")

+--------+-----+
|survived|count|
+--------+-----+
|       1|   88|
|       0|  120|
+--------+-----+
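
The test set above holds 208 rows rather than an exact quarter of the data, which is consistent with `randomSplit` treating the weights as probabilities instead of exact proportions. The sketch below illustrates that idea in plain Python, assuming each row is assigned independently; the helper `bernoulli_split` and the seed value are illustrative, and the sketch does not reproduce Spark's actual sampler.

```python
import random

def bernoulli_split(rows, train_fraction, seed):
    """Assign each row to train with probability train_fraction, else test."""
    rng = random.Random(seed)  # fixed seed makes the split repeatable
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))
train, test = bernoulli_split(rows, 0.75, seed=501)
print(len(train), len(test))
```

Passing a `seed` argument to `randomSplit` plays the same role in Spark and makes the reported metrics easier to compare across runs.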



In [44]:
rf_classifier=RandomForestClassifier(labelCol='survived').fit(train_df)
rf_predictions=rf_classifier.transform(test_df)
print('Accuracy: ', MulticlassClassificationEvaluator(labelCol='survived',metricName='accuracy').evaluate(rf_predictions))
print('Precision: ', MulticlassClassificationEvaluator(labelCol='survived',metricName='weightedPrecision').evaluate(rf_predictions))

Accuracy:  0.8317307692307693
Precision:  0.8327740961328747


In [47]:
rf_classifier2=RandomForestClassifier(labelCol='survived', featuresCol="features")
# paramGrid = ParamGridBuilder()
paramGrid = ParamGridBuilder()\
    .addGrid(rf_classifier2.maxDepth, [5]) \
    .addGrid(rf_classifier2.numTrees, [10]) \
    .build()

evaluator = MulticlassClassificationEvaluator(labelCol="survived", predictionCol="prediction", metricName="accuracy")
rf_cv_model = run_cross_validation(rf_classifier2, paramGrid, train_df, evaluator)
evaluate_model("Random Forest", rf_cv_model, train_df, test_df, evaluator)


#tvs = TrainValidationSplit(estimator=rf_classifier2, estimatorParamMaps=paramGrid,evaluator=MulticlassClassificationEvaluator(labelCol='survived'),trainRatio=0.8)
#model2 = tvs.fit(train_df)
#model2_predictions= model2.transform(test_df)

#print('Accuracy: ', MulticlassClassificationEvaluator(labelCol='survived',metricName='accuracy').evaluate(model2_predictions))
#print('Precision: ',MulticlassClassificationEvaluator(labelCol='survived',metricName='weightedPrecision').evaluate(model2_predictions))

Cross-Validation Progress: 100%|██████████| 1/1 [08:00<00:00, 480.47s/it]


Random Forest Results:
Best Parameters: {Param(parent='RandomForestClassifier_90d17430cd5e', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestClassifier_90d17430cd5e', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestClassifier_90d17430cd5e', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestClassifier_90d17430cd5e', name='featureSubsetStrategy', doc="The number of features to consid

In [48]:
rf = RandomForestClassifier(labelCol="survived", featuresCol="features")
rf_param_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [20, 30]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

evaluator = MulticlassClassificationEvaluator(labelCol="survived", predictionCol="prediction", metricName="accuracy")

rf_cv_model = run_cross_validation(rf, rf_param_grid, train_df, evaluator)

evaluate_model("Random Forest", rf_cv_model, train_df, test_df, evaluator)

Cross-Validation Progress: 100%|██████████| 4/4 [58:57<00:00, 884.29s/it]


Random Forest Results:
Best Parameters: {Param(parent='RandomForestClassifier_2ad34fb01a63', name='bootstrap', doc='Whether bootstrap samples are used when building trees.'): True, Param(parent='RandomForestClassifier_2ad34fb01a63', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='RandomForestClassifier_2ad34fb01a63', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='RandomForestClassifier_2ad34fb01a63', name='featureSubsetStrategy', doc="The number of features to consid
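
The progress bar above shows four iterations because the grid is the Cartesian product of two `numTrees` values and two `maxDepth` values. The sketch below expands such a grid in plain Python and estimates the number of model fits; the helper `build_grid` is hypothetical, and the fit count assumes that each `CrossValidator` call trains once per fold and then refits once on the full training data.

```python
from itertools import product

def build_grid(**param_values):
    """Return one dict per combination of the supplied parameter values."""
    names = list(param_values)
    return [dict(zip(names, combo)) for combo in product(*param_values.values())]

grid = build_grid(numTrees=[20, 30], maxDepth=[5, 10])
for config in grid:
    print(config)

num_folds = 5
fits_per_config = num_folds + 1  # assumption: one fit per fold plus one final refit
total_fits = len(grid) * fits_per_config
print("configurations:", len(grid), "model fits:", total_fits)
```

Since the run time grows with the product of the grid sizes, adding a third value to either parameter would raise the number of configurations from four to six.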

In [None]:
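# Step 2: SVM Classifier
svm = LinearSVC(labelCol=target_col, featuresCol="features")
svm_param_grid = ParamGridBuilder() \
    .addGrid(svm.maxIter, [10, 20]) \
    .addGrid(svm.regParam, [0.1, 0.01]) \
    .build()

# Both helpers require the data splits and an evaluator; use a fresh evaluator for this model
evaluator = MulticlassClassificationEvaluator(labelCol=target_col, predictionCol="prediction", metricName="accuracy")
svm_cv_model = run_cross_validation(svm, svm_param_grid, train_df, evaluator)

evaluate_model("SVM", svm_cv_model, train_df, test_df, evaluator)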

In [None]:
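# Step 3: Logistic Regression
lr = LogisticRegression(labelCol=target_col, featuresCol="features")
lr_param_grid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [10, 20]) \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .build()

# Both helpers require the data splits and an evaluator; use a fresh evaluator for this model
evaluator = MulticlassClassificationEvaluator(labelCol=target_col, predictionCol="prediction", metricName="accuracy")
lr_cv_model = run_cross_validation(lr, lr_param_grid, train_df, evaluator)

evaluate_model("Logistic Regression", lr_cv_model, train_df, test_df, evaluator)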

### Write your report here

---