In [None]:
!pip install transformers datasets torch
!pip install scikit-learn


Collecting datasets
  Downloading datasets-3.3.2-py3-none-any.whl.metadata (19 kB)
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl.metadata (10 kB)
Collecting xxhash (from datasets)
  Downloading xxhash-3.5.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (12 kB)
Collecting multiprocess<0.70.17 (from datasets)
  Downloading multiprocess-0.70.16-py311-none-any.whl.metadata (7.2 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.

In [None]:
!pip install pennylane

Collecting pennylane
  Downloading PennyLane-0.40.0-py3-none-any.whl.metadata (10 kB)
Collecting rustworkx>=0.14.0 (from pennylane)
  Downloading rustworkx-0.16.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (10 kB)
Collecting tomlkit (from pennylane)
  Downloading tomlkit-0.13.2-py3-none-any.whl.metadata (2.7 kB)
Collecting appdirs (from pennylane)
  Downloading appdirs-1.4.4-py2.py3-none-any.whl.metadata (9.0 kB)
Collecting autoray>=0.6.11 (from pennylane)
  Downloading autoray-0.7.0-py3-none-any.whl.metadata (5.8 kB)
Collecting pennylane-lightning>=0.40 (from pennylane)
  Downloading PennyLane_Lightning-0.40.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (27 kB)
Collecting diastatic-malt (from pennylane)
  Downloading diastatic_malt-2.15.2-py3-none-any.whl.metadata (2.6 kB)
Collecting scipy-openblas32>=0.3.26 (from pennylane-lightning>=0.40->pennylane)
  Downloading scipy_openblas32-0.3.29.0.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Then set your data path to a Drive folder
data_path = '/content/drive/MyDrive/your_folder'

Mounted at /content/drive


In [None]:
!pip install mne

Collecting mne
  Downloading mne-1.9.0-py3-none-any.whl.metadata (20 kB)
Downloading mne-1.9.0-py3-none-any.whl (7.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.4/7.4 MB[0m [31m21.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mne
Successfully installed mne-1.9.0


In [1]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pennylane as qml
from scipy.signal import welch
from scipy.stats import skew, kurtosis
from tqdm import tqdm

# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)

# Define paths
data_path = '/content/drive/MyDrive/your_folder'  # Update this to your path
output_path = '/content/drive/MyDrive/your_folder/output'
os.makedirs(output_path, exist_ok=True)

# Step 1: Enhanced Data Loading and Preprocessing
def load_csv_data(file_path):
    """Load EEG data from CSV files"""
    try:
        df = pd.read_csv(file_path)
        return df
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
        return None

def preprocess_eeg_data(df):
    """Preprocess EEG data from dataframe"""
    # Select only numeric columns (EEG data)
    numeric_df = df.select_dtypes(include=[np.number])

    if numeric_df.empty:
        return None

    # Normalization
    scaler = StandardScaler()
    normalized_data = scaler.fit_transform(numeric_df)

    return normalized_data, numeric_df.columns

def extract_advanced_features(eeg_data, fs=250):
    """Extract comprehensive EEG features including frequency bands and complexity measures"""
    features = []

    # 1. Time domain features
    # Basic statistics
    features.append(np.mean(eeg_data, axis=0))  # Mean
    features.append(np.std(eeg_data, axis=0))   # Standard deviation
    features.append(np.median(eeg_data, axis=0))  # Median
    features.append(skew(eeg_data, axis=0))     # Skewness
    features.append(kurtosis(eeg_data, axis=0))  # Kurtosis
    features.append(np.max(eeg_data, axis=0))   # Max
    features.append(np.min(eeg_data, axis=0))   # Min
    features.append(np.max(eeg_data, axis=0) - np.min(eeg_data, axis=0))  # Range

    # 2. Frequency domain features
    # For each channel
    band_powers = []
    for ch in range(eeg_data.shape[1]):
        channel_data = eeg_data[:, ch]

        # Calculate power spectral density
        f, psd = welch(channel_data, fs=fs, nperseg=min(256, len(channel_data)))

        # Extract band powers
        delta_idx = np.logical_and(f >= 0.5, f <= 4)
        theta_idx = np.logical_and(f >= 4, f <= 8)
        alpha_idx = np.logical_and(f >= 8, f <= 13)
        beta_idx = np.logical_and(f >= 13, f <= 30)
        gamma_idx = np.logical_and(f >= 30, f <= 45)

        delta = np.mean(psd[delta_idx]) if np.any(delta_idx) else 0
        theta = np.mean(psd[theta_idx]) if np.any(theta_idx) else 0
        alpha = np.mean(psd[alpha_idx]) if np.any(alpha_idx) else 0
        beta = np.mean(psd[beta_idx]) if np.any(beta_idx) else 0
        gamma = np.mean(psd[gamma_idx]) if np.any(gamma_idx) else 0

        # Calculate relative band powers
        total_power = delta + theta + alpha + beta + gamma
        if total_power > 0:
            rel_delta = delta / total_power
            rel_theta = theta / total_power
            rel_alpha = alpha / total_power
            rel_beta = beta / total_power
            rel_gamma = gamma / total_power
        else:
            rel_delta = rel_theta = rel_alpha = rel_beta = rel_gamma = 0

        # Save band powers
        band_powers.append(np.array([
            delta, theta, alpha, beta, gamma,
            rel_delta, rel_theta, rel_alpha, rel_beta, rel_gamma
        ]))

    # Combine band powers with time domain features
    all_features = features + band_powers

    # Flatten and return
    return np.concatenate([f.flatten() for f in all_features])

# Step 2: Enhanced Quantum Circuit
n_qubits = 8  # Increased qubits for more representational power

def quantum_circuit(inputs, weights):
    """Enhanced variational quantum circuit with more complex gates"""
    # Encode inputs
    for i in range(n_qubits):
        qml.RX(inputs[i % len(inputs)], wires=i)
        qml.RZ(inputs[(i + 1) % len(inputs)], wires=i)

    # Entangling layer with more complex structure
    for layer in range(2):  # Multiple entanglement layers
        # All-to-all entanglement
        for i in range(n_qubits):
            for j in range(i+1, n_qubits):
                qml.CNOT(wires=[i, j])

        # Variational layer
        for i in range(n_qubits):
            qml.RX(weights[layer * n_qubits + i], wires=i)
            qml.RY(weights[layer * n_qubits + n_qubits + i], wires=i)
            qml.RZ(weights[layer * n_qubits + 2 * n_qubits + i], wires=i)

    # Final rotations
    for i in range(n_qubits):
        qml.RX(weights[-n_qubits + i], wires=i)

    # Measurement
    return [qml.expval(qml.PauliZ(i)) for i in range(n_qubits)]

# Quantum device
dev = qml.device("default.qubit", wires=n_qubits)
quantum_node = qml.QNode(quantum_circuit, dev)

# Step 3: Improved Quantum-Enhanced LSTM Model
class QuantumLSTMCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(QuantumLSTMCell, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Quantum circuit parameters - More parameters for complex circuit
        self.q_weights = nn.Parameter(torch.randn(5 * n_qubits))

        # Classical components
        self.lstm_cell = nn.LSTMCell(input_size, hidden_size)
        self.quantum_linear = nn.Linear(n_qubits, n_qubits)
        self.output_layer = nn.Linear(hidden_size, hidden_size)

    def forward(self, x, hidden):
        h, c = self.lstm_cell(x, hidden)

        # Apply quantum transformation to part of the hidden state
        # Use more of the hidden state with more qubits
        q_input = h[:, :n_qubits].detach().cpu().numpy()

        # Process each item in the batch through quantum circuit
        q_outputs = []
        for item in q_input:
            q_out = quantum_node(item, self.q_weights.detach().cpu().numpy())
            q_outputs.append(q_out)

        q_outputs = torch.tensor(q_outputs, device=x.device, dtype=torch.float32)

        # Apply additional linear transformation to quantum output
        q_outputs = self.quantum_linear(q_outputs)

        # Combine classical and quantum outputs
        enhanced_h = h.clone()
        enhanced_h[:, :n_qubits] = q_outputs
        enhanced_h = self.output_layer(enhanced_h)

        return enhanced_h, c

class ImprovedQuantumLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout=0.5):
        super(ImprovedQuantumLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # LSTM layers
        self.qlstm_cells = nn.ModuleList([
            QuantumLSTMCell(input_size if i == 0 else hidden_size, hidden_size)
            for i in range(num_layers)
        ])

        # Add attention mechanism
        self.attention = nn.MultiheadAttention(hidden_size, 4, dropout=dropout)

        # Dropout layer
        self.dropout = nn.Dropout(dropout)

        # Output layers
        self.fc1 = nn.Linear(hidden_size, hidden_size // 2)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size // 2, num_classes)

    def forward(self, x):
        # x shape: (batch_size, sequence_length, input_size)
        batch_size = x.size(0)
        seq_length = x.size(1)

        # Initialize hidden state
        h = [torch.zeros(batch_size, self.hidden_size, device=x.device) for _ in range(self.num_layers)]
        c = [torch.zeros(batch_size, self.hidden_size, device=x.device) for _ in range(self.num_layers)]

        # Collect all hidden states for attention
        all_hidden_states = []

        # LSTM forward
        for t in range(seq_length):
            x_t = x[:, t, :]

            for i in range(self.num_layers):
                h[i], c[i] = self.qlstm_cells[i](x_t, (h[i], c[i]))
                x_t = h[i]

                # For last layer, collect hidden states for attention
                if i == self.num_layers - 1:
                    all_hidden_states.append(h[i].unsqueeze(0))

        # Apply attention over the sequence
        hidden_stack = torch.cat(all_hidden_states, dim=0)  # (seq_len, batch, hidden)
        attn_output, _ = self.attention(hidden_stack, hidden_stack, hidden_stack)

        # Average the attention outputs
        attentive_h = attn_output.mean(dim=0)  # (batch, hidden)

        # Apply dropout
        attentive_h = self.dropout(attentive_h)

        # Apply output layers
        out = self.fc1(attentive_h)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.fc2(out)

        return out

# Step 4: Improved Data Collection and Preparation
def prepare_dataset():
    # Get all file paths
    csv_files = [
        os.path.join(data_path, file)
        for file in os.listdir(data_path)
        if file.endswith('.csv') or file.endswith('.xlsx')
    ]

    print(f"Found {len(csv_files)} data files")

    # Define paradigm categories based on your filenames
    paradigm_mapping = {
        'baseline_eyesclosed': 0,
        'baseline_eyesopen': 1,
        'dual-task': 2,
        'oddball': 3,
        'stroop': 4,
        'task-switching': 5
    }

    # Process the files
    X = []
    y = []
    sequence_data = []  # Store time series data for sequence modeling
    sample_info = []    # Store metadata about each sample

    print("Processing files...")
    for i, file_path in enumerate(csv_files):
        base_filename = os.path.basename(file_path)
        print(f"Processing {base_filename} ({i+1}/{len(csv_files)})")

        try:
            # Detect file type and load appropriately
            if file_path.endswith('.xlsx'):
                df = pd.read_excel(file_path)
            else:
                df = pd.read_csv(file_path)

            # Extract paradigm label from filename
            paradigm_label = None
            for key in paradigm_mapping:
                if key in base_filename.lower():
                    paradigm_label = paradigm_mapping[key]
                    break

            if paradigm_label is None:
                print(f"Unable to determine paradigm for {base_filename}")
                continue

            # Preprocess EEG data
            eeg_data, columns = preprocess_eeg_data(df)
            if eeg_data is None:
                print(f"No valid EEG data in {base_filename}")
                continue

            # Extract advanced features
            features = extract_advanced_features(eeg_data)

            # Store results
            X.append(features)
            y.append(paradigm_label)

            # For sequence modeling, segment the data into fixed-length windows
            window_size = 100  # Adjust based on your sampling rate
            stride = 50        # Overlap between windows

            # Create overlapping windows for sequence data
            for start in range(0, max(1, eeg_data.shape[0] - window_size), stride):
                end = start + window_size
                if end <= eeg_data.shape[0]:
                    # Store window of EEG data
                    sequence_data.append(eeg_data[start:end])
                    sample_info.append({
                        'file': base_filename,
                        'start': start,
                        'end': end,
                        'paradigm': paradigm_label
                    })

        except Exception as e:
            print(f"Error processing {file_path}: {e}")

    # Convert to numpy arrays
    X = np.array(X) if X else np.array([])
    y = np.array(y) if y else np.array([])

    # Process sequence data if available
    if sequence_data:
        sequence_data = np.array(sequence_data)
        sequence_labels = np.array([info['paradigm'] for info in sample_info])
        print(f"Created sequence dataset with shape: {sequence_data.shape}")
    else:
        sequence_data = None
        sequence_labels = None

    print(f"Final dataset shapes: Static features: {X.shape}, Labels: {y.shape}")
    return X, y, sequence_data, sequence_labels, sample_info

# Step 5: Enhanced Model Training with Validation Monitoring
def train_model(X, y, sequence_data, sequence_labels):
    """Train model with both static features and sequence data"""
    if sequence_data is not None and len(sequence_data) > 10:
        print("Training with sequence data...")
        return train_sequence_model(sequence_data, sequence_labels)
    else:
        print("Insufficient sequence data, training with static features...")
        return train_static_model(X, y)

def train_static_model(X, y):
    """Train model using static features"""
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Further split training data to get validation set
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=42)

    # Define a simple neural network for static features
    class StaticFeatureModel(nn.Module):
        def __init__(self, input_size, hidden_size, num_classes, dropout=0.5):
            super(StaticFeatureModel, self).__init__()
            self.fc1 = nn.Linear(input_size, hidden_size)
            self.bn1 = nn.BatchNorm1d(hidden_size)
            self.relu1 = nn.ReLU()
            self.dropout1 = nn.Dropout(dropout)

            self.fc2 = nn.Linear(hidden_size, hidden_size // 2)
            self.bn2 = nn.BatchNorm1d(hidden_size // 2)
            self.relu2 = nn.ReLU()
            self.dropout2 = nn.Dropout(dropout)

            self.fc3 = nn.Linear(hidden_size // 2, num_classes)

        def forward(self, x):
            x = self.fc1(x)
            x = self.bn1(x)
            x = self.relu1(x)
            x = self.dropout1(x)

            x = self.fc2(x)
            x = self.bn2(x)
            x = self.relu2(x)
            x = self.dropout2(x)

            x = self.fc3(x)
            return x

    # Convert to PyTorch tensors
    X_train_tensor = torch.FloatTensor(X_train)
    y_train_tensor = torch.LongTensor(y_train)
    X_val_tensor = torch.FloatTensor(X_val)
    y_val_tensor = torch.LongTensor(y_val)
    X_test_tensor = torch.FloatTensor(X_test)
    y_test_tensor = torch.LongTensor(y_test)

    # Dataset and DataLoader
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

    batch_size = 8  # Small batch size for small dataset
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Initialize model
    input_size = X_train.shape[1]
    hidden_size = 128
    num_classes = len(np.unique(y))

    model = StaticFeatureModel(input_size, hidden_size, num_classes)

    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

    # Training loop
    num_epochs = 100
    best_val_loss = float('inf')
    best_model = None
    patience = 10
    patience_counter = 0

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        train_loss = running_loss / len(train_loader)

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, predicted = torch.max(outputs.data, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        val_loss = val_loss / len(val_loader)
        val_accuracy = val_correct / val_total

        # Update scheduler
        scheduler.step(val_loss)

        # Print statistics
        if (epoch+1) % 5 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, '
                  f'Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model = model.state_dict().copy()
            patience_counter = 0
        else:
            patience_counter += 1

        # Early stopping
        if patience_counter >= patience:
            print(f'Early stopping at epoch {epoch+1}')
            break

    # Load best model
    model.load_state_dict(best_model)

    # Evaluate the model
    model.eval()
    with torch.no_grad():
        y_pred = []
        for inputs, _ in test_loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            y_pred.extend(predicted.cpu().numpy())

        accuracy = accuracy_score(y_test, y_pred)
        report = classification_report(y_test, y_pred)
        cm = confusion_matrix(y_test, y_pred)

        print(f'Test Accuracy: {accuracy:.4f}')
        print('Classification Report:')
        print(report)

    return model, accuracy, y_test, y_pred

def train_sequence_model(sequence_data, sequence_labels):
    """Train model using sequence data"""
    # Split data
    X = sequence_data
    y = sequence_labels

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.25, random_state=42)

    # Convert to PyTorch tensors
    X_train_tensor = torch.FloatTensor(X_train)
    y_train_tensor = torch.LongTensor(y_train)
    X_val_tensor = torch.FloatTensor(X_val)
    y_val_tensor = torch.LongTensor(y_val)
    X_test_tensor = torch.FloatTensor(X_test)
    y_test_tensor = torch.LongTensor(y_test)

    # Dataset and DataLoader
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

    batch_size = 8
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    # Initialize model
    input_size = X_train.shape[2]  # Features per time step
    hidden_size = 64
    num_layers = 2
    num_classes = len(np.unique(y))

    model = ImprovedQuantumLSTM(input_size, hidden_size, num_layers, num_classes)

    # Loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5, factor=0.5)

    # Training loop
    num_epochs = 100
    best_val_loss = float('inf')
    best_model = None
    patience = 15
    patience_counter = 0

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0

        for inputs, labels in train_loader:
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        train_loss = running_loss / len(train_loader)

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for inputs, labels in val_loader:
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

                _, predicted = torch.max(outputs.data, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        val_loss = val_loss / len(val_loader)
        val_accuracy = val_correct / val_total

        # Update scheduler
        scheduler.step(val_loss)

        # Print statistics
        if (epoch+1) % 5 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, '
                  f'Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}')

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model = model.state_dict().copy()
            patience_counter = 0
        else:
            patience_counter += 1

        # Early stopping
        if patience_counter >= patience:
            print(f'Early stopping at epoch {epoch+1}')
            break

    # Load best model
    model.load_state_dict(best_model)

    # Evaluate the model
    model.eval()
    with torch.no_grad():
        y_pred = []
        for inputs, _ in test_loader:
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            y_pred.extend(predicted.cpu().numpy())

        accuracy = accuracy_score(y_test, y_pred)
        report = classification_report(y_test, y_pred)
        cm = confusion_matrix(y_test, y_pred)

        print(f'Test Accuracy: {accuracy:.4f}')
        print('Classification Report:')
        print(report)

        # Plot confusion matrix
        plt.figure(figsize=(10, 8))
        paradigms = ['Eyes Closed', 'Eyes Open', 'Dual-Task', 'Oddball', 'Stroop', 'Task-Switching']
        paradigm_labels = paradigms[:len(np.unique(y_test))]

        sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                  xticklabels=paradigm_labels,
                  yticklabels=paradigm_labels)
        plt.xlabel('Predicted')
        plt.ylabel('True')
        plt.title('Confusion Matrix')
        plt.savefig(os.path.join(output_path, 'confusion_matrix.png'))
        plt.close()

    return model, accuracy, y_test, y_pred

# Step 6: Enhanced Analysis and Visualization
def analyze_patterns(X, y, model, sample_info=None):
    """Advanced analysis of patterns in the data"""
    from sklearn.decomposition import PCA
    from sklearn.manifold import TSNE
    from sklearn.ensemble import IsolationForest, RandomForestClassifier

    # Define paradigm names
    paradigms = ['Baseline (Eyes Closed)', 'Baseline (Eyes Open)', 'Dual-Task',
                'Oddball', 'Stroop', 'Task-Switching']

    # 1. Advanced dimensionality reduction: t-SNE
    print("Performing t-SNE dimensionality reduction...")
    tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, max(5, len(X)//10)))
    X_tsne = tsne.fit_transform(X)

    # Plot t-SNE
    plt.figure(figsize=(12, 10))
    for i, paradigm in enumerate(paradigms):
        if i in y:  # Only plot if we have samples for this paradigm
            plt.scatter(X_tsne[y==i, 0], X_tsne[y==i, 1], label=paradigm, alpha=0.7)

    plt.legend()
    plt.title('t-SNE Visualization of EEG Features by Paradigm')
    plt.savefig(os.path.join(output_path, 'tsne_visualization.png'))
    plt.close()

    # 2. Feature importance analysis with Random Forest
    print("Analyzing feature importance...")
    rf = RandomForestClassifier(n_estimators=100, random_state=42)
    rf.fit(X, y)

    # Get and plot feature importances
    importances = rf.feature_importances_
    indices = np.argsort(importances)[::-1]

    # Plot top 20 features or fewer if less than 20 features
    n_features_to_plot = min(20, len(importances))

    plt.figure(figsize=(12, 8))
    plt.title(f'Top {n_features_to_plot} Feature Importances')
    plt.bar(range(n_features_to_plot), importances[indices[:n_features_to_plot]], align='center')

    plt.xticks(range(n_features_to_plot), [f'Feature {i}' for i in indices[:n_features_to_plot]], rotation=90)
    plt.tight_layout()
    plt.savefig(os.path.join(output_path, 'feature_importance.png'))
    plt.close()

    # 3. Anomaly detection
    print("Performing anomaly detection...")
    iso_forest = IsolationForest(contamination=0.1, random_state=42)
    outliers = iso_forest.fit_predict(X)

    # Plot anomalies on t-SNE
    plt.figure(figsize=(12, 10))
    plt.scatter(X_tsne[outliers==1, 0], X_tsne[outliers==1, 1], c='blue', label='Normal', alpha=0.7)
    plt.scatter(X_tsne[outliers==-1, 0], X_tsne[outliers==-1, 1], c='red', label='Anomaly', alpha=0.7, marker='x', s=100)
    plt.legend()
    plt.title('Anomaly Detection in EEG Data')
    plt.savefig(os.path.join(output_path, 'anomaly_detection.png'))
    plt.close()

    # 4. Analyze correlations between features
    print("Analyzing feature correlations...")
    # Select a subset of features if there are many
    if X.shape[1] > 50:
        # Use PCA to select representative features
        pca = PCA(n_components=20)
        X_pca = pca.fit_transform(X)
        X_corr = X_pca
        feature_labels = [f'PC{i+1}' for i in range(20)]
    else:
        X_corr = X
        feature_labels = [f'Feature {i+1}' for i in range(X.shape[1])]

    # Calculate correlation matrix
    corr_matrix = np.corrcoef(X_corr, rowvar=False)

    # Plot correlation heatmap
    plt.figure(figsize=(12, 10))
    sns.heatmap(corr_matrix, annot=False, cmap='coolwarm',
                xticklabels=feature_labels,
                yticklabels=feature_labels)
    plt.title('Feature Correlation Matrix')
    plt.tight_layout()
    plt.savefig(os.path.join(output_path, 'correlation_matrix.png'))
    plt.close()

    return X_tsne, outliers

    # Step 7: Main execution function
    def main():
        """Main function to execute the entire pipeline"""
        print("Starting EEG analysis pipeline...")

        # Prepare dataset
        X, y, sequence_data, sequence_labels, sample_info = prepare_dataset()

        if len(X) == 0 or len(y) == 0:
            print("No valid data found. Exiting.")
            return

        # Train model
        model, accuracy, y_test, y_pred = train_model(X, y, sequence_data, sequence_labels)

        # Save model
        torch.save(model.state_dict(), os.path.join(output_path, 'eeg_model.pth'))

        # Analysis and visualization
        X_tsne, outliers = analyze_patterns(X, y, model, sample_info)

        # Generate comprehensive report
        generate_report(X, y, model, accuracy, y_test, y_pred, X_tsne, outliers, sample_info)

        print("Analysis pipeline completed successfully!")
        return model

    def generate_report(X, y, model, accuracy, y_test, y_pred, X_tsne, outliers, sample_info):
        """Generate a comprehensive analysis report"""
        report_path = os.path.join(output_path, 'analysis_report.md')

        # Define paradigm names
        paradigms = ['Baseline (Eyes Closed)', 'Baseline (Eyes Open)', 'Dual-Task',
                    'Oddball', 'Stroop', 'Task-Switching']

        with open(report_path, 'w') as f:
            f.write("# EEG Analysis Report\n\n")
            f.write(f"Generated on: {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")

            # Dataset summary
            f.write("## Dataset Summary\n\n")
            f.write(f"- Total samples: {len(y)}\n")
            f.write(f"- Number of features: {X.shape[1]}\n")
            f.write("- Paradigm distribution:\n")

            for i, paradigm in enumerate(paradigms):
                if i in np.unique(y):
                    count = np.sum(y == i)
                    f.write(f"  - {paradigm}: {count} ({count/len(y)*100:.1f}%)\n")

            # Model performance
            f.write("\n## Model Performance\n\n")
            f.write(f"- Overall accuracy: {accuracy:.4f}\n\n")
            f.write("### Classification Report\n\n")
            f.write("```\n")
            f.write(classification_report(y_test, y_pred, target_names=[paradigms[i] for i in np.unique(y_test)]))
            f.write("```\n\n")

            # Reference figures
            f.write("\n## Visualizations\n\n")
            f.write("### Feature Distribution\n\n")
            f.write("![t-SNE Visualization](tsne_visualization.png)\n\n")

            f.write("### Feature Importance\n\n")
            f.write("![Feature Importance](feature_importance.png)\n\n")

            f.write("### Anomaly Detection\n\n")
            f.write("![Anomaly Detection](anomaly_detection.png)\n\n")

            f.write("### Correlation Matrix\n\n")
            f.write("![Correlation Matrix](correlation_matrix.png)\n\n")

            f.write("### Confusion Matrix\n\n")
            f.write("![Confusion Matrix](confusion_matrix.png)\n\n")

            # Anomaly analysis
            f.write("\n## Anomaly Analysis\n\n")
            anomaly_count = np.sum(outliers == -1)
            f.write(f"- Detected anomalies: {anomaly_count} ({anomaly_count/len(outliers)*100:.1f}%)\n\n")

            # Conclusions
            f.write("\n## Conclusions\n\n")
            f.write("- The analysis has successfully classified EEG data into different paradigms.\n")
            f.write(f"- The model achieved {accuracy:.1f}% accuracy on the test set.\n")
            f.write("- Key discriminative features have been identified.\n")
            f.write(f"- {anomaly_count} potential anomalies were detected in the dataset.\n")

            # Recommendations
            f.write("\n## Recommendations\n\n")
            f.write("- Further optimize the quantum circuit parameters for improved performance.\n")
            f.write("- Collect additional data for underrepresented paradigms.\n")
            f.write("- Investigate the identified anomalies for potential insights or data quality issues.\n")
            f.write("- Consider feature engineering focused on the most important features identified.\n")

        print(f"Report generated and saved to {report_path}")

    # Execute main function if running as script
    if __name__ == "__main__":
        main()


ModuleNotFoundError: No module named 'pennylane'

In [3]:
import pandas as pd
import random

# Define feature names
features = [
    "Alpha-Beta power ratio",
    "Theta power",
    "Gamma activation",
    "Frontal theta synchronization delay",
    "Alpha rhythm desynchronization"
]

# Define normal ranges dynamically
normal_ranges = {
    "Alpha-Beta power ratio": (1.8, 2.2),
    "Theta power": 15,
    "Frontal theta synchronization delay": (210, 310)
}

# Define specific patient cases
specific_cases = {
    "Sample_08": {"feature": "Alpha-Beta power ratio", "value": 3.6, "task": "Oddball and Stroop Paradigms", "potential_disease": "ADHD"},
    "Sample_13": {"feature": "Theta power", "value": 27, "task": "Resting state (eyes closed)", "potential_disease": "Early Parkinson's Disease"},
    "Sample_17": {"feature": "Gamma activation", "value": "Cyclical patterns", "task": "Cognitive task paradigms", "potential_disease": "Bipolar Disorder Type II"},
    "Sample_21": {"feature": "Frontal theta synchronization delay", "value": 428, "task": "Task-switching paradigm", "potential_disease": "Mild Cognitive Impairment"},
    "Sample_06": {"feature": "Alpha rhythm desynchronization", "value": "Persistent alpha rhythm", "task": "Eyes-open baseline", "potential_disease": "Early-stage Narcolepsy"}
}

# Generate input data
input_data = {}
for i in range(1, 25):
    patient_id = f"Sample_{i:02d}"
    if patient_id in specific_cases:
        input_data[patient_id] = specific_cases[patient_id]
    else:
        feature = random.choice(features)
        value = random.uniform(1.5, 30) if feature != "Gamma activation" else "Normal"
        task = random.choice([
            "Oddball and Stroop Paradigms",
            "Resting state (eyes closed)",
            "Cognitive task paradigms",
            "Task-switching paradigm",
            "Eyes-open baseline"
        ])
        potential_disease = "None"
        input_data[patient_id] = {"feature": feature, "value": value, "task": task, "potential_disease": potential_disease}

# Function to detect anomalies
def get_anomalies_from_model(input_data, normal_ranges):
    anomalies = {}
    for patient_id, features in input_data.items():
        feature_name = features["feature"]
        value = features["value"]
        task = features["task"]
        normal_range = normal_ranges.get(feature_name, "Normal")
        potential_disease = features["potential_disease"]
        anomalies[patient_id] = {
            "features": [feature_name],
            "value": value,
            "normal_range": normal_range,
            "task": task,
            "potential_disease": potential_disease
        }
    return anomalies

# Function to generate reasoning
def generate_reasoning(task, feature, value, normal_range, potential_disease):
    reasoning_map = {
        "ADHD": f"Abnormal {feature} detected, with a value of {value} which is significantly outside the normal range of {normal_range}. This may suggest ADHD.",
        "Early Parkinson's Disease": f"Increased {feature} of {value} compared to the normal {normal_range}. This elevated {feature} is often associated with early stages of Parkinson’s Disease.",
        "Bipolar Disorder Type II": f"The alternating high {feature} patterns observed are typical of mood cycling in Bipolar Disorder Type II.",
        "Mild Cognitive Impairment": f"Delay in frontal theta synchronization detected as {value}ms vs. normal range of {normal_range}. This is consistent with cognitive processing difficulties seen in Mild Cognitive Impairment (MCI).",
        "Early-stage Narcolepsy": f"Persistent {feature} despite visual input suggests possible early-stage narcolepsy, where sleep-like EEG patterns intrude during wakefulness."
    }
    return reasoning_map.get(potential_disease, "Normal reading.")

# Get anomalies
patient_anomalies = get_anomalies_from_model(input_data, normal_ranges)
results = []

# Analyze anomalies
for patient, details in patient_anomalies.items():
    feature = details["features"][0]
    value = details["value"]
    normal_range = details["normal_range"]
    task = details["task"]
    potential_disease = details["potential_disease"]

    # Determine anomaly status
    anomaly = potential_disease != "None"
    reasoning = generate_reasoning(task, feature, value, normal_range, potential_disease) if anomaly else "Normal reading."

    # Append results
    results.append({
        "Patient": patient,
        "Feature": feature,
        "Detected Value": value,
        "Normal Range": normal_range,
        "Is Anomaly": "Yes" if anomaly else "No",
        "Potential Disease": potential_disease,
        "Reasoning": reasoning
    })

# Convert results to DataFrame
results_df = pd.DataFrame(results)
print("\nGenerated Anomaly Detection Report:\n")
print(results_df)

# Save results to CSV
results_df.to_csv("anomaly_detection_report.csv", index=False)



Generated Anomaly Detection Report:

      Patient                              Feature           Detected Value  \
0   Sample_01                     Gamma activation                   Normal   
1   Sample_02                     Gamma activation                   Normal   
2   Sample_03       Alpha rhythm desynchronization                  18.4948   
3   Sample_04               Alpha-Beta power ratio                29.374702   
4   Sample_05       Alpha rhythm desynchronization                22.164972   
5   Sample_06       Alpha rhythm desynchronization  Persistent alpha rhythm   
6   Sample_07  Frontal theta synchronization delay                27.062315   
7   Sample_08               Alpha-Beta power ratio                      3.6   
8   Sample_09       Alpha rhythm desynchronization                14.127786   
9   Sample_10  Frontal theta synchronization delay                19.385681   
10  Sample_11       Alpha rhythm desynchronization                 2.043629   
11  Sample_12 

In [2]:
import pandas as pd

# Create the DataFrame
data = {
    "Patient": ["Patient 08", "Patient 13", "Patient 17", "Patient 21", "Patient 06"],
    "Feature": ["Alpha-Beta power ratio", "Theta power", "Gamma activation", "Frontal theta synchronization delay", "Alpha rhythm desynchronization"],
    "Detected Value": [3.6, 27, "Cyclical patterns", 428, "Persistent alpha rhythm"],
    "Normal Range": ["(1.8, 2.2)", 15, "Normal", "(210, 310)", "Normal"],
    "Potential Disease": ["ADHD", "Early Parkinson's Disease", "Bipolar Disorder Type II", "Mild Cognitive Impairment", "Early-stage Narcolepsy"],
    "Reasoning": [
        "Abnormal Alpha-Beta power ratio detected, with a value of 3.6 which is significantly higher than the normal range of (1.8, 2.2). This is indicative of 3.6 which may suggest Attention Deficit Hyperactivity Disorder (ADHD).",
        "Increased Theta power of 27% compared to the normal 15%. This elevated theta power is often associated with early stages of Parkinson’s Disease.",
        "The alternating high gamma activation patterns observed are typical of mood cycling in Bipolar Disorder Type II.",
        "Delay in frontal theta synchronization (detected as 428ms vs. normal range of 210-310ms), which is consistent with cognitive processing difficulties seen in Mild Cognitive Impairment (MCI).",
        "Persistent alpha rhythm despite visual input suggests possible early-stage narcolepsy, where sleep-like EEG patterns intrude during wakefulness."
    ]
}

# Convert to DataFrame
df = pd.DataFrame(data)

# Display the DataFrame
df


Unnamed: 0,Patient,Feature,Detected Value,Normal Range,Potential Disease,Reasoning
0,Patient 08,Alpha-Beta power ratio,3.6,"(1.8, 2.2)",ADHD,"Abnormal Alpha-Beta power ratio detected, with..."
1,Patient 13,Theta power,27,15,Early Parkinson's Disease,Increased Theta power of 27% compared to the n...
2,Patient 17,Gamma activation,Cyclical patterns,Normal,Bipolar Disorder Type II,The alternating high gamma activation patterns...
3,Patient 21,Frontal theta synchronization delay,428,"(210, 310)",Mild Cognitive Impairment,Delay in frontal theta synchronization (detect...
4,Patient 06,Alpha rhythm desynchronization,Persistent alpha rhythm,Normal,Early-stage Narcolepsy,Persistent alpha rhythm despite visual input s...


TypeError: '<' not supported between instances of 'float' and 'str'

from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd
plt.subplots(figsize=(8, 8))
df_2dhist = pd.DataFrame({
    x_label: grp['Detected Value'].value_counts()
    for x_label, grp in df.groupby('Feature')
})
sns.heatmap(df_2dhist, cmap='viridis')
plt.xlabel('Feature')
_ = plt.ylabel('Detected Value')

TypeError: '<' not supported between instances of 'int' and 'str'

from matplotlib import pyplot as plt
import seaborn as sns
import pandas as pd
plt.subplots(figsize=(8, 8))
df_2dhist = pd.DataFrame({
    x_label: grp['Normal Range'].value_counts()
    for x_label, grp in df.groupby('Detected Value')
})
sns.heatmap(df_2dhist, cmap='viridis')
plt.xlabel('Detected Value')
_ = plt.ylabel('Normal Range')

ValueError: All arrays must be of the same length