In [4]:
import mne 
import numpy as np
from scipy.signal import detrend
#from mne.preprocessing import find_bad_channels_lof
from mne.preprocessing import ICA

mne.viz.set_browser_backend('qt')

flip0 = mne.io.read_raw('aaaaafmq_s002_t001_raw.fif',   preload=False, verbose=None)
flip0.plot(block=True)
print (flip0.ch_names)
print (flip0.load_bad_channels())
#bads, _ = find_bad_channels_lof(flip0, return_scores=True)
#print("Bad channels detected by Maxwell filter:", bads)
#ica = ICA(n_components=20, random_state=97, max_iter=800)
#ica.fit(flip0)

# Обнаружение артефактов
#ica.detect_artifacts(raw)
#bads = ica.exclude
#print("Bad channels detected by ICA:", bads)
#print (flip0.ch_names)

data = flip0.get_data()
means = np.mean(data, axis=1)
stds = np.std(data, axis=1)

# Определение порогов для выявления аномальных каналов
mean_threshold = np.mean(means) + 3 * np.std(means)
std_threshold = np.mean(stds) + 3 * np.std(stds)

# Выявление аномальных каналов
bad_channels_mean = np.where(np.abs(means) > mean_threshold)[0]
bad_channels_std = np.where(stds > std_threshold)[0]

bad_channels = set(bad_channels_mean).union(bad_channels_std)
bad_channel_names = [flip0.ch_names[idx] for idx in bad_channels]
print("Bad channels based on mean and std:", bad_channel_names)

#flip1 = mne.io.read_raw(r'..\GPT_Dataset\shapka\aaaaaaac_s005_t000_raw.fif',   preload=False, verbose=None)
#flip1.plot(block=True)

Opening raw data file aaaaafmq_s002_t001_raw.fif...
Isotrak not found
    Range : 0 ... 136999 =      0.000 ...   547.996 secs
Ready.
Channels marked as bad:
none
['C3', 'C4', 'CZ', 'F3', 'F4', 'F7', 'F8', 'FP1', 'FP2', 'FZ', 'O1', 'O2', 'P3', 'P4', 'PZ', 'T3', 'T4', 'T5', 'T6']
No channels updated. Bads are: []
None
Bad channels based on mean and std: ['C3', 'C4', 'CZ', 'F3', 'F4', 'F7', 'F8', 'FP2', 'FZ', 'O1', 'O2', 'P3', 'P4', 'PZ', 'T3', 'T4', 'T5', 'T6']


In [2]:
import glob
import logging
import os
#from joblib import Parallel, delayed
from pathlib import Path
import mne
import numpy as np
from tqdm import tqdm
import pandas as pd

REQUIRED_CHANNELS = (
    'C3', 'C4', 'CZ', 'F3', 'F4', 'F7', 'F8', 'FP1', 'FP2', 'FZ', 'O1', 'O2', 'P3', 'P4', 'PZ', 'T3', 'T4', 'T5', 'T6'
)
def channels_available(raw, req_channels):
    """
    Selects only datasets which channels match one of the allowed sequences provided in ch_options

    Parameters
    ----------
    raw: mne.io.Raw
        An instance of Raw corresponding to a single EEG file
    req_channels: list
        A list of required channels for a file
    """
    # these are the channels of the recoding
    setb = set(raw.ch_names)
    # these are the channels we are looking for
    seta = set(req_channels)
    # if recording contains all channels we are looking for, include it
    if seta.issubset(setb):
        return True
    else:
        return False

def preprocess_single_raw(file_path, raw, verbose,
                          resampling_parameters, lowpass, logger):
    if raw is None:
        return None

    raw = raw.pick(picks=list(REQUIRED_CHANNELS), verbose=verbose)
    if resampling_parameters['sfreq'] != raw.info['sfreq']:
        if logger is not None:
            logger.info(
                f'Resampling {file_path} to {resampling_parameters["sfreq"]} Hz as desired '
                f'{resampling_parameters["sfreq"]} Hz != {raw.info["sfreq"]} Hz in a file'
            )
        raw = raw.resample(verbose=verbose, **resampling_parameters)

    if lowpass is not None:
        raw = raw.filter( **lowpass, verbose=verbose)

    #data = raw.get_data(picks=REQUIRED_CHANNELS, units='uV', verbose=verbose)
    #data = data.astype(np.float32)

    return raw

        
def tuh_channels_available(raw, ch_mapping):
    ref = 'ar' if raw.ch_names[0].endswith('-REF') else 'le'
    return channels_available(raw=raw, req_channels=list(ch_mapping[ref].keys()))
    
def rename_tuh_channels(raw, ch_mapping):
    reference = raw.ch_names[0].split('-')[-1].lower()
    assert reference in ['le', 'ref'], 'unexpected referencing'
    reference = 'le' if reference == 'le' else 'ar'
    raw.rename_channels(ch_mapping[reference])
    
def load_raw(edf_path, verbose):
    channel_mappings = {
        'ar': {
            'EEG C3-REF': 'C3', 'EEG C4-REF': 'C4', 'EEG CZ-REF': 'CZ',
            'EEG F3-REF': 'F3', 'EEG F4-REF': 'F4', 'EEG F7-REF': 'F7', 'EEG F8-REF': 'F8', 'EEG FP1-REF': 'FP1',
            'EEG FP2-REF': 'FP2', 'EEG FZ-REF': 'FZ', 'EEG O1-REF': 'O1', 'EEG O2-REF': 'O2', 'EEG P3-REF': 'P3',
            'EEG P4-REF': 'P4', 'EEG PZ-REF': 'PZ', 'EEG T3-REF': 'T3', 'EEG T4-REF': 'T4', 'EEG T5-REF': 'T5',
            'EEG T6-REF': 'T6'
        },
        'le': {
            'EEG C3-LE': 'C3', 'EEG C4-LE': 'C4', 'EEG CZ-LE': 'CZ',
            'EEG F3-LE': 'F3', 'EEG F4-LE': 'F4', 'EEG F7-LE': 'F7', 'EEG F8-LE': 'F8', 'EEG FP1-LE': 'FP1',
            'EEG FP2-LE': 'FP2', 'EEG FZ-LE': 'FZ', 'EEG O1-LE': 'O1', 'EEG O2-LE': 'O2', 'EEG P3-LE': 'P3',
            'EEG P4-LE': 'P4', 'EEG PZ-LE': 'PZ', 'EEG T3-LE': 'T3', 'EEG T4-LE': 'T4', 'EEG T5-LE': 'T5',
            'EEG T6-LE': 'T6'
        }
    }

    raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=verbose)

    if tuh_channels_available(raw=raw, ch_mapping=channel_mappings):
        rename_tuh_channels(raw=raw, ch_mapping=channel_mappings)
        return raw
    else:
        return None

def get_numpy_tuh_path(edf_path, np_format):
    #edf_path = os.path.normpath(edf_path)
    path_parts = edf_path.split(os.sep)
    path_parts[-1] = path_parts[-1][:-len('.edf')] +'_raw' + np_format
    np_path = os.path.join(*path_parts)
    return np_path

def preprocess_single_tuh_file(edf_path, verbose=False, resampling_parameters=None, lowpass=None, logger=None, np_format='.fif'):
    try:
        raw = load_raw(edf_path=edf_path, verbose=verbose)
        data = preprocess_single_raw(
            file_path=edf_path, raw=raw, verbose=verbose, resampling_parameters=resampling_parameters,
            lowpass=lowpass, logger=logger
        )
        if data is not None:
            home_path = Path(get_numpy_tuh_path(edf_path=edf_path, np_format=np_format))
            home_path.parent.mkdir(parents=True, exist_ok=True)
            if np_format == '.fif':
                data.save(home_path, overwrite=True)
            else:
                raise NotImplementedError("Only '.fif' format is supported.")
            return 0
        else:
            logging.info(f'{edf_path} data is None. A file did not fit provided requirements (i.e. too short, '
                         f'wrong channels), skipped')
            return None
    except Exception:
        if logger:
            logger.exception(f'{edf_path} failed to process')
        else:
            logging.exception(f'{edf_path} failed to process')
        return None
    
def preprocess_files(df_path, verbose, np_format, 
                           resampling_parameters, lowpass, n_jobs, logger):
    df = pd.read_csv(df_path)
    file_paths = df['file_path'].tolist()
    if n_jobs == 1:
        results = [preprocess_single_tuh_file(
            edf_path=edf_path, verbose=verbose,
            resampling_parameters=resampling_parameters, lowpass=lowpass,
            np_format = np_format
        )
            for edf_path in tqdm(file_paths, total=len(file_paths),
                                 desc=f'Preprocessing EEG TUH Dataset (n_jobs={n_jobs})')]
    #else:
    #    results = Parallel(n_jobs)(delayed(
     #       preprocess_single_tuh_file)(
     #        edf_path=edf_path, verbose=verbose,
     #       resampling_parameters=resampling_parameters, lowpass=lowpass,
      #      saving_format = saving_format
      #  ) 
       #     for edf_path in tqdm(file_paths, total=len(file_paths),
        #                       desc=f'Preprocessing EEG TUH Dataset (n_jobs={n_jobs})'))
    return results

if __name__ == '__main__':
    logging.basicConfig(filename='eeg_tuh_preprocessing_logs.log',
                        filemode='a',
                        format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
                        datefmt='%H:%M:%S',
                        level=logging.INFO)
    logger = logging.getLogger(__name__)
    logger.info('Start preprocessing the EEG TUH Dataset')

    preprocess_files(
        #ds_root= '..',
        df_path = 'mozgi.csv',
        verbose='WARNING', np_format='.fif',
        resampling_parameters={
            'sfreq': 250,
        },
        
        lowpass = {
            'l_freq': None,
            'h_freq': 80,
            'method': 'iir',
            'iir_params': {
                'ftype': 'butter',
                'order': 3 
            }
        },
        n_jobs= 1,

        logger=logger
    )

    logger.info('Preprocessing done!')

Preprocessing EEG TUH Dataset (n_jobs=1): 100%|██████████████████████████████████████████| 6/6 [00:02<00:00,  2.89it/s]


In [None]:
ds_root= '..'
file_paths = list(glob.glob(os.path.join(ds_root, '**/*.edf'), recursive=True))
a= create_data_frame (file_paths)
six = preprocess_files (a)