In [1]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import numpy as np

In [31]:
# nY= 2
# nX= 2
# Y[0]= 1
# Y[1]= 0.25
# array of X[0] with 400 elements
#     8.193433843E-8
# array of X[1] with 60 elements
# 3.685414961E-43
# 0.07688365132
import os

def parse_directory(directory_path):
    all_samples = []
    
    # Проверяем, что директория существует
    if not os.path.isdir(directory_path):
        raise ValueError(f"Directory {directory_path} does not exist")
    
    # Проходим по всем файлам в директории
    for filename in os.listdir(directory_path):
        file_path = os.path.join(directory_path, filename)
        
        # Пропускаем поддиректории и не-файлы
        if not os.path.isfile(file_path):
            continue
            
        # Парсим каждый файл и добавляем образцы в общий массив
        try:
            file_samples = parse_data_file(file_path)
            all_samples.extend(file_samples)
        except Exception as e:
            print(f"Error parsing file {filename}: {str(e)}")
            continue
    
    return all_samples

def parse_data_file(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()

    # Извлекаем количество образцов A из первой строки
    try:
        first_line = lines[1].strip()
        A = int(first_line.split('with ')[1].split(' records')[0])
    except:
        print("test data")
    samples = []
    current_sample = None
    current_x_key = None
    collecting_x_data = False
    x_data_buffer = []

    for line in lines:
        line = line.strip()
        
        # Начало нового образца
        if "*******************new*record*******************" in line:
            if current_sample is not None:
                # Завершаем предыдущий образец, если есть
                if current_x_key is not None and x_data_buffer:
                    current_sample[current_x_key] = x_data_buffer
                samples.append(current_sample)
            
            current_sample = {"Yi": []}
            current_x_key = None
            collecting_x_data = False
            x_data_buffer = []
            continue
        
        # Обработка nY (количество целевых переменных)
        if line.startswith("nY="):
            nY = int(line.split('=')[1].strip())
            # Это значение уже должно соответствовать количеству Y-переменных
            continue
        
        # Обработка строк с Y-переменными
        if line.startswith("Y"):
            parts = line.split('=')
            if len(parts) == 2:
                try:
                    y_value = float(parts[1].strip())
                    current_sample["Yi"].append(y_value)
                except ValueError:
                    pass
            continue
        
        # Обработка nX (количество экспериментов)
        if line.startswith("nX="):
            nX = int(line.split('=')[1].strip())
            # Инициализируем ключи для экспериментов
            for i in range(nX):
                current_sample[f"X[{i}]"] = []
            continue
        
        # Обработка начала массива эксперимента
        if "array of X[" in line and "with" in line:
            # Завершаем сбор предыдущих данных, если они есть
            if current_x_key is not None and x_data_buffer:
                current_sample[current_x_key] = x_data_buffer
                x_data_buffer = []
            
            # Извлекаем номер эксперимента
            x_index = line.split('array of X[')[1].split(']')[0]
            current_x_key = f"X[{x_index}]"
            collecting_x_data = True
            continue
        
        # Сбор данных эксперимента
        if collecting_x_data:
            # Пытаемся извлечь числа из строки
            numbers = []
            for part in line.split():
                try:
                    num = float(part)
                    numbers.append(num)
                except ValueError:
                    pass
            x_data_buffer.extend(numbers)
    
    # Добавляем последний образец, если он есть
    if current_sample is not None:
        if current_x_key is not None and x_data_buffer:
            current_sample[current_x_key] = x_data_buffer
        samples.append(current_sample)
    
    # Проверяем, что количество образцов соответствует A
    try:
        if len(samples) != A:
            print(f"Warning: Expected {A} samples, but found {len(samples)}")
    finally:
        return samples

# Пример использования
file_path = "../kozlopan_ml1000noise.txt"
parsed_data = parse_data_file(file_path)

# Вывод первых двух образцов для проверки
for i, sample in enumerate(parsed_data[:2]):
    print(f"Sample {i}:")
    print(f"Yi: {sample.get('Yi', [])}")
    for key in sorted(sample.keys()):
        if key.startswith("X["):
            print(f"{key}: first 5 values - {sample[key][:5]}")
    print()

print(parsed_data[0]["X[0]"])
print(len(parsed_data))
print(len(parsed_data[0]["Yi"]))
print(len(parsed_data[0]["X[0]"]))
print(len(parsed_data[0]["X[1]"]))
print(len(parsed_data[0])-1)
# print(parsed_data)

Sample 0:
Yi: [0.0, 0.0]
X[0]: first 5 values - [0.99637568, 1.023678303, 0.9975966215, 0.9909123182, 1.023943067]
X[1]: first 5 values - [-0.009428942576, 0.2132565677, 0.3970399499, 0.5280015469, 0.631996572]

Sample 1:
Yi: [1.0, 1.0]
X[0]: first 5 values - [1.000395894, 0.9988130927, 0.9877015352, 0.9881074429, 0.9872054458]
X[1]: first 5 values - [0.0006334115169, 0.05415157974, 0.09516408294, 0.1379769295, 0.1814425886]

[0.99637568, 1.023678303, 0.9975966215, 0.9909123182, 1.023943067, 1.000510573, 0.9967061281, 0.9915313125, 0.9672198892, 1.018810511, 0.9488478899, 0.9793912172, 0.9582349658, 0.9980856776, 0.9709777832, 0.9585749507, 0.9518962502, 0.9515383244, 0.9456630945, 0.9394155145, 0.9190992713, 0.9366926551, 0.9262087345, 0.911018908, 0.9323714972, 0.8748521209, 0.8935570717, 0.8939537406, 0.8839122057, 0.8600383997, 0.8556703925, 0.8464894295, 0.885545671, 0.8065662384, 0.8185435534, 0.7923355103, 0.7976151109, 0.7660022378, 0.8178522587, 0.7531133294, 0.7159899473, 0.7

In [4]:
file_path = "../kozlopan_signal.txt"
test_data = parse_data_file(file_path)
print(test_data)
print(test_data[0]["X[0]"])
print(len(test_data))
print(len(test_data[0]["Yi"]))
print(len(test_data[0]["X[0]"]))
print(len(test_data[0]["X[1]"]))
print(len(test_data[0])-1)

FileNotFoundError: [Errno 2] No such file or directory: '../kozlopan_signal.txt'

In [40]:
parsed_data_for_test = parse_directory(directory_path="../SygnalsWithoutNoise")
print(len(parsed_data_for_test))
# print(parsed_data_for_test)

test data
test data
test data
test data
test data
test data
test data
test data
test data
test data
test data
test data
test data
test data
test data
test data
16


PREPROC

In [41]:
import random
from copy import deepcopy

def split_data(parsed_data, train_ratio=0.8, shuffle=True, random_seed=None):
    """
    Разделяет данные на обучающую и тестовую выборки
    
    :param parsed_data: Исходные данные (массив словарей)
    :param train_ratio: Доля обучающих данных (по умолчанию 0.8)
    :param shuffle: Флаг перемешивания данных перед разделением
    :param random_seed: Фиксирует случайность для воспроизводимости
    :return: train_data, test_data (оба в том же формате, что и parsed_data)
    """
    # Создаем копию данных, чтобы не изменять оригинал
    data_copy = deepcopy(parsed_data)
    
    # Фиксируем случайность при необходимости
    if random_seed is not None:
        random.seed(random_seed)
    
    # Перемешиваем данные, если требуется
    if shuffle:
        random.shuffle(data_copy)
    
    # Определяем индекс разделения
    split_idx = int(len(data_copy) * train_ratio)
    
    # Разделяем данные
    train_data = data_copy[:split_idx]
    test_data = data_copy[split_idx:]
    
    return train_data, test_data

# Пример использования:
train_data, test_data = split_data(parsed_data, train_ratio=0.8, shuffle=True, random_seed=42)

print(f"Всего образцов: {len(parsed_data)}")
print(f"Обучающая выборка: {len(train_data)} образцов")
print(f"Тестовая выборка: {len(test_data)} образцов")

Всего образцов: 1000
Обучающая выборка: 800 образцов
Тестовая выборка: 200 образцов


In [42]:
def splitSamples(parsed_data_list):
    # Извлечение данных из parsed_data
    y_data = [sample["Yi"] for sample in parsed_data_list]
    x_data = []

    # Собираем данные по каждому эксперименту
    for i in range(len(parsed_data_list[0])-1):
        x_key = f"X[{i}]"
        x_data_i = [sample.get(x_key, []) for sample in parsed_data_list]
        x_data.append(x_data_i)

    # Проверяем, что все массивы имеют одинаковую длину
    assert all(len(x) == len(y_data) for x in x_data), "Несоответствие размеров данных"
    return x_data, y_data

In [43]:
# x_train, y_train = splitSamples(parsed_data)
# x_test, y_test = splitSamples(parsed_data_for_test)
x_train, y_train = splitSamples(train_data)
x_test, y_test = splitSamples(parsed_data_for_test)

In [44]:
# Динамический Dataset
class DynamicNMRDataset(Dataset):
    def __init__(self, *x_signals, y):
        """
        :param x_signals: Списки сигналов (каждый размером [P, L_i], где L_i может отличаться)
        :param y: Целевые переменные [P, N]
        """
        self.x_signals = [torch.FloatTensor(x) for x in x_signals]
        self.y = torch.FloatTensor(y)
        
    def __len__(self):
        return len(self.y)
    
    def __getitem__(self, idx):
        return tuple(x[idx] for x in self.x_signals) + (self.y[idx],)

# Пример использования:
train_dataset = DynamicNMRDataset(*x_train, y=y_train)
batch_size = 32
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = DynamicNMRDataset(*x_test, y=y_test)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [45]:
class DynamicNMRRegressor(nn.Module):
    def __init__(self, input_dims: list, num_targets: int, conv_filters: int = 32):
        """
        :param input_dims: Список размерностей для каждого типа сигнала (например, [1000, 2000] для FID и CPMG)
        :param num_targets: Количество целевых переменных (N)
        :param conv_filters: Базовое количество фильтров в сверточных слоях
        """
        super().__init__()
        self.num_experiments = len(input_dims)  # M
        self.num_targets = num_targets          # N
        
        # Динамическое создание ветвей для каждого типа сигнала
        self.branches = nn.ModuleList()
        for dim in input_dims:
            branch = nn.Sequential(
                nn.Conv1d(1, conv_filters, kernel_size=5, padding=2),
                nn.ReLU(),
                nn.MaxPool1d(4 if dim >= 2000 else 2),  # Автоматический выбор pooling
                nn.Conv1d(conv_filters, conv_filters * 2, kernel_size=5, padding=2),
                nn.ReLU(),
                nn.MaxPool1d(2),
                nn.Flatten()
            )
            self.branches.append(branch)
        
        # Вычисление общего размера признаков после всех ветвей
        self.total_features = 0
        for i, dim in enumerate(input_dims):
            dummy_input = torch.zeros(1, 1, dim)
            flattened_size = self.branches[i](dummy_input).shape[1]
            self.total_features += flattened_size
        
        # Финальный классификатор
        self.final_fc = nn.Sequential(
            nn.Linear(self.total_features, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_targets)
        )
    
    def forward(self, *x_signals):
        # Обработка каждого сигнала через свою ветвь
        features = []
        for i in range(self.num_experiments):
            x = x_signals[i].unsqueeze(1)  # Добавляем размерность канала [B, 1, L]
            features.append(self.branches[i](x))
        
        # Объединение всех признаков
        combined = torch.cat(features, dim=1)
        return self.final_fc(combined)

In [46]:
input_dims = [len(x[0]) for x in x_train]  # Длины каждого X[i]
num_targets = len(y_train[0])  # Количество целевых переменных (Yi)

# Проверяем, что все массивы имеют одинаковую длину
assert input_dims == [len(x[0]) for x in x_test] and num_targets == len(y_test[0]), "Несоответствие размеров train/test"

# print(x_train)
print(input_dims, num_targets)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Используемое устройство:", device)

model = DynamicNMRRegressor(input_dims, num_targets)
model.to(device)

criterion = nn.MSELoss()  # Для регрессии
optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)

[400, 60] 2
Используемое устройство: cpu


In [29]:
# Цикл обучения
# for epoch in range(20):
#     model.train()
#     running_loss = 0.0
    
#     for batch in train_dataloader:
#         *x_batch, y_batch = batch
#         x_batch = [x.to(device) for x in x_batch]
#         y_batch = y_batch.to(device)
        
#         optimizer.zero_grad()
#         outputs = model(*x_batch)
#         loss = criterion(outputs, y_batch)
#         loss.backward()
#         optimizer.step()
        
#         running_loss += loss.item()

#     model.eval()
#     with torch.no_grad():
#         for batch in test_dataloader:
#             *x_batch, y_batch = batch
#             x_batch = [x.to(device) for x in x_batch]
#             y_batch = y_batch.to(device)

    
#     print(f"Epoch {epoch + 1}, Loss: {running_loss / len(train_dataloader):.4f}")

In [47]:
from sklearn.metrics import r2_score
import numpy as np

# Цикл обучения
for epoch in range(20):
    # Фаза обучения
    model.train()
    train_running_loss = 0.0
    
    for batch in train_dataloader:
        *x_batch, y_batch = batch
        x_batch = [x.to(device) for x in x_batch]
        y_batch = y_batch.to(device)
        
        optimizer.zero_grad()
        outputs = model(*x_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        
        train_running_loss += loss.item()
    
    # Фаза валидации
    model.eval()
    test_running_loss = 0.0
    all_preds = []
    all_targets = []
    
    with torch.no_grad():
        for batch in test_dataloader:
            *x_batch, y_batch = batch
            x_batch = [x.to(device) for x in x_batch]
            y_batch = y_batch.to(device)
            
            outputs = model(*x_batch)
            loss = criterion(outputs, y_batch)
            test_running_loss += loss.item()
            
            all_preds.append(outputs.cpu().numpy())
            all_targets.append(y_batch.cpu().numpy())
    
    # Собираем все предсказания и цели
    all_preds = np.concatenate(all_preds, axis=0)
    all_targets = np.concatenate(all_targets, axis=0)
    
    # Вычисляем метрики
    train_loss = train_running_loss / len(train_dataloader)
    test_loss = test_running_loss / len(test_dataloader)
    r2 = r2_score(all_targets, all_preds)
    
    # Выводим статистику
    print(f"Epoch {epoch + 1}")
    print(f"Train Loss: {train_loss:.4f} | Test Loss: {test_loss:.4f} | R² Score: {r2:.4f}")
    print("-" * 50)

Epoch 1
Train Loss: 0.0978 | Test Loss: 0.0463 | R² Score: 0.5302
--------------------------------------------------
Epoch 2
Train Loss: 0.0447 | Test Loss: 0.0286 | R² Score: 0.7079
--------------------------------------------------
Epoch 3
Train Loss: 0.0314 | Test Loss: 0.0193 | R² Score: 0.8014
--------------------------------------------------
Epoch 4
Train Loss: 0.0186 | Test Loss: 0.0134 | R² Score: 0.8617
--------------------------------------------------
Epoch 5
Train Loss: 0.0130 | Test Loss: 0.0103 | R² Score: 0.8943
--------------------------------------------------
Epoch 6
Train Loss: 0.0113 | Test Loss: 0.0096 | R² Score: 0.9018
--------------------------------------------------
Epoch 7
Train Loss: 0.0102 | Test Loss: 0.0104 | R² Score: 0.8940
--------------------------------------------------
Epoch 8
Train Loss: 0.0091 | Test Loss: 0.0060 | R² Score: 0.9388
--------------------------------------------------
Epoch 9
Train Loss: 0.0088 | Test Loss: 0.0039 | R² Score: 0.959

In [None]:
# Для данных с M=3 (FID, CPMG, другой сигнал) и N=2

input_dims = [1000, 2000, 500]  # Длины сигналов для каждого эксперимента
# model = DynamicNMRRegressor(input_dims, num_targets=2)

# Пример входных данных (P=100, M=3 сигнала разной длины)
x_fid = torch.randn(100, 1000)    # [P, 1000]
x_cpmg = torch.randn(100, 2000)   # [P, 2000]
x_other = torch.randn(100, 500)   # [P, 500]
y = torch.randn(100, 2)           # [P, 2]

# Forward pass
outputs = model(x_fid, x_cpmg, x_other)  # Автоматически обрабатывает все M сигналов

# Для другого набора данных (M=2, N=3):
# model = DynamicNMRRegressor(input_dims=[800, 1500], num_targets=3)

✅ Преимущества:
Гибкость: Модель адаптируется к любому количеству экспериментов (M) и целей (N).

Масштабируемость: Можно добавлять новые типы сигналов без изменения кода.

Автоматическая адаптация: Размеры слоев вычисляются динамически.

⚠️ Потенциальные проблемы:
Производительность:

Каждая ветвь обрабатывается независимо → может увеличиться время обучения.

Решение: Использовать nn.ModuleDict для ветвей, если M фиксировано.

Нормализация данных:

Разные типы сигналов могут требовать разной предобработки.

Решение: Добавить параметр scalers в модель.

Интерпретируемость:

Сложнее отслеживать вклад каждого типа сигнала.

Решение: Визуализировать attention-веса или градиенты.

In [None]:
# Автоматический подбор гиперпараметров:

def create_model(input_dims, num_targets):
    conv_filters = 32 if max(input_dims) <= 2000 else 64
    return DynamicNMRRegressor(input_dims, num_targets, conv_filters)

Динамическое построение архитектуры — это нормально и даже рекомендуется для задач с изменяющейся структурой данных. Главное:

Четко определить правила генерации слоев.

Проверять размерности тензоров в forward().

Тестировать на крайних случаях (например, M=1, N=10).