In [None]:
import numpy as np
import matplotlib.pyplot as plt
import librosa
from librosa import display
from scipy import interpolate
import glob
import mne
from eelbrain import *
import os
import pandas as pd

# configure(n_workers=False)

#### Get envelope

In [None]:
def get_pitch_envelope(audio_file, time_arr, pitch_arr, sr, n_times=701, pre_pad=100):
    """Resample a sparse contour onto the stimulus sample grid and embed it in an epoch.

    The audio file is loaded only to find out how many samples the stimulus
    has at the requested rate. A spline fitted to ``(time_arr, pitch_arr)`` is
    evaluated once per stimulus sample, and the result is written into a zero
    vector of length ``n_times`` starting at sample ``pre_pad``.

    Parameters
    ----------
    audio_file : str
        Path handed to ``librosa.load``.
    time_arr : array-like
        Time points of the contour. Presumably seconds relative to stimulus
        onset (they are compared against ``sample_index / sr``) - TODO confirm.
    pitch_arr : array-like
        Contour values at ``time_arr``.
    sr : number or None
        Target sampling rate handed to ``librosa.load``; the rate that
        ``librosa.load`` returns is the one used for the time grid.
    n_times : int, optional
        Length of the returned vector (default 701, the previous fixed value).
    pre_pad : int, optional
        Number of leading zeros before the contour starts (default 100, the
        previous fixed value).

    Returns
    -------
    numpy.ndarray
        One-dimensional float64 array with exactly ``n_times`` elements.
        Non-finite values are replaced by 0.

    Raises
    ------
    ValueError
        If ``pre_pad`` is negative or larger than ``n_times``.
    """
    if not 0 <= pre_pad <= n_times:
        raise ValueError(
            f'pre_pad must lie between 0 and n_times ({n_times}), got {pre_pad}')

    stim, sr = librosa.load(audio_file, sr=sr)

    # Divide integer sample indices by the rate: this yields exactly one time
    # point per audio sample. np.arange with a fractional step may return one
    # element more than expected, which would break the fixed output length.
    ts = np.arange(len(stim)) / sr

    spl = interpolate.UnivariateSpline(time_arr, pitch_arr)
    interp_pitch = spl(ts)

    # Samples available after the leading zeros. A stimulus that does not fit
    # is cut at the end of the epoch instead of producing a negative pad width.
    room = n_times - pre_pad
    if len(interp_pitch) > room:
        import warnings
        warnings.warn(
            f'{audio_file}: stimulus has {len(interp_pitch)} samples but only '
            f'{room} fit after pre_pad; truncating')
        interp_pitch = interp_pitch[:room]

    e = np.pad(interp_pitch, pad_width=(pre_pad, room - len(interp_pitch)))
    e = e.astype('<f8')

    # NaN / inf coming out of the spline are zeroed.
    return np.where(np.isfinite(e), e, 0)

#### Create dataset

In [None]:
def pair_epochs_with_logs(epoch_files, log_files, excluded_subjects=()):
    """Map each epochs file to its trial log, leaving out excluded subjects.

    The two lists are paired by position (both are expected to be sorted), and
    every pair is checked so that a missing file cannot silently shift the
    pairing onto the wrong subject.

    Parameters
    ----------
    epoch_files : list of str
        Paths named ``Revcor<id>-epo.fif``.
    log_files : list of str
        Paths whose file name contains ``subj<id>``.
    excluded_subjects : iterable of int, optional
        Subject numbers to drop from the result.

    Returns
    -------
    dict
        ``{epoch_path: log_path}`` with the paths exactly as they were passed in.

    Raises
    ------
    ValueError
        If the lists differ in length or a pair does not refer to the same subject.
    """
    if len(epoch_files) != len(log_files):
        raise ValueError(
            f'{len(epoch_files)} epoch files but {len(log_files)} log files; '
            'cannot pair them by position')

    excluded = {int(s) for s in excluded_subjects}
    pairs = {}
    for epoch_path, log_path in zip(epoch_files, log_files):
        # Normalise the separator so that paths returned by glob on Windows
        # ('./analysis\\Revcor0019-epo.fif') and on POSIX parse the same way.
        epoch_name = epoch_path.replace('\\', '/').rsplit('/', 1)[-1]
        log_name = log_path.replace('\\', '/').rsplit('/', 1)[-1]

        subject_tag = epoch_name[len('Revcor'):-len('-epo.fif')]
        if 'subj' + subject_tag not in log_name:
            raise ValueError(
                f'{epoch_path} and {log_path} do not belong to the same subject')

        if int(subject_tag) in excluded:
            continue
        pairs[epoch_path] = log_path
    return pairs


epoch_files = sorted(glob.glob('./analysis/Revcor*-epo.fif'))
log_files = sorted(glob.glob('./log/trials_subj*.csv'))

# Subjects 6 and 19 are kept out of the dataset. Excluding by subject number
# (instead of deleting hard-coded Windows path keys) works on every platform
# and does not fail when one of these files is absent.
es_dict = pair_epochs_with_logs(epoch_files, log_files, excluded_subjects=(6, 19))

In [None]:
# Epoch geometry: 701 samples with a step of 1/1000, starting at 0.
tstep = 1. / 1000
n_times = 701
time = UTS(0, tstep, n_times)

# First 64 sensors of the 'easycap-M1' montage.
sensor = Sensor.from_montage('easycap-M1')[:64]

rows = []

for k, v in es_dict.items():
    # Subject number from the file name ('Revcor0007-epo.fif' -> 7). Parsing
    # the base name does not depend on the directory prefix or the path
    # separator, and it only has to happen once per subject.
    epoch_name = os.path.basename(k)
    subject = int(epoch_name[len('Revcor'):-len('-epo.fif')])

    subj = mne.read_epochs(k)
    subj = subj.drop_channels('STI')

    df = pd.read_csv(v, encoding='latin')
    # Keep only log rows whose stim_id_marker appears in subj.selection,
    # then drop the 'standard' stimuli.
    df = df[df['stim_id_marker'].isin(subj.selection)]
    df = df[df['stim_type'] != 'standard']

    # NOTE(review): the loop iterates over 'trial_number' values but uses them
    # both as the epoch name and as a 'stim_id_marker' value below - confirm
    # that the two columns always agree.
    for j in np.unique(df['trial_number']):

        # First epoch stored under the name str(j), transposed to (time, sensor).
        # NOTE(review): confirm the data really are in µV; MNE's get_data()
        # normally returns volts unless the data were rescaled upstream.
        eeg = NDVar(subj[str(j)].get_data()[0].T, (time, sensor), name='EEG', info={'unit': 'µV'})

        sound = df[df['trial_number'] == j]['sound_file'].iloc[0]
        df_epoch = df.loc[df['stim_id_marker'] == j]
        times_arr = df_epoch['t'].to_numpy()
        # The contour handed to get_pitch_envelope is the log's 'rms' column.
        pitch_arr = df_epoch['rms'].to_numpy()

        e = get_pitch_envelope(sound, times_arr, pitch_arr, sr=1000)
        envelope = NDVar(e, (time,), name='envelope')

        rows.append([subject, eeg, envelope])

ds = Dataset.from_caselist(['subject', 'eeg', 'envelope'], rows)
# NOTE(review): 'subject' is built from integers here; confirm that setting
# .random on the resulting variable has the intended effect.
ds['subject'].random = True
print(ds.summary())

#### Save dataset

In [None]:
# Write the dataset to the file that the "Load dataset" cell reads back.
# A bare ds.save() passes no destination (eelbrain documents that it then asks
# through a file dialog), which prevents an unattended Restart & Run All.
dataset_path = './datasets/dataset_pitch_excl_6_19.pickle'
os.makedirs(os.path.dirname(dataset_path), exist_ok=True)
save.pickle(ds, dataset_path)

#### Load dataset

In [None]:
# Reload the stored dataset so the cells below can run without rebuilding it
# from the raw epochs and logs.
# NOTE(review): this is presumably a pickle file - only load files you created
# yourself, since unpickling can execute code embedded in the file.
ds = load.unpickle('./datasets/dataset_pitch_excl_6_19.pickle')

#### Compute TRF

In [None]:
# Re-create the time axis and sensor layout, so that this part of the notebook
# also works when the dataset was loaded from disk rather than rebuilt above.
n_times = 701   # samples per epoch
tstep = 1e-3    # spacing between samples
time = UTS(0, tstep, n_times)

# Only the first 64 entries of the montage are kept.
full_montage = Sensor.from_montage('easycap-M1')
sensor = full_montage[:64]

In [None]:
# Estimate the response function that maps 'envelope' onto 'eeg',
# for lags between 0 and 0.6.
fit = boosting(
    'eeg', 'envelope', 0, 0.6,
    basis=0.05,
    ds=ds,
    delta=0.01,
    partitions=3,
    test=1,
)

# Butterfly plot of the scaled kernel with the topography shown at 0.2.
p = plot.TopoButterfly(fit.h_scaled, w=6, h=2)
p.set_time(0.2)

#### Save boosting result

In [None]:
# Save boosting result to predict eeg from envelope.
# Keep the name bound to the fit itself: the original assignment stored the
# return value of the save call, which is not documented to be the fit.
env2eeg_fit = fit

# An explicit destination lets the cell run unattended; without one eelbrain
# documents that a file dialog is shown.
env2eeg_path = './datasets/env2eeg_fit.pickle'
os.makedirs(os.path.dirname(env2eeg_path), exist_ok=True)
save.pickle(env2eeg_fit, env2eeg_path)

#### Load saved result

In [None]:
# No path is passed here. NOTE(review): eelbrain documents that load.unpickle()
# then asks for the file through a system dialog, so this cell needs an
# interactive session. `res` is expected to be a boosting result such as the
# one saved above (its `h_scaled` attribute is used further down).
res = load.unpickle()

In [None]:
# Trial log of subject 6, without the 'standard' stimuli.
df = pd.read_csv('./log/trials_subj0006_211026_10.46.csv', encoding='latin')
# The epoch-selection filter used while building the dataset stays disabled here:
# df = df[df['stim_id_marker'].isin(subj.selection)]
df = df.loc[df['stim_type'] != 'standard']

# Rows of the trial with marker 14: time points and the matching 'rms' values.
df_epoch = df[df['stim_id_marker'] == 14]
times_arr, pitch_arr = (df_epoch[column].to_numpy() for column in ('t', 'rms'))

In [None]:
# Predict EEG for a single stimulus: build its envelope (from the 'rms'
# contour extracted in the previous cell) and convolve it with the stored kernel.
stim_envelope = get_pitch_envelope(
    './sounds/subj6/julie_neutral.0736.pitch_gain.wav',
    times_arr,
    pitch_arr,
    sr=1000,
)
x = NDVar(stim_envelope, (time,))
y = convolve(res.h_scaled, x)

plot.UTS(y, '.sensor')

In [None]:
# Recorded EEG of subject 6 for the same trial, to compare with the prediction.
reqd_epoch = mne.read_epochs('./analysis/Revcor0006-epo.fif')
reqd_epoch.drop_channels('STI')

# First epoch stored under the name '14'; used without transposing, so the
# dimensions are given as (sensor, time).
recorded = reqd_epoch['14'].get_data()[0]
reqd_eeg = NDVar(recorded, (sensor, time), name='EEG', info={'unit': 'µV'})

# Predicted response first, recorded response second, both shown at t = 0.3.
plot.TopoButterfly(y, t=0.3)
plot.TopoButterfly(reqd_eeg, t=0.3)

# plot.TopoButterfly([y, ds['eeg']], t=0.3)

#### Section for experimentation

In [None]:
# tstep = 1. / 1000
# n_times = 701
# time = UTS(0, tstep, n_times)

# sensor = Sensor.from_montage('easycap-M1')[:64]

In [None]:
# subj = mne.read_epochs('./analysis/Revcor0007-epo.fif')
# subj.drop_channels('STI')

# df = pd.read_csv('./log/trials_subj0007_211026_16.27.csv', encoding='latin')
# df = df[df['stim_id_marker'].isin(subj.selection)]  
# df = df[df['stim_type'] != 'standard'] 

# rows = []

# for i in range(len(subj)):
    
#     eeg = NDVar(subj[i].get_data()[0].T, (time, sensor), name='EEG', info={'unit': 'µV'})

#     sound = df['sound_file'].iloc[i]
#     envelope = NDVar(get_envelope(sound, sr=1000), (time,), name='envelope')

#     rows.append([eeg, envelope]) 

# ds = Dataset.from_caselist(['eeg', 'envelope'], rows)
# print(ds.summary())

In [None]:
# fit = boosting('envelope', 'eeg', 0, 0.600, basis=0.050, ds=ds, partitions=2)

In [None]:
# # Save boosting result to predict envelope from EEG
# eeg2env_fit = save.pickle(fit)

In [None]:
# actual_envelope = NDVar(get_envelope('./sounds/subj6/julie_neutral.0736.pitch_gain.wav', sr=1000), (time,))
# plot.UTS(actual_envelope)

In [None]:
# std_epoch = mne.read_epochs('./analysis/Revcor0006-epo.fif', verbose=False)['14']
# std_epoch.drop_channels('STI')

In [None]:
# # Predict amplitude envelope from EEG data
# x = NDVar(std_epoch.get_data()[0].T, (time, sensor))
# y = convolve(fit.h_scaled, x, ds=ds)
# print(x)
# print(y)
# plot.UTS(y)