In [1]:
import os
import pandas as pd
import numpy as np
import mtrf
import librosa
import matplotlib.pyplot as plt
from scipy.io import loadmat
from scipy.stats import zscore, pearsonr
from scipy.signal import hilbert, resample, correlate

from mtrf.model import TRF
from sklearn.cross_decomposition import CCA

In [2]:
def lag_generator_new(r, lags):
    '''
    Build a time-lagged design matrix from a multichannel signal.

    For every integer lag in the inclusive range ``lags[0]..lags[1]`` a copy
    of ``r`` shifted by ``lag`` samples along the time axis is produced, and
    the copies are stacked side by side in ascending lag order.  Samples that
    would be shifted in from outside the recording are zero.

    Args:
      r: [time, neurons] array.
      lags: two-element sequence ``(first_lag, last_lag)`` in samples, both
        ends inclusive.  A positive lag delays the signal (row ``t`` holds
        ``r[t - lag]``); a negative lag advances it (row ``t`` holds
        ``r[t + |lag|]``).

    Return
      out: [time, neuron*lags] array with the dtype of ``r``.  Columns
        ``k*neurons:(k+1)*neurons`` hold the copy for the k-th lag.

    Raises:
      ValueError: if the lag range is empty (``lags[0] > lags[1]``).
    '''
    r = np.asarray(r)
    n_time, n_feat = r.shape
    lag_values = range(lags[0], lags[1] + 1)
    if len(lag_values) == 0:
        raise ValueError(
            f"empty lag range: lags[0]={lags[0]} is greater than lags[1]={lags[1]}"
        )

    # Single allocation of the result; each lag writes straight into its own
    # column block, so no padded/rolled temporaries are created per lag.
    out = np.zeros((n_time, n_feat * len(lag_values)), dtype=r.dtype)

    for k, lag in enumerate(lag_values):
        block = out[:, k * n_feat:(k + 1) * n_feat]
        if lag >= 0:
            # Delay: the first `lag` rows stay zero.
            if lag < n_time:
                block[lag:] = r[:n_time - lag]
        elif -lag < n_time:
            # Advance: the last `|lag|` rows stay zero.
            block[:lag] = r[-lag:]

    return out

In [3]:
# Directory holding the preprocessed .mat recordings used in this notebook.
folder_name = '../../../Data/Cindy/Preprocessed/single_filtered_01_15Hz'

# Two speech recordings and two music recordings, loaded in this order.
(
    data_train1_speech,
    data_train2_speech,
    data_train1_music,
    data_train2_music,
) = (
    loadmat(os.path.join(folder_name, mat_name))
    for mat_name in (
        'cindy_single2_jane_eyre_05_part1.mat',
        'cindy_single2_jane_eyre_05_part2.mat',
        'cindy_single2_piano_4_1_22050Hz.mat',
        'cindy_single2_piano_4_2_22050Hz.mat',
    )
)

# Alternative data set (disabled); swap in these values to analyse it instead:
#   folder_name        = '../../../Data/Samet/Preprocessed/preprocessed_single_01_15Hz'
#   data_train1_speech <- 'single1_jane_eyre_05_part1.mat'
#   data_train2_speech <- 'single3_jane_eyre_05_part2.mat'
#   data_train1_music  <- 'single2_piano_4_1_22050Hz.mat'
#   data_train2_music  <- 'single4_piano_4_2_22050Hz.mat'

# Sampling rates in Hz.
fs_eeg = 128
fs_audio = 22050

# Inclusive [first, last] lag ranges, in samples, handed to lag_generator_new
# for the EEG and the stimulus respectively.
lags_neuro = [-40, 10]
lags_stim = [-10, 10]

In [4]:
def _zscore_1d(signal):
    """Return `signal` shifted to zero mean and scaled to unit standard deviation."""
    return (signal - np.mean(signal)) / np.std(signal)


def _locate_segment(reference_z, segment, segment_sr, reference_sr):
    """Find where `segment` best matches inside `reference_z`.

    Args:
      reference_z: 1-D reference audio, already z-scored.
      segment: 1-D audio excerpt to look for, sampled at `segment_sr`.
      segment_sr: sampling rate of `segment` in Hz.
      reference_sr: sampling rate of `reference_z` in Hz.

    Returns:
      (start, end): sample indices into `reference_z`, at `reference_sr`, of
      the position with the highest cross-correlation; `end` is `start` plus
      the length of the resampled excerpt.
    """
    # Bring the excerpt to the reference rate, then normalise it the same way
    # as the reference.
    segment_z = _zscore_1d(
        librosa.resample(segment, orig_sr=segment_sr, target_sr=reference_sr)
    )

    # mode='valid' only evaluates offsets where the excerpt lies fully inside
    # the reference, so the argmax is directly the start index.
    xcorr = correlate(reference_z, segment_z, mode='valid')
    start = np.argmax(xcorr)
    end = start + len(segment_z)

    print(f"Best match found at index range: {start} to {end} (Corr: {np.max(xcorr)})")
    return start, end


# Full-length stimulus; sr=None keeps the file's native sampling rate.
long_audio, long_sr = librosa.load('../../../Stimuli/Cindy/piano_4.wav', sr=None)

# Normalise the reference once; both searches below reuse it.
long_audio = _zscore_1d(long_audio)

# Rate of the excerpts stored alongside the EEG recordings.
short_sr = fs_audio

best_match_index1, end_index1 = _locate_segment(
    long_audio, np.squeeze(data_train1_music['stimuli']), short_sr, long_sr
)
best_match_index2, end_index2 = _locate_segment(
    long_audio, np.squeeze(data_train2_music['stimuli']), short_sr, long_sr
)

# NOTE(review): in the recorded run the second peak (~9.3e5) was far lower
# than the first (~3.5e7) although both excerpts have similar lengths; confirm
# that the second excerpt is really aligned at the reported position.

Best match found at index range: 0 to 28951030 (Corr: 34864052.0)
Best match found at index range: 84147472 to 108461328 (Corr: 933777.375)


In [5]:
def _audio_index_to_eeg_sample(audio_index):
    """Convert a sample index at the audio rate `long_sr` to the nearest index at `fs_eeg`."""
    return round(audio_index / long_sr * fs_eeg)


# EEG for the two music recordings: transposed with .T, then z-scored along
# axis 0 (the concatenated result later prints as (154404, 31)).
eeg1 = zscore(data_train1_music['eeg_data'].T, axis=0)

# The final row of the second recording is dropped before z-scoring.
# NOTE(review): the reason is not visible here (possibly a length mismatch with
# the stimulus) -- confirm it is still needed.
eeg2 = zscore(data_train2_music['eeg_data'].T[:-1, :], axis=0)

# Surprisal time series for the whole piece, taken from the 'surprise' column.
# NOTE(review): it is indexed below with EEG-rate sample positions, which
# assumes the CSV is sampled at fs_eeg -- confirm.
surprisal_feature = (
    pd.read_csv('../../../Stimuli/Cindy/Surprisal/piano_4.csv')['surprise']
    .to_numpy(copy=True)
)

# First recording: take exactly the span located by the audio alignment.
stim1_start = _audio_index_to_eeg_sample(best_match_index1)
stim1_stop = _audio_index_to_eeg_sample(end_index1)
stim1 = surprisal_feature[stim1_start:stim1_stop][:, np.newaxis]

# Second recording: start at the aligned position and take as many samples as
# there are EEG rows (the aligned end index is deliberately not used here).
stim2_start = _audio_index_to_eeg_sample(best_match_index2)
stim2 = surprisal_feature[stim2_start:stim2_start + eeg2.shape[0]][:, np.newaxis]

# Alternatives tried earlier and removed from this cell:
#   * slicing surprisal_feature back to back by the EEG lengths, without alignment;
#   * using the z-scored Hilbert envelope of data_train*_music['stimuli'],
#     resampled from fs_audio to fs_eeg, as the stimulus.

In [6]:
# Trim every EEG/stimulus pair to its common length.  Either side may be the
# longer one (the stimulus span comes from rounded audio indices, and the
# surprisal series may run out before the EEG does), and the two must have the
# same number of rows before they are concatenated and paired sample by sample.
n_common1 = min(eeg1.shape[0], stim1.shape[0])
eeg1, stim1 = eeg1[:n_common1, :], stim1[:n_common1, :]

n_common2 = min(eeg2.shape[0], stim2.shape[0])
eeg2, stim2 = eeg2[:n_common2, :], stim2[:n_common2, :]

In [7]:
# Stack the two recordings one after the other along the time (row) axis.
eeg = np.vstack((eeg1, eeg2))
stim = np.vstack((stim1, stim2))

# To analyse the second recording on its own, use instead:
#   eeg = eeg2
#   stim = stim2

In [8]:
# Sanity check: both arrays should report the same number of rows.
print(eeg.shape, stim.shape, sep='\n')

(154404, 31)
(154404, 1)


In [9]:
def _lag_segments(segments, lags):
    """Lag each contiguous segment separately, then stack the results along time.

    Lagging per segment keeps lagged rows next to a cut from picking up
    samples that belong to the other, non-adjacent segment.
    """
    return np.concatenate([lag_generator_new(seg, lags) for seg in segments], axis=0)


def _first_component_corr(model, X, Y):
    """Pearson correlation between the canonical variates of X and Y under `model`."""
    X_c, Y_c = model.transform(X, Y)
    return pearsonr(X_c.ravel(), Y_c.ravel()).statistic


sample_len = eeg.shape[0]

train_corrs = []
test_corrs = []

k_cv = 10

# Fold boundaries in integer arithmetic: they start at 0, end exactly at
# sample_len and never overlap, so every sample is tested exactly once even
# when sample_len is not a multiple of k_cv.
fold_edges = [(j * sample_len) // k_cv for j in range(k_cv + 1)]

for i in range(k_cv):
    print(f'Split {i+1}')

    lo, hi = fold_edges[i], fold_edges[i + 1]

    # Held-out fold is one contiguous block; training data is what lies
    # before and after it.
    eeg_test = lag_generator_new(eeg[lo:hi, :], lags_neuro)
    stim_test = lag_generator_new(stim[lo:hi, :], lags_stim)

    eeg_train = _lag_segments((eeg[:lo, :], eeg[hi:, :]), lags_neuro)
    stim_train = _lag_segments((stim[:lo, :], stim[hi:, :]), lags_stim)
    # NOTE(review): `eeg`/`stim` are themselves two recordings glued together,
    # so lagged rows around that internal junction still mix both recordings.

    cca_att = CCA(n_components=1)
    cca_fit = cca_att.fit(eeg_train, stim_train)

    train_r = _first_component_corr(cca_fit, eeg_train, stim_train)
    print(f"Train: {train_r.round(3)}")
    train_corrs.append(train_r)

    test_r = _first_component_corr(cca_fit, eeg_test, stim_test)
    print(f"Test: {test_r.round(3)}")
    test_corrs.append(test_r)

print(f'Average Training Correlation: {np.mean(train_corrs)}')
print(f'Average Test Correlation: {np.mean(test_corrs)}')

Split 1
Train: 0.227
Test: 0.166
Split 2
Train: 0.234
Test: 0.137
Split 3
Train: 0.232
Test: 0.128
Split 4
Train: 0.229
Test: 0.095
Split 5
Train: 0.233
Test: 0.149
Split 6
Train: 0.243
Test: 0.031
Split 7
Train: 0.242
Test: 0.063
Split 8
Train: 0.243
Test: 0.053
Split 9
Train: 0.24
Test: 0.086
Split 10
Train: 0.244
Test: 0.02
Average Training Correlation: 0.23685525905058458
Average Test Correlation: 0.09288106768013096


In [10]:
# Mean and spread of the held-out correlation across the cross-validation folds.
print(
    f'Average Test Correlation: {np.mean(test_corrs)}',
    f'Std Test Correlation: {np.std(test_corrs)}',
    sep='\n',
)

Average Test Correlation: 0.09288106768013096
Std Test Correlation: 0.04836690864628974
