In [26]:
import pandas as pd
import numpy as np
import random
import os
import time
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import time

from numpy import hstack, vstack
import itertools
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from itertools import product

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error, r2_score

from torch.utils.data import DataLoader, TensorDataset

import warnings
warnings.filterwarnings(action='ignore')

In [27]:
def find_directory(foldername, filename = None, back_num = 0):
    cur = os.getcwd()
    for i in range(back_num):
        cur = os.path.abspath(os.path.join(cur, os.pardir))
    for folder in foldername:
        cur = os.path.join(cur, folder)
    if not os.path.exists(cur):
        os.makedirs(cur)
        print(f'{cur} created')
    if filename != None:
        cur = os.path.join(cur, filename)
    return cur

os.getcwd()

'c:\\Users\\dkstj\\Desktop\\Rapid\\machine_learning'

In [28]:
csv_add = find_directory(foldername = [], filename = 'SOC_Point_Data.csv')
dat = pd.read_csv(csv_add, index_col = (0,1,2,3,4))

In [29]:
def Get_Data_all(dat) :
    
    RPT_MODE = "0.1C"
    SOC_Range = [9,10,11,12]

    Time_Range = range(4, 1201, 2)
    SOC_Range = [str(i) for i in SOC_Range]
    
    Data = dat

    X = Data.loc[RPT_MODE, SOC_Range]
    Y = Data.loc[RPT_MODE, ["SOH", "Next_SOH", "Ratio_SOH", "Ratio_CYC"]].groupby(level = ["Next", "Path", "Number"]).mean()
    y = Y["Next_SOH"]
    
    return X, y

In [30]:
def Even_Split(X, y, test_size, rs):
    XX = {n: X.xs(key=n, level='Next', drop_level=False) for n in ["M", "D", "H"]}
    yy = {n: y.xs(key=n, level='Next', drop_level=False) for n in ["M", "D", "H"]}

    XXX, yyy = {n: [] for n in ["M", "D", "H"]}, {n: [] for n in ["M", "D", "H"]}
    for n in ["M", "D", "H"]:
        for path in range(1, 5):
            X_path = XX[n].loc[XX[n].index.get_level_values('Path').str.len() == path]
            y_path = yy[n].loc[yy[n].index.get_level_values('Path').str.len() == path]

            if 'Time' in y_path.index.names:
                y_path = y_path.reset_index('Time', drop=True)

            XXX[n].append(X_path)
            yyy[n].append(y_path)

    XX_tn, XX_te = {n: [] for n in ["M", "D", "H"]}, {n: [] for n in ["M", "D", "H"]}
    yy_tn, yy_te = {n: [] for n in ["M", "D", "H"]}, {n: [] for n in ["M", "D", "H"]}

    for n in ["M", "D", "H"]:
        for path in range(1, 5):
            X_temp = XXX[n][path-1]
            y_temp = yyy[n][path-1]

            cell_index = y_temp.index.drop_duplicates()

            cells_tn, cells_te = train_test_split(cell_index, test_size=test_size, random_state=rs)

            X_tn = X_temp[X_temp.index.droplevel('Time').isin(cells_tn)]
            X_te = X_temp[X_temp.index.droplevel('Time').isin(cells_te)]

            y_tn = y_temp.loc[cells_tn]
            y_te = y_temp.loc[cells_te]

            XX_tn[n].append(X_tn)
            XX_te[n].append(X_te)
            yy_tn[n].append(y_tn)
            yy_te[n].append(y_te)

    X_tn = pd.concat([pd.concat(XX_tn[n]) for n in ["M", "D", "H"]])
    X_te = pd.concat([pd.concat(XX_te[n]) for n in ["M", "D", "H"]])
    y_tn = pd.concat([pd.concat(yy_tn[n]) for n in ["M", "D", "H"]])
    y_te = pd.concat([pd.concat(yy_te[n]) for n in ["M", "D", "H"]])

    return X_tn, X_te, y_tn, y_te

In [31]:
def convert_to_tensor(X_tn, X_te, y_tn, y_te, batch_size=32, val_size=1/6, rs=100):
    """
    X_tn, X_te: MultiIndex DataFrame (Next, Path, Number, Time)
    y_tn, y_te: Series or DataFrame with index (Next, Path, Number)
    """

    X_tr, X_vl, y_tr, y_vl = Even_Split(X_tn, y_tn, val_size, rs)
    keys_train = y_tr.index
    keys_val = y_vl.index

    scaler = StandardScaler()
    X_tn_scaled = X_tn.copy()
    X_te_scaled = X_te.copy()

    scaler.fit(X_tn.values)
    X_tn_scaled.loc[:, :] = scaler.transform(X_tn.values)
    X_te_scaled.loc[:, :] = scaler.transform(X_te.values)

    next_map = {'M': 0, 'D': 1, 'H': 2}
    get_next = lambda idx: torch.nn.functional.one_hot(torch.tensor([next_map[i] for i in idx.get_level_values("Next")]), num_classes=3).float()

    def to_tensor(X_df, y_df, keys):
        X_group = X_df.groupby(level=["Next", "Path", "Number"])
        X_tensor = torch.stack([
            torch.tensor(X_group.get_group(k).values, dtype=torch.float32)
            for k in keys
        ])
        y_tensor = torch.tensor(y_df.loc[keys].values, dtype=torch.float32).unsqueeze(1)
        return X_tensor, y_tensor

    next_train = get_next(pd.MultiIndex.from_tuples(keys_train, names=["Next", "Path", "Number"]))
    next_val   = get_next(pd.MultiIndex.from_tuples(keys_val,   names=["Next", "Path", "Number"]))
    next_test  = get_next(y_te.index)
    
    X_train_tensor, y_train_tensor = to_tensor(X_tn_scaled, y_tn, keys_train)
    X_val_tensor,   y_val_tensor   = to_tensor(X_tn_scaled, y_tn, keys_val)
    X_test_tensor,  y_test_tensor  = to_tensor(X_te_scaled, y_te, y_te.index.drop_duplicates())

    train_loader = DataLoader(TensorDataset(X_train_tensor, next_train, y_train_tensor), batch_size=batch_size, shuffle=True)
    val_loader   = DataLoader(TensorDataset(X_val_tensor, next_val,  y_val_tensor),   batch_size=batch_size, shuffle=False)
    test_loader  = DataLoader(TensorDataset(X_test_tensor, next_test,  y_test_tensor),  batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader, X_train_tensor, y_train_tensor, X_val_tensor, y_val_tensor, X_test_tensor, y_test_tensor, scaler

In [32]:
def setRandomSeed(random_seed=0):
    os.environ['PYTHONHASHSEED'] = str(random_seed)
    torch.manual_seed(random_seed)
    torch.cuda.manual_seed(random_seed)
    torch.cuda.manual_seed_all(random_seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(random_seed)
    random.seed(random_seed)

In [33]:
class Trainer:
    def __init__(self, model, lr=1e-3, weight_decay=0, epoch=1000, patience=50, random_seed = 0):
        self.random_seed = random_seed
        #setRandomSeed(self.random_seed)
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model = model.to(self.device)
        self.criterion = nn.MSELoss()
        self.cri2 = MAPELoss()
        self.optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
        self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, step_size=200, gamma=0.8)
        self.epoch = epoch
        self.patience = patience

        self.train_loss = []
        self.test_loss = []
        self.val_loss = []

    def train(self, train_loader, test_loader=None, val_loader=None):
        best_val_loss = float('inf')
        best_model_state = None
        epochs_no_improve = 0

        for ep in range(1, self.epoch + 1):
            self.model.train()
            for x_batch, onehot, y_batch in train_loader:
                x_batch, onehot, y_batch = x_batch.to(self.device), onehot.to(self.device), y_batch.to(self.device)

                self.optimizer.zero_grad()
                outputs = self.model(x_batch, onehot)
                loss = self.cri2(outputs, y_batch)
                loss.backward()
                self.optimizer.step()

            self.scheduler.step()

            avg_train_loss = self.evaluate(train_loader)
            test_loss = self.evaluate(test_loader) if test_loader else None
            val_loss = self.evaluate(val_loader) if val_loader else None

            self.train_loss.append(avg_train_loss)
            self.test_loss.append(test_loss)
            self.val_loss.append(val_loss)

            if val_loader:
                if val_loss < best_val_loss - 1e-4:
                    best_val_loss = val_loss
                    best_model_state = self.model.state_dict()
                    epochs_no_improve = 0
                else:
                    epochs_no_improve += 1
                    if epochs_no_improve >= self.patience:
                        print(f"Early stopping at epoch {ep}")
                        break

        if best_model_state:
            self.model.load_state_dict(best_model_state)

    def evaluate(self, data_loader):
        if data_loader is None:
            return None
        self.model.eval()
        total_loss = 0
        with torch.no_grad():
            for x_batch, onehot, y_batch in data_loader:
                x_batch, onehot, y_batch = x_batch.to(self.device), onehot.to(self.device), y_batch.to(self.device)
                outputs = self.model(x_batch, onehot)
                loss = self.cri2(outputs, y_batch)
                total_loss += loss.item()
        return total_loss / len(data_loader)

    def predict(self, x, onehot):
        self.model.eval()
        with torch.no_grad():
            x = x.to(self.device)
            onehot = onehot.to(self.device)
            return self.model(x, onehot)

In [34]:
def plot_results(model, info, train_loader, val_loader, test_loader, plot = True):
    rs, c1, c2, k1, k2 = info
    
    y_true_train = []
    y_pred_train = []
    
    y_true_val = []
    y_pred_val = []
    
    y_true_test = []
    y_pred_test = []

    y_true_test_M = []
    y_pred_test_M = []

    y_true_test_D = []
    y_pred_test_D = []

    y_true_test_H = []
    y_pred_test_H = []
    
    for x_batch, onehot, y_batch in train_loader:
        preds = trainer.predict(x_batch, onehot)
        y_true_train.append(y_batch)
        y_pred_train.append(preds.cpu())
    
    for x_batch, onehot, y_batch in val_loader:
        preds = trainer.predict(x_batch, onehot)
        y_true_val.append(y_batch)
        y_pred_val.append(preds.cpu())
    
    for x_batch, onehot, y_batch in test_loader:
        preds = trainer.predict(x_batch, onehot)
        y_true_test.append(y_batch)
        y_pred_test.append(preds.cpu())
    
        onehot_np = onehot.cpu().numpy()
        y_true_np = y_batch.cpu().numpy()
        y_pred_np = preds.cpu().numpy()
    
        for i in range(len(onehot_np)):
            if np.array_equal(onehot_np[i], [1, 0, 0]):  # 'M'
                y_true_test_M.append(y_true_np[i])
                y_pred_test_M.append(y_pred_np[i])
            elif np.array_equal(onehot_np[i], [0, 1, 0]):  # 'D'
                y_true_test_D.append(y_true_np[i])
                y_pred_test_D.append(y_pred_np[i])
            elif np.array_equal(onehot_np[i], [0, 0, 1]):  # 'H'
                y_true_test_H.append(y_true_np[i])
                y_pred_test_H.append(y_pred_np[i])

    y_true_train = torch.cat(y_true_train).numpy()
    y_pred_train = torch.cat(y_pred_train).numpy()
    
    y_true_val = torch.cat(y_true_val).numpy()
    y_pred_val = torch.cat(y_pred_val).numpy()
    
    y_true_test = torch.cat(y_true_test).numpy()
    y_pred_test = torch.cat(y_pred_test).numpy()

    y_true_test_M = np.array(y_true_test_M)
    y_pred_test_M = np.array(y_pred_test_M)
    
    y_true_test_D = np.array(y_true_test_D)
    y_pred_test_D = np.array(y_pred_test_D)
    
    y_true_test_H = np.array(y_true_test_H)
    y_pred_test_H = np.array(y_pred_test_H)
    
    mape_M = mean_absolute_percentage_error(y_true_test_M, y_pred_test_M) * 100 if len(y_true_test_M) > 0 else np.nan
    mape_D = mean_absolute_percentage_error(y_true_test_D, y_pred_test_D) * 100 if len(y_true_test_D) > 0 else np.nan
    mape_H = mean_absolute_percentage_error(y_true_test_H, y_pred_test_H) * 100 if len(y_true_test_H) > 0 else np.nan
    
    train_mape = mean_absolute_percentage_error(y_true_train, y_pred_train) * 100
    val_mape = mean_absolute_percentage_error(y_true_val, y_pred_val) * 100
    test_mape = mean_absolute_percentage_error(y_true_test, y_pred_test) * 100
    print(f"Model: {model}, rs: {rs}, c1: {c1}, c2: {c2}, k1: {k1}, k2: {k2}\n Train MAPE: {train_mape:.2f}%, Val MAPE: {val_mape:.2f}%, Test MAPE: {test_mape:.2f}%")
    print(f"Test MAPE by 'Next': M: {mape_M:.2f}%, D: {mape_D:.2f}%, H: {mape_H:.2f}%")
    
    if plot == True:
        _ = plt.figure()
        _ = plt.scatter(y_true_train, y_pred_train, label = 'Train')
        _ = plt.scatter(y_true_val, y_pred_val, label = 'Val')
        _ = plt.scatter(y_true_test, y_pred_test, label = 'Test')
        min_val = min(y_true_train.min(), y_true_test.min())
        max_val = max(y_true_train.max(), y_true_test.max())
        _ = plt.plot([min_val, max_val], [min_val, max_val], 'k--', label='Ideal line')
        _ = plt.xlabel('True SOH')
        _ = plt.ylabel('Predicted SOH')
        _ = plt.legend()
        _ = plt.title(f'Random state: {rs}, hid: {hid}, {nl} layer, lr: {lr}')
    return train_mape, val_mape, test_mape, mape_M, mape_D, mape_H

def plot_loss(info, train_loss, val_loss, test_loss):
    rs, c1, c2, k1, k2 = info
    _ = plt.figure()
    _ = plt.plot(train_loss, label = 'Train loss')
    _ = plt.plot(val_loss, label = 'Val loss')
    _ = plt.plot(test_loss, label = 'Test loss')
    _ = plt.ylim([0, 2])
    _ = plt.xlabel('Epoch')
    _ = plt.ylabel('Loss')
    _ = plt.legend()
    _ = plt.title(f'Random state: {rs}, hid: {hid}, {nl} layers, lr: {lr}')

## 1. 2D CNN

In [35]:
class CNN2D_Compressor(nn.Module):
    def __init__(self, 
                 output_dim=4,
                 conv1_channels=16,
                 conv2_channels=16,
                 kernel_size1=(5, 2),
                 kernel_size2=(3, 1)):
        super(CNN2D_Compressor, self).__init__()

        self.feature_extractor = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=conv1_channels, kernel_size=kernel_size1, stride=1,
                      padding=(kernel_size1[0] // 2, kernel_size1[1] // 2)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2)),
            nn.Conv2d(conv1_channels, conv2_channels, kernel_size=kernel_size2,
                      padding=(kernel_size2[0] // 2, kernel_size2[1] // 2)),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d((1, 1))
        )

        self.fc = nn.Linear(conv2_channels, output_dim)

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.feature_extractor(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

In [36]:
class CNN2D_MLP(nn.Module):
    def __init__(self, conv1_channels, conv2_channels, kernel_size1, kernel_size2, out_dim=4, hidden_dim=64, num_layers=2, next_dim = 3):
        super(CNN2D_MLP, self).__init__()
        self.encoder = CNN2D_Compressor(out_dim, conv1_channels, conv2_channels, kernel_size1, kernel_size2)
        
        layers = []

        input_dim = out_dim + next_dim
        output_dim = 1

        layers.append(nn.Linear(input_dim, hidden_dim))
        layers.append(nn.ReLU())

        for _ in range(num_layers - 1):
            layers.append(nn.Linear(hidden_dim, hidden_dim))
            layers.append(nn.ReLU())


        layers.append(nn.Linear(hidden_dim, output_dim))

        self.model = nn.Sequential(*layers)

    def forward(self, x, onehot):
        x_feat = self.encoder(x)
        x_concat = torch.cat([x_feat, onehot], dim=1)
        return self.model(x_concat)

class MAPELoss(nn.Module):
    def __init__(self, epsilon=1e-7):
        super(MAPELoss, self).__init__()
        self.epsilon = epsilon

    def forward(self, y_pred, y_true):
        return torch.mean(torch.abs((y_true - y_pred) / (y_true + self.epsilon))) * 100

## Test

In [37]:
random_states = [100, 120, 140, 160, 180]
bs = 12
ep = 1000
hid = 16
layer = 5
lr = 1e-3

conv1_channels = [8, 16]
conv2_channels = [8, 16]
kernel_size1 = [(2,2), (3,2)]
kernel_size2 = [(2,2), (3,2)]

results_df = pd.DataFrame(columns = ['rs', 'c1', 'c2', 'k1', 'k2', 'Train MAPE', 'Val MAPE', 'Test MAPE', 'Next M MAPE', 'Next D MAPE', 'Next H MAPE'])

X, y = Get_Data_all(dat)

for rs, c1, c2, k1, k2 in product(random_states, conv1_channels, conv2_channels, kernel_size1, kernel_size2):
    setRandomSeed(rs)
    start = time.time()
    X_tn, X_te, y_tn, y_te = Even_Split(X, y, 1/3, rs)
    train_loader, val_loader, test_loader, X_train_tensor, y_train_tensor, X_val_tensor, y_val_tensor, X_test_tensor, y_test_tensor, scaler = convert_to_tensor(X_tn, X_te, y_tn, y_te, bs, 1/6, rs)
    
    model = CNN2D_MLP(c1, c2, k1, k2, 4, hid, layer)
    trainer = Trainer(model, lr=lr, epoch = ep, random_seed = rs)
    
    trainer.train(train_loader, val_loader, test_loader)

    info = [rs, c1, c2, k1, k2]
    results = plot_results("2D CNN", info, train_loader, val_loader, test_loader, plot = False)

    temp_df = pd.DataFrame(info+list(results)).T
    temp_df.columns = ['rs', 'c1', 'c2', 'k1', 'k2', 'Train MAPE', 'Val MAPE', 'Test MAPE', 'Next M MAPE', 'Next D MAPE', 'Next H MAPE']
    results_df = pd.concat([results_df, temp_df])
    print(f"{time.time()-start:.2f} s")

Early stopping at epoch 160
Model: 2D CNN, rs: 100, c1: 8, c2: 8, k1: (2, 2), k2: (2, 2)
 Train MAPE: 0.95%, Val MAPE: 1.12%, Test MAPE: 1.11%
Test MAPE by 'Next': M: 1.14%, D: 1.11%, H: 1.07%
22.81 s
Early stopping at epoch 214
Model: 2D CNN, rs: 100, c1: 8, c2: 8, k1: (2, 2), k2: (3, 2)
 Train MAPE: 1.13%, Val MAPE: 1.15%, Test MAPE: 1.08%
Test MAPE by 'Next': M: 1.03%, D: 1.16%, H: 1.03%
29.54 s
Early stopping at epoch 154
Model: 2D CNN, rs: 100, c1: 8, c2: 8, k1: (3, 2), k2: (2, 2)
 Train MAPE: 0.95%, Val MAPE: 1.09%, Test MAPE: 1.15%
Test MAPE by 'Next': M: 1.22%, D: 1.14%, H: 1.08%
21.92 s
Early stopping at epoch 245
Model: 2D CNN, rs: 100, c1: 8, c2: 8, k1: (3, 2), k2: (3, 2)
 Train MAPE: 0.98%, Val MAPE: 1.13%, Test MAPE: 1.05%
Test MAPE by 'Next': M: 1.04%, D: 1.05%, H: 1.05%
33.26 s
Early stopping at epoch 206
Model: 2D CNN, rs: 100, c1: 8, c2: 16, k1: (2, 2), k2: (2, 2)
 Train MAPE: 1.04%, Val MAPE: 1.02%, Test MAPE: 1.20%
Test MAPE by 'Next': M: 1.26%, D: 1.11%, H: 1.25%
30

In [None]:
summary_df = results_df.groupby(['c1', 'c2', 'k1', 'k2'])[['Train MAPE', 'Val MAPE', 'Test MAPE', 'Next M MAPE', 'Next D MAPE', 'Next H MAPE']].mean().reset_index()
summary_df 
summary_df.to_csv('2D CNN_next_info_sum.csv')
results_df.to_csv('2D CNN_next_info.csv')

Unnamed: 0,c1,c2,k1,k2,Train MAPE,Val MAPE,Test MAPE,Next M MAPE,Next D MAPE,Next H MAPE
0,8,8,"(2, 2)","(2, 2)",0.954008,1.145687,0.981896,1.068266,0.930788,0.946634
1,8,8,"(2, 2)","(3, 2)",1.031902,1.178456,1.003655,1.052415,0.973837,0.984712
2,8,8,"(3, 2)","(2, 2)",0.983071,1.167823,1.087242,1.165246,1.054766,1.041715
3,8,8,"(3, 2)","(3, 2)",1.083834,1.183927,1.120445,1.168796,1.050875,1.141663
4,8,16,"(2, 2)","(2, 2)",0.975927,1.129331,1.007508,1.101383,0.903629,1.017512
5,8,16,"(2, 2)","(3, 2)",1.020755,1.115666,1.099553,1.216827,0.990186,1.091646
6,8,16,"(3, 2)","(2, 2)",1.132988,1.195284,1.183142,1.317258,1.069226,1.162944
7,8,16,"(3, 2)","(3, 2)",0.957826,1.133699,0.984758,1.049777,0.920061,0.984434
8,16,8,"(2, 2)","(2, 2)",1.03734,1.218445,1.059832,1.150992,1.017884,1.01062
9,16,8,"(2, 2)","(3, 2)",1.022197,1.190422,1.10386,1.174179,1.047484,1.089918
