In [15]:
# Hyperparameter tuning and optimization
import optuna
from ray.tune.search.optuna import OptunaSearch
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
from ray.tune.integration.pytorch_lightning import TuneReportCallback, TuneReportCheckpointCallback

# PyTorch Lightning and callbacks
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping, Callback, ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger

# Metrics
from torchmetrics import MeanAbsoluteError, MeanAbsolutePercentageError, MetricCollection

# Darts (Time series forecasting)
from darts import TimeSeries
from darts.dataprocessing.transformers import Scaler
from darts.models import DLinearModel, LightGBMModel, BlockRNNModel, TiDEModel

# Data handling and preprocessing
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import tensorboard

# System utilities
import os

Load Data / Spilt Data

In [None]:
# 步骤1: 加载CSV文件
df = pd.read_csv('../DataSet/EDvisitfileL.csv', encoding='ISO-8859-1')

# 确保'date'列是DateTime类型
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)

# 分割数据集为训练集、验证集和测试集（假设您已经根据时间排序）
train_end = 3362            #L: 3362, T:3372, Ka:3208, Ke:3274, Y:2557, C: 3237
val_end = 3727              #L: 3727, T:3737, Ka:3573, Ke:3639, Y:2622, C: 3602

# Split the DataFrame
train_df = df.iloc[:train_end]
val_df = df.iloc[train_end:val_end]
test_df = df.iloc[val_end:]

# 步骤2: 使用MinMaxScaler缩放数据
# 定义并拟合scaler
scaler = MinMaxScaler()
scaler.fit(train_df[['No']])  # 只用训练数据拟合scaler

# 缩放训练集和验证集
train_df.loc[:, 'No_scaled'] = scaler.transform(train_df[['No']])
val_df.loc[:, 'No_scaled'] = scaler.transform(val_df[['No']])
test_df.loc[:, 'No_scaled'] = scaler.transform(test_df[['No']])  # 用相同的scaler转换测试集以避免数据泄露

# 转换为TimeSeries对象
train_series = TimeSeries.from_dataframe(train_df, value_cols='No_scaled')
val_series = TimeSeries.from_dataframe(val_df, value_cols='No_scaled')
test_series = TimeSeries.from_dataframe(test_df, value_cols='No_scaled')

# 原始数据转换为TimeSeries对象，如果需要
train_series_origin = TimeSeries.from_dataframe(train_df, value_cols='No')
val_series_origin = TimeSeries.from_dataframe(val_df, value_cols='No')
test_series_origin = TimeSeries.from_dataframe(test_df, value_cols='No')

# 选择需要的列创建多变量时间序列(都是one hot coding)
columns = ['Dayoff', 'Mon', 'Tue', 'Wed', 'Thr', 'Fri', 'Sat', 'Sun',  'YearScaled',
           'MonthScaled', 'Dayscaled', 'NewYear', '3Lock', 'Outbreak','COVID19']
df_multivariate = df[columns]

# 将DataFrame转换为多变量时间序列
ED_covariates = TimeSeries.from_dataframe(df_multivariate)

Hypertuning

In [43]:
class LossLoggingCallback(Callback):
    def __init__(self):
        super().__init__()
        self.val_losses = []  # To store validation losses
        self.train_losses = []  # To store training losses

    def on_validation_epoch_end(self, trainer, pl_module):
        val_loss = trainer.callback_metrics["val_loss"].item()
        self.val_losses.append(val_loss)
        print(f"Epoch {trainer.current_epoch}: val_loss={val_loss}")
        # Updated report call
        train.report({"loss": val_loss})  # Report the validation loss to Ray Train

    def on_train_epoch_end(self, trainer, pl_module, unused=None):
        if "train_loss" in trainer.callback_metrics:
            train_loss = trainer.callback_metrics["train_loss"].item()
            self.train_losses.append(train_loss)
            print(f"Epoch {trainer.current_epoch}: train_loss={train_loss}")
loss_logging_callback = LossLoggingCallback()
    

In [51]:
# Create the model using model_args from Ray Tune
def train_model(model_args, callbacks, train, val):
    torch_metrics = MetricCollection([MeanAbsolutePercentageError(), MeanAbsoluteError()])
    
    # Customize the ModelCheckpoint callback
    model_checkpoint_callback = ModelCheckpoint(
        dirpath="checkpoints",
        filename="{epoch}-{val_loss:.2f}",
        every_n_epochs=5,
    )
    
    model = LightGBMModel(
        output_chunk_length=30,
        lags=7,
        lags_past_covariates=model_args['lags_past_covariates'],
        verbose=-1,    
        )

    model.fit(
    series=[train_series],
    past_covariates=[ED_covariates],
    val_series=[val_series],
    val_past_covariates=[ED_covariates]
    )

In [52]:
# set up ray tune callback
config = {
    'lags_past_covariates': tune.randint(1,30),
}

# earlystopping
my_stopper = EarlyStopping(
    monitor="loss",
    patience=5,
    min_delta=0.001,
    mode='min',
)


tune_callback = TuneReportCheckpointCallback(
    {
        "loss": "val_loss",
    },
    on="validation_end",
)



reporter = CLIReporter(
    parameter_columns=list(config.keys()),
    metric_columns=["loss", "MAPE", "training_iteration"],
)

optuna_search = OptunaSearch(metric="loss",mode="min")

In [None]:
# Run Ray Tune, optimize hyperparameters by minimizing the MAPE on the validation set
num_samples = 30

scheduler = ASHAScheduler(max_t=1000, grace_period=5, reduction_factor=2)

train_fn_with_parameters = tune.with_parameters(
    train_model, callbacks=[my_stopper, tune_callback], train=train_series, val=val_series,
)

analysis = tune.run(
    train_fn_with_parameters,
    #resources_per_trial=resources_per_trial,
    metric="loss",  # any value in TuneReportCallback.
    mode="min",
    config=config,
    num_samples=num_samples,
    search_alg=optuna_search,
    scheduler=scheduler,
    progress_reporter=reporter,
    trial_dirname_creator=lambda trial: str(trial),
    name="tune_darts",
)

print("Best hyperparameters found were: ", analysis.best_config)