In [103]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

In [104]:
import pandas as pd
import numpy as np
import os
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings("ignore")
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
Debug = True

### Model Definition

In [105]:
class LSTMModel(nn.Module):
    def __init__(self, conv_input, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()
        self.conv = nn.Conv1d(conv_input, conv_input, 1)
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, output_size)  # Multiply hidden_size by 2 because it's bidirectional
        self.num_layers = num_layers
        self.hidden_dim = hidden_size
        self.dropout = nn.Dropout(p=0.3)

    def forward(self, x):
        x = self.conv(x)
        
        # Initialize hidden state and cell state with dimensions accounting for bidirectionality
        h0 = torch.randn((self.num_layers * 2, x.shape[0], self.hidden_dim)).to(device)
        c0 = torch.randn((self.num_layers * 2, x.shape[0], self.hidden_dim)).to(device)
        
        output, _ = self.lstm(x, (h0, c0))
        output = self.dropout(output)
        output = self.fc(output[:, -1, :]) 
        return output

In [106]:
%pip install torchinfo

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: C:\Users\MAYANK PRATAP SINGH\AppData\Local\Programs\Python\Python311\python.exe -m pip install --upgrade pip


In [107]:
from torchinfo import summary

# Instantiate the model
model = LSTMModel(
    conv_input=60,   # in_channels for the Conv1d
    input_size=2,   
    hidden_size=128,
    num_layers=5,
    output_size=1
)
summary(model, input_size=(128, 60, 2), col_names=["input_size", "output_size", "num_params"])

Layer (type:depth-idx)                   Input Shape               Output Shape              Param #
LSTMModel                                [128, 60, 2]              [128, 1]                  --
├─Conv1d: 1-1                            [128, 60, 2]              [128, 60, 2]              3,660
├─LSTM: 1-2                              [128, 60, 2]              [128, 60, 256]            1,716,224
├─Dropout: 1-3                           [128, 60, 256]            [128, 60, 256]            --
├─Linear: 1-4                            [128, 256]                [128, 1]                  257
Total params: 1,720,141
Trainable params: 1,720,141
Non-trainable params: 0
Total mult-adds (Units.GIGABYTES): 13.18
Input size (MB): 0.06
Forward/backward pass size (MB): 15.85
Params size (MB): 6.88
Estimated Total Size (MB): 22.79

### Importing Data

In [89]:
#For 25C batteries
# Create empty lists to store the read DataFrames
dataframes_Cap = []
dataframes_EIS = []

# Use a loop to read files and assign names
for i in range(1, 9):
    # Construct file names
    file_name_cap = f"Capacity_data/Data_Capacity_25C{i:02}.txt"
    file_name_EIS = f"EIS_data/EIS_state_V_25C{i:02}.txt"  # Use State V
    
    if not os.path.isfile(file_name_cap):
        print(f"Cap file {file_name_cap} does not exist, skipping...")
        continue
    elif not os.path.isfile(file_name_EIS):
        print(f"EIS file {file_name_EIS} does not exist, skipping...")
        continue

    # Read files and add to the list
    df_cap = pd.read_csv(file_name_cap, sep="\t")
    df_EIS = pd.read_csv(file_name_EIS, sep="\t")
    
    if i == 1 or i == 5:
        cap_number = 3
    else:
        cap_number = 5
    
    # Exclude underperforming batteries
    if i == 4 or i == 8:
        continue
    cycle = []
    cap = []
    eis = []
    cycle_max = df_cap[df_cap.columns[1]].max()
    cycle_max2 = df_EIS[df_EIS.columns[1]].max()
    cycle_number = min(cycle_max, cycle_max2)
    
    for i in range(1, int(cycle_number) + 1):
        temp = df_cap[df_cap[df_cap.columns[1]] == i][df_cap.columns[cap_number]][-1:].max()
        temp_EIS_Re = np.array(df_EIS[df_EIS[df_EIS.columns[1]] == i][df_EIS.columns[3]][:])
        temp_EIS_Im = np.array(df_EIS[df_EIS[df_EIS.columns[1]] == i][df_EIS.columns[4]][:])
        cycle.append(i)
        cap.append(temp)
        eis.append(np.concatenate((temp_EIS_Re, temp_EIS_Im), axis=0))   
    dataframes_Cap.append(cap)
    dataframes_EIS.append(eis)

In [90]:
X = []
y = []
for i in range(0, len(dataframes_Cap)):
    for j in range(len(dataframes_Cap[i])):
        X.append(dataframes_EIS[i][j])
        y.append(dataframes_Cap[i][j])
X = np.array(X)
y = np.array(y)
print(X.shape, y.shape)

(1269, 120) (1269,)


In [91]:
# Read data at 35C
for i in range(1, 3):
    # Construct file names
    file_name_cap = f"Capacity_data/Data_Capacity_35C{i:02}.txt"
    file_name_EIS = f"EIS_data/EIS_state_V_35C{i:02}.txt"  # Use State V
    
    # Skip missing files
    if not os.path.isfile(file_name_cap):
        print(f"Cap file {file_name_cap} does not exist, skipping...")
        continue
    elif not os.path.isfile(file_name_EIS):
        print(f"EIS file {file_name_EIS} does not exist, skipping...")
        continue

    # Read files and add to the list
    df_cap = pd.read_csv(file_name_cap, sep="\t")
    df_EIS = pd.read_csv(file_name_EIS, sep="\t")
    cap_number = 3
    
    cycle = []
    cap = []
    eis = []
    cycle_max = df_cap[df_cap.columns[1]].max()
    cycle_max2 = df_EIS[df_EIS.columns[1]].max()
    cycle_number = min(cycle_max, cycle_max2)
    
    for i in range(1, int(cycle_number) + 1):
        temp = df_cap[df_cap[df_cap.columns[1]] == i][df_cap.columns[cap_number]][-1:].max()
        temp_EIS_Re = np.array(df_EIS[df_EIS[df_EIS.columns[1]] == i][df_EIS.columns[3]][:])
        temp_EIS_Im = np.array(df_EIS[df_EIS[df_EIS.columns[1]] == i][df_EIS.columns[4]][:])
        cycle.append(i)
        cap.append(temp)
        eis.append(np.concatenate((temp_EIS_Re, temp_EIS_Im), axis=0))
    dataframes_Cap.append(cap)
    dataframes_EIS.append(eis)

In [92]:
X = []
y = []
for i in range(0, len(dataframes_Cap)):
    for j in range(len(dataframes_Cap[i])):
        X.append(dataframes_EIS[i][j])
        y.append(dataframes_Cap[i][j])
X = np.array(X)
y = np.array(y)
print(X.shape, y.shape)

(1913, 120) (1913,)


In [93]:
# Read data at 45C
for i in range(1, 3):
    # Construct file names
    file_name_cap= f"Capacity_data/Data_Capacity_45C{i:02}.txt"
    file_name_EIS = f"EIS_data/EIS_state_V_45C{i:02}.txt"  # Use State V
    
    if not os.path.isfile(file_name_cap):
        print(f"Cap file {file_name_cap} does not exist, skipping...")
        continue
    elif not os.path.isfile(file_name_EIS):
        print(f"EIS file {file_name_EIS} does not exist, skipping...")
        continue

    # Read files and add to the list
    df_cap = pd.read_csv(file_name_cap, sep="\t")
    df_EIS = pd.read_csv(file_name_EIS, sep="\t")
    # print(df_cap.columns)
    cap_number = 3
    
    cycle = []
    cap = []
    eis = []
    cycle_max = df_cap[df_cap.columns[1]].max()
    cycle_max2 = df_EIS[df_EIS.columns[1]].max()
    cycle_number = min(cycle_max, cycle_max2)
    
    for i in range(1, int(cycle_number) + 1):
        temp = df_cap[df_cap[df_cap.columns[1]]==i][df_cap.columns[cap_number]][-1:].max()
        temp_EIS_Re = np.array(df_EIS[df_EIS[df_EIS.columns[1]]==i][df_EIS.columns[3]][:])
        temp_EIS_Im = np.array(df_EIS[df_EIS[df_EIS.columns[1]]==i][df_EIS.columns[4]][:])
        # temp = temp/max_scale
        cycle.append(i)
        cap.append(temp)
        eis.append(np.concatenate((temp_EIS_Re, temp_EIS_Im), axis=0))
    dataframes_Cap.append(cap)
    dataframes_EIS.append(eis)

In [94]:
X = []
y = []
for i in range(0, len(dataframes_Cap)):
    for j in range(len(dataframes_Cap[i])):
        X.append(dataframes_EIS[i][j])
        y.append(dataframes_Cap[i][j])
X = np.array(X)
y = np.array(y)
print(X.shape, y.shape)

(2522, 120) (2522,)


In [95]:
# Normalize each real part and each imaginary part of EIS separately
remax = []
immax = []
data = {}
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()

# Reshape the target variable 'y' and normalize it
y = y.reshape(-1,1)
y = scaler.fit_transform(y)

# Process each battery's data individually for cross-training and validation
start = 0
for i in range(len(dataframes_Cap)):
    feature_name = f'EIS{i+1:02}'
    target_name = f'Cap{i+1:02}'
    n = len(dataframes_Cap[i])
    
    # Normalize the real part of the EIS data
    X_r = X[start:start+n, :60].copy()  # Extract the real part of the data
    X_r_flat = X_r.flatten()
    X_r_min = X_r_flat[:60].min()
    X_r_max = X_r_flat[:60].max()
    remax.append(X_r_flat[:].max() / X_r_max)
    normalized_Xr_flat = ((X_r_flat.reshape(-1, 1)) - X_r_min) / (X_r_max - X_r_min)
    normalized_Xr_data = normalized_Xr_flat.reshape(X[start:start+n, :60].shape)
    
    # Normalize the imaginary part of the EIS data
    X_i = X[start:start+n, 60:]  # Extract the imaginary part of the data
    X_i_flat = X_i.flatten()
    X_i_min = X_i_flat[:60].min()
    X_i_max = X_i_flat[:60].max()
    immax.append(X_i_flat[:].max() / X_i_max)
    normalized_Xi_flat = ((X_i_flat.reshape(-1, 1)) - X_i_min) / (X_i_max - X_i_min)
    normalized_Xi_data = normalized_Xi_flat.reshape(X[start:start+n, 60:].shape)
    
    # Combine the normalized real and imaginary parts
    data[feature_name] = np.concatenate((normalized_Xr_data, normalized_Xi_data), axis=1)
    
    # Reshape the data into (batch, 60, 2) format, where real and imaginary parts are treated as a single feature
    data[feature_name] = data[feature_name].reshape(-1, 2, 60)
    data[feature_name] = data[feature_name].transpose(0, 2, 1)
    
    # Store the corresponding capacity values
    data[target_name] = y[start:start+n].reshape(-1, 1)
    start += n

In [96]:
# Combine training data from multiple EIS datasets, excluding certain datasets because we are using them for testing purpose
start = 0
for i in range(1, 11):
    if i == 1:
        trainning_data = data[f"EIS{i:02}"][start:].copy()
        trainning_target = data[f"Cap{i:02}"][start:].copy()
    elif i != 4 and i != 8 and i != 10:  # Exclude datasets 4, 8, and 10 from training
        trainning_data = np.vstack((trainning_data, data[f"EIS{i:02}"][start:]))
        trainning_target = np.vstack((trainning_target, data[f"Cap{i:02}"][start:]))

In [97]:
# Convert training data and targets to PyTorch tensors
trainning_data = torch.tensor(trainning_data, dtype=torch.float32)
trainning_target = torch.tensor(trainning_target, dtype=torch.float32)

In [98]:
print("Training Data Shape:", trainning_data.shape)
print("Training Target Shape:", trainning_target.shape)

Training Data Shape: torch.Size([1619, 60, 2])
Training Target Shape: torch.Size([1619, 1])


### Train process

In [None]:
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import KFold
import os

#Train Process
# Initialize the model, loss function, and optimizer
input_size = 2  # Number of features
hidden_size = 128
num_layers = 5
output_size = 1
conv_input = 60
batch_size = 128
epochs = 1500
n_splits = 5
from sklearn.model_selection import KFold  
kf = KFold(n_splits=n_splits, shuffle=True)

model_number = 0
for train_idx, val_idx in kf.split(trainning_data):  
    train_X, val_X = trainning_data[train_idx], trainning_data[val_idx]  
    train_y, val_y = trainning_target[train_idx], trainning_target[val_idx]  
    train_dataset = TensorDataset(train_X, train_y)  
    val_dataset = TensorDataset(val_X, val_y)  
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)  
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)  
    model = LSTMModel(conv_input, input_size, hidden_size, num_layers, output_size) 
    model = model.to(device)
    criterion = nn.MSELoss()   
    optimizer = optim.Adam(model.parameters(), lr=0.0001,betas=(0.5,0.999))  
     
    for epoch in range(epochs):
        model.train() 
        for i, (inputs, labels) in enumerate(train_loader):  
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()  
            outputs = model(inputs)  
            loss = criterion(outputs, labels)  
            loss.backward()  
            optimizer.step()  
        model.eval()
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                outputs = model(inputs)  
                val_loss = criterion(outputs, labels)    
        if epoch%100 ==0:
            print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item()}, Validation Loss: {val_loss.item()}') 
    torch.save(model.state_dict(), f"model_weights/CNNBiLSTM/test{model_number}.pth")
    model_number += 1

In [None]:
from sklearn.metrics import mean_squared_error, r2_score 
import math
import os
import torch
import numpy as np
import matplotlib.pyplot as plt

# Titles for each plot
titles = ['25C01', '25C02', '25C03', '25C05', '25C06', '25C07', '35C01', '35C02', '45C01', '45C02']

ID = 1  # Tracking dataset indexing
title_idx = 0  # Title list index
start = 0
mean_RMSE_train = 0
mean_R2_train = 0

# Initialize model
model = LSTMModel(conv_input, input_size, hidden_size, num_layers, output_size)
model = model.to(device)

# Create directories if they don't exist
data_dir = "data"
figure_dir = "figure_results"
os.makedirs(data_dir, exist_ok=True)
os.makedirs(figure_dir, exist_ok=True)

# Create 3x4 subplot grid
fig, axs = plt.subplots(nrows=3, ncols=4, figsize=(15, 8))

# Loop through plots
for i in range(3):  
    for j in range(4):
        if title_idx < len(titles):  # Ensure we don't go out of bounds
            result = []
            attention_all = []  # Store attention weights for interpretability
            x = torch.tensor(data[f"EIS{ID:02}"], dtype=torch.float32)

            # Predict using multiple trained models (cross-validation)
            for k in range(n_splits):
                model.load_state_dict(torch.load(f"model_weights/CNNBiLSTM_Attention/test{k}.pth",
                                                  map_location=torch.device(device)))
                model.eval()
                with torch.no_grad():
                    outputs = model(x.to(device))
                    
                    # Handle case where attention weights are returned
                    if isinstance(outputs, tuple):
                        outputs, attention_weights = outputs
                        attention_all.append(attention_weights.cpu().detach().numpy())  # Store attention maps
                    else:
                        attention_weights = None  # Avoids error if model does not return attention
                    
                    outputs = outputs.cpu().detach().numpy()
                    outputs = scaler.inverse_transform(outputs)
                    result.append(outputs)

            # Aggregate results across folds and ensure 1D shape
            result = np.array(result)
            out = np.mean(result, axis=0).squeeze()  # Convert to 1D
            out_upper = np.max(result, axis=0).squeeze()
            out_lower = np.min(result, axis=0).squeeze()

            # Get true values and inverse transform
            true = np.array(data[f"Cap{ID:02}"]).reshape(-1, 1)  # Ensure shape
            true = scaler.inverse_transform(true).squeeze()  # Convert to 1D

            # Compute evaluation metrics
            MSE = mean_squared_error(out[start:], true[start:])
            R2_result = r2_score(true[start:], out[start:])
            RMSE_result = math.sqrt(MSE)
            mean_RMSE_train += RMSE_result
            mean_R2_train += R2_result

            # Format RMSE and R² values for display
            RMSE_str = "{:.4f}".format(RMSE_result)
            R2_str = "{:.4f}".format(R2_result)

            # X-axis values for plotting
            x_vals = np.linspace(0, x.shape[0], x.shape[0])

            # **Plot true vs predicted capacity using default Matplotlib colors**
            axs[i, j].plot(x_vals[start:], true[start:], label="True Capacity")  # Uses Matplotlib default (Blue)
            axs[i, j].plot(x_vals[start:], out[start:], label="Predicted Capacity")  # Uses Matplotlib default (Orange)
            axs[i, j].fill_between(x_vals[start:], out_upper[start:], out_lower[start:], color='orange', alpha=0.5)  # Shaded region

            axs[i, j].set_title(titles[title_idx])
            axs[i, j].text(0.95, 0.95, f"RMSE: {RMSE_str}", ha='right', va='top', fontsize=12, transform=axs[i, j].transAxes)
            axs[i, j].text(0.95, 0.85, f"R2: {R2_str}", ha='right', va='top', fontsize=12, transform=axs[i, j].transAxes)
            axs[i, j].legend(loc='lower left', fontsize='small')
            axs[i, j].grid(True)

            # Save predictions
            with open(f"{data_dir}/Nature_Cap_train_{titles[title_idx]}.txt", 'w') as file:
                for item in range(out[start:].shape[0]):
                    out_number = round(float(out[start:][item].flatten()), 4)
                    file.write(str(out_number) + '\n')

            # Save averaged attention weights (for interpretability)
            if attention_weights is not None:
                np.save(f"{data_dir}/Attention_Weights_{titles[title_idx]}.npy", np.mean(np.array(attention_all), axis=0))

            # Increment dataset ID and title index
            ID += 1
            title_idx += 1

            # Break condition to avoid errors
            if ID == 11:
                break

# Adjust subplot spacing
plt.tight_layout()

# Save final figure
plt.savefig(f'{figure_dir}/cap_alltempalldata_test_5_10_12_attention.png')

# Print overall metrics
print("train RMSE: ", mean_RMSE_train / len(titles))
print("train R2: ", mean_R2_train / len(titles))

# Show final visualization
plt.show()
