In [34]:
import cv2
import mediapipe as mp
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pandas as pd
from scipy.io import loadmat
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
import scipy.fftpack as sf

In [2]:
mp_face_mesh = mp.solutions.face_mesh
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

In [3]:
cap = cv2.VideoCapture('subject33/vid.avi')  

In [4]:
CHEEK_RIGHT = [49, 58, 64, 47]
CHEEK_LEFT = [264, 288, 294, 277]
BETWEEN_EYEBROWS = [107, 55, 285, 336]
NOSE_BRIDGE = [114, 115, 278, 277]

In [5]:
bvp_signal_series = []
window_size = 30

In [6]:
def rgb_to_yuv(rgb):
    R, G, B = rgb
    Y = 0.299 * R + 0.587 * G + 0.114 * B
    U = -0.169 * R - 0.331 * G + 0.5 * B
    V = 0.5 * R - 0.419 * G - 0.081 * B
    return np.array([Y, U, V])


In [7]:
def process_frame(regions):
    yuv_values = {}
    
    for name, region in regions:
        avg_color = np.mean(region, axis=(0,1))
        yuv = rgb_to_yuv(avg_color)
        yuv_values[name] = yuv

    return yuv_values


In [21]:
def time_domain_normalization(spatial_temporal_map):
    normalized_map = {}

    regions = spatial_temporal_map[0].keys()

    for region in regions:
        values = np.array([frame[region] for frame in spatial_temporal_map])
        mean = values.mean(axis=0)
        std = values.std(axis=0) + 1e-8
        normalized = (values - mean) / std
        normalized_map[region] = normalized
    
    return normalized_map

In [9]:
def add_white_noise(normalized_series, noise_std=0.1):
    noisy_series = {}
    for key, data in normalized_series.items():
        noise = np.random.normal(0, noise_std, data.shape)
        noisy_series[key] = data + noise
    return noisy_series

In [10]:
regions_by_frames = []
spatial_temporal_map = []
yuv_values = []

In [31]:
# Функция для индексов, соответствующих (f_low, f_high) в DCT-спектре
def get_freq_indices(f_low, f_high, n, fs):
    # Индекс в DCT соответствует частоте: freq = (fs / (2*n)) * index, приблизительно для DCT-I
    # Упрощённо можно использовать формулу для DFT: freq_index = freq * N / fs
    # Здесь n = n_frames
    idx_low = int(np.floor(f_low * (2*n) / fs))
    idx_high = int(np.ceil(f_high * (2*n) / fs))
    # Ограничим индексы сверху, чтобы не выходить за размер
    idx_low = max(idx_low, 0)
    idx_high = min(idx_high, n-1)
    return idx_low, idx_high

In [None]:
# ---- Функция для Алгоритма 1 (Multi-frequency mode signal decompose) ----
def multi_freq_decompose(noisy_series, freq_bands, fs=30):
    """
    Разбивает сигналы на несколько частотных диапазонов с помощью DCT/IDCT.
    :param noisy_series: словарь {регион -> np.array(shape=(window_size, 3))}.
    :param freq_bands: список кортежей (f_low, f_high) в Гц, например [(0.7, 2.5), (2.5, 5.0)].
    :param fs: частота кадров (Гц).
    :return: словарь {band_idx -> np.array(shape=(window_size, I*3))},
             где I = число регионов, а 3 — каналы YUV.
             Для каждого частотного диапазона мы получаем объединённый сигнал по всем регионам.
    """
    # Список всех регионов
    regions = list(noisy_series.keys())
    n_frames = noisy_series[regions[0]].shape[0]  # длина окна (window_size)
    
    # Подготовим структуру для результата
    # multi_band_signals[band_idx] = список [ f'_k(n) ] по регионам,
    # которые потом объединим (конкатенируем) вдоль оси "признаки".
    multi_band_signals = {band_idx: [] for band_idx in range(len(freq_bands))}
    
    for region in regions:
        region_data = noisy_series[region]
        
        # Для каждого канала Y, U, V делаем DCT по времени
        # DCT будет shape (window_size,) для каждого канала
        dct_region = []
        for ch in range(3):
            channel_signal = region_data[:, ch]  # временной ряд
            # Применяем DCT (type=2 — стандартная)
            # Для удобства можно использовать scipy.fftpack.dct, но здесь — numpy (c некоторыми оговорками)
            channel_dct = cv2.dct(channel_signal.reshape(-1,1).astype(np.float32))[:,0]
            dct_region.append(channel_dct)
        
        dct_region = np.array(dct_region)  # shape (3, window_size)
        
        # Для каждого частотного диапазона выделяем нужные компоненты
        for band_idx, (f_low, f_high) in enumerate(freq_bands):
            # Копируем исходный спектр
            dct_band = np.copy(dct_region)
            # Получаем индексы частот
            low_idx, high_idx = get_freq_indices(f_low, f_high, n_frames, fs)
            
            # Зануляем всё, что вне [low_idx, high_idx]
            # (учитываем, что ось времени => dct_band[ch, :]
            for ch in range(3):
                for freq_i in range(n_frames):
                    if freq_i < low_idx or freq_i > high_idx:
                        dct_band[ch, freq_i] = 0.0
            
            # Применяем IDCT, чтобы вернуться в временную область
            # cv2.dct — это прямая DCT, для обратной можно вызвать ещё раз,
            # но надо учесть нормировку (или использовать scipy.fftpack.idct).
            # Для наглядности используем обратную через cv2.dct, но с умножением на соответствующий фактор.
            # Упрощённый способ: опять вызов cv2.dct(dct_band[ch].reshape(-1,1)), но нужна правильная нормировка.
            # Для упрощения воспользуемся scipy, если это допустимо. Если нет — оставим как пример.

            # Здесь продемонстрируем с scipy:

            time_band = []
            for ch in range(3):
                # idct type=2 (обратная) можно вызвать как type=3
                channel_time = sf.idct(dct_band[ch], type=2, norm='ortho')
                time_band.append(channel_time)
            
            # time_band теперь shape (3, window_size)
            time_band = np.array(time_band).T  # (window_size, 3)
            
            # Сохраняем результат для данного band_idx,
            # но объединяем все регионы в единый массив: (window_size, I*3)
            multi_band_signals[band_idx].append(time_band)
    
    # Теперь для каждого частотного диапазона конкатенируем сигналы всех регионов
    # multi_band_signals[band_idx] = список [ (window_size, 3), (window_size, 3), ... ] по регионам
    # Объединим вдоль оси признаков => (window_size, I*3)
    for band_idx in range(len(freq_bands)):
        # Список массивов по всем регионам
        region_arrays = multi_band_signals[band_idx]
        # Конкатенация вдоль оси 1
        if len(region_arrays) > 0:
            multi_band_signals[band_idx] = np.concatenate(region_arrays, axis=1)
        else:
            multi_band_signals[band_idx] = None
    
    return multi_band_signals


In [28]:
freq_bands = [(0.7, 2.5), (2.5, 5.0)]

In [None]:
with mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, refine_landmarks=True) as face_mesh:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        h, w, _ = frame.shape
        results = face_mesh.process(rgb_frame)

        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                landmarks_pixel = [(int(lm.x * w), int(lm.y * h)) for lm in face_landmarks.landmark]

                right_cheek_points = np.array([[int(face_landmarks.landmark[idx].x * w), int(face_landmarks.landmark[idx].y * h)] for idx in CHEEK_RIGHT], np.int32)
                left_cheek_points = np.array([[int(face_landmarks.landmark[idx].x * w), int(face_landmarks.landmark[idx].y * h)] for idx in CHEEK_LEFT], np.int32)
                between_eyebrows_points = np.array([[int(face_landmarks.landmark[idx].x * w), int(face_landmarks.landmark[idx].y * h)] for idx in BETWEEN_EYEBROWS], np.int32)
                nose_bridge_points = np.array([[int(face_landmarks.landmark[idx].x * w), int(face_landmarks.landmark[idx].y * h)] for idx in NOSE_BRIDGE], np.int32)

                cv2.polylines(frame, [right_cheek_points], isClosed=True, color=(0, 255, 0), thickness=1)
                cv2.polylines(frame, [left_cheek_points], isClosed=True, color=(0, 255, 0), thickness=1)
                cv2.polylines(frame, [between_eyebrows_points], isClosed=True, color=(255, 0, 0), thickness=1)
                cv2.polylines(frame, [nose_bridge_points], isClosed=True, color=(0, 0, 255), thickness=1)

                regions = [("right cheek", right_cheek_points), ("left cheek", left_cheek_points), ("between eyebrows", between_eyebrows_points), ("nose bridge", nose_bridge_points)]
                cropped_regions = []

                for name, region in regions:
                    min_x = min(region, key=lambda p: p[0])[0]
                    max_x = max(region, key=lambda p: p[0])[0]
                    min_y = min(region, key=lambda p: p[1])[1]
                    max_y = max(region, key=lambda p: p[1])[1]

                    cropped_region = frame[min_y:max_y, min_x:max_x]
                    cropped_regions.append((name, cropped_region))

                regions_by_frames.append(cropped_regions)

                if len(regions_by_frames) >= window_size:
                    for cropped in regions_by_frames:
                        yuv_values.append(process_frame(cropped))
                        
                    spatial_temporal_map = time_domain_normalization(yuv_values)
                    spatial_temporal_map = add_white_noise(spatial_temporal_map)

                    multi_band_signals = multi_freq_decompose(spatial_temporal_map, freq_bands, fs=30)

        cv2.imshow("Face Landmarks", frame)

        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

cap.release()
cv2.destroyAllWindows()

RuntimeError: The size of tensor a (64) must match the size of tensor b (16) at non-singleton dimension 1

In [26]:
class TMSCModule(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(TMSCModule, self).__init__()
        self.conv1x1_1 = nn.Conv1d(in_channels, out_channels, kernel_size=1)
        
        self.conv3x1 = nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1)
        self.conv5x1 = nn.Conv1d(out_channels, out_channels, kernel_size=5, padding=2)
        self.conv7x1 = nn.Conv1d(out_channels, out_channels, kernel_size=7, padding=3)
        
        self.conv1x1_2 = nn.Conv1d(3 * out_channels, out_channels, kernel_size=1)
        
    def forward(self, x):
        residual = x
        
        x = self.conv1x1_1(x)
        x3 = self.conv3x1(x)
        x5 = self.conv5x1(x)
        x7 = self.conv7x1(x)
        
        x = torch.cat([x3, x5, x7], dim=1)
        x = self.conv1x1_2(x)
        
        return x + residual

In [25]:
class SSAModule(nn.Module):
    def __init__(self, in_channels):
        super(SSAModule, self).__init__()
        self.conv1 = nn.Conv1d(in_channels, in_channels, kernel_size=1)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv1d(in_channels, in_channels, kernel_size=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.conv1(x)
        x = self.relu(x)
        x = self.conv2(x)
        attention_weights = self.sigmoid(x)
        out = x * attention_weights

        return out


In [24]:
class SRRN(nn.Module):
    def __init__(self):
        super(SRRN, self).__init__()

        self.red_layer = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=3, padding=1),
            nn.BatchNorm1d(16),
            nn.ReLU()
        )

        self.green_layer = TMSCModule(in_channels=16, out_channels=64)

        self.yellow_layer = nn.Sequential(
            nn.AdaptiveAvgPool1d(1)
        )

        self.blue_layer = SSAModule(in_channels=64)

        self.dark_green_layer = nn.Sequential(
            nn.ConvTranspose1d(256, 128, kernel_size=4, stride=2, padding=1),
            nn.BatchNorm1d(128),
            nn.ELU()
        )

        self.fc = nn.Linear(128, 1)

    def forward(self, x):
        x = self.red_layer(x)
        x = self.green_layer(x)
        x = self.yellow_layer(x)

        x = self.red_layer(x)
        x = self.green_layer(x)
        x_second = self.yellow_layer(x)

        # первое разветвление

        x = self.red_layer(x_second)
        x = self.green_layer(x)
        x_third = self.yellow_layer(x)

        # второе разветвление

        x = self.red_layer(x_third)
        x = self.green_layer(x)

        # переходим во вторую пирамидку

        x = self.blue_layer(x)
        x = self.dark_green_layer(x)

        # конкатенируем с output со второго разветвления

        concat1 = torch.cat([x, x_third], dim=1)
        
        # идем дальше

        x = self.blue_layer(concat1)
        x = self.dark_green_layer(x)

        # опять конкатенация, но с output с первого разветвления

        concat2 = torch.cat([x, x_second], dim=1)

        # идем дальше

        x = self.blue_layer(concat2)
        x = self.dark_green_layer(x)

        x = self.fc(x.view(x.size(0), -1))

        return x
