# Imports

In [1]:
from IPython.display import clear_output
import subprocess

subprocess.run(["pip", "freeze"], check=True)
clear_output(wait=False)

In [2]:
import os
import sys

module_path = os.path.abspath(os.path.join(".."))
if module_path not in sys.path:
    sys.path.append(module_path)
import models
import matplotlib.pyplot as plt
import numpy as np
import torch
from fpdf import FPDF
import random
from datetime import datetime
import csv
from omegaconf import OmegaConf
import pyedflib
from scipy import signal, stats

import pandas as pd

from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
import umap
import seaborn as sns

# Functions

In [3]:
# === Функции ===
def edf_extractor(file_name):
    f = pyedflib.EdfReader(file_name)
    n = f.signals_in_file
    signal_labels = f.getSignalLabels()
    sigbufs = np.zeros((n, f.getNSamples()[0]))
    for i in np.arange(n):
        sigbufs[i, :] = f.readSignal(i)
    return sigbufs, signal_labels


def build_model(cfg):
    ckpt_path = cfg.upstream_ckpt
    init_state = init_state = torch.load(ckpt_path, weights_only=False)
    upstream_cfg = init_state["model_cfg"]
    upstream = models.build_model(upstream_cfg)
    return upstream


def load_model_weights(model, states, multi_gpu):
    if multi_gpu:
        model.module.load_weights(states)
    else:
        model.load_weights(states)


def initialize_model(ckpt_path: str):
    """
    Инициализация модели с заданным путем к контрольной точке (checkpoint).

    Args:
        ckpt_path (str): Путь к файлу контрольной точки.

    Returns:
        model: Инициализированная модель.
    """

    # Создание конфигурации и загрузка модели
    cfg = OmegaConf.create({"upstream_ckpt": ckpt_path})
    model = build_model(cfg)
    model.to("cuda")

    # Загрузка весов модели
    init_state = torch.load(ckpt_path, weights_only=False)
    load_model_weights(model, init_state["model"], False)

    return model


def create_save_dir(base_path: str) -> str:
    """Создаёт директорию с именем, основанным на текущей дате и времени."""
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    save_dir = os.path.join(base_path, timestamp)
    os.makedirs(save_dir, exist_ok=True)
    return save_dir


def read_intervals_from_csv(file_path: str):
    """Читает интервалы из CSV-файла."""
    intervals = []
    activities = []
    with open(file_path, mode="r", encoding="utf-8") as csv_file:
        csv_reader = csv.reader(csv_file)
        for row in csv_reader:
            activities.append(row[0])
            intervals.append((float(row[1]), float(row[2])))
    return activities, intervals


def extract_intervals(original_file_path):
    """
    Преобразует оригинальный TSV файл в переменные для дальнейшей работы: activities, intervals и activity_type.

    Args:
        original_file_path (str): Путь к оригинальному TSV файлу.

    Returns:
        tuple: Три списка - activities, intervals и activity_type.
    """
    import pandas as pd

    # Чтение оригинального TSV файла
    df = pd.read_csv(original_file_path, sep="\t")

    # Извлечение необходимых столбцов
    required_columns = [
        "onset",
        "duration",
        "item_name",
        "trial_type",
        "test",
        "answer",
    ]
    if not all(col in df.columns for col in required_columns):
        raise ValueError(
            f"Оригинальный файл должен содержать столбцы: {required_columns}"
        )

    # Формирование интервалов, где конец интервала это onset следующего элемента
    onset_list = df["onset"].tolist() + [0]  # Добавляем 0 в конец
    intervals = [(onset_list[i], onset_list[i + 1]) for i in range(len(onset_list) - 1)]

    # Формирование activity_type
    activity_type = df["trial_type"].tolist()

    # Формирование activities с условием для PROB
    activities = [
        f"{test} = {answer}" if trial_type == "PROB" else item_name
        for test, answer, trial_type, item_name in zip(
            df["test"], df["answer"], df["trial_type"], df["item_name"]
        )
    ]

    return activities, intervals, activity_type


def get_model_outputs(stfts, model):
    # Форматирование тензоров и инференс модели
    inputs = torch.FloatTensor(np.stack(stfts)).transpose(1, 2).to("cuda")
    mask = torch.zeros((inputs.shape[:2])).bool().to("cuda")

    with torch.no_grad():
        out = model.forward(inputs, mask, intermediate_rep=True)
    return out

# Main launchers

In [4]:
class STFTProcessor:
    @staticmethod
    def baseline(data):
        # Пример реализации baseline (замените на свою, если требуется)
        return data - np.mean(data, axis=-1, keepdims=True)

    def __init__(self, fs, window_size, overlap, normalizing=None):
        """
        Инициализация процессора STFT.

        Параметры:
          fs : частота дискретизации.
          window_size : размер окна для STFT.
          overlap : количество отсчетов перекрытия.
          normalizing : тип нормализации ('zscore', 'baselined', 'db' или None).
        """
        self.fs = fs
        self.window_size = window_size
        self.overlap = overlap
        self.normalizing = normalizing
        # Эффективная длина – число отсчетов активной части, которые берутся из сигнала
        self.effective_length = window_size - overlap
        # Длина левого паддинга, чтобы итоговая длина стала window_size
        self.left_padding = self.window_size - self.effective_length
        # Порог для решения о дополнении: если длина интервала меньше threshold, то интервал отбрасывается и объединяется с последующим.
        self.threshold = self.effective_length // 2  # например, 50 -> 25

    def _padded_signal(self, segment):
        """
        Подготавливает активный участок:
          - Берутся первые effective_length отсчетов из segment (если их больше, иначе весь segment).
          - Слева добавляются нули для формирования полного окна.
        """
        padded_signal = np.pad(segment, (self.left_padding, 0), mode="constant")
        return padded_signal

    def _compute_stft(self, x):
        """
        Вычисляет STFT для подготовленного сигнала x с заданными параметрами.
        Применяется нормализация, если она указана.
        Возвращает:
          f : ось частот,
          t : временную ось,
          Zxx : матрицу STFT.
        """
        f, t, Zxx = signal.stft(
            x, self.fs, nperseg=self.window_size, noverlap=self.overlap
        )
        Zxx = np.abs(Zxx)
        clip = 5  # для устранения граничных эффектов
        if self.normalizing == "zscore":
            Zxx = Zxx[:, clip:-clip]
            Zxx = stats.zscore(Zxx, axis=-1)
            t = t[clip:-clip]
        elif self.normalizing == "baselined":
            Zxx = Zxx[:, clip:-clip]
            Zxx = self.baseline(Zxx)
            t = t[clip:-clip]
        elif self.normalizing == "db":
            Zxx = Zxx[:, clip:-clip]
            Zxx = np.log2(Zxx)
            t = t[clip:-clip]
        return f, t, Zxx

    def process_interval(self, segment, previous_t_offset=0):
        """
        Обрабатывает один активный интервал (зону активности).
          - Подготавливает сегмент (дополняя слева нулями до длины окна).
          - Вычисляет STFT.
          - Смещает временную ось на previous_t_offset.
          - Возвращает STFT, оси частот и времени, а также последнее значение времени.
        """
        padded_signal = self._prepare_signal(segment)
        f, t, Zxx = self._compute_stft(padded_signal)
        t = t + previous_t_offset
        last_t = t[-1] if len(t) > 0 else previous_t_offset
        return Zxx, f, t, last_t

    def process_trial(self, trial_signal, intervals, trial_start_time=None):
        """
        Обрабатывает весь трайал с учетом заданных интервалов активности и реализует логику объединения интервалов.

        Параметры:
          trial_signal : 1D массив, сигнал трайала, обрезанный по границам [trial_start, trial_end].
          intervals : список кортежей вида (label, (start, end)), где start и end заданы в секундах.
          trial_start_time : если задан, интервалы переводятся в относительные (относительно начала trial_signal).

        Логика объединения:
          - Интервалы переводятся в отсчеты.
          - Если длина интервала меньше effective_length:
               • если длина < threshold, то интервал отбрасывается из рассмотрения,
                 а левая граница следующего интервала сдвигается влево (принимая за начало текущего).
               • если длина между threshold и effective_length, то недостающая часть дополняется за счет начала следующего интервала,
                 а левая граница следующего интервала корректируется.
        Возвращает список кортежей: (label, Zxx, f, t, last_t) для каждого обработанного интервала.
        """
        results = []
        previous_t_offset = 0

        # Переводим интервалы из секунд в отсчеты
        intervals_samples = []
        for label, (start_sec, end_sec) in intervals:
            start_idx = int(start_sec * self.fs)
            end_idx = int(end_sec * self.fs)
            intervals_samples.append(
                {"label": label, "start": start_idx, "end": end_idx}
            )

        i = 0
        while i < len(intervals_samples):
            current = intervals_samples[i]
            current_length = current["end"] - current["start"]
            modulo = current_length % self.effective_length
            division = current_length // self.effective_length

            if modulo > self.threshold:
                # Интервал имеет длину между threshold и effective_length – пытаемся дополнить его за счет следующего интервала
                needed = self.effective_length - current_length
                next_interval = intervals_samples[i + 1]
                available = next_interval["end"] - next_interval["start"]
                if available >= needed:
                    # Дополняем сегмент:
                    segment_current = trial_signal[current["start"] : current["end"]]
                    segment_next = trial_signal[
                        next_interval["start"] : next_interval["start"] + needed
                    ]
                    segment = np.concatenate([segment_current, segment_next])
                    # Сдвигаем левую границу следующего интервала
                    next_interval["start"] += needed
                    Zxx, f, t, last_t = self.process_interval(
                        segment, previous_t_offset
                    )
                    results.append((current["label"], Zxx, f, t, last_t))
                    previous_t_offset = last_t
                else:
                    # Если следующего интервала недостаточно, объединяем текущий с следующим:
                    next_interval["start"] = current["start"]
            elif modulo > 0:
                # Если интервал слишком короткий (< threshold), отбрасываем его и объединяем с следующим
                # (если следующий есть, иначе пропустить операцию):
                if i + 1 < len(intervals_samples):
                    # Сдвигаем левую границу следующего интервала влево до начала текущего
                    intervals_samples[i + 1]["start"] = (
                        intervals_samples[i + 1]["start"] - modulo
                    )
                    current["end"] = intervals_samples[i + 1]["start"] - modulo - 1

            # Если интервал достаточной длины, берем первые effective_length отсчетов
            segment = trial_signal[
                current["start"] : current["start"] + self.effective_length
            ]
            Zxx, f, t, last_t = self.process_interval(segment, previous_t_offset)
            results.append((current["label"], Zxx, f, t, last_t))
            previous_t_offset = last_t
            i += 1

        return results

In [5]:
# === Параметры ===
# Пути к данным и моделям
ckpt_path = "/trinity/home/asma.benachour/BERT_init_weights/stft_large_pretrained.pth"
path_to = "/trinity/home/asma.benachour/Haydn Free recall/sub-R1001P/ses-0/ieeg/"
# csv_file_path = "/beegfs/home/g.soghoyan/George/BERT-intervals/first_intervalR1001P.csv"
tsv_file_path = "/trinity/home/asma.benachour/Haydn Free recall/sub-R1001P/ses-0/ieeg/sub-R1001P_ses-0_task-FR1_events.tsv"
pdf_output_dir = "/trinity/home/asma.benachour/PDF/"

os.makedirs(pdf_output_dir, exist_ok=True)

In [6]:
# Инициализация модели
model = initialize_model(ckpt_path)



In [7]:
# Создание директории для сохранения
save_dir = create_save_dir(pdf_output_dir)

In [8]:
path_to = "/trinity/home/asma.benachour/Haydn Free recall/sub-R1001P/ses-0/ieeg/"
left_suff = "sub-R1001P_ses-0_task-FR1_acq-"
right_suff = "_ieeg.edf"
signal_types = ["monopolar", "bipolar"]
file_name = {}
recording = {}
signal_labels = {}
for s_type in signal_types:
    file_name[s_type] = f"{path_to}{left_suff}{s_type}{right_suff}"
    recording[s_type], signal_labels[s_type] = edf_extractor(file_name[s_type])
SamplingFrequency = 500
chosen_rec = "bipolar"
framecap = 50000

In [9]:
def universal_label_and_merge(group, search_col, mapping, default_label="OTHER"):
    """
    Универсальная функция для разметки и объединения интервалов в рамках одного трайала.

    Аргументы:
        group (pd.DataFrame): Датафрейм с событиями одного трайала.
                              Обязательно должен содержать столбцы "onset" и "duration".
        search_col (str): Имя столбца, по которому производится сопоставление (например, "trial_type").
        mapping (dict): Словарь вида {new_label (str): set(исходных меток)}.
        default_label (str): Ярлык для событий, не попавших ни в одну группу из mapping.

    Возвращает:
        pd.DataFrame с колонками ["label", "start", "end"].
    """
    # 1. Сортировка по времени
    group_sorted = (
        group.loc[:, ["onset", "duration", search_col]]
        .sort_values("onset")
        .reset_index(drop=True)
    )

    # 2. Корректировка столбца "duration"
    ind = group_sorted.columns.get_loc("duration")
    dur = group_sorted.iloc[-1, ind]
    group_sorted["duration"] = (
        group_sorted["onset"].diff(periods=-1).apply(lambda x: x * -1)
    )
    group_sorted.iloc[-1, ind] = dur

    # 3. Присваиваем новый ярлык каждому событию
    reverse_mapping = {
        orig_label: new_label
        for new_label, orig_labels in mapping.items()
        for orig_label in orig_labels
    }
    group_sorted["label"] = (
        group_sorted[search_col].map(reverse_mapping).fillna(default_label)
    )

    # 4. Вычисление конечных точек (start, end)
    group_sorted["end"] = group_sorted["onset"] + group_sorted["duration"]

    # 5. Группировка подряд идущих событий с одинаковым `label`
    group_sorted["group"] = (
        group_sorted["label"] != group_sorted["label"].shift()
    ).cumsum()

    merged_df = group_sorted.groupby("group", as_index=False).agg(
        label=("label", "first"), start=("onset", "first"), end=("end", "last")
    )

    return merged_df[["label", "start", "end"]]

In [10]:
# 1. Загрузка поведенческого документа.
# Предполагается, что файл имеет разделитель табуляция, если другой – измените sep.
behavior_df = pd.read_csv(tsv_file_path, sep="\t")
df = behavior_df
# 2. Фильтрация строк с маркером начала трайала.
# Здесь предполагается, что начало трайала отмечено в столбце "trial_type" значением "TRIAL".
trial_df = behavior_df[behavior_df["trial_type"] == "TRIAL"].reset_index(drop=True)

# 3. Исключаем первый трайал, если требуется (например, он может быть служебным).
trial_df = trial_df.iloc[1:]  # Если нужно исключить первый трайал

# 4. Создаем интервалы: для каждой строки берем время начала текущего трайала и время начала следующего.
trial_df["next_onset"] = trial_df["onset"].shift(-1)
# Убираем последнюю строку, у которой нет следующего onset (т.е. не образует интервал)
trial_df = trial_df.dropna(subset=["next_onset"])

# Преобразуем интервалы в список пар (start, end)
trials = trial_df[["onset", "next_onset"]].values.tolist()
print("Извлечённые интервалы трайалов:", trials)

Извлечённые интервалы трайалов: [[308.201, 407.853], [407.853, 507.522], [507.522, 615.209], [615.209, 711.377], [711.377, 1071.861], [1071.861, 1184.266], [1184.266, 1308.922], [1308.922, 1420.81], [1420.81, 1533.248], [1533.248, 1646.853], [1646.853, 1759.907], [1759.907, 1873.912], [1873.912, 1974.314], [1974.314, 2079.917], [2079.917, 2569.459], [2569.459, 2669.645], [2669.645, 2774.398], [2774.398, 2878.034], [2878.034, 2997.34], [2997.34, 3113.845], [3113.845, 3221.466], [3221.466, 3327.852], [3327.852, 3433.139]]


In [11]:
df_trials = df[df["list"] > 0].copy()
df_trials.sort_values("onset", inplace=True)

results = []
# Пример словаря: для COUNTDOWN объединяем "COUNTDOWN_START" и "TRIAL", для WORD – "WORD", для PROB – "PROB", для REC_WORD – "REC_WORD"
mapping = {
    "COUNTDOWN": {"COUNTDOWN_START", "TRIAL", "COUNTDOWN_STOP", "COUNTDOWN_END"},
    "WORD": {"WORD"},
    "PROB": {"PROB"},
    "REC_WORD": {"REC_WORD"},
}

for trial_id, group in df_trials.groupby("list"):
    intervals = universal_label_and_merge(
        group, search_col="trial_type", mapping=mapping, default_label="OTHER"
    )
    results.append(intervals)

In [164]:
results

[       label    start      end
 0  COUNTDOWN  200.214  210.649
 1      OTHER  210.649  213.233
 2       WORD  213.233  242.522
 3      OTHER  242.522  242.539
 4       PROB  242.539  268.729
 5      OTHER  268.729  308.201,
        label    start      end
 0  COUNTDOWN  308.201  318.636
 1      OTHER  318.636  321.087
 2       WORD  321.087  350.376
 3      OTHER  350.376  350.392
 4       PROB  350.392  373.802
 5      OTHER  373.802  379.302
 6   REC_WORD  379.302  383.901
 7      OTHER  383.901  397.586
 8   REC_WORD  397.586  405.042
 9      OTHER  405.042  407.853,
        label    start      end
 0  COUNTDOWN  407.853  418.288
 1      OTHER  418.288  420.906
 2       WORD  420.906  450.411
 3      OTHER  450.411  450.428
 4       PROB  450.428  470.562
 5      OTHER  470.562  473.550
 6   REC_WORD  473.550  486.268
 7      OTHER  486.268  488.046
 8   REC_WORD  488.046  501.643
 9      OTHER  501.643  507.522,
        label    start      end
 0  COUNTDOWN  507.522  517.957
 1   

In [224]:
import numpy as np
import pandas as pd
from scipy import signal, stats
from typing import List, Tuple, Dict, Optional


class STFTProcessor:
    """
    Класс для обработки сигналов с использованием STFT (Кратковременное преобразование Фурье).

    Функционал:
    - Преобразует интервалы из секунд в дискретные отсчёты.
    - Корректирует интервалы, устраняя артефакты округления.
    - Объединяет или дополняет короткие интервалы.
    - Выполняет STFT на обработанных интервалах с возможностью нормализации.
    """

    @staticmethod
    def baseline(data: np.ndarray) -> np.ndarray:
        """Вычисляет базовую линию, вычитая среднее значение по оси."""
        return data - np.mean(data, axis=-1, keepdims=True)

    def __init__(
        self,
        fs: int,
        window_size: int,
        overlap: int,
        normalizing: Optional[str] = None,
        debug: bool = False,
    ):
        """
        Инициализация класса STFTProcessor.

        :param fs: Частота дискретизации (Гц)
        :param window_size: Размер окна для STFT
        :param overlap: Количество перекрывающихся отсчётов
        :param normalizing: Способ нормализации ('zscore', 'baselined', 'db' или None)
        :param debug: Флаг для вывода отладочной информации
        """
        self.fs = fs
        self.window_size = window_size
        self.overlap = overlap
        self.normalizing = normalizing
        self.debug = debug
        self.effective_length = window_size - overlap
        self.left_padding = window_size - self.effective_length
        self.threshold = self.effective_length // 2
        if self.debug:
            print(
                "Initialized STFTProcessor with fs:",
                fs,
                "window_size:",
                window_size,
                "overlap:",
                overlap,
                "effective_length:",
                self.effective_length,
            )

    def _convert_intervals_to_samples(
        self, intervals_df: pd.DataFrame
    ) -> List[Dict[str, int]]:
        """
        Преобразует интервалы из секунд в отсчёты.

        :param intervals_df: DataFrame с колонками ["label", "start", "end"] в секундах
        :return: Список интервалов в отсчётах
        """
        intervals_df = intervals_df.copy()
        # Преобразование секунд в отсчёты
        intervals_df["start"] = (intervals_df["start"] * self.fs).round().astype(int)
        intervals_df["end"] = (intervals_df["end"] * self.fs).round().astype(
            int
        ) - 1  # Вычитание 1 для соответствия индексам

        if self.debug:
            print("Converted intervals to samples:")
            print(intervals_df)

        return intervals_df.to_dict(orient="records")

    def _merge_or_extend_intervals(
        self, intervals_samples: List[Dict[str, int]]
    ) -> List[Dict[str, int]]:
        """
        Объединяет или дополняет короткие интервалы, чтобы избежать пропусков.

        :param intervals_samples: Список интервалов в отсчётах
        :return: Обновленный список интервалов
        """
        i = 0
        processed_intervals = []
        if self.debug:
            print("Original intervals_samples:", intervals_samples)
        while i < len(intervals_samples):
            current = intervals_samples[i]
            current_length = current["end"] - current["start"]
            modulo = current_length % self.effective_length
            division = current_length // self.effective_length
            if self.debug:
                print(
                    f"Processing interval {i}: start={current['start']}, end={current['end']}, "
                    f"length={current_length}, modulo={modulo}, division={division}"
                )

            if modulo > self.threshold:
                if i + 1 < len(intervals_samples):
                    next_interval = intervals_samples[i + 1]
                    needed = self.effective_length - modulo
                    available = next_interval["end"] - next_interval["start"]
                    if self.debug:
                        print(
                            f"Interval {i} modulo > threshold, needed={needed}, next interval available={available}"
                        )
                    if available >= needed:
                        next_interval["start"] += needed + 1
                        current["end"] += needed
                        if self.debug:
                            print(
                                f"Extended interval {i} by {needed} samples, new end: {current['end']}, "
                                f"next interval new start: {next_interval['start']}"
                            )
                    else:
                        combined_end = intervals_samples.pop(i + 1)["end"]
                        current["end"] = combined_end
                        if self.debug:
                            print(
                                f"Merged interval {i} with next interval, new end: {current['end']}"
                            )
                        continue
                processed_intervals.append(current)
            elif modulo > 0 and division > 0:
                if i + 1 < len(intervals_samples):
                    next_interval = intervals_samples[i + 1]
                    next_interval["start"] -= modulo
                    current["end"] = next_interval["start"] - 1
                    if self.debug:
                        print(
                            f"Adjusted interval {i} with modulo {modulo}, new end: {current['end']}, "
                            f"next interval new start: {next_interval['start']}"
                        )
                processed_intervals.append(current)
            else:
                if self.debug:
                    print(f"Discarding interval {i} as it has no valid length")
                intervals_samples.pop(i)
                continue
            i += 1
        if self.debug:
            print("Processed intervals_samples:", processed_intervals)
        return processed_intervals

    def process_trial(
        self, trial_signal: np.ndarray, intervals: pd.DataFrame
    ) -> List[Tuple[str, np.ndarray, np.ndarray, np.ndarray, float]]:
        """
        Обрабатывает трайал, вычисляя STFT для заданных интервалов,
        и возвращает единую структуру с матрицей STFT и соответствующими метками.

        Возвращает словарь с ключами:
        - 't': 1D numpy массив временных меток для всех бинов
        - 'f': 1D numpy массив частотной сетки
        - 'Zxx': 2D numpy массив STFT-коэффициентов (shape = (n_f, n_time_bins))
        - 'labels': 1D numpy массив меток активности для каждого временного бина
        """
        t_all = []
        labels_all = []
        Zxx_list = []
        f_global = None

        # Преобразуем интервалы и объединяем их
        intervals_samples = self._convert_intervals_to_samples(intervals)
        intervals_samples = self._merge_or_extend_intervals(intervals_samples)

        for interval in intervals_samples:
            segment = trial_signal[interval["start"] : interval["end"]]
            t_offset = interval["start"] / self.fs
            Zxx, f, t, _ = self.process_interval(segment, t_offset)
            if f_global is None:
                f_global = (
                    f  # частотная сетка должна быть одинаковой во всех интервалах
                )
            t_all.append(t)
            labels_all.extend([interval["label"]] * len(t))
            Zxx_list.append(Zxx)  # Zxx имеет форму (n_f, n_t_interval)

        # Объединяем данные по времени
        t_all = np.concatenate(t_all)  # shape = (n_total_time_bins,)
        Zxx_all = np.hstack(Zxx_list)  # shape = (n_f, n_total_time_bins)
        labels_all = np.array(labels_all)

        return {"t": t_all, "f": f_global, "Zxx": Zxx_all, "labels": labels_all}

    def process_interval(
        self, segment: np.ndarray, t_offset: float = 0
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, float]:
        """
        Обрабатывает отдельный интервал сигнала.

        :param segment: Сегмент сигнала
        :param t_offset: Абсолютное смещение времени (в секундах)
        :return: (Zxx, f, t, последнее значение t)
        """
        if self.debug:
            print(
                f"Processing segment of length {len(segment)} with t_offset={t_offset:.4f}"
            )

        padded_signal = self._prepare_signal(segment)

        f, t, Zxx = self._compute_stft(padded_signal)

        # Используем абсолютное смещение вместо накопительного previous_t_offset
        t += t_offset

        last_t = t[-1] if len(t) > 0 else t_offset
        if self.debug:
            print(
                f"Interval processed: new t[0]={t[0] if len(t) > 0 else 'N/A'}, last_t={last_t:.4f}"
            )

        return Zxx, f, t, last_t

    def _prepare_signal(self, segment: np.ndarray) -> np.ndarray:
        """
        Дополняет сигнал нулями для корректного разбиения окна.

        :param segment: Одномерный массив сигнала
        :return: Дополненный сигнал
        """
        if self.debug:
            print("Preparing signal segment, original length:", len(segment))
        padded_signal = np.pad(segment, (self.left_padding, 0), mode="constant")
        if self.debug:
            print("Signal segment length after padding:", len(padded_signal))
        return padded_signal

    def _compute_stft(self, x: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Вычисляет STFT и применяет нормализацию.

        :param x: Одномерный массив сигнала
        :return: Частотная ось, временная ось, STFT-коэффициенты
        """
        f, t, Zxx = signal.stft(
            x,
            self.fs,
            nperseg=self.window_size,
            noverlap=self.overlap,
            boundary=None,
            padded=True,
        )
        if self.debug:
            print(
                "STFT raw output: len(f) =",
                len(f),
                "len(t) =",
                len(t),
                "Zxx shape =",
                Zxx.shape,
            )
        Zxx = np.abs(Zxx)
        clip = 5
        if self.normalizing == "zscore":
            Zxx = Zxx[:, clip:-clip]
            Zxx = stats.zscore(Zxx, axis=-1)
            t = t[clip:-clip]
        elif self.normalizing == "baselined":
            Zxx = Zxx[:, clip:-clip]
            Zxx = self.baseline(Zxx)
            t = t[clip:-clip]
        elif self.normalizing == "db":
            Zxx = Zxx[:, clip:-clip]
            Zxx = np.log2(Zxx)
            t = t[clip:-clip]
        if self.debug:
            print("STFT after normalization: len(t) =", len(t))
        return f, t, Zxx

In [1]:
SFTP = STFTProcessor(fs=500, window_size=150, overlap=50, debug=False)

NameError: name 'STFTProcessor' is not defined

In [147]:
# recording[референс][канал][round(trials[номер][начало]*частота): round(trials[номер][конец]*частота) - 1].shape
trial_lft_bnd = round(trials[0][0] * 500)
trial_rgt_bnd = round(trials[0][-1] * 500) - 1
channel = 0
recording[chosen_rec][channel][trial_lft_bnd:trial_rgt_bnd].shape
# signal_labels["bipolar"]

(49825,)

In [186]:
trial_signal = recording[chosen_rec][channel]
intervals = results[15]

In [187]:
intervals

Unnamed: 0,label,start,end
0,COUNTDOWN,2079.917,2090.353
1,OTHER,2090.353,2092.87
2,WORD,2092.87,2122.442
3,OTHER,2122.442,2122.459
4,PROB,2122.459,2142.45
5,OTHER,2142.45,2145.376
6,REC_WORD,2145.376,2161.231
7,OTHER,2161.231,2569.459


In [188]:
intervals.iloc[-1, 2] - intervals.iloc[0, 1]

489.5419999999999

In [227]:
sftp_trial = SFTP.process_trial(trial_signal, intervals)

In [231]:
sftp_trial["t"].shape

(2448,)

In [232]:
sftp_trial["f"].shape

(76,)

In [233]:
sftp_trial["labels"].shape

(2448,)

In [230]:
sftp_trial["Zxx"].shape

(76, 2448)

In [157]:
sftp_trial[-1][-1]

2569.534

In [158]:
(sftp_trial[-1][-1] * 500 - 10 * 100) / 500

2567.534

In [172]:
itr.iloc[-1, 2]

3575.032

In [171]:
itr.iloc[0, 1]

3433.138

In [208]:
summ = 0.0
for i, itr in enumerate(results):
    sftp_trial = SFTP.process_trial(trial_signal, itr)
    # difference = itr.iloc[-1,2] - itr.iloc[0,1] - sftp_trial[-1][-1]
    difference = itr.iloc[-1, 2] - sftp_trial[-1][-1]
    summ += difference
    print(f"{i},{difference}")
print(
    f"итого: {summ} из {results[-1].iloc[-1,2] - results[0].iloc[0,1]} секунд исходной записи."
)

Converted intervals to samples:
       label   start     end
0  COUNTDOWN  100107  105323
1      OTHER  105324  106615
2       WORD  106616  121260
3      OTHER  121261  121269
4       PROB  121270  134363
5      OTHER  134364  154099
Original intervals_samples: [{'label': 'COUNTDOWN', 'start': 100107, 'end': 105323}, {'label': 'OTHER', 'start': 105324, 'end': 106615}, {'label': 'WORD', 'start': 106616, 'end': 121260}, {'label': 'OTHER', 'start': 121261, 'end': 121269}, {'label': 'PROB', 'start': 121270, 'end': 134363}, {'label': 'OTHER', 'start': 134364, 'end': 154099}]
Processing interval 0: start=100107, end=105323, length=5216, modulo=16, division=52
Adjusted interval 0 with modulo 16, new end: 105307, next interval new start: 105308
Processing interval 1: start=105308, end=106615, length=1307, modulo=7, division=13
Adjusted interval 1 with modulo 7, new end: 106608, next interval new start: 106609
Processing interval 2: start=106609, end=121260, length=14651, modulo=51, division=1

In [18]:
# # Чтение интервалов
# # activities, intervals = read_intervals_from_csv(csv_file_path)
# activities, intervals, activity_type = extract_intervals(tsv_file_path)
# print(f"Размерность intervals:{len(intervals)} на {len(intervals[0])}; Размерность activity_type:{len(activity_type)} на 1; Размерность activities:{len(activities)} на 1")

# # 6. Выбор одного случайного электродного канала (один рандомный электрод)
# num_electrodes = 1
# num_channels = random.sample(range(72), num_electrodes)

In [19]:
# # 5. Фильтрация интервалов по длительности (минимум 400 сэмплов)
# filtered_idx = []
# for i, d in enumerate(intervals):
#     if (d[1] - d[0]) * SamplingFrequency >= 400:
#         filtered_idx.append(i)
# threshold_intervals = [intervals[i] for i in filtered_idx]
# threshold_activities = [activities[i] for i in filtered_idx]
# threshold_activity_type = [activity_type[i] for i in filtered_idx]

In [20]:
# all_models = []
# all_intervals = []
# for trial_idx, (start, end) in enumerate(trials):
#     try:
#         filtered_int_index = []
#         new_intervals = []
#         for i, intr in enumerate(intervals):
#             if return_state(intr[0], intr[1], start, end) == "11":
#                 filtered_int_index.append(i)
#                 new_intervals.append((intr[0], intr[1]))
#             elif return_state(intr[0], intr[1], start, end) == "10":
#                 filtered_int_index.append(i)
#                 new_intervals.append((intr[0], end))
#             elif return_state(intr[0], intr[1], start, end) == "01":
#                 filtered_int_index.append(i)
#                 new_intervals.append((start, intr[1]))

#         # Теперь filtered_digit содержит только те позиции, которые удовлетворяют условию
#         fit_intervals = new_intervals
#         fit_activities = [activities[i] for i in filtered_int_index]
#         fit_activity_type = [activity_type[i] for i in filtered_int_index]
#         stft_data_array, f, t = generate_stft_data(recording, chosen_rec, (start, end), SamplingFrequency)
#         with torch.no_grad():
#             model_outputs = get_model_outputs(stft_data_array, model)
#             all_models.append(model_outputs[num_channels[0]].cpu().numpy())
#             torch.cuda.empty_cache()
#         all_intervals.append(sovu_na_globus(fit_intervals, t, start, fit_activity_type))
#     except RuntimeError as e:
#         print(f'| WARNING: ran out of memory, retrying batch \n {e}')
#         torch.cuda.empty_cache()

In [21]:
# pca = PCA(n_components=2).fit(all_models[0])
# fig, ax = plt.subplots(1, 1, figsize=(12, 12))
# plt.subplots_adjust(right=0.90)  # Оставляем 15% ширины для легенды

# # Цвета для activity_type
# unique_activity_types = list(set(activity_type))
# colors = sns.husl_palette(len(unique_activity_types))
# activity_color_map = {atype: color for atype, color in zip(unique_activity_types, colors)}


# for averaged_data, activity_t in zip(all_models, all_intervals):
#     proj = pca.transform(averaged_data)
#     col = [activity_color_map[atype] for atype in activity_t]
#     ax.minorticks_on()
#     ax.grid(True, which='minor', linestyle=':', linewidth=0.9, color='gray', alpha=0.5)
#     ax.grid(True, which='major', linestyle='-', linewidth=0.5, color='gray', alpha=0.7)
#     ax.scatter(proj[:, 0], proj[:, 1], color=col, s=10, edgecolor="k", alpha = 0.5)


# # Создаем легенду с элементами для всех типов активности
# handles = [
#     plt.Line2D([0], [0], marker='o', color='w', label=atype,
#                 markerfacecolor=color, markersize=10)
#     for atype, color in activity_color_map.items()
# ]

# # Проверяем, что handles не пустой
# if handles:
#     fig.legend(
#         handles=handles,
#         title="Activity Type",
#         loc="center left",             # Привязываем к левому краю области для легенды
#         bbox_to_anchor=(0.90, 0.5),     # Центрируем вертикально
#         borderaxespad=0.0,             # Убираем лишние отступы
#         frameon=False                  # Убираем рамку вокруг легенды
#     )

# plt.show()

In [22]:
# pca = PCA(n_components=2).fit(all_models[0])
# fig, axs = plt.subplots(5, 5, figsize=(12, 12))
# plt.subplots_adjust(right=0.90)  # Оставляем 15% ширины для легенды

# # Цвета для activity_type
# unique_activity_types = list(set(activity_type))
# colors = sns.husl_palette(len(unique_activity_types))
# activity_color_map = {atype: color for atype, color in zip(unique_activity_types, colors)}

# act_alignment = {}

# for ax, atype in zip(axs.flatten(), unique_activity_types):
#     ax.set_title(atype)
#     ax.set_xlim(-23, 20)
#     ax.set_ylim(-22, 19)

#     ax.minorticks_on()
#     ax.grid(True, which='minor', linestyle=':', linewidth=0.9, color='gray', alpha=0.5)
#     ax.grid(True, which='major', linestyle='-', linewidth=0.5, color='gray', alpha=0.7)

#     ax.spines['top'].set_visible(False)
#     ax.spines['right'].set_visible(False)
#     ax.spines['bottom'].set_visible(False)
#     ax.spines['left'].set_visible(False)

#     ax.tick_params(axis='both', which='both', length=0)
#     ax.set_xticklabels([])
#     ax.set_yticklabels([])

#     act_alignment[atype] = ax

# points_by_activity = {atype: [] for atype in unique_activity_types}
# for averaged_data, activity_t in zip(all_models, all_intervals):
#     proj = pca.transform(averaged_data)
#     for point, atype in zip(proj, activity_t):
#         points_by_activity[atype].append(point)

# for atype, ax in act_alignment.items():
#     pts = np.array(points_by_activity[atype])
#     if pts.size > 0:
#         ax.scatter(pts[:, 0], pts[:, 1], s=10, color=activity_color_map[atype],
#                    edgecolor="k", alpha=0.25)

# # Создаем легенду с элементами для всех типов активности
# handles = [
#     plt.Line2D([0], [0], marker='o', color='w', label=atype,
#                 markerfacecolor=color, markersize=10)
#     for atype, color in activity_color_map.items()
# ]

# # Проверяем, что handles не пустой
# if handles:
#     fig.legend(
#         handles=handles,
#         title="Activity Type",
#         loc="center left",             # Привязываем к левому краю области для легенды
#         bbox_to_anchor=(0.90, 0.5),     # Центрируем вертикально
#         borderaxespad=0.0,             # Убираем лишние отступы
#         frameon=False                  # Убираем рамку вокруг легенды
#     )

# plt.show()