# 1.Data Preprocessing

Loading and Initial Cleaning

In [1]:
import pandas as pd
import os


dataset_base_path = 'C:/Users/sahar/Documents/sut/lpd/project/Dataset_Li-ion' 

# Define the lists of train and test files
train_files = [
    '0degC/589_Mixed1', '0degC/589_Mixed2', '0degC/590_Mixed4',
    '0degC/590_Mixed5', '0degC/590_Mixed6', '0degC/590_Mixed8',
    '10degC/567_Mixed1', '10degC/567_Mixed2', '10degC/571_Mixed4',
    '10degC/571_Mixed5', '10degC/571_Mixed6', '10degC/571_Mixed8',
    '25degC/551_Mixed1', '25degC/551_Mixed2',
    '25degC/552_Mixed4', '25degC/552_Mixed5', '25degC/552_Mixed6',
    '25degC/552_Mixed8',
]
test_files = [
    '0degC/589_LA92', '0degC/589_UDDS', '0degC/589_US06', '0degC/590_Mixed7',
    '10degC/576_UDDS', '10degC/567_US06', '10degC/571_Mixed7',
    '25degC/551_LA92', '25degC/551_UDDS', '25degC/551_US06', '25degC/552_Mixed7',
]

# Define common column names
column_names = [
    'Time Stamp','Step','Status','Prog Time','Step Time','Cycle',
    'Cycle Level','Procedure','Voltage','Current','Temperature','Capacity','WhAccu','Cnt','Empty'
]

# --- Function to Load and Preprocess a Single File ---
def load_and_preprocess_file(file_path):
    """
    Loads a single battery data file, applies column names, filters by status,
    and calculates SoC.
    """
    try:
        # Read the CSV, skipping the header rows
        df = pd.read_csv(file_path, skiprows=30)
        
        # Assign the predefined column names
        df.columns = column_names
        
        # Remove the 'Empty' column if it exists
        if 'Empty' in df.columns:
            df = df.drop(columns=['Empty'])
            
        # Filter rows based on 'Status' column
        df = df[(df["Status"] == "TABLE") | (df["Status"] == "DCH")]
        
        # Calculate SoC Capacity and SoC Percentage
        
        # Ensure 'Capacity' column is numeric, convert if necessary
        df['Capacity'] = pd.to_numeric(df['Capacity'], errors='coerce')
        df.dropna(subset=['Capacity'], inplace=True) # Drop rows where Capacity conversion failed
        
        if not df.empty:
            max_discharge = abs(df["Capacity"].min()) # Get maximum discharge from this file
            df["SoC Capacity"] = max_discharge + df["Capacity"]
            # Ensure no division by zero if max_discharge is 0
            df["SoC Percentage"] = df["SoC Capacity"] / df["SoC Capacity"].max() if df["SoC Capacity"].max() != 0 else 0
            df["SoC Percentage"] = df["SoC Percentage"].clip(lower=0, upper=1) # Ensure SoC is between 0 and 1
        else:
            print(f"Warning: No valid data after filtering for {file_path}. Skipping SoC calculation.")
            df["SoC Capacity"] = 0
            df["SoC Percentage"] = 0
            
        return df

    except FileNotFoundError:
        print(f"Error: File not found at {file_path}. Please check your base path and file lists.")
        return pd.DataFrame() # Return empty DataFrame on error
    except Exception as e:
        print(f"An error occurred while processing {file_path}: {e}")
        return pd.DataFrame() # Return empty DataFrame on error

# --- Load and Preprocess Training Data ---
print("Loading and preprocessing training data...")
train_dataframes = []
for file_name in train_files:
    full_path = os.path.join(dataset_base_path, file_name + '.csv') # Append .csv
    print(f"Processing training file: {full_path}")
    df = load_and_preprocess_file(full_path)
    if not df.empty:
        df['Original_File'] = file_name # Keep track of original file
        train_dataframes.append(df)

if train_dataframes:
    train_df = pd.concat(train_dataframes, ignore_index=True)
    print("\nTraining data loaded and concatenated successfully!")
    print(f"Total training data shape: {train_df.shape}")
    print("\nFirst 5 rows of concatenated training DataFrame:")
    print(train_df.head())
    print("\nTraining DataFrame Info:")
    train_df.info()
else:
    train_df = pd.DataFrame()
    print("\nNo training data loaded. Check paths and file lists.")

# --- Load and Preprocess Testing Data ---
print("\nLoading and preprocessing testing data...")
test_dataframes = []
for file_name in test_files:
    full_path = os.path.join(dataset_base_path, file_name + '.csv') # Append .csv
    print(f"Processing testing file: {full_path}")
    df = load_and_preprocess_file(full_path)
    if not df.empty:
        df['Original_File'] = file_name # Keep track of original file
        test_dataframes.append(df)

if test_dataframes:
    test_df = pd.concat(test_dataframes, ignore_index=True)
    print("\nTesting data loaded and concatenated successfully!")
    print(f"Total testing data shape: {test_df.shape}")
    print("\nFirst 5 rows of concatenated testing DataFrame:")
    print(test_df.head())
    print("\nTesting DataFrame Info:")
    test_df.info()
else:
    test_df = pd.DataFrame()
    print("\nNo testing data loaded. Check paths and file lists.")

Loading and preprocessing training data...
Processing training file: C:/Users/sahar/Documents/sut/lpd/project/Dataset_Li-ion\0degC/589_Mixed1.csv
Processing training file: C:/Users/sahar/Documents/sut/lpd/project/Dataset_Li-ion\0degC/589_Mixed2.csv
Processing training file: C:/Users/sahar/Documents/sut/lpd/project/Dataset_Li-ion\0degC/590_Mixed4.csv
Processing training file: C:/Users/sahar/Documents/sut/lpd/project/Dataset_Li-ion\0degC/590_Mixed5.csv
Processing training file: C:/Users/sahar/Documents/sut/lpd/project/Dataset_Li-ion\0degC/590_Mixed6.csv
Processing training file: C:/Users/sahar/Documents/sut/lpd/project/Dataset_Li-ion\0degC/590_Mixed8.csv
Processing training file: C:/Users/sahar/Documents/sut/lpd/project/Dataset_Li-ion\10degC/567_Mixed1.csv
Processing training file: C:/Users/sahar/Documents/sut/lpd/project/Dataset_Li-ion\10degC/567_Mixed2.csv
Processing training file: C:/Users/sahar/Documents/sut/lpd/project/Dataset_Li-ion\10degC/571_Mixed4.csv
Processing training file: C

 Feature Engineering & Normalization

In [2]:
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt 


# --- Feature Engineering ---

print("Starting Feature Engineering...")

# Define a rolling window size for average calculations.
# The paper doesn't specify a window size, so we'll start with a reasonable value.
rolling_window_size = 30 # Example: 30 data points 

def apply_feature_engineering(df):
    """Applies feature engineering steps to a DataFrame."""
    if df.empty:
        return df

    # Ensure Voltage and Current are numeric
    df['Voltage'] = pd.to_numeric(df['Voltage'], errors='coerce')
    df['Current'] = pd.to_numeric(df['Current'], errors='coerce')
    
    # Drop rows where Voltage or Current became NaN due to coercion
    df.dropna(subset=['Voltage', 'Current'], inplace=True)

    if df.empty: # Check again after dropping NaNs
        return df
        
    # Calculate Average Voltage (V_avg) using a rolling mean
    # We apply this per original file to avoid averaging across different cycles/files
    df['V_avg'] = df.groupby('Original_File')['Voltage'].transform(lambda x: x.rolling(window=rolling_window_size, min_periods=1).mean())

    # Calculate Average Current (I_avg) using a rolling mean
    df['I_avg'] = df.groupby('Original_File')['Current'].transform(lambda x: x.rolling(window=rolling_window_size, min_periods=1).mean())

    # Calculate Power (Watts) = Voltage * Current
    df['Power'] = df['Voltage'] * df['Current']

    print(f"  Features engineered for DataFrame. New columns: V_avg, I_avg, Power.")
    return df

train_df = apply_feature_engineering(train_df.copy())
test_df = apply_feature_engineering(test_df.copy())

# --- Select Features for Deep Learning Model ---
# Based on the paper's input parameters: Current, Voltage, Temperature, Average Voltage, Average Current
# We also calculated Power, which can be a valuable feature.
# We will use 'SoC Percentage' as our target (y).

# The features list
features = ['Current', 'Voltage', 'Temperature', 'V_avg', 'I_avg', 'Power']
target = 'SoC Percentage'

# Check if all features exist in the DataFrames after engineering
missing_train_features = [f for f in features if f not in train_df.columns]
missing_test_features = [f for f in features if f not in test_df.columns]

if missing_train_features:
    print(f"Warning: Missing features in train_df: {missing_train_features}. Please check feature engineering.")
if missing_test_features:
    print(f"Warning: Missing features in test_df: {missing_test_features}. Please check feature engineering.")

# --- Normalization (Min-Max Scaling) ---
# We fit the scaler ONLY on the training data to prevent data leakage from the test set.
print("\nStarting Normalization (Min-Max Scaling)...")

# Initialize the MinMaxScaler
scaler = MinMaxScaler()

# Fit the scaler on the training data features and transform them
# We convert to numpy arrays because MinMaxScaler works well with them
X_train_scaled = scaler.fit_transform(train_df[features].values)
y_train = train_df[target].values

# Transform the test data features using the scaler fitted on training data
X_test_scaled = scaler.transform(test_df[features].values)
y_test = test_df[target].values

print(f"  Training features scaled. Shape: {X_train_scaled.shape}")
print(f"  Testing features scaled. Shape: {X_test_scaled.shape}")
print(f"  Training target shape: {y_train.shape}")
print(f"  Testing target shape: {y_test.shape}")

print("\nFirst 5 rows of scaled training features (X_train_scaled):")
print(X_train_scaled[:5])
print("\nFirst 5 values of training target (y_train):")
print(y_train[:5])

print("\nFeature Engineering and Normalization complete!")

Starting Feature Engineering...
  Features engineered for DataFrame. New columns: V_avg, I_avg, Power.
  Features engineered for DataFrame. New columns: V_avg, I_avg, Power.

Starting Normalization (Min-Max Scaling)...
  Training features scaled. Shape: (1222903, 6)
  Testing features scaled. Shape: (882077, 6)
  Training target shape: (1222903,)
  Testing target shape: (882077,)

First 5 rows of scaled training features (X_train_scaled):
[[0.74769365 0.96123602 0.00387602 0.98822162 0.72474142 0.66467244]
 [0.74716384 0.96018574 0.00387602 0.98767909 0.72445214 0.66397179]
 [0.74695183 0.95937038 0.00387602 0.98721746 0.72427854 0.66369227]
 [0.74695183 0.95878996 0.00387602 0.98683674 0.72419174 0.66369332]
 [0.74695183 0.95843756 0.00387602 0.98653549 0.72413966 0.66369395]]

First 5 values of training target (y_train):
[1.         0.99999574 0.99999574 0.99999574 0.99999574]

Feature Engineering and Normalization complete!


Sequence Creation

In [3]:
import numpy as np

# --- Configuration for Sequence Creation ---
# Define the sequence length (number of time steps the model looks back)
sequence_length = 60

print(f"Starting Sequence Creation with sequence_length = {sequence_length}...")

def create_sequences(X, y, seq_length):
    """
    Creates sequences from input features (X) and target (y) for RNN models.
    Each input sequence will have 'seq_length' time steps.
    The target (y) for a given sequence is typically the value at the end of that sequence.
    """
    xs, ys = [], []
    # Iterate through the data, creating sequences
    for i in range(len(X) - seq_length):
        # The input sequence is X from current position 'i' up to 'i + seq_length'
        x_seq = X[i:(i + seq_length)]
        # The target for this sequence is the y value at the end of the sequence
        y_val = y[i + seq_length]
        
        xs.append(x_seq)
        ys.append(y_val)
    
    # Convert lists to NumPy arrays
    # X_sequences will have shape (num_samples, seq_length, num_features)
    # y_sequences will have shape (num_samples, seq_length, num_features)
    return np.array(xs), np.array(ys)

# Create sequences for training data
X_train_sequences, y_train_sequences = create_sequences(X_train_scaled, y_train, sequence_length)

# Create sequences for testing data
X_test_sequences, y_test_sequences = create_sequences(X_test_scaled, y_test, sequence_length)

print(f"  Training sequences created. X_train_sequences shape: {X_train_sequences.shape}")
print(f"  Training target sequences created. y_train_sequences shape: {y_train_sequences.shape}")
print(f"  Testing sequences created. X_test_sequences shape: {X_test_sequences.shape}")
print(f"  Testing target sequences created. y_test_sequences shape: {y_test_sequences.shape}")

print("\nExample of the first training input sequence (X_train_sequences[0]):")
print(X_train_sequences[0])
print("\nExample of the first training target value (y_train_sequences[0]):")
print(y_train_sequences[0])

print("\nSequence Creation complete!")

Starting Sequence Creation with sequence_length = 60...
  Training sequences created. X_train_sequences shape: (1222843, 60, 6)
  Training target sequences created. y_train_sequences shape: (1222843,)
  Testing sequences created. X_test_sequences shape: (882017, 60, 6)
  Testing target sequences created. y_test_sequences shape: (882017,)

Example of the first training input sequence (X_train_sequences[0]):
[[0.74769365 0.96123602 0.00387602 0.98822162 0.72474142 0.66467244]
 [0.74716384 0.96018574 0.00387602 0.98767909 0.72445214 0.66397179]
 [0.74695183 0.95937038 0.00387602 0.98721746 0.72427854 0.66369227]
 [0.74695183 0.95878996 0.00387602 0.98683674 0.72419174 0.66369332]
 [0.74695183 0.95843756 0.00387602 0.98653549 0.72413966 0.66369395]
 [0.74684562 0.9579746  0.00387602 0.98625495 0.72408561 0.66355413]
 [0.74684562 0.95773967 0.00387602 0.98601988 0.72404701 0.66355456]
 [0.74695183 0.95750473 0.00387602 0.98581325 0.72403255 0.66369563]
 [0.74695183 0.9572698  0.00387602 0.9

# 2. Deep Learning Model Implementation.

Building the Models

In [4]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, GRU, Bidirectional, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import RootMeanSquaredError


# --- Configuration ---
# Get the shape of input features from our sequences
# X_train_sequences shape: (num_samples, sequence_length, num_features)
sequence_length = X_train_sequences.shape[1] # sequence_length
num_features = X_train_sequences.shape[2] # number of features 

# Hyperparameters for the models (initial values - these will be tuned later with Bayesian Optimization)
lstm_units = 100    # Number of units in the LSTM/GRU/BiLSTM layer
dense_units = 50    # Number of units in the subsequent Dense layer
dropout_rate = 0.2  # Dropout rate for regularization
learning_rate = 0.001 # Learning rate for the Adam optimizer

print(f"Input sequence length: {sequence_length}")
print(f"Number of input features: {num_features}")

# --- Build the LSTM Model ---
def build_lstm_model(seq_len, num_feat, lstm_u, dense_u, dropout_r, lr):
    model = Sequential([
        # Input Layer: LSTM layer expects input in (batch_size, sequence_length, num_features)
        # return_sequences=False by default for the last RNN layer, so it outputs only the last hidden state
        LSTM(units=lstm_u, activation='relu', input_shape=(seq_len, num_feat)),
        
        # Dropout layer for regularization (to prevent overfitting)
        Dropout(dropout_r),
        
        # Dense layer (fully connected layer)
        Dense(units=dense_u, activation='relu'),
        
        # Output layer: Single neuron with no activation for regression (predicting continuous SoC)
        Dense(units=1)
    ])
    
    # Compile the model
    # Optimizer: Adam is a popular choice
    optimizer = Adam(learning_rate=lr)
    # Loss Function: Mean Squared Error (MSE)
    loss_fn = 'mean_squared_error' 
    # Metrics: Root Mean Squared Error (RMSE) for evaluation, as mentioned in the paper
    metrics = [RootMeanSquaredError(name='rmse')]
    
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=metrics)
    return model

print("\nBuilding LSTM Model...")
lstm_model = build_lstm_model(sequence_length, num_features, lstm_units, dense_units, dropout_rate, learning_rate)
lstm_model.summary() # Print a summary of the model's layers and parameters

# --- Build the GRU Model ---
def build_gru_model(seq_len, num_feat, gru_u, dense_u, dropout_r, lr):
    model = Sequential([
        GRU(units=gru_u, activation='relu', input_shape=(seq_len, num_feat)),
        Dropout(dropout_r),
        Dense(units=dense_u, activation='relu'),
        Dense(units=1)
    ])
    optimizer = Adam(learning_rate=lr)
    loss_fn = 'mean_squared_error'
    metrics = [RootMeanSquaredError(name='rmse')]
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=metrics)
    return model

print("\nBuilding GRU Model...")
gru_model = build_gru_model(sequence_length, num_features, lstm_units, dense_units, dropout_rate, learning_rate) # Reusing lstm_units for GRU_u
gru_model.summary()

# --- Build the BiLSTM Model ---
def build_bilstm_model(seq_len, num_feat, bilstm_u, dense_u, dropout_r, lr):
    model = Sequential([
        # Bidirectional wrapper allows LSTM to process sequences in both forward and backward directions
        Bidirectional(LSTM(units=bilstm_u, activation='relu', return_sequences=False), input_shape=(seq_len, num_feat)),
        Dropout(dropout_r),
        Dense(units=dense_u, activation='relu'),
        Dense(units=1)
    ])
    optimizer = Adam(learning_rate=lr)
    loss_fn = 'mean_squared_error'
    metrics = [RootMeanSquaredError(name='rmse')]
    model.compile(optimizer=optimizer, loss=loss_fn, metrics=metrics)
    return model

print("\nBuilding BiLSTM Model...")
bilstm_model = build_bilstm_model(sequence_length, num_features, lstm_units, dense_units, dropout_rate, learning_rate) # Reusing lstm_units for BiLSTM_u
bilstm_model.summary()

print("\nDeep Learning Models (LSTM, GRU, BiLSTM) defined and compiled!")
print("These models are now ready for training.")

Input sequence length: 60
Number of input features: 6

Building LSTM Model...


  super().__init__(**kwargs)



Building GRU Model...



Building BiLSTM Model...


  super().__init__(**kwargs)



Deep Learning Models (LSTM, GRU, BiLSTM) defined and compiled!
These models are now ready for training.


# 3. Bayesian Optimization for Hyperparameter Tuning

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, GRU, Bidirectional, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import RootMeanSquaredError
from sklearn.model_selection import train_test_split # To create a validation set for tuning
import optuna
import numpy as np # Already imported, but good to keep track


print("Starting Bayesian Optimization for LSTM Model using Optuna...")

# --- Prepare Data for Hyperparameter Tuning ---
# Split the *training* data further into training and validation sets for Optuna.
# The test_size here refers to the proportion of the original training data that will be used for validation.
X_train_tune, X_val_tune, y_train_tune, y_val_tune = train_test_split(
    X_train_sequences, y_train_sequences, test_size=0.2, random_state=42
)

print(f"Original X_train_sequences shape: {X_train_sequences.shape}")
print(f"X_train_tune shape: {X_train_tune.shape}, y_train_tune shape: {y_train_tune.shape}")
print(f"X_val_tune shape: {X_val_tune.shape}, y_val_tune shape: {y_val_tune.shape}")


# --- Define the Objective Function for Optuna (for LSTM) ---
def objective_lstm(trial):
    # --- 1. Suggest Hyperparameters ---
    lstm_units = trial.suggest_categorical('lstm_units', [30, 50, 70, 100]) # Number of units in LSTM layer
    dense_units = trial.suggest_categorical('dense_units', [30, 50, 70, 100]) # Number of units in Dense layer
    dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5) # Dropout regularization
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True) # Log-uniform distribution
    batch_size = trial.suggest_categorical('batch_size', [64, 128, 256]) # Batch size for training
    epochs = trial.suggest_int('epochs', 10, 30) # Number of training epochs (will use callbacks for early stopping)

    # --- 2. Build the Model with Suggested Hyperparameters ---
    model = Sequential([
        LSTM(units=lstm_units, activation='relu', input_shape=(sequence_length, num_features)),
        Dropout(dropout_rate),
        Dense(units=dense_units, activation='relu'),
        Dense(units=1) # Output layer for regression
    ])

    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='mean_squared_error', metrics=[RootMeanSquaredError(name='rmse')])

    # --- 3. Train the Model ---
    # Use EarlyStopping to stop training if validation loss doesn't improve
    # This prevents overfitting and saves time during hyperparameter search
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=5, restore_best_weights=True
    )
    
    # Fit the model (train)
    history = model.fit(
        X_train_tune, y_train_tune,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_val_tune, y_val_tune),
        callbacks=[early_stopping],
        verbose=0 # Set to 0 to suppress verbose output during tuning
    )

    # --- 4. Evaluate and Return Performance ---
    # Get the best validation RMSE achieved during training (from early stopping)
    val_rmse = history.history['val_rmse'][-1] # Last value is from the best epoch
    
    # Pruning: If the trial is not promising, tell Optuna to stop it early.
    # This further speeds up the optimization.
    trial.report(val_rmse, trial.number) # Report current RMSE to Optuna
    if trial.should_prune():
        raise optuna.exceptions.TrialPruned()

    return val_rmse # Optuna aims to minimize this value


# --- Define the Objective Function for Optuna (for GRU) ---
print("\nDefining Objective Function for GRU Model...")

def objective_gru(trial):
    # --- 1. Suggest Hyperparameters for GRU ---
    gru_units = trial.suggest_categorical('gru_units', [30, 50, 70, 100])
    dense_units = trial.suggest_categorical('dense_units', [30, 50, 70, 100])
    dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5)
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    batch_size = trial.suggest_categorical('batch_size', [64, 128, 256])
    epochs = trial.suggest_int('epochs', 10, 30)

    # --- 2. Build the GRU Model with Suggested Hyperparameters ---
    model = Sequential([
        GRU(units=gru_units, activation='relu', input_shape=(sequence_length, num_features)),
        Dropout(dropout_rate),
        Dense(units=dense_units, activation='relu'),
        Dense(units=1) # Output layer for regression
    ])

    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='mean_squared_error', metrics=[RootMeanSquaredError(name='rmse')])

    # --- 3. Train the Model ---
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=5, restore_best_weights=True
    )
    
    history = model.fit(
        X_train_tune, y_train_tune,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_val_tune, y_val_tune),
        callbacks=[early_stopping],
        verbose=0
    )

    # --- 4. Evaluate and Return Performance ---
    val_rmse = history.history['val_rmse'][-1]
    trial.report(val_rmse, trial.number)
    if trial.should_prune():
        raise optuna.exceptions.TrialPruned()

    return val_rmse

# --- Define the Objective Function for Optuna (for BiLSTM) ---
print("\nDefining Objective Function for BiLSTM Model...")

def objective_bilstm(trial):
    # --- 1. Suggest Hyperparameters for BiLSTM ---
    bilstm_units = trial.suggest_categorical('bilstm_units', [30, 50, 70, 100]) # Units for the LSTM layers within Bidirectional
    dense_units = trial.suggest_categorical('dense_units', [30, 50, 70, 100])
    dropout_rate = trial.suggest_float('dropout_rate', 0.1, 0.5)
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-2, log=True)
    batch_size = trial.suggest_categorical('batch_size', [64, 128, 256])
    epochs = trial.suggest_int('epochs', 20, 100)

    # --- 2. Build the BiLSTM Model with Suggested Hyperparameters ---
    model = Sequential([
        # Bidirectional wrapper around an LSTM layer
        Bidirectional(LSTM(units=bilstm_units, activation='relu', return_sequences=False), 
                      input_shape=(sequence_length, num_features)),
        Dropout(dropout_rate),
        Dense(units=dense_units, activation='relu'),
        Dense(units=1) # Output layer for regression
    ])

    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='mean_squared_error', metrics=[RootMeanSquaredError(name='rmse')])

    # --- 3. Train the Model ---
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=5, restore_best_weights=True
    )
    
    history = model.fit(
        X_train_tune, y_train_tune,
        epochs=epochs,
        batch_size=batch_size,
        validation_data=(X_val_tune, y_val_tune),
        callbacks=[early_stopping],
        verbose=0
    )

    # --- 4. Evaluate and Return Performance ---
    val_rmse = history.history['val_rmse'][-1]
    trial.report(val_rmse, trial.number)
    if trial.should_prune():
        raise optuna.exceptions.TrialPruned()

    return val_rmse



# --- Run the Optuna Optimization Study ---
# Create an Optuna study object. 'direction="minimize"' means we want to find the minimum RMSE.
print("\nRunning Optuna study for LSTM model...")
# You can use a database to store results and resume studies, or 'InMemoryStorage()' for simple use
study_lstm = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler(seed=42)) 
# 'n_trials' specifies how many different hyperparameter combinations Optuna will try
study_lstm.optimize(objective_lstm, n_trials=20, show_progress_bar=True) # Run for 20 trials

print("\nBayesian Optimization for LSTM Model Complete!")
print(f"Best trial for LSTM model: {study_lstm.best_trial.value:.4f} RMSE")
print("Best hyperparameters found for LSTM:")
print(study_lstm.best_params)

# Store the best parameters for later use
best_lstm_params = study_lstm.best_params

# --- Run Optuna Optimization Study for GRU Model ---
print("\nRunning Optuna study for GRU model (this may take some time)...")
study_gru = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler(seed=42))
study_gru.optimize(objective_gru, n_trials=20, show_progress_bar=True) # Adjust n_trials as needed

print("\nBayesian Optimization for GRU Model Complete!")
print(f"Best trial for GRU model: {study_gru.best_trial.value:.4f} RMSE")
print("Best hyperparameters found for GRU:")
print(study_gru.best_params)
best_gru_params = study_gru.best_params

# --- Run Optuna Optimization Study for BiLSTM Model ---
print("\nRunning Optuna study for BiLSTM model (this may take some time)...")
study_bilstm = optuna.create_study(direction='minimize', sampler=optuna.samplers.TPESampler(seed=42))
study_bilstm.optimize(objective_bilstm, n_trials=20, show_progress_bar=True) # Adjust n_trials as needed

print("\nBayesian Optimization for BiLSTM Model Complete!")
print(f"Best trial for BiLSTM model: {study_bilstm.best_trial.value:.4f} RMSE")
print("Best hyperparameters found for BiLSTM:")
print(study_bilstm.best_params)
best_bilstm_params = study_bilstm.best_params

print("\nAll Bayesian Optimization studies for LSTM, GRU, and BiLSTM are now defined and can be run.")
print("The 'best_lstm_params', 'best_gru_params', and 'best_bilstm_params' variables now hold the optimal hyperparameters.")

Starting Bayesian Optimization for LSTM Model using Optuna...


[I 2025-06-04 20:27:38,736] A new study created in memory with name: no-name-ae119690-6fb9-4fd1-94f4-6eedcc7ee3a2


Original X_train_sequences shape: (1222843, 60, 6)
X_train_tune shape: (978274, 60, 6), y_train_tune shape: (978274,)
X_val_tune shape: (244569, 60, 6), y_val_tune shape: (244569,)

Defining Objective Function for GRU Model...

Defining Objective Function for BiLSTM Model...

Running Optuna study for LSTM model...


  0%|          | 0/20 [00:00<?, ?it/s]

  super().__init__(**kwargs)


[I 2025-06-04 21:33:11,819] Trial 0 finished with value: 0.14906355738639832 and parameters: {'lstm_units': 50, 'dense_units': 100, 'dropout_rate': 0.34044600469728353, 'learning_rate': 0.001331121608073689, 'batch_size': 128, 'epochs': 14}. Best is trial 0 with value: 0.14906355738639832.


# 4. Final Model Training and Evaluation!

In [None]:
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, GRU, Bidirectional, Dense, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import RootMeanSquaredError
import numpy as np
import pandas as pd # For creating a summary table



# Placeholder for best_params if Optuna was not run in the same session:
if 'best_lstm_params' not in locals():
    print("Warning: best_lstm_params not found. Using default values.")
    best_lstm_params = {
        'lstm_units': 100, 'dense_units': 50, 'dropout_rate': 0.2,
        'learning_rate': 0.001, 'batch_size': 32, 'epochs': 50
    }
if 'best_gru_params' not in locals():
    print("Warning: best_gru_params not found. Using default values.")
    best_gru_params = {
        'gru_units': 100, 'dense_units': 50, 'dropout_rate': 0.2,
        'learning_rate': 0.001, 'batch_size': 32, 'epochs': 50
    }
if 'best_bilstm_params' not in locals():
    print("Warning: best_bilstm_params not found. Using default values.")
    best_bilstm_params = {
        'bilstm_units': 100, 'dense_units': 50, 'dropout_rate': 0.2,
        'learning_rate': 0.001, 'batch_size': 32, 'epochs': 50
    }


print("\n--- Step 4: Final Model Training and Evaluation ---")

# --- Helper Function to Build, Train, and Evaluate Models ---
def train_and_evaluate_model(model_type, params, X_train, y_train, X_test, y_test, 
                             seq_len, num_feat):
    """
    Builds, trains, and evaluates a deep learning model with given parameters.
    """
    print(f"\n--- Training and Evaluating {model_type} Model ---")
    print(f"Using parameters: {params}")

    # Build the model based on type
    model = Sequential()
    if model_type == 'LSTM':
        model.add(LSTM(units=params['lstm_units'], activation='relu', input_shape=(seq_len, num_feat)))
    elif model_type == 'GRU':
        model.add(GRU(units=params['gru_units'], activation='relu', input_shape=(seq_len, num_feat)))
    elif model_type == 'BiLSTM':
        model.add(Bidirectional(LSTM(units=params['bilstm_units'], activation='relu', return_sequences=False), 
                                input_shape=(seq_len, num_feat)))
    else:
        raise ValueError("Invalid model_type. Choose 'LSTM', 'GRU', or 'BiLSTM'.")

    model.add(Dropout(params['dropout_rate']))
    model.add(Dense(units=params['dense_units'], activation='relu'))
    model.add(Dense(units=1)) # Output layer

    optimizer = Adam(learning_rate=params['learning_rate'])
    model.compile(optimizer=optimizer, loss='mean_squared_error', metrics=[RootMeanSquaredError(name='rmse')])

    # Callbacks for robust training
    early_stopping = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss', patience=15, restore_best_weights=True, mode='min' # Increased patience slightly
    )
    reduce_lr_on_plateau = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss', factor=0.5, patience=7, min_lr=1e-6, verbose=0 # Reduce LR if no improvement
    )

    # Train the model on the full training dataset
    print("Training model...")
    history = model.fit(
        X_train, y_train,
        epochs=params['epochs'], # Max epochs, EarlyStopping will stop it earlier
        batch_size=params['batch_size'],
        validation_split=0.1, # Use a small split of the training data for monitoring validation loss during training
        callbacks=[early_stopping, reduce_lr_on_plateau],
        verbose=1 # Show training progress
    )

    print("\nEvaluating model on test set...")
    # Evaluate on the unseen test set
    test_loss, test_rmse = model.evaluate(X_test, y_test, verbose=0)
    print(f"{model_type} Test RMSE: {test_rmse:.4f}")

    # Predict on the test set to calculate Max Error
    y_pred = model.predict(X_test)
    
    # Calculate Max Error (absolute difference between true and predicted values)
    max_error = np.max(np.abs(y_test - y_pred.flatten())) # Flatten y_pred to match y_test shape
    print(f"{model_type} Max Error: {max_error:.4f}")
    
    return {'RMSE': test_rmse, 'Max Error': max_error, 'Model': model}


# --- Train and Evaluate Each Model ---
results = {}

# LSTM Model
lstm_result = train_and_evaluate_model(
    'LSTM', best_lstm_params, X_train_sequences, y_train_sequences, 
    X_test_sequences, y_test_sequences, sequence_length, num_features
)
results['LSTM'] = lstm_result

# GRU Model
gru_result = train_and_evaluate_model(
    'GRU', best_gru_params, X_train_sequences, y_train_sequences, 
    X_test_sequences, y_test_sequences, sequence_length, num_features
)
results['GRU'] = gru_result

# BiLSTM Model
bilstm_result = train_and_evaluate_model(
    'BiLSTM', best_bilstm_params, X_train_sequences, y_train_sequences, 
    X_test_sequences, y_test_sequences, sequence_length, num_features
)
results['BiLSTM'] = bilstm_result

# --- Summarize Results ---
print("\n--- Summary of Model Performance ---")
summary_data = []
for model_name, res in results.items():
    summary_data.append({
        'Model': model_name,
        'Test RMSE': res['RMSE'],
        'Test Max Error': res['Max Error']
    })

results_df = pd.DataFrame(summary_data)
print(results_df.to_string(index=False))

print("\nFinal models trained and evaluated. The results above show their performance on the unseen test set.")
print("This completes the replication process of the paper's methodology.")