# **REMOTE PHOTOPLETHYSMOGRAPHY (rPPG)**

## IMPORTS Y CREACIÓN DEL ENTORNO PARA EL PROYECTO

Se va a crear un nuevo environment utilizando **_Anaconda Prompt_** con las dependencias necesarias y únicas para la correcta realización del proyecto.

Ya con las instalaciones y el environment creado, se va a proceder a la importación de las librerías necesarias para el proyecto.

In [1]:
import cv2
import mediapipe as mp
import numpy as np
import time
from scipy import signal
from scipy.fft import rfft, rfftfreq

## REMOTE HEARD FRECUENCY

Se va a capturar la frecuencia cardiaca a través de la WebCam del dispositivo.

**Limitaciones y Consideraciones técnicas:**

Aunque el sistema implementa algoritmos robustos como **POS (Plane-Orthogonal-to-Skin)** y segmentación **Multi-ROI**, la fotoplestimografía remota presenta limitaciones inherentes que dependen del entorno y la fisiología del sujeto.

- **Iluminación y Relación Señal-Ruido (SNR):** El sistema no mide directamente el flujo sanguíneo, sino las variaciones sutiles en la absorción de luz. Por tanto, la calidad de la señal depende estrictamente de la fuente lumínica:

    - **Baja iluminación:** El sensor de la cámara aumenta ganancia (ISO) automáticamente, introduciendo ruido de disparo (shot noise) y ruido térmico granular que enmascara la componente AC (pulsátil) de la señal cardíaca.

    - **_Aliasing_ por iluminación artificial:** Fuentes de luz fluorescente aintiguas con parpadeo perceptible (50/60Hz) pueden introducir armónicos en el espectro de frecuencias que, por *aliasing* se podrían interpretar como latidos cardíacos si no se filtran adecuadamente.

- **Variabilidad fisiológica y fototipos de piel:** La eficacia del algoritmo varía según diferentes grupos étnicos y características físicas del sujeto:

    - **Fototipos altos:** Las pieles con mayor concentración de melanina absorben mayor cantidad de luz, reduciendo así la intensidad de la luz reflejada y disminuyendo en consecuencia la amplitud de la señal PPG. El resultado de lo anterior es que la **Relación Señal-Ruido (SNR)** es más baja comparada a la obtenida en pieles claras. Como solución, se puede implementar mayor iluminación o que la misma sea más intensa para obtener resultados fiables.

    - **Oclusiones faciales:** El modelo utiliza tanto la frente como las mejillas, por lo tanto, si el individuo posee vello facial denso, gafas de montura muy gruesa, flequillos o cualquier objeto que interfiera directamente en estas zonas, se reduce el área efectiva de la ROI (Región de Interés).

- **Artefactos de movimiento:** Aunque se implementa el algormito de proyección ortogonal (POS), si se suceden movimientos bruscos como hablar o gesticular se introducen cambios no lineales en la reflexión de la luz que no pueden ser cancelados totalmente por el modelo matemático lineal.

    - **Restricción:** El individuo debe mantenerse a una posición relativamente estática para garantizar una precisión óptima.

- **Latencia en la inicialización:** El sistema hace uso de un buffer temporal para realizar el análisis en el dominio de la frecuencia (FFT).

    - **Cold Start:** El sistema tiene un retardo inicial de aproximadamente 5/6 segundos (viene dado por el tamaño del buffer) desde que se detecta la cara hasta que se encuentra el primer dato fiable, impidiendo así la medición instantánea.

- **Limitaciones Hardware:**

    - **Compresión de Video:** Muchas webcams comprimen el video en tiempo real. Los artefactos de compresión pueden alterar lso valores de color de los píxeles, suavisando los micro-cambios de color necesarios para la rPPG.

    - **Jitter de FPS:** Si la CPU está saturada, la tasa de frames puede no ser constante. Aunque el código recalcula lso FPS dinámicamente, caídas drásticas afectarán la precisión de la **Transformada de Fourier**.

In [None]:
import cv2
import numpy as np
import mediapipe as mp
import time
from scipy import signal
from scipy.fft import rfft, rfftfreq
from scipy.interpolate import interp1d

class RobustHeartRateDetector:
    def __init__(self):
        # --- CONFIGURACIÓN DE MEDIAPIPE ---
        self.mp_face_mesh = mp.solutions.face_mesh
        self.face_mesh = self.mp_face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=False, # True consume más CPU, False está bien para rPPG
            min_detection_confidence=0.6,
            min_tracking_confidence=0.6
        )
        
        # --- PARÁMETROS DEL SISTEMA ---
        self.BUFFER_SIZE = 180     # Aumentado a ~6 segundos (mejor resolución de frecuencia)
        self.FS_TARGET = 30.0      # Frecuencia de muestreo objetivo para interpolación
        
        # Buffers
        self.rgb_buffer = [] 
        self.times_buffer = [] 
        self.bpm_history = [] 
        
        self.current_bpm = 0.0
        self.is_stable = False
        self.snr = 0.0             # Relación Señal-Ruido para validar calidad

    def setup_camera(self):
        cap = cv2.VideoCapture(0)
        # Nota: En Windows/DirectShow, usar números negativos grandes para manual
        cap.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25) 
        cap.set(cv2.CAP_PROP_EXPOSURE, -4.0) 
        cap.set(cv2.CAP_PROP_AUTOFOCUS, 0)
        return cap

    def get_roi_average(self, frame, landmarks):
        """
        Versión optimizada que usa coordenadas de landmarks.
        En lugar de dibujar máscaras complejas, tomamos puntos clave.
        Como se usan distancias fihas, esto es mucho más rápido que sacar máscaras completas
        """
        h, w, _ = frame.shape
        
        # Índices de Mejillas y Frente (Simplificado para velocidad)
        # Frente: 10, Mejilla I: 330, Mejilla D: 101 (aprox centros)
        roi_centers = [10, 330, 101] 
        roi_size = int(w * 0.08) # Tamaño dinámico según ancho de imagen
        
        accum_color = np.array([0.0, 0.0, 0.0])
        count = 0

        for idx in roi_centers:
            pt = landmarks.landmark[idx]
            cx, cy = int(pt.x * w), int(pt.y * h)
            
            # Clipping seguro
            x1 = max(0, cx - roi_size)
            y1 = max(0, cy - roi_size)
            x2 = min(w, cx + roi_size)
            y2 = min(h, cy + roi_size)
            
            # Extraer crop y calcular media (mucho más rápido que máscaras completas)
            crop = frame[y1:y2, x1:x2]
            if crop.size > 0:
                mean = np.mean(crop, axis=(0, 1)) # BGR
                accum_color += mean
                count += 1
                
                # Visualización (Debug)
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 1)

        if count > 0:
            return accum_color / count
        return np.array([0.0, 0.0, 0.0])

    def pos_algorithm(self, rgb_array):
        """ Algoritmo POS optimizado con NumPy """
        # rgb_array: (N, 3)
        # Normalización temporal
        mean_color = np.mean(rgb_array, axis=0)
        Cn = rgb_array / (mean_color + 1e-6) - 1
        
        # Proyección
        # Cn columns: 0=B, 1=G, 2=R (OpenCV standard)
        r = Cn[:, 2]
        g = Cn[:, 1]
        b = Cn[:, 0]
        
        S1 = g - b
        S2 = g + b - 2*r
        
        # Fusión Alpha con manejo de división por cero
        std_s1 = np.std(S1)
        std_s2 = np.std(S2)
        alpha = std_s1 / (std_s2 + 1e-6)
        
        P = S1 + alpha * S2
        return P

    def compute_heart_rate(self):
        if len(self.rgb_buffer) < self.BUFFER_SIZE:
            return

        # 1. Obtener datos crudos
        raw_rgb = np.array(self.rgb_buffer) # (Frames, 3)
        raw_times = np.array(self.times_buffer)
        
        # --- INTERPOLACIÓN ---
        # Las cámaras no capturan a intervalos perfectos. Interpolar a FS_TARGET (30Hz).
        raw_times = raw_times - raw_times[0] # Empezar en t=0
        
        if raw_times[-1] <= 0: return # Evitar errores de tiempo cero

        # Crear eje de tiempo uniforme
        num_samples = int(raw_times[-1] * self.FS_TARGET)
        uniform_times = np.linspace(0, raw_times[-1], num_samples)
        
        # Interpolar cada canal (R, G, B)
        interpolated_rgb = np.zeros((num_samples, 3))
        for i in range(3):
            f_interp = interp1d(raw_times, raw_rgb[:, i], kind='linear', fill_value="extrapolate")
            interpolated_rgb[:, i] = f_interp(uniform_times)

        # 2. Aplicar POS sobre datos interpolados
        ppg_signal = self.pos_algorithm(interpolated_rgb)
        
        # 3. Detrending (Quitar tendencias de iluminación lenta)
        ppg_signal = signal.detrend(ppg_signal)
        
        # 4. Filtrado Bandpass (Butterworth)
        # Rango 42 BPM (0.7Hz) a 180 BPM (3.0Hz)
        nyquist = self.FS_TARGET / 2
        b, a = signal.butter(3, [0.7/nyquist, 3.0/nyquist], btype='band')
        filtered_ppg = signal.filtfilt(b, a, ppg_signal)
        
        # --- VENTANEO (WINDOWING) ---
        # Reduce fugas espectrales en la FFT
        window = np.hanning(len(filtered_ppg))
        windowed_signal = filtered_ppg * window
        
        # 5. FFT
        N = len(windowed_signal)
        yf = rfft(windowed_signal)
        xf = rfftfreq(N, 1 / self.FS_TARGET)
        
        # Buscar picos en rango humano
        mask = (xf >= 0.7) & (xf <= 3.0)
        valid_xf = xf[mask]
        valid_yf = np.abs(yf[mask])
        
        if len(valid_yf) > 0:
            peak_idx = np.argmax(valid_yf)
            freq_hz = valid_xf[peak_idx]
            peak_val = valid_yf[peak_idx]
            
            # --- CALCULO DE SNR (Calidad de señal) ---
            # SNR = Energía del pico / Energía del ruido circundante
            avg_noise = np.mean(valid_yf)
            self.snr = peak_val / (avg_noise + 1e-6)
            
            new_bpm = freq_hz * 60.0
            
            # Solo actualizar si el SNR es decente (evita ruido aleatorio)
            if self.snr > 1.5: # Umbral empírico
                self.bpm_history.append(new_bpm)
                if len(self.bpm_history) > 12: 
                    self.bpm_history.pop(0)
                
                # Media ponderada (dar más peso a las últimas lecturas)
                self.current_bpm = np.mean(self.bpm_history)
                self.is_stable = True
            else:
                self.is_stable = False # Señal ruidosa

    def estimate_distance_and_check(self, frame, landmarks):
        """
        Calcula la distancia aproximada usuario-cámara basándose en la distancia interpupilar.
        Devuelve (distancia_cm, estado_texto, color_estado)
        """
        h, w, _ = frame.shape
        
        # Landmarks de las esquinas exteriores de los ojos
        # 33: Ojo izquierdo (esquina ext), 263: Ojo derecho (esquina ext)
        left_eye = landmarks.landmark[33]
        right_eye = landmarks.landmark[263]
        
        # Calcular distancia en píxeles entre los ojos
        x1, y1 = left_eye.x * w, left_eye.y * h
        x2, y2 = right_eye.x * w, right_eye.y * h
        pixel_dist = np.sqrt((x2 - x1)**2 + (y2 - y1)**2)
        
        # --- CALIBRACIÓN APROXIMADA ---
        # Ancho real promedio entre esquinas de ojos humanos: ~11.5 cm
        # Distancia Focal: ~600 es un valor estándar para webcams a 640x480.
        # Si usas HD (1280x720), ajusta esto a aprox 1100-1200.
        # Fórmula: F = (P * D) / W -> F = (pixel_dist * distancia_conocida) / ancho_real
        REAL_EYE_WIDTH = 11.5 # cm
        FOCAL_LENGTH = 640    # Ajustar según tu cámara (prueba y error)
        
        if pixel_dist == 0: return 0, "Error", (0,0,255)
        
        distance_cm = (REAL_EYE_WIDTH * FOCAL_LENGTH) / pixel_dist
        
        # Definir zonas
        if distance_cm < 45:
            return distance_cm, "Muy Cerca", (0, 0, 255) # Rojo
        elif distance_cm > 80:
            return distance_cm, "Muy Lejos", (0, 165, 255) # Naranja
        else:
            return distance_cm, "Distancia Optima", (0, 255, 0) # Verde

    def run(self):
            cap = self.setup_camera()
            print("Sistema Iniciado...")
    
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret: break
    
                curr_time = time.time()
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = self.face_mesh.process(rgb_frame)
    
                if results.multi_face_landmarks:
                    for face_landmarks in results.multi_face_landmarks:
                        
                        # --- NUEVO: CALCULAR DISTANCIA ---
                        dist_cm, status_text, status_color = self.estimate_distance_and_check(frame, face_landmarks)
                        
                        # Dibujar UI de Distancia
                        cv2.putText(frame, f"Dist: {int(dist_cm)}cm ({status_text})", (20, 30), 
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, status_color, 2)
    
                        # --- LÓGICA DE CONTROL ---
                        # Solo procesamos el ritmo cardiaco si la distancia es aceptable
                        # Esto evita meter ruido en el buffer cuando te acercas/alejas mucho
                        if status_text == "Distancia Optima":
                            
                            bgr_val = self.get_roi_average(frame, face_landmarks)
                            self.rgb_buffer.append(bgr_val)
                            self.times_buffer.append(curr_time)
                            
                            if len(self.rgb_buffer) > self.BUFFER_SIZE:
                                self.rgb_buffer.pop(0)
                                self.times_buffer.pop(0)
                            
                            if len(self.rgb_buffer) == self.BUFFER_SIZE and (len(self.rgb_buffer) % 10 == 0):
                                self.compute_heart_rate()
    
                            # Mostrar BPM solo si estamos en rango
                            if self.is_stable:
                                cv2.putText(frame, f"BPM: {self.current_bpm:.1f}", (20, 70), 
                                            cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0), 3)
                                cv2.putText(frame, f"SNR: {self.snr:.2f}", (20, 100), 
                                            cv2.FONT_HERSHEY_PLAIN, 1, (200, 200, 200), 1)
                            else:
                                cv2.putText(frame, "Calculando...", (20, 70), 
                                            cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2)
                        else:
                            # Si la distancia es mala, mostramos aviso grande
                            cv2.putText(frame, "AJUSTA DISTANCIA", (int(frame.shape[1]/2)-100, int(frame.shape[0]/2)), 
                                        cv2.FONT_HERSHEY_SIMPLEX, 1, status_color, 3)
                            
                            # Opcional: Reiniciar buffers si el usuario se mueve demasiado para evitar falsos picos
                            if len(self.rgb_buffer) > 0:
                                self.rgb_buffer.pop(0) 
                                self.times_buffer.pop(0)
    
                else:
                    self.rgb_buffer.clear()
                    self.times_buffer.clear()
                    self.is_stable = False
                    cv2.putText(frame, "NO FACE", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
    
                cv2.imshow('Robust rPPG', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'): break
    
            cap.release()
            cv2.destroyAllWindows()

if __name__ == "__main__":
    app = RobustHeartRateDetector()
    app.run()

Sistema Iniciado...


