In [9]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import KFold
from torchvision import transforms
from chart_utils import TimeSeriesImageDataset, accuracy_fn

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Function to read UCR dataset
def read_ucr(filename):
    data = []
    labels = []
    
    with open(filename, 'r') as file:
        for line in file:
            parts = line.strip().split(',')
            if len(parts) < 2:
                continue
            features = [float(f) for f in parts[:-1]]
            label = int(parts[-1].split(':')[-1])
            data.append(features)
            labels.append(label)
    
    print(f"Loaded {len(data)} samples from {filename}")
    return np.array(data), np.array(labels)

# File paths (adjust these paths as necessary)
train_file = 'ECG/ECG_TRAIN.ts'
test_file = 'ECG/ECG_TEST.ts'

# Load dataset
x_train, y_train = read_ucr(train_file)
x_test, y_test = read_ucr(test_file)

# Normalize labels to be within range [0, num_classes-1]
unique_labels = np.unique(y_train)
label_map = {label: idx for idx, label in enumerate(unique_labels)}
y_train = np.array([label_map[label] for label in y_train])
y_test = np.array([label_map[label] for label in y_test])

nb_classes = len(unique_labels)

# Normalize features
x_train_mean = x_train.mean()
x_train_std = x_train.std()
x_train = (x_train - x_train_mean) / x_train_std
x_test = (x_test - x_train_mean) / x_train_std

# Convert to PyTorch tensors
X_train = torch.tensor(x_train, dtype=torch.float32)
X_test = torch.tensor(x_test, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
y_test = torch.tensor(y_test, dtype=torch.long)

# Combine train and test data for K-Fold Cross-Validation
X = torch.cat((X_train, X_test))
y = torch.cat((y_train, y_test))

# K-Fold Cross-Validation
k_folds = 5
kf = KFold(n_splits=k_folds, shuffle=True)

# Define a simpler CNN Model
class Simple2DCNN(nn.Module):
    def __init__(self, input_channels, num_classes):
        super(Simple2DCNN, self).__init__()
        self.model = nn.Sequential(
            nn.Conv2d(input_channels, 8, kernel_size=3, padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.3),
            nn.Conv2d(8, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(0.3),
            nn.Flatten(),
            nn.Linear(16 * 16 * 16, 32),  # Adjusted size based on pooling layers
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(32, num_classes)
        )

    def forward(self, x):
        return self.model(x)


# Data Augmentation
transform = transforms.Compose([
    transforms.Resize((64, 64)),  # Resize to a smaller size for simpler model
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2)
])

# Perform k-fold cross-validation
fold_results = []

for fold, (train_idx, test_idx) in enumerate(kf.split(X)):
    print(f'Fold {fold+1}/{k_folds}')
    
    # Create data loaders for the current fold
    train_data, test_data = X[train_idx], X[test_idx]
    train_labels, test_labels = y[train_idx], y[test_idx]
    
    train_dataset = TimeSeriesImageDataset(train_data.numpy(), train_labels.numpy(), transform)
    test_dataset = TimeSeriesImageDataset(test_data.numpy(), test_labels.numpy(), transform)
    
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    
    # Initialize the model, criterion, optimizer, and scheduler
    model = Simple1DCNN(6, nb_classes).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0005, weight_decay=0.01)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=50)
    
    num_epochs = 100
    best_test_accuracy = 0
    patience = 10
    trigger_times = 0

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0
        for images_area, images_bar, labels in train_loader:
            images_area, images_bar, labels = images_area.to(device), images_bar.to(device), labels.to(device)
            
            # Combine area and bar charts along the channel dimension
            combined_images = torch.cat((images_area, images_bar), dim=1)
            
            optimizer.zero_grad()
            outputs = model(combined_images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        epoch_loss = running_loss / len(train_loader)
        epoch_accuracy = 100 * correct / total
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Training Accuracy: {epoch_accuracy:.2f}%')

        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images_area, images_bar, labels in test_loader:
                images_area, images_bar, labels = images_area.to(device), images_bar.to(device), labels.to(device)
                
                # Combine area and bar charts along the channel dimension
                combined_images = torch.cat((images_area, images_bar), dim=1)
                
                outputs = model(combined_images)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        test_accuracy = 100 * correct / total
        print(f'Test Accuracy after Epoch {epoch+1}: {test_accuracy:.2f}%')
        scheduler.step(epoch_loss)

        if test_accuracy > best_test_accuracy:
            best_test_accuracy = test_accuracy
            trigger_times = 0
        else:
            trigger_times += 1
            if trigger_times >= patience:
                print(f'Early stopping at epoch {epoch+1}')
                break

    print(f'Best Test Accuracy for fold {fold+1}: {best_test_accuracy:.2f}%')
    fold_results.append(best_test_accuracy)

# Calculate the average and standard deviation of the test accuracies
mean_accuracy = np.mean(fold_results)
std_accuracy = np.std(fold_results)
print(f'Mean Test Accuracy: {mean_accuracy:.2f}%')
print(f'Standard Deviation of Test Accuracy: {std_accuracy:.2f}%')


Loaded 100 samples from ECG/ECG_TRAIN.ts
Loaded 100 samples from ECG/ECG_TEST.ts
Fold 1/5
Epoch [1/100], Loss: 1.4532, Training Accuracy: 50.62%
Test Accuracy after Epoch 1: 70.00%




Epoch [2/100], Loss: 0.7085, Training Accuracy: 60.62%
Test Accuracy after Epoch 2: 70.00%
Epoch [3/100], Loss: 0.8181, Training Accuracy: 55.62%
Test Accuracy after Epoch 3: 70.00%
Epoch [4/100], Loss: 0.7519, Training Accuracy: 63.12%
Test Accuracy after Epoch 4: 82.50%
Epoch [5/100], Loss: 0.5986, Training Accuracy: 71.25%
Test Accuracy after Epoch 5: 75.00%
Epoch [6/100], Loss: 0.5453, Training Accuracy: 75.00%
Test Accuracy after Epoch 6: 77.50%
Epoch [7/100], Loss: 0.5436, Training Accuracy: 73.12%
Test Accuracy after Epoch 7: 80.00%
Epoch [8/100], Loss: 0.5790, Training Accuracy: 73.75%
Test Accuracy after Epoch 8: 77.50%
Epoch [9/100], Loss: 0.4887, Training Accuracy: 79.38%
Test Accuracy after Epoch 9: 80.00%
Epoch [10/100], Loss: 0.4365, Training Accuracy: 79.38%
Test Accuracy after Epoch 10: 75.00%
Epoch [11/100], Loss: 0.4742, Training Accuracy: 76.88%
Test Accuracy after Epoch 11: 72.50%
Epoch [12/100], Loss: 0.4261, Training Accuracy: 80.62%
Test Accuracy after Epoch 12: 