In [None]:
!pip install mne

Collecting mne
  Using cached mne-1.8.0-py3-none-any.whl (7.4 MB)
Collecting pooch>=1.5
  Using cached pooch-1.8.2-py3-none-any.whl (64 kB)
Collecting scipy>=1.9
  Using cached scipy-1.14.1.tar.gz (58.6 MB)
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Installing backend dependencies ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25l-

In [None]:
!pip install PyWavelets
!pip install scipy

In [None]:
import mne
import numpy as np
import pandas as pd
import pywt
from pywt import wavedec
import scipy.stats as stats

In [None]:
raw = mne.io.read_raw_edf("1-2.edf", preload=True)
print(raw.info)

In [None]:
# Sélectionner les 5 premiers canaux EEG
#r=raw.pick_channels(raw.ch_names[:5])

In [None]:
raw.plot()

In [None]:
%%time
rf = raw.filter(0.1,30)

In [None]:
rf.plot()

In [None]:
print(rf.info)

In [None]:
selected_channel_name = rf.info['ch_names']
sample_frequency = rf.info['sfreq']
epoch_duration = 10

In [None]:
selected_channel_name

In [None]:
sample_frequency

In [None]:
epoch_samples = int(epoch_duration*sample_frequency)

In [None]:
epoch_samples

In [None]:
total_epochs = 307200// epoch_samples

In [None]:
total_epochs

In [None]:
epochs_matrix = np.zeros((total_epochs, 5, epoch_samples))  # 5 channels

In [None]:
# Segmenter les données en epochs de 10 secondes
events = mne.make_fixed_length_events(rf, duration=epoch_duration)

In [None]:
# Créer un objet Epochs
epochs = mne.Epochs(rf, events, tmin=0, tmax=epoch_duration, baseline=None, detrend=1)

In [None]:
# Obtenir les données des epochs sous forme de tableau numpy
epochs_data = epochs.get_data()

In [None]:
# Vérifier la forme de la matrice d'epochs
print("Forme de la matrice d'epochs:", epochs_data.shape)

In [None]:
zscores = stats.zscore(epochs_data)

In [None]:
zscores.shape

In [None]:
# Définition de la famille d'ondelettes à utiliser
wavelet = 'db4'

# Initialisation des tableaux pour stocker les caractéristiques
cD_Energy = np.zeros((59, 5))
cA_Energy = np.zeros((59, 5))
D_Entropy = np.zeros((59, 5))
A_Entropy = np.zeros((59, 5))
D_mean = np.zeros((59, 5))
A_mean = np.zeros((59, 5))
D_std = np.zeros((59, 5))
A_std = np.zeros((59, 5))

In [None]:
%%time
for i in range(59):
  for j in range(5):
    coeffs = pywt.wavedec(zscores[i, j, :], wavelet)  # Calcul de la DWT avec pywt
    cD_Energy[i,j] = np.mean([np.sum(np.square(coeffs[5])),np.sum(np.square(coeffs[4])),
                         np.sum(np.square(coeffs[3])),np.sum(np.square(coeffs[2])),
                         np.sum(np.square(coeffs[1]))])
    cA_Energy[i,j] = np.sum(np.square(coeffs[0]))
    D_Entropy[i,j] = np.mean([np.sum(np.square(coeffs[5]) * np.log(np.square(coeffs[5]))),
                         np.sum(np.square(coeffs[4]) * np.log(np.square(coeffs[4]))),
                         np.sum(np.square(coeffs[3]) * np.log(np.square(coeffs[3]))),
                         np.sum(np.square(coeffs[2]) * np.log(np.square(coeffs[2]))),
                         np.sum(np.square(coeffs[1]) * np.log(np.square(coeffs[1])))])
    A_Entropy[i,j] = np.sum(np.square(coeffs[0]) * np.log(np.square(coeffs[0])))
    D_mean[i,j] = np.mean([np.mean(coeffs[5]),np.mean(coeffs[4]),np.mean(coeffs[3]),np.mean(coeffs[2]),np.mean(coeffs[1])])
    A_mean[i,j] = np.mean(coeffs[0])
    D_std[i,j] = np.mean([np.std(coeffs[5]),np.std(coeffs[4]),np.std(coeffs[3]),np.std(coeffs[2]),np.std(coeffs[1])])
    A_std[i,j] = np.std(coeffs[0])

### FPGA implementation part

In [None]:
from pynq import Overlay
from pynq import allocate

#overlay = Overlay('yuv_filter_full.bit')
overlay = Overlay('design_test_2.bit')

# Display the names of the overlay IP blocks
print('IP blocks :', list(overlay.ip_dict.keys()))

#Define custom IPs and DMAs

axi_vhdl_wrapper_ip = overlay.axi_vhdl_wrapper_0

dma0 = overlay.axi_dma_0
dma0_send = overlay.axi_dma_0.sendchannel
dma0_recv = overlay.axi_dma_0.recvchannel

#Get register offset of my parameter
def get_register_offset(overlay, ip, parameter):
    return overlay.ip_dict[ip]['registers'][parameter]['address_offset']

#Get parameters's memory offset
#print('Parameters address offset :')
#addr = get_register_offset(overlay, 'axi_vhdl_wrapper_0', 'Y_scale')
#print('Y_Scale =', hex(addr))

#axi_vhdl_wrapper_ip.write(0x10, 128) 

#Start my IPs

CONTROL_REGISTER = 0x0
axi_vhdl_wrapper_ip.write(CONTROL_REGISTER, 0x81) # 0x81 will set bit 0

#Data transfer to DMAs
def dma_transfer(input_buffer, output_buffer):
    dma0_send.transfer(input_buffer)
    dma0_recv.transfer(output_buffer)
    dma0_send.wait()
    dma0_recv.wait()
        
#Do the process to filter the image
def process_data(in_data):
    
    #axi_vhdl_wrapper_ip.write(0x10, Y_scale)
    
    data_size = len(in_data)
    
    out_data = [0 for y in range(data_size)]
    
    input_buffer = allocate(shape=(data_size,), dtype=int)
    output_buffer = allocate(shape=(data_size,), dtype=int)

    for i in range (data_size):
        input_buffer[i] = in_data[i]
            
    dma_transfer(input_buffer, output_buffer)
    #dma_transfer(input_buffer, output_buffer)
    
    for i in range (data_size):
        out_data[i] = output_buffer[i] 
            
    return out_data


#Compute the duration between "start" and "end" to get the function execution time
import time
def get_duration(start, end):
    delta_t = round((end - start) * 1000, 1)
    print('Duration : {0} ms'.format(delta_t))
    return delta_t



%%time
for i in range(59):
  for j in range(5):
    coeff = process_data(zscores[i, j, :])
    #coeffs = pywt.wavedec(zscores[i, j, :], wavelet)  # Calcul de la DWT avec pywt
    cD_Energy[i,j] = np.mean([np.sum(np.square(coeffs[5])),np.sum(np.square(coeffs[4])),
                         np.sum(np.square(coeffs[3])),np.sum(np.square(coeffs[2])),
                         np.sum(np.square(coeffs[1]))])
    cA_Energy[i,j] = np.sum(np.square(coeffs[0]))
    D_Entropy[i,j] = np.mean([np.sum(np.square(coeffs[5]) * np.log(np.square(coeffs[5]))),
                         np.sum(np.square(coeffs[4]) * np.log(np.square(coeffs[4]))),
                         np.sum(np.square(coeffs[3]) * np.log(np.square(coeffs[3]))),
                         np.sum(np.square(coeffs[2]) * np.log(np.square(coeffs[2]))),
                         np.sum(np.square(coeffs[1]) * np.log(np.square(coeffs[1])))])
    A_Entropy[i,j] = np.sum(np.square(coeffs[0]) * np.log(np.square(coeffs[0])))
    D_mean[i,j] = np.mean([np.mean(coeffs[5]),np.mean(coeffs[4]),np.mean(coeffs[3]),np.mean(coeffs[2]),np.mean(coeffs[1])])
    A_mean[i,j] = np.mean(coeffs[0])
    D_std[i,j] = np.mean([np.std(coeffs[5]),np.std(coeffs[4]),np.std(coeffs[3]),np.std(coeffs[2]),np.std(coeffs[1])])
    A_std[i,j] = np.std(coeffs[0])

In [None]:
df = pd.DataFrame(cD_Energy)
df1 = pd.DataFrame(cA_Energy)
df2 = pd.DataFrame(D_Entropy)
df3 = pd.DataFrame(A_Entropy)
df4 = pd.DataFrame(D_mean)
df5 = pd.DataFrame(A_mean)
df6 = pd.DataFrame(D_std)
df7 = pd.DataFrame(A_std)

In [None]:
ddd = pd.concat([df,df1,df2,df3,df4,df5,df6,df7], axis = 1)

In [None]:
ddd