In [1]:
import os
import pandas as pd
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import Dataset, DataLoader

# Define your features and target
features = ['Voltage [V]', 'Current [A]', 'Temperature [degC]']
target = 'SOC [-]'

# Custom Dataset class
class SOCDataset(Dataset):
    def __init__(self, folder_path):
        self.data = []
        
        # Load all CSV files in the specified folder
        for file in os.listdir(folder_path):
            if file.endswith('.csv'):
                file_path = os.path.join(folder_path, file)
                df = pd.read_csv(file_path)
                
                # Select only the relevant features and target
                self.data.append(df[features + [target]])
        
        # Concatenate all dataframes into a single dataframe
        self.data = pd.concat(self.data, ignore_index=True)

        # Convert to tensors
        self.X = torch.tensor(self.data[features].values, dtype=torch.float32)
        self.y = torch.tensor(self.data[target].values, dtype=torch.float32).unsqueeze(1)  # Shape to (N, 1)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        return self.X[index], self.y[index]

# Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # Shape: [max_len, 1, d_model]
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x should be of shape [seq_len, batch_size, d_model]
        seq_len = x.size(0)
        if seq_len > self.pe.size(0):
            raise ValueError("Input sequence length exceeds the maximum length of positional encoding.")
        x = x + self.pe[:seq_len, :].detach()  # Add positional encoding
        return x

# Transformer Encoder Block
class TransformerEncoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(TransformerEncoderBlock, self).__init__()
        self.self_attn = nn.MultiheadAttention(d_model, num_heads)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        attn_output, _ = self.self_attn(x, x, x)
        x = self.layer_norm1(x + self.dropout(attn_output))
        ffn_output = self.ffn(x)
        x = self.layer_norm2(x + self.dropout(ffn_output))
        return x

# Dual-Tower Transformer Architecture
class BERTtery(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, num_layers, input_dim, dropout=0.1):
        super(BERTtery, self).__init__()
        self.positional_encoding = PositionalEncoding(d_model)
        self.operational_embedding = nn.Linear(input_dim, d_model)
        
        # Temporal and Channel Encoders (Two-Tower Structure)
        self.temporal_encoder = nn.ModuleList([
            TransformerEncoderBlock(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        ])
        self.channel_encoder = nn.ModuleList([
            TransformerEncoderBlock(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)
        ])
        
        # Gating Mechanism
        self.gate = nn.Linear(d_model * 2, d_model)
        
        # Final Prediction Layer
        self.fc = nn.Linear(d_model, 1)
        
    def forward(self, x):
        # x: [batch_size, seq_len, input_dim] - input features (voltage, current, temperature)
        
        # Apply Operational Embedding
        x = self.operational_embedding(x)  # Shape: [batch_size, seq_len, d_model]
        
        # Transpose for transformer input: [seq_len, batch_size, d_model]
        x = x.transpose(0, 1)

        # Apply Positional Encoding
        x = self.positional_encoding(x)  # Shape: [seq_len, batch_size, d_model]

        # Temporal Encoder
        x_temp = x  # Initialize x_temp
        for layer in self.temporal_encoder:
            x_temp = layer(x_temp)
        
        # Channel Encoder
        x_chan = x  # Initialize x_chan
        for layer in self.channel_encoder:
            x_chan = layer(x_chan)

        # Ensure both outputs are shaped correctly before concatenation
        assert x_temp.shape == x_chan.shape, f"Shapes do not match: {x_temp.shape} vs {x_chan.shape}"

        # Concatenate the two outputs and apply gating
        x_concat = torch.cat((x_temp, x_chan), dim=-1)  # Shape: [seq_len, batch_size, 2*d_model]
        
        x_gate = torch.sigmoid(self.gate(x_concat))  # Apply gating
        
        # Final output layer (SOC prediction)
        # Get the last time step: [batch_size, d_model]
        output = self.fc(x_gate[-1])  # Take the last time step only

        return output

# Specify the folder where your CSV files are located
folder_path = './dataset'  # Change this to your actual path

# Create dataset and dataloader
soc_dataset = SOCDataset(folder_path)
dataloader = DataLoader(soc_dataset, batch_size=64, shuffle=True)

# Define the model
d_model = 64  # Dimension of model
num_heads = 8  # Number of attention heads
d_ff = 256  # Feed-forward network hidden size
num_layers = 4  # Number of encoder layers
input_dim = 3  # Input dimensions (Voltage, Current, Temperature)

model = BERTtery(d_model, num_heads, d_ff, num_layers, input_dim)

# Example Training Loop
def train_model(model, dataloader, num_epochs=10):
    model.train()  # Set the model to training mode
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()  # Mean Squared Error loss for regression tasks

    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            optimizer.zero_grad()  # Zero the gradients
            
            # Forward pass
            outputs = model(inputs)  # Model expects [batch_size, seq_len, input_dim]
            loss = criterion(outputs, labels)  # Compute the loss
            loss.backward()  # Backpropagation
            optimizer.step()  # Update the weights

        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')

# Train the model
# train_model(model, dataloader, num_epochs=10)


: 

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split
import os
import pandas as pd
import numpy as np

# Dataset class with a limit on the number of samples
class BatteryDataset(Dataset):
    def __init__(self, directory, features, target, limit=None):
        self.features = features
        self.target = target
        self.data = []
        self.cumulative_lengths = []  # Store cumulative lengths of each CSV
        self.limit = limit
        self.load_data(directory)
    
    def load_data(self, directory):
        total_length = 0
        for filename in os.listdir(directory):
            if filename.endswith('.csv'):
                filepath = os.path.join(directory, filename)
                df = pd.read_csv(filepath)
                self.data.append(df)
                total_length += len(df)
                self.cumulative_lengths.append(total_length)
                if self.limit and total_length >= self.limit:
                    break
    
    def __len__(self):
        if self.limit:
            return min(self.cumulative_lengths[-1], self.limit)
        return self.cumulative_lengths[-1]

    def __getitem__(self, idx):
        for i, length in enumerate(self.cumulative_lengths):
            if idx < length:
                df = self.data[i]
                if i > 0:
                    row_idx = idx - self.cumulative_lengths[i-1]
                else:
                    row_idx = idx
                break
        else:
            raise IndexError(f"Index {idx} out of range")

        row = df.iloc[row_idx]
        X = row[self.features].values.astype(np.float32)
        y = row[self.target].astype(np.float32)
        return torch.tensor(X), torch.tensor(y)

# Positional encoding (sine-cosine encoding)
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)
    
    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return x

# Transformer-based model
class TransformerBatteryModel(nn.Module):
    def __init__(self, input_dim, model_dim=64, num_layers=4, num_heads=8, dropout=0.1):
        super(TransformerBatteryModel, self).__init__()
        
        self.input_embedding = nn.Linear(input_dim, model_dim)
        self.positional_encoding = PositionalEncoding(d_model=model_dim)
        
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=model_dim, nhead=num_heads, dim_feedforward=256, dropout=dropout)
        self.temporal_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.channel_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        self.gate = nn.Linear(2 * model_dim, model_dim)
        self.fc = nn.Linear(model_dim, 1)
    
    def forward(self, x):
        x = self.input_embedding(x)
        x = self.positional_encoding(x)
        
        temporal_output = self.temporal_encoder(x)
        channel_output = self.channel_encoder(x)
        
        combined_output = torch.cat((temporal_output, channel_output), dim=-1)
        gated_output = torch.sigmoid(self.gate(combined_output))
        
        output = self.fc(gated_output.mean(dim=0))
        return output

# Hyperparameters
input_dim = 3  # Number of features: Voltage, Current, Temperature
model_dim = 64
num_layers = 4
num_heads = 8
dropout = 0.1
epochs = 130
batch_size = 284
learning_rate = 2.0
gradient_clip_val = 1.0
weight_decay = 0.0001
validation_split = 0.2
max_data_size = 4000  # Limit to 10k samples

# Data preparation
directory = './dataset'
features = ['Voltage [V]', 'Current [A]', 'Temperature [degC]']
target = 'SOC [-]'
dataset = BatteryDataset(directory, features, target, limit=max_data_size)

# Split the dataset into training and validation sets
train_size = int((1 - validation_split) * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

# Model, optimizer, and loss function
model = TransformerBatteryModel(input_dim=input_dim, model_dim=model_dim, num_layers=num_layers, num_heads=num_heads, dropout=dropout)
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
criterion = nn.MSELoss()

# Training loop with validation loss printing
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    for inputs, targets in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets.unsqueeze(1))
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), gradient_clip_val)
        optimizer.step()
        running_loss += loss.item()
    
    # Validation loss
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for inputs, targets in val_loader:
            outputs = model(inputs)
            loss = criterion(outputs, targets.unsqueeze(1))
            val_loss += loss.item()
    
    print(f'Epoch {epoch}, Training Loss: {running_loss/len(train_loader):.9f}, Validation Loss: {val_loss/len(val_loader):.4f}')


