## Extracting eye-tracking data outputs

In [1]:
# Import modules

import os
import pandas as pd
from pathlib import Path
import numpy as np
import glob
import shutil
#import mne

In [2]:
# Function to format ID
def format_id(id_str):
    """Zero-pad the first field of an ``<number>_<suffix>`` ID to four characters.

    Only the first two underscore-separated fields are used; anything after a
    second underscore is dropped.

    :param id_str: ID string such as ``'83_M1'``.
    :return: The ID with its first field left-padded with zeros to width 4,
        e.g. ``'0083_M1'``. Fields that are already 4+ characters long are
        left unchanged.
    :raises IndexError: If ``id_str`` contains no underscore.
    """
    fields = id_str.split('_')
    number, suffix = fields[0], fields[1]
    return '_'.join((number.zfill(4), suffix))

# Function to fix Experiment Builder IDs so that they match the ET IDs
def eb_id_transform(file):
    """Convert an Experiment Builder participant name to the eye-tracking ID format.

    The name is upper-cased, every ``'Q'`` is removed, an underscore is inserted
    in front of the first letter when the name contains no underscore, and the
    result is passed through ``format_id`` (which zero-pads the numeric part to
    four characters).

    :param file: Participant name as found in the Experiment Builder results.
    :return: The ID returned by ``format_id``.
    """
    file = file.upper().replace('Q', "")

    if "_" not in file:
        # Index of the first letter. When the name holds no letter at all
        # (digits only, or empty) fall back to the end of the string, so the
        # underscore is appended instead of being inserted before the last
        # character (or failing on an unbound loop index for an empty name).
        split_at = next(
            (idx for idx, char in enumerate(file) if char.isalpha()),
            len(file),
        )
        file = file[:split_at] + '_' + file[split_at:]

    # Pad the numeric part with leading zeros to make it 4 digits
    return format_id(file)

# Function to check if file name contains task information
def has_task_info(file_name, task_info):
    """Report whether ``task_info`` occurs in ``file_name``.

    :param file_name: Name of the file to inspect.
    :param task_info: Task label to look for.
    :return: Result of the ``in`` membership test; for two strings this is a
        plain substring check.
    """
    is_present = task_info in file_name
    return is_present

## Function to record the participants of a task folder and flag those without EEG data
def process_participants(task_folder, eeg_q1k_subjects_df, missing_eeg, et_subjects, transformed_et):
    """
    Record the participants found in a task folder and flag those without EEG data.

    Every entry of ``task_folder`` is appended to ``et_subjects`` and its
    transformed ID (see ``eb_id_transform``) to ``transformed_et``. Entries whose
    transformed ID does not occur in the ``et_ID`` column are appended to
    ``missing_eeg``. The three lists are modified in place.

    :param task_folder: Path to the current task folder.
    :param eeg_q1k_subjects_df: DataFrame with ``et_ID`` and ``q1k_ID`` columns.
    :param missing_eeg: List to store participants with missing EEG data.
    :param et_subjects: List to store participant names.
    :param transformed_et: List to store transformed participant IDs.
    :return: ``q1k_ID`` of the last entry that had a match, or ``None`` when the
        folder is empty or no entry matched.
    """
    # Start from None so an empty folder or a folder without any match returns
    # a value instead of failing on an unbound name.
    new_id = None
    for participant in os.listdir(task_folder):
        et_subjects.append(participant)
        transformed_id = eb_id_transform(participant)
        transformed_et.append(transformed_id)

        # One lookup answers both "is there a match?" and "what is the Q1K ID?"
        matches = eeg_q1k_subjects_df.loc[
            eeg_q1k_subjects_df['et_ID'] == transformed_id, 'q1k_ID'
        ].values
        if matches.size > 0:
            new_id = matches[0]
        else:
            missing_eeg.append(participant)
    return new_id

## Create reference file to map Q1K IDs to eye tracking data

In [3]:
# Data-collection sites whose folders are scanned
sites = ["HSJ", "MNI"]

# Parallel lists with one entry per EEG subject folder:
# full folder name, shortened ID, and family ID
eeg_q1k_subjects, truncated_eeg_q1k_subjects, family_id_subjects = [], [], []

In [4]:
# Create a list of all files in the EEG folders

for site in sites:
    for file in glob.glob(f"../../Sharing/CHUSJ-Q1K-PILOT/{site}/eeg/*"):
        # basename() follows the platform's path separator, unlike split('/')
        subject_id = os.path.basename(file)

        # Work out the shortened ID before touching any list, so the three
        # lists always stay the same length.
        if "1525" in subject_id:
            # e.g. 'Q1K_HSJ_1525-1006_P' -> '1006_P': keep what follows '1525'
            # and drop the separator character right after it
            truncated_id = subject_id.split('1525')[1][1:]
        elif 'Q1K_HSJ_100' in subject_id:
            # e.g. 'Q1K_HSJ_100152_P' -> '152_P'
            truncated_id = subject_id.split('Q1K_HSJ_100')[1]
        elif 'Q1K_MHC_200' in subject_id:
            # e.g. 'Q1K_MHC_20068_S1' -> '68_S1'
            truncated_id = subject_id.split('Q1K_MHC_200')[1]
        else:
            # Without this branch an unrecognised name would silently reuse
            # the truncated ID of the previous folder.
            print(f"Skipping unrecognised EEG folder name: {subject_id}")
            continue

        eeg_q1k_subjects.append(subject_id)
        truncated_eeg_q1k_subjects.append(truncated_id)
        # The family ID is the part of the shortened ID before the first '_'
        family_id_subjects.append(truncated_id.split('_')[0])


In [5]:
eeg_q1k_subjects


['Q1K_HSJ_100152_P',
 'Q1K_HSJ_10083_M1',
 'Q1K_HSJ_100123_F1',
 'Q1K_HSJ_100114_S1',
 'Q1K_HSJ_10064_S1',
 'Q1K_HSJ_1525-1006_P',
 'Q1K_HSJ_100134_F1',
 'Q1K_HSJ_100131_P',
 'Q1K_HSJ_100162_M1',
 'Q1K_HSJ_100100_P',
 'Q1K_HSJ_100105_M1',
 'Q1K_HSJ_100162_P',
 'Q1K_HSJ_100100_S1',
 'Q1K_HSJ_10086_S1',
 'Q1K_HSJ_1525-1026_P',
 'Q1K_HSJ_100111_M1',
 'Q1K_HSJ_100119_F1',
 'Q1K_HSJ_100111_P',
 'Q1K_HSJ_100159_M1',
 'Q1K_HSJ_100129_M1',
 'Q1K_HSJ_100114_S2',
 'Q1K_HSJ_100162_S1',
 'Q1K_HSJ_1525-1024_P',
 'Q1K_HSJ_1525-1024_M1',
 'Q1K_HSJ_10043_F1',
 'Q1K_HSJ_100114_M1',
 'Q1K_HSJ_10064_M1',
 'Q1K_HSJ_1525-1026_M1',
 'Q1K_HSJ_10064_P',
 'Q1K_HSJ_1525-1001_F1',
 'Q1K_HSJ_10086_M1',
 'Q1K_HSJ_100147_F2',
 'Q1K_HSJ_100123_P',
 'Q1K_HSJ_100108_F1',
 'Q1K_HSJ_100129_P',
 'Q1K_HSJ_10083_P',
 'Q1K_HSJ_100150_P',
 'Q1K_HSJ_100154_P',
 'Q1K_HSJ_1525_1037_F1',
 'Q1K_HSJ_100119_S1',
 'Q1K_HSJ_10086_F1',
 'Q1K_HSJ_1525-1001_M1',
 'Q1K_HSJ_100131_M1',
 'Q1K_HSJ_100100_F1',
 'Q1K_HSJ_100157_M1',
 'Q1K_HSJ

In [6]:
# Lookup table: full Q1K ID, shortened eye-tracking ID and family ID per EEG subject
eeg_q1k_subjects_df = pd.DataFrame({
    'q1k_ID': eeg_q1k_subjects,
    'et_ID': truncated_eeg_q1k_subjects,
    'family_ID': family_id_subjects,
})
# Zero-pad the numeric part of et_ID so that it is 4 digits long
eeg_q1k_subjects_df['et_ID'] = eeg_q1k_subjects_df['et_ID'].apply(format_id)

In [7]:
eeg_q1k_subjects_df

Unnamed: 0,q1k_ID,et_ID,family_ID
0,Q1K_HSJ_100152_P,0152_P,152
1,Q1K_HSJ_10083_M1,0083_M1,83
2,Q1K_HSJ_100123_F1,0123_F1,123
3,Q1K_HSJ_100114_S1,0114_S1,114
4,Q1K_HSJ_10064_S1,0064_S1,64
...,...,...,...
96,Q1K_MHC_20068_S1,0068_S1,68
97,Q1K_MHC_20042_P,0042_P,42
98,Q1K_MHC_200183_P,0183_P,183
99,Q1K_MHC_200186_M1,0186_M1,186


In [8]:
# Report how many distinct eye-tracking IDs the lookup table contains
n_unique_ids = len(eeg_q1k_subjects_df.et_ID.unique())
print("There are a total of", n_unique_ids, "unique participants")

There are a total of 100 unique participants


In [None]:
# Define root directory for all eye-tracking tasks
root_dir = '../../Sharing/CHUSJ-Q1K-PILOT/'

# Define the output directory for the processed data
output_dir = './ET_2024/BIDS_ET_2024_TEST'
os.makedirs(output_dir, exist_ok=True)

# Track participants with missing EEG data
missing_eeg = []
et_subjects = []
transformed_et = []
test_missing = []
# Create a list of tasks
tasks = ['GO', 'NSP', 'AS', 'PLR', 'VS', 'FSP', 'REST', 'SSEP', 'SSVEP', 'TMMN', 'TO']
versions = ["version_warnings_on", "version_warnings_off"]

# Map each eye-tracking ID to its Q1K ID once, instead of scanning the
# DataFrame for every participant. setdefault() keeps the first row when an
# et_ID occurs more than once.
et_to_q1k = {}
for et_id, q1k_id in zip(eeg_q1k_subjects_df['et_ID'], eeg_q1k_subjects_df['q1k_ID']):
    et_to_q1k.setdefault(et_id, q1k_id)


def _et_dirs_for_site(site):
    """Return the Experiment Builder directories to scan for ``site``.

    HSJ has one sub-directory per entry of ``versions``; MNI keeps its task
    folders directly under the Experiment Builder directory. Any other site
    yields no directories.
    """
    base_dir = os.path.join(root_dir, f"{site}/eye_tracking/Q1K_EB_2.2.299/")
    if site == "HSJ":
        return [os.path.join(base_dir, f"{version}/") for version in versions]
    if site == "MNI":
        return [base_dir]
    return []


def _copy_task_files(participant_folder, destination_folder, new_participant, task):
    """Copy a participant's .edf files and task-specific .txt files, renamed to
    ``<new_participant>_<task>.<extension>``.

    NOTE(review): all .edf files of a folder map to the same destination name
    (likewise all matching .txt files), so if a folder holds several, the last
    one copied wins.
    """
    for file_name in os.listdir(participant_folder):
        is_edf = file_name.endswith('.edf')
        is_task_txt = file_name.endswith('.txt') and has_task_info(file_name, task)
        if not (is_edf or is_task_txt):
            continue

        # Rename file based on task information, keeping the extension
        new_file_name = f"{new_participant}_{task}.{file_name.split('.')[-1]}"
        shutil.copy(
            os.path.join(participant_folder, file_name),
            os.path.join(destination_folder, new_file_name),
        )


def _process_participant(task_folder, participant, task):
    """Resolve one participant's Q1K ID and copy their files for ``task``.

    Participants without a matching EEG ID are recorded in ``missing_eeg`` and
    copied under ``<output_dir>/missing_eeg/`` using their original name.
    """
    participant_folder = os.path.join(task_folder, participant)
    # Skip stray files in the results folder; listing them as a directory
    # further down would raise.
    if not os.path.isdir(participant_folder):
        return

    et_subjects.append(participant)

    # Transform the ID to match the EEG ID
    transformed_id = eb_id_transform(participant)
    transformed_et.append(transformed_id)

    if transformed_id in et_to_q1k:
        new_participant = et_to_q1k[transformed_id]
        final_output_dir = output_dir
    else:
        missing_eeg.append(participant)
        final_output_dir = os.path.join(output_dir, "missing_eeg")
        new_participant = participant

    destination_folder = os.path.join(final_output_dir, new_participant)
    os.makedirs(destination_folder, exist_ok=True)
    _copy_task_files(participant_folder, destination_folder, new_participant, task)

    # NOTE(review): et_subjects receives both the original and the resolved
    # name, as before -- confirm that recording both is intended.
    et_subjects.append(new_participant)


# Iterate through each site, Experiment Builder directory and task
for site in sites:
    for et_dir in _et_dirs_for_site(site):
        for task in tasks:
            for task_folder in glob.glob(os.path.join(et_dir, f"*_{task}_*/results/")):
                print(task_folder)
                # Iterate over participant folders in current task folder
                for participant in os.listdir(task_folder):
                    _process_participant(task_folder, participant, task)



../../Sharing/CHUSJ-Q1K-PILOT/MNI/eye_tracking/Q1K_EB_2.2.299/1_Q1K_ACAR_REST_pfp_v1_deploy/results/


In [11]:
# Save the list of participants with missing EEG data
missing_eeg_dir = os.path.join(output_dir, 'missing_eeg')
# The copy step only creates this folder when it finds a participant without
# EEG data, so create it here if needed.
os.makedirs(missing_eeg_dir, exist_ok=True)
# sorted() gives a reproducible row order; a bare set has arbitrary order
missing_eeg_df = pd.DataFrame({'participants': sorted(set(missing_eeg))})
missing_eeg_df.to_csv(os.path.join(missing_eeg_dir, 'q1k_missing_eeg.csv'), index=False)

In [19]:
# Save the lookup table that maps Q1K IDs to eye-tracking IDs and family IDs
lookup_dir = os.path.join(output_dir, 'missing_eeg')
# Create the folder if needed so this cell does not depend on an earlier cell
# having created it.
os.makedirs(lookup_dir, exist_ok=True)
eeg_q1k_subjects_df.to_csv(os.path.join(lookup_dir, 'et_eeg_lookup_table.csv'), index=False)