In [16]:
#imports

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model


import etc.helper as helper


In [17]:
#Load EEG data - preprocessed
import mne
import numpy as np
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
%matplotlib qt


#Subject numbers and experiment
sub = "sub-28"
exp = "fixthemix"

fname = f"derivatives/eegprep/{sub}/{sub}_task-{exp}_eegprep.vhdr"

raw = mne.io.read_raw_brainvision(fname, preload=True)
events, event_dict = mne.events_from_annotations(raw)
#start of songs in sample numbers
song_starts = np.array(events)[events[:,2] == 10001][2:,0]
press_starts = []
press_starts = events[2:,0]

print(event_dict)
#set sample rate
sample_rate = 250
#raw.plot()

Extracting parameters from derivatives/eegprep/sub-28/sub-28_task-fixthemix_eegprep.vhdr...
Setting channel info structure...
Reading 0 ... 465922  =      0.000 ...  1863.688 secs...
Used Annotations descriptions: ['New Segment/', 'Stimulus/1', 'Stimulus/11', 'Stimulus/12', 'Stimulus/13', 'Stimulus/15', 'Stimulus/2', 'Stimulus/21', 'Stimulus/22', 'Stimulus/23', 'Stimulus/25', 'Stimulus/3', 'Stimulus/31', 'Stimulus/32', 'Stimulus/33', 'Stimulus/35', 'Stimulus/41', 'Stimulus/42', 'Stimulus/43', 'Stimulus/45', 'Time 0/']
{'New Segment/': 99999, 'Stimulus/1': 10001, 'Stimulus/11': 10002, 'Stimulus/12': 10003, 'Stimulus/13': 10004, 'Stimulus/15': 10005, 'Stimulus/2': 10006, 'Stimulus/21': 10007, 'Stimulus/22': 10008, 'Stimulus/23': 10009, 'Stimulus/25': 10010, 'Stimulus/3': 10011, 'Stimulus/31': 10012, 'Stimulus/32': 10013, 'Stimulus/33': 10014, 'Stimulus/35': 10015, 'Stimulus/41': 10016, 'Stimulus/42': 10017, 'Stimulus/43': 10018, 'Stimulus/45': 10019, 'Time 0/': 10020}


In [39]:
#Load FLAC Audio in
import pyflac
import scipy.io.wavfile as wav
import scipy.signal as sig
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

aname = f"derivatives/audio/{sub}/{sub}_task-{exp}_aud.flac"
decoder = pyflac.FileDecoder(aname, "temp.wav")
aud_rate, audio = wav.read("temp.wav")
dtype = audio.dtype
print(aud_rate)
print(audio.shape)
audio_shape = audio.shape
print(raw.get_data().shape)

#eeg_length = raw.get_data().shape[1]*sample_rate

#Resample audio to EEG sample rate and get audio envelope
audio = sig.resample(audio, raw.get_data().shape[1])
audio = np.abs(sig.hilbert(audio.T))
audio = np.average(audio, axis=0)
print(audio)
print(np.max(audio), np.min(audio))

#normalize audio
audio_scaler = StandardScaler()
audio = audio_scaler.fit_transform(audio.reshape(-1,1)).reshape(1, -1)
print(audio)
print(np.max(audio), np.min(audio))
print(np.mean(audio), np.std(audio))

audio = np.atleast_2d(audio)

44100
(82188285, 2)
(31, 465923)
[3.89786261 2.16101743 3.89697985 ... 2.16272319 3.89874611 2.16186872]
37195.47015751086 2.014702518637015
[[-0.81162915 -0.81207714 -0.81162937 ... -0.8120767  -0.81162892
  -0.81207692]]
8.781329033616105 -0.8121148767531039
-1.7568251225967283e-17 1.0000000000000002


In [40]:
#SPLIT BY SEGMENT

#Split audio and eeg up into their corresponding songs
#note that the 1st elements are the whitespace before the first song starts
scaler = StandardScaler()

num_segments = 1000
seconds = 10

split_eeg = raw.get_data()

#split data into 300 segments
times = np.linspace(song_starts[1], raw.get_data().shape[1], num_segments, dtype = int)
#each segment has a bound of "seconds" seconds before and after the segment
split_audio, split_eeg = helper.split_events(audio, split_eeg, times, sample_rate, seconds)

fs = sample_rate

labels = [f'song{i}' for i in range(1, len(split_audio))]
labels_train, labels_test = train_test_split(labels, train_size=0.7, test_size=0.3, random_state=5)
labels_test, labels_val = train_test_split(labels_test, train_size=0.5, test_size=0.5, random_state=5)

#X and Y are dictionaries so that the ordering of the corresponding segments can
#be maintained
X = {}
Y = {}
for i in range(1,len(split_audio)):
    X[f'song{i}'] = split_audio[i][0]

    Y[f'song{i}'] = split_eeg[i]

size = Y['song3'].T.shape

X_test, X_train, X_val, Y_test, Y_train, Y_val = [],[],[],[],[],[]

for i in labels_train:
    if(Y[i].T.shape == size):
        Y_train.append(scaler.fit_transform(X[i].reshape(-1,1)).reshape(1,-1))
        X_train.append(scaler.fit_transform(Y[i].T))

for i in labels_val:
    if(Y[i].T.shape == size):
        Y_val.append(scaler.fit_transform(X[i].reshape(-1,1)).reshape(1,-1))
        X_val.append(scaler.fit_transform(Y[i].T))

for i in labels_test:
    if(Y[i].T.shape == size):
        Y_test.append(scaler.fit_transform(X[i].reshape(-1,1)).reshape(1,-1))
        X_test.append(scaler.fit_transform(Y[i].T))

X_train = np.array(X_train)
Y_train = np.array(Y_train)
X_test = np.array(X_test)
Y_test = np.array(Y_test)
X_val = np.array(X_val)
Y_val = np.array(Y_val)




In [44]:
#size of latent space dimensions 
latent = 10
print(X_train.shape)
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], X_train.shape[2], 1)
input_shape = X_train.shape[1:]
output_shape = Y_train.shape[1:]
print(input_shape)
print(output_shape)

print(len(X_train))
print(len(X_test))
print(len(X_val))

#norm_layer.adapt(data=np.array(X_train))


#Autoencoder model definition
class Autoencoder(Model):
    def __init__(self, latent_dim, input_shape, output_shape):
        super(Autoencoder, self).__init__()
        self.latent_dim = latent_dim
        self.input_shape = input_shape
        self.output_shape = output_shape
        self.encoder = tf.keras.Sequential([
            layers.Input(shape=self.input_shape),
            layers.Conv2D(16, (3,3), strides=2, padding='same', activation='relu'),
            layers.MaxPooling2D(),
            layers.Dropout(0.25),
            layers.Conv2D(8, (3,3), strides=2, padding='same', activation='relu'),\
            layers.MaxPooling2D(),
            layers.Dropout(0.25),
            layers.Conv2D(4, (3,3), strides=2, padding='same', activation='relu'),
            #layers.MaxPooling2D(),
            layers.Dropout(0.25),
            layers.Conv2D(2, (3,3), strides=2, padding='same', activation='relu'),
            #layers.MaxPooling2D(),
            layers.Dropout(0.25),
            layers.Flatten(),
            layers.Dense(self.latent_dim),
        ])

        self.decoder = tf.keras.Sequential([
            layers.Dense(50),
            layers.Dropout(0.25),
            layers.Reshape((25, 2)),
            layers.Conv1DTranspose(5, 3, strides=2, padding='same', activation='sigmoid'),
            layers.Dropout(0.25),
            layers.Conv1DTranspose(10, 3, strides=2, padding='same', activation='sigmoid'),
            layers.Dropout(0.25),
            layers.Conv1DTranspose(25, 3, strides=2, padding='same', activation='sigmoid'),
            layers.Dropout(0.25),
            layers.Flatten(),
            #layers.Dense(self.output_shape[1]),
            layers.Reshape(self.output_shape)
        ])

    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded
    
    def predict(self, data, labels):
        reconstructions = self(data).numpy()
        loss = losses.mse(reconstructions, labels)
        return reconstructions, loss

    
model = Autoencoder(latent, input_shape = input_shape, output_shape = output_shape)
model.encoder.summary()
model.compile(optimizer="Adam", loss=losses.MeanSquaredError())

history = model.fit(X_train, Y_train,
          epochs=1,
          validation_data=(X_val, Y_val),
          )

model.decoder.summary()


(694, 5000, 31, 1)
(5000, 31, 1)
(1, 5000)
694
148
147


[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 71ms/step - loss: 1.2525 - val_loss: 1.2187


In [43]:
latents = [10, 20, 30, 40, 50]
epochs = 100

test_ind = 0
test_losses = []

for latent in latents:
	model = Autoencoder(latent, input_shape = input_shape, output_shape = output_shape)
	model.compile(optimizer="Adam", loss=losses.MeanSquaredError())

	history = model.fit(X_train, Y_train,
			epochs=epochs,
			validation_data=(X_val, Y_val),
			)

	plt.figure()
	plt.title(f"Model Loss for Epochs = {epochs} and Latent Space = {latent}")
	plt.plot(history.history["loss"], label="Training Loss")
	plt.plot(history.history["val_loss"], label="Validation Loss")
	plt.legend()
	plt.show()

	X_test = np.array(X_test)
	Y_test = np.array(Y_test)
	model_Y = model(X_test[test_ind].reshape(1, X_test[0].shape[0], X_test[0].shape[1])).numpy().reshape(1,-1)
	true_Y = Y_test[test_ind]

	print(true_Y.shape)
	print(model_Y.shape)
	display_scale = StandardScaler().fit(true_Y)
	display_Y = display_scale.transform(model_Y)

	pred, loss = model.predict(X_test, Y_test)
	test_losses.append(np.average(loss, axis=0)[0])

	fig, axs = plt.subplots(3,1)
	axs[0].plot(display_Y[0])
	axs[0].set_title(f"Reconstructed Audio Envelope (Latent Space = {latent})")
	axs[1].plot(true_Y[0])
	axs[1].set_title("True Audio Envelope")
	axs[2].plot(-1*display_Y[0], color="Red", label="Reconstructed (Flipped)")
	axs[2].plot(true_Y[0], label="True", alpha=0.5, color="Green")
	axs[2].set_title("True Audio Envelope vs Reconstructed")
	fig.legend()
	plt.show()

plt.figure()
plt.title(f"Average Model Loss Over Test Data")
plt.plot(latents, test_losses)
plt.xlabel("Dimension of Latent Space")
plt.ylabel("Average MSE")
plt.show()


Epoch 1/100
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 57ms/step - loss: 1.2517 - val_loss: 1.2181
Epoch 2/100
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 45ms/step - loss: 1.2078 - val_loss: 1.1773
Epoch 3/100
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 43ms/step - loss: 1.1676 - val_loss: 1.1380
Epoch 4/100
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 51ms/step - loss: 1.1291 - val_loss: 1.1036
Epoch 5/100
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 45ms/step - loss: 1.0968 - val_loss: 1.0771
Epoch 6/100
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 45ms/step - loss: 1.0721 - val_loss: 1.0579
Epoch 7/100
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 43ms/step - loss: 1.0544 - val_loss: 1.0443
Epoch 8/100
[1m22/22[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 45ms/step - loss: 1.0418 - val_loss: 1.0345
Epoch 9/100
[1m22/22[0m [32m━━━━━━━━━

In [31]:
X_test = np.array(X_test)
Y_test = np.array(Y_test)
model_Y = model(X_test[0].reshape(1, X_test[0].shape[0], X_test[0].shape[1])).numpy().reshape(1,-1)
true_Y = Y_test[0]

print(true_Y.shape)
print(model_Y.shape)
display_scale = StandardScaler().fit(true_Y)
display_Y = display_scale.transform(model_Y)

pred, loss = model.predict(X_test, Y_test)

fig, axs = plt.subplots(3,1)
axs[0].plot(display_Y[0])
axs[0].set_title("Reconstructed Audio Envelope")
axs[1].plot(true_Y[0])
axs[1].set_title("True Audio Envelope")
axs[2].plot(-1*display_Y[0], color="Red", label="Reconstructed (Flipped)")
axs[2].plot(true_Y[0], label="True", alpha=0.5, color="Green")
axs[2].set_title("True Audio Envelope vs Reconstructed")
fig.legend()
plt.show()

(1, 5000)
(1, 5000)


In [None]:
from scipy.fftpack import ihilbert

samp_rate, audio = wav.read("temp.wav")

mean = audio_scaler.mean_
std = audio_scaler.var_

#display_Y = [(display_Y[0]*std) + mean]

recon_audio = ihilbert(-1*display_Y[0])
plt.figure()
plt.plot(recon_audio)
plt.plot(ihilbert(true_Y[0]))
plt.show()

#recon_audio = recon_audio.astype(np.float32)
print(max(recon_audio))
wav.write("new.wav", 250, np.array([recon_audio, recon_audio]))


4.088771796934348


In [None]:
plt.figure()
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()
plt.show()

[0.58796926]
