# Titanic Dataset Big Data Pipeline
This notebook demonstrates a full big data pipeline using PySpark for the Titanic dataset, including data preprocessing, visualization, and training a classification model.

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("TitanicPipeline") \
    .getOrCreate()

# Load the Titanic dataset
df = spark.read.csv("titanic.csv", header=True, inferSchema=True)
df.show(5)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/12 14:29:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+------+--------+--------------------+------+------+-----+-----+------+--------+-------+--------+
|pclass|survival|                name|   sex|   age|sibsp|parch|ticket|    fare|  cabin|embarked|
+------+--------+--------------------+------+------+-----+-----+------+--------+-------+--------+
|     1|       1|Allen, Miss. Elis...|female|  29.0|    0|    0| 24160|211.3375|     B5|       S|
|     1|       1|Allison, Master. ...|  male|0.9167|    1|    2|113781|  151.55|C22 C26|       S|
|     1|       0|Allison, Miss. He...|female|   2.0|    1|    2|113781|  151.55|C22 C26|       S|
|     1|       0|Allison, Mr. Huds...|  male|  30.0|    1|    2|113781|  151.55|C22 C26|       S|
|     1|       0|Allison, Mrs. Hud...|female|  25.0|    1|    2|113781|  151.55|C22 C26|       S|
+------+--------+--------------------+------+------+-----+-----+------+--------+-------+--------+
only showing top 5 rows



## Creating the stages of my pipeline

In [2]:
from pyspark.sql.functions import col

# Handle missing values
df = df.fillna({"age": df.select("age").dropna().rdd.map(lambda x: x[0]).mean()})
df = df.dropna(subset=["Embarked"])

# Feature Engineering
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Stage 1
# Replacing the string values in the column `sex` with numercial values
sex_indexer = StringIndexer(inputCol="sex", outputCol="sexIndex")

# Stage 2
# Replacing the string values in the column `embarked`` with numercial values
embarked_indexer = StringIndexer(inputCol="embarked", outputCol="embarkedIndex")

# Stage 3
# Selecting the features
assembler = VectorAssembler(inputCols=["pclass", "age", "fare", "sexIndex", "embarkedIndex"],
                            outputCol="features", handleInvalid="skip") # avoid handleInvalid="skip". It is prefered to have a look at your data and decide how to handle ALL the missing values

# Stage 4
# Train Logistic Regression Model
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol="features", labelCol="survival")

                                                                                

## Pipeline Integration
Combine all stages into a PySpark pipeline.

In [3]:
from pyspark.ml import Pipeline

# Split the dataset
train, test = df.randomSplit([0.8, 0.2], seed=42)

pipeline = Pipeline(stages=[sex_indexer, embarked_indexer, assembler, lr])
pipeline_model = pipeline.fit(train)
pipeline_predictions = pipeline_model.transform(test)

24/12/12 14:30:20 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/12 14:30:20 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


# Model Evaluation

In [5]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = pipeline_model.transform(test)
evaluator = BinaryClassificationEvaluator(labelCol="survival", rawPredictionCol="prediction")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

AUC: 0.751700398142004


## Save the Model and Results
Save the trained model and prediction results.

In [7]:
# Save the pipeline model
pipeline_model.write().overwrite().save("titanic_model")