In [None]:
import torch
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pandas.plotting import scatter_matrix
import sklearn
import joblib
import sys
from utils import capacity_vectorizer, get_indices, cycle_plotter, plot_capacity_vs_cycle
from scipy import io
import os
import random


# Single seed shared by every random number generator used in this notebook.
SEED = 42

# Seed Python's `random`, NumPy's legacy global generator and PyTorch in turn.
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed):
    _seed_fn(SEED)

if torch.cuda.is_available():
    # Seed the current CUDA device and then all CUDA devices.
    torch.cuda.manual_seed(SEED)
    torch.cuda.manual_seed_all(SEED)
    # Request deterministic cuDNN behaviour and switch off its benchmark mode.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


In [None]:
import importlib
import utils
importlib.reload(utils)
from utils import generate_universal_segment_dataset

print("Generating Universal Segment Dataset (Uniform segments based on file limits)...")

# One dataset per ambient-temperature group; both calls use the same
# segmentation arguments and differ only in the battery list and ambient_temp.
df_universal_24 = generate_universal_segment_dataset(
    ['B0005', 'B0006', 'B0007', 'B0018'],
    windows=None,
    num_segments=12,
    ambient_temp=24,
    mode='random'
)

df_universal_4 = generate_universal_segment_dataset(
    ['B0049', 'B0050', 'B0051', 'B0053', 'B0054', 'B0055', 'B0056'],
    windows=None,
    num_segments=12,
    ambient_temp=4,
    mode='random'
)

if not df_universal_24.empty:
    print(f"Generated {len(df_universal_24)} training samples.")
    print(df_universal_24.head())
    print("\nSample of generated window ranges:")
    print(df_universal_24[['start_v', 'end_v']].drop_duplicates().head(12))
else:
    print("Dataset generation failed or returned empty.")

# The ambient_temp = 4 frame feeds the same pipeline, so report on it as well;
# otherwise an empty result only surfaces as an error in a later cell.
if not df_universal_4.empty:
    print(f"Generated {len(df_universal_4)} samples for ambient_temp = 4.")
else:
    print("Dataset generation for ambient_temp = 4 failed or returned empty.")

In [None]:
# Work on a copy so df_universal_24 is left untouched, and renumber the rows
# 0..n-1. reset_index(drop=True) discards the old index directly instead of
# materialising it as an "index" column and dropping that column afterwards.
df_temp_24 = df_universal_24.copy().reset_index(drop=True)
df_temp_24

In [None]:
# Copy the ambient_temp = 4 data, discard rows whose capacity is exactly 0 and
# renumber the remaining rows 0..n-1 (old index is dropped, not kept as a column).
df_temp_4 = df_universal_4.copy()
df_temp_4 = df_temp_4.loc[df_temp_4['capacity'] != 0].reset_index(drop=True)

df_temp_4

In [None]:
# Pairwise correlation matrix of df_temp_24; numeric_only=True leaves
# non-numeric columns out of the computation.
df_temp_24.corr(numeric_only=True)

In [None]:
# Pairwise correlation matrix of df_temp_4; numeric_only=True leaves
# non-numeric columns out of the computation.
df_temp_4.corr(numeric_only=True)

In [None]:
from sklearn.ensemble import IsolationForest

# Keep only rows with strictly positive capacity and renumber them.
has_positive_capacity = df_temp_24['capacity'] > 0
df_temp_24 = df_temp_24.loc[has_positive_capacity].reset_index(drop=True)

# Flag outliers in the (cycle_num, capacity) plane; fit_predict labels
# outliers with -1.
clf = IsolationForest(n_estimators=100, contamination=0.03, warm_start=True, random_state=42)
outlier_locs = clf.fit_predict(df_temp_24[['cycle_num', 'capacity']])

# Drop the flagged rows, renumber, and show the correlations of what is left.
# NOTE: this cell overwrites df_temp_24, so re-running it filters again.
df_temp_24 = df_temp_24.loc[outlier_locs != -1].reset_index(drop=True)
df_temp_24.corr(numeric_only=True)

In [None]:
# Remove rows containing missing values and renumber the rest 0..n-1 in one
# step (reset_index(drop=True) avoids creating and then dropping an "index"
# column).
df_temp_4 = df_temp_4.dropna().reset_index(drop=True)

# IsolationForest is imported in the cell that filters df_temp_24, so that
# cell has to run first. fit_predict labels outliers with -1; the mask is
# applied to df_temp_4 in the next cell.
clf = IsolationForest(n_estimators=100, contamination=0.1, warm_start=True, random_state=42)
outlier_locs = clf.fit_predict(df_temp_4[['cycle_num', 'capacity']])

In [None]:
# outlier_locs was computed on df_temp_4 in the previous cell (-1 = outlier).
# NOTE: the mask length must match df_temp_4, so run this cell only once per
# run of the previous one.
keep_rows = outlier_locs != -1
df_temp_4 = df_temp_4.loc[keep_rows].reset_index(drop=True)
df_temp_4.corr(numeric_only=True)

In [None]:
# Renumber both frames 0..n-1 before they are combined. reset_index(drop=True)
# discards the old index directly, matching the style used in the filtering
# cells, instead of adding an "index" column and dropping it again.
df_temp_4 = df_temp_4.reset_index(drop=True)
df_temp_24 = df_temp_24.reset_index(drop=True)
df_temp_24

In [None]:
# Stack the two temperature groups; ignore_index=True gives the result a fresh
# 0..n-1 index so no separate reset_index/drop step is needed.
df_all = pd.concat([df_temp_24, df_temp_4], ignore_index=True)

# One-hot encode ambient_temp as float columns prefixed "ambient_temp_".
# NOTE(review): later cells expect the columns to be named exactly
# 'ambient_temp_4' and 'ambient_temp_24' - confirm ambient_temp holds ints.
df_onehot = pd.get_dummies(df_all['ambient_temp'], prefix='ambient_temp', dtype=float)

# Attach the indicator columns and remove the original categorical column.
df_all = df_all.join(df_onehot).drop(columns=['ambient_temp'])

In [None]:
# Display the combined frame after one-hot encoding of ambient_temp.
df_all

In [None]:
# Exclude every row of battery 'B0050' and renumber the remaining rows 0..n-1.
# A boolean filter avoids the throw-away index variable and the in-place
# drop / reset_index / drop("index") sequence.
df_all = df_all.loc[df_all['battery_id'] != 'B0050'].reset_index(drop=True)
df_all

In [None]:
from sklearn.model_selection import train_test_split

# First split: 60 % train / 40 % held out, stratified on battery_id.
# Second split: the held-out part is halved into validation and test, again
# stratified on battery_id.
# NOTE(review): the split is row-wise, so rows sharing a battery and cycle_num
# can end up in different sets - confirm this is intended.
df_train, df_val_test = train_test_split(
    df_all, test_size=0.4, random_state=42, stratify=df_all[['battery_id']]
)
df_val, df_test = train_test_split(
    df_val_test, test_size=0.5, random_state=42, stratify=df_val_test[['battery_id']]
)

# battery_id was only needed for stratification; it is not a model input.
df_train = df_train.drop(columns=['battery_id'])
df_val = df_val.drop(columns=['battery_id'])
df_test = df_test.drop(columns=['battery_id'])

cols_to_drop = ['capacity', 'ambient_temp_4', 'ambient_temp_24']


def _split_xy(frame):
    """Return (numeric features, one-hot temperature columns, capacity target) of `frame`."""
    numeric_part = frame.drop(columns=cols_to_drop)
    categorical_part = frame[['ambient_temp_4', 'ambient_temp_24']]
    return numeric_part, categorical_part, frame['capacity']


X_train_num, X_train_cat, y_train = _split_xy(df_train)
X_val_num, X_val_cat, y_val = _split_xy(df_val)
X_test_num, X_test_cat, y_test = _split_xy(df_test)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler

class DenseNN(nn.Module):
    """Fully connected feed-forward network.

    Each entry of `hidden_sizes` contributes a Linear layer followed by ReLU
    and, when `dropout_rate` is positive, a Dropout layer. A final Linear
    layer maps the last hidden width (or `input_size` when there are no
    hidden layers) to `output_size`.

    Args:
        input_size: Number of input features.
        hidden_sizes: Iterable with the width of each hidden layer, in order.
        output_size: Number of outputs of the final Linear layer.
        dropout_rate: Dropout probability; values <= 0 add no Dropout layers.
    """

    def __init__(self, input_size, hidden_sizes, output_size, dropout_rate=0.0):
        super().__init__()
        use_dropout = dropout_rate > 0
        widths = [input_size, *hidden_sizes]

        stack = []
        # Walk over consecutive (in, out) width pairs of the hidden layers.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack += [nn.Linear(fan_in, fan_out), nn.ReLU()]
            if use_dropout:
                stack.append(nn.Dropout(dropout_rate))
        stack.append(nn.Linear(widths[-1], output_size))

        self.network = nn.Sequential(*stack)

    def forward(self, x):
        """Run `x` through the sequential stack and return its output."""
        return self.network(x)


In [None]:
def train_model(model, X_train_num, X_train_cat, y_train, X_val_num, X_val_cat, y_val, num_epochs=100, batch_size=64, learning_rate=0.0005, patience=15):
    """Train `model` with Adam on an MSE loss, using early stopping on validation loss.

    Numeric features are standardised with a StandardScaler fitted on the
    training data only; the categorical columns are appended unscaled.

    Args:
        model: torch module mapping (batch, n_features) to (batch, 1).
        X_train_num, X_val_num: Numeric feature tables (train / validation).
        X_train_cat, X_val_cat: Categorical feature tables exposing `.values`.
        y_train, y_val: Targets exposing `.values`.
        num_epochs: Maximum number of passes over the training data.
        batch_size: Mini-batch size of the training loader.
        learning_rate: Adam learning rate.
        patience: Number of consecutive epochs without validation improvement
            after which training stops.

    Returns:
        (model, scaler): the same model object, carrying the weights of the
        epoch with the lowest validation loss, and the fitted scaler.

    Side effects:
        Writes the best weights to 'best_model.pth' in the working directory
        every time the validation loss improves.
    """
    scaler = StandardScaler()
    X_train_num = scaler.fit_transform(X_train_num)
    X_val_num = scaler.transform(X_val_num)

    X_train = np.concatenate((X_train_num, X_train_cat.values), axis=1)
    X_val = np.concatenate((X_val_num, X_val_cat.values), axis=1)

    X_train_t = torch.tensor(X_train, dtype=torch.float32)
    y_train_t = torch.tensor(y_train.values, dtype=torch.float32)
    # Validation tensors are built once and evaluated in a single pass.
    X_val_t = torch.tensor(X_val, dtype=torch.float32)
    y_val_t = torch.tensor(y_val.values, dtype=torch.float32).reshape(-1, 1)

    train_loader = DataLoader(TensorDataset(X_train_t, y_train_t), batch_size=batch_size, shuffle=True)

    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    best_val_loss = float('inf')
    best_state = None
    patience_counter = 0

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            # squeeze(-1) removes only the output dimension, so a final batch
            # of size 1 keeps shape (1,) and still matches y_batch.
            outputs = model(X_batch).squeeze(-1)
            loss = criterion(outputs, y_batch)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        model.eval()
        with torch.no_grad():
            val_loss = criterion(model(X_val_t), y_val_t).item()

        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss/len(train_loader):.4f}, Val Loss:  {val_loss:.4f}')

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            # Snapshot the weights: state_dict() returns references that later
            # optimizer steps would otherwise overwrite.
            best_state = {name: tensor.detach().clone() for name, tensor in model.state_dict().items()}
            torch.save(best_state, 'best_model.pth')
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered")
                break

    # Restore the best checkpoint; without this the caller would receive the
    # weights of the last epoch, which may be worse than the saved ones.
    if best_state is not None:
        model.load_state_dict(best_state)

    return model, scaler

In [None]:
# The network consumes the scaled numeric columns followed by the one-hot
# temperature columns, so its input width is the sum of both column counts.
n_input_features = X_train_num.shape[1] + X_train_cat.shape[1]
model = DenseNN(
    input_size=n_input_features,
    hidden_sizes=[256, 128, 64],
    output_size=1,
    dropout_rate=0.1,
)

# train_model returns the model together with the scaler fitted on the
# training features; the scaler is reused for every later prediction.
trained_model, scaler = train_model(
    model, X_train_num, X_train_cat, y_train, X_val_num, X_val_cat, y_val
)

In [None]:
# Persist the trained weights under trained_model/soh_model.pth.
save_path = 'trained_model'
# exist_ok=True creates the directory when missing and is a no-op otherwise,
# replacing the separate exists() check.
os.makedirs(save_path, exist_ok=True)
model_file = os.path.join(save_path, 'soh_model.pth')
torch.save(trained_model.state_dict(), model_file)
print(f"Model saved successfully to {model_file}")

In [None]:
def evaluate_model(model, X_test, y_test):
    """Print and return regression metrics of `model` on a test set.

    Args:
        model: torch module mapping (n_samples, n_features) to one value per
            sample.
        X_test: Array-like of already preprocessed features.
        y_test: Array-like of true targets (1-D, or a single column).

    Returns:
        dict with keys 'mse', 'rmse', 'mae', 'mape' (in percent) and 'r2'.

    Raises:
        ValueError: if the number of predictions and targets differ.
    """
    model.eval()
    with torch.no_grad():
        features = torch.as_tensor(np.asarray(X_test), dtype=torch.float32)
        predictions = model(features).numpy().flatten()

    # Flatten the targets so a column-shaped y_test cannot broadcast against
    # the 1-D predictions into an (n, n) matrix and corrupt every metric.
    y_true = np.asarray(y_test, dtype=float).reshape(-1)
    if y_true.shape[0] != predictions.shape[0]:
        raise ValueError(
            f"y_test has {y_true.shape[0]} values but the model produced {predictions.shape[0]} predictions"
        )

    errors = y_true - predictions
    mse = np.mean(errors ** 2)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(errors))
    # MAPE divides by the true value, so targets equal to 0 yield inf/nan.
    mape = np.mean(np.abs(errors / y_true)) * 100
    r2 = 1 - (np.sum(errors ** 2) / np.sum((y_true - np.mean(y_true)) ** 2))

    print(f"MSE:   {mse:.6f}")
    print(f"RMSE: {rmse:.6f}")
    print(f"MAE:  {mae:.6f}")
    print(f"MAPE:  {mape:.2f}%")
    print(f"R²:   {r2:.4f}")

    return {'mse': mse, 'rmse': rmse, 'mae': mae, 'mape':  mape, 'r2': r2}

# Apply the scaler fitted during training to the numeric test columns, append
# the one-hot temperature columns, then score the trained model.
X_test_eval = np.concatenate((scaler.transform(X_test_num), X_test_cat.values), axis=1)
evaluate_model(trained_model, X_test_eval, y_test)

In [None]:
# Build the test inputs the same way as in training: scaled numeric columns
# followed by the one-hot temperature columns.
X_test_scaled = scaler.transform(X_test_num)
X_test_inputs = np.concatenate((X_test_scaled, X_test_cat.values), axis=1)

model.eval()
with torch.no_grad():
    raw_output = model(torch.FloatTensor(X_test_inputs))
predictions = raw_output.numpy().flatten()

# Side-by-side table of true and predicted capacity for the test rows.
df_eval = pd.DataFrame({'Actual': y_test.values, 'Predicted': predictions})
df_eval

In [None]:
indices = df_all.index

# Repeat the two splits on the row labels with the same test_size,
# random_state and stratification as the modelling split, so each row can be
# tagged with the set it is expected to belong to.
df_train_idx, df_val_test_idx = train_test_split(indices, test_size=0.4, random_state=42, stratify=df_all[['battery_id']])
df_val_idx, df_test_idx = train_test_split(df_val_test_idx, test_size=0.5, random_state=42, stratify=df_all.loc[df_val_test_idx][['battery_id']])

set_mapping = pd.Series(index=indices, data='Unknown')
set_mapping.loc[df_train_idx] = 'Train'
set_mapping.loc[df_val_idx] = 'Validation'
set_mapping.loc[df_test_idx] = 'Test'

df_viz = df_all.copy()
df_viz['Set'] = set_mapping

# Predict for every row: scaled numeric columns plus one-hot temperature columns.
cols_to_drop_viz = ['capacity', 'ambient_temp_4', 'ambient_temp_24', 'battery_id', 'Set']
X_num_viz = df_viz.drop(cols_to_drop_viz, axis=1)
X_cat_viz = df_viz[['ambient_temp_4', 'ambient_temp_24']]
X_num_viz_scaled = scaler.transform(X_num_viz)
X_viz_full = np.concatenate((X_num_viz_scaled, X_cat_viz.values), axis=1)

model.eval()
with torch.no_grad():
    preds_viz = model(torch.FloatTensor(X_viz_full)).numpy().flatten()
df_viz['Predicted'] = preds_viz

unique_batteries = df_viz['battery_id'].unique()
colors = {'Train': 'blue', 'Validation': 'orange', 'Test': 'green'}

# savefig does not create missing folders, so make sure both targets exist.
for figure_dir in ('Figures/PDF', 'Figures/PNG'):
    os.makedirs(figure_dir, exist_ok=True)

for battery in unique_batteries:
    battery_data = df_viz[df_viz['battery_id'] == battery].copy()
    # Average actual and predicted capacity per (cycle, set) pair.
    consensus_data = battery_data.groupby(['cycle_num', 'Set'])[['capacity', 'Predicted']].mean().reset_index()

    fig = plt.figure(figsize=(14, 5))
    plt.plot(consensus_data['cycle_num'], consensus_data['capacity'],
             color='black', linewidth=2, label='Actual Capacity', alpha=0.7)

    # Only held-out predictions are drawn; training rows are left out.
    for set_name in ['Validation', 'Test']:
        subset = consensus_data[consensus_data['Set'] == set_name]
        if not subset.empty:
            plt.scatter(subset['cycle_num'], subset['Predicted'],
                       color=colors[set_name], s=10, label=f'{set_name} Set', alpha=0.8)

    plt.title(f'Capacity Prediction (Averaged over all segments) - {battery}')
    plt.xlabel('Cycle Number')
    plt.ylabel('Capacity (Ah)')
    plt.legend()
    plt.grid(True, alpha=0.3)

    plt.savefig(f'Figures/PDF/soh_prediction_{battery}.pdf', format='pdf', bbox_inches='tight', dpi=300)
    plt.savefig(f'Figures/PNG/soh_prediction_{battery}.png', format='png', bbox_inches='tight', dpi=300)

    plt.show()
    # Release the figure so one figure per battery does not pile up in memory.
    plt.close(fig)

In [None]:
def predict_capacity(model, scaler, cycle_num, start_v, end_v, duration, mean_current, ambient_temp_24=1, ambient_temp_4=0):
    """Predict capacity for a single hand-specified segment.

    Args:
        model: torch module taking a (1, 8) float tensor and returning one value.
        scaler: object whose `transform` scales the six numeric features.
        cycle_num, start_v, end_v, duration, mean_current: segment description.
        ambient_temp_24, ambient_temp_4: one-hot temperature flags.

    Returns:
        The model output as a Python float.
    """
    model.eval()

    # Voltage drop per unit of duration; a non-positive duration gives slope 0.
    if duration > 0:
        slope = (start_v - end_v) / duration
    else:
        slope = 0

    # NOTE(review): assumes the scaler was fitted on columns in exactly this
    # order (slope, duration, mean_current, start_v, end_v, cycle_num) - confirm
    # against the training frame.
    numeric_row = np.array([[slope, duration, mean_current, start_v, end_v, cycle_num]])
    categorical_row = np.array([[ambient_temp_4, ambient_temp_24]])
    model_input = np.concatenate((scaler.transform(numeric_row), categorical_row), axis=1)

    with torch.no_grad():
        return model(torch.FloatTensor(model_input)).item()
# Two hypothetical segments that differ only in their end voltage.
_shared_segment = dict(cycle_num=50, start_v=4.2, duration=1000, mean_current=2)
cap_shallow = predict_capacity(trained_model, scaler, end_v=3.7, **_shared_segment)
cap_deep = predict_capacity(trained_model, scaler, end_v=2.8, **_shared_segment)

print(f"Capacity (Shallow DoD 4.2->3.7): {cap_shallow:.4f} Ah")
print(f"Capacity (Deep DoD 4.2->2.8):    {cap_deep:.4f} Ah")
print(f"Difference: {cap_shallow - cap_deep:.4f} Ah")