In [11]:
!pip install pyspark tensorflow matplotlib

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml.feature import StringIndexer
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Initialize SparkSession
spark = SparkSession.builder.appName("SparkToTensorFlowNN").getOrCreate()

# Load and preprocess your dataset
data = spark.read.csv(
    r"C:\Users\keyar\Documents\Projects\SJAccidentPrediction\Medical_Examiner-Coroner,_Motor_Vehicle_Deaths_dataset_20250912.csv",
    header=True,
    inferSchema=True
)
data = data.dropna(subset=["Age", "Gender", "Incident Zip"])

# Create HighRisk label column
zip_counts = data.groupBy("Incident Zip").count().orderBy("count", ascending=False)
top_zips = [row['Incident Zip'] for row in zip_counts.take(10)]

data = data.withColumn(
    "HighRisk",
    when(col("Incident Zip").isin(top_zips), 1).otherwise(0)
)

# Encode categorical 'Gender' column with StringIndexer
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex", handleInvalid='keep')
data = gender_indexer.fit(data).transform(data)

# Select features and label columns, convert to Pandas
features = ['Age', 'GenderIndex']
pandas_df = data.select(features + ['HighRisk']).toPandas()

# Separate features and labels
X = pandas_df[features].values
y = pandas_df['HighRisk'].values

# Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y, test_size=0.3, random_state=42)

# Define simple neural network model
model = tf.keras.Sequential([
    tf.keras.layers.Dense(32, activation='relu', input_shape=(X_train.shape[1],)),
    tf.keras.layers.Dense(16, activation='relu'),
    tf.keras.layers.Dense(1, activation='sigmoid')
])

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['AUC'])

# Train
model.fit(X_train, y_train, epochs=20, batch_size=32, validation_split=0.2)

# Evaluate
loss, auc = model.evaluate(X_test, y_test)
print(f"Neural Network Test AUC: {auc}")

spark.stop()


Epoch 1/20


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 17ms/step - AUC: 0.5019 - loss: 0.6755 - val_AUC: 0.5547 - val_loss: 0.6800
Epoch 2/20
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - AUC: 0.5163 - loss: 0.6669 - val_AUC: 0.5726 - val_loss: 0.6785
Epoch 3/20
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - AUC: 0.5185 - loss: 0.6643 - val_AUC: 0.6039 - val_loss: 0.6768
Epoch 4/20
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - AUC: 0.5363 - loss: 0.6630 - val_AUC: 0.6008 - val_loss: 0.6779
Epoch 5/20
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - AUC: 0.5392 - loss: 0.6625 - val_AUC: 0.6000 - val_loss: 0.6777
Epoch 6/20
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - AUC: 0.5369 - loss: 0.6617 - val_AUC: 0.5947 - val_loss: 0.6791
Epoch 7/20
[1m23/23[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 7ms/step - AUC: 0.5447 - loss: 0.66

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.ml.feature import StringIndexer
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import StandardScaler

# Initialize SparkSession
spark = SparkSession.builder.appName("SimulatedFederatedLearning").getOrCreate()

# Load and preprocess your dataset
data = spark.read.csv(
    r"C:\Users\keyar\Documents\Projects\SJAccidentPrediction\Medical_Examiner-Coroner,_Motor_Vehicle_Deaths_dataset_20250912.csv",
    header=True,
    inferSchema=True
)
data = data.dropna(subset=["Age", "Gender", "Incident Zip"])

# Create HighRisk label column
zip_counts = data.groupBy("Incident Zip").count().orderBy("count", ascending=False)
top_zips = [row['Incident Zip'] for row in zip_counts.take(10)]

data = data.withColumn(
    "HighRisk",
    when(col("Incident Zip").isin(top_zips), 1).otherwise(0)
)

# Encode categorical features
gender_indexer = StringIndexer(inputCol="Gender", outputCol="GenderIndex", handleInvalid='keep')
zip_indexer = StringIndexer(inputCol="Incident Zip", outputCol="IncidentZipIndex", handleInvalid='keep')

data = gender_indexer.fit(data).transform(data)
data = zip_indexer.fit(data).transform(data)

# Select relevant features and label
features = ['Age', 'GenderIndex', 'IncidentZipIndex']
data_selected = data.select(features + ['HighRisk'])

# Number of simulated clients
num_clients = 3

# Split data into partitions for each client
clients_data = data_selected.randomSplit([1.0/num_clients]*num_clients, seed=42)

# Function to convert Spark DataFrame to scaled numpy arrays for TensorFlow
def preprocess_client_data(client_spark_df):
    client_pd = client_spark_df.toPandas()
    X = client_pd[features].values
    y = client_pd['HighRisk'].values
    scaler = StandardScaler()
    X[:, 0] = scaler.fit_transform(X[:, 0].reshape(-1, 1)).flatten()  # scale Age only
    return X, y, scaler

# Define Neural Network model creation function
def create_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(len(features),)),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dropout(0.3),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['AUC'])
    return model

# Train local models on each client
local_models = []
client_data_processed = []

for round in range(3):  # Simulate multiple federated rounds
    print(f"--- Federated Round {round+1} ---")
    count=0
    for client_df in clients_data:
        X_client, y_client, scaler = preprocess_client_data(client_df)
        client_data_processed.append((X_client, y_client, scaler))
        if round == 0:
            model = create_model()
        else:
            model = create_model()
            model.set_weights(local_models[count].get_weights())  # Start from last global weights
        count+=1
        model.fit(X_client, y_client, epochs=5, batch_size=32, verbose=1)
        local_models.append(model)

    # Federated averaging: average weights across local models
    average_weights = []
    for weights in zip(*[model.get_weights() for model in local_models]):
        average_weights.append(np.mean(weights, axis=0))

    # Update local models with averaged weights
    for model in local_models:
        model.set_weights(average_weights)

# Evaluate averaged global model on all data combined
# Combine all client data
X_all = np.vstack([d[0] for d in client_data_processed])
y_all = np.hstack([d[1] for d in client_data_processed])

loss, auc = local_models[0].evaluate(X_all, y_all, verbose=0)
print(f"Federated Averaged Model AUC on combined data: {auc}")

spark.stop()


--- Federated Round 1 ---
Epoch 1/5




[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - AUC: 0.7338 - loss: 0.7674
Epoch 2/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9109 - loss: 0.5097 
Epoch 3/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9427 - loss: 0.4823 
Epoch 4/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9289 - loss: 0.4687 
Epoch 5/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9409 - loss: 0.4516 
Epoch 1/5




[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - AUC: 0.1940 - loss: 3.0615
Epoch 2/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.5079 - loss: 1.2599 
Epoch 3/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.7996 - loss: 0.6095 
Epoch 4/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.8476 - loss: 0.5551 
Epoch 5/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.8822 - loss: 0.4939 
Epoch 1/5




[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - AUC: 0.3514 - loss: 1.7918   
Epoch 2/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.6259 - loss: 0.8988 
Epoch 3/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.8180 - loss: 0.6112 
Epoch 4/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 6ms/step - AUC: 0.9016 - loss: 0.4908 
Epoch 5/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9161 - loss: 0.4836 
--- Federated Round 2 ---
Epoch 1/5




[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 4ms/step - AUC: 0.9719 - loss: 0.4042
Epoch 2/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9867 - loss: 0.3778 
Epoch 3/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9893 - loss: 0.3586  
Epoch 4/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9910 - loss: 0.3342 
Epoch 5/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9895 - loss: 0.3107 
Epoch 1/5




[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - AUC: 0.9801 - loss: 0.3861   
Epoch 2/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9923 - loss: 0.3730 
Epoch 3/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9886 - loss: 0.3460 
Epoch 4/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9948 - loss: 0.3273 
Epoch 5/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9954 - loss: 0.2936 
Epoch 1/5




[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - AUC: 0.9660 - loss: 0.3821
Epoch 2/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9796 - loss: 0.3711 
Epoch 3/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9862 - loss: 0.3542 
Epoch 4/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9849 - loss: 0.3431 
Epoch 5/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9888 - loss: 0.3226 
--- Federated Round 3 ---
Epoch 1/5




[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - AUC: 0.9862 - loss: 0.3551
Epoch 2/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9929 - loss: 0.3297 
Epoch 3/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9933 - loss: 0.3071 
Epoch 4/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9891 - loss: 0.2860 
Epoch 5/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9904 - loss: 0.2674 
Epoch 1/5




[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - AUC: 0.9920 - loss: 0.3428   
Epoch 2/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9941 - loss: 0.3167 
Epoch 3/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9914 - loss: 0.2909 
Epoch 4/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9927 - loss: 0.2749 
Epoch 5/5
[1m15/15[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step - AUC: 0.9937 - loss: 0.2561 
Epoch 1/5




[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - AUC: 0.9832 - loss: 0.3493
Epoch 2/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9894 - loss: 0.3236 
Epoch 3/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9838 - loss: 0.3126  
Epoch 4/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9841 - loss: 0.2953 
Epoch 5/5
[1m13/13[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 5ms/step - AUC: 0.9870 - loss: 0.2855 
Federated Averaged Model AUC on combined data: 0.9977615475654602
