## Authors: Hardik Chhabra, Kartik Sahajpal
##### Licensed under MIT License

In [None]:
!pip install optuna

## Importing Libraries

In [None]:
import torch
import pandas as pd
import os
import statistics
from torch.utils.data import DataLoader, TensorDataset
import optuna
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import matplotlib.ticker as ticker
import numpy as np
from sklearn.ensemble import RandomForestRegressor
import os, math
from optuna.importance import get_param_importances
import pickle
from scipy.stats import skew, kurtosis
import joblib
import warnings
from torch import nn
import plotly
import time
from optuna.visualization import plot_optimization_history

**getAvaiableDevice:** try to allocate GPU, if not GPU then fall back to CPU resources.

In [None]:
def getAvailableDevice(index=0):
    """
    Returns a CUDA device if available at the given index;
    otherwise falls back to CPU.
    """
    if torch.cuda.device_count() > index:
        return torch.device(f"cuda:{index}")
    return torch.device("cpu")

**computeMAPE:** computes MAPE for predicted and true data.

**computeRMSE:** computes RMSE for predicted and true data.

**computeR2Score:** computes R2 score for predicted and true data

In [None]:
def computeMAPE(predictions, targets):
    """
    Computes Mean Absolute Percentage Error (MAPE).

    Args:
        predictions (Tensor): Model outputs
        targets (Tensor): Ground truth values

    Returns:
        Tensor: MAPE value
    """
    percentageError = torch.abs((predictions - targets) / targets)
    return torch.sum(percentageError) / targets.numel()


def computeRMSE(predictions, targets):
    """
    Computes Root Mean Squared Error (RMSE).

    Args:
        predictions (Tensor): Model outputs
        targets (Tensor): Ground truth values

    Returns:
        Tensor: RMSE value
    """
    squaredError = (predictions - targets) ** 2
    meanSquaredError = torch.sum(squaredError) / targets.numel()
    return torch.sqrt(meanSquaredError)


def computeR2Score(predictions, targets):
    """
    Computes the R-squared (coefficient of determination).

    Args:
        predictions (Tensor): Model outputs
        targets (Tensor): Ground truth values

    Returns:
        Tensor: R² score
    """
    targetMean = torch.mean(targets)
    residualSum = torch.sum((predictions - targets) ** 2)
    totalSum = torch.sum((targets - targetMean) ** 2)
    return 1 - (residualSum / totalSum)

**buildKFoldSplit:** Creates training and validation subsets for k-fold cross-validation when input features include an additional dimension.

**readDatasetFiles:** Loads battery datasets from Excel files, cleans voltage sequences, segments cycles, and organizes them into a structured dictionary.

**createDataLoaders:** Converts training and testing tensors into PyTorch DataLoader objects with batching and shuffling.

**transposeFeatureTensor:** Rearranges a three-dimensional tensor by swapping its first and second axes.

**generateBatches:** Iterates over a tensor and yields fixed-size mini-batches for efficient processing.

**splitCells:** Divides data into training and testing sets based on cell identifiers.

**splitCellsWithStats:** Extracts statistical voltage features (mean, standard deviation, minimum, maximum) and splits the data by cell identifiers.

**scaleFeatureData:** Normalizes feature tensors using predefined voltage bounds across training and testing data.

**scaleRawVoltageData:** Normalizes raw voltage tensors for both training and testing datasets using fixed voltage limits.

In [None]:
def buildKFoldSplit(kFolds, foldIndex, features, targets):
    """
    Generates training and validation splits for k-fold cross validation
    when feature tensors contain an extra dimension.
    """
    if kFolds <= 1:
        raise ValueError("k-fold validation requires k > 1")

    foldLength = features.shape[1] // kFolds
    trainX, trainY = None, None

    for fold in range(kFolds):
        sliceIdx = slice(fold * foldLength, (fold + 1) * foldLength)
        xSlice = features[:, sliceIdx, :]
        ySlice = targets[sliceIdx]

        if fold == foldIndex:
            valX, valY = xSlice, ySlice
        else:
            if trainX is None:
                trainX, trainY = xSlice, ySlice
            else:
                trainX = torch.cat((trainX, xSlice), dim=1)
                trainY = torch.cat((trainY, ySlice), dim=0)

    return trainX, trainY, valX, valY


def readDatasetFiles(targetFiles, datasetIndex=-1):
    """
    Loads and processes battery datasets from Excel files.
    """
    baseDir = "/content/dataset2/"
    outputData = {}

    for fileIdx, file in enumerate(os.listdir(baseDir)):
        if file not in targetFiles:
            continue

        filePath = os.path.join(baseDir, file)
        table = pd.read_excel(filePath)

        lastCycle = 0
        segmentCount = 0
        buffer = []
        totalRows = len(table)

        for row in range(totalRows):
            cycleNum = table.loc[row, "cycle"]
            capacityNorm = table.loc[row, "Capacity"] / 3500
            voltageStr = table.loc[row, "Voltages"].replace("\n", "")[1:-1].split(" ")
            voltageStr = [v for v in voltageStr if v != ""]
            voltageArr = np.array(list(map(float, voltageStr)))

            if (
                voltageArr[-1] < 4.1
                or any(np.diff(voltageArr) > 0.001)
                or any(np.diff(voltageArr, n=2) < -0.001)
            ):
                continue

            if cycleNum < lastCycle or row == totalRows - 1:
                segmentCount += 1
                rate = table.loc[row - 1, "rate"]
                temp = table.loc[row - 1, "Tem"]
                keyName = f"D{fileIdx+1}{rate}{temp}_{segmentCount}"

                outputData[keyName] = {
                    "cycle": np.array([item[0] for item in buffer]),
                    "V": np.stack([item[1] for item in buffer]),
                    "Q": np.array([item[2] for item in buffer]),
                }
                buffer = []

            lastCycle = cycleNum
            buffer.append([cycleNum, voltageArr, capacityNorm])

    return outputData


def createDataLoaders(trainX, trainY, testX, testY, batchSize):
    """
    Wraps tensors into PyTorch DataLoader objects.
    """
    trainingSet = TensorDataset(trainX, trainY)
    testingSet = TensorDataset(testX, testY)

    trainLoader = DataLoader(trainingSet, batch_size=batchSize, shuffle=True)
    testLoader = DataLoader(testingSet, batch_size=batchSize, shuffle=True)

    return trainLoader, testLoader


def transposeFeatureTensor(tensorInput):
    """
    Rearranges a 3D tensor by swapping the first two dimensions.
    """
    dimA, dimB, dimC = tensorInput.shape
    outputArray = np.empty((dimB, dimA, dimC))

    for b in range(dimB):
        for a in range(dimA):
            outputArray[b, a] = tensorInput[a, b]

    return torch.tensor(outputArray, dtype=torch.float64)


def generateBatches(dataTensor, batchSize=1024):
    """
    Iterates over a tensor in fixed-size batches.
    """
    totalLength = dataTensor.shape[0]
    for start in range(0, totalLength, batchSize):
        yield dataTensor[start : start + batchSize]


def splitCells(dataDictionary, testCellIds=None, trainCellIds=None):
    """
    Separates data into training and testing sets based on cell identifiers.
    """
    useTrainIds = trainCellIds is not None
    selectedIds = trainCellIds if useTrainIds else testCellIds

    trainX, trainY, testX, testY = {}, {}, {}, {}

    for key, entry in dataDictionary.items():
        cellNumber = int(key.split("_")[-1])

        assignToTest = (cellNumber in selectedIds) ^ useTrainIds
        voltageTensor = torch.tensor(entry["V"], dtype=torch.float32)
        cycleTensor = torch.tensor(entry["cycle"], dtype=torch.float32)

        if assignToTest:
            testX[key], testY[key] = voltageTensor, cycleTensor
        else:
            trainX[key], trainY[key] = voltageTensor, cycleTensor

    return trainX, trainY, testX, testY


def splitCellsWithStats(dataDictionary, testCellIds=None, trainCellIds=None):
    """
    Extracts statistical voltage features and splits data by cell.
    """
    useTrainIds = trainCellIds is not None
    selectedIds = trainCellIds if useTrainIds else testCellIds

    trainX, trainY, testX, testY = {}, {}, {}, {}

    for key, entry in dataDictionary.items():
        means, stds, mins, maxs = [], [], [], []

        for v in entry["V"]:
            means.append(statistics.mean(v))
            stds.append(statistics.stdev(v))
            mins.append(min(v))
            maxs.append(max(v))

        means = np.array(means)
        stds = np.array(stds)
        mins = np.array(mins)
        maxs = np.array(maxs)

        seqLen = len(entry["V"][0])
        meanMat = np.tile(means[:, None], (1, seqLen))
        stdMat = np.tile(stds[:, None], (1, seqLen))
        minMat = np.tile(mins[:, None], (1, seqLen))
        maxMat = np.tile(maxs[:, None], (1, seqLen))

        featureTensor = torch.tensor(
            [minMat, meanMat, maxMat, stdMat], dtype=torch.float32
        )
        labelTensor = torch.tensor(entry["Q"], dtype=torch.float32)

        cellNumber = int(key.split("_")[-1])
        assignToTest = (cellNumber in selectedIds) ^ useTrainIds

        if assignToTest:
            testX[key], testY[key] = featureTensor, labelTensor
        else:
            trainX[key], trainY[key] = featureTensor, labelTensor

    return trainX, trainY, testX, testY


def scaleFeatureData(trainMap, testMap):
    """
    Normalizes feature tensors using fixed voltage bounds.
    """
    trainStack = torch.concat(list(trainMap.values()), dim=1)
    testStack = torch.concat(list(testMap.values()), dim=1)

    upperBound = torch.tensor(4.2)
    lowerBound = torch.tensor(4.1)

    trainNorm = (trainStack - lowerBound) / (upperBound - lowerBound)
    testNorm = (testStack - lowerBound) / (upperBound - lowerBound)

    for key in testMap:
        testMap[key] = (testMap[key] - lowerBound) / (upperBound - lowerBound)

    return trainNorm, testNorm


def scaleRawVoltageData(trainMap, testMap):
    """
    Normalizes raw voltage values across training and testing sets.
    """
    trainStack = torch.concat(list(trainMap.values()))
    testStack = torch.concat(list(testMap.values()))

    upperBound = torch.tensor(4.2)
    lowerBound = torch.tensor(4.1)

    trainNorm = (trainStack - lowerBound) / (upperBound - lowerBound)
    testNorm = (testStack - lowerBound) / (upperBound - lowerBound)

    for key in testMap:
        testMap[key] = (testMap[key] - lowerBound) / (upperBound - lowerBound)

    return trainNorm, testNorm

## CNN Model Architecture

In [None]:
class CNNModel(nn.Module):
    def __init__(self, dropout=0.001, **kwargs):
        super(CNNModel, self).__init__(**kwargs)
        self.conv_layers = nn.Sequential(
                nn.Conv1d(4, 16, 3, padding=1), nn.ReLU(), nn.BatchNorm1d(16), # Set first argument of this nn.conv1d to number of features used
                nn.Conv1d(16, 16, 3, padding=1), nn.ReLU(), nn.BatchNorm1d(16),
                nn.Conv1d(16, 32, 3, padding=1), nn.ReLU(), nn.MaxPool1d(2, stride=2), nn.BatchNorm1d(32),
                nn.Conv1d(32, 32, 3, padding=1), nn.ReLU(), nn.BatchNorm1d(32),
                nn.Conv1d(32, 64, 3, padding=1), nn.ReLU(), nn.MaxPool1d(2, stride=2), nn.BatchNorm1d(64),
                nn.Conv1d(64, 64, 3, padding=1), nn.ReLU(), nn.BatchNorm1d(64),
                nn.Conv1d(64, 64, 3, padding=1), nn.ReLU(), nn.BatchNorm1d(64)
        )

        self.lstm_layer = nn.LSTM(input_size=64, hidden_size=64, num_layers=1, batch_first=True)
        self.gru_layer = nn.GRU(input_size=64, hidden_size=64, num_layers=1, batch_first=True)
        self.birnn_layer = nn.RNN(input_size=64, hidden_size=64, num_layers=1, batch_first=True, bidirectional=True)
        self.final_conv = nn.Sequential(
            nn.Conv1d(64, 64, 3, padding = 1), nn.ReLU(), nn.BatchNorm1d(64) # Change 64 to nn.Conv1d(128, ..) for Bidirectional layer
        )

        self.flatten_layer = nn.Flatten()
        self.fc1 = nn.Sequential(nn.Dropout(dropout), nn.Linear(3 * 64, 32), nn.ReLU())
        self.fc2 = nn.Sequential(nn.Dropout(dropout), nn.Linear(32, 1))

    def forward(self, inputs):
        inputs = inputs.float()
        conv_output = self.conv_layers(inputs)
        conv_output = conv_output.permute(0, 2, 1)
        conv_output, _ = self.lstm_layer(conv_output) # Change the layer as per experiment
        conv_output = conv_output.permute(0, 2, 1)
        conv_output = self.final_conv(conv_output)

        conv_output = self.flatten_layer(conv_output)
        fc_output = self.fc1(conv_output)
        fc_output = self.fc2(fc_output)
        return torch.squeeze(fc_output)

Train model function

In [None]:
def trainNeuralNetwork(
    network,
    trainFeatures,
    trainTargets,
    testFeatures,
    testTargets,
    numEpochs,
    learningRate,
    batchSize,
    optimizer,
    computeDevice,
    # enableFeatureMode,
    visualizer=None
):
    """
    Trains a neural network model and evaluates it on test data.
    """
    startTimestamp = time.time()
    lossFunction = nn.MSELoss()

    # if enableFeatureMode:
    trainFeatures = transposeFeatureTensor(trainFeatures)
    testFeatures = transposeFeatureTensor(testFeatures)

    print("Training samples:", trainFeatures.shape)
    print("Training labels:", len(trainTargets))
    print("Testing samples:", testFeatures.shape)
    print("Testing labels:", len(testTargets))
    # else:
    #     print("Training samples:", trainFeatures.shape)
    #     print("Training labels:", len(trainTargets))
    #     print("Testing samples:", testFeatures.shape)
    #     print("Testing labels:", len(testTargets))

    trainFeatures = trainFeatures.to(computeDevice)
    trainTargets = trainTargets.to(computeDevice)
    testFeatures = testFeatures.to(computeDevice)
    testTargets = testTargets.to(computeDevice)

    trainingSet = TensorDataset(trainFeatures, trainTargets)
    trainingLoader = DataLoader(
        trainingSet, batch_size=batchSize, shuffle=True
    )

    lrScheduler = torch.optim.lr_scheduler.OneCycleLR(
        optimizer=optimizer,
        max_lr=learningRate,
        steps_per_epoch=len(trainingLoader),
        epochs=numEpochs,
        pct_start=0.3,
        div_factor=25,
    )

    for epochIndex in range(numEpochs):
        network.train()
        batchLosses = []

        for batchX, batchY in trainingLoader:
            optimizer.zero_grad()
            predictions = network(batchX)
            batchLoss = lossFunction(
                predictions.view(batchY.shape), batchY
            )

            batchLoss.backward()
            optimizer.step()
            lrScheduler.step()

            batchLosses.append(batchLoss.detach().cpu().item())

        if epochIndex == numEpochs - 1:
            print("--" * 20)
            network.eval()

            with torch.no_grad():
                outputChunks = []
                for chunk in generateBatches(testFeatures, batchSize=4096):
                    outputChunks.append(
                        # network(chunk, enableFeatureMode).detach()
                        network(chunk).detach()
                    )

                fullPredictions = torch.concat(outputChunks, dim=0)
                fullPredictions = fullPredictions.view(testTargets.shape)

                mapeValue = computeMAPE(fullPredictions, testTargets)
                rmseValue = computeRMSE(fullPredictions, testTargets)
                r2Value = computeR2Score(fullPredictions, testTargets)

                print(
                    f"Final Epoch {epochIndex}: "
                    f"MAPE={mapeValue:.6f}, "
                    f"RMSE={rmseValue:.6f}, "
                    f"R2={r2Value:.6f}"
                )

    elapsedTime = time.time() - startTimestamp

    return elapsedTime, [
        numEpochs,
        mapeValue.cpu().numpy(),
        rmseValue.cpu().numpy(),
        r2Value.cpu().numpy(),
    ]

In [None]:
# model path
modelRootDir = "/content/CNN_LSTM_min_max_mean_std"

### NOTE:

**enableFeatureMode = True** → use statistical feature representation

**enableFeatureMode = False** → use raw voltage representation (baseline method)


In [None]:
runBayesOpt = True
bayesEpochs = 2
# enableFeatureMode = True  # set runBayesOpt to False when loading saved results

numFolds = 4

# Load and prepare dataset
datasetFile = "Dataset_1_NCA_battery.xlsx"
batteryData = readDatasetFiles(datasetFile)

cellPartitions = [
    [11, 12, 28, 29, 30, 31, 32],
    list(range(1, 11)) + list(range(13, 22)),
    [22, 23, 24, 25, 26, 27, 33, 34, 35],
    [36, 37, 38],
    np.arange(39, 67),
]

samplesPerGroup = [1, 4, 2, 1, 6]  # stratified sampling plan

randomSeed = 5
rng = np.random.RandomState(seed=randomSeed)

selectedTestCells = [
    rng.choice(group, samplesPerGroup[idx], replace=False)
    for idx, group in enumerate(cellPartitions)
]
selectedTestCells = [cell for group in selectedTestCells for cell in group]
testCellKeys = selectedTestCells

# Feature-based or raw-voltage-based data handling
# if enableFeatureMode:
trainFeatureMap, trainLabelMap, testFeatureMap, testLabelMap = (
        splitCellsWithStats(batteryData, testCellIds=testCellKeys)
)

trainFeatures, testFeatures = scaleFeatureData(
        trainFeatureMap, testFeatureMap
)
trainLabels = torch.cat(list(trainLabelMap.values()))
shuffleIndices = np.arange(trainFeatures.shape[1])

# else:
#     trainFeatureMap, trainLabelMap, testFeatureMap, testLabelMap = (
#         splitCells(batteryData, testCellIds=testCellKeys)
#     )

#     trainFeatures, testFeatures = scaleRawVoltageData(
#         trainFeatureMap, testFeatureMap
#     )
#     trainLabels = torch.cat(list(trainLabelMap.values()))
#     shuffleIndices = np.arange(trainFeatures.shape[0])

# Shuffle training samples
rng.shuffle(shuffleIndices)

# if enableFeatureMode:
trainFeatures = trainFeatures[:, shuffleIndices, :]
trainLabels = trainLabels[shuffleIndices]
# else:
#     trainFeatures = trainFeatures[shuffleIndices]
#     trainLabels = trainLabels[shuffleIndices]

# Assemble test labels
testLabels = torch.cat(list(testLabelMap.values()), dim=0)

# Select computation device
computeDevice = getAvailableDevice()

# Model save location
modelSaveDir = modelRootDir

# Execute pipeline
warnings.filterwarnings(
    "ignore",
    message="The objective has been evaluated at this point before.",
)

Objective function used by Optuna for hyperparameter tuning

In [None]:
def optunaObjective(trial):
    batchSize = trial.suggest_int("batchSize", 32, 512, step=32)
    learningRate = trial.suggest_float("learningRate", 1e-5, 5e-2, log=True)
    weightDecay = trial.suggest_float("weightDecay", 1e-6, 1e-3, log=True)
    epochCount = trial.suggest_int("epochCount", 10, 11)
    # epochCount = trial.suggest_int("epochCount", 50, 150, step=10)
    dropoutRate = trial.suggest_float("dropoutRate", 0.001, 0.5, log=True)

    accumulatedRmse = 0.0

    for foldIdx in range(numFolds):
        # if enableFeatureMode:
        foldData = buildKFoldSplit(
                numFolds, foldIdx, trainFeatures, trainLabels
        )
        # else:
        #     foldData = buildKFoldSplit(
        #         numFolds, foldIdx, trainFeatures, trainLabels
        #     )

        modelInstance = CNNModel(dropout=dropoutRate)
        modelInstance = modelInstance.to(computeDevice)

        decayParams = (
            param
            for name, param in modelInstance.named_parameters()
            if not name.endswith("bias") and "bn" not in name
        )
        noDecayParams = (
            param
            for name, param in modelInstance.named_parameters()
            if name.endswith("bias") or "bn" in name
        )

        optimizerParams = [
            {"params": decayParams},
            {"params": noDecayParams, "weight_decay": 0.0},
        ]

        optimizerInstance = torch.optim.Adam(
            optimizerParams, lr=learningRate, weight_decay=weightDecay
        )

        _, metrics = trainNeuralNetwork(
            modelInstance,
            *foldData,
            epochCount,
            learningRate,
            batchSize,
            optimizerInstance,
            computeDevice,
            # enableFeatureMode,
            visualizer=None,
        )

        accumulatedRmse += metrics[2]

    return accumulatedRmse / numFolds


# Executes Optuna optimization with the selected sampling strategy
def executeOptunaSearch(trialCount, samplerType, objectiveFn, experimentName):
    if samplerType == "TPE":
        sampler = optuna.samplers.TPESampler(
            n_startup_trials=10, n_ei_candidates=24
        )
    elif samplerType == "GP":
        from optuna.integration import SkoptSampler

        sampler = SkoptSampler(
            skopt_kwargs={
                "base_estimator": "GP",
                "n_initial_points": 10,
                "acq_func": "EI",
            }
        )
    else:
        sampler = None

    study = optuna.create_study(
        sampler=sampler,
        direction="minimize",
        study_name=experimentName,
    )

    study.optimize(
        objectiveFn,
        n_trials=trialCount,
        show_progress_bar=True,
    )

    joblib.dump(study, f"{experimentName}.pkl")

    historyFigure = plot_optimization_history(study)
    plotly.offline.plot(
        historyFigure, filename=f"{experimentName}.html"
    )

    with open(f"{experimentName}.txt", "w") as outputFile:
        print(
            study.best_params,
            study.best_value,
            file=outputFile,
        )

    print(get_param_importances(study))

    return study.best_trial.params, study.best_trial.values

## Optimized parameter execution & extraction

In [None]:
if runBayesOpt:
    optimalParams, optimalScore = executeOptunaSearch(
        bayesEpochs,
        "TPE",
        optunaObjective,
        modelRootDir,  # study identifier (experimentName)
    )

    print(f"Optimal hyperparameters: {optimalParams}")
    print(f"Optimal objective value: {optimalScore}")

    with open(
        "CNN_LSTM_best_params_min_max_mean_std.pkl",
        "wb",
    ) as paramFile:
        pickle.dump(optimalParams, paramFile)

else:
    with open(
        "CNN_LSTM_best_params_min_max_mean_std.pkl",
        "rb",
    ) as paramFile:
        optimalParams = pickle.load(paramFile)

dropoutRate = optimalParams["dropoutRate"]
learningRate = optimalParams["learningRate"]
numEpochs = optimalParams["epochCount"]
batchSize = optimalParams["batchSize"]

trainingResults = []

Model retraining with optimal hyperparameters

In [None]:
for runIndex in range(10):
    # cnnModel = CNNModel(enableFeatureMode, dropoutRate)
    cnnModel = CNNModel(dropoutRate)
    cnnModel = cnnModel.to(computeDevice)

    decayGroup = (
        param
        for name, param in cnnModel.named_parameters()
        if not name.endswith("bias") and "bn" not in name
    )
    noDecayGroup = (
        param
        for name, param in cnnModel.named_parameters()
        if name.endswith("bias") or "bn" in name
    )

    optimizerGroups = [
        {"params": decayGroup},
        {"params": noDecayGroup, "weight_decay": 0.0},
    ]

    optimizerInstance = torch.optim.Adam(
        optimizerGroups,
        learningRate,
        weight_decay=optimalParams["weightDecay"],
    )

    trainingTime, evalMetrics = trainNeuralNetwork(
        cnnModel,
        trainFeatures,
        trainLabels,
        testFeatures,
        testLabels,
        numEpochs,
        learningRate,
        batchSize,
        optimizerInstance,
        computeDevice,
        # enableFeatureMode,
        visualizer=None,
    )

    inferenceStart = time.time()

    # if enableFeatureMode:
    print(testFeatures.shape)

    reorderedTestX = testFeatures.permute(1, 2, 0)
    print(reorderedTestX.shape)

    inferenceOutputs = [
            cnnModel(
                sample.reshape(1, 4, 14)
                # , enableFeatureMode,
            )
            for sample in reorderedTestX[:1000].to(computeDevice)
    ]
    # else:
    #     inferenceOutputs = [
    #         cnnModel(
    #             sample.reshape(1, 14)
    #             # , enableFeatureMode,
    #         )
    #         for sample in testFeatures[:1000].to(computeDevice)
    #     ]

    inferenceEnd = time.time()

    avgInferenceTime = (
        inferenceEnd - inferenceStart
    ) / len(testFeatures[:1000])

    trainingResults.append([trainingTime, avgInferenceTime])

    print([trainingTime, avgInferenceTime])

trainingResults = np.array(trainingResults)

print(
    "Mean training duration for CNN:",
    np.mean(trainingResults[:, 0]),
)
print(
    "Mean inference duration:",
    np.mean(trainingResults[:, 1]),
)

os.makedirs("/content/model_save", exist_ok=True)

torch.save(
    cnnModel.state_dict(),
    f"/content/model_save/{modelRootDir.split("/")[2]}",
)

## Graphs

In [None]:
# Display average training and evaluation metrics
print(f'Average training duration: {trainingTime:.6f} s')
print(f'Test Set -> MAPE: {evalMetrics[1]:.6f}, RMSE: {evalMetrics[2]:.6f}, R2: {evalMetrics[3]:.6f}')

# Create subplots for visualizing individual battery predictions
# fig, axes = plt.subplots(3, 4, figsize=(17, 13))
fig, axes = plt.subplots(2, 5, figsize=(17, 13))
axes = axes.flat

# Load the trained model
cnnModel.load_state_dict(torch.load(f'model_save/{modelRootDir.split("/")[2]}', weights_only=True))
cnnModel.eval()

predictionsDict = {}
scatterPred, scatterTrue = [], []

print('--' * 20)

with torch.no_grad():
    batteryKeys = list(testXDict.keys())
    selectedKeys = np.random.choice(batteryKeys, len(axes), replace=False)

    for idx, key in enumerate(selectedKeys):
        sampleX = testXDict[key]
        reshapedX = transposeFeatureTensor(sampleX).to(computeDevice) # if enableFeatureMode else sampleX.to(computeDevice)

        # predictedY = cnnModel(reshapedX, enableFeatureMode).detach().cpu()
        predictedY = cnnModel(reshapedX).detach().cpu()
        scatterPred.extend(predictedY.numpy())
        scatterTrue.extend(testYDict[key].cpu().numpy())

        mapeScore = computeMAPE(predictedY, testYDict[key])
        rmseScore = computeRMSE(predictedY, testYDict[key])
        r2Score = computeR2Score(predictedY, testYDict[key])

        print(f'Battery {key} -> MAPE: {mapeScore:.6f}, RMSE: {rmseScore:.6f}, R2: {r2Score:.6f}')
        predictionsDict[key] = predictedY.numpy()

        cycleRange = np.arange(len(testYDict[key]))
        axes[idx].plot(cycleRange, testYDict[key], label='True SOH', linewidth=1, marker='+', markersize=1, zorder=1)
        axes[idx].plot(cycleRange, predictionsDict[key], label='Predicted SOH', linewidth=1, marker='_', markersize=1, zorder=1)

        rateLabel = str(round(float(key[2:key[2:].find("_")]), 2))
        tempLabel = key[key.rfind("_") - 2:key.rfind("_")]
        cellLabel = key[key.rfind("_") + 1:]
        axes[idx].set_title(f"NCA_{rateLabel}_{tempLabel} #{cellLabel}")

        if idx % 4 == 0:
            axes[idx].set_ylabel('SOH')
        if idx >= 8:
            axes[idx].set_xlabel('Cycle Number')
        axes[idx].legend(loc='upper right')

# Scatter plot of predictions vs. true SOH using logarithmic color scale for error
scatterPred, scatterTrue = np.array(scatterPred), np.array(scatterTrue)
predictionErrors = np.abs(scatterPred - scatterTrue)

scatterPlot = plt.scatter(
    scatterPred,
    scatterTrue,
    c=predictionErrors,
    cmap="viridis",
    norm=LogNorm(vmin=1e-4, vmax=1e-1),
    s=3,
    zorder=2,
)

colorBar = plt.colorbar(scatterPlot, ticks=[1e-4, 1e-3, 1e-2, 1e-1])
colorBar.set_label('Prediction Error', labelpad=10)
colorBar.ax.yaxis.set_major_formatter(ticker.LogFormatterMathtext())

plt.plot([0.7, 1], [0.7, 1], '--', color='gold', linewidth=2, label='Real SOH', zorder=1)
plt.xlim([0.7, 1])
plt.ylim([0.7, 1])
plt.xlabel("Actual SOH")
plt.ylabel("Predicted SOH")
plt.legend(loc='upper right')
plt.show()

## Transfer Learning

In [None]:
numFeatures = 3 # #CHANGE NUMBER OF FEATURES, CREATE num_features
modelSaveDir = "/content/New_CNN_GRU_min_max_mean"
tlModelPath = "/content/New_CNN_GRU_min_max_mean_TL_DS2"

#### Function Definition

**computeL2Loss:** Computes L2 distance between two model state dictionaries, ignoring batch-norm and tracking parameters.

**fineTuneModel:** Fine-tunes a model using a combination of prediction loss and L2 regularization toward a source model.

In [None]:
def computeL2Loss(paramSetA, paramSetB):
    """
    Computes L2 distance between two model state dictionaries, ignoring batch-norm and tracking parameters.
    """
    totalLoss = 0
    for key in paramSetA.keys():
        if 'running_mean' in key or 'running_var' in key or 'num_batches_tracked' in key:
            continue
        diff = paramSetA[key] - paramSetB[key]
        totalLoss += torch.sum(diff ** 2)
    return totalLoss

def fineTuneModel(model, trainX, trainY, testX, testY, epochs, learningRate, batchSize, optimizer, device, sourceParams, lambdaReg=0.05):
    """
    Fine-tunes a model using a combination of prediction loss and L2 regularization toward a source model.
    """
    startTime = time.time()
    mseLoss = nn.MSELoss()

    # Reshape features if using statistical features
    trainX = transposeFeatureTensor(trainX)
    testX = transposeFeatureTensor(testX)
    print("Train Data Shape:", len(trainX), len(trainX[0]), len(trainX[0][0]))
    print("Train Labels Shape:", len(trainY))
    print("Test Data Shape:", len(testX), len(testX[0]), len(testX[0][0]))
    print("Test Labels Shape:", len(testY))

    # Move tensors to device
    trainX, trainY = trainX.to(device), trainY.to(device)
    testX, testY = testX.to(device), testY.to(device)

    # Prepare DataLoader
    trainDataset = TensorDataset(trainX, trainY)
    trainLoader = DataLoader(trainDataset, batch_size=batchSize, shuffle=True)

    scheduler = torch.optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=learningRate,
        steps_per_epoch=len(trainLoader),
        epochs=epochs,
        pct_start=0.3,
        div_factor=25
    )

    for epoch in range(epochs):
        epochLosses = []
        model.train()
        for batchX, batchY in trainLoader:
            optimizer.zero_grad()
            predictions = model(batchX)
            l2RegLoss = computeL2Loss(sourceParams, model.state_dict())
            totalLoss = mseLoss(predictions.reshape(batchY.shape), batchY) + lambdaReg * l2RegLoss
            epochLosses.append(totalLoss.detach().cpu().numpy())
            totalLoss.backward()
            optimizer.step()
            scheduler.step()

        # Evaluate on the last epoch
        if epoch == epochs - 1:
            print('--' * 20)
            model.eval()
            with torch.no_grad():
                if testX.shape[1] != numFeatures:
                    testX = testX.reshape(len(testX[0]), len(testX), len(testX[0][0]))
                predictedY = [model(x).detach() for x in generateBatches(testX, batchSize=4096)]
                predictedY = torch.concat(predictedY, dim=0).reshape(testY.shape)

                mapeScore = computeMAPE(predictedY, testY)
                rmseScore = computeRMSE(predictedY, testY)
                r2Score = computeR2Score(predictedY, testY)

                print(f'Test Epoch {epoch} -> MAPE: {mapeScore:.6f}, RMSE: {rmseScore:.6f}, R2: {r2Score:.6f}')

    endTime = time.time()
    return endTime - startTime, [epochs, mapeScore.cpu().numpy(), rmseScore.cpu().numpy(), r2Score.cpu().numpy()]

In [None]:
needBo, boEpochs = False, 50
savePath = tlModelPath  # Path to save/load TL model
randomSeed = 10
performTransferLearning = True
datasetPath = r'/content/dataset2/Dataset_2_NCM_battery.xlsx'

# Load battery dataset containing voltage, current, and SOH (labels)
batteryData = readDatasetFiles(datasetPath)

# Define cell groups and stratified sampling parameters
cellGroups = [np.arange(1, 24), np.arange(24, 28), np.arange(28, 56)]
samplesPerGroup, samplingStep = [6, 1, 3], 10

# Generate stratified training cell list
trainCellsList = [np.random.RandomState(seed=randomSeed).choice(group, samplesPerGroup[i], replace=False)
                  for i, group in enumerate(cellGroups)]
trainCells = [cell for sublist in trainCellsList for cell in sublist]

# Split dataset into training and testing sets, using statistical features if enabled
trainXDict, trainYDict, testXDict, testYDict = splitCellsWithStats(batteryData, testCellIds=trainCells)
trainX, testX = scaleFeatureData(trainXDict, testXDict)
trainY = torch.cat([y for y in trainYDict.values()])
indices = np.arange(len(trainX[0]))

# Display dataset shapes
print("Training data shape:", len(trainX), len(trainX[0]), len(trainX[0][0]))
print("Testing data shape:", len(testX), len(testX[0]), len(testX[0][0]))

# Apply random shuffling and sparse sampling if using features
np.random.RandomState(seed=randomSeed).shuffle(indices)
trainX, trainY = trainX[:, indices[::samplingStep], :], trainY[indices[::samplingStep]]

# Optional second shuffling for added randomness
indices = np.arange(len(trainX[0]))
np.random.RandomState(seed=randomSeed).shuffle(indices)
trainX, trainY = trainX[:, indices, :], trainY[indices]

# Concatenate test labels
testY = torch.cat([y for y in testYDict.values()], dim=0)

# Move data to GPU if available
device = getAvailableDevice()

# Load pre-trained model for transfer learning
pretrainedParams = torch.load(f'/content/model_save/{modelRootDir.split("/")[2]}')  # Map to GPU if needed