In [1]:
from gzip import open as gzipOpen
import os
import matplotlib.pylab as plt
import numpy as np
import pandas as pd
import matplotlib
from tarfile import open as tarOpen

In [2]:
# Specify the path to your data repository
path_root = "../Data/"
print(os.listdir(path_root))

['.DS_Store', 'smni_eeg_data', 'smni_eeg_data.tar']


In [12]:
# Select the dataset which you want to extract from the previous list
name_dataTar = os.listdir(path_root)[1]

In [13]:
def extTargzFiles(name_dataTar, path_root = "../Data"):
    
    # Conditions on name_dataTar in case a mistake mas made
    if name_dataTar not in os.listdir(path_root):
        print("ERROR: Please choose a comprassed file located in your data folder")
        return
    if not name_dataTar.endswith(".tar"):
        if (name_dataTar + ".tar") in os.listdir(path_root):
            name_dataTar += ".tar"
        else:
            print("ERROR: The specified folder does not correspond to any compressed folder in the repo.\nPlease select another one.")
            return
    
    # Define path to tar.gz folder
    path_folderTar = path_root + "/" + name_dataTar
    path_folder    = path_folderTar[:-4]
    
    # Check if folder has not already been extracted
    # If so, move to next layer
    if name_dataTar[:-4] not in os.listdir(path_root):
        tar = tarOpen(path_folderTar, "r:")
        tar.extractall(path_root)
        tar.close()
        
    # Next layer: do the same check as above
    name_folders = os.listdir(path_folder)

    for folder in name_folders:
        if folder.endswith(".tar.gz") and (folder[:-4] not in name_folders):
            tar = tarOpen(path_folder + "/" + folder, "r:gz")
            tar.extractall(path_folder)
            tar.close()
            
    print("All remaining compressed folders have been uncompressed!")
        
    return

In [14]:
extTargzFiles(name_dataTar)

All remaining compressed folders have been uncompressed!


In [16]:
# Save all .gz files in a list
list_gzFiles = []

for root, _, files in os.walk(path_root, topdown=False):
    for name in files:
        if name.endswith(".gz") and not name.endswith("tar.gz"):
            list_gzFiles.append(os.path.join(root, name))
            
print(list_gzFiles[:10])

['../Data/smni_eeg_data/a_1_co2a0000364/co2a0000364.rd.028.gz', '../Data/smni_eeg_data/a_1_co2a0000364/co2a0000364.rd.018.gz', '../Data/smni_eeg_data/a_1_co2a0000364/co2a0000364.rd.022.gz', '../Data/smni_eeg_data/a_1_co2a0000364/co2a0000364.rd.012.gz', '../Data/smni_eeg_data/a_1_co2a0000364/co2a0000364.rd.002.gz', '../Data/smni_eeg_data/a_1_co2a0000364/co2a0000364.rd.010.gz', '../Data/smni_eeg_data/a_1_co2a0000364/co2a0000364.rd.024.gz', '../Data/smni_eeg_data/a_1_co2a0000364/co2a0000364.rd.000.gz', '../Data/smni_eeg_data/a_1_co2a0000364/co2a0000364.rd.020.gz', '../Data/smni_eeg_data/a_1_co2a0000364/co2a0000364.rd.014.gz']


In [17]:
def import_eeg_file(file_obj, df_type='long', optimize=True):

    def parse_subject(line):
       return line[2:-4]

    def parse_alcoholic(line):
       char = line.strip('# ')[3]
       return True if char == 'a' else False

    def parse_obj(line):
       char = line.strip('# ')[1]
       return True if char == '1' else False

    def parse_match(line):
       string = line.strip('# ').split(',')[0].split(' ')[1]
       if string == 'nomatch':
           return 'nomatch'
       elif string == 'obj':
           return 'obj'
       elif string == 'match':
           return 'match'

    def parse_err(line):
       strings = line.strip('# ').split(',')[0].split(' ')
       if len(strings) == 3 and strings[2] == 'err':
           return True
       else:
           return False

    from io import TextIOWrapper
    if isinstance(file_obj, TextIOWrapper):
       text_obj = file_obj
    else:
       text_obj = TextIOWrapper(file_obj)

    header = []
    loc = None
    while True:
       loc = text_obj.tell()
       newline = text_obj.readline()
       if newline[0] == "#":
           header += [newline]
       else:
           text_obj.seek(loc)
           break

    subject = parse_subject(header[0])
    alcoholic = parse_alcoholic(header[0])
    obj = parse_obj(header[3])
    match = parse_match(header[3])
    err = parse_err(header[3])

    df = pd.read_csv(text_obj, sep=' ', header=None, names=['trial','sensor','sample','value'], comment='#')
    df['alcoholic'] = alcoholic
    df['object'] = obj
    df['match'] = match
    df['err'] = err
    df['subject'] = subject

    df = df[['subject','trial','alcoholic','match','err','sensor','sample','value']]

    if optimize:
       df[['trial','sample']] = df[['trial','sample']].apply(pd.to_numeric, downcast='unsigned')
       df['value'] = df['value'].astype(np.float32)
       df['sensor'] = pd.Categorical(df['sensor'])
       df['match'] = pd.Categorical(df['match'])
       df['subject'] = pd.Categorical(df['subject'])

    if df_type == 'wide':
       df = df.pivot_table(values='value', index='sample', columns=['subject','trial','alcoholic','match','err','sensor'])

    if df_type == 'long':
       df = df.set_index(['subject','trial','alcoholic','match','err','sample'])

    return df

In [18]:
# Open all .gz files
import gzip

# Import all EEGs into data frames and save them in a list
list_dfEEGs = []
for path_EEG in list_gzFiles:
    file_EEG = gzipOpen(path_EEG, 'rb')
    list_dfEEGs.append(import_eeg_file(file_EEG, df_type='long', optimize=True))

In [19]:
list_dfEEGs[1].head(10)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,sensor,value
subject,trial,alcoholic,match,err,sample,Unnamed: 6_level_1,Unnamed: 7_level_1
co2a0000364,18,True,obj,False,0,FP1,11.434
co2a0000364,18,True,obj,False,1,FP1,9.481
co2a0000364,18,True,obj,False,2,FP1,6.551
co2a0000364,18,True,obj,False,3,FP1,3.133
co2a0000364,18,True,obj,False,4,FP1,1.18
co2a0000364,18,True,obj,False,5,FP1,1.18
co2a0000364,18,True,obj,False,6,FP1,1.668
co2a0000364,18,True,obj,False,7,FP1,2.157
co2a0000364,18,True,obj,False,8,FP1,2.645
co2a0000364,18,True,obj,False,9,FP1,4.11
