In [104]:
import mne
import numpy as np
import pandas as pd
from mne.preprocessing.nirs import beer_lambert_law
from mne.preprocessing.nirs import optical_density
from sklearn.preprocessing import MinMaxScaler
import sched, time, socket, struct

In [97]:
snirf_path = './task/2023-06-08_004.snirf'
sampling_freq = 0.5
raw_intensity = mne.io.read_raw_snirf(snirf_path, verbose=True, preload=False)

# resample to one Hz
raw_intensity.load_data().resample(sampling_freq, npad="auto")

Loading /Users/nolanbrady/Desktop/Tactile_Brain/replay/task/2023-06-08_004.snirf
Reading 0 ... 4271  =      0.000 ...   419.839 secs...


0,1
Measurement date,"June 08, 2023 14:18:38 GMT"
Experimenter,Unknown
Digitized points,300 points
Good channels,28 fNIRS (CW amplitude)
Bad channels,
EOG channels,Not available
ECG channels,Not available
Sampling frequency,1.00 Hz
Highpass,0.00 Hz
Lowpass,0.50 Hz


In [98]:
# Define the motor unit to channel mapping
mu_0 = ['S5_D3 hbo', 'S5_D6 hbo']
mu_1 = ['S8_D6 hbo']
mu_2 = ['S8_D6 hbo']
mu_3 = ['S6_D5 hbo', 'S7_D5 hbo', 'S6_D7 hbo', 'S7_D4 hbo']
mu_4 = ['S6_D5 hbo', 'S7_D5 hbo', 'S6_D7 hbo', 'S7_D4 hbo']
mu_12 = ['S3_D1 hbo', 'S3_D2 hbo', 'S4_D2 hbo', 'S1_D1 hbo', 'S1_D2 hbo']
mu_13 = ['S3_D1 hbo', 'S3_D2 hbo', 'S4_D2 hbo', 'S1_D1 hbo', 'S1_D2 hbo']
mu_14 = ['S2_D3 hbo']
mu_15 = ['S5_D3 hbo', 'S2_D3 hbo']


In [99]:
raw_od = optical_density(raw_intensity)
raw_haemo = beer_lambert_law(raw_od, ppf=0.1)
ch_names = raw_haemo.info['ch_names']
# ch_names = ['0'] + ch_names

# Converts the snirf data into a CSV
data = pd.DataFrame(raw_haemo.get_data())

#Normalize the data
np_data = data.to_numpy()
scaler = MinMaxScaler(feature_range=(8, 40))
norm_data = scaler.fit_transform(np_data)
df = pd.DataFrame(norm_data)

# Add the channel names
df.insert(0, 'ch_names', ch_names)
df.to_csv('hemo_data.csv')


# Map channels to motor units

In [100]:

# Motor unit 0
mu_0_mask = df['ch_names'].isin(mu_0)
mu_0_df = df[mu_0_mask]

# Motor unit 1
mu_1_mask = df['ch_names'].isin(mu_1)
mu_1_df = df[mu_1_mask]

# Motor Unit 2
mu_2_mask = df['ch_names'].isin(mu_2)
mu_2_df = df[mu_2_mask]

# Motor Unit 3
mu_3_mask = df['ch_names'].isin(mu_3)
mu_3_df = df[mu_3_mask]

# Motor Unit 4
mu_4_mask = df['ch_names'].isin(mu_4)
mu_4_df = df[mu_4_mask]

# Motor Unit 12
mu_12_mask = df['ch_names'].isin(mu_12)
mu_12_df = df[mu_12_mask]

# Motor Unit 13
mu_13_mask = df['ch_names'].isin(mu_13)
mu_13_df = df[mu_13_mask]

# Motor Unit 14
mu_14_mask = df['ch_names'].isin(mu_14)
mu_14_df = df[mu_14_mask]

# Motor Unit 15
mu_15_mask = df['ch_names'].isin(mu_15)
mu_15_df = df[mu_15_mask]

print(mu_14_df)

    ch_names          0          1          2          3          4  \
2  S2_D3 hbo  27.622909  18.961538  11.129621  14.829219  16.858263   

           5         6          7          8  ...        410        411  \
2  15.155058  15.88577  14.925876  14.954636  ...  35.749459  36.351192   

         412        413        414        415        416        417  \
2  35.976347  25.123485  28.585297  36.731698  35.153398  34.317768   

         418        419  
2  31.985443  31.676067  

[1 rows x 421 columns]


In [106]:
UDP_IP = "192.168.4.1"
UDP_PORT = 80


# Run through the dataframe and ouput to motor units
for id, column in enumerate(df):
    print('tick')
    mu_0_mean = mu_0_df[id].mean()
    mu_1_mean = mu_1_df[id].mean()
    mu_2_mean = mu_2_df[id].mean()
    mu_3_mean = mu_3_df[id].mean()
    mu_4_mean = mu_4_df[id].mean()
    mu_12_mean = mu_12_df[id].mean()
    mu_13_mean = mu_13_df[id].mean()
    mu_14_mean = mu_14_df[id].mean()
    mu_15_mean = mu_15_df[id].mean()

    packet = np.array([mu_0_mean, mu_1_mean, mu_2_mean, mu_3_mean, mu_4_mean, 8, 8, 8, 8, 8, 8, 8, mu_12_mean, mu_13_mean, mu_14_mean, mu_15_mean])
    # MESSAGE = packet.tobytes()
    MESSAGE = packet
    for intensity in packet:

        # Convert the float64 value to bytes
        MESSAGE = struct.pack('d', intensity)

        # Send the packet to the Arduino
        # NOTE: Make sure you are connected to the "DrivingVibe-Board2 Wifi
        # Wifi password: 55555555
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.sendto(MESSAGE, (UDP_IP, UDP_PORT))

    time.sleep(sampling_freq)

tick
tick
tick
tick
tick
tick
tick
tick
tick
tick
tick
tick


KeyboardInterrupt: 