In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split

# Load the full diabetes health-indicator CSV into pandas.
df = pd.read_csv("diabetes_binary_health_indicators_BRFSS2015.csv")

print(f"Total records: {len(df)}")
print("\nClass distribution:")
print(df['Diabetes_binary'].value_counts())

# 80/20 split, stratified on the label so both parts keep the same class
# ratio; the fixed random_state makes the split reproducible.
target = df['Diabetes_binary']
offline_df, online_df = train_test_split(
    df, test_size=0.2, random_state=42, stratify=target
)

# Persist both parts without the pandas index column.
for frame, csv_path in ((offline_df, "offline.csv"), (online_df, "online.csv")):
    frame.to_csv(csv_path, index=False)

print(f"\nOffline dataset: {len(offline_df)} records")
print(f"Online dataset: {len(online_df)} records")
print("\nFiles saved successfully!")

In [None]:
from pyspark.sql import SparkSession

# Session-level Spark settings, applied in the order listed.
spark_options = {
    "spark.sql.shuffle.partitions": "8",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max": "512m",
}

session_builder = SparkSession.builder.appName("DiabetesOfflineOptimized")
for option_key, option_value in spark_options.items():
    session_builder = session_builder.config(option_key, option_value)

# Reuses an already-running session if one exists, otherwise starts a new one.
spark = session_builder.getOrCreate()

print(f"Spark version: {spark.version}")
print("Spark session created successfully!")

In [None]:
# Read the offline split with a header row and inferred column types, and
# mark it for caching so the repeated actions below reuse the same data.
df = spark.read.csv("offline.csv", header=True, inferSchema=True).cache()

row_count = df.count()
column_count = len(df.columns)
print(f"Dataset shape: {row_count} rows x {column_count} columns")

print("\nSchema:")
df.printSchema()

print("\nClass distribution:")
class_counts = df.groupBy("Diabetes_binary").count()
class_counts.show()

print("\nFirst 5 rows:")
df.show(5)

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Every column except the label is treated as a model feature.
label_col = "Diabetes_binary"
feature_cols = [name for name in df.columns if name != label_col]

print(f"Features ({len(feature_cols)}):")
for position, feature_name in enumerate(feature_cols, start=1):
    print(f"{position:2d}. {feature_name}")

# Packs the feature columns into one vector column; handleInvalid="skip"
# is passed so rows with invalid values are filtered out instead of failing.
assembler = VectorAssembler(
    inputCols=feature_cols, outputCol="features_vec", handleInvalid="skip"
)

# Standardises the assembled vector (centre on the mean, scale by the
# standard deviation) into the "features" column used by the classifiers.
scaler = StandardScaler(
    inputCol="features_vec", outputCol="features", withMean=True, withStd=True
)

print("\nFeature engineering stages created!")

In [None]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder

# Each classifier gets its own pipeline: assemble -> scale -> classify,
# plus a hyperparameter grid that the cross-validator will search.
shared_stages = [assembler, scaler]

# --- Logistic regression: regularisation strength x elastic-net mix ---
lr = LogisticRegression(featuresCol="features", labelCol=label_col, maxIter=100)
lr_pipeline = Pipeline(stages=shared_stages + [lr])
lr_params = (
    ParamGridBuilder()
    .addGrid(lr.regParam, [0.001, 0.01, 0.1])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

print(f"Logistic Regression: {len(lr_params)} hyperparameter combinations")

# --- Random forest: tree count x depth x minimum leaf size ---
rf = RandomForestClassifier(featuresCol="features", labelCol=label_col, seed=42)
rf_pipeline = Pipeline(stages=shared_stages + [rf])
rf_params = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [20, 50, 100])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .addGrid(rf.minInstancesPerNode, [1, 5])
    .build()
)

print(f"Random Forest: {len(rf_params)} hyperparameter combinations")

# --- Decision tree: depth x minimum leaf size x bin count ---
dt = DecisionTreeClassifier(featuresCol="features", labelCol=label_col, seed=42)
dt_pipeline = Pipeline(stages=shared_stages + [dt])
dt_params = (
    ParamGridBuilder()
    .addGrid(dt.maxDepth, [5, 10, 15, 20])
    .addGrid(dt.minInstancesPerNode, [1, 5, 10])
    .addGrid(dt.maxBins, [32, 64])
    .build()
)

print(f"Decision Tree: {len(dt_params)} hyperparameter combinations")

# (display name, pipeline, parameter grid) triples consumed by the training loop.
model_configs = [
    ("Logistic Regression", lr_pipeline, lr_params),
    ("Random Forest", rf_pipeline, rf_params),
    ("Decision Tree", dt_pipeline, dt_params),
]

print("\nAll models configured!")

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator
import time

# Evaluators for the three reported metrics; all read the same label column.
f1_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    metricName="f1"
)

acc_evaluator = MulticlassClassificationEvaluator(
    labelCol=label_col,
    metricName="accuracy"
)

auc_evaluator = BinaryClassificationEvaluator(
    labelCol=label_col,
    metricName="areaUnderROC"
)

# One summary dict per model, plus the running winner.
# best_f1 holds the cross-validated F1 of the winner (see the loop below).
results = []
best_model = None
best_name = ""
best_f1 = 0.0

separator = "=" * 60

for name, pipeline, param_grid in model_configs:
    print(f"\n{separator}")
    print(f"Training: {name}")
    print(separator)

    start_time = time.time()

    # 5-fold cross-validation over the grid, optimising F1.
    cv = CrossValidator(
        estimator=pipeline,
        estimatorParamMaps=param_grid,
        evaluator=f1_evaluator,
        numFolds=5,
        parallelism=2,
        seed=42
    )

    cv_model = cv.fit(df)

    training_time = time.time() - start_time

    # avgMetrics holds the fold-averaged F1 for every grid point; F1 is a
    # larger-is-better metric, so the maximum is the score of the chosen
    # parameter combination on data it was not fit on.
    cv_f1 = max(cv_model.avgMetrics)

    # The metrics below are computed on the same rows the model was trained
    # on, so they are optimistic (especially for deep trees). They are kept
    # for reference only and are NOT used to pick the winner.
    predictions = cv_model.bestModel.transform(df)
    f1 = f1_evaluator.evaluate(predictions)
    accuracy = acc_evaluator.evaluate(predictions)
    auc = auc_evaluator.evaluate(predictions)

    print(f"\nResults:")
    print(f"  CV F1:     {cv_f1:.4f}")
    print(f"  F1 Score:  {f1:.4f}")
    print(f"  Accuracy:  {accuracy:.4f}")
    print(f"  AUC-ROC:   {auc:.4f}")
    print(f"  Training time: {training_time:.2f}s")

    results.append({
        "name": name,
        "cv_f1": cv_f1,
        "f1": f1,
        "accuracy": accuracy,
        "auc": auc,
        "time": training_time
    })

    # Select on the cross-validated score so an overfit model cannot win
    # just by memorising the training data. The `is None` check makes sure
    # the first model is always accepted, even with a score of 0.
    if best_model is None or cv_f1 > best_f1:
        best_f1 = cv_f1
        best_model = cv_model.bestModel
        best_name = name

print(f"\n{separator}")
print("Training completed!")
print(separator)

In [None]:
import pandas as pd

# Tabulate the per-model summaries collected by the training loop.
results_df = pd.DataFrame(results)

banner = "=" * 80

print(f"\n{banner}")
print("MODEL COMPARISON")
print(banner)
print(results_df.to_string(index=False))

# Highlight the model chosen by the training loop.
print(f"\n{banner}")
print(f"BEST MODEL: {best_name}")
print(banner)
print(f"F1 Score:  {best_f1:.4f}")
print(banner)

In [None]:
import os
# Directory the model is actually written to. The original cell created
# "saved_models" but saved under "models/", leaving an unused empty folder.
model_dir = "models"
os.makedirs(model_dir, exist_ok=True)

# Fail with a clear message if the training cell never selected a model.
if best_model is None:
    raise RuntimeError(
        "No trained model available to save; run the training cell first."
    )

# e.g. "Random Forest" -> "models/best_random_forest"
model_name = best_name.lower().replace(" ", "_")
model_path = f"{model_dir}/best_{model_name}"

# overwrite() replaces any model previously saved at the same path.
best_model.write().overwrite().save(model_path)

print(f"\n✓ Model saved successfully!")
print(f"  Path: {model_path}")
print(f"  Model: {best_name}")
print(f"  F1 Score: {best_f1:.4f}")

In [None]:
from pyspark.ml import PipelineModel

# Round-trip check: reload the persisted pipeline from disk.
loaded_model = PipelineModel.load(model_path)
print("✓ Model loaded successfully!")

# Score a small sample to confirm the reloaded pipeline runs end to end.
sample = df.limit(10)
predictions = loaded_model.transform(sample)

print("\nSample predictions:")
shown_columns = ["Diabetes_binary", "prediction", "probability"]
predictions.select(*shown_columns).show(10, truncate=False)

In [None]:
# Shut down the Spark session created earlier in the notebook; cells that
# use `spark` or Spark DataFrames need the session cell re-run after this.
spark.stop()
print("Spark session stopped.")