In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, confusion_matrix
import os
from scipy.signal import butter, filtfilt
# Set seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

In [None]:
from PIL import Image
import os

def get_image_dimensions_stats(image_dir):
    dimensions = []
    for root, _, files in os.walk(image_dir):
        for filename in files:
            if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff')):
                img_path = os.path.join(root, filename)
                try:
                    with Image.open(img_path) as img:
                        dimensions.append(img.size)  # (width, height)
                except Exception as e:
                    print(f"Could not open {img_path}: {e}")

    if not dimensions:
        print("No images found.")
        return

    min_dim = min(dimensions, key=lambda x: x[0]*x[1])
    max_dim = max(dimensions, key=lambda x: x[0]*x[1])

    print(f"Lowest dimension: {min_dim} (width x height)")
    print(f"Highest dimension: {max_dim} (width x height)")

# Example usage:
get_image_dimensions_stats('C:/Users/sabit/OneDrive/Desktop/ecg_exp')

Lowest dimension: (1899, 1651) (width x height)
Highest dimension: (3158, 3092) (width x height)


In [2]:
#Resize Image
from PIL import Image
import os
def resize_images(input_dir, output_dir, size=(1500, 1300)):
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for root, _, files in os.walk(input_dir):
        rel_path = os.path.relpath(root, input_dir)
        save_dir = os.path.join(output_dir, rel_path)
        if not os.path.exists(save_dir):
            os.makedirs(save_dir)
        for filename in files:
            if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif', '.tiff')):
                img_path = os.path.join(root, filename)
                save_path = os.path.join(save_dir, filename)
                try:
                    with Image.open(img_path) as img:
                        img_resized = img.resize(size, resample=Image.LANCZOS)
                        img_resized.save(save_path, quality=100)
                except Exception as e:
                    print(f"Could not process {img_path}: {e}")

# Example usage:
resize_images('C:/Users/sabit/OneDrive/Desktop/ecg_exp', 'C:/Users/sabit/OneDrive/Desktop/ecg_resized')

In [2]:
def butter_bandpass_filter(signal, lowcut=0.5, highcut=30, fs=100, order=5):
    """Bandpass filter to remove noise"""
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, signal)

def normalize(signal):
    """Z-score normalization"""
    return (signal - np.mean(signal)) / np.std(signal)

def random_shift(signal, max_shift=0.1):
    """Random time shift augmentation"""
    shift = np.random.randint(-int(max_shift * len(signal)), int(max_shift * len(signal)))
    return np.roll(signal, shift)

def add_gaussian_noise(signal, std=0.01):
    """Add Gaussian noise"""
    noise = np.random.normal(0, std, len(signal))
    return signal + noise

In [3]:
class ECGDataset(Dataset):
    def __init__(self, signals, labels, transform=True):
        self.signals = signals
        self.labels = labels
        self.transform = transform  # Enable/disable augmentation

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        signal = self.signals[idx]
        label = self.labels[idx]

        # Preprocessing
        signal = butter_bandpass_filter(signal)
        signal = normalize(signal)

        # Data augmentation (only during training)
        if self.transform:
            if np.random.rand() > 0.5:
                signal = random_shift(signal)
            if np.random.rand() > 0.5:
                signal = add_gaussian_noise(signal)

        return torch.tensor(signal, dtype=torch.float32).unsqueeze(0), torch.tensor(label, dtype=torch.long)

In [6]:
class ECG1DCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=5, stride=2, padding=2),
            nn.BatchNorm1d(16),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm1d(32),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Conv1d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(2)
        )
        self.fc_layers = nn.Sequential(
            nn.Linear(64 * 30, 128),  # Adjust based on your input length (1000 -> 30 after pooling)
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 3))
        
    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)
        return self.fc_layers(x)

In [7]:
def train_model(X, y, num_epochs=50, batch_size=32):
    # Split data (stratified for class balance)
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, stratify=y)

    # Compute class weights (for imbalanced data)
    class_weights = compute_class_weight('balanced', classes=np.unique(y_train), y=y_train)
    class_weights = torch.tensor(class_weights, dtype=torch.float32)

    # Create datasets
    train_dataset = ECGDataset(X_train, y_train, transform=True)
    val_dataset = ECGDataset(X_val, y_val, transform=False)

    # Data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Model, loss, optimizer
    model = ECG1DCNN()
    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-5)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5)

    # Training loop
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        for signals, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(signals)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        # Validation
        model.eval()
        val_loss = 0.0
        all_preds, all_labels = [], []
        with torch.no_grad():
            for signals, labels in val_loader:
                outputs = model(signals)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, preds = torch.max(outputs, 1)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        # Metrics
        f1 = f1_score(all_labels, all_preds, average='macro')
        print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {train_loss/len(train_loader):.4f} | Val Loss: {val_loss/len(val_loader):.4f} | Val F1: {f1:.4f}")
        scheduler.step(val_loss)

    return model

In [10]:
from sklearn.utils.class_weight import compute_class_weight

# Example synthetic data (replace with your actual data)
X = [np.random.randn(1000) for _ in range(732)]  # 732 samples, each 1000 points long
y = np.concatenate([np.zeros(345), np.ones(324), 2*np.ones(63)])  # Class counts

# Train
model = train_model(X, y, num_epochs=30)

# Save model
torch.save(model.state_dict(), 'C:/Users/sabit/OneDrive/Desktop/ecg_exp')

RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x3968 and 1920x128)