In [1]:
!pip install --user -r requirements.txt



In [15]:
import csv
import matplotlib.pyplot as plt
import numpy as np
import os
from wettbewerb import load_references, get_3montages
import mne
from scipy import signal as sig
import ruptures as rpt

In [16]:
training_folder = "../../shared_data/training_mini"

In [17]:
ids, channels, data, sampling_frequencies, reference_systems, eeg_labels = load_references(training_folder) 

100	 Dateien wurden geladen.


In [18]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

# Angenommen, Sie haben Ihre Daten bereits geladen:
# ids, channels, data, sampling_frequencies, reference_systems, eeg_labels = load_references(training_folder)

# Konvertieren der EEG-Daten in das erforderliche Format
def prepare_data(data, channels, sampling_frequencies, time_steps=400):
    num_records = len(data)
    formatted_data = np.empty((num_records, 3, time_steps)) # [Anzahl der Beispiele, 3 Kanäle, Zeitpunkte]

    for i in range(num_records):
        _fs = sampling_frequencies[i]
        montages, montage_data, _ = get_3montages(channels[i], data[i])

        # Filter für jedes Montage anwenden
        for j in range(montage_data.shape[0]):
            signal = montage_data[j, :]
            # Anwenden des Notch-Filters
            signal_notch = mne.filter.notch_filter(x=signal, Fs=_fs, freqs=np.array([50., 100.]), n_jobs=2, verbose=False)
            # Anwenden des Bandpassfilters
            signal_filter = mne.filter.filter_data(data=signal_notch, sfreq=_fs, l_freq=0.5, h_freq=70.0, n_jobs=2, verbose=False)
            montage_data[j, :] = signal_filter

        # Skalieren oder Auffüllen der Daten auf die gewünschte Länge
        montage_data_resized = montage_data[:, :time_steps] if montage_data.shape[1] > time_steps else np.pad(montage_data, ((0,0), (0, time_steps - montage_data.shape[1])), 'constant')
        formatted_data[i] = montage_data_resized

    return formatted_data

# Vorbereiten der Daten mit Filtern
formatted_data = prepare_data(data, channels, sampling_frequencies)

# Überprüfen der Dimensionen der formatierten Daten
print("Dimensionen der formatierten Daten:", formatted_data.shape)

# Stellen Sie sicher, dass die Dimensionen [num_records, 3, time_steps] sind
expected_shape = (len(data), 3, 400)  # 400 ist ein Beispiel für time_steps
if formatted_data.shape == expected_shape:
    print("Die Daten haben das erwartete Format.")
else:
    print(f"Unerwartetes Format. Erwartet: {expected_shape}, Erhalten: {formatted_data.shape}")

Dimensionen der formatierten Daten: (100, 3, 400)
Die Daten haben das erwartete Format.


In [19]:
class EEGDataset(Dataset):
    def __init__(self, data, labels):
        self.data = torch.from_numpy(data).float()  # Konvertieren zu torch.Tensor
        self.labels = torch.from_numpy(np.array(labels)).long()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Erstellen des Datasets
eeg_dataset = EEGDataset(formatted_data, np.array([label[0] for label in eeg_labels]))  # Hier nehmen wir nur den Teil "seizure_present" als Label

In [20]:
from torch import nn
import torch

class CNN(nn.Module):
    def __init__(self, num_classes, seq_length):
        super().__init__()
        self.classifier = nn.Sequential(
            nn.Conv1d(in_channels=3, out_channels=6, kernel_size=5),
            nn.BatchNorm1d(num_features=6),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=2),
            nn.Conv1d(6, 16, 5),
            nn.BatchNorm1d(16),
            nn.ReLU(),
            nn.MaxPool1d(2, 2),
        )

        # Anpassung für die Berechnung der Größe des linearen Layers
        linear_input_size = self._get_conv_output(seq_length)

        self.fc = nn.Sequential(
            nn.Linear(linear_input_size, 120),
            nn.BatchNorm1d(120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.BatchNorm1d(84),
            nn.ReLU(),
            nn.Linear(84, num_classes),
        )

    def _get_conv_output(self, shape):
        with torch.no_grad():
            input = torch.zeros(1, 3, shape)
            output = self.classifier(input)
            return output.numel()

    def forward(self, x):
        x = self.classifier(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc(x)
        return x


In [21]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import f1_score
import numpy as np

# Angenommen, Ihre EEGDataset Klasse ist bereits definiert und instanziiert als `eeg_dataset`

# Teilen Sie Ihre Daten in Trainings- und Validierungssets
dataset_size = len(eeg_dataset)
train_size = int(dataset_size * 0.8)
val_size = dataset_size - train_size
train_dataset, val_dataset = random_split(eeg_dataset, [train_size, val_size])

train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=32)

# Geräteeinstellungen (nutzt GPU, falls verfügbar, sonst CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Definieren Sie Ihr Modell, Verlustfunktion und Optimierer
model = CNN(num_classes=2, seq_length=400).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

def train_one_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss, total_correct, total_samples = 0, 0, 0
    predictions, targets = [], []
    
    for inputs, labels in dataloader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total_correct += (predicted == labels).sum().item()
        total_samples += labels.size(0)
        
        predictions.extend(predicted.view(-1).cpu().numpy())
        targets.extend(labels.view(-1).cpu().numpy())
        
    f1 = f1_score(targets, predictions, average='macro')
    return total_loss / total_samples, total_correct / total_samples, f1

def validate(model, dataloader, criterion, device):
    model.eval()
    total_loss, total_correct, total_samples = 0, 0, 0
    predictions, targets = [], []
    
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total_correct += (predicted == labels).sum().item()
            total_samples += labels.size(0)
            
            predictions.extend(predicted.view(-1).cpu().numpy())
            targets.extend(labels.view(-1).cpu().numpy())

    f1 = f1_score(targets, predictions, average='macro')
    return total_loss / total_samples, total_correct / total_samples, f1

# Trainingszyklus
epochs = 25
for epoch in range(epochs):
    train_loss, train_acc, train_f1 = train_one_epoch(model, train_dataloader, optimizer, criterion, device)
    val_loss, val_acc, val_f1 = validate(model, val_dataloader, criterion, device)
    print(f"Epoch {epoch+1}/{epochs} | "
          f"Train Loss: {train_loss:.4f}, Accuracy: {train_acc:.4f}, F1: {train_f1:.4f} | "
          f"Val Loss: {val_loss:.4f}, Accuracy: {val_acc:.4f}, F1: {val_f1:.4f}")


RuntimeError: CUDA error: out of memory
CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.

In [22]:
print(next(iter(train_dataloader))[0].shape)

torch.Size([32, 3, 400])


In [8]:
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.metrics import f1_score
from torch.optim.lr_scheduler import StepLR

# Geräteeinstellungen (nutzt GPU, falls verfügbar)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Modell auf das gewählte Gerät übertragen
model = model.to(device)

# Hyperparameter
learning_rate = 0.001
batch_size = 32
epochs = 50

# Loss-Funktion und Optimierer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = StepLR(optimizer, step_size=1, gamma=0.95)

# DataLoader für Training und Validierung (ersetzen Sie diese mit Ihren eigenen Datensätzen)
# train_dataset = ...
# val_dataset = ...

# Aufteilen der Daten in Trainings- und Validierungssets
train_size = int((1 - validation_split) * len(eeg_dataset))
val_size = len(eeg_dataset) - train_size
train_dataset, val_dataset = random_split(eeg_dataset, [train_size, val_size])

train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size)

# Trainings- und Validierungsschleife
for epoch in range(epochs):
    model.train()
    train_losses = []
    train_targets = []
    train_predictions = []

    for inputs, targets in train_dataloader:
        inputs, targets = inputs.to(device), targets.to(device)

        # Vorwärtsdurchlauf
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        train_losses.append(loss.item())

        # Rückwärtsdurchlauf und Optimierung
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Speichern von Vorhersagen und Zielen für die F1-Berechnung
        _, predicted = torch.max(outputs, 1)
        train_targets.extend(targets.cpu().numpy())
        train_predictions.extend(predicted.cpu().numpy())

    train_f1 = f1_score(train_targets, train_predictions, average='macro')
    train_loss = sum(train_losses) / len(train_losses)

    # Validierung
    model.eval()
    val_losses = []
    val_targets = []
    val_predictions = []
    with torch.no_grad():
        for inputs, targets in val_dataloader:
            inputs, targets = inputs.to(device), targets.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            val_losses.append(loss.item())

            _, predicted = torch.max(outputs, 1)
            val_targets.extend(targets.cpu().numpy())
            val_predictions.extend(predicted.cpu().numpy())

    val_f1 = f1_score(val_targets, val_predictions, average='macro')
    val_loss = sum(val_losses) / len(val_losses)

    print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.4f}, Train F1: {train_f1:.4f}, Val Loss: {val_loss:.4f}, Val F1: {val_f1:.4f}")

    scheduler.step()

# Weiterer Code für die Auswertung oder Speicherung des Modells


RuntimeError: CUDA error: out of memory
CUDA kernel errors might be asynchronously reported at some other API call,so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1.

In [27]:
print("Datentyp von 'data':", type(data))
print("Struktur der EEG-Daten:", type(data[0]))


Datentyp von 'data': <class 'list'>
Struktur der EEG-Daten: <class 'numpy.ndarray'>


In [28]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

# Angenommen, Sie haben Ihre Daten bereits geladen:
# ids, channels, data, sampling_frequencies, reference_systems, eeg_labels = load_references(training_folder)

# Konvertieren der EEG-Daten in das erforderliche Format
def prepare_data(data, channels, time_steps=400):
    num_records = len(data)
    formatted_data = np.empty((num_records, 3, time_steps)) # [Anzahl der Beispiele, 3 Kanäle, Zeitpunkte]

    for i in range(num_records):
        montages, montage_data, _ = get_3montages(channels[i], data[i])
        montage_data_resized = montage_data[:, :time_steps] if montage_data.shape[1] > time_steps else np.pad(montage_data, ((0,0), (0, time_steps - montage_data.shape[1])), 'constant')
        formatted_data[i] = montage_data_resized

    return formatted_data

# Vorbereiten der Daten
formatted_data = prepare_data(data, channels)

# Überprüfen der Dimensionen der formatierten Daten
print("Dimensionen der formatierten Daten:", formatted_data.shape)

# Stellen Sie sicher, dass die Dimensionen [num_records, 3, time_steps] sind
expected_shape = (len(data), 3, 400)  # 400 ist ein Beispiel für time_steps
if formatted_data.shape == expected_shape:
    print("Die Daten haben das erwartete Format.")
else:
    print(f"Unerwartetes Format. Erwartet: {expected_shape}, Erhalten: {formatted_data.shape}")

Dimensionen der formatierten Daten: (100, 3, 400)
Die Daten haben das erwartete Format.


In [29]:
import torch.nn as nn
import torch.nn.functional as F

class EEGNet(nn.Module):
    def __init__(self, num_channels=3, time_steps=400):
        super(EEGNet, self).__init__()
        # 1D Convolutional Layer
        self.conv1 = nn.Conv1d(in_channels=num_channels, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        
        # Max Pooling Layer
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2, padding=0)
        
        # Anpassen der Feature-Map-Größe nach Pooling
        reduced_size = time_steps // 4  # angenommen, zweimaliges Pooling halbiert die Größe jedes Mal
        self.fc1 = nn.Linear(64 * reduced_size, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        
        # Flatten der Ausgabe für die Linear Layers
        x = x.view(x.size(0), -1)
        
        x = F.relu(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x

model = EEGNet(num_channels=3, time_steps=400)
print(model)


EEGNet(
  (conv1): Conv1d(3, 32, kernel_size=(3,), stride=(1,), padding=(1,))
  (conv2): Conv1d(32, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (pool): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=6400, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=1, bias=True)
)


In [15]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score

best_f1 = 0.0  # Initialisieren Sie den besten F1-Score mit 0
best_model = None  # Variable zum Speichern des besten Modells

num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    train_loss /= len(train_loader)

    # Validierung und Berechnung des F1-Scores
    model.eval()
    all_predictions = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            predicted = outputs.squeeze().round()
            all_predictions.extend(predicted.tolist())
            all_labels.extend(labels.tolist())

    epoch_f1 = f1_score(all_labels, all_predictions)
    if epoch_f1 > best_f1:
        best_f1 = epoch_f1
        best_model = model.state_dict()  # Speichern des besten Modells

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {train_loss:.4f}, F1-Score: {epoch_f1:.4f}')

# Laden des besten Modells nach dem Training
model.load_state_dict(best_model)


NameError: name 'train_loader' is not defined

In [16]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score

best_f1 = 0.0
best_model = model.state_dict()  # Initialisierung mit dem Anfangszustand des Modells

num_epochs = 20

for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0

    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(), labels.squeeze())  # Stellen Sie sicher, dass beide eindimensional sind
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    train_loss /= len(train_loader)

    # Validierung und Berechnung des F1-Scores
    model.eval()
    all_predictions = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs)
            predicted = outputs.squeeze().round()  # Squeeze und Round für binäre Klassifikation
            all_predictions.extend(predicted.tolist())
            all_labels.extend(labels.squeeze().tolist())  # Stellen Sie sicher, dass die Labels eindimensional sind

    epoch_f1 = f1_score(all_labels, all_predictions)
    if epoch_f1 > best_f1:
        best_f1 = epoch_f1
        best_model = model.state_dict()

    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {train_loss:.4f}, F1-Score: {epoch_f1:.4f}')

# Laden des besten Modells nach dem Training
model.load_state_dict(best_model)


NameError: name 'train_loader' is not defined