**Setting up PySpark**

In [None]:
# Uninstall existing PySpark to avoid conflicts
!pip uninstall -y pyspark

# Install Java 11
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Install PySpark
!pip install pyspark

# Set environment variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.10/dist-packages/pyspark"



Found existing installation: pyspark 3.5.3
Uninstalling pyspark-3.5.3:
  Successfully uninstalled pyspark-3.5.3
Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=09c4b5ae6707fb296646cb5ca75f112726873f49b1dcf9bd5b5580cfd5468620
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [None]:
# Initialize findspark
!pip install findspark
import findspark
findspark.init()

# Create a SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("HealthcareProject") \
    .getOrCreate()

# Verify Spark installation
print(spark.version)

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
3.5.3


**Loading the dataset**

In [None]:
# Open the Colab upload widget and keep whatever it returns in `uploaded`.
# The recorded output shows the chosen file being saved as train.csv;
# the next cell reads it from /content/train.csv.
from google.colab import files
uploaded = files.upload()

Saving train.csv to train.csv


In [None]:
# Read the uploaded CSV: first row is the header, column types are inferred.
train_csv_path = "/content/train.csv"
df = spark.read.csv(
    train_csv_path,
    header=True,
    inferSchema=True,
)

# Preview the first five rows
df.show(n=5)


+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
|  1|  Male| 44|              1|       28.0|                 0|  > 2 Years|           Yes|       40454.0|                26.0|    217|       1|
|  2|  Male| 76|              1|        3.0|                 0|   1-2 Year|            No|       33536.0|                26.0|    183|       0|
|  3|  Male| 47|              1|       28.0|                 0|  > 2 Years|           Yes|       38294.0|                26.0|     27|       1|
|  4|  Male| 21|              1|       11.0|                 1|   < 1 Year|            No|       28619.0|               152.0|    203|  

**Data preprocessing**

In [None]:
# Drop exact duplicate rows before imputing.
df_clean = df.dropDuplicates()

# Fill missing values with replacements whose TYPE matches the column.
#
# The previous version passed strings such as "Unknown" / "No" for columns
# that are numeric in this DataFrame (see the df.show() output: Age,
# Driving_License, Previously_Insured and Vintage are integers; Region_Code,
# Policy_Sales_Channel and Annual_Premium are doubles). A string replacement
# cannot be cast to a numeric column, so nulls in those columns were left
# in place and would later make VectorAssembler fail.
#
# Numeric stand-ins used here:
#   * -1 / -1.0 : "unknown" sentinel for Age and the numeric category codes
#   *  0 / 0.0  : "No" for the 0/1 flags, and the original zero defaults
df_clean = df_clean.na.fill({
    # string columns
    "Gender": "Unknown",
    "Vehicle_Age": "Unknown",
    "Vehicle_Damage": "No",
    # integer columns
    "Age": -1,
    "Driving_License": 0,
    "Previously_Insured": 0,
    "Vintage": 0,
    # double columns
    "Region_Code": -1.0,
    "Policy_Sales_Channel": -1.0,
    "Annual_Premium": 0.0,
})
df_clean.show(5)


+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+
| 99|Female| 21|              1|        2.0|                 0|   < 1 Year|           Yes|       34274.0|               152.0|     79|       0|
|128|Female| 22|              1|       25.0|                 0|   < 1 Year|           Yes|       23955.0|               160.0|     22|       0|
|415|Female| 25|              1|       43.0|                 1|   < 1 Year|            No|       27080.0|               160.0|    177|       0|
|539|  Male| 41|              1|       28.0|                 0|   1-2 Year|           Yes|       44898.0|               124.0|    154|  

In [None]:
# Encode categorical columns as numeric indices with StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Columns to encode; each one gains a sibling "<name>_index" column
categorical_columns = [
    "Gender",
    "Vehicle_Age",
    "Vehicle_Damage",
    "Driving_License",
    "Region_Code",
    "Previously_Insured",
]

# One StringIndexer stage per categorical column
indexers = [
    StringIndexer(inputCol=column_name, outputCol=column_name + "_index")
    for column_name in categorical_columns
]

# Chain the indexers into a single pipeline
pipeline = Pipeline(stages=indexers)

# Fit on the cleaned data, then apply the fitted stages to the same data
indexing_model = pipeline.fit(df_clean)
df_clean_indexed = indexing_model.transform(df_clean)

# Preview the original columns alongside the new *_index columns
df_clean_indexed.show(5)

+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+------------+-----------------+--------------------+---------------------+-----------------+------------------------+
| id|Gender|Age|Driving_License|Region_Code|Previously_Insured|Vehicle_Age|Vehicle_Damage|Annual_Premium|Policy_Sales_Channel|Vintage|Response|Gender_index|Vehicle_Age_index|Vehicle_Damage_index|Driving_License_index|Region_Code_index|Previously_Insured_index|
+---+------+---+---------------+-----------+------------------+-----------+--------------+--------------+--------------------+-------+--------+------------+-----------------+--------------------+---------------------+-----------------+------------------------+
| 99|Female| 21|              1|        2.0|                 0|   < 1 Year|           Yes|       34274.0|               152.0|     79|       0|         1.0|              1.0|                 0.0|                  0.0|

In [None]:
# Assemble features
from pyspark.ml.feature import VectorAssembler

# Predictor columns to combine into a single vector.
# "Response" is the prediction target and must NOT be part of the feature
# vector: including it leaks the label into the inputs of any model trained
# on `features`. It was listed here previously and has been removed.
feature_columns = ["Age", "Annual_Premium", "Policy_Sales_Channel", "Vintage",
                   "Gender_index", "Vehicle_Age_index", "Vehicle_Damage_index", "Driving_License_index",
                   "Region_Code_index", "Previously_Insured_index"]

# Create a VectorAssembler
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Transform the indexed DataFrame to include the features vector
df_features = assembler.transform(df_clean_indexed)

# Show the features vector
df_features.select("features").show(5)

+--------------------+
|            features|
+--------------------+
|[21.0,34274.0,152...|
|[22.0,23955.0,160...|
|[25.0,27080.0,160...|
|(11,[0,1,2,3],[41...|
|(11,[0,1,2,3,9],[...|
+--------------------+
only showing top 5 rows



**Train machine learning models**

Random Forest

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# --- Inputs ---------------------------------------------------------------
# Label column and the predictor columns (the label itself is excluded).
target_column = "Response"
feature_columns = [
    "Age", "Annual_Premium", "Policy_Sales_Channel", "Vintage",
    "Gender_index", "Vehicle_Age_index", "Vehicle_Damage_index",
    "Driving_License_index", "Region_Code_index", "Previously_Insured_index",
]

# --- Feature vector -------------------------------------------------------
# Re-assemble so `features` is built from exactly the columns listed above.
assembler = VectorAssembler(outputCol='features', inputCols=feature_columns)
df_features = assembler.transform(df_clean_indexed)

# Keep only what the model needs: the vector and the label.
final_data = df_features.select('features', target_column)

# --- Train / test split (80 / 20, fixed seed) -----------------------------
split_parts = final_data.randomSplit([0.8, 0.2], seed=42)
train_data, test_data = split_parts

# --- Model ----------------------------------------------------------------
# NOTE(review): maxBins is presumably raised so that indexed categorical
# features with many levels fit into the trees -- confirm.
rf = RandomForestClassifier(
    labelCol=target_column,
    featuresCol='features',
    numTrees=100,
    maxBins=100,
)
pipeline = Pipeline(stages=[rf])
model = pipeline.fit(train_data)

# --- Predictions on the held-out split ------------------------------------
predictions = model.transform(test_data)
predictions.select('features', target_column, 'prediction').show(5)

# --- Evaluation -----------------------------------------------------------
# The four evaluators differ only in metricName, so the shared arguments are
# written once.
evaluator_kwargs = dict(labelCol=target_column, predictionCol='prediction')

evaluator_accuracy = MulticlassClassificationEvaluator(metricName='accuracy', **evaluator_kwargs)
evaluator_f1 = MulticlassClassificationEvaluator(metricName='f1', **evaluator_kwargs)
evaluator_precision = MulticlassClassificationEvaluator(metricName='weightedPrecision', **evaluator_kwargs)
evaluator_recall = MulticlassClassificationEvaluator(metricName='weightedRecall', **evaluator_kwargs)

accuracy = evaluator_accuracy.evaluate(predictions)
f1_score = evaluator_f1.evaluate(predictions)
precision = evaluator_precision.evaluate(predictions)
recall = evaluator_recall.evaluate(predictions)

# Report all metrics, two decimals each
print(f"Model Accuracy: {accuracy:.2f}")
print(f"Model F1 Score: {f1_score:.2f}")
print(f"Model Precision: {precision:.2f}")
print(f"Model Recall: {recall:.2f}")


+--------------------+--------+----------+
|            features|Response|prediction|
+--------------------+--------+----------+
|(10,[0,1,2,3],[20...|       0|       0.0|
|(10,[0,1,2,3],[20...|       0|       0.0|
|(10,[0,1,2,3],[20...|       0|       0.0|
|(10,[0,1,2,3],[20...|       0|       0.0|
|(10,[0,1,2,3],[20...|       0|       0.0|
+--------------------+--------+----------+
only showing top 5 rows

Model Accuracy: 0.88
Model F1 Score: 0.82
Model Precision: 0.77
Model Recall: 0.88


Logistic Regression

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Logistic Regression on the same train/test split and label column that the
# Random Forest cell defined (train_data, test_data, target_column).
logistic_regression = LogisticRegression(labelCol=target_column, featuresCol='features')

# Single-stage pipeline, fitted on the training split
pipeline_lr = Pipeline(stages=[logistic_regression])
model_lr = pipeline_lr.fit(train_data)

# Score the held-out split and preview a few rows
predictions_lr = model_lr.transform(test_data)
predictions_lr.select('features', target_column, 'prediction').show(5)

# Arguments shared by every evaluator below
lr_evaluator_kwargs = dict(labelCol=target_column, predictionCol='prediction')

# Accuracy
evaluator_lr = MulticlassClassificationEvaluator(metricName='accuracy', **lr_evaluator_kwargs)
accuracy_lr = evaluator_lr.evaluate(predictions_lr)

print(f"Logistic Regression Model Accuracy: {accuracy_lr:.2f}")

# Additional metrics.
# NOTE: f1_score, precision and recall are the same names the Random Forest
# cell assigned, so after this cell they hold the Logistic Regression values.
evaluator_lr_f1 = MulticlassClassificationEvaluator(metricName='f1', **lr_evaluator_kwargs)
evaluator_lr_precision = MulticlassClassificationEvaluator(metricName='weightedPrecision', **lr_evaluator_kwargs)
evaluator_lr_recall = MulticlassClassificationEvaluator(metricName='weightedRecall', **lr_evaluator_kwargs)

f1_score = evaluator_lr_f1.evaluate(predictions_lr)
precision = evaluator_lr_precision.evaluate(predictions_lr)
recall = evaluator_lr_recall.evaluate(predictions_lr)

print(f"Logistic Regression Model F1 Score: {f1_score:.2f}")
print(f"Logistic Regression Model Precision: {precision:.2f}")
print(f"Logistic Regression Model Recall: {recall:.2f}")

+--------------------+--------+----------+
|            features|Response|prediction|
+--------------------+--------+----------+
|(10,[0,1,2,3],[20...|       0|       0.0|
|(10,[0,1,2,3],[20...|       0|       0.0|
|(10,[0,1,2,3],[20...|       0|       0.0|
|(10,[0,1,2,3],[20...|       0|       0.0|
|(10,[0,1,2,3],[20...|       0|       0.0|
+--------------------+--------+----------+
only showing top 5 rows

Logistic Regression Model Accuracy: 0.88
Logistic Regression Model F1 Score: 0.82
Logistic Regression Model Precision: 0.77
Logistic Regression Model Recall: 0.88


K-Means Clustering

In [None]:
# K-Means clustering and its silhouette-based evaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Columns that make up the clustering feature vector
clustering_feature_columns = [
    "Age", "Annual_Premium", "Policy_Sales_Channel", "Vintage",
    "Gender_index", "Vehicle_Age_index", "Vehicle_Damage_index",
    "Driving_License_index", "Region_Code_index",
    "Previously_Insured_index",
]

# Separate assembler/output column so the classifier's `features` column is
# not involved here.
assembler_clustering = VectorAssembler(
    inputCols=clustering_feature_columns,
    outputCol='features_clustering',
)
df_clustering = assembler_clustering.transform(df_clean_indexed)

# NOTE(review): the features are clustered without scaling, and the recorded
# cluster centers differ mainly in the Annual_Premium coordinate -- consider
# standardising the vector before K-Means.
kmeans = KMeans(
    featuresCol='features_clustering',
    predictionCol='prediction',
    k=5,
    seed=42,
)

# Fit, then assign every row to a cluster
model_kmeans = kmeans.fit(df_clustering)
predictions_kmeans = model_kmeans.transform(df_clustering)

# Preview the cluster assignments
predictions_kmeans.select("features_clustering", "prediction").show(5)

# Silhouette score computed over the same feature vector
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='features_clustering')
silhouette = evaluator.evaluate(predictions_kmeans)
print(f"Silhouette Score: {silhouette:.2f}")

# Cluster centers, one per line
centers = model_kmeans.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

# Number of rows assigned to each cluster
cluster_counts = predictions_kmeans.groupBy("prediction").count()
cluster_counts.show()

+--------------------+----------+
| features_clustering|prediction|
+--------------------+----------+
|[21.0,34274.0,152...|         0|
|[22.0,23955.0,160...|         0|
|[25.0,27080.0,160...|         0|
|(10,[0,1,2,3],[41...|         2|
|(10,[0,1,2,3,8],[...|         2|
+--------------------+----------+
only showing top 5 rows

Silhouette Score: 0.76
Cluster Centers: 
[3.57931676e+01 2.87428985e+04 1.19901974e+02 1.54108183e+02
 4.83888155e-01 5.77753468e-01 5.60952937e-01 1.55875368e-03
 1.12281449e+01 5.25339667e-01]
[4.08921443e+01 2.74744760e+03 1.14120323e+02 1.54718138e+02
 4.31972166e-01 4.06503693e-01 4.16865043e-01 1.90746506e-03
 1.56451199e+01 3.64219008e-01]
[4.11486271e+01 4.20898907e+04 1.02578810e+02 1.54499415e+02
 4.40500491e-01 4.90122561e-01 4.53012769e-01 2.68181236e-03
 5.87328864e+00 4.20984755e-01]
[4.45612371e+01 6.51736348e+04 9.30834284e+01 1.54331113e+02
 4.42092779e-01 4.91903480e-01 4.24713771e-01 4.47019112e-03
 2.55124755e+00 3.98439995e-01]
[4.35000000e

**Saving Predictions and Model Results to CSV**

In [None]:
import os
import shutil
import zipfile

import pandas as pd
from google.colab import files
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import udf, col, concat_ws
from pyspark.sql.types import ArrayType, DoubleType


def vector_to_array(v):
    """Convert an ML vector to a plain list of floats; None is passed through."""
    return v.toArray().tolist() if v is not None else None


# UDF wrapper so the conversion can be applied to a DataFrame column
vector_to_array_udf = udf(vector_to_array, ArrayType(DoubleType()))


def save_single_csv(sdf, path):
    """Write a Spark DataFrame to one CSV file located exactly at `path`.

    `DataFrame.write.csv(path)` creates a *directory* of part files named
    `path`, so the path later handed to `files.download` was a directory
    rather than a CSV file. Collecting to pandas yields one real file.
    The frames exported here are collected on the driver, which is where
    all the data already lives in this local-mode session.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        Frame to export; its column names become the CSV header.
    path : str
        Destination file path.
    """
    # A directory left behind by an earlier Spark write at this path would
    # make the file write fail, so remove it first.
    if os.path.isdir(path):
        shutil.rmtree(path)
    sdf.toPandas().to_csv(path, index=False)


# --- Random Forest predictions ---------------------------------------------
# Flatten the feature vector to a comma-separated string so it fits in a CSV cell.
predictions = predictions.withColumn("features_array", vector_to_array_udf(col("features")))
predictions = predictions.withColumn("features_string", concat_ws(",", col("features_array")))

rf_predictions_path = "/content/rf_predictions.csv"
save_single_csv(predictions.select("features_string", "prediction", target_column), rf_predictions_path)

# --- Logistic Regression predictions ---------------------------------------
predictions_lr = predictions_lr.withColumn("features_array", vector_to_array_udf(col("features")))
predictions_lr = predictions_lr.withColumn("features_string", concat_ws(",", col("features_array")))

lr_predictions_path = "/content/lr_predictions.csv"
save_single_csv(predictions_lr.select("features_string", "prediction", target_column), lr_predictions_path)

# --- K-Means cluster assignments -------------------------------------------
predictions_kmeans = predictions_kmeans.withColumn("features_array", vector_to_array_udf(col("features_clustering")))
predictions_kmeans = predictions_kmeans.withColumn("features_string", concat_ws(",", col("features_array")))

kmeans_predictions_path = "/content/kmeans_predictions.csv"
save_single_csv(predictions_kmeans.select("features_string", "prediction"), kmeans_predictions_path)

# --- Model metrics ----------------------------------------------------------
# By this point f1_score / precision / recall hold the Logistic Regression
# values (that cell reuses the names), so the Random Forest figures are
# recomputed from its predictions instead of being written as None.
rf_evaluator = MulticlassClassificationEvaluator(labelCol=target_column, predictionCol="prediction")
rf_f1_score = rf_evaluator.setMetricName("f1").evaluate(predictions)
rf_precision = rf_evaluator.setMetricName("weightedPrecision").evaluate(predictions)
rf_recall = rf_evaluator.setMetricName("weightedRecall").evaluate(predictions)

accuracy_results = {
    "Model": ["Random Forest", "Logistic Regression"],
    "Accuracy": [accuracy, accuracy_lr],
    "F1 Score": [rf_f1_score, f1_score],
    "Precision": [rf_precision, precision],
    "Recall": [rf_recall, recall]
}

# Convert to pandas DataFrame and save
accuracy_df = pd.DataFrame(accuracy_results)
accuracy_results_path = "/content/model_accuracy_results.csv"
accuracy_df.to_csv(accuracy_results_path, index=False)

# --- Downloads --------------------------------------------------------------
export_paths = [
    rf_predictions_path,
    lr_predictions_path,
    kmeans_predictions_path,
    accuracy_results_path,
]
for export_path in export_paths:
    files.download(export_path)

# Bundle ONLY the four result files. Archiving the whole /content directory
# (as before) also swept in the input data and everything else stored there,
# and wrote the archive into the very directory being archived.
archive_path = "/content/predictions_and_results.zip"
with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
    for export_path in export_paths:
        archive.write(export_path, arcname=os.path.basename(export_path))

# Download the zip file
files.download(archive_path)

print("Predictions and model results saved and ready for download.")

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Predictions and model results saved and ready for download.
