In [15]:
%load_ext autoreload
%autoreload 2
%matplotlib widget
from data_processing import *
from sklearn.metrics import mean_absolute_error, r2_score,mean_squared_error
import torch
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import optuna
import random
import os
import time
from pytorch_lightning.loggers import TensorBoardLogger


import numpy as np
from tqdm import tqdm
import copy
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Load data & scaling
- Resample the datasets for SOH to h
- Split data to 13/1/1 for Traning/Validation/Test
- Scaling corresponding Current, Voltage, Temperature

In [9]:
data_dir = "../01_Datenaufbereitung/Output/Calculated/"
all_data = load_data(data_dir)

Found 15 parquet files


Processing cells:   0%|          | 0/15 [00:00<?, ?cell/s]

Processing C01 ...


Processing cells:   7%|▋         | 1/15 [00:18<04:20, 18.63s/cell]

Processing C03 ...


Processing cells:  13%|█▎        | 2/15 [00:28<02:58, 13.73s/cell]

Processing C05 ...


Processing cells:  20%|██        | 3/15 [00:37<02:16, 11.39s/cell]

Processing C07 ...


Processing cells:  27%|██▋       | 4/15 [00:45<01:49,  9.97s/cell]

Processing C09 ...


Processing cells:  33%|███▎      | 5/15 [00:49<01:18,  7.84s/cell]

Processing C11 ...


Processing cells:  40%|████      | 6/15 [00:53<00:59,  6.56s/cell]

Processing C13 ...


Processing cells:  47%|████▋     | 7/15 [00:55<00:39,  4.94s/cell]

Processing C15 ...


Processing cells:  53%|█████▎    | 8/15 [00:57<00:28,  4.05s/cell]

Processing C17 ...


Processing cells:  60%|██████    | 9/15 [01:05<00:31,  5.28s/cell]

Processing C19 ...


Processing cells:  67%|██████▋   | 10/15 [01:12<00:28,  5.77s/cell]

Processing C21 ...


Processing cells:  73%|███████▎  | 11/15 [01:19<00:25,  6.36s/cell]

Processing C23 ...


Processing cells:  80%|████████  | 12/15 [01:27<00:20,  6.85s/cell]

Processing C25 ...


Processing cells:  87%|████████▋ | 13/15 [01:32<00:12,  6.37s/cell]

Processing C27 ...


Processing cells:  93%|█████████▎| 14/15 [01:41<00:07,  7.15s/cell]

Processing C29 ...


Processing cells: 100%|██████████| 15/15 [01:47<00:00,  7.18s/cell]


In [10]:
train_df, val_df, test_df = split_data(all_data, train=13, val=1, test=1,parts = 1)
train_scaled, val_scaled, test_scaled = scale_data(train_df, val_df, test_df)

Cell split completed:
Training set: 13 cells
Validation set: 1 cells
Test set: 1 cells
Final dataset sizes:
Training set: 47658 rows (split into 13 parts)
Validation set: 4466 rows from 1 cells
Test set: 4533 rows from 1 cells


In [None]:
### Visualize data
# visualize_data(all_data)
# inspect_data_ranges(all_data)
# inspect_data_ranges(train_scaled)
# plot_dataset_soh(train_df, "Train")
# plot_dataset_soh(val_df, "Validation")
# plot_dataset_soh(test_df, "Test")

## Model implementation

### Model definition
- One/muti step prediction model  -- class LSTMmodel()
- LSTM with attention model -- class LSTMattention()
- Transformer model -- class TransformerModel()

In [12]:
class LSTMOneStep(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers=1, dropout=0.0):
        super(LSTMOneStep, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout)
        # 假设只预测 1 个值 (SOH)
        self.fc = nn.Linear(hidden_dim, 1)
    
    def forward_one_step(self, x_t, hidden):
        """
        x_t: (batch, 1, input_dim)
        hidden: (h, c) 
            h,c 形状: (num_layers, batch, hidden_dim)
        return: 
            pred: (batch, 1) —— 预测值
            (h, c) —— 更新后的隐藏状态
        """
        out, (h, c) = self.lstm(x_t, hidden)  # out: (batch, 1, hidden_dim)
        pred = self.fc(out[:, -1, :])         # (batch, hidden_dim) -> (batch, 1)
        return pred, (h, c)
    
    def init_hidden(self, batch_size, device):
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim, device=device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_dim, device=device)
        return (h0, c0)
   

### Model Training and Validation

In [None]:
import random
import copy
import time
import os
import torch.optim as optim
from tqdm import tqdm
import torch.nn.functional as F

def train_model(model, train_scaled, val_scaled, EPOCHS=20, LEARNING_RATE=1e-3, device = DEVICE, batch_size=1, patience=5,
                initial_teacher_forcing=1.0, teacher_forcing_decay=0.1):
    """
    最基础的多cell并行方法：每个 batch 对应一个 cell，内部对该 cell 序列逐步迭代进行teacher forcing + scheduled sampling。
    """
    # 1) 构建 Dataset / DataLoader
    train_dataset = CellDataset(train_scaled)
    val_dataset = CellDataset(val_scaled)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    
    # 2) 优化器 / 损失
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=1e-4)
    criterion = nn.MSELoss()
    
    # ReduceLROnPlateau 作为示例
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=patience)
    
    best_val_loss = float('inf')
    best_epoch = 0
    best_model_state = None
    
    history = {
        'train_loss': [],
        'val_loss': [],
        'epoch': [],
        'teacher_forcing_ratio': []
    }
    
    print("\nStart training ...")
    for epoch in range(EPOCHS):
        # 计算 teacher forcing 概率
        teacher_forcing_ratio = max(0.0, initial_teacher_forcing - teacher_forcing_decay*epoch)
        
        model.train()
        train_loss = 0.0
        count_cell = 0
        
        # ============ 训练循环 ============ 
        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS}", leave=False)
        for cell_seq in pbar:
            # cell_seq shape: (T, feature_dim), batch_size=1 情况下
            cell_seq = cell_seq.to(device)
            T = cell_seq.shape[0]
            
            # 初始化隐藏状态
            hidden = model.init_hidden(batch_size=1, device=device)
            optimizer.zero_grad()
            
            total_loss_cell = 0.0
            
            # 遍历序列的时间步
            # 假设要预测 cell_seq[t,0] (SOH) -> 这里的 y_true=t行 =>  t 行 "SOH"
            # 也可以选未来时刻 (t+1), 视需求而定
            y_pred_prev = None
            for t in range(T):
                # 构造输入 x_t: (1, 1, feature_dim)
                x_t = cell_seq[t : t+1, :].unsqueeze(0)  # => (1, 1, feature_dim)
                
                # 真实值
                y_true = cell_seq[t, 0]  # 第0列为 SOH
                
                # 如果上一时刻有预测值 & 需要使用自回归
                if (t>0) and (random.random() < (1-teacher_forcing_ratio)) and (y_pred_prev is not None):
                    # 用上一时刻预测替换 x_t 的第 0 列
                    x_t[0,0,0] = y_pred_prev  
                
                # 前向
                y_pred, hidden = model.forward_one_step(x_t, hidden)
                # y_pred shape: (1,1)
                
                # 当前时刻 loss
                loss_step = criterion(y_pred[0,0], y_true)
                total_loss_cell += loss_step
                
                # 记录当前预测
                y_pred_prev = y_pred[0,0].detach()
            
            total_loss_cell.backward()
            nn.utils.clip_grad_norm_(model.parameters(), max_norm=1)
            optimizer.step()
            
            train_loss += total_loss_cell.item()
            count_cell += 1
        
        train_loss /= max(1, count_cell)
        
        # ============ 验证循环 ============ 
        model.eval()
        val_loss = 0.0
        count_val_cell = 0
        with torch.no_grad():
            for cell_seq in val_loader:
                cell_seq = cell_seq.to(device)
                T = cell_seq.shape[0]
                hidden = model.init_hidden(batch_size=1, device=device)
                
                total_loss_val_cell = 0.0
                y_pred_prev = None
                # 验证时 teacher_forcing_ratio=0 => 纯自回归
                for t in range(T):
                    x_t = cell_seq[t : t+1, :].unsqueeze(0)
                    y_true = cell_seq[t, 0]
                    
                    if (t>0) and (y_pred_prev is not None):
                        x_t[0,0,0] = y_pred_prev
                        
                    y_pred, hidden = model.forward_one_step(x_t, hidden)
                    loss_step = criterion(y_pred[0,0], y_true)
                    total_loss_val_cell += loss_step.item()
                    
                    y_pred_prev = y_pred[0,0]
                
                val_loss += total_loss_val_cell
                count_val_cell += 1
        
        val_loss /= max(1, count_val_cell)
        scheduler.step(val_loss)
        
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['epoch'].append(epoch+1)
        history['teacher_forcing_ratio'].append(teacher_forcing_ratio)
        
        pbar.clear()
        print(f"Epoch {epoch+1}/{EPOCHS} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | "
              f"TF: {teacher_forcing_ratio:.2f} | LR: {optimizer.param_groups[0]['lr']:.2e}")
        
        # 保存最佳模型
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_epoch = epoch+1
            best_model_state = copy.deepcopy(model.state_dict())
    
    print(f"Best model at epoch {best_epoch} with Val Loss: {best_val_loss:.4f}")
    # 可以将 best_model_state 保存到文件
    return history, best_model_state


In [17]:
model = LSTMmodel(input_dim=4, hidden_dim=64, num_layers=2, dropout=0.2, output_length=PRED_LEN)
model.to(device)

EPOCHS = 100
LEARNING_RATE = 1e-3
PATIENCE = 10
logger = TensorBoardLogger("logs", name="LSTM")

history = train_and_validation(model, train_loader, val_loader, logger, EPOCHS, LEARNING_RATE, PATIENCE)


Start training...


                                                    

TypeError: forward() got an unexpected keyword argument 'target_seq'

## ! Create sequence window
- Split data by cell id
- Use feature [SOH_ZHU, Current, Voltage, Temperature]
- Create continuous train/val rolling window with stride (Prediction length)
- Problem: fixed window, real case should be rolling window on whole cell data

In [None]:
# class SequenceDataset(Dataset):
#     def __init__(self, df, seed_len=36, pred_len=5, cell_col='cell_id'):
#         self.seed_len = seed_len
#         self.pred_len = pred_len
#         self.sequences = []
        
#         # 遍历每个电池（cell_id），分别生成滑动窗口
#         for cell in df[cell_col].unique():
#             cell_data = df[df[cell_col] == cell]
#             # 取出感兴趣的特征列
#             data = cell_data[['SOH_ZHU', 'Current[A]', 'Voltage[V]', 'Temperature[°C]']].values
#             # 用 stride=pred_len 生成滑动窗口
#             # 每个窗口长度为 seed_len + pred_len
#             for start in range(0, len(data) - (seed_len + pred_len) + 1, pred_len):
#                 block = data[start: start + seed_len + pred_len]
#                 self.sequences.append(block)

#     def __len__(self):
#         return len(self.sequences)

#     def __getitem__(self, idx):
#         block = self.sequences[idx]
#         # 分割为种子序列和未来序列
#         x_seed = block[:self.seed_len]       # (seed_len, 4)
#         x_future = block[self.seed_len:]       # (pred_len, 4)
#         # 目标仅取未来序列的第一列（SOH_ZHU）
#         y_target = x_future[:, 0]              # (pred_len,)
#         return (
#             torch.tensor(x_seed, dtype=torch.float32),
#             torch.tensor(x_future, dtype=torch.float32),
#             torch.tensor(y_target, dtype=torch.float32)
#         )
# SEED_LEN = 72
# PRED_LEN = 12   
# train_dataset = SequenceDataset(train_scaled, seed_len=SEED_LEN, pred_len=PRED_LEN)
# val_dataset = SequenceDataset(val_scaled, seed_len=SEED_LEN, pred_len=PRED_LEN)
# test_dataset = SequenceDataset(test_scaled, seed_len=SEED_LEN, pred_len=PRED_LEN)

# BATCH_SIZE = 32
# train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, pin_memory=torch.cuda.is_available())
# val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, pin_memory=torch.cuda.is_available())
# test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, pin_memory=torch.cuda.is_available())


In [None]:
 
# class AttentionLayer(nn.Module):
#     def __init__(self, hidden_dim):
#         super(AttentionLayer, self).__init__()
#         self.hidden_dim = hidden_dim
#         # 前馈网络：先降维，再经过 Tanh 激活，最后映射到标量注意力得分
#         self.attention = nn.Sequential(
#             nn.Linear(hidden_dim, hidden_dim // 2),
#             nn.Tanh(),
#             nn.Linear(hidden_dim // 2, 1)
#         )
        
#     def forward(self, lstm_output):
#         """
#         Args:
#             lstm_output: [batch_size, seq_len, hidden_dim]
#         Returns:
#             context: [batch_size, hidden_dim]，加权求和后的上下文向量
#             attn_weights: [batch_size, seq_len, 1]，归一化的注意力权重
#         """
#         # 计算每个时间步的注意力得分
#         attn_scores = self.attention(lstm_output)  # 形状: (batch_size, seq_len, 1)
#         # 对得分做 softmax 归一化
#         attn_weights = F.softmax(attn_scores, dim=1)
#         # 利用注意力权重对所有时间步的隐藏状态做加权求和
#         context = torch.bmm(attn_weights.transpose(1, 2), lstm_output)  # 形状: (batch_size, 1, hidden_dim)
#         context = context.squeeze(1)  # 形状: (batch_size, hidden_dim)
#         return context, attn_weights

# class LSTMattention(nn.Module):
#     def __init__(self, input_dim: int, hidden_dim: int, num_layers: int, dropout: float, output_length: int = 5):
#         """
#         Args:
#             input_dim (int): 输入特征数
#             hidden_dim (int): LSTM 隐藏层维度
#             num_layers (int): LSTM 层数
#             dropout (float): dropout 概率
#             output_length (int): 预测步数，即未来要预测多少个时间步的值
#         """
#         super(LSTMattention, self).__init__()
#         self.hidden_dim = hidden_dim
#         self.num_layers = num_layers
#         self.output_length = output_length
        
#         self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, dropout=dropout)
#         # 添加加性注意力层
#         self.attention = AttentionLayer(hidden_dim)
#         # 全连接层将上下文向量映射为多步预测输出
#         # self.fc = nn.Linear(hidden_dim, output_length)
#         self.fc = nn.Sequential(
#             nn.Linear(hidden_dim, hidden_dim // 2),
#             nn.LeakyReLU(),
#             nn.Dropout(dropout),
#             nn.Linear(hidden_dim // 2, output_length)
#         )
    
#     def forward(self, x: torch.Tensor) -> torch.Tensor:
#         """
#         Args:
#             x (torch.Tensor): 输入数据，形状为 (batch_size, seq_len, input_dim)
        
#         Returns:
#             torch.Tensor: 预测结果，形状为 (batch_size, output_length)
#         """
#         # 初始化 LSTM 的隐状态和细胞状态
#         h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim, dtype=x.dtype, device=x.device)
#         c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim, dtype=x.dtype, device=x.device)
        
#         # LSTM 输出： (batch_size, seq_len, hidden_dim)
#         lstm_out, _ = self.lstm(x, (h0, c0))
        
#         # 使用加性注意力机制生成上下文向量
#         context, attn_weights = self.attention(lstm_out)
        
#         # 利用上下文向量进行多步预测
#         output = self.fc(context)
        
#         return output, attn_weights

# class TransformerModel(nn.Module):
#     def __init__(self, input_dim: int, hidden_dim: int, num_layers: int, dropout: float, output_length: int = 1):
#         """
#         Args:
#             input_dim (int): 输入特征数
#             hidden_dim (int): Transformer 隐藏层维度
#             num_layers (int): Transformer 层数
#             dropout (float): dropout 概率
#             output_length (int): 预测步数，1 表示单步预测，大于1则为多步预测
#         """
#         super(TransformerModel, self).__init__()
#         self.hidden_dim = hidden_dim
#         self.num_layers = num_layers
#         self.output_length = output_length

#         # Transformer Encoder 部分，添加 batch_first=True
#         self.encoder_layer = nn.TransformerEncoderLayer(d_model=hidden_dim, nhead=8, dropout=dropout, batch_first=True)
#         self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        
#         # 用于处理输入特征的线性层
#         self.input_fc = nn.Linear(input_dim, hidden_dim)
        
#         # 输出层
#         self.fc = nn.Linear(hidden_dim, output_length)
    
#     def forward(self, x: torch.Tensor) -> torch.Tensor:
#         # 输入 x 的形状是 (batch_size, seq_len, input_dim)
#         batch_size, seq_len, _ = x.size()
        
#         # 将输入特征通过线性层转换到 hidden_dim 的维度
#         x = self.input_fc(x)  # 形状变为 (batch_size, seq_len, hidden_dim)
        
#         # Transformer 编码器输出
#         transformer_out = self.transformer_encoder(x)
        
#         # 取最后一个时间步的输出
#         final_hidden = transformer_out[:, -1, :]  # 形状为 (batch_size, hidden_dim)
        
#         # 通过全连接层得到最终的输出
#         output = self.fc(final_hidden)  # 形状为 (batch_size, output_length)
        
#         return output, _