In [None]:
import torch
import torch.nn.functional as F
from sklearn.metrics import roc_curve
import numpy as np
import itertools # Для создания пар

# Предположим, что у вас есть код ваших классов
from src.model.siam import HierarchicalSignalNet
from src.dataloader.dataloader import SignalDataset, create_dummy_data

# --- 1. Загрузка обученной модели ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
EMBEDDING_DIM = 64 # Должен совпадать с тем, что использовался при обучении
model = HierarchicalSignalNet(embedding_dim=EMBEDDING_DIM)
model.load_state_dict(torch.load("path/to/your/trained_model.pth")) # Укажите путь к вашим весам
model.to(DEVICE)
model.eval() # !!! ОБЯЗАТЕЛЬНО переводим модель в режим оценки

# --- 2. Создание валидационного набора данных ---
# ВАЖНО: Используйте данные, которые не участвовали в обучении!
# Например, можно сгенерировать новый набор или выделить часть изначальных данных.
val_signals, val_element_ids, val_session_ids, val_keypoints = create_dummy_data(
    num_elements=5, sessions_per_element=5, signals_per_session=5, start_el_id=100
)
val_dataset = SignalDataset(val_signals, val_element_ids, val_session_ids, val_keypoints)

print(f"Создан валидационный набор данных из {len(val_dataset)} сигналов.")

### Шаг 2: Вычисление Оптимального Порога
Это самый важный шаг. Мы прогоним все возможные пары сигналов из нашего валидационного набора через модель, вычислим расстояния и найдем порог, который лучше всего разделяет пары "из одного замера" и пары "из разных замеров". Для этого идеально подходит ROC-кривая (Receiver Operating Characteristic).

In [1]:
def find_optimal_threshold(model, dataset, device):
    """
    Вычисляет оптимальный порог расстояния для разделения замеров,
    используя ROC-кривую на валидационном наборе данных.
    """
    model.eval()
    embeddings = []
    session_ids = []

    # Сначала получим эмбеддинги для всех сигналов в наборе
    with torch.no_grad(): # Отключаем расчет градиентов
        for i in range(len(dataset)):
            sample = dataset[i]
            signal_tensor = torch.tensor(sample['signal'], dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device)
            # unsqueeze(0) добавляет batch и channel размерности
            
            emb, _ = model(signal_tensor)
            embeddings.append(emb.cpu().numpy().flatten())
            session_ids.append(sample['session_id'])

    embeddings = np.array(embeddings)
    session_ids = np.array(session_ids)
    
    distances = []
    labels = [] # 1 - если из одного замера, 0 - если из разных

    # Теперь создаем все возможные пары и вычисляем расстояния
    indices = list(range(len(dataset)))
    for i, j in itertools.combinations(indices, 2):
        # Вычисляем евклидово расстояние
        dist = np.linalg.norm(embeddings[i] - embeddings[j])
        distances.append(dist)
        
        # Определяем, из одного ли замера эта пара
        is_same_session = 1 if session_ids[i] == session_ids[j] else 0
        labels.append(is_same_session)

    distances = np.array(distances)
    labels = np.array(labels)

    # Используем ROC-кривую для поиска порога
    fpr, tpr, thresholds = roc_curve(labels, -distances) # Используем -distance, т.к. roc_curve ожидает "оценки", а не расстояния

    # Находим порог, который дает точку на ROC-кривой, ближайшую к идеальной (0, 1)
    # (минимальная ошибка)
    optimal_idx = np.argmin(np.sqrt(fpr**2 + (1 - tpr)**2))
    optimal_threshold = thresholds[optimal_idx]
    
    # Так как мы передавали -distances, порог тоже будет отрицательным. Берем его по модулю.
    optimal_threshold = -optimal_threshold

    print(f"Найдено {len(distances)} пар для анализа.")
    print(f"Оптимальный порог расстояния: {optimal_threshold:.4f}")
    
    return optimal_threshold

# Вычисляем наш порог
SESSION_THRESHOLD = find_optimal_threshold(model, val_dataset, DEVICE)

NameError: name 'model' is not defined

Примечание: Порог, который вы найдете, должен быть близок к значению margin_session, которое вы задавали при обучении. Это хороший индикатор того, что модель обучилась правильно.

### Шаг 3: Создание Финальной Функции-Детектора
Теперь, когда у нас есть модель и вычисленный порог, мы можем написать простую и понятную функцию для решения нашей задачи.

In [None]:
def get_embedding(signal_np, model, device):
    """Вспомогательная функция для получения эмбеддинга одного сигнала."""
    model.eval()
    with torch.no_grad():
        signal_tensor = torch.tensor(signal_np, dtype=torch.float32).unsqueeze(0).unsqueeze(0).to(device)
        embedding, _ = model(signal_tensor)
        return embedding.cpu()

def are_from_same_session(signal_A, signal_B, model, threshold, device):
    """
    Принимает два сигнала и определяет, принадлежат ли они одному замеру.

    Args:
        signal_A (np.array): Первый временной ряд.
        signal_B (np.array): Второй временной ряд.
        model: Обученная модель.
        threshold (float): Пороговое значение расстояния.
        device: Устройство (cpu/cuda).

    Returns:
        bool: True, если сигналы из одного замера, иначе False.
        float: Вычисленное расстояние.
    """
    # Получаем эмбеддинги для каждого сигнала
    emb_A = get_embedding(signal_A, model, device)
    emb_B = get_embedding(signal_B, model, device)

    # Вычисляем евклидово расстояние
    distance = F.pairwise_distance(emb_A, emb_B).item()

    # Сравниваем с порогом
    is_same = distance < threshold
    
    return is_same, distance

# --- Пример использования ---

# Возьмем два сигнала, которые точно из одного замера
signal_1_data = val_dataset[0]
signal_2_data = val_dataset[1] 
# Предполагаем, что 0 и 1 из одного замера (проверим)
print(f"Проверка пары 1: ID сессии {signal_1_data['session_id']} и {signal_2_data['session_id']}")

is_same, dist = are_from_same_session(
    signal_1_data['signal'], 
    signal_2_data['signal'], 
    model, 
    SESSION_THRESHOLD, 
    DEVICE
)
print(f"Результат: {is_same}. Расстояние: {dist:.4f}\n")


# Теперь возьмем два сигнала, которые точно из разных замеров
signal_3_data = val_dataset[0]
signal_4_data = val_dataset[-1] # Последний сигнал, скорее всего, из другой сессии
print(f"Проверка пары 2: ID сессии {signal_3_data['session_id']} и {signal_4_data['session_id']}")

is_same, dist = are_from_same_session(
    signal_3_data['signal'], 
    signal_4_data['signal'], 
    model, 
    SESSION_THRESHOLD, 
    DEVICE
)
print(f"Результат: {is_same}. Расстояние: {dist:.4f}\n")