In [2]:
import hydra
from omegaconf import DictConfig
import torch
from model import Conv_att_simple_new, ExtractorModel, AlignedExtractorModel, Channel_Alignment
import numpy as np
import pytorch_lightning as pl
from pytorch_lightning.loggers import WandbLogger
import wandb
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from data.pl_datamodule import EEGDataModule,SEEDVDataModule, FACEDDataModule
import os
import glob
import logging
import torch.nn as nn
import matplotlib.pyplot as plt
import seaborn as sns

def list_npy_files(directory):
    """Return the names of the ``.npy`` entries found directly in *directory*.

    Only the entry names are returned (not full paths), and sub-directories
    are not searched.

    Args:
        directory: Path of the directory to scan.

    Returns:
        A list of entry names ending in ``.npy``, sorted lexicographically.
    """
    # os.listdir yields entries in arbitrary, platform-dependent order; sort so
    # that anything built from this list (e.g. a stacked array indexed by
    # position) is reproducible from run to run.
    return sorted(f for f in os.listdir(directory) if f.endswith('.npy'))

def load_npy_files_to_matrix(directory):
    """Load every ``.npy`` file in *directory* into one stacked array.

    The files are discovered with ``list_npy_files`` and loaded in the order
    that helper returns them. The loaded arrays are joined with ``np.stack``
    along a new leading axis, so they must all share the same shape.

    Args:
        directory: Path of the directory holding the ``.npy`` files.

    Returns:
        A NumPy array whose first axis indexes the loaded files.
    """
    arrays = []
    for file_name in list_npy_files(directory):
        full_path = os.path.join(directory, file_name)
        arrays.append(np.load(full_path))
    return np.stack(arrays, axis=0)

def calculate_covariance_matrices(npy_matrix):
    """Compute one covariance matrix per entry along the first axis.

    Each ``npy_matrix[i]`` is passed to ``np.cov`` with ``rowvar=True``, i.e.
    its rows are treated as variables and its columns as observations.

    Args:
        npy_matrix: Array with at least two axes; axis 0 indexes the samples
            and axis 1 the variables.

    Returns:
        A float array of shape ``(shape[0], shape[1], shape[1])`` holding the
        covariance matrix of each sample.
    """
    dims = npy_matrix.shape
    n_items = dims[0]
    n_vars = dims[1]
    result = np.zeros((n_items, n_vars, n_vars))
    for position, sample in enumerate(npy_matrix):
        # Rows of `sample` are the variables being correlated.
        result[position] = np.cov(sample, rowvar=True)
    return result

def plot_covariance_matrix(cov_matrix, file_path=None):
    """Draw an annotated heatmap of *cov_matrix*, optionally saving it.

    Args:
        cov_matrix: 2-D array-like of values to render; each cell is
            annotated with its value formatted to two decimals.
        file_path: Destination passed to ``plt.savefig``. When ``None``
            (the default) the figure is only shown, not written to disk.
            The parameter is optional so that callers passing just the
            matrix no longer fail with a ``TypeError``.
    """
    fig = plt.figure(figsize=(10, 8))
    sns.heatmap(cov_matrix, annot=True, fmt=".2f", cmap='viridis')
    plt.xlabel('Feature Index')
    plt.ylabel('Feature Index')
    if file_path is not None:
        # Save before show(): some backends clear the figure once shown.
        plt.savefig(file_path)
    plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)

@hydra.main(config_path="cfgs", config_name="config_cov", version_base="1.3")
def compute_cen(cfg: DictConfig) -> None:
    """Compute per-sample covariance matrices of the sliced data and plot one.

    Hydra entry point (config ``cfgs/config_cov``). For each fold it builds
    the train/validation subject split, instantiates ``EEGDataModule``, loads
    every ``.npy`` file under ``<dm.sliced_data_dir>/data``, computes one
    covariance matrix per loaded sample and plots the matrix at a fixed
    sample index, saving the figure under the run directory.

    Args:
        cfg: Hydra configuration. Fields read here: ``seed``, ``data.n_subs``,
            ``data.dataset_name``, ``data.n_class``, ``data.n_vids``,
            ``log.cp_dir``, ``log.run``, ``train.valid_method`` and
            ``train.num_workers``; ``cfg.data`` is also handed to the
            data module as a whole.

    Raises:
        ValueError: If the dataset is ``'FACED'`` and ``cfg.data.n_class`` is
            neither 2 nor 9 (no video count is defined for other values).
    """
    pl.seed_everything(cfg.seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    n_folds = 1
    n_per = round(cfg.data.n_subs / n_folds)

    for fold in range(n_folds):
        print("fold:", fold)
        # Per-run output directory; the covariance plot is written here.
        cp_dir = os.path.join(cfg.log.cp_dir, cfg.data.dataset_name, f'r{cfg.log.run}')
        os.makedirs(cp_dir, exist_ok=True)

        # Split subjects: with a single fold there is no validation subject.
        if n_folds == 1:
            val_subs = []
        elif fold < n_folds - 1:
            val_subs = np.arange(n_per * fold, n_per * (fold + 1))
        else:
            val_subs = np.arange(n_per * fold, cfg.data.n_subs)
        train_subs = list(set(np.arange(cfg.data.n_subs)) - set(val_subs))
        if len(val_subs) == 1:
            val_subs = list(val_subs) + train_subs
        print('train_subs:', train_subs)
        print('val_subs:', val_subs)

        # Number of videos: fixed per class count for FACED, configured otherwise.
        if cfg.data.dataset_name == 'FACED':
            if cfg.data.n_class == 2:
                n_vids = 24
            elif cfg.data.n_class == 9:
                n_vids = 28
            else:
                # Previously n_vids stayed unbound here and the next line
                # failed with an opaque UnboundLocalError.
                raise ValueError(
                    f"Unsupported n_class for FACED: {cfg.data.n_class} (expected 2 or 9)"
                )
        else:
            n_vids = cfg.data.n_vids
        train_vids = np.arange(n_vids)
        val_vids = np.arange(n_vids)

        dm = EEGDataModule(cfg.data, train_subs, val_subs, train_vids, val_vids,
                           cfg.train.valid_method == 'loo', cfg.train.num_workers)
        sliced_dir = os.path.join(dm.sliced_data_dir, 'data')
        print(sliced_dir)
        sample_matrix = load_npy_files_to_matrix(sliced_dir)
        print(sample_matrix.shape)
        cov_matrix = calculate_covariance_matrices(sample_matrix)
        print(cov_matrix.shape)

        # Fixed sample index chosen for inspection.
        idx = 114
        # plot_covariance_matrix needs an output path; omitting it raised a
        # TypeError. The figure is stored next to the other run artefacts.
        plot_path = os.path.join(cp_dir, f'cov_matrix_{idx}.png')
        plot_covariance_matrix(cov_matrix[idx], plot_path)


  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Bare reference to the entry point: this expression names compute_cen but does not call it.
compute_cen