In [3]:
from obci_readmanager.signal_processing.read_manager import ReadManager
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy import signal as ss

In [4]:
def download_signal(bin_file_path=None, xml_file_path=None, tag_file_path=None, csv_file_path=None):
    if bin_file_path is None:
        raise ValueError("'bin_file_path' is required.")

    if xml_file_path is None:
        raise ValueError("'xml_file_path' is required.")

    eeg = dict()

    mgr = ReadManager(xml_file_path, bin_file_path, tag_file_path if tag_file_path else None)

    # Retrieve the sampling frequency
    eeg['sampling'] = float(mgr.get_param("sampling_frequency"))

    # Retrieve the names of the channels
    eeg['channels_names'] = mgr.get_param("channels_names")

    # Retrieve EEG data and process it
    eeg['data'] = mgr.get_samples()
    eeg['data'] = eeg['data'] * 0.0715

    # Retrieve tags if the tag file is provided
    if tag_file_path:
        eeg['tags'] = mgr.get_tags()

    # Handle CSV file if provided
    if csv_file_path:
        try:
            eeg['data_csv'] = pd.read_csv(csv_file_path)
        except FileNotFoundError:
            print(f"Warning: CSV file not found at {csv_file_path}.")
            eeg['data_csv'] = None
    else:
        eeg['data_csv'] = None

    return eeg

In [5]:
def filter_signal(syg, sampling, lowpass_params=None, highpass_params=None, notch_params=None, passband_params=None, useMuPassBand = False):
    # Default filter parameters
    default_lowpass = {'N': 2, 'Wn': 45, 'btype': 'lowpass', 'ftype': 'butter'}
    default_highpass = {'N': 2, 'Wn': 3, 'btype': 'highpass', 'ftype': 'butter'}
    default_notch = {'w0': 50, 'Q': 30}
    default_Mu_passband = {'N': 4, 'Wn': (8, 12), 'btype': 'bandpass', 'ftype': 'butter'}


    # Merge user-provided parameters with defaults
    lowpass_params = {**default_lowpass, **(lowpass_params or {})}
    highpass_params = {**default_highpass, **(highpass_params or {})}
    notch_params = {**default_notch, **(notch_params or {})}
    passband_params = {**default_Mu_passband, **(passband_params or {})}

    # Design lowpass filter
    sos_low = ss.iirfilter(
        N=lowpass_params['N'],
        Wn=lowpass_params['Wn'],
        btype=lowpass_params['btype'],
        ftype=lowpass_params['ftype'],
        fs=sampling,
        output='sos'
    )

    # Design highpass filter
    sos_high = ss.iirfilter(
        N=highpass_params['N'],
        Wn=highpass_params['Wn'],
        btype=highpass_params['btype'],
        ftype=highpass_params['ftype'],
        fs=sampling,
        output='sos'
    )

    # Design notch filter
    b_notch, a_notch = ss.iirnotch(
        w0=notch_params['w0'],
        Q=notch_params['Q'],
        fs=sampling
    )

    # Apply the filters sequentially
    syg = ss.sosfiltfilt(sos_low, syg, axis=-1)
    syg = ss.sosfiltfilt(sos_high, syg, axis=-1)
    syg = ss.filtfilt(b_notch, a_notch, syg, axis=-1)


    if useMuPassBand:
        # Design stopband filter (alpha band 8-12 Hz)
        sos_passband = ss.iirfilter(
        N=passband_params['N'],
        Wn=passband_params['Wn'],
        btype=passband_params['btype'],
        ftype=passband_params['ftype'],
        fs=sampling,
        output='sos'
        )
        # Apply the stopband filter
        syg = ss.sosfiltfilt(sos_passband, syg, axis=-1)   # Alpha stopband filter
  
    return syg

In [39]:
def apply_montage(syg, channels_names, montage_type='average', reference_channel=None):

    # Identify 'M1' and 'M2'
    unwanted_labels = {'M1', 'M2'}
    m1_m2_indices = [i for i, label in enumerate(channels_names) if label in unwanted_labels]

    
    if montage_type == 'common_average':
        # Remove 'M1' and 'M2' from channels
        indices_to_keep = [i for i, label in enumerate(channels_names) if label not in unwanted_labels]
        channels_names = [channels_names[i] for i in indices_to_keep]
        syg = syg[indices_to_keep]
    
        if len(m1_m2_indices) != 2:
            raise ValueError("Both 'M1' and 'M2' must be present in channels_names for average montage.")
            
            # Compute the mean
            average_reference = syg.mean(axis=0)
    
            # Apply montage
            syg = syg - average_reference
        
    elif montage_type == 'linked_ears':
        # Extract signals for 'M1' and 'M2'
        m1_signal = syg[m1_m2_indices[0], :]
        m2_signal = syg[m1_m2_indices[1], :]

        # Compute the mean of M1 and M2
        average_reference = (m1_signal + m2_signal) / 2

        # Apply montage
        syg = syg - average_reference
        
        # Remove 'M1' and 'M2' from channels
        indices_to_keep = [i for i, label in enumerate(channels_names) if label not in unwanted_labels]
        channels_names = [channels_names[i] for i in indices_to_keep]
        syg = syg[indices_to_keep]

    elif montage_type == 'channel':
        if reference_channel is None:
            raise ValueError("`reference_channel` must be specified for montage_type='channel'.")

        # Find the reference channel index
        if reference_channel not in channels_names:
            raise ValueError(f"Reference channel '{reference_channel}' not found in channels_names.")

        ref_index = channels_names.index(reference_channel)
        ref_signal = syg[ref_index, :]

        # Apply montage
        syg = syg - ref_signal

        # Identify 'M1' and 'M2'
        unwanted_labels = {'M1', 'M2'}
        m1_m2_indices = [i for i, label in enumerate(channels_names) if label in unwanted_labels]
        
        # Remove 'M1' and 'M2' from channels
        indices_to_keep = [i for i, label in enumerate(channels_names) if label not in unwanted_labels]
        channels_names = [channels_names[i] for i in indices_to_keep]
        syg = syg[indices_to_keep]

    else:
        raise ValueError("Invalid montage_type. Choose 'average' or 'channel'.")    

    return syg, channels_names

In [41]:
def cut_signal(syg, tags, sampling, amplitude_limit=500):
  # Initialize parameters
  dlugosc = int((2 + 6) * sampling)
  lewa_list = []
  prawa_list = []
  lewa, prawa = ('lewa', 'prawa')

  i = 0
  for tag in tags:
      # Determine if the tag is frequent or rare based on the timestamp
      t0 = int(sampling * (tag['start_timestamp'] - 2))

      # Slice the signal segment
      segment = syg[:, t0:t0 + dlugosc]
      if np.max(segment) > 100:
          print(f"ajj {i}")
          i += 1
 
      if tag['desc']['strona'] == lewa:
          lewa_list.append(segment)
      elif tag['desc']['strona'] == prawa:
          prawa_list.append(segment)

  lewa_array = np.array(lewa_list)

  prawa_array = np.array(prawa_list)

  return lewa_array, prawa_array

In [43]:
name = "mati"
signal_type = "ruch"
xml_file = f"{name}/{name}_{signal_type}.obci.xml"
bin_file = f"{name}/{name}_{signal_type}.obci.raw"
csv_file_path = f"{name}/{name}_{signal_type}.csv"
tag_file_path = f"{name}/{name}_{signal_type}.obci.tag"

EEG_mati = download_signal(bin_file, xml_file, tag_file_path, csv_file_path)
EEG_mati['data'], EEG_mati['channels_names'] = apply_montage(EEG_mati['data'], EEG_mati['channels_names'], 'linked_ears')
EEG_mati['data'] = filter_signal(EEG_mati['data'], EEG_mati['sampling'])

2025-04-09 11:06:14,311 - data_source - INFO - All data set requested for the first time. Start reading all data from the file...


In [45]:
#EEG_mati

In [47]:
EEG_mati['lewa'], EEG_mati['prawa'] = cut_signal(EEG_mati['data'], EEG_mati['tags'], EEG_mati['sampling'])

ajj 0


In [49]:
#EEG_mati['lewa']

In [51]:
EEG_mati['prawa'].shape

(30, 19, 2048)

In [53]:
merged = np.stack((EEG_mati['lewa'], EEG_mati['prawa']), axis=0)

In [55]:
np.save(f"{name}_{signal_type}_dane.npy", merged)

In [57]:
merged_loaded = np.load(f"{name}_{signal_type}_dane.npy")

In [59]:
merged_loaded.shape

(2, 30, 19, 2048)

Zapis danych prszefiltrowanych dodatkowo passBandem na Mu

In [62]:
name = "mati"
signal_type = "wyobrazenie"
xml_file = f"{name}/{name}_{signal_type}.obci.xml"
bin_file = f"{name}/{name}_{signal_type}.obci.raw"
csv_file_path = f"{name}/{name}_{signal_type}.csv"
tag_file_path = f"{name}/{name}_{signal_type}.obci.tag"

EEG_mati = download_signal(bin_file, xml_file, tag_file_path, csv_file_path)
EEG_mati['data'], EEG_mati['channels_names'] = apply_montage(EEG_mati['data'], EEG_mati['channels_names'], 'linked_ears')
EEG_mati['data'] = filter_signal(EEG_mati['data'], EEG_mati['sampling'], useMuPassBand = True)

2025-04-09 11:06:39,959 - data_source - INFO - All data set requested for the first time. Start reading all data from the file...


In [63]:
EEG_mati['lewa'], EEG_mati['prawa'] = cut_signal(EEG_mati['data'], EEG_mati['tags'], EEG_mati['sampling'])

In [64]:
EEG_mati['prawa'].shape

(30, 19, 2048)

In [65]:
merged = np.stack((EEG_mati['lewa'], EEG_mati['prawa']), axis=0)

In [70]:
np.save(f"{name}_{signal_type}_dane_passbandMU.npy", merged)