# In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import argparse
import os
from tqdm import tqdm

# Seed both random number generators used by this script.
np.random.seed(42)
torch.manual_seed(42)

# Use the GPU when PyTorch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

# Conditions to run, in execution order.
conditions = ['CY45-05_1', 'CY35-05_1', 'CY25-05_1']

# Empty results table with one column per recorded quantity.
results = pd.DataFrame(
    columns=['condition', 'trial', 'train_loss', 'val_loss', 'test_rmse', 'test_mape'],
)

for condition in conditions:
    # Banner announcing the condition about to be run.
    print(f"\n{'='*50}")
    print(f"Starting experiments for condition: {condition}")
    print(f"{'='*50}")

    for trial in range(10):
        print(f"\nTrial {trial+1}/10 for {condition}")

        # Hyperparameters are declared as argparse options. An empty argv is
        # parsed below, so every option ends up with its default value.
        option_table = (
            ('--condition', str, condition),
            ('--window_size', int, 10),
            ('--prediction_steps', int, 100),
            ('--step_size', int, 1),
            ('--epochs', int, 500),
            ('--batch_size', int, 64),
            ('--lr', float, 0.0001),
        )
        parser = argparse.ArgumentParser()
        for flag, value_type, default_value in option_table:
            parser.add_argument(flag, type=value_type, default=default_value)

        args = parser.parse_args([])

        # Choose the CSV files that make up the train / validation / test
        # splits for the requested condition. The splits must not share files:
        # validation loss drives both checkpoint selection and the LR
        # scheduler, so a shared file would bias both.
        if args.condition == 'CY45-05_1':
            train_files = [
                'CY45-05_1-#1.csv', 'CY45-05_1-#2.csv', 'CY45-05_1-#3.csv', 'CY45-05_1-#4.csv',
                'CY45-05_1-#5.csv', 'CY45-05_1-#6.csv', 'CY45-05_1-#7.csv', 'CY45-05_1-#8.csv',
                'CY45-05_1-#9.csv', 'CY45-05_1-#10.csv', 'CY45-05_1-#11.csv', 'CY45-05_1-#12.csv',
                'CY45-05_1-#13.csv', 'CY45-05_1-#14.csv', 'CY45-05_1-#15.csv', 'CY45-05_1-#16.csv',
                'CY45-05_1-#17.csv'
            ]
            val_files = [
                'CY45-05_1-#28.csv', 'CY45-05_1-#25.csv'
            ]
            test_files = [
                'CY45-05_1-#24.csv', 'CY45-05_1-#26.csv', 'CY45-05_1-#27.csv', 'CY45-05_1-#22.csv',
                'CY45-05_1-#23.csv'
            ]

        elif args.condition == 'CY25-05_1':
            # 'CY25-05_1-#18.csv' used to be listed here as well as in
            # val_files. It is kept only in the validation split.
            # NOTE(review): confirm #18 was meant for validation, not training.
            train_files = [
                'CY25-05_1-#2.csv', 'CY25-05_1-#3.csv', 'CY25-05_1-#4.csv',
                'CY25-05_1-#5.csv', 'CY25-05_1-#6.csv', 'CY25-05_1-#7.csv', 'CY25-05_1-#8.csv',
                'CY25-05_1-#9.csv', 'CY25-05_1-#10.csv', 'CY25-05_1-#11.csv', 'CY25-05_1-#13.csv'
            ]
            val_files = [
                'CY25-05_1-#18.csv', 'CY25-05_1-#19.csv'
            ]
            test_files = [
                'CY25-05_1-#1.csv', 'CY25-05_1-#14.csv', 'CY25-05_1-#15.csv', 'CY25-05_1-#16.csv',
                'CY25-05_1-#17.csv', 'CY25-05_1-#12.csv'
            ]

        elif args.condition == 'CY25-025_1':
            train_files = [
                'CY25-025_1-#1.csv', 'CY25-025_1-#2.csv', 'CY25-025_1-#3.csv'
            ]
            val_files = [
                'CY25-025_1-#7.csv'
            ]
            test_files = [
                'CY25-025_1-#5.csv', 'CY25-025_1-#6.csv', 'CY25-025_1-#4.csv'
            ]

        elif args.condition == 'CY35-05_1':
            train_files = [
                'CY35-05_1-#1.csv'
            ]
            val_files = [
                'CY35-05_1-#3.csv'
            ]
            test_files = [
                'CY35-05_1-#2.csv'
            ]

        else:
            raise ValueError(f"Unsupported condition: {args.condition}")

        # Fail fast if a later edit to the lists puts one file in two splits.
        shared_files = (
            (set(train_files) & set(val_files))
            | (set(train_files) & set(test_files))
            | (set(val_files) & set(test_files))
        )
        if shared_files:
            raise ValueError(
                f"Files appear in more than one split for {args.condition}: {sorted(shared_files)}"
            )

        input_folder = 'dataset/UL-NCA/'

        def load_battery_data(filename):
            """Read one CSV from `input_folder` and return its scaled capacity series.

            Returns the 'Discharge_Capacity' column as an array divided by 1000.
            """
            csv_path = f"{input_folder}/{filename}"
            frame = pd.read_csv(csv_path)
            raw_capacity = frame['Discharge_Capacity'].values
            # Scale down by 1000 (presumably mAh -> Ah; confirm the source units).
            return raw_capacity / 1000

        def create_sequences(data, window_size, prediction_steps, step_size):
            """Cut a series into sliding (input window, target horizon) pairs.

            Args:
                data: sequence indexed along its first axis.
                window_size: number of consecutive points in each input window.
                prediction_steps: number of points right after the window that
                    form the target.
                step_size: stride between the starts of consecutive windows.

            Returns:
                A tuple `(X, y)` of arrays whose first axis has one entry per
                window: `X` has `window_size` points per row and `y` has
                `prediction_steps`. When the series is too short for a single
                pair, both arrays are empty but keep those trailing dimensions,
                so they can still be concatenated with results from other series.
            """
            data = np.asarray(data)
            span = window_size + prediction_steps
            starts = range(0, len(data) - span + 1, step_size)

            if len(starts) == 0:
                # np.array([]) would have shape (0,), which np.concatenate
                # refuses to join with 2-D arrays from longer series.
                trailing = data.shape[1:]
                return (
                    np.empty((0, window_size) + trailing, dtype=data.dtype),
                    np.empty((0, prediction_steps) + trailing, dtype=data.dtype),
                )

            X = np.array([data[s:s + window_size] for s in starts])
            y = np.array([data[s + window_size:s + span] for s in starts])
            return X, y

        def prepare_data(files):
            """Load every file in `files`, window it, and stack the results.

            Each file is read with `load_battery_data` and cut with
            `create_sequences` using the window, horizon and stride from `args`.
            Returns `(X, y)`, the per-file arrays concatenated in file order.
            """
            per_file_pairs = [
                create_sequences(
                    load_battery_data(file_name),
                    args.window_size,
                    args.prediction_steps,
                    args.step_size,
                )
                for file_name in files
            ]
            input_parts = [pair[0] for pair in per_file_pairs]
            target_parts = [pair[1] for pair in per_file_pairs]
            return np.concatenate(input_parts), np.concatenate(target_parts)

        # Build the windowed arrays for each split, in train / val / test order.
        (X_train, y_train), (X_val, y_val), (X_test, y_test) = [
            prepare_data(split_files)
            for split_files in (train_files, val_files, test_files)
        ]
        def to_tensor(data, device):
            """Wrap `data` in a `torch.FloatTensor` and move it to `device`."""
            float_tensor = torch.FloatTensor(data)
            return float_tensor.to(device)
        # Convert every split to float tensors on the target device.
        X_train_t, y_train_t = to_tensor(X_train, device), to_tensor(y_train, device)
        X_val_t, y_val_t = to_tensor(X_val, device), to_tensor(y_val, device)
        X_test_t, y_test_t = to_tensor(X_test, device), to_tensor(y_test, device)

        # Pair each input window with its target horizon.
        train_dataset = TensorDataset(X_train_t, y_train_t)
        val_dataset = TensorDataset(X_val_t, y_val_t)
        test_dataset = TensorDataset(X_test_t, y_test_t)

        # Only the training batches are shuffled; the test loader yields one
        # window per batch.
        train_loader = DataLoader(dataset=train_dataset, batch_size=args.batch_size, shuffle=True)
        val_loader = DataLoader(dataset=val_dataset, batch_size=args.batch_size, shuffle=False)
        test_loader = DataLoader(dataset=test_dataset, batch_size=1, shuffle=False)
        class BatteryMLP(nn.Module):
            """Feed-forward network: input -> 128 -> 64 -> output.

            Each hidden linear layer is followed by ReLU and dropout (p=0.2).
            """

            def __init__(self, input_size, output_size):
                super().__init__()
                first_hidden, second_hidden = 128, 64
                # Layers are created in this order so parameter initialisation
                # and the state_dict keys (net.0, net.3, net.6) stay the same.
                stack = [
                    nn.Linear(input_size, first_hidden),
                    nn.ReLU(),
                    nn.Dropout(0.2),
                    nn.Linear(first_hidden, second_hidden),
                    nn.ReLU(),
                    nn.Dropout(0.2),
                    nn.Linear(second_hidden, output_size),
                ]
                self.net = nn.Sequential(*stack)

            def forward(self, x):
                """Apply the layer stack to `x` and return its output."""
                prediction = self.net(x)
                return prediction

        # Fresh model, loss and optimisation state for this trial.
        model = BatteryMLP(args.window_size, args.prediction_steps)
        model = model.to(device)
        criterion = nn.MSELoss()
        optimizer = optim.Adam(params=model.parameters(), lr=args.lr)
        # Halve the learning rate after 100 epochs without a lower monitored value.
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=100
        )

        def train_epoch(model, loader, optimizer, criterion, device):
            """Run one optimisation pass over `loader`.

            Puts the model in training mode, takes one optimizer step per batch,
            and returns the sum of the batch losses divided by `len(loader)`.
            """
            model.train()
            running_loss = 0
            for inputs, targets in loader:
                inputs = inputs.to(device)
                targets = targets.to(device)

                optimizer.zero_grad()
                predictions = model(inputs)
                batch_loss = criterion(predictions, targets)
                batch_loss.backward()
                optimizer.step()

                running_loss = running_loss + batch_loss.item()
            batch_count = len(loader)
            return running_loss / batch_count

        def evaluate(model, loader, criterion, device):
            """Return the loss over `loader` without updating the model.

            Puts the model in eval mode, disables gradient tracking, and returns
            the sum of the batch losses divided by `len(loader)`.
            """
            model.eval()
            running_loss = 0
            with torch.no_grad():
                for inputs, targets in loader:
                    inputs = inputs.to(device)
                    targets = targets.to(device)
                    batch_loss = criterion(model(inputs), targets)
                    running_loss = running_loss + batch_loss.item()
            batch_count = len(loader)
            return running_loss / batch_count

        train_losses, val_losses = [], []
        best_val_loss = float('inf')
        # Best weights seen in THIS trial, held in memory. If validation loss
        # never improves (for example it is NaN every epoch), nothing is saved
        # this trial, and reading 'best_model.pth' back from disk would restore
        # a checkpoint written by an earlier trial or condition.
        best_state = None

        for epoch in tqdm(range(args.epochs), desc=f"Training {condition} trial {trial+1}"):
            train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
            val_loss = evaluate(model, val_loader, criterion, device)
            # The scheduler monitors validation loss.
            scheduler.step(val_loss)

            train_losses.append(train_loss)
            val_losses.append(val_loss)

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                # state_dict() returns references to the live parameters, so
                # clone them; otherwise later epochs would overwrite the copy.
                best_state = {
                    name: tensor.detach().clone()
                    for name, tensor in model.state_dict().items()
                }
                # The on-disk checkpoint is still written, as before.
                torch.save(best_state, 'best_model.pth')

            if (epoch+1) % 10 == 0:
                print(f'Epoch {epoch+1}/{args.epochs} | Train Loss: {train_loss:.6f} | Val Loss: {val_loss:.6f}')

        # Restore the best weights of this trial before testing.
        if best_state is not None:
            model.load_state_dict(best_state)
        else:
            print(
                f"Warning: validation loss never improved for {condition} trial {trial+1}; "
                "keeping the final-epoch weights."
            )
        def evaluate_with_metrics(model, loader, device):
            """Compute RMSE and MAPE of `model` over every element yielded by `loader`.

            Both metrics are accumulated element-wise across all batches, so the
            result does not depend on batch size or target length.

            Args:
                model: callable mapping an input batch to predictions.
                loader: iterable of `(inputs, targets)` tensor pairs.
                device: device the batches are moved to before the forward pass.

            Returns:
                `(rmse, mape)`, with `mape` expressed in percent.

            Raises:
                ValueError: if `loader` yields no elements.
            """
            model.eval()
            # Floor for |y_true| so that a zero target cannot divide by zero.
            epsilon = 1e-10
            squared_error_sum = 0.0
            abs_pct_error_sum = 0.0
            count = 0
            with torch.no_grad():
                for X_batch, y_batch in loader:
                    X_batch, y_batch = X_batch.to(device), y_batch.to(device)
                    outputs = model(X_batch)
                    # Accumulate in float64 to limit rounding error over many batches.
                    y_true = y_batch.cpu().numpy().astype(np.float64)
                    y_pred = outputs.cpu().numpy().astype(np.float64)

                    error = y_true - y_pred
                    denominator = np.abs(y_true)
                    denominator[denominator < epsilon] = epsilon

                    squared_error_sum += float((error ** 2).sum())
                    abs_pct_error_sum += float((np.abs(error) / denominator).sum())
                    count += y_true.size

            if count == 0:
                raise ValueError("loader produced no samples; cannot compute metrics")

            rmse = np.sqrt(squared_error_sum / count)
            # Scale to percent exactly once, after averaging over all elements.
            mape = 100 * abs_pct_error_sum / count

            return rmse, mape

        def plot_true_vs_predicted(model, test_files, device, condition_name, trial_num, args):
            """Plot every true target window and every predicted window on one figure.

            Each file in `test_files` is loaded and windowed with the settings in
            `args`, then passed through `model` in eval mode. Window number `i`
            (counted across all files) is drawn starting at x = i, true values in
            blue and predictions in red.
            """
            plt.figure(figsize=(15, 8))
            true_windows = []
            predicted_windows = []

            for file_name in test_files:
                series = load_battery_data(file_name)
                inputs, targets = create_sequences(
                    series, args.window_size, args.prediction_steps, args.step_size
                )
                inputs_t = torch.FloatTensor(inputs).to(device)

                model.eval()
                with torch.no_grad():
                    predictions = model(inputs_t).cpu().numpy()
                for row in range(len(targets)):
                    true_windows.append(targets[row])
                    predicted_windows.append(predictions[row])

            # All true windows are drawn before any prediction, as two passes.
            for offset, window in enumerate(true_windows):
                if offset == 0:
                    legend_label = 'True Values'
                else:
                    legend_label = ""
                x_positions = range(offset, offset + len(window))
                plt.plot(x_positions, window, color='blue', alpha=0.5, label=legend_label)

            for offset, window in enumerate(predicted_windows):
                if offset == 0:
                    legend_label = 'Predictions'
                else:
                    legend_label = ""
                x_positions = range(offset, offset + len(window))
                plt.plot(x_positions, window, color='red', alpha=0.5, label=legend_label)

            plt.title(f'True vs Predicted Capacity (Overlapped)\nCondition: {condition_name} - Trial {trial_num}')
            plt.xlabel('Time Step')
            plt.ylabel('Capacity (Normalized)')
            plt.legend()
            plt.grid(True)
            plt.tight_layout()
            plt.show()
        # Score the model on the test loader, report, and plot.
        test_rmse, test_mape = evaluate_with_metrics(model, test_loader, device)
        print(f'\nTest Metrics: RMSE = {test_rmse:.6f}, MAPE = {test_mape:.2f}%')
        plot_true_vs_predicted(model, test_files, device, args.condition, trial+1, args)

        # Append one row for this trial. The train/val losses recorded are the
        # values from the final epoch.
        trial_record = {
            'condition': condition,
            'trial': trial+1,
            'train_loss': train_losses[-1],
            'val_loss': val_losses[-1],
            'test_rmse': test_rmse,
            'test_mape': test_mape
        }
        results.loc[len(results)] = trial_record

        # Rewrite the CSV after every trial so finished trials are already on disk.
        results.to_csv('battery_prediction_results-UL-NCA.csv', index=False)
