In [16]:
import numpy as np
import mne

# Create a simple RawArray filled with standard-normal random data
sfreq = 250  # Sampling frequency (Hz)
n_channels = 74
n_times = 747750  # 747750 samples / 250 Hz = 2991 s of data
ch_names = [f'EEG{d}' for d in range(1, n_channels + 1)]  # EEG1 ... EEG74
ch_types = ["eeg"] * n_channels
info = mne.create_info(ch_names=ch_names, sfreq=sfreq, ch_types=ch_types)

data = np.random.randn(n_channels, n_times)  # shape: (n_channels, n_times)
raw = mne.io.RawArray(data, info)

# NOTE: this prints False -- a RawArray is not an instance of mne.io.Raw.
print(isinstance(raw, mne.io.Raw))  # False
print(type(raw))  # <class 'mne.io.array.array.RawArray'>

Creating RawArray with float64 data, n_channels=74, n_times=747750
    Range : 0 ... 747749 =      0.000 ...  2990.996 secs
Ready.
False
<class 'mne.io.array.array.RawArray'>


braindecode calls `__getitem__` of mne.base.Raw, which calls `_getitem`, which in turn calls `_read_segment` of `BaseRaw`. MNE uses `_read_segment` to read a specific range of the file. We want to test whether reading a file stored on S3 via fsspec can be integrated at this level.
`_read_segment` calls the `_read_segment_file` method of the `BaseRaw` class; any subclass must implement this method. The EEGLAB reader implements it by calling the shared fiff reader helper `_read_segments_file`: mne/_fiff/utils.py#L200

In [7]:
import numpy as np
import s3fs
import tempfile
import os
# Anonymous (credential-free) S3 access, pinned to the us-east-2 region.
s3_client_options = {'region_name': 'us-east-2'}
filesystem = s3fs.S3FileSystem(anon=True, client_kwargs=s3_client_options)

# EEGLAB .set file used as the test object throughout the cells below.
s3path = 's3://testspeedeegdash/sub-002_task-FaceRecognition_eeg.set'

In [8]:
# Download the S3 object to a local temp file and check that it can be
# memory-mapped. The temp file is always removed, even if a step fails.
chunk_bytes = 16 * 1024 * 1024  # copy in 16 MiB pieces, not one giant read
tmp_path = None
try:
    with tempfile.NamedTemporaryFile(delete=False, suffix='.set') as tmp:
        tmp_path = tmp.name
        # Download from S3 to temp file
        with filesystem.open(s3path) as s3_file:
            for chunk in iter(lambda: s3_file.read(chunk_bytes), b''):
                tmp.write(chunk)
    # The temp file is closed (and therefore flushed) at this point, so the
    # memory map sees every downloaded byte.
    mapped = np.memmap(tmp_path, mode='r')
    # Drop the mapping before deleting the file; an open mapping blocks
    # deletion on some platforms.
    del mapped
finally:
    # Clean up temp file
    if tmp_path is not None and os.path.exists(tmp_path):
        os.unlink(tmp_path)

In [10]:
# Fetch an arbitrary byte range of the remote file without downloading it all.
start_byte = 1024
stop_byte = 1024 + 1024

# A falsy stop_byte means "read to the end of the file" (length None).
if stop_byte:
    n_bytes_to_read = stop_byte - start_byte
else:
    n_bytes_to_read = None

with filesystem.open(s3path) as s3_file:
    s3_file.seek(start_byte)
    data = s3_file.read(n_bytes_to_read)

print(data)

b'r\x00a\x00w\x00.\x00f\x00i\x00f\x00\x00\x00\x0e\x00\x00\x008\x00\x00\x00\x06\x00\x00\x00\x08\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x08\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x06\x00\x00\x00nbchan\x00\x00\x02\x00\x01\x00J\x00\x00\x00\x0e\x00\x00\x008\x00\x00\x00\x06\x00\x00\x00\x08\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x08\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x06\x00\x00\x00trials\x00\x00\x02\x00\x01\x00\x01\x00\x00\x00\x0e\x00\x00\x000\x00\x00\x00\x06\x00\x00\x00\x08\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x08\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x04\x00pnts\x05\x00\x04\x00\xe6h\x0b\x00\x0e\x00\x00\x008\x00\x00\x00\x06\x00\x00\x00\x08\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x08\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x05\x00\x00\x00srate\x00\x00\x00\x02\x00\x01\x00\xfa\x00\x00\x00\x0e\x00\x00\x008\x00\x00\x00\x06\x00\x00\x0

In [37]:
import sys
def _mult_cal_one(data_view, one, idx, cals, mult):
    """Take a chunk of raw data, multiply by mult or cals, and store."""
    one = np.asarray(one, dtype=data_view.dtype)
    assert data_view.shape[1] == one.shape[1], (
        data_view.shape[1],
        one.shape[1],
    )  # noqa: E501
    if mult is not None:
        assert mult.ndim == one.ndim == 2
        data_view[:] = mult @ one[idx]
    else:
        assert cals is not None
        if isinstance(idx, slice):
            data_view[:] = one[idx]
        else:
            # faster than doing one = one[idx]
            np.take(one, idx, axis=0, out=data_view)
        data_view *= cals

def _read_segments_file(
    filesystem,
    s3path,
    # data,
    # idx,
    # fi,
    start,
    stop,
    # cals,
    # mult,
    dtype,
    n_channels=None,
    offset=0,
    trigger_ch=None,
):
    """Read a chunk of raw data."""
    n_bytes = np.dtype(dtype).itemsize
    # data_offset and data_left count data samples (channels x time points),
    # not bytes.
    data_offset = n_channels * start * n_bytes + offset # in bytes
    data_left = (stop - start) * n_channels # in samples not bytes

    # Read up to 100 MB of data at a time, block_size is in data samples
    block_size = ((int(100e6) // n_bytes) // n_channels) * n_channels
    block_size = min(data_left, block_size)
    with filesystem.open(s3path) as fid:
        fid.seek(data_offset)
        # extract data in chunks
        for sample_start in np.arange(0, data_left, block_size) // n_channels:
            count = min(block_size, data_left - sample_start * n_channels)
            # block = np.fromfile(fid, dtype, count)
            block = fid.read(count)
            block_size = sys.getsizeof(block)
            print(block_size, count)
            if block_size != count:
                raise RuntimeError(
                    f"Incorrect number of samples ({block.size} != {count}), please "
                    "report this error to MNE-Python developers"
                )
            block = block.reshape(n_channels, -1, order="F")
            n_samples = block.shape[1]  # = count // n_channels
            sample_stop = sample_start + n_samples
            if trigger_ch is not None:
                stim_ch = trigger_ch[start:stop][sample_start:sample_stop]
                block = np.vstack((block, stim_ch))
            # data_view = data[:, sample_start:sample_stop]
            # _mult_cal_one(data_view, block, idx, cals, mult)

In [38]:
# Read time points 3..999 of the 74-channel file as float32 values.
# The trailing semicolon keeps the notebook from echoing the cell's result.
_read_segments_file(
    filesystem,
    s3path,
    start=3,
    stop=1000,
    dtype=np.single,
    n_channels=74,
);

73811 73778


AttributeError: 'bytes' object has no attribute 'size'