In [1]:
from sklearn.preprocessing import RobustScaler, OneHotEncoder, StandardScaler, MinMaxScaler
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split, KFold
import pandas as pd
import numpy as np
import re
import optuna
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import itertools
import matplotlib.pyplot as plt
from torch.nn import MSELoss

In [2]:
pd.set_option('display.max_seq_item', None)

In [3]:
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)

# Data preprocessing

In [4]:
# Load the dataset
dataset = pd.read_csv('battery_feature_extracted.csv')

In [5]:
# Select features and target
X = dataset.drop(columns=['average_voltage'])
y = dataset['average_voltage']

In [6]:
# First split to separate out the test set
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

In [7]:
# Second split: separate the training set into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.2, random_state=42)  # 20% for validation


In [8]:
# Standardizing the features (fit on X_train, apply to all)
scaler = RobustScaler()
#scaler = StandardScaler()
#scaler = MinMaxScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
X_test_scaled = scaler.transform(X_test)

In [9]:
# Convert to tensors
X_train_tensor = torch.FloatTensor(X_train_scaled)
y_train_tensor = torch.FloatTensor(y_train.values).unsqueeze(1)  # Ensure target tensor is of the right shape
X_val_tensor = torch.FloatTensor(X_val_scaled)
y_val_tensor = torch.FloatTensor(y_val.values).unsqueeze(1)
X_test_tensor = torch.FloatTensor(X_test_scaled)
y_test_tensor = torch.FloatTensor(y_test.values).unsqueeze(1)

In [10]:
# Model parameters
num_features = X_train_scaled.shape[1]
output_size = 1  # For regression, we predict a single continuous value

# Imbalance Mitigation

## Calculate Ion Distribution in Train / Val / Test Sets

In [11]:
# Get the ion columns
ion_columns = [col for col in X.columns if col.startswith("working_ion_")]


In [12]:
# Get corresponding subsets
X_train_df = X_train.copy()
X_val_df = X_val.copy()
X_test_df = X_test.copy()

In [13]:
# Count samples per ion in each split
def count_ions(df):
    return df[ion_columns].sum().astype(int)

In [14]:
train_counts = count_ions(X_train_df)
val_counts = count_ions(X_val_df)
test_counts = count_ions(X_test_df)

In [15]:
# Combine into a single DataFrame
ion_distribution = pd.DataFrame({
    'Train': train_counts,
    'Validation': val_counts,
    'Test': test_counts,
    'Total': train_counts + val_counts + test_counts
})

In [16]:
ion_distribution.index = ion_distribution.index.str.replace('working_ion_', '')
display(ion_distribution)

Unnamed: 0,Train,Validation,Test,Total
Al,75,10,10,95
Ca,293,82,60,435
Cs,24,6,3,33
K,83,18,6,107
Li,1770,441,229,2440
Mg,297,78,48,423
Na,223,55,31,309
Rb,37,5,8,50
Y,70,19,4,93
Zn,260,69,37,366


## Sample weighting in loss function

In [18]:
# Compute per-sample weights based on inverse ion frequency
ion_columns = [col for col in X_train.columns if col.startswith("working_ion_")]
ion_counts = X_train[ion_columns].sum()
ion_weights = 1.0 / ion_counts
ion_weights /= ion_weights.sum()

# Assign sample weights (dot product of one-hot ion indicators and weight vector)
train_weights = X_train[ion_columns].dot(ion_weights.astype(np.float32))

# Ensure correct tensor type
train_weights_tensor = torch.tensor(train_weights.values.astype(np.float32)).unsqueeze(1)


In [19]:
# TabTransformer model
class TabTransformer(nn.Module):
    def __init__(self, num_features, output_size=1, dim_embedding=128, num_heads=2, num_layers=2, dropout=0.2):
        super(TabTransformer, self).__init__()
        self.embedding = nn.Linear(num_features, dim_embedding)
        encoder_layer = nn.TransformerEncoderLayer(d_model=dim_embedding, nhead=num_heads, batch_first=True, dropout=dropout)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.regressor = nn.Linear(dim_embedding, output_size)

    def forward(self, x):
        x = self.embedding(x)
        x = x.unsqueeze(1)  # [batch_size, seq_len=1, dim]
        x = self.transformer(x)
        x = torch.mean(x, dim=1)
        return self.regressor(x)

In [20]:
# Weighted loss
class WeightedCompositeLoss(nn.Module):
    def forward(self, outputs, targets, weights):
        mse = (weights * (outputs - targets) ** 2).mean()
        mae = (weights * torch.abs(outputs - targets)).mean()
        return mse + 0.6 * mae


In [21]:
# Setup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TabTransformer(num_features=X_train_tensor.shape[1]).to(device)
criterion = WeightedCompositeLoss()
optimizer = optim.Adam(model.parameters(), lr=0.00075)

In [22]:
# Move data to device
X_train_tensor = X_train_tensor.to(device)
y_train_tensor = y_train_tensor.to(device)
train_weights_tensor = train_weights_tensor.to(device)
X_val_tensor = X_val_tensor.to(device)
y_val_tensor = y_val_tensor.to(device)
X_test_tensor = X_test_tensor.to(device)
y_test_tensor = y_test_tensor.to(device)

In [23]:
# Training loop
training_losses, validation_losses = [], []
for epoch in range(2000):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_train_tensor)
    loss = criterion(outputs, y_train_tensor, train_weights_tensor)
    loss.backward()
    optimizer.step()

    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val_tensor)
        val_loss = criterion(val_outputs, y_val_tensor, torch.ones_like(y_val_tensor))

    training_losses.append(loss.item())
    validation_losses.append(val_loss.item())

    if epoch % 10 == 0:
        print(f"Epoch {epoch}: Train Loss = {loss.item():.4f}, Val Loss = {val_loss.item():.4f}")


Epoch 0: Train Loss = 0.3504, Val Loss = 3.5675
Epoch 10: Train Loss = 0.0867, Val Loss = 2.7971
Epoch 20: Train Loss = 0.0724, Val Loss = 2.6857
Epoch 30: Train Loss = 0.0529, Val Loss = 2.2620
Epoch 40: Train Loss = 0.0435, Val Loss = 1.9915
Epoch 50: Train Loss = 0.0376, Val Loss = 1.9239
Epoch 60: Train Loss = 0.0325, Val Loss = 1.8252
Epoch 70: Train Loss = 0.0271, Val Loss = 1.7525
Epoch 80: Train Loss = 0.0238, Val Loss = 1.7568
Epoch 90: Train Loss = 0.0212, Val Loss = 1.6669
Epoch 100: Train Loss = 0.0194, Val Loss = 1.5450
Epoch 110: Train Loss = 0.0183, Val Loss = 1.5390
Epoch 120: Train Loss = 0.0167, Val Loss = 1.4259
Epoch 130: Train Loss = 0.0159, Val Loss = 1.3354
Epoch 140: Train Loss = 0.0152, Val Loss = 1.2947
Epoch 150: Train Loss = 0.0137, Val Loss = 1.2986
Epoch 160: Train Loss = 0.0152, Val Loss = 1.2222
Epoch 170: Train Loss = 0.0127, Val Loss = 1.2056
Epoch 180: Train Loss = 0.0130, Val Loss = 1.1576
Epoch 190: Train Loss = 0.0121, Val Loss = 1.1363
Epoch 200: 

In [24]:
# Evaluation
model.eval()
with torch.no_grad():
    preds = model(X_test_tensor)
    mse = nn.MSELoss()(preds, y_test_tensor).item()
    mae = nn.L1Loss()(preds, y_test_tensor).item()
    r2 = 1 - torch.sum((y_test_tensor - preds)**2) / torch.sum((y_test_tensor - y_test_tensor.mean())**2)


In [25]:
print(f"\nTest MSE: {mse:.4f}")
print(f"Test MAE: {mae:.4f}")
print(f"Test R²: {r2.item():.4f}")


Test MSE: 0.5438
Test MAE: 0.3873
Test R²: 0.7925


In [29]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
# Add per-ion metrics
X_test_df = X_test.reset_index(drop=True).copy()
X_test_df['true'] = y_test.values
X_test_df['pred'] = preds.cpu().numpy().flatten()

print("\nPer-ion metrics on test set (weighted sample):")
for ion in ion_columns:
    subset = X_test_df[X_test_df[ion] == 1]
    if not subset.empty:
        y_true = subset['true'].values
        y_pred = subset['pred'].values
        mae = mean_absolute_error(y_true, y_pred)
        mse = mean_squared_error(y_true, y_pred)
        r2 = r2_score(y_true, y_pred)
        print(f"{ion.replace('working_ion_', '')}: MAE = {mae:.4f}, MSE = {mse:.4f}, R² = {r2:.4f}")


Per-ion metrics on test set (weighted sample):
Al: MAE = 0.3763, MSE = 0.1914, R² = 0.8336
Ca: MAE = 0.2751, MSE = 0.1627, R² = 0.8683
Cs: MAE = 0.6845, MSE = 0.5900, R² = -0.5570
K: MAE = 0.2436, MSE = 0.0800, R² = 0.9762
Li: MAE = 0.4058, MSE = 0.6353, R² = 0.7007
Mg: MAE = 0.5646, MSE = 1.3085, R² = 0.6288
Na: MAE = 0.3180, MSE = 0.1844, R² = 0.9079
Rb: MAE = 0.1944, MSE = 0.0470, R² = 0.9709
Y: MAE = 0.1740, MSE = 0.0490, R² = 0.8957
Zn: MAE = 0.3501, MSE = 0.2318, R² = 0.7407


In [27]:
# Reattach ion info to test set
X_test_df = X_test.reset_index(drop=True).copy()
X_test_df['true'] = y_test.values
X_test_df['pred'] = preds.cpu().numpy().flatten()

print("\nPer-ion MAE on test set:")
for ion in ion_columns:
    subset = X_test_df[X_test_df[ion] == 1]
    if not subset.empty:
        mae = np.mean(np.abs(subset['true'] - subset['pred']))
        print(f"{ion.replace('working_ion_', '')}: MAE = {mae:.4f}")



Per-ion MAE on test set:
Al: MAE = 0.3763
Ca: MAE = 0.2751
Cs: MAE = 0.6845
K: MAE = 0.2436
Li: MAE = 0.4058
Mg: MAE = 0.5646
Na: MAE = 0.3180
Rb: MAE = 0.1944
Y: MAE = 0.1740
Zn: MAE = 0.3501
