In [1]:
!pip install pyspark py4j



In [None]:
from typing import Any

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, sum as spark_sum
from pyspark.sql.types import DoubleType, IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.classification import (
  LogisticRegression,
  RandomForestClassifier,
  GBTClassifier
)
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Utils

In [38]:
def create_spark(app_name: str="DiabetesHealthIndicators") -> SparkSession:
  return (
    SparkSession.builder
    .appName(app_name)
    .getOrCreate()
  )

In [39]:
def load_data(spark: SparkSession, path: str) -> DataFrame:

  return spark.read.csv(
    path,
    header=True,
    inferSchema=True
  )

In [40]:
def get_features() -> list[str]:
  return [
    "HighBP", "HighChol", "CholCheck", "BMI", "Smoker", "Stroke",
    "HeartDiseaseorAttack", "PhysActivity", "Fruits", "Veggies",
    "HvyAlcoholConsump", "AnyHealthcare", "NoDocbcCost", "GenHlth",
    "MentHlth", "PhysHlth", "DiffWalk", "Sex", "Age", "Education", "Income"
  ]

In [41]:
def get_target() -> str:
  return "Diabetes_binary"

In [42]:
def cast_features_and_target(
  df: DataFrame,
  int_cols: list[str],
  double_cols: list[str],
  target_col: str | None = None
):
  for c in int_cols:
    df = df.withColumn(c, col(c).cast(IntegerType()))

  for c in double_cols:
    df = df.withColumn(c, col(c).cast(DoubleType()))

  if target_col:
    df = df.withColumn(target_col, col(target_col).cast(IntegerType()))

  return df

In [43]:
def get_preprocessing_stages(
  feature_cols: list[str], scaling: bool = False
) -> list[Any]:
  stages = [
    VectorAssembler(
      inputCols=feature_cols,
      outputCol="features_raw" if scaling else "features"
    )
  ]

  if scaling:
    stages.append(
      StandardScaler(
        inputCol="features_raw",
        outputCol="features"
      )
    )

  return stages

In [44]:
def build_models(
  target_col:str, feature_cols: list[str]
) -> dict[str, Any]:

  models = {}

  lr = LogisticRegression(
    labelCol=target_col,
    featuresCol="features",
    family="binomial"
  )

  lr_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [1e-2, 1e-1]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5]) \
    .build()

  models["LogisticRegression"] = (
    get_preprocessing_stages(feature_cols, scaling=True),
    lr,
    lr_grid
  )

  rf = RandomForestClassifier(
    labelCol=target_col,
    featuresCol="features"
  )

  rf_grid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [30, 50, 100]) \
    .addGrid(rf.maxDepth, [5, 8, 10]) \
    .build()

  models["RandomForest"] = (
    get_preprocessing_stages(feature_cols, scaling=False),
    rf,
    rf_grid
  )

  gbt = GBTClassifier(
    labelCol=target_col,
    featuresCol="features",
    maxIter=50
  )

  gbt_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [3, 5, 8]) \
    .addGrid(gbt.stepSize, [5e-2, 1e-1]) \
    .build()

  models["GBT"] = (
    get_preprocessing_stages(feature_cols, scaling=False),
    gbt,
    gbt_grid
  )

  return models

In [45]:
def train_val_split(
  df: DataFrame,
  train_size: float = 0.8,
  seed:int = 42
) -> list[DataFrame]:
  return df.randomSplit([train_size, 1 - train_size], seed=seed)

In [46]:
def train_and_select_best_model(
  df: DataFrame,
  models: dict[str, Any],
  target_col: str
) -> dict[str, Any]:
  train_df, val_df = train_val_split(df)

  evaluator = MulticlassClassificationEvaluator(
    labelCol=target_col,
    metricName="f1"
  )

  best_f1 = 0.0
  best_config = {}

  for model_name in models:
    print("="*50)
    print(f"Evaluating for {model_name}")
    print("-"*50)

    preprocessing, estimator, grid = models[model_name]

    cv = CrossValidator(
      estimator=estimator,
      estimatorParamMaps=grid,
      evaluator=evaluator,
      numFolds=5
    )

    pipeline = Pipeline(stages=preprocessing + [cv])
    model = pipeline.fit(train_df)

    preds = model.transform(val_df)
    f1 = evaluator.evaluate(preds)

    print(f"F1: {f1:.4f}\n")

    if f1 > best_f1:
      best_f1 = f1
      best_config = {
        "name": model_name,
        "preprocessing": preprocessing,
        "estimator": estimator
      }

    print(f"Current best model: {best_config['name']} (F1 = {best_f1:.4f})")

  return best_config

In [47]:
def train_model(df: DataFrame, config: dict[str, Any]) -> PipelineModel:
  pipeline = Pipeline(
    stages=config["preprocessing"] + [config["estimator"]]
  )

  model = pipeline.fit(df)
  return model

In [48]:
def save_model(model: PipelineModel, path: str) -> None:
  model.write().overwrite().save(path)

# Spark Session

In [17]:
spark = create_spark()
spark

# Loading Data

Според описот на податочното множество, се гледа дека категориските податоци се веќе енкодирани (во 0/1 ако се две класи; ординални доколку има значење на класите), па нема потреба од дополнително енкодирање.

In [None]:
df = load_data(
  spark,
  "offline.csv"
)

In [19]:
feature_cols = get_features()
target_col = get_target()

In [20]:
df.printSchema()

root
 |-- Diabetes_binary: double (nullable = true)
 |-- HighBP: double (nullable = true)
 |-- HighChol: double (nullable = true)
 |-- CholCheck: double (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoker: double (nullable = true)
 |-- Stroke: double (nullable = true)
 |-- HeartDiseaseorAttack: double (nullable = true)
 |-- PhysActivity: double (nullable = true)
 |-- Fruits: double (nullable = true)
 |-- Veggies: double (nullable = true)
 |-- HvyAlcoholConsump: double (nullable = true)
 |-- AnyHealthcare: double (nullable = true)
 |-- NoDocbcCost: double (nullable = true)
 |-- GenHlth: double (nullable = true)
 |-- MentHlth: double (nullable = true)
 |-- PhysHlth: double (nullable = true)
 |-- DiffWalk: double (nullable = true)
 |-- Sex: double (nullable = true)
 |-- Age: double (nullable = true)
 |-- Education: double (nullable = true)
 |-- Income: double (nullable = true)



In [21]:
int_cols = get_features()
int_cols.remove("BMI")

In [None]:
df = cast_features_and_target(
    df, int_cols=int_cols, double_cols=["BMI"], target_col=target_col
)

In [23]:
df.printSchema()

root
 |-- Diabetes_binary: integer (nullable = true)
 |-- HighBP: integer (nullable = true)
 |-- HighChol: integer (nullable = true)
 |-- CholCheck: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Smoker: integer (nullable = true)
 |-- Stroke: integer (nullable = true)
 |-- HeartDiseaseorAttack: integer (nullable = true)
 |-- PhysActivity: integer (nullable = true)
 |-- Fruits: integer (nullable = true)
 |-- Veggies: integer (nullable = true)
 |-- HvyAlcoholConsump: integer (nullable = true)
 |-- AnyHealthcare: integer (nullable = true)
 |-- NoDocbcCost: integer (nullable = true)
 |-- GenHlth: integer (nullable = true)
 |-- MentHlth: integer (nullable = true)
 |-- PhysHlth: integer (nullable = true)
 |-- DiffWalk: integer (nullable = true)
 |-- Sex: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- Income: integer (nullable = true)



In [24]:
df.select(target_col).distinct().show()

+---------------+
|Diabetes_binary|
+---------------+
|              1|
|              0|
+---------------+



In [25]:
df.show(10)
print(f"Count: {df.count()}")

+---------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+---+---------+------+
|Diabetes_binary|HighBP|HighChol|CholCheck| BMI|Smoker|Stroke|HeartDiseaseorAttack|PhysActivity|Fruits|Veggies|HvyAlcoholConsump|AnyHealthcare|NoDocbcCost|GenHlth|MentHlth|PhysHlth|DiffWalk|Sex|Age|Education|Income|
+---------------+------+--------+---------+----+------+------+--------------------+------------+------+-------+-----------------+-------------+-----------+-------+--------+--------+--------+---+---+---------+------+
|              0|     0|       0|        1|21.0|     0|     0|                   0|           0|     0|      1|                0|            1|          0|      1|      21|      21|       0|  1|  3|        5|     5|
|              0|     0|       0|        1|25.0|     0|     0|                   0|           0|     1|      0|                0|       

In [26]:
df.select([
  spark_sum(col(c).isNull().cast("int")).alias(f"null_count_{c}")
  for c in df.columns
]).show()

+--------------------------+-----------------+-------------------+--------------------+--------------+-----------------+-----------------+-------------------------------+-----------------------+-----------------+------------------+----------------------------+------------------------+----------------------+------------------+-------------------+-------------------+-------------------+--------------+--------------+--------------------+-----------------+
|null_count_Diabetes_binary|null_count_HighBP|null_count_HighChol|null_count_CholCheck|null_count_BMI|null_count_Smoker|null_count_Stroke|null_count_HeartDiseaseorAttack|null_count_PhysActivity|null_count_Fruits|null_count_Veggies|null_count_HvyAlcoholConsump|null_count_AnyHealthcare|null_count_NoDocbcCost|null_count_GenHlth|null_count_MentHlth|null_count_PhysHlth|null_count_DiffWalk|null_count_Sex|null_count_Age|null_count_Education|null_count_Income|
+--------------------------+-----------------+-------------------+--------------------

## Feature preparation

## Models

In [27]:
models = build_models(target_col, feature_cols)
models.keys()

dict_keys(['LogisticRegression', 'RandomForest', 'GBT'])

## Best Model Search

In [28]:
best_config = train_and_select_best_model(df, models, target_col)

Evaluating for LogisticRegression
--------------------------------------------------
F1: 0.8278

Current best model: LogisticRegression (F1 = 0.8278)
Evaluating for RandomForest
--------------------------------------------------
F1: 0.8233

Current best model: LogisticRegression (F1 = 0.8278)
Evaluating for GBT
--------------------------------------------------
F1: 0.8338

Current best model: GBT (F1 = 0.8338)


In [29]:
model = train_model(df, best_config)

In [30]:
save_model(model, "trained_model")