In [1]:
import pandas as pd
import os
import itertools
from pyspark.sql import SparkSession
import time
from lightgbm import LGBMClassifier
import lightgbm as lgb
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score, classification_report
import uuid
# from spark_helper.core import create_spark_session
import psutil
import gc
import sys


# from code_monitor.instrument_code import SystemMonitor


In [2]:
# Directory holding the per-fold inputs; train_lightgbm_model reads
# ts_fold_<fold_id>_train.parquet and ts_fold_<fold_id>_test.parquet from here.
DATA_DIR = "/home/jovyan/data"
# Directory where every training run writes its metrics parquet file.
# NOTE: the cleanup cell below deletes everything inside this directory.
RESULTS_DIR = "/home/jovyan/results/classification"

In [3]:
# Start every run from an empty results directory so that files left over from
# a previous run are never mixed into this run's metrics.
import shutil  # hoisted out of the loop; only this cell needs it

if os.path.isdir(RESULTS_DIR):
    for item in os.listdir(RESULTS_DIR):
        item_path = os.path.join(RESULTS_DIR, item)
        # Only real directories go through rmtree: shutil.rmtree refuses to
        # operate on a symlink, so links (including broken ones) are unlinked.
        if os.path.isdir(item_path) and not os.path.islink(item_path):
            shutil.rmtree(item_path)
        else:
            os.remove(item_path)

# The training tasks write their parquet files here, so make sure it exists
# even on the very first run.
os.makedirs(RESULTS_DIR, exist_ok=True)


In [4]:
# .config(("spark.driver.maxResultSize", "4g")) \
# Build (or reuse) the Spark session. The master URL comes from the
# SPARK_MASTER environment variable and falls back to the standalone master.
spark_builder = (
    SparkSession.builder
    .appName("SparkLightgbmClassification")
    .master(os.environ.get("SPARK_MASTER", "spark://spark-master:7077"))
)

# Executor / driver settings applied to the session, in the original order.
spark_settings = [
    ("spark.executor.memory", "1g"),
    ("spark.executor.memoryOverhead", "2g"),
    ("spark.driver.maxResultSize", "1g"),
    ("spark.python.worker.faulthandler.enabled", "true"),
    ("spark.sql.execution.pyspark.udf.faulthandler.enabled", "true"),
    ("spark.submit.pyFiles", "code_monitor.zip"),
]
for setting_name, setting_value in spark_settings:
    spark_builder = spark_builder.config(setting_name, setting_value)

spark = spark_builder.getOrCreate()
# Alternative: spark = create_spark_session("spark_cluster.yaml")

# Low-level context, used further down to parallelize the training jobs.
sc = spark.sparkContext

print(f"Spark version: {spark.version}")
print(f"Spark UI available at: {sc.uiWebUrl}")

Spark version: 4.0.0-preview2
Spark UI available at: http://83943c36d3ec:4040


In [5]:
# Fold ids to train on. These are hard-coded (not derived from any DataFrame)
# and must match the ts_fold_<fold_id>_{train,test}.parquet files in DATA_DIR.
fold_ids = list(range(1, 5))

fold_ids

[1, 2, 3, 4]

In [6]:
def train_lightgbm_model(params=None):
    """
    Train and evaluate one LightGBM classifier for a single fold.

    Reads the fold's train/test parquet files from DATA_DIR, fits an
    LGBMClassifier with early stopping, scores it on the fold's test set and
    writes the metrics (plus the hyper-parameters used) to a uniquely named
    parquet file under RESULTS_DIR.

    Parameters:
    -----------
    params : dict
        Keyword arguments for LGBMClassifier plus a mandatory "fold_id" entry
        that selects which ts_fold_<fold_id>_{train,test}.parquet files are
        loaded. The dict passed in is not modified.

    Returns:
    --------
    dict
        Bookkeeping for the run: "fp_name" (path of the metrics parquet file),
        "fold_id", "train_df_mb" / "test_df_mb" (in-memory frame sizes in MB),
        "train_test_read_time_sec", the three SystemMonitor.get_current_rss()
        readings ("memory_before_read_mb", "memory_after_read_mb",
        "memory_after_training_mb"), "fit_cpu_utilization" and
        "training_time" (wall-clock seconds spent in fit).

    Raises:
    -------
    ValueError
        If params is None.
    KeyError
        If params has no "fold_id" entry.
    """
    # Validate before doing any work so a bad call fails fast and cheaply.
    if params is None:
        raise ValueError("No parameters provided for LightGBM model.")

    print(f"Training LightGBM classification model...{params}")

    # Work on a copy: popping "fold_id" from the caller's dict would make a
    # second call with the same dict fail with KeyError.
    model_params = dict(params)
    fold_id = model_params.pop("fold_id")

    # Imported inside the function so the import happens in the process that
    # runs the task (presumably resolved from code_monitor.zip on executors).
    from code_monitor.instrument_code import SystemMonitor

    monitor = SystemMonitor()

    memory_before_read_mb = monitor.get_current_rss()

    # record time to determine how long it takes to read the data
    start_time = time.perf_counter()
    test_df = pd.read_parquet(
        os.path.join(DATA_DIR, f"ts_fold_{fold_id}_test.parquet"),
    )
    train_df = pd.read_parquet(
        os.path.join(DATA_DIR, f"ts_fold_{fold_id}_train.parquet"),
    )
    end_time = time.perf_counter()
    train_test_read_time_sec = end_time - start_time

    bytes_per_mb = 1024 * 1024
    train_df_mb = train_df.memory_usage(deep=True, index=True).sum() / bytes_per_mb
    test_df_mb = test_df.memory_usage(deep=True, index=True).sum() / bytes_per_mb

    # "target" is the label; "date" is excluded from the feature matrix.
    train_x = train_df.drop(columns=["target", "date"])
    train_y = train_df["target"]
    test_x = test_df.drop(columns=["target", "date"])
    test_y = test_df["target"]

    # get memory usage after reading the data
    memory_after_read_mb = monitor.get_current_rss()

    # Initialize the classification model
    model = LGBMClassifier(verbose=-1, **model_params)

    # Train the model and measure time. The wall-clock timer sits innermost so
    # training_time does not include the cost of taking the CPU snapshots.
    # NOTE(review): the test fold doubles as the early-stopping validation set,
    # so the metrics computed on it below are optimistically biased.
    before_cpu_snapshot = monitor.snapshot_cpu()
    start_time = time.perf_counter()
    model.fit(
        train_x, train_y,
        eval_set=[(test_x, test_y)],
        eval_metric='logloss',
        callbacks=[
            lgb.early_stopping(stopping_rounds=500),
            lgb.log_evaluation(period=100),
        ]
    )
    end_time = time.perf_counter()
    after_cpu_snapshot = monitor.snapshot_cpu()
    training_time = end_time - start_time
    fit_cpu_utilization = monitor.compute_cpu_usage(before_cpu_snapshot, after_cpu_snapshot)

    # Log memory usage after training
    memory_after_training_mb = monitor.get_current_rss()

    # Make predictions
    y_pred_proba = model.predict_proba(test_x)[:, 1]  # Probability of positive class
    y_pred = model.predict(test_x)  # Class predictions

    # Calculate classification metrics
    accuracy = accuracy_score(test_y, y_pred)
    precision = precision_score(test_y, y_pred)
    recall = recall_score(test_y, y_pred)
    f1 = f1_score(test_y, y_pred)
    auc = roc_auc_score(test_y, y_pred_proba)

    # Store detailed classification report as string
    class_report = classification_report(test_y, y_pred)
    print(f"\nClassification Report for fold {fold_id}:")
    print(class_report)

    results = {
        "fold_id": fold_id,
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'auc_roc': auc,
        'training_time': training_time,
    }
    # Keep the hyper-parameters next to the metrics they produced.
    results.update(model_params)

    # The task may run on a machine where the directory was never created.
    os.makedirs(RESULTS_DIR, exist_ok=True)

    # generate uuid string for the results file name
    fp_id = str(uuid.uuid4())
    fp_name = os.path.join(RESULTS_DIR, f"results_fold_{fold_id}_{fp_id}.parquet")

    results_df = pd.DataFrame([results])
    results_df.to_parquet(
        fp_name,
        index=False,
    )

    # Drop the references to the large objects first; otherwise the forced
    # collection below cannot reclaim them.
    del model, train_x, train_y, test_x, test_y, train_df, test_df
    gc.collect()  # Force garbage collection to free up memory

    return {"fp_name": fp_name,
            "fold_id": fold_id,
            "train_df_mb": train_df_mb,
            "test_df_mb": test_df_mb,
            "train_test_read_time_sec": train_test_read_time_sec,
            "memory_before_read_mb": memory_before_read_mb,
            "memory_after_read_mb": memory_after_read_mb,
            "memory_after_training_mb": memory_after_training_mb,
            "fit_cpu_utilization": fit_cpu_utilization,
            "training_time": training_time,
            }

In [7]:
# Hyper-parameter search space. Every key maps to the list of values to try;
# "fold_id" is part of the grid so each fold is trained with every setting.
# NOTE(review): "huber" is listed as a regression objective in the LightGBM
# parameter docs -- confirm it is intended for use with LGBMClassifier.
param_grid = {
    "n_jobs": [5],
    "fold_id": fold_ids,
    "lambda_l1": [0.3, 12, 40],
    "max_depth": [10, 15, 17],
    "colsample_bytree": [0.1, 0.3, 0.4],
    "alpha": [0.2],
    "num_leaves": [2048],
    "learning_rate": [0.03],
    "lambda_l2": [0.01, 0.1],
    "max_bin": [256],
    "bagging_fraction": [1],
    "deterministic": [False],
    "objective": ["huber"],
    "metric": ["huber"],
    "n_estimators": [20000],
    "random_state": [42],
    "importance_type": ["gain"],
}

# Expand the grid into its Cartesian product. Keys keep their insertion
# order, so the product varies the last key fastest and the first slowest.
param_keys = list(param_grid)
param_values = [param_grid[key] for key in param_keys]
param_combinations = list(itertools.product(*param_values))

# One plain dict per combination, ready to be handed to a training task.
param_dicts = [dict(zip(param_keys, values)) for values in param_combinations]

# Report the grid size and preview the first few entries.
print(f"Total number of parameter combinations: {len(param_dicts)}")
print("\nFirst 3 parameter combinations:")
for position, candidate in enumerate(param_dicts[:3], start=1):
    print(f"Combination {position}:")
    print(candidate)


Total number of parameter combinations: 216

First 3 parameter combinations:
Combination 1:
{'n_jobs': 5, 'fold_id': 1, 'lambda_l1': 0.3, 'max_depth': 10, 'colsample_bytree': 0.1, 'alpha': 0.2, 'num_leaves': 2048, 'learning_rate': 0.03, 'lambda_l2': 0.01, 'max_bin': 256, 'bagging_fraction': 1, 'deterministic': False, 'objective': 'huber', 'metric': 'huber', 'n_estimators': 20000, 'random_state': 42, 'importance_type': 'gain'}
Combination 2:
{'n_jobs': 5, 'fold_id': 1, 'lambda_l1': 0.3, 'max_depth': 10, 'colsample_bytree': 0.1, 'alpha': 0.2, 'num_leaves': 2048, 'learning_rate': 0.03, 'lambda_l2': 0.1, 'max_bin': 256, 'bagging_fraction': 1, 'deterministic': False, 'objective': 'huber', 'metric': 'huber', 'n_estimators': 20000, 'random_state': 42, 'importance_type': 'gain'}
Combination 3:
{'n_jobs': 5, 'fold_id': 1, 'lambda_l1': 0.3, 'max_depth': 10, 'colsample_bytree': 0.3, 'alpha': 0.2, 'num_leaves': 2048, 'learning_rate': 0.03, 'lambda_l2': 0.01, 'max_bin': 256, 'bagging_fraction': 1, 

In [8]:
%%time
process_these = param_dicts[:10] + param_dicts[54:64] + param_dicts[108:118] + param_dicts[162:172] # Limit to the first 10 combinations for testing

rdd = sc.parallelize(process_these, numSlices=len(process_these))

rdd_result = rdd.map(lambda x: train_lightgbm_model(params=x)).collect()

print(f">>>>length of rdd_result: {len(rdd_result)}")


>>>>length of rdd_result: 40
CPU times: user 143 ms, sys: 16 ms, total: 159 ms
Wall time: 24min 4s


In [9]:
# Render floats with two decimals and a comma as thousands separator.
# This is a global pandas option, so it also affects every later cell.
pd.set_option('display.float_format', '{:,.2f}'.format)

# One row per Spark task, built from the dicts returned by the training runs.
df_results = pd.DataFrame(rdd_result)

# Memory, data-size and timing measurements to summarise for each fold.
resource_columns = [
    "fold_id",
    "memory_before_read_mb",
    "memory_after_read_mb",
    "memory_after_training_mb",
    "train_df_mb",
    "test_df_mb",
    "train_test_read_time_sec",
    "training_time",
    "fit_cpu_utilization",
]
per_fold_summary = (
    df_results[resource_columns]
    .groupby("fold_id")
    .describe(percentiles=[0.5])
    .T
)
print(per_fold_summary)

fold_id                               1        2        3        4
memory_before_read_mb    count    10.00    10.00    10.00    10.00
                         mean  1,081.07 1,915.50 2,304.24 2,593.62
                         std     719.16   221.70   232.62   166.41
                         min     249.52 1,609.78 1,956.88 2,348.37
                         50%   1,510.63 1,914.98 2,312.95 2,602.29
                         max   1,721.27 2,201.20 2,707.03 2,824.23
memory_after_read_mb     count    10.00    10.00    10.00    10.00
                         mean  1,288.56 2,135.42 2,602.55 2,861.38
                         std     575.26   195.86   174.54   142.97
                         min     625.23 1,836.91 2,367.89 2,695.29
                         50%   1,607.77 2,141.72 2,595.18 2,861.12
                         max   1,813.31 2,431.98 2,891.32 3,118.19
memory_after_training_mb count    10.00    10.00    10.00    10.00
                         mean  1,782.47 2,244.90 2,651.00 2,93

In [10]:
# Every task wrote its metrics to its own parquet file; read them back via
# the paths the tasks returned and stack them into a single frame.
result_paths = [task_info["fp_name"] for task_info in rdd_result]
df_list = list(map(pd.read_parquet, result_paths))
results_df = pd.concat(df_list, ignore_index=True)

results_df.head()

Unnamed: 0,fold_id,accuracy,precision,recall,f1_score,auc_roc,training_time,n_jobs,lambda_l1,max_depth,...,learning_rate,lambda_l2,max_bin,bagging_fraction,deterministic,objective,metric,n_estimators,random_state,importance_type
0,1,0.93,0.95,0.87,0.91,0.96,60.19,5,0.3,10,...,0.03,0.01,256,1,False,huber,huber,20000,42,gain
1,1,0.93,0.95,0.88,0.91,0.96,30.39,5,0.3,10,...,0.03,0.1,256,1,False,huber,huber,20000,42,gain
2,1,0.96,0.96,0.93,0.95,0.97,58.5,5,0.3,10,...,0.03,0.01,256,1,False,huber,huber,20000,42,gain
3,1,0.96,0.96,0.93,0.94,0.97,42.26,5,0.3,10,...,0.03,0.1,256,1,False,huber,huber,20000,42,gain
4,1,0.96,0.96,0.93,0.95,0.97,73.39,5,0.3,10,...,0.03,0.01,256,1,False,huber,huber,20000,42,gain


In [11]:
# Distribution of fit time and test-set metrics across all training runs.
metric_columns = ["training_time", "accuracy", "precision", "recall", "f1_score", "auc_roc"]
results_df[metric_columns].describe()

Unnamed: 0,training_time,accuracy,precision,recall,f1_score,auc_roc
count,40.0,40.0,40.0,40.0,40.0,40.0
mean,134.63,0.94,0.96,0.89,0.92,0.97
std,84.51,0.02,0.01,0.04,0.02,0.0
min,30.39,0.91,0.93,0.82,0.88,0.96
25%,70.66,0.93,0.95,0.85,0.9,0.97
50%,109.79,0.94,0.96,0.91,0.93,0.97
75%,181.13,0.96,0.97,0.93,0.95,0.97
max,455.67,0.96,0.97,0.94,0.95,0.97


In [12]:
# Print the per-task bookkeeping table (one row per training run: result file
# path, fold id, data sizes, memory readings and timings) for inspection.
print(df_results)

                                              fp_name  fold_id  train_df_mb  \
0   /home/jovyan/results/classification/results_fo...        1        46.69   
1   /home/jovyan/results/classification/results_fo...        1        46.69   
2   /home/jovyan/results/classification/results_fo...        1        46.69   
3   /home/jovyan/results/classification/results_fo...        1        46.69   
4   /home/jovyan/results/classification/results_fo...        1        46.69   
5   /home/jovyan/results/classification/results_fo...        1        46.69   
6   /home/jovyan/results/classification/results_fo...        1        46.69   
7   /home/jovyan/results/classification/results_fo...        1        46.69   
8   /home/jovyan/results/classification/results_fo...        1        46.69   
9   /home/jovyan/results/classification/results_fo...        1        46.69   
10  /home/jovyan/results/classification/results_fo...        2        93.38   
11  /home/jovyan/results/classification/results_fo..

In [13]:
# All results have been collected to the driver; shut the Spark session down.
spark.stop()