In [1]:
import os
import numpy as np
import struct


def avg_to_ascii(file_name, chanlist='all', typerange='all', accepttype='all', rtrange='all', responsetype='all', data_format='auto'):
    """Read a binary ``.avg`` file and return its scaled per-channel waveforms.

    The file is parsed as a 900-byte general header, followed by one 75-byte
    electrode record per channel, followed by the waveform data (for every
    channel: 5 skipped bytes, then ``pnts`` little-endian 32-bit floats).
    Each sample is multiplied by the channel's calibration factor, divided by
    the number of accepted sweeps and finally multiplied by 1e-6 (the
    calibrated values are treated as microvolts and returned as volts).

    Parameters:

    - **file_name (str)**: Path of the binary file to read. Must end with ``.avg``.
    - **chanlist (str or list)**: Accepted but currently unused; all channels are returned.
    - **typerange (str, list or tuple)**: Accepted but currently unused.
    - **accepttype (str, list or tuple)**: Accepted but currently unused.
    - **rtrange (str, list or tuple)**: Accepted but currently unused.
    - **responsetype (str, list, or tuple)**: Accepted but currently unused.
    - **data_format (str)**: Accepted but currently unused.

    Returns:

    - **data (numpy.ndarray)**: A 2D float array of shape (channels, points) holding the scaled data (in volts).
    - **chan_names (list of str)**: The channel names, in file order.
    - **rate (int)**: The sample rate (in Hz) read from the header.
    - **xmin (float)**: The minimum time (in seconds) read from the header.

    Raises:

    - **ValueError**: If the name does not end with ``.avg``, the file does not
      exist, the header reports zero accepted sweeps, or the file cannot be
      parsed (including a file that is truncated anywhere, header or data).

    Side effects: prints the number of sweeps read from the header."""

    # Check that the file name ends with .avg
    if not file_name.endswith('.avg'):
        raise ValueError("File must be a binary avg file (.avg).")

    if not os.path.isfile(file_name):
        raise ValueError(f"File {file_name} not found.")

    header_bytes = 900     # size of the general header
    electrode_bytes = 75   # size of one electrode record
    # One electrode record: 10-byte name, 37 pad, uint16 baseline, 10 pad,
    # float32 sensitivity, 8 pad, float32 calibration (10+37+2+10+4+8+4 = 75).
    electrode_fmt = '<10s37xH10xf8xf'

    with open(file_name, 'rb') as f:

        def unpack(fmt):
            # A short read makes struct.unpack raise struct.error, which is
            # translated into ValueError below.
            return struct.unpack(fmt, f.read(struct.calcsize(fmt)))

        try:
            # General part of the header: skip the first 362 bytes
            f.seek(362)
            nsweeps, accepted_sweeps = unpack('<HH')
            if accepted_sweeps == 0:
                raise ValueError(
                    "No accepted sweeps found.")
            # rejected sweeps (unused), points per waveform, number of channels
            _rejected_sweeps, pnts, n_chan = unpack('<HHH')
            f.seek(4, os.SEEK_CUR)
            (rate,) = unpack('<H')  # sample rate (Hz)
            f.seek(127, os.SEEK_CUR)
            xmin, _xmax = unpack('<ff')  # in s

            print(f"Number of sweeps: {nsweeps}")

            # Electrode configuration: only the name and calibration are needed
            f.seek(header_bytes)
            chan_names = []
            calibs = []
            for _ in range(n_chan):
                raw_name, _baseline, _sensitivity, calib = unpack(electrode_fmt)
                chan_names.append(raw_name.decode('ascii').strip('\x00'))
                calibs.append(calib)

            # Waveform data starts right after the electrode table
            f.seek(header_bytes + n_chan * electrode_bytes)
            data = np.empty((n_chan, pnts), dtype=float)
            for idx in range(n_chan):
                f.seek(5, os.SEEK_CUR)  # skip the 5 bytes preceding each channel
                samples = unpack(f'<{pnts}f')
                # Same arithmetic as a per-sample loop, done in float64
                data[idx] = np.array(samples, dtype=float) * calibs[idx] / accepted_sweeps

        except struct.error as e:
            raise ValueError(
                "Error reading binary file. File may be corrupted or not in the expected format.") from e
        except Exception as e:
            raise ValueError(f"Error reading {file_name}: {e}") from e

    # Convert data from microvolts to volts
    return data * 1e-6, chan_names, rate, xmin

In [2]:
import pandas as pd
import numpy as np
import os
import mne
# Directory holding the .avg files to combine
data_path = '/home/woess/workspace/mnt/c/Users/woess/Desktop/ANT_coga_avg/avg'
# Change to the data directory (kept so anything relying on the working
# directory keeps behaving as before)
os.chdir(data_path)
df_list = []

# List the directory once, in sorted order: the progress counter no longer
# re-scans the directory for every file, and the row order of the combined
# frame is reproducible between runs.
dir_entries = sorted(os.listdir(data_path))
n_entries = len(dir_entries)

i = 0
for i, filename in enumerate(dir_entries):
    if not filename.endswith('.avg'):
        continue
    print(filename + " " + str(i) + "/" + str(n_entries))

    file = os.path.join(data_path, filename)
    try:
        data, chan_names, rate, xmin = avg_to_ascii(file)
    except ValueError as e:
        # Skip unreadable files but keep processing the rest
        print(e)
        continue

    # Convert data from volts to microvolts
    data = data * 1e6
    # Use the array shape rather than len(data[1]), which fails for
    # single-channel files
    num_chans, num_t_points = data.shape

    # Time axis built from the first sample index so it is an exact multiple
    # of the sampling period.
    first = int(round(xmin * rate))
    time = np.arange(first, first + num_t_points, dtype=np.float64) / rate

    # Long format: one row per (channel, time point). flatten() is row-major,
    # so all points of the first channel come first; tile/repeat follow the
    # same order.
    # NOTE: the 'time_ms' column holds seconds (xmin is in seconds, rate in
    # Hz); the column name is kept unchanged for downstream compatibility.
    df = pd.DataFrame({
        'subject': filename,
        'time_ms': np.tile(time, num_chans),
        'value': data.flatten(),
        'channel': np.repeat(chan_names, num_t_points),
    })
    df_list.append(df)

if df_list:
    subject_df = pd.concat(df_list, ignore_index=True)
else:
    # pd.concat raises on an empty list; return an empty frame with the
    # expected columns instead
    print("No .avg files could be loaded from " + data_path)
    subject_df = pd.DataFrame(columns=['subject', 'time_ms', 'value', 'channel'])
    

50001003_ANT_Ac.avg 0/1050
Number of sweeps: 300
50001003_ANT_Jc.avg 1/1050
Number of sweeps: 300
50001003_ANT_Pc.avg 2/1050
Number of sweeps: 300
50001003_ANT_Wc.avg 3/1050
Number of sweeps: 300
50001066_ANT_Ac.avg 4/1050
Number of sweeps: 300
50001066_ANT_Jc.avg 5/1050
Number of sweeps: 300
50001066_ANT_Pc.avg 6/1050
Number of sweeps: 300
50001066_ANT_Wc.avg 7/1050
Number of sweeps: 300
50004003_ANT_Ac.avg 8/1050
Number of sweeps: 300
50004003_ANT_Jc.avg 9/1050
Number of sweeps: 300
50004003_ANT_Pc.avg 10/1050
Number of sweeps: 300
50004003_ANT_Wc.avg 11/1050
Number of sweeps: 300
50004023_ANT_Ac.avg 12/1050
Number of sweeps: 300
50004023_ANT_Jc.avg 13/1050
Number of sweeps: 300
50004023_ANT_Pc.avg 14/1050
Number of sweeps: 300
50004023_ANT_Wc.avg 15/1050
Number of sweeps: 300
50007016_ANT_Ac.avg 16/1050
Number of sweeps: 300
50007016_ANT_Jc.avg 17/1050
Number of sweeps: 300
50007016_ANT_Pc.avg 18/1050
Number of sweeps: 300
50007016_ANT_Wc.avg 19/1050
Number of sweeps: 300
50009011_A

In [3]:
# Display the combined long-format frame built by the loading cell above
subject_df

Unnamed: 0,filename,time_ms,value,channel
0,50001003_ANT_Ac.avg,-0.200,-0.787973,FP1
1,50001003_ANT_Ac.avg,-0.198,-0.729561,FP1
2,50001003_ANT_Ac.avg,-0.196,-0.634789,FP1
3,50001003_ANT_Ac.avg,-0.194,-0.512004,FP1
4,50001003_ANT_Ac.avg,-0.192,-0.364780,FP1
...,...,...,...,...
33442747,50758001_ANT_Wc.avg,0.792,25.443733,Y
33442748,50758001_ANT_Wc.avg,0.794,24.913252,Y
33442749,50758001_ANT_Wc.avg,0.796,24.352968,Y
33442750,50758001_ANT_Wc.avg,0.798,23.759903,Y


In [4]:
import datetime
# Timestamp (minute resolution) appended to the file name so repeated exports
# do not overwrite each other
now = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M")

# Build the destination path, then write the combined frame as CSV
out_prefix = '/home/woess/workspace/mnt/c/Users/woess/Desktop/ANT_coga_avg/coga_ant_avg_comb'
out_path = out_prefix + now + '.csv'
subject_df.to_csv(out_path)

-----------------------------------------------TEST--------------------------------------------------------