https://www.frontiersin.org/articles/10.3389/fnhum.2016.00235/full

https://www.mdpi.com/1424-8220/21/19/6570


https://github.com/dokato/connectivipy

La Coherence Dirigée Partielle, ou PDC (Partial Directed Coherence en anglais), est une mesure de la connectivité dans le domaine de la neurophysiologie et de l'analyse du cerveau. Elle est utilisée pour étudier comment différentes régions du cerveau interagissent les unes avec les autres.

La PDC est particulièrement utile pour comprendre comment l'activité électrique ou électroencéphalographique (EEG) est propagée entre différentes régions du cerveau. Voici quelques points clés pour comprendre la PDC :

Connectivité Dirigée : La PDC permet de mesurer la direction de la connectivité entre les régions cérébrales. Autrement dit, elle indique comment l'activité d'une région du cerveau influence l'activité d'une autre région, et vice versa.

Fréquences et Bandes de Fréquences : La PDC peut être calculée pour différentes fréquences ou bandes de fréquences du signal EEG. Cela permet de comprendre comment la connectivité entre les régions cérébrales varie en fonction de la fréquence.

Utilisation : La PDC est largement utilisée dans la recherche en neurosciences pour étudier des phénomènes tels que la communication cérébrale, la synchronisation entre les régions cérébrales, et les changements dans la connectivité en réponse à des tâches spécifiques ou à des pathologies.

Interprétation : Une PDC proche de 1 entre deux régions signifie une forte connectivité dirigée de l'une vers l'autre, tandis qu'une PDC proche de 0 indique une connectivité faible ou nulle.

Applications : La PDC est utilisée dans divers domaines, y compris la recherche sur les troubles neurologiques, l'étude de la cognition humaine, la neurologie clinique, et même dans le domaine de l'analyse de l'activité cérébrale lors de tâches spécifiques telles que la réflexion ou la perception.

En résumé, la PDC est une mesure importante pour comprendre comment différentes parties du cerveau interagissent et communiquent entre elles. Elle est souvent utilisée dans le cadre de la recherche en neurosciences pour mieux comprendre le fonctionnement du cerveau et les altérations qui peuvent survenir dans diverses conditions.

In [11]:
import mne
from mne.datasets import eegbci
from mne.io import concatenate_raws, read_raw_edf
import glob
import numpy as np
from scipy.linalg import toeplitz
from scipy.linalg import solve
from scipy.signal import lfilter

In [2]:
files = glob.glob('../files/S001/*.edf')
'''
=========  ===================================
run        task
=========  ===================================
1          Baseline, eyes open
2          Baseline, eyes closed
3, 7, 11   Motor execution: left vs right hand
4, 8, 12   Motor imagery: left vs right hand
5, 9, 13   Motor execution: hands vs feet
6, 10, 14  Motor imagery: hands vs feet
=========  ===================================
'''
raws = []

for i in [5, 9, 13]:
    current_file = files[i]
    r = read_raw_edf(current_file, preload=True, stim_channel='auto')
    events, _ = mne.events_from_annotations(r)
    if i in [5, 9, 13]:
        new_labels_events = {1:'rest', 2:'action_hand', 3:'action_feet'} # action
    new_annot = mne.annotations_from_events(events=events, event_desc=new_labels_events, sfreq=r.info['sfreq'], orig_time=r.info['meas_date'])
    r.set_annotations(new_annot)
    raws.append(r)
    
raw_obj = concatenate_raws(raws)

Extracting EDF parameters from /Users/owalid/42/post_intership/total-perspective-vortex/files/S001/S001R03.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...
Used Annotations descriptions: ['T0', 'T1', 'T2']
Extracting EDF parameters from /Users/owalid/42/post_intership/total-perspective-vortex/files/S001/S001R13.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...
Used Annotations descriptions: ['T0', 'T1', 'T2']
Extracting EDF parameters from /Users/owalid/42/post_intership/total-perspective-vortex/files/S001/S001R09.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
Reading 0 ... 19999  =      0.000 ...   124.994 secs...
Used Annotations descriptions: ['T0', 'T1', 'T2']


In [30]:
import numpy as np
import statsmodels.api as sm
from scipy.linalg import inv
from scipy.signal import welch
from scipy.linalg import solve_discrete_lyapunov


import numpy as np
import statsmodels.api as sm

def estimate_mvar_model(data, freq, model_order):
    """
    Estimate an MVAR model for a specific frequency.

    Parameters:
    - data: Multivariate time series data (n_channels x n_samples).
    - freq: Frequency of interest.
    - model_order: Order of the MVAR model.

    Returns:
    - mvar_model: Estimated MVAR model.
    """
    n_channels, n_samples = data.shape

    # Define the lag matrix for the MVAR model
    lag_matrix = np.zeros((n_samples - model_order, n_channels * model_order))
    for lag in range(1, model_order + 1):
        lag_matrix[:, (lag - 1) * n_channels:lag * n_channels] = data[:, lag:n_samples - model_order + lag]

    # Define the target matrix for the MVAR model
    target_matrix = data[:, model_order:]

    # Fit the MVAR model
    mvar_model = sm.OLS(target_matrix.T, lag_matrix).fit()

    return mvar_model


# Load your EEG/MEG data (replace with your data loading code)
# Make sure your data is in the shape (n_channels, n_samples)

# Define the model order for MVAR modeling
model_order = 10

# Define the sampling rate of your data (replace with the actual value)
sample_rate = 1000  # Replace with your data's sampling rate in Hz

# Define the frequency range of interest
fmin = 1
fmax = 30

# Compute the cross-spectral density matrix
frequencies, Cxy = welch(data, fs=sample_rate, nperseg=n_samples)

# Initialize the DTF matrix
dtf_matrix = np.zeros((n_channels, n_channels, len(frequencies)))

# Loop over frequencies of interest
for freq_idx, freq in enumerate(frequencies):
    # Estimate the MVAR model for this frequency
    mvar_model = estimate_mvar_model(data, freq, model_order)

    # Calculate the DTF for this frequency using the MVAR coefficients
    dtf_matrix[:, :, freq_idx] = calculate_dtf(mvar_model)

# dtf_matrix contains the DTF values for each frequency


ValueError: could not broadcast input array from shape (3,990) into shape (990,3)

In [3]:
raw = raw_obj.copy()

In [9]:
fmin = 1  # Minimum frequency
fmax = 30  # Maximum frequency

# Compute PDC using spectral_connectivity
pdc = spectral_connectivity(
    raw, method='pdc', mode='multitaper', fmin=fmin, fmax=fmax, faverage=True)

NameError: name 'spectral_connectivity' is not defined

In [None]:


'''
PNN

from sklearn.neighbors import KNeighborsClassifier
from sklearn.kernel_approximation import RBFSampler
from sklearn.pipeline import make_pipeline

# Sample data (replace with your own dataset)
# X_train, y_train = your_training_data
# X_test, y_test = your_testing_data

# Create a pipeline for PNN using Scikit-learn
pnn = make_pipeline(
    RBFSampler(gamma=1, random_state=1),
    KNeighborsClassifier(n_neighbors=5)
)

# Train the PNN on your training data
pnn.fit(X_train, y_train)

# Predict using the trained PNN
y_pred = pnn.predict(X_test)

# Evaluate the PNN
accuracy = (y_pred == y_test).mean()
print(f'Accuracy: {accuracy * 100:.2f}%')




PNN 2


import numpy as np
from sklearn.neighbors import KernelDensity
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.preprocessing import LabelBinarizer

class ProbabilisticNeuralNetwork(BaseEstimator, ClassifierMixin):
    def __init__(self, kernel='gaussian', bandwidth=1.0):
        self.kernel = kernel
        self.bandwidth = bandwidth

    def fit(self, X, y):
        self.classes_ = np.unique(y)
        self.classifiers_ = {}

        for c in self.classes_:
            X_class = X[y == c]
            self.classifiers_[c] = KernelDensity(kernel=self.kernel, bandwidth=self.bandwidth)
            self.classifiers_[c].fit(X_class)

    def predict_proba(self, X):
        probabilities = np.zeros((X.shape[0], len(self.classes_)))

        for i, c in enumerate(self.classes_):
            log_density = self.classifiers_[c].score_samples(X)
            probabilities[:, i] = np.exp(log_density)

        return probabilities / probabilities.sum(axis=1, keepdims=True)

    def predict(self, X):
        return self.classes_[np.argmax(self.predict_proba(X), axis=1)]

# Sample data (replace with your own dataset)
# X_train, y_train = your_training_data
# X_test, y_test = your_testing_data

# Create and train the PNN
pnn = ProbabilisticNeuralNetwork(kernel='gaussian', bandwidth=0.1)
pnn.fit(X_train, y_train)

# Predict probabilities and class labels
probs = pnn.predict_proba(X_test)
predicted_labels = pnn.predict(X_test)

# Print predicted probabilities and labels
print("Predicted Probabilities:")
print(probs)
print("Predicted Labels:")
print(predicted_labels)



PNN Keras

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Layer

class ProbabilisticLayer(Layer):
    def __init__(self, num_units, smoothing_parameter=1.0, **kwargs):
        super(ProbabilisticLayer, self).__init__(**kwargs)
        self.num_units = num_units
        self.smoothing_parameter = smoothing_parameter

    def call(self, inputs):
        pdfs = []
        for i in range(self.num_units):
            pdf = tf.exp(-0.5 * ((inputs - inputs[:, i:i+1]) / self.smoothing_parameter) ** 2)
            pdf_sum = tf.reduce_sum(pdf, axis=1)
            pdfs.append(pdf_sum)
        return tf.stack(pdfs, axis=-1)

# Sample data (replace with your own dataset)
# X_train, y_train = your_training_data
# X_test = your_testing_data

# Create a PNN-like model
model = keras.Sequential([
    keras.layers.Input(shape=(X_train.shape[1],)),
    ProbabilisticLayer(num_units=X_train.shape[1]),
    keras.layers.Lambda(lambda x: tf.reduce_sum(x, axis=-1)),  # Sum over the PDFs
    keras.layers.Dense(np.max(y_train) + 1, activation='softmax')  # Output layer for classification
])

# Compile the model
model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train the model
model.fit(X_train, y_train, epochs=10, batch_size=32)

# Evaluate the model
accuracy = model.evaluate(X_test, y_test)[1]
print(f'Accuracy: {accuracy * 100:.2f}%')

'''