In [None]:
import os
import psutil

import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F

import math
import numpy as np
import einops
import mne
import matplotlib.pyplot as plt
from tqdm import tqdm

from torch.utils.data import Dataset as TDataset, DataLoader

from datasets import DatasetDict, Dataset
from datasets import load_dataset
from evaluate import load as load_metric
import scipy.special
from comet_ml import Experiment, ExistingExperiment

import datetime
import time

In [None]:
# Report which CUDA devices PyTorch can see (handle the "no GPU" case first).
if not torch.cuda.is_available():
    print("CUDA is not available, no GPUs found.")
else:
    print(f"Number of GPUs: {torch.cuda.device_count()}")
    # One line per visible device, indexed the same way as `cuda:<i>`.
    for i in range(torch.cuda.device_count()):
        print(f"GPU {i}: {torch.cuda.get_device_name(i)}")


In [None]:
# Take a single snapshot so that both figures describe the same instant
# (previously the system was queried twice, once per value).
memory_snapshot = psutil.virtual_memory()
total_memory = memory_snapshot.total
available_memory = memory_snapshot.available
print(f"The available RAM memory is {available_memory / (1024**3):.2f} GB out of {total_memory / (1024**3):.2f} GB")

In [None]:
class CLIPloss(nn.Module):
    """Contrastive (CLIP-style) loss between brainwave and audio embeddings.

    Both inputs are L2-normalised, a ``(B, b)`` similarity matrix is built
    from their inner products, and cross-entropy is taken against the labels
    ``0..B-1`` (row ``k`` is expected to match column ``k``).

    The temperature is stored as its logarithm in ``self.temperature``.

    Args:
        clip_temperature: Initial (positive) temperature value.
        clip_temperature_type: ``'param'`` to learn the log-temperature, or
            ``'hparam'`` to keep it fixed.

    Raises:
        ValueError: If ``clip_temperature_type`` is not one of the two
            supported values.
    """

    def __init__(
        self,
        clip_temperature,
        clip_temperature_type,
    ):
        super().__init__()
        log_temperature = torch.tensor(math.log(clip_temperature), dtype=torch.float32)
        if clip_temperature_type == 'param':
            self.temperature = nn.Parameter(log_temperature)
        elif clip_temperature_type == 'hparam':
            # Registered as a non-persistent buffer: it follows `.to(device)`
            # with the module but does not add a key to the state dict.
            self.register_buffer('temperature', log_temperature, persistent=False)
        else:
            raise ValueError(
                f"clip_temperature_type must be 'param' or 'hparam', got {clip_temperature_type!r}"
            )

    def forward(self, brainwave_embeddings, audio_embeddings):
        """Return ``(loss, loss_temperature)``.

        ``loss`` uses the raw similarities; ``loss_temperature`` uses the
        similarities divided by ``exp(self.temperature)``.

        Raises:
            ValueError: If ``audio_embeddings`` is neither 2-D nor 3-D.
        """
        batch_size = brainwave_embeddings.size(0)
        n_dims = audio_embeddings.dim()
        if n_dims == 3:
            # Normalise over the two trailing axes jointly.
            brainwave_embeddings = F.normalize(brainwave_embeddings, dim=(-2, -1))
            audio_embeddings = F.normalize(audio_embeddings, dim=(-2, -1))
            similarity = torch.einsum('Bef, bef -> Bb', brainwave_embeddings, audio_embeddings)
        elif n_dims == 2:
            brainwave_embeddings = F.normalize(brainwave_embeddings, dim=-1)
            audio_embeddings = F.normalize(audio_embeddings, dim=-1)
            similarity = torch.einsum('Bf, bf -> Bb', brainwave_embeddings, audio_embeddings)
        else:
            raise ValueError(f"Expected 2-D or 3-D embeddings, got {n_dims}-D")

        similarity_temperature = similarity / torch.exp(self.temperature)
        labels = torch.arange(batch_size, device=similarity.device)
        loss = F.cross_entropy(similarity, labels)
        loss_temperature = F.cross_entropy(similarity_temperature, labels)
        return loss, loss_temperature

In [None]:
class MSE(nn.Module):
    """Mean-squared-error loss between brainwave and audio embeddings."""

    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()

    def forward(self, brainwave_embeddings, audio_embeddings, temperature=None):
        """Return the mean squared error between the two embedding batches.

        Args:
            brainwave_embeddings: Tensor whose first axis is the batch axis.
            audio_embeddings: Tensor of the same shape.
            temperature: Unused. It is now optional so the criterion can be
                called with just the two embedding tensors.

        Returns:
            A scalar tensor with the mean squared error.
        """
        # Collapse every non-batch axis ('b f t -> b (f t)' for 3-D input);
        # 2-D inputs pass through unchanged.
        brainwave_embeddings = brainwave_embeddings.flatten(start_dim=1)
        audio_embeddings = audio_embeddings.flatten(start_dim=1)
        loss = self.mse(brainwave_embeddings, audio_embeddings)
        return loss

In [None]:
def metrics(brainwave_embeddings, audio_embeddings, labels):
    """Compute per-sample top-10 and top-1 retrieval hits.

    Both embedding sets are L2-normalised and compared by inner product; for
    each brainwave embedding the best-matching audio candidates are looked up
    and compared with the expected candidate index.

    Args:
        brainwave_embeddings: Query embeddings, 2-D or 3-D, batch axis first.
        audio_embeddings: Candidate embeddings with the same trailing axes.
        labels: Index of the correct candidate for each query.

    Returns:
        ``(is_in_top10, is_in_top1)``: two boolean tensors with one entry per
        query.

    Raises:
        ValueError: If ``audio_embeddings`` is neither 2-D nor 3-D.
    """
    n_dims = audio_embeddings.dim()
    if n_dims == 3:
        brainwave_embeddings = F.normalize(brainwave_embeddings, dim=(-2, -1))
        audio_embeddings = F.normalize(audio_embeddings, dim=(-2, -1))
        similarity = torch.einsum('Bef, bef -> Bb', brainwave_embeddings, audio_embeddings)
    elif n_dims == 2:
        brainwave_embeddings = F.normalize(brainwave_embeddings, dim=-1)
        audio_embeddings = F.normalize(audio_embeddings, dim=-1)
        similarity = torch.einsum('Bf, bf -> Bb', brainwave_embeddings, audio_embeddings)
    else:
        raise ValueError(f"Expected 2-D or 3-D embeddings, got {n_dims}-D")
    labels = labels.reshape(-1, 1)

    # Clamp k so that candidate pools smaller than 10 do not make topk fail.
    n_candidates = similarity.size(-1)
    index_top10 = torch.topk(similarity, min(10, n_candidates), dim=-1).indices
    index_top1 = torch.topk(similarity, min(1, n_candidates), dim=-1).indices
    is_in_top10 = torch.eq(index_top10, labels).any(dim=1)
    is_in_top1 = torch.eq(index_top1, labels).any(dim=1)
    return is_in_top10, is_in_top1

In [None]:
def cart2sph(sensor_xyz):
    """Convert Cartesian points to spherical coordinates.

    Args:
        sensor_xyz: Array of shape ``(n, 3)`` holding ``x, y, z`` per row.

    Returns:
        Array of shape ``(n, 3)`` holding ``(r, theta, phi)`` per row, where
        ``theta = arctan2(sqrt(x^2 + y^2), z)`` and ``phi = arctan2(y, x)``.
    """
    radius = np.linalg.norm(sensor_xyz, axis=-1)
    # Length of the projection onto the xy-plane.
    planar_radius = np.linalg.norm(sensor_xyz[:, :2], axis=-1)
    polar_angle = np.arctan2(planar_radius, sensor_xyz[:, 2])
    azimuth = np.arctan2(sensor_xyz[:, 1], sensor_xyz[:, 0])
    return np.stack((radius, polar_angle, azimuth), axis=-1)

In [None]:
class SpatialAttentionLayer(nn.Module):
    """Mix input channels into ``n_output`` channels with softmax weights.

    The weight logits are a learned combination (``Z``) of a fixed basis
    (``_layout``) evaluated at every sensor's spherical angles: associated
    Legendre values of ``cos(theta)`` multiplied by ``cos(m * phi)`` or
    ``sin(|m| * phi)`` and a normalisation factor.

    Args:
        n_input: Number of input channels (sensors).
        n_output: Number of output channels.
        K: Side length of the ``K x K`` coefficient grid; the basis is built
            for degrees ``0..K-1``.
        coords_xy: Per-sensor 2-D coordinates, used only for spatial dropout.
        n_dropout: Number of random dropout locations drawn per training
            forward pass (``0`` disables spatial dropout).
        dropout_radius: Sensors within this distance of a dropout location
            are masked out.
        seed: Optional seed for the initialisation of ``Z``.
        coords_xyz_path: Path of the ``.npy`` file with per-sensor ``x, y, z``
            coordinates. Defaults to the location that used to be hard-coded.
    """

    def __init__(self, n_input, n_output, K, coords_xy, n_dropout, dropout_radius, seed=None,
                 coords_xyz_path='../datasets/MASC-MEG/process_v2/coords/sensor_xyz.npy'):
        super().__init__()

        self.n_input = n_input
        self.n_dropout = n_dropout
        self.dropout_radius = dropout_radius

        coords_xy = torch.tensor(coords_xy, dtype=torch.float32, requires_grad=False)
        self.register_buffer('_coords_xy', coords_xy)

        coords_xyz = np.load(coords_xyz_path)
        coords_sph = cart2sph(coords_xyz)
        coords_sph = torch.tensor(coords_sph, dtype=torch.float32, requires_grad=False)

        layout = self._create_layout(coords_sph, K - 1)
        self.register_buffer('_layout', layout)

        Z = self._create_parameters(n_input, n_output, K, seed)
        self.Z = nn.Parameter(Z)

    # NOTE: the previous `to(device)` override was removed. Registered buffers
    # already follow `nn.Module.to`, and the override rejected valid calls
    # such as `.to(dtype=...)` or `.to(device, dtype)`.

    def _create_layout(self, coords_sph, L=8):
        """Build the fixed basis tensor of shape ``(1, L+1, L+1, n_sensors)``.

        Args:
            coords_sph: Tensor of shape ``(n_sensors, 3)`` with columns
                ``(r, theta, phi)``; only ``theta`` and ``phi`` are used.
            L: Maximum degree of the basis.
        """
        n_input = coords_sph.shape[0]

        coords_theta = coords_sph[:, 1]
        coords_phi = coords_sph[:, 2]
        layout = torch.zeros((1, L + 1, L + 1, n_input), dtype=torch.float32, requires_grad=False)

        # Associated Legendre values, indexed [order m, degree l, sensor].
        # NOTE(review): recent SciPy releases appear to deprecate `lpmn`;
        # confirm against the pinned SciPy version.
        Plak_values = np.zeros((L + 1, L + 1, n_input))
        for i in range(n_input):
            Plak_values[:, :, i] = scipy.special.lpmn(L, L, math.cos(coords_theta[i]))[0]
        Plak_values = torch.tensor(Plak_values, dtype=torch.float32)

        def get_factor(l, m):
            # sqrt((2l+1) / (c * 2*pi) * (l-|m|)! / (l+|m|)!), with c = 2 for
            # m == 0 and c = 1 otherwise; evaluated in log space.
            result = math.log(2 * l + 1)
            result -= math.log((2 if m == 0 else 1) * 2 * math.pi)
            result += scipy.special.gammaln(l - abs(m) + 1)
            result -= scipy.special.gammaln(l + abs(m) + 1)
            result /= 2
            return math.exp(result)

        # The (L+1)^2 pairs (l, m) are laid out consecutively in the grid.
        degree_order_pairs = [(l, m) for l in range(L + 1) for m in range(-l, l + 1)]
        for counter, (l, m) in enumerate(degree_order_pairs):
            i, j = counter % (L + 1), counter // (L + 1)
            if m >= 0:
                mult = torch.cos(m * coords_phi)
            else:
                mult = torch.sin(-m * coords_phi)
            layout[:, i, j, :] = get_factor(l, m) * Plak_values[abs(m), l, :] * mult
        return layout

    def _create_parameters(self, n_input, n_output, K, seed=None):
        """Draw the initial coefficients ``Z`` of shape ``(n_output, K, K, 1)``."""
        if seed is None:
            seed = int(torch.empty((), dtype=torch.int64).random_().item())
        generator = torch.Generator()
        generator.manual_seed(seed)

        Z = torch.randn(size=((n_output, K, K)), generator=generator) * 2 / (n_input + n_output)
        Z = einops.rearrange(Z, 'j k l -> j k l 1')
        return Z

    def _attention_logits(self):
        """Return the pre-softmax weights, one row per output channel."""
        return einops.reduce(self.Z * self._layout, 'j k l i -> j i', 'sum')

    def get_spatial_filter(self):
        """Return a detached copy of the softmax channel weights (no dropout)."""
        ASoftmax = F.softmax(self._attention_logits(), dim=1)
        return ASoftmax.clone().detach()

    def forward(self, x):
        """Apply the softmax channel mixing to ``x``.

        ``x`` is contracted with the weights as ``'oi, bit -> bot'``.
        """
        A = self._attention_logits()
        if self.training and self.n_dropout > 0:
            dropout_location = torch.rand(size=(self.n_dropout, 2), device=A.device) * 0.8 + 0.1
            # Distance from every sensor to every dropout location, computed
            # in one shot instead of a Python double loop.
            coords = self._coords_xy[:self.n_input]
            distances = torch.linalg.norm(coords[:, None, :] - dropout_location[None, :, :], dim=-1)
            dropped = (distances <= self.dropout_radius).any(dim=1)
            mask = torch.zeros((1, self.n_input), dtype=A.dtype, device=A.device)
            # -inf logits get zero weight after the softmax. If every sensor
            # is dropped the softmax yields NaN, as it did before.
            mask[:, dropped] = -float('inf')
            A = A + mask
        ASoftmax = F.softmax(A, dim=1)
        SAx = torch.einsum('oi, bit -> bot', ASoftmax, x)
        return SAx

In [None]:
class SubjectPlusLayer(nn.Module):
    """Per-subject linear channel mixing with a shared fallback slot.

    One ``(n_output, n_input)`` matrix is learned per subject. Index ``0`` is
    reserved for the fixed buffer ``I`` (all zeros); subject indices that are
    out of range are redirected to that slot.

    Args:
        n_input: Number of input channels.
        n_output: Number of output channels.
        n_subjects: Number of learned per-subject matrices.
        regularize: If true, a regulariser value is stored on every training
            forward pass and can be fetched with :meth:`get_regularizer`.
        bias: If true, a per-subject bias is learned as well.
        seed: Accepted for interface compatibility; not used by the
            initialisation.
    """

    def __init__(self, n_input, n_output, n_subjects, regularize=True, bias=False, seed=None):
        super().__init__()
        self.bias = bias
        self.regularize = regularize
        # Always defined so that get_regularizer() is safe to call even when
        # regularisation is disabled.
        self.regularizer = None

        A, b = self._create_parameters(n_input, n_output, n_subjects, seed)
        self.A = nn.Parameter(A)

        I = torch.zeros((1, n_output, n_input), requires_grad=False)
        self.register_buffer('I', I)

        if self.bias:
            self.b = nn.Parameter(b)
            zero = torch.zeros(size=(1, n_output, 1))
            self.register_buffer('zero', zero)

    def _create_parameters(self, n_input, n_output, n_subjects, seed=None):
        """Initialise each subject's weights (and bias) like a 1x1 ``Conv1d``."""
        A = torch.zeros(size=(n_subjects, n_output, n_input))
        b = torch.zeros(size=(n_subjects, n_output, 1)) if self.bias else None
        with torch.no_grad():
            for subjects in range(n_subjects):
                # A throw-away Conv1d is used only for its default init.
                layer = nn.Conv1d(in_channels=n_input, out_channels=n_output, kernel_size=1)
                A[subjects] = einops.rearrange(layer.weight.data, 'o i 1 -> o i')
                if self.bias:
                    b[subjects] = einops.rearrange(layer.bias.data, 'o -> o 1')
        return A, b

    def _create_regularizer(self, A, b):
        """Return ``(||A - I||_F + ||b||_F) / batch_size`` (bias term optional)."""
        batch_size = A.shape[0]
        reg = torch.norm(A - self.I, p='fro')
        if self.bias and b is not None:
            reg = reg + torch.norm(b, p='fro')
        reg = reg / batch_size
        return reg

    def get_regularizer(self):
        """Return the regulariser stored by the last training forward pass.

        The stored value is cleared, so a second call returns ``None`` until
        the next training forward pass.
        """
        regularizer = self.regularizer
        self.regularizer = None
        return regularizer

    def forward(self, x, s):
        """Apply each sample's subject-specific matrix to ``x``.

        Args:
            x: Input contracted as ``'bji, bit -> bjt'`` with the selected
                matrices.
            s: Integer tensor with one subject index per sample. It is no
                longer modified in place.
        """
        A = torch.cat([self.I, self.A], dim=0)
        # Redirect out-of-range subjects to slot 0 without touching the
        # caller's tensor.
        s = torch.where(s >= A.size(0), torch.zeros_like(s), s)
        A_ = A[s, :, :]
        out = torch.einsum('bji, bit -> bjt', A_, x)

        b_ = None
        if self.bias:
            b = torch.cat([self.zero, self.b], dim=0)
            b_ = b[s, :, :]
            out = out + b_

        if self.regularize and self.training:
            # b_ stays None when bias is disabled (this used to raise NameError).
            self.regularizer = self._create_regularizer(A_, b_)

        return out


In [None]:
class ConvBlock(nn.Module):
    """Three dilated 1-D convolutions: two residual stages and a GLU stage.

    Stage 1 and 2 use dilations ``2 ** ((2 * block_index) % 5)`` and
    ``2 ** ((2 * block_index + 1) % 5)``; stage 3 uses dilation 2 and doubles
    the channel count, which the GLU then halves again.

    Args:
        n_input: Number of input channels.
        n_output: Number of output channels.
        block_index: Position of the block in the stack; selects the
            dilations and disables the first skip connection when it is 0.
    """

    def __init__(self, n_input, n_output, block_index):
        super().__init__()

        self.kernel_size = 3
        self.block_index = block_index

        first_dilation = 2 ** ((2 * block_index) % 5)
        second_dilation = 2 ** ((2 * block_index + 1) % 5)
        gate_dilation = 2

        def same_conv(c_in, c_out, dilation):
            # Length-preserving convolution sharing the block's kernel size.
            return nn.Conv1d(
                in_channels=c_in,
                out_channels=c_out,
                kernel_size=self.kernel_size,
                dilation=dilation,
                padding='same',
            )

        self.conv1 = same_conv(n_input, n_output, first_dilation)
        self.conv2 = same_conv(n_output, n_output, second_dilation)
        self.conv3 = same_conv(n_output, 2 * n_output, gate_dilation)

        self.batchnorm1 = nn.BatchNorm1d(n_output)
        self.batchnorm2 = nn.BatchNorm1d(n_output)

        self.activation1 = nn.GELU()
        self.activation2 = nn.GELU()
        self.activation3 = nn.GLU(dim=-2)

    def forward(self, x):
        """Run the block on ``x`` and return the gated output."""
        hidden = self.conv1(x)
        # The very first block has no skip connection around conv1.
        if self.block_index != 0:
            hidden = x + hidden
        hidden = self.activation1(self.batchnorm1(hidden))

        hidden = hidden + self.conv2(hidden)
        hidden = self.activation2(self.batchnorm2(hidden))

        return self.activation3(self.conv3(hidden))

In [None]:
class ConvHead(nn.Module):
    """Projection head: pooling, GELU, 1x1 convolution, batch normalisation.

    Args:
        n_channels: Number of input channels.
        n_features: Number of output channels.
        pool: ``'max'`` (max-pool followed by a 1x1 convolution) or ``'conv'``
            (a strided convolution). Both double the channel count.
        head_stride: Stride of the pooling step. Padding is 0 when the stride
            is 2 and 1 otherwise.

    Raises:
        ValueError: If ``pool`` is not ``'max'`` or ``'conv'``.
    """

    def __init__(self, n_channels, n_features, pool, head_stride):
        super().__init__()

        padding = 0 if head_stride == 2 else 1
        if pool == 'max':
            self.pool = nn.Sequential(
                # Fixed: this used to reference an undefined name `stride`.
                nn.MaxPool1d(kernel_size=3, stride=head_stride, padding=padding),
                nn.Conv1d(in_channels=n_channels, out_channels=2*n_channels, kernel_size=1)
            )
        elif pool == 'conv':
            self.pool = nn.Conv1d(in_channels=n_channels, out_channels=2*n_channels, kernel_size=3, stride=head_stride, padding=padding)
        else:
            raise ValueError(f"pool must be 'max' or 'conv', got {pool!r}")

        self.conv = nn.Conv1d(in_channels=2*n_channels, out_channels=n_features, kernel_size=1)
        self.activation = nn.GELU()
        self.batch_norm = nn.BatchNorm1d(n_features)

    def forward(self, x):
        """Return the projected features for ``x``."""
        x = self.pool(x)
        x = self.activation(x)
        x = self.conv(x)
        x = self.batch_norm(x)
        return x

In [None]:
class SpatialModule(nn.Module):
    """Optional chain of spatial attention, 1x1 unmixing and subject layer.

    Each stage is created only when its ``use_*`` flag is true; disabled
    stages are stored as ``None`` and skipped in :meth:`forward`.

    The channel width after the attention stage is ``n_attantion`` when
    spatial attention is enabled and ``n_input`` otherwise; the unmixing
    convolution keeps that width and the subject layer maps it to ``n_unmix``.
    """

    def __init__(
        self,
        n_input,
        n_attantion,
        n_unmix,
        use_spatial_attention,
        n_spatial_harmonics,
        coords_xy_scaled,
        spatial_dropout_number,
        spatial_dropout_radius,
        use_unmixing_layer,
        use_subject_layer,
        n_subjects,
        regularize_subject_layer,
        bias_subject_layer,
    ):
        super().__init__()

        self.self_attention = (
            SpatialAttentionLayer(
                n_input,
                n_attantion,
                n_spatial_harmonics,
                coords_xy_scaled,
                spatial_dropout_number,
                spatial_dropout_radius,
            )
            if use_spatial_attention
            else None
        )

        # Width of the signal once the (optional) attention stage has run.
        width = n_attantion if self.self_attention is not None else n_input

        self.unmixing_layer = (
            nn.Conv1d(in_channels=width, out_channels=width, kernel_size=1)
            if use_unmixing_layer
            else None
        )

        self.subject_layer = (
            SubjectPlusLayer(
                width,
                n_unmix,
                n_subjects,
                regularize=regularize_subject_layer,
                bias=bias_subject_layer,
            )
            if use_subject_layer
            else None
        )

    def forward(self, xs):
        """Run the enabled stages on ``xs = (x, s)`` and return the signal.

        ``s`` (subject indices) is consumed only by the subject layer.
        """
        x, s = xs
        if self.self_attention is not None:
            x = self.self_attention(x)
        if self.unmixing_layer is not None:
            x = self.unmixing_layer(x)
        if self.subject_layer is not None:
            x = self.subject_layer(x, s)
        return x

In [None]:
class TemporalModule(nn.Module):
    """Sequential stack of :class:`ConvBlock` modules.

    Args:
        n_unmix: Number of input channels of the first block.
        n_block: Number of channels produced by every block.
        n_blocks: Number of blocks in the stack. Defaults to 5, the value
            that used to be hard-coded.
    """

    def __init__(
        self,
        n_unmix,
        n_block,
        n_blocks=5,
    ):
        super().__init__()

        self.conv_blocks = nn.ModuleDict()
        for block_index in range(n_blocks):
            # Only the first block sees the spatial module's channel count.
            n_in = n_unmix if block_index == 0 else n_block
            self.conv_blocks[f'conv_block_{block_index}'] = ConvBlock(n_in, n_block, block_index)

    def forward(self, x):
        """Apply the blocks in insertion order."""
        for module in self.conv_blocks.values():
            x = module(x)
        return x

In [None]:
class BrainModule(nn.Module):
    """Full model: spatial module, temporal module and projection head.

    All settings are read from keyword arguments by name (see the keys used
    below); extra keys are ignored, so a complete hyper-parameter dict can be
    passed with ``**``.

    Sensor coordinates are taken from ``coords_xy_scaled`` when that key is
    supplied and not ``None``; otherwise they are loaded from
    ``dirprocess + 'coords/coords208_xy_scaled.npy'``.
    """

    def __init__(
        self,
        **kwargs
    ):
        super().__init__()

        # Honour coordinates passed by the caller instead of always
        # re-reading the file (the argument used to be silently ignored).
        coords_xy_scaled = kwargs.get("coords_xy_scaled")
        if coords_xy_scaled is None:
            coords_xy_scaled = np.load(kwargs["dirprocess"] + 'coords/coords208_xy_scaled.npy')

        self.spatial_module = SpatialModule(
            n_input=kwargs["n_channels_input"],
            n_attantion=kwargs["n_channels_attantion"],
            n_unmix=kwargs["n_channels_unmix"],
            use_spatial_attention=kwargs["use_spatial_attention"],
            n_spatial_harmonics=kwargs["n_spatial_harmonics"],
            coords_xy_scaled=coords_xy_scaled,
            spatial_dropout_number=kwargs["spatial_dropout_number"],
            spatial_dropout_radius=kwargs["spatial_dropout_radius"],
            use_unmixing_layer=kwargs["use_unmixing_layer"],
            use_subject_layer=kwargs["use_subject_layer"],
            n_subjects=kwargs["n_subjects"],
            regularize_subject_layer=kwargs["regularize_subject_layer"],
            bias_subject_layer=kwargs["bias_subject_layer"],
        )

        self.temporal_module = TemporalModule(
            n_unmix=kwargs["n_channels_unmix"],
            n_block=kwargs["n_channels_block"],
        )

        self.feature_projection = ConvHead(
            kwargs["n_channels_block"],
            kwargs["n_features"],
            kwargs["head_pool"],
            kwargs["head_stride"],
        )

    def forward(self, xs):
        """Return ``(z, y)``.

        ``z`` is the spatial module's output and ``y`` the projected features
        obtained from it through the temporal module and the head.
        """
        z = self.spatial_module(xs)
        y = self.temporal_module(z)
        y = self.feature_projection(y)
        return z, y

In [None]:
class DoubleDataset(TDataset):
    """Dataset pairing an MEG window with its audio feature array.

    Args:
        meg: Mapping from ``'subject{id}_session{id}_story{id}'`` keys to
            arrays that are sliced along their second axis.
        hidden: Array of audio features indexed by ``wav_index``.
        df: DataFrame with one row per sample and the columns
            ``subject_id``, ``session_id``, ``story_id``, ``wav_index``,
            ``meg{meg_sr}_start`` and ``meg{meg_sr}_stop``.
        meg_sr: MEG sampling rate, used in the column names and to convert
            ``meg_offset`` into samples.
        meg_offset: Shift applied to the MEG window, in seconds.
    """

    def __init__(self, meg, hidden, df, meg_sr, meg_offset=0):

        self.meg = meg
        self.hidden = hidden
        self.df = df
        self.meg_sr = meg_sr
        # Round before truncating: e.g. 0.29 * 100 is 28.999... in floating
        # point and plain int() would give 28.
        self.meg_offset = int(round(meg_offset * self.meg_sr))

    def __len__(self):
        return self.df.shape[0]

    def __getitem__(self, index):
        """Return ``(meg, subject, hidden, wav_index)`` tensors for a sample."""
        # Positional lookup: the sampler yields 0..len-1, which matches the
        # frame's labels only when it has a default RangeIndex.
        row_df = self.df.iloc[index]

        subject_id, session_id, story_id = row_df['subject_id'], row_df['session_id'], row_df['story_id']
        sbj = torch.tensor(subject_id, dtype=torch.long)
        # Subject ids are zero-padded to two digits in the MEG keys.
        subject_id = str(subject_id).zfill(2)
        subset = f'subject{subject_id}_session{session_id}_story{story_id}'

        meg_start, meg_stop = row_df[f'meg{self.meg_sr}_start'], row_df[f'meg{self.meg_sr}_stop']
        meg_start, meg_stop = meg_start + self.meg_offset, meg_stop + self.meg_offset
        wav_index = row_df['wav_index']

        meg = torch.tensor(self.meg[subset][:, meg_start:meg_stop], dtype=torch.float32)
        hid = torch.tensor(self.hidden[wav_index], dtype=torch.float32)
        widx = torch.tensor(wav_index, dtype=torch.long)

        return meg, sbj, hid, widx

In [None]:
class Trainer:
    """Train and validate a brain model against audio embeddings.

    Args:
        model: Module exposing ``spatial_module``, ``temporal_module`` and
            ``feature_projection`` sub-modules; called as ``model((meg, sbj))``
            and expected to return a pair whose second item is the prediction.
        hparam: Hyper-parameter dict. Keys read here: ``loss``,
            ``clip_temperature``, ``clip_temperature_type``, ``lr_fe``,
            ``weight_decay``, ``clip_temperature_lr``, ``optim``,
            ``use_scheduler``, ``scheduler_rate`` and ``checkpoint``.
        experiment: Optional experiment logger exposing ``log_metric``.

    Raises:
        ValueError: If ``hparam["loss"]`` or ``hparam["optim"]`` is not one of
            the supported values.
    """

    def __init__(self, model, hparam, experiment=None):
        self.model = model
        self.experiment = experiment

        self.loss_name = hparam["loss"]
        if self.loss_name == "clip":
            self.criterion = CLIPloss(
                clip_temperature=hparam["clip_temperature"], clip_temperature_type=hparam["clip_temperature_type"]
            )
        elif self.loss_name == "mse":
            self.criterion = MSE()
        else:
            raise ValueError(f'Unsupported loss: {hparam["loss"]!r}')

        parameters = [
            {'params': self.model.spatial_module.parameters(), 'lr':hparam["lr_fe"], 'weight_decay':hparam["weight_decay"]},
            {'params': self.model.temporal_module.parameters(), 'lr':hparam["lr_fe"], 'weight_decay':hparam["weight_decay"]},
            {'params': self.model.feature_projection.parameters(), 'lr':hparam["lr_fe"], 'weight_decay':hparam["weight_decay"]},
            # The criterion may own a learnable temperature.
            {'params': self.criterion.parameters(), 'lr':hparam["clip_temperature_lr"], 'weight_decay':0}
        ]

        if hparam["optim"] == "Adam":
            self.optimizer = torch.optim.Adam(parameters)
        elif hparam["optim"] == "AdamW":
            self.optimizer = torch.optim.AdamW(parameters)
        else:
            raise ValueError(f'Unsupported optimizer: {hparam["optim"]!r}')

        self.scheduler = torch.optim.lr_scheduler.ExponentialLR(self.optimizer, gamma=hparam["scheduler_rate"]) if hparam["use_scheduler"] else None
        self.save_file = hparam["checkpoint"]

    def _compute_loss(self, prediction, target):
        """Return the scalar loss to optimise for either criterion.

        The CLIP criterion returns ``(loss, loss_temperature)`` and the second
        item is used; the MSE criterion returns a single tensor.
        """
        if self.loss_name == "mse":
            # MSE.forward takes a third (unused) temperature argument.
            output = self.criterion(prediction, target, None)
        else:
            output = self.criterion(prediction, target)
        return output[1] if isinstance(output, tuple) else output

    def _train_epoch(self, dataloader_train, device, epoch):
        """Run one optimisation pass; return the mean loss per seen sample."""
        self.model.train()
        self.criterion.train()
        loss_sum, n_seen = 0.0, 0
        pbar = tqdm(dataloader_train, leave=False)
        pbar.set_description(desc=f"train epoch {epoch}")
        for meg, sbj, hidden, _ in pbar:
            hidden = einops.rearrange(hidden, 'b t f -> b f t')
            meg, sbj, hidden = meg.to(device), sbj.to(device), hidden.to(device)
            _, result = self.model((meg, sbj))

            loss = self._compute_loss(result, hidden)

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
            loss_sum += loss.item() * meg.shape[0]
            n_seen += meg.shape[0]
        # Normalise by the samples actually processed: with drop_last=True
        # this is smaller than the dataset size.
        return loss_sum / n_seen if n_seen else float('nan')

    def _validate_epoch(self, dataloader_val, hidden_test, device, epoch):
        """Evaluate once; return ``(mean loss, top-10 rate, top-1 rate)``."""
        self.model.eval()
        self.criterion.eval()
        loss_sum, n_seen = 0.0, 0
        is_in_top10s, is_in_top1s = [], []
        pbar = tqdm(dataloader_val, leave=False)
        pbar.set_description(desc=f"val epoch {epoch}")
        with torch.no_grad():
            for meg, sbj, hidden, widx in pbar:
                hidden = einops.rearrange(hidden, 'b t f -> b f t')
                meg, sbj, hidden, widx = meg.to(device), sbj.to(device), hidden.to(device), widx.to(device)
                _, result = self.model((meg, sbj))

                loss = self._compute_loss(result, hidden)
                loss_sum += loss.item() * meg.shape[0]
                n_seen += meg.shape[0]

                # Retrieval is scored against the full candidate set.
                is_in_top10, is_in_top1 = metrics(result, hidden_test, widx)
                is_in_top10s.append(is_in_top10.detach().cpu().numpy())
                is_in_top1s.append(is_in_top1.detach().cpu().numpy())

        if n_seen == 0:
            return float('nan'), float('nan'), float('nan')
        top10s = np.mean(np.concatenate(is_in_top10s)).item()
        top1s = np.mean(np.concatenate(is_in_top1s)).item()
        return loss_sum / n_seen, top10s, top1s

    def _save_checkpoint(self, epoch, val_loss):
        """Write model and optimizer state to ``checkpoint/<save_file>.pt``."""
        checkpoint_dir = "checkpoint"
        os.makedirs(checkpoint_dir, exist_ok=True)
        torch.save({
            'epoch': epoch,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'loss': val_loss,
        }, f"checkpoint/{self.save_file}.pt")

    def fit(self, dataloader_train, dataloader_val, hidden_test, nepoch=1, device_index=0):
        """Train for ``nepoch`` epochs, validating and checkpointing each one.

        Args:
            dataloader_train: Loader yielding ``(meg, sbj, hidden, widx)``.
            dataloader_val: Loader of the same form used for validation.
            hidden_test: Array of all candidate audio features, laid out as
                ``(b, t, f)``; used for the retrieval metrics.
            nepoch: Number of epochs.
            device_index: Index of the CUDA device to use. The CPU is used
                when CUDA is not available.

        A checkpoint is written whenever the validation loss improves on the
        best value seen so far.
        """
        device = torch.device(f'cuda:{device_index}' if torch.cuda.is_available() else 'cpu')

        hidden_test = torch.tensor(hidden_test, dtype=torch.float32).to(device)
        hidden_test = einops.rearrange(hidden_test, 'b t f -> b f t')
        torch.cuda.empty_cache()

        self.model = self.model.to(device)
        self.criterion = self.criterion.to(device)
        val_loss_min = float('inf')

        for epoch in range(nepoch):
            train_loss = self._train_epoch(dataloader_train, device, epoch)
            if self.experiment is not None:
                self.experiment.log_metric("loss_train", train_loss, step=epoch)

            self.optimizer.zero_grad()
            if self.scheduler:
                self.scheduler.step()

            # Batch tensors went out of scope with the helper; release the
            # cached GPU memory before validation.
            torch.cuda.empty_cache()

            # NOTE(review): presumably lets the progress bar flush; confirm.
            time.sleep(1)

            val_loss, top10s, top1s = self._validate_epoch(dataloader_val, hidden_test, device, epoch)
            if self.experiment is not None:
                self.experiment.log_metric("loss_test", val_loss, step=epoch)
                self.experiment.log_metric("top10s_test", top10s, step=epoch)
                self.experiment.log_metric("top1s_test", top1s, step=epoch)

            if val_loss < val_loss_min:
                self._save_checkpoint(epoch, val_loss)
                # Fixed: the assignment was inverted, so the best loss was
                # never recorded and every epoch overwrote the checkpoint.
                val_loss_min = val_loss

            torch.cuda.empty_cache()

In [None]:
# Central configuration for the run. Keys are read by BrainModule, Trainer,
# the dataset/dataloader cells and the experiment logger.
hyper_params = {
    # --- run / optimisation ---
    "name": "BrainModule",
    "batch_size": 100,
    "lr_fe": 3e-4,
    "use_scheduler": False,
    "scheduler_rate": 0.95,
    "optim": "AdamW",
    "weight_decay": 1e-1,
    "clip_temperature": 1,
    "clip_temperature_type": "param",
    "clip_temperature_lr": 1e-3,
    # --- data ---
    "meg_sr": 100,
    "meg_offset": 0.15,
    "hidden": "extract_features",
    "n_subjects": 27,
    "preprocess": "default",
    # --- architecture ---
    "n_channels_input": 208,
    "n_channels_attantion": 270,
    "n_channels_unmix": 5,
    "use_spatial_attention": True,
    "n_spatial_harmonics": 24,
    "spatial_dropout_number": 0,
    "spatial_dropout_radius": 0.1,
    "use_unmixing_layer": True,
    "use_subject_layer": True,
    "regularize_subject_layer": False,
    "bias_subject_layer": False,
    "n_channels_block": 320,
    "n_blocks": 5,
    "head_pool": "conv",
    "n_features": 1024,
    "loss": "clip",
    "head_stride": 2,
}

# The audio feature type fixes the head's output size (and stride for 'lms').
_HIDDEN_OVERRIDES = {
    "extract_features": {"n_features": 512},
    "lms": {"n_features": 120, "head_stride": 1},
}
hyper_params.update(_HIDDEN_OVERRIDES.get(hyper_params["hidden"], {}))

# Root of the preprocessed data, plus the sensor coordinate arrays.
dirprocess = '../datasets/MASC-MEG/process_v2/'
hyper_params["dirprocess"] = dirprocess
coords_xy_scaled = np.load(f'{dirprocess}coords/coords208_xy_scaled.npy')
coords_xyz = np.load(f'{dirprocess}coords/sensor_xyz.npy')
hyper_params["robust_scale"] = 1
hyper_params["checkpoint"] = 'br6_sp3d_base_temperature'

In [None]:

def count_parameters(model):
    """Return the total number of elements in ``model``'s trainable parameters.

    Only parameters with ``requires_grad`` set are counted.
    """
    total = 0
    for parameter in model.parameters():
        if parameter.requires_grad:
            total += parameter.numel()
    return total

# Build the model from the hyper-parameters, passing each setting explicitly.
model = BrainModule(
    n_channels_input=hyper_params["n_channels_input"],
    n_channels_attantion=hyper_params["n_channels_attantion"],
    n_channels_unmix=hyper_params["n_channels_unmix"],
    use_spatial_attention=hyper_params["use_spatial_attention"],
    n_spatial_harmonics=hyper_params["n_spatial_harmonics"],
    dirprocess=hyper_params["dirprocess"],
    coords_xy_scaled=coords_xy_scaled,
    spatial_dropout_number=hyper_params["spatial_dropout_number"],
    spatial_dropout_radius=hyper_params["spatial_dropout_radius"],
    use_unmixing_layer=hyper_params["use_unmixing_layer"],
    use_subject_layer=hyper_params["use_subject_layer"],
    n_subjects=hyper_params["n_subjects"],
    regularize_subject_layer=hyper_params["regularize_subject_layer"],
    bias_subject_layer=hyper_params["bias_subject_layer"],
    n_channels_block=hyper_params["n_channels_block"],
    n_features=hyper_params["n_features"],
    head_pool=hyper_params["head_pool"],
    head_stride=hyper_params["head_stride"]
)

num_trainable_params = count_parameters(model)
print(f"Количество обучаемых параметров: {num_trainable_params}")

# Fixed: the count reported as "Subject Layer" used to be taken over the
# whole spatial module. The subject layer is None when it is disabled.
subject_layer = model.spatial_module.subject_layer

def count_layer_parameters(layer):
    """Return the number of trainable parameter elements in ``layer``.

    Kept as a named alias of ``count_parameters``.
    """
    return count_parameters(layer)

num_params_subject_layer = count_layer_parameters(subject_layer) if subject_layer is not None else 0
print(f"Количество обучаемых параметров в Subject Layer: {num_params_subject_layer}")

In [None]:
# Audio feature arrays for the train and test splits.
hidden_train = np.load(dirprocess + f'audio/{hyper_params["hidden"]}_train4.npy')
hidden_test = np.load(dirprocess + f'audio/{hyper_params["hidden"]}_test4.npy')

# One row per sample; consumed by DoubleDataset.
df_train = pd.read_csv(dirprocess + f'dataframe/df_train{hyper_params["n_subjects"]}.csv')
df_test = pd.read_csv(dirprocess + f'dataframe/df_test{hyper_params["n_subjects"]}.csv')

# Read every array of the .npz archive into memory, then close the file
# handle (it used to be left open).
meg_path = dirprocess + f'meg/meg{hyper_params["n_subjects"]}_sr{hyper_params["meg_sr"]}_{hyper_params["preprocess"]}_v{hyper_params["robust_scale"]}.npz'
with np.load(meg_path) as meg_archive:
    meg = dict(meg_archive)

In [None]:
# Wrap the train and test splits. Both share the same MEG mapping, sampling
# rate and window offset; they differ in the audio features and the sample table.
dataset_train = DoubleDataset(meg, hidden_train, df_train, hyper_params["meg_sr"], hyper_params["meg_offset"])
dataset_test = DoubleDataset(meg, hidden_test, df_test, hyper_params["meg_sr"], hyper_params["meg_offset"])

In [None]:
# Training batches are shuffled and the incomplete last batch is dropped;
# evaluation keeps the dataset order and uses a fifth of the batch size.
train_batch_size = hyper_params['batch_size']
dataloader_train = DataLoader(
    dataset_train,
    batch_size=train_batch_size,
    shuffle=True,
    drop_last=True,
)
dataloader_test = DataLoader(
    dataset_test,
    batch_size=train_batch_size // 5,
    shuffle=False,
)

In [None]:
# Create the Comet experiment. The API key is read from the COMET_API_KEY
# environment variable when set, so a real key never has to be written into
# the notebook; the placeholder remains the fallback.
experiment = Experiment(
    api_key=os.environ.get("COMET_API_KEY", "your_api"),
    project_name=f'metameg{hyper_params["n_subjects"]}',
    workspace="Your_workspace",  # fixed: the missing comma here was a SyntaxError
    auto_output_logging=False,
)

# Record the configuration and the source code with the run.
experiment.log_parameters(hyper_params)
experiment.log_code()


In [None]:
# Re-create the model straight from the hyper-parameter dict. BrainModule
# reads the keys it needs by name, and this replaces the `model` built earlier.
model = BrainModule(**hyper_params)

In [None]:
# Bind the model, the hyper-parameters and the experiment logger together.
trainer = Trainer(model, hyper_params, experiment=experiment)

In [None]:
# Load the saved checkpoint onto the CPU. map_location makes this work on a
# machine without the GPU the checkpoint was saved from.
checkpoint = torch.load(f'checkpoint/{hyper_params["checkpoint"]}.pt', map_location='cpu')

model = model.to('cpu')
model.load_state_dict(checkpoint['model_state_dict'])
# Epoch index and validation loss stored alongside the weights.
epoch = checkpoint['epoch']
loss = checkpoint['loss']

# Switch to inference mode; the trailing semicolon hides the module repr.
model.eval();