In [1]:
! pip install pyspark boto3

Defaulting to user installation because normal site-packages is not writeable
Collecting boto3
  Downloading boto3-1.37.2-py3-none-any.whl.metadata (6.6 kB)
Collecting botocore<1.38.0,>=1.37.2 (from boto3)
  Downloading botocore-1.37.2-py3-none-any.whl.metadata (5.7 kB)
Collecting s3transfer<0.12.0,>=0.11.0 (from boto3)
  Downloading s3transfer-0.11.3-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.37.2-py3-none-any.whl (139 kB)
Downloading botocore-1.37.2-py3-none-any.whl (13.3 MB)
   ---------------------------------------- 0.0/13.3 MB ? eta -:--:--
   ---------------------------------------- 0.0/13.3 MB ? eta -:--:--
    --------------------------------------- 0.3/13.3 MB ? eta -:--:--
    --------------------------------------- 0.3/13.3 MB ? eta -:--:--
   - -------------------------------------- 0.5/13.3 MB 799.2 kB/s eta 0:00:17
   -- ------------------------------------- 0.8/13.3 MB 838.9 kB/s eta 0:00:15
   --- ------------------------------------ 1.0/13.3 MB 853.0 kB/s 

ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
awscli 1.32.108 requires botocore==1.34.108, but you have botocore 1.37.2 which is incompatible.
awscli 1.32.108 requires s3transfer<0.11.0,>=0.10.0, but you have s3transfer 0.11.3 which is incompatible.

[notice] A new release of pip is available: 24.3.1 -> 25.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Titanic_Classification_Local") \
    .config("spark.master", "local[*]") \
    .getOrCreate()

In [21]:
df = spark.read.csv("train.csv", header=True, inferSchema=True)
df.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| NULL|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| NULL|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [22]:
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler

# Selecting required columns
df = df.select("Survived", "Pclass", "Sex", "Age", "SibSp", "Parch", "Fare")

# Convert 'Sex' column (Male=1, Female=0)
df = df.withColumn("Sex", when(col("Sex") == "male", 1).otherwise(0))

# Handle missing values
df = df.fillna({"Age": df.selectExpr("avg(Age)").collect()[0][0]})

# Convert target column
label_indexer = StringIndexer(inputCol="Survived", outputCol="label")

# Assemble features
feature_assembler = VectorAssembler(
    inputCols=["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare"],
    outputCol="features"
)


In [23]:
df.show(5)

+--------+------+---+----+-----+-----+-------+
|Survived|Pclass|Sex| Age|SibSp|Parch|   Fare|
+--------+------+---+----+-----+-----+-------+
|       0|     3|  1|22.0|    1|    0|   7.25|
|       1|     1|  0|38.0|    1|    0|71.2833|
|       1|     3|  0|26.0|    0|    0|  7.925|
|       1|     1|  0|35.0|    1|    0|   53.1|
|       0|     3|  1|35.0|    0|    0|   8.05|
+--------+------+---+----+-----+-----+-------+
only showing top 5 rows



In [24]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.pipeline import Pipeline

# Split data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

# Initialize model
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Create pipeline
pipeline = Pipeline(stages=[label_indexer, feature_assembler, lr])

# Train model
model = pipeline.fit(train_df)

In [25]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predictions = model.transform(test_df)

evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")
accuracy = evaluator.evaluate(predictions)

print(f"Model Accuracy: {accuracy}")


Model Accuracy: 0.7974238875878221


In [26]:
manual_data = spark.createDataFrame([
    (3, 1, 22.0, 1, 0, 7.25),
    (1, 0, 38.0, 1, 0, 71.28)
], ["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare"])


# Modify the feature assembler to use a new column name
feature_assembler = VectorAssembler(
    inputCols=["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare"],
    outputCol="features_new"  # New output column
)

# Transform manual data
manual_data = feature_assembler.transform(manual_data)

# Run predictions
manual_predictions = model.transform(manual_data)
manual_predictions.select("features_new", "prediction").show()




+--------------------+----------+
|        features_new|prediction|
+--------------------+----------+
|[3.0,1.0,22.0,1.0...|       0.0|
|[1.0,0.0,38.0,1.0...|       1.0|
+--------------------+----------+



In [27]:
model_path = "titanic_model_local"
model.write().overwrite().save(model_path)
print(f"Model saved to {model_path}")

Model saved to titanic_model_local


In [32]:
from pyspark.ml import PipelineModel
model = PipelineModel.load("./titanic_model_local")  # ✅ Correct



In [33]:
from pyspark.sql import SparkSession
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName("Local_Titanic_Prediction").getOrCreate()

# Load the full PipelineModel (not just LogisticRegressionModel)
model = PipelineModel.load("./titanic_model_local")  # ✅ Correct

# Create test data (Make sure column names match training)
manual_data = spark.createDataFrame([
    (3, 1, 22.0, 1, 0, 7.25),
    (1, 0, 38.0, 1, 0, 71.28)
], ["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare"])

# Run prediction using the loaded model
manual_predictions = model.transform(manual_data)

# Show predictions
manual_predictions.select("features", "prediction").show()


+--------------------+----------+
|            features|prediction|
+--------------------+----------+
|[3.0,1.0,22.0,1.0...|       0.0|
|[1.0,0.0,38.0,1.0...|       1.0|
+--------------------+----------+

