# Predicting the SOH of Batteries Using Deep Learning

## Import Library

In [33]:
import argparse
import datetime
import logging
import os
from itertools import cycle, zip_longest

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.io import loadmat
from sklearn import metrics
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import torch
import torch.nn as nn
from torch.autograd import grad
from torch.utils.data import DataLoader, TensorDataset, random_split, Subset
from torcheval.metrics import R2Score

# Check if GPU is available
# Prefer CUDA when PyTorch reports a usable GPU; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(device)


cpu


## Select the features and define the data-loading function

In [34]:
# Feature columns fed to the models, collected in "attribs".
# Currently left out: 'temperature_measured', 'current_load', 'voltage_load'.
attribs = [
    'cycle',
    'voltage_measured',
    'current_measured',
    "time",
]
fn = len(attribs)  # number of input features

In [35]:
# def load_data(battery, err=None):
#     # Construct the file path based on battery and error parameters.
#     if err is not None:
#         base_path = f'battery_data/Err_{err}/'
#         battery_name = f"{battery}_LF"
#     else:
#         base_path = "battery_data/"
#         battery_name = battery

#     # Determine the file extension (.csv or .xlsx)
#     xlsx_path = os.path.join(base_path, f"{battery_name}.xlsx")
#     csv_path = os.path.join(base_path, f"{battery_name}.csv")

#     # If the file is .xlsx, convert it to .csv
#     if os.path.exists(xlsx_path):
#         df_xlsx = pd.read_excel(xlsx_path)  # Read the Excel file
#         df_xlsx.to_csv(csv_path, index=False)  # Save as CSV
#         print(f"Converted {xlsx_path} to {csv_path}")

#     # Load the data from the CSV file
#     df = pd.read_csv(csv_path)

#     # Check if the expected columns exist
#     # required_columns = {'cycle', 'type', 'ambient_temperature', 'date_time',
#     #                     'capacity', 'voltage_measured', 'current_measured', 
#     #                     'temperature_measured', 'current_load', 
#     #                     'voltage_load', 'time'}
#     required_columns = {'Cycle_Index', 'Start_Time', 'End_Time', 'Test_Time (s)',
#                         'Min_Current (A)', 'Max_Current (A)', 'Min_Voltage (V)', 
#                         'Max_Voltage (V)', 'Charge_Capacity (Ah)', 
#                         'Discharge_Capacity (Ah)', 'Charge_Energy (Wh)', 'Discharge_Energy (Wh)'}
    
#     if not required_columns.issubset(df.columns):
#         raise ValueError(f"Missing required columns in {csv_path}")

#     # Convert `date_time` to a proper datetime format
#     df['Test_Time (s)'] = pd.to_datetime(df['Test_Time (s)'])

#     # Filter only discharge cycles
#     df_discharge = df[df['Discharge_Capacity (Ah)'] > 0].copy()

#     # Create dataset with time-series data
#     dataset = df_discharge[['Cycle_Index', 'Start_Time', 'End_Time', 'Test_Time (s)',
#                         'Min_Current (A)', 'Max_Current (A)', 'Min_Voltage (V)', 
#                         'Max_Voltage (V)', 'Charge_Capacity (Ah)', 
#                         'Discharge_Capacity (Ah)', 'Charge_Energy (Wh)', 'Discharge_Energy (Wh)']]

#     # Create cycle-level summary (one row per cycle)
#     capacity_data = df_discharge[['Cycle_Index', 'Test_Time (s)', 'Charge_Capacity (Ah)']].drop_duplicates()

#     print("hello")
#     return [dataset, capacity_data]
def load_data(battery, err=None):
    """Load the time-series and per-cycle tables of one battery.

    Files are looked up under ``battery_data/``. When ``err`` is given, the
    ``battery_data/Err_<err>/`` sub-folder is used instead and ``_LF`` is
    appended to the battery name. If an ``.xlsx`` version of a table exists it
    is re-exported to the matching ``.csv`` first; the CSV is what gets read.

    Args:
        battery: Battery identifier used as the file-name prefix.
        err: Optional error level selecting the ``Err_<err>`` sub-folder.

    Returns:
        A two-element list ``[detailed_df, capacity_df]``:
        ``detailed_df`` has the columns ``cycle``, ``date_time``, ``capacity``,
        ``voltage_measured``, ``current_measured`` and ``time`` (seconds since
        the first sample of the same cycle); ``capacity_df`` has the columns
        ``cycle`` and ``capacity``.

    Raises:
        ValueError: If a required column is missing from either CSV file.
    """
    if err is None:
        base_path = "battery_data/"
        battery_name = battery
    else:
        base_path = f'battery_data/Err_{err}/'
        battery_name = f"{battery}_LF"

    def _table_path(table, extension):
        # e.g. battery_data/<name>_cycle_data.csv
        return os.path.join(base_path, f"{battery_name}_{table}.{extension}")

    cycle_csv = _table_path("cycle_data", "csv")
    timeseries_csv = _table_path("timeseries", "csv")

    # Refresh the CSV files from Excel workbooks when those are present.
    conversions = (
        (_table_path("cycle_data", "xlsx"), cycle_csv),
        (_table_path("timeseries", "xlsx"), timeseries_csv),
    )
    for xlsx_path, csv_path in conversions:
        if os.path.exists(xlsx_path):
            pd.read_excel(xlsx_path).to_csv(csv_path, index=False)
            print(f"Converted {xlsx_path} to {csv_path}")

    # --- cycle-level table -------------------------------------------------
    df_cycle = pd.read_csv(cycle_csv)
    cycle_columns_needed = {'Cycle_Index', 'Test_Time (s)', 'Discharge_Capacity (Ah)'}
    if cycle_columns_needed - set(df_cycle.columns):
        raise ValueError(f"Missing required columns in cycle data: {cycle_csv}")

    # NOTE(review): pd.to_datetime interprets plain numbers as nanoseconds since
    # the epoch by default. If 'Test_Time (s)' holds numeric seconds, the
    # relative time computed below is scaled by 1e-9 -- confirm the file format.
    try:
        df_cycle['Test_Time (s)'] = pd.to_datetime(df_cycle['Test_Time (s)'])
    except Exception as e:
        print("Cycle data 'Test_Time (s)' conversion error:", e)

    # --- time-series table -------------------------------------------------
    df_timeseries = pd.read_csv(timeseries_csv)
    ts_columns_needed = {'Cycle_Index', 'Test_Time (s)', 'Current (A)', 'Voltage (V)', 'Discharge_Capacity (Ah)'}
    if ts_columns_needed - set(df_timeseries.columns):
        raise ValueError(f"Missing required columns in timeseries data: {timeseries_csv}")

    try:
        df_timeseries['Test_Time (s)'] = pd.to_datetime(df_timeseries['Test_Time (s)'])
    except Exception as e:
        print("Timeseries data 'Test_Time (s)' conversion error:", e)

    def _seconds_since_cycle_start(timestamps):
        # Elapsed seconds relative to the earliest sample of the cycle.
        elapsed = timestamps - timestamps.min()
        return elapsed.dt.total_seconds()

    per_cycle_time = df_timeseries.groupby('Cycle_Index')['Test_Time (s)']
    df_timeseries['Relative_Time (s)'] = per_cycle_time.transform(_seconds_since_cycle_start)

    # Rename the raw columns to the names used by the rest of the pipeline.
    detailed_columns = {
        'cycle': df_timeseries['Cycle_Index'],
        'date_time': df_timeseries['Test_Time (s)'],
        'capacity': df_timeseries['Discharge_Capacity (Ah)'],
        'voltage_measured': df_timeseries['Voltage (V)'],
        'current_measured': df_timeseries['Current (A)'],
        'time': df_timeseries['Relative_Time (s)'],
    }
    detailed_df = pd.DataFrame(detailed_columns)

    capacity_columns = {
        'cycle': df_cycle['Cycle_Index'],
        'capacity': df_cycle['Discharge_Capacity (Ah)'],
    }
    capacity_df = pd.DataFrame(capacity_columns)

    print("Detailed timeseries data head:")
    print(detailed_df.head())
    print("Detailed timeseries data shape:", detailed_df.shape)
    print("\nCycle capacity data head:")
    print(capacity_df.head())
    print("Cycle capacity data shape:", capacity_df.shape)

    return [detailed_df, capacity_df]


Data analysis helper function

In [36]:
def get_logger(log_name='log.txt'):
    """Return the shared ``'mylogger'`` logger, optionally logging to a file.

    The logger is a process-wide singleton (``logging.getLogger`` returns the
    same object for the same name), so a file handler is only attached when no
    handler for that file is attached yet. Without this check every call
    would add another handler and each record would be written several times.

    Args:
        log_name: Path of the log file, or ``None`` to attach no file handler.

    Returns:
        The configured ``logging.Logger`` (level ``DEBUG``).
    """
    logger = logging.getLogger('mylogger')
    logger.setLevel(level=logging.DEBUG)

    if log_name is None:
        return logger

    # FileHandler stores the absolute path in ``baseFilename``.
    target = os.path.abspath(log_name)
    already_attached = any(
        isinstance(existing, logging.FileHandler) and existing.baseFilename == target
        for existing in logger.handlers
    )
    if not already_attached:
        formatter = logging.Formatter(
            '%(asctime)s - function:%(funcName)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M',
        )
        handler = logging.FileHandler(log_name)
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger

class AverageMeter(object):
    """Running tracker for the latest value and the weighted mean of a metric."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Clear the latest value, the weighted sum, the count and the mean."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed ``n`` times and refresh the running mean."""
        self.val = val
        self.count += n
        self.sum += val * n
        self.avg = self.sum / self.count


def eval_metrix(true_label, pred_label):
    """Compute regression metrics for a set of predictions.

    Args:
        true_label: Ground-truth values.
        pred_label: Predicted values.

    Returns:
        ``[MAE, MAPE, R_squared, RMSE]`` -- note that R^2 is the third entry.
    """
    mae = metrics.mean_absolute_error(true_label, pred_label)
    mape = metrics.mean_absolute_percentage_error(true_label, pred_label)
    mse = metrics.mean_squared_error(true_label, pred_label)
    r_squared = metrics.r2_score(true_label, pred_label)
    return [mae, mape, r_squared, np.sqrt(mse)]

def write_to_txt(txt_name, txt):
    """Append ``txt`` followed by a newline to the file ``txt_name``."""
    with open(txt_name, 'a') as out_file:
        out_file.writelines((txt, '\n'))



## Neural networks

1. MLP (Multi-Layer Perceptron)

In [37]:
class Sin(nn.Module):
    """Element-wise sine, wrapped as a module so it can sit in ``nn.Sequential``."""

    def forward(self, x):
        return x.sin()


class MLP(nn.Module):
    """Fully connected network with sine activations.

    Layout: an input layer (Linear + Sin), ``layers_num - 2`` hidden blocks
    (Linear + Sin + Dropout) and a final Linear layer without activation.

    Args:
        input_dim: Number of input features.
        output_dim: Number of outputs.
        layers_num: Total number of Linear layers; must be at least 2.
        hidden_dim: Width of every hidden layer.
        dropout: Dropout probability used inside the hidden blocks.
    """

    def __init__(self, input_dim=fn, output_dim=1, layers_num=4, hidden_dim=50, dropout=0.2):
        super().__init__()
        assert layers_num >= 2, "layers_num must be greater than or equal to 2"

        # Input layer.
        stack = [nn.Linear(input_dim, hidden_dim), Sin()]
        # Hidden blocks between the input and the output layer.
        for _ in range(layers_num - 2):
            stack += [nn.Linear(hidden_dim, hidden_dim), Sin(), nn.Dropout(p=dropout)]
        # Output layer (no activation).
        stack.append(nn.Linear(hidden_dim, output_dim))

        self.net = nn.Sequential(*stack)
        self._init_weights()

    def _init_weights(self):
        """Apply Xavier-normal initialisation to the weight of every Linear layer."""
        for module in self.net.children():
            if isinstance(module, nn.Linear):
                nn.init.xavier_normal_(module.weight)

    def forward(self, x):
        """Run ``x`` through the stacked layers and return the network output."""
        return self.net(x)


2. CNN (Convolutional Neural Network)


In [38]:
class CNN(nn.Module):
    """Basic 1D CNN for regression tasks.

    Each sample's feature vector is treated as a single-channel signal of
    length ``input_dim`` and passed through five Conv1d + BatchNorm + ReLU
    stages, followed by one linear output layer.

    The output layer is built in ``__init__`` from ``input_dim``. Creating it
    lazily inside ``forward`` would leave its parameters out of any optimizer
    constructed from ``model.parameters()`` before the first forward pass, so
    the layer would never be trained.

    Args:
        input_dim: Number of features per sample (length of the 1D signal).
    """

    # Strides of the five convolution stages (kernel_size=3, padding=1 each).
    _STRIDES = (1, 2, 2, 1, 1)
    # Number of channels produced by the last convolution stage.
    _OUT_CHANNELS = 8

    def __init__(self, input_dim=fn):
        super(CNN, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv1d(in_channels=1, out_channels=8, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(8),
            nn.ReLU()
        )
        self.layer2 = nn.Sequential(
            nn.Conv1d(in_channels=8, out_channels=16, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm1d(16),
            nn.ReLU()
        )
        self.layer3 = nn.Sequential(
            nn.Conv1d(in_channels=16, out_channels=24, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm1d(24),
            nn.ReLU()
        )
        self.layer4 = nn.Sequential(
            nn.Conv1d(in_channels=24, out_channels=16, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(16),
            nn.ReLU()
        )
        self.layer5 = nn.Sequential(
            nn.Conv1d(in_channels=16, out_channels=8, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm1d(8),
            nn.ReLU()
        )
        # Built eagerly so that the optimizer sees these parameters.
        self.fc = nn.Linear(self._flattened_size(input_dim), 1)

    @classmethod
    def _flattened_size(cls, input_dim):
        """Return the flattened feature count after the five conv stages."""
        length = input_dim
        for stride in cls._STRIDES:
            # Conv1d output length with kernel_size=3, padding=1, dilation=1.
            length = (length + 2 * 1 - 3) // stride + 1
        return cls._OUT_CHANNELS * length

    def forward(self, x):
        """Forward pass; ``x`` is viewed as ``(N, 1, L)`` and the result is ``(N, 1)``."""
        N, L = x.shape[0], x.shape[1]
        x = x.view(N, 1, L)  # Reshape for 1D convolution
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        x = x.view(N, -1)  # Flatten features
        if self.fc is None or self.fc.in_features != x.shape[1]:
            # Backward-compatible fallback for inputs whose length differs
            # from ``input_dim``. The replacement layer is NOT known to an
            # optimizer that was created earlier, hence the warning.
            import warnings
            warnings.warn(
                "CNN input length does not match input_dim; the output layer was "
                "rebuilt and is not tracked by previously created optimizers."
            )
            self.fc = nn.Linear(x.shape[1], 1).to(x.device)
        out = self.fc(x)
        return out.view(N, 1)


3. LSTM (Long Short-Term Memory)

In [39]:
class LSTM(nn.Module):
    """LSTM regressor that treats each sample as a sequence of length one.

    The sizes that used to be hard-coded are now constructor arguments; the
    defaults reproduce the previous architecture (4 input features, 50 hidden
    units, 2 layers, dropout 0.2), so ``LSTM()`` behaves as before.

    Args:
        input_dim: Number of features per sample.
        hidden_dim: Hidden size of the LSTM.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability between stacked LSTM layers.
    """

    def __init__(self, input_dim=4, hidden_dim=50, num_layers=2, dropout=0.2):
        super(LSTM, self).__init__()
        self.lstm = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            dropout=dropout,
        )
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim, 32),
            nn.ReLU(),
            nn.Linear(32, 1)
        )

    def forward(self, x):
        """Map a ``(N, L)`` batch to ``(N, 1)`` predictions."""
        N, L = x.shape[0], x.shape[1]
        # (N, L) -> (N, seq_len=1, features=L) for the batch_first LSTM.
        x = x.view(N, 1, L)
        out, _ = self.lstm(x)
        # Use the output of the last (here: only) time step.
        out = self.fc(out[:, -1, :])
        return out.view(N, 1)

4. TransformerRegressor

In [40]:
class TransformerRegressor(nn.Module):
    """Regression model built around a Transformer encoder.

    Args:
        num_features: Number of input features per sample.
        d_model: Embedding width used inside the encoder.
        nhead: Number of attention heads per encoder layer.
        num_layers: Number of stacked encoder layers.
    """

    def __init__(self, num_features=fn, d_model=64, nhead=4, num_layers=3):
        super().__init__()
        self.input_layer = nn.Linear(num_features, d_model)

        # Learnable positional term: created as zeros, then Xavier-initialised.
        self.positional_encoding = nn.Parameter(torch.zeros(1, 1, d_model))
        nn.init.xavier_uniform_(self.positional_encoding)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            batch_first=True,
        )
        self.transformer_encoder_layer = encoder_layer
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(d_model, 1)

    def forward(self, x):
        """Embed ``x``, encode it as a one-step sequence and regress one value."""
        # Project to d_model and insert a sequence dimension at position 1.
        tokens = self.input_layer(x).unsqueeze(1)

        seq_len = tokens.size(1)
        tokens = tokens + self.positional_encoding[:, :seq_len, :]

        encoded = self.transformer_encoder(tokens)

        # Collapse the sequence dimension: drop it when it is 1, else average.
        if encoded.size(1) == 1:
            pooled = encoded.squeeze(1)
        else:
            pooled = encoded.mean(dim=1)

        return self.fc(pooled)

5. AttentionNetwork

In [41]:
class AttentionNetwork(nn.Module):
    """Regression model with a sigmoid gate applied to the hidden features.

    Args:
        num_features: Number of input features per sample.
        hidden_dim: Width of the hidden representation.
    """

    def __init__(self, num_features=fn, hidden_dim=64):
        super().__init__()
        self.fc1 = nn.Linear(num_features, hidden_dim)
        self.attention_layer = nn.Linear(hidden_dim, 1)
        self.fc2 = nn.Linear(hidden_dim, 1)

    def forward(self, x):
        """Compute hidden features, scale them by the gate and regress one value."""
        hidden = self.fc1(x).relu()
        # One scalar gate in (0, 1) per sample, broadcast over the hidden units.
        gate = self.attention_layer(hidden).sigmoid()
        gated = hidden * gate
        return self.fc2(gated)


## Baseline

The BASE class defines a baseline framework for training, validation, and testing a model with functionality for monitoring metrics and implementing early stopping. Below is the code with key sections explained in brief comments.

In [42]:
class BASE():
    """Baseline trainer: training, validation, testing and early stopping.

    Args:
        model: The ``nn.Module`` to train.
        train_loader: DataLoader yielding ``(x, y)`` training batches.
        valid_loader: DataLoader yielding ``(x, y)`` validation batches.
        test_loader: DataLoader yielding ``(x, y)`` test batches, or ``None``
            to skip test evaluation.
        args: Namespace providing ``save_folder``, ``log_dir``, ``epochs``,
            ``lr``, ``warmup_lr`` and ``early_stop``. ``warmup_epochs`` is
            optional and defaults to 1.

    Learning-rate schedule: the rate ramps linearly from ``args.warmup_lr`` to
    ``args.lr`` over ``warmup_epochs`` epochs and then stays at ``args.lr``.
    With the default of one warm-up epoch, the first epoch runs at
    ``warmup_lr`` and all later epochs at ``lr``.
    """

    def __init__(self, model, train_loader, valid_loader, test_loader, args):
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.model = model.to(self.device)
        self.args = args
        self.train_loader = train_loader
        self.valid_loader = valid_loader
        self.test_loader = test_loader

        self.save_dir = args.save_folder
        os.makedirs(self.save_dir, exist_ok=True)
        self.epochs = args.epochs
        self.logger = get_logger(os.path.join(args.save_folder, args.log_dir))

        self.loss_meter = AverageMeter()
        self.loss_func = nn.HuberLoss(delta=1.0)  # instead of nn.MSELoss()

        # A single optimizer plus a real LR scheduler. The scheduler used to
        # be a second Adam optimizer, so calling ``scheduler.step()`` applied
        # an extra parameter update from stale gradients after every epoch.
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=args.lr)
        self.scheduler = torch.optim.lr_scheduler.LambdaLR(
            self.optimizer, lr_lambda=self._warmup_factor(args)
        )

        # Test metrics recorded each time the validation loss improves.
        self.Bestmodel_R_squared = []
        self.Bestmodel_MAE = []
        self.Bestmodel_MAPE = []
        self.Bestmodel_RMSE = []

        # Where test() stores the true and the predicted labels.
        self.train_path = os.path.join(self.save_dir, "true_label.npy")
        self.pred_path = os.path.join(self.save_dir, "pred_label.npy")

    @staticmethod
    def _warmup_factor(args):
        """Build the LambdaLR multiplier (relative to ``args.lr``) for the warm-up."""
        warmup_epochs = max(int(getattr(args, 'warmup_epochs', 1)), 0)
        base_lr = args.lr
        warmup_lr = args.warmup_lr

        def factor(step):
            # ``step`` counts completed scheduler steps (0 during epoch 1).
            if step >= warmup_epochs or base_lr == 0:
                return 1.0
            ramped = warmup_lr + (base_lr - warmup_lr) * step / warmup_epochs
            return ramped / base_lr

        return factor

    def clear_logger(self):
        """Detach and close every handler of the logger (safe to call twice)."""
        for handler in list(self.logger.handlers):
            self.logger.removeHandler(handler)
            handler.close()

    def train_one_epoch(self, epoch):
        """Run one optimisation pass over ``train_loader``; return the mean batch loss."""
        self.model.train()
        self.loss_meter.reset()
        for (x1, y1) in self.train_loader:
            x1 = x1.to(self.device)
            y1 = y1.to(self.device)

            y_pred = self.model(x1)
            loss = self.loss_func(y_pred, y1)

            self.optimizer.zero_grad()
            loss.backward()
            # Clip the global gradient norm to 1.0 before the update.
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()

            self.loss_meter.update(loss.item())
        return self.loss_meter.avg

    def valid(self, epoch):
        """Evaluate on ``valid_loader`` without gradients; return the mean batch loss."""
        self.model.eval()
        self.loss_meter.reset()
        with torch.no_grad():
            for (x1, y1) in self.valid_loader:
                x1 = x1.to(self.device)
                y1 = y1.to(self.device)

                y_pred = self.model(x1)
                loss = self.loss_func(y_pred, y1)
                self.loss_meter.update(loss.item())
        return self.loss_meter.avg

    def test(self):
        """Predict on ``test_loader`` and save true/predicted labels as ``.npy``.

        Returns:
            ``(true_label, pred_label, true_label_path, pred_label_path)``.
        """
        self.model.eval()
        self.loss_meter.reset()
        true_label = []
        pred_label = []
        with torch.no_grad():
            for (x1, y1) in self.test_loader:
                x1 = x1.to(self.device)
                y_pred = self.model(x1)

                true_label.append(y1.cpu().detach().numpy())
                pred_label.append(y_pred.cpu().detach().numpy())

        # Concatenate the per-batch arrays and save them.
        true_label = np.concatenate(true_label, axis=0)
        pred_label = np.concatenate(pred_label, axis=0)

        np.save(self.train_path, true_label)
        np.save(self.pred_path, pred_label)

        return true_label, pred_label, self.train_path, self.pred_path

    def train(self):
        """Train with early stopping.

        Whenever the validation loss improves, the model is evaluated on the
        test set (if one was given) and the metrics are appended to the
        ``Bestmodel_*`` lists. Training stops once more than
        ``args.early_stop`` epochs pass without improvement.
        """
        self.Bestmodel_R_squared.clear()
        self.Bestmodel_MAE.clear()
        self.Bestmodel_MAPE.clear()
        self.Bestmodel_RMSE.clear()
        # Start from +inf so that the first epoch always counts as an improvement.
        min_loss = float('inf')
        early_stop = 0
        for epoch in range(1, self.epochs + 1):
            early_stop += 1
            train_loss = self.train_one_epoch(epoch)
            self.scheduler.step()
            current_lr = self.optimizer.param_groups[0]['lr']
            valid_loss = self.valid(epoch)
            self.logger.info(
                'epoch %d | next lr %.6g | train loss %.6f | valid loss %.6f',
                epoch, current_lr, train_loss, valid_loss,
            )

            if valid_loss < min_loss:
                min_loss = valid_loss
                early_stop = 0
                if self.test_loader is not None:
                    true_label, pred_label, self.train_path, self.pred_path = self.test()
                    # eval_metrix expects (true, pred) and returns
                    # [MAE, MAPE, R_squared, RMSE]; MAPE and R^2 are not
                    # symmetric in their arguments.
                    MAE, MAPE, R_squared, RMSE = eval_metrix(true_label, pred_label)
                    self.Bestmodel_R_squared.append(R_squared)
                    self.Bestmodel_MAE.append(MAE)
                    self.Bestmodel_MAPE.append(MAPE)
                    self.Bestmodel_RMSE.append(RMSE)
            if early_stop > self.args.early_stop:
                break
        self.clear_logger()


## Data-processing
1. Randomly select the test data (several points per cycle).
2. Randomly sample and split the rest of the HF and LF datasets according to the requirements.
3. Match the dataset with its labels.
4. Wrap the dataset and labels in DataLoaders so they can be trained on the GPU.

Helper functions for data matching

In [43]:
def match(dataset, capacity):
    """Attach an SOH label to every row of ``dataset`` by its cycle number.

    SOH is ``capacity / capacity of the first row of `capacity```. The lookup
    is vectorised; the earlier row-by-row scan filtered the whole capacity
    table once per dataset row.

    Side effect: an ``'SOH'`` column is written into ``capacity``.

    Args:
        dataset: DataFrame with a ``'cycle'`` column.
        capacity: DataFrame with ``'cycle'`` and ``'capacity'`` columns.

    Returns:
        DataFrame with columns ``'cycle'`` and ``'SOH'``, one row per dataset
        row in the same order, indexed 0..n-1. Rows whose cycle is absent
        from ``capacity`` hold missing values in both columns.
    """
    cycle_capacity = capacity[['cycle', 'capacity']]

    # SOH relative to the first capacity entry; also stored on ``capacity``.
    initial_capacity = cycle_capacity['capacity'].iloc[0]
    capacity['SOH'] = cycle_capacity['capacity'] / initial_capacity

    # One SOH value per cycle; on duplicate cycles the first row wins.
    lookup = capacity.drop_duplicates(subset='cycle', keep='first').set_index('cycle')['SOH']
    lookup = lookup[lookup.index.notna()]

    cycles = dataset['cycle'].reset_index(drop=True)
    has_match = cycles.isin(lookup.index)
    labels = cycles.map(lookup)

    return pd.DataFrame({
        'cycle': cycles.where(has_match),
        'SOH': labels.where(has_match),
    })

#Normalize features exclude others 
def normalize(data, exclude_columns):
    """Min-max scale every column except ``exclude_columns`` to [0, 1].

    The scaler is fitted on ``data`` itself. In the result the excluded
    column keeps its raw values and is placed last; the row index is reset.

    Args:
        data: DataFrame to normalise.
        exclude_columns: Name of the single column to leave unscaled.

    Returns:
        New DataFrame holding the scaled columns followed by the excluded one.
    """
    untouched_values = data[[exclude_columns]].values
    feature_frame = data.drop(columns=[exclude_columns])

    scaled = MinMaxScaler(feature_range=(0, 1)).fit_transform(feature_frame)

    result = pd.DataFrame(scaled, columns=feature_frame.columns)
    result[exclude_columns] = untouched_values
    return result
def Pre_diff(x, y):
    """Wrap features and labels in a float ``TensorDataset``.

    Args:
        x: Features as a DataFrame, Series, NumPy array or nested sequence.
        y: Labels in any of the same forms; reshaped to a column ``(n, 1)``.

    Returns:
        ``TensorDataset`` of ``(features, label)`` float32 tensors.
    """
    def _as_array(values):
        # pandas containers are unwrapped (a Series used to be rejected by
        # torch.from_numpy); everything else goes through np.asarray, which
        # returns NumPy arrays unchanged.
        if isinstance(values, (pd.DataFrame, pd.Series)):
            return values.to_numpy()
        return np.asarray(values)

    tensor_X = torch.from_numpy(_as_array(x)).float()
    tensor_Y = torch.from_numpy(_as_array(y)).float().view(-1, 1)
    return TensorDataset(tensor_X, tensor_Y)

Data_processing function

In [44]:
def create_dataloaderB(dataset, split_ratios=(0.9, 0.1), batch_size=64, type='regression'):
    """
    Create train and validation data loaders from the given dataset.

    The training part gets ``int(train_ratio * len(dataset))`` samples and the
    validation part gets all remaining samples, so the two sizes always add up
    to ``len(dataset)`` (truncating both parts independently could lose a
    sample and make ``random_split`` fail).

    Args:
        dataset: The input dataset for training and validation.
        split_ratios (tuple): Ratios to split the dataset (train_ratio, valid_ratio).
        batch_size (int): Batch size for data loaders.
        type (str): Task type; only "regression" is supported.

    Returns:
        train_loader: DataLoader for training data (shuffled).
        valid_loader: DataLoader for validation data (not shuffled).

    Raises:
        ValueError: If the ratios do not sum to 1 or ``type`` is unsupported.
    """
    # Tolerant comparison: sums of decimal fractions are not always exactly 1.0.
    if abs(sum(split_ratios) - 1) > 1e-8:
        raise ValueError(f"split_ratios must sum to 1, got {split_ratios}")
    if type != 'regression':
        raise ValueError(f"Unsupported task type: {type!r}")

    train_ratio, _valid_ratio = split_ratios
    total = len(dataset)
    train_size = int(train_ratio * total)
    valid_size = total - train_size

    train_dataset, valid_dataset = random_split(dataset, [train_size, valid_size])

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, valid_loader




def Data_processing(battery_code, err=0.02, total_data=1000, test_size=2, train_ratio=0.8, valid_ratio=0.2, type="regression"):
    """
    Load and preprocess battery data for training, validation, and testing.

    Steps: load the battery tables, remove cycles with implausible capacity,
    hold out ``test_size`` random points per cycle as the test set, sample
    ``total_data`` of the remaining points for training/validation, attach SOH
    labels and wrap everything in DataLoaders.

    Args:
        battery_code: Battery identifier passed to ``load_data``.
        err: Currently unused; the data is always loaded without an error level.
        total_data: Number of training/validation points to sample (capped at
            the number of points available).
        test_size: Number of test points drawn per cycle (capped at the number
            of points in the cycle).
        train_ratio: Fraction of the sampled points used for training.
        valid_ratio: Fraction of the sampled points used for validation.
        type: Task type; sampling is only applied for "regression".

    Returns:
        ``(train_loader, valid_loader, test_loader, BS_capacity, BS_label)``.
    """
    def clean_battery_data(dataset, capacity):
        """Drop cycles whose capacity is out of range or a statistical outlier."""
        threshold = 2.0  # Standard deviations for outlier detection

        cap = capacity['capacity']
        std_dev = cap.std()
        mean_capacity = cap.mean()

        valid_mask = (
            (cap > 0.1) &  # Remove near-zero capacity
            (cap < 1.5) &  # Remove unreasonably high capacity
            ((cap - mean_capacity).abs() <= threshold * std_dev)  # Remove statistical outliers
        )

        cleaned_capacity = capacity[valid_mask].copy()
        # Only keep dataset rows that belong to a cycle that survived cleaning.
        cleaned_dataset = dataset[dataset['cycle'].isin(cleaned_capacity['cycle'])].copy()
        return cleaned_dataset, cleaned_capacity

    # Load the entire dataset and capacity information
    BS_dataset, BS_capacity = load_data(battery_code)

    # Clean the data before processing
    BS_dataset, BS_capacity = clean_battery_data(BS_dataset, BS_capacity)

    # Hold out test points per cycle. The pieces are concatenated once, which
    # keeps the original dtypes and the original row labels.
    test_parts = [
        group.sample(n=min(test_size, len(group)), random_state=42)
        for _, group in BS_dataset.groupby('cycle')
    ]
    Test_dataset = pd.concat(test_parts) if test_parts else BS_dataset.iloc[0:0]

    # Remember which rows were held out BEFORE the index is reset below;
    # after the reset the index is 0..k-1 and no longer identifies them.
    test_row_labels = Test_dataset.index

    Test_dataset = Test_dataset.sort_values(by=['cycle', 'time']).reset_index(drop=True)
    Test_label = match(Test_dataset, BS_capacity)
    Test_train = normalize(Test_dataset[attribs], 'cycle')
    print(f"Test set size: {len(Test_train)}")

    # Create DataLoader for test data
    test_loader = DataLoader(
        Pre_diff(Test_train, Test_label["SOH"].to_numpy()),
        batch_size=64,
        shuffle=False
    )

    # Remove exactly the held-out rows from the pool, then sort it.
    BS_dataset = BS_dataset.drop(index=test_row_labels)
    BS_dataset = BS_dataset.sort_values(by=['cycle', 'time']).reset_index(drop=True)

    # Sample the required number of data points for training
    if type == "regression":
        BS_dataset = BS_dataset.sample(n=min(total_data, len(BS_dataset)))
        BS_dataset = BS_dataset.sort_values(by=['cycle', 'time']).reset_index(drop=True)
        # NOTE(review): this inner merge yields one capacity row per sampled
        # data point (cycles repeat), and match() below normalises SOH by the
        # first row of this merged table -- confirm this is intended.
        BS_capacity = BS_capacity.merge(BS_dataset[['cycle']], on='cycle', how='inner')

    # Compute labels and normalize training dataset
    BS_label = match(BS_dataset, BS_capacity)
    BS_train = normalize(BS_dataset[attribs], 'cycle')
    print(f"The total size of the dataset for the baseline task is: {len(BS_train)}")

    # Create DataLoaders for training and validation sets
    train_loader, valid_loader = create_dataloaderB(
        Pre_diff(BS_train, BS_label["SOH"].to_numpy()),
        split_ratios=(train_ratio, valid_ratio),
        batch_size=64,
        type=type
    )

    return train_loader, valid_loader, test_loader, BS_capacity, BS_label


## Evaluation, plot and save

Display the evaluation metrics and save them to xlsx files

In [45]:
def plot_soh_comparison(true_label_path, pred_label_path, hf_capacity, figsize=(12, 8), dpi=600):
    """
    Plot comparison between true and predicted SOH values over cycles.

    Consecutive label pairs are averaged into one point each. If the number
    of labels is odd, the last label is plotted as it is.

    Args:
        true_label_path (str): Path to the true SOH label data (numpy file).
        pred_label_path (str): Path to the predicted SOH label data (numpy file).
        hf_capacity (pd.DataFrame): Currently unused; kept for interface compatibility.
        figsize (tuple): Figure size for plotting.
        dpi (int): Dots per inch for figure resolution.
    """
    def _pair_means(values):
        """Average consecutive pairs of a flattened array; keep a trailing single."""
        flat = np.asarray(values).flatten()
        n_pairs = len(flat) // 2
        means = flat[:2 * n_pairs].reshape(n_pairs, 2).mean(axis=1)
        if len(flat) % 2:
            means = np.append(means, flat[-1])
        return means

    # Load true and predicted labels and reduce them to one value per pair.
    tl_avg = _pair_means(np.load(true_label_path))
    pl_avg = _pair_means(np.load(pred_label_path))

    # The x axis is the 1-based position of each averaged point.
    cycles = np.arange(1, len(tl_avg) + 1)

    plt.figure(figsize=figsize, dpi=dpi)
    plt.plot(cycles, tl_avg, marker='o', linestyle='--', color='blue', label='True SOH (tl)')
    plt.plot(cycles, pl_avg, marker='x', linestyle='-', color='red', label='Predicted SOH (pl)')
    plt.xlabel('Cycle Index')
    plt.ylabel('SOH')
    plt.title('Comparison of True and Predicted SOH')
    plt.legend()
    plt.grid(True)
    plt.ylim(0, 1.5)  # Fix the y-axis range to 0..1.5

    plt.show()


In [46]:
def save_to_excel(me_matrix, n, m, Way, data_name, err, data_count, test_size):
    """
    Save performance metrics to an Excel file.

    The sheet lists one row per run, then one average row per initialization
    (``m`` = 'Ave') and finally one overall average row (``n`` = ``m`` =
    'Ave'). Metrics are read by key name, so the key order inside the metric
    dictionaries does not matter. The file is written to
    ``Final_result/Benchmark/<Way>/<data_name>_<err>_<data_count>_<test_size>.xlsx``.

    Args:
        me_matrix (list): One list per initialization, each holding one dict
            per run with the keys 'R Squared', 'MAE', 'MAPE' and 'RMSE'.
        n (int): Number of initializations (not used for the computation).
        m (int): Number of repetitions per initialization (not used for the
            computation; every run present in ``me_matrix`` is written).
        Way (str): Model architecture used.
        data_name (str): Dataset name.
        err (float): Noise level applied during preprocessing.
        data_count (int): Total number of training data points.
        test_size (int): Number of test points per cycle.
    """
    metric_keys = ('R Squared', 'MAE', 'MAPE', 'RMSE')
    data = {'n': [], 'm': []}
    for key in metric_keys:
        data[key] = []

    def add_row(n_label, m_label, values):
        # Append one row; ``values`` maps each metric name to its value.
        data['n'].append(n_label)
        data['m'].append(m_label)
        for key in metric_keys:
            data[key].append(values[key])

    def mean_by_key(records):
        # Average every metric over ``records`` (must be non-empty).
        return {key: sum(rec[key] for rec in records) / len(records) for key in metric_keys}

    # One row per run.
    for init_index, init_me_list in enumerate(me_matrix, start=1):
        for repeat_index, me in enumerate(init_me_list, start=1):
            add_row(init_index, repeat_index, me)

    # One average row per initialization (empty initializations are skipped).
    for init_index, init_me_list in enumerate(me_matrix, start=1):
        if init_me_list:
            add_row(init_index, 'Ave', mean_by_key(init_me_list))

    # Overall average across all runs.
    all_runs = [me for init_me_list in me_matrix for me in init_me_list]
    if all_runs:
        add_row('Ave', 'Ave', mean_by_key(all_runs))

    # Save data to Excel
    df = pd.DataFrame(data)
    save_path = f"Final_result/Benchmark/{Way}/"
    file_name = f"{data_name}_{err}_{data_count}_{test_size}.xlsx"

    os.makedirs(save_path, exist_ok=True)
    df.to_excel(os.path.join(save_path, file_name), index=False)


def print_evaluation_matrix(me_matrix, n, m, Way, data_name, err, data_count, test_size, plot_switch, 
                            true_label_path=None, pred_label_path=None, hf_capacity=None):
    """
    Print and visualize the evaluation matrix for training results.

    Args:
        me_matrix (list): Performance metrics matrix for all repetitions.
        n (int): Number of data initializations.
        m (int): Number of repetitions per initialization.
        Way (str): Model architecture used.
        data_name (str): Dataset name.
        err (float): Noise level applied.
        data_count (int): Total training data points used.
        test_size (int): Number of test points per cycle.
        plot_switch (str): Set to 'ON' to enable result plotting.
        true_label_path (str): Path to the true label for plotting.
        pred_label_path (str): Path to the predicted label for plotting.
        hf_capacity (pd.DataFrame): Cycle and true SOH data.
    """
    for init_index, init_me_list in enumerate(me_matrix):
        if plot_switch == "ON":
            print(f"Data Initialization {init_index+1} Results:")
            for repeat_index, me in enumerate(init_me_list):
                print(f"Repeat {repeat_index+1}: R Squared: {me['R Squared']:.8f}, "
                      f"MAE: {me['MAE']:.6f}, "
                      f"MAPE: {me['MAPE']:.6f}, "
                      f"RMSE: {me['RMSE']:.6f}")

    # Compute and print average performance for each initialization
    average_me_matrix = [[sum(item[key] for item in init_me_list) / len(init_me_list) for key in init_me_list[0].keys()] for init_me_list in me_matrix]
    for init_index, avg_me in enumerate(average_me_matrix):
        if plot_switch == "ON":
            print(f"Average Performance for Data Initialization {init_index+1}:")
            print(f"R Squared: {avg_me[0]:.8f}, MAE: {avg_me[1]:.6f}, MAPE: {avg_me[2]:.6f}, RMSE: {avg_me[3]:.6f}")

    # Compute and print overall average performance
    all_rsquared = [me['R Squared'] for init_me_list in me_matrix for me in init_me_list]
    all_mae = [me['MAE'] for init_me_list in me_matrix for me in init_me_list]
    all_mape = [me['MAPE'] for init_me_list in me_matrix for me in init_me_list]
    all_rmse = [me['RMSE'] for init_me_list in me_matrix for me in init_me_list]

    overall_average_me = {
        'R Squared': sum(all_rsquared) / len(all_rsquared),
        'MAE': sum(all_mae) / len(all_mae),
        'MAPE': sum(all_mape) / len(all_mape),
        'RMSE': sum(all_rmse) / len(all_rmse)
    }

    print("Overall Average Model Performance:")
    print(f"R Squared: {overall_average_me['R Squared']:.8f}, "
          f"MAE: {overall_average_me['MAE']:.6f}, "
          f"MAPE: {overall_average_me['MAPE']:.6f}, "
          f"RMSE: {overall_average_me['RMSE']:.6f}")

    # Plot the SOH comparison if plot switch is enabled
    if plot_switch == "ON" and true_label_path and pred_label_path and hf_capacity is not None:
        plot_soh_comparison(true_label_path, pred_label_path, hf_capacity)


Plotting function; use it when needed.

## Main

Training entry point: `Run_task()`
Argument helpers: `ArgsNamespace`, `load_model()`, `get_hyperparams()`

In [47]:
class ArgsNamespace:
    """Simple attribute container: every keyword argument becomes an attribute."""

    def __init__(self, **kwargs):
        # Store the keyword arguments directly in the instance namespace.
        namespace = vars(self)
        namespace.update(kwargs)


def load_model(args):
    """
    Instantiate the network architecture named by ``args.model``.

    Args:
        args: Object exposing a ``model`` attribute with one of
            'MLP', 'CNN', 'LSTM', 'Trans' or 'Atten'.

    Returns:
        model: A freshly constructed instance of the selected architecture.

    Raises:
        ValueError: If ``args.model`` is none of the supported names.
    """
    requested = args.model

    # Each branch builds its model with default constructor arguments.
    if requested == 'MLP':
        return MLP()
    if requested == 'CNN':
        return CNN()
    if requested == 'LSTM':
        return LSTM()
    if requested == 'Trans':
        return TransformerRegressor()
    if requested == 'Atten':
        return AttentionNetwork()

    raise ValueError("Unknown model type specified.")


def get_hyperparams():
    """
    Build the hyperparameter configuration used for training.

    Returns:
        dict: A new dictionary holding the training schedule, the model
        structure parameters and the log/save locations.
    """
    # Training schedule
    schedule = {
        'epochs': 200,         # maximum number of training epochs
        'early_stop': 20,      # early stopping threshold
        'warmup_epochs': 30,   # number of warmup epochs
        'warmup_lr': 0.005,    # learning rate during warmup
        'lr': 0.01,            # learning rate for training
    }

    # Model structure parameters
    structure = {
        'F_layers_num': 3,
        'F_hidden_dim': 25,
    }

    # Directories
    locations = {
        'log_dir': 'text testinglog.txt',
        'save_folder': 'results_base',
    }

    # Merge in the same order the keys are listed above.
    return {**schedule, **structure, **locations}


def Run_task(init_index, m, train_loader, valid_loader, test_loader, me_list, Way):
    """
    Train the selected model ``m`` times on the given loaders and collect metrics.

    Args:
        init_index (int): Index of the current initialization (used only in the
            progress message).
        m (int): Number of experiment repetitions for the current initialization.
        train_loader: Training data loader.
        valid_loader: Validation data loader.
        test_loader: Test data loader.
        me_list (list): Output list; one metrics dict with the keys 'R Squared',
            'MAE', 'MAPE' and 'RMSE' is appended per repetition.
        Way (str): Model architecture ("MLP", "CNN", "LSTM", etc.).

    Returns:
        tuple: ``(args, true_label_path, pred_label_path)`` where ``args`` is the
        hyperparameter namespace used for training and the two paths are
        ``trainer.train_path`` and ``trainer.pred_path`` of the last repetition.
        Both paths are ``None`` when ``m`` is 0.
    """

    # Load hyperparameters and initialize arguments
    hyperparams = get_hyperparams()
    args = ArgsNamespace(**hyperparams)
    args.model = Way  # Set the model type

    # Bound before the loop so that m == 0 returns None instead of raising
    # UnboundLocalError at the return statement.
    true_label_path = None
    pred_label_path = None

    for repeat_index in range(m):
        # Build a fresh model for every repetition and train it
        model = load_model(args)
        trainer = BASE(model, train_loader, valid_loader, test_loader, args)
        trainer.train()  # Execute training process
        true_label_path = trainer.train_path
        pred_label_path = trainer.pred_path

        # Collect the last entry of each best-model metric list of the trainer.
        # Named run_metrics to avoid shadowing the imported sklearn `metrics` module.
        run_metrics = {
            'R Squared': trainer.Bestmodel_R_squared[-1],
            'MAE': trainer.Bestmodel_MAE[-1],
            'MAPE': trainer.Bestmodel_MAPE[-1],
            'RMSE': trainer.Bestmodel_RMSE[-1]
        }
        me_list.append(run_metrics)
        print(f"Repetition {init_index+1}, {repeat_index+1} results: R^2={run_metrics['R Squared']:.4f}, "
              f"MAE={run_metrics['MAE']:.4f}, MAPE={run_metrics['MAPE']:.4f}, RMSE={run_metrics['RMSE']:.4f}")

    return args, true_label_path, pred_label_path


### 3.3.1 Benchmark Testing

In [48]:
# ---- Experiment configuration ----
n = 5  # number of data initialization runs
m = 5  # number of training repetitions per initialization
plot_switch = 'ON'  # 'ON' enables plotting
datasets = [
    "CALCE_CX2-16_prism_LCO_25C_0-100_0.5-0.5C_a",
    "CALCE_CX2-25_prism_LCO_25C_0-100_0.5-0.5C_b",
    "CALCE_CX2-33_prism_LCO_25C_0-100_0.5-0.5C_d",
    "CALCE_CX2-34_prism_LCO_25C_0-100_0.5-0.5C_e",
    "CALCE_CX2-36_prism_LCO_25C_0-100_0.5-0.5C_f",
    "CALCE_CX2-37_prism_LCO_25C_0-100_0.5-0.5C_g",
    "CALCE_CX2-38_prism_LCO_25C_0-100_0.5-0.5C_h",
]  # other dataset names used previously: "B0006", "B0007", "B0018"
noise = 0.002  # noise level, e.g. 0.005, 0.01, 0.015, 0.02
data_count = 4000  # total number of data points in the training set
test_size = 2  # test points per cycle; total test set size = cycle_number * test_size
Ways = ["MLP", "CNN", "LSTM", "Trans", "Atten"]  # model architectures to evaluate

# ---- Benchmark: every dataset with every architecture ----
for dataset in datasets:
    for Way in Ways:
        print(Way)
        me_matrix = []  # one list of metrics per initialization

        for init_index in range(n):
            print(dataset)

            # Build fresh loaders for this initialization
            train_loader, valid_loader, test_loader, data_capacity, data_label = Data_processing(
                dataset,
                err=noise,
                total_data=data_count,
                test_size=test_size,
                train_ratio=0.8,
                valid_ratio=0.2,
                type="regression",
            )

            # Run_task appends one metrics dict per repetition to me_list
            me_list = []
            args, true_label_path, pred_label_path = Run_task(
                init_index, m, train_loader, valid_loader, test_loader, me_list, Way=Way
            )
            me_matrix.append(me_list)
            hf_capacity = data_capacity

        # Plot using the paths and capacity data of the last initialization
        if plot_switch == "ON":
            plot_soh_comparison(true_label_path, pred_label_path, data_capacity)

        # Print the evaluation results for this dataset/architecture pair
        print_evaluation_matrix(
            me_matrix, n, m, Way, dataset, noise, data_count, test_size, plot_switch
        )



MLP
CALCE_CX2-16_prism_LCO_25C_0-100_0.5-0.5C_a


KeyboardInterrupt: 