# Real-time rPPG Pipeline (Improved)

Rayhan Fatih Gunawan

NIM.122140134

## Improvements
- **POS Algorithm**: Plane-Orthogonal-to-Skin for robust signal extraction.
- **Advanced ROI**: Specific regions (Cheeks + Forehead) to reduce noise.
- **Visualization**: Real-time signal plotting, Frequency Spectrum, and BPM display.

In [None]:
import cv2
import mediapipe as mp
import numpy as np
from scipy import signal, fftpack
import time
from collections import deque
import matplotlib.pyplot as plt

In [None]:
def bandpass_filter(data, fs, lowcut=0.7, highcut=4.0, order=4):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = signal.butter(order, [low, high], btype='band')
    return signal.filtfilt(b, a, data)

def calculate_bpm(processed_signal, fs):
    if len(processed_signal) < fs * 2: # Need at least 2 seconds
        return 0.0, None, None
    
    # FFT
    n = len(processed_signal)
    freqs = fftpack.fftfreq(n, d=1/fs)
    fft_val = np.abs(fftpack.fft(processed_signal))
    
    # Filter positive frequencies and within range
    mask = (freqs > 0.7) & (freqs < 4.0)
    valid_freqs = freqs[mask]
    valid_fft = fft_val[mask]
    
    if len(valid_fft) == 0:
        return 0.0, None, None
        
    peak_freq = valid_freqs[np.argmax(valid_fft)]
    bpm = peak_freq * 60.0
    return bpm, valid_freqs, valid_fft

In [None]:
class RealTimeRPPG:
    def __init__(self, buffer_size=300, fs=30):
        self.buffer_size = buffer_size
        self.fs = fs
        self.times = deque(maxlen=buffer_size)
        
        # Store RGB signals for POS
        self.raw_r = deque(maxlen=buffer_size)
        self.raw_g = deque(maxlen=buffer_size)
        self.raw_b = deque(maxlen=buffer_size)
        
        self.filtered_signal = []
        self.bpm = 0.0
        self.fft_freqs = None
        self.fft_val = None
        
        # MediaPipe Face Mesh
        self.mp_face_mesh = mp.solutions.face_mesh
        self.face_mesh = self.mp_face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        
        # ROI Indices (Approximate for Cheeks and Forehead)
        self.ROI_INDICES = {
            'forehead': [10, 338, 297, 332, 284, 251, 389, 356],
            'left_cheek': [330, 347, 423, 426, 427, 373, 374],
            'right_cheek': [101, 118, 203, 206, 207, 144, 145]
        }
        
    def get_roi_mean_rgb(self, image, landmarks):
        h, w, _ = image.shape
        mask = np.zeros((h, w), dtype=np.uint8)
        
        # Draw all ROIs on the mask
        for region, indices in self.ROI_INDICES.items():
            region_points = np.array([[landmarks[i].x * w, landmarks[i].y * h] for i in indices], dtype=np.int32)
            hull = cv2.convexHull(region_points)
            cv2.fillPoly(mask, [hull], 255)
            
        # Calculate mean of R, G, B in the mask
        # cv2.mean returns (B, G, R, A)
        mean_vals = cv2.mean(image, mask=mask)
        return mean_vals[2], mean_vals[1], mean_vals[0] # R, G, B

    def process_pos(self, r_list, g_list, b_list):
        # POS Algorithm
        r = np.array(r_list)
        g = np.array(g_list)
        b = np.array(b_list)
        
        # Normalize
        r_mean = np.mean(r)
        g_mean = np.mean(g)
        b_mean = np.mean(b)
        
        if r_mean == 0 or g_mean == 0 or b_mean == 0:
            return np.zeros(len(r))
            
        rn = r / r_mean
        gn = g / g_mean
        bn = b / b_mean
        
        # Projection
        s1 = gn - bn
        s2 = gn + bn - 2 * rn
        
        # Tuning
        if np.std(s2) == 0:
            alpha = 0
        else:
            alpha = np.std(s1) / np.std(s2)
            
        # Combination
        h = s1 + alpha * s2
        return h

    def draw_signal(self, image, signal_data, x, y, w, h, color=(0, 255, 0)):
        cv2.rectangle(image, (x, y), (x+w, y+h), (0, 0, 0), -1)
        
        if len(signal_data) < 2:
            return
            
        min_val = np.min(signal_data)
        max_val = np.max(signal_data)
        
        if max_val - min_val == 0:
            return
            
        norm_signal = (signal_data - min_val) / (max_val - min_val)
        
        points = []
        for i, val in enumerate(norm_signal):
            px = int(x + (i / len(signal_data)) * w)
            py = int(y + h - (val * h))
            points.append((px, py))
            
        for i in range(1, len(points)):
            cv2.line(image, points[i-1], points[i], color, 2)

    def draw_fft(self, image, freqs, fft_val, x, y, w, h, color=(255, 0, 255)):
        cv2.rectangle(image, (x, y), (x+w, y+h), (0, 0, 0), -1)
        
        if freqs is None or fft_val is None or len(freqs) < 2:
            return
        
        # Normalize FFT values
        max_val = np.max(fft_val)
        if max_val == 0:
            return
            
        norm_fft = fft_val / max_val
        
        # Map frequencies to x-axis (0.7 Hz to 4.0 Hz)
        min_freq = 0.7
        max_freq = 4.0
        
        points = []
        for i, f in enumerate(freqs):
            if f < min_freq or f > max_freq:
                continue
                
            px = int(x + ((f - min_freq) / (max_freq - min_freq)) * w)
            py = int(y + h - (norm_fft[i] * h))
            points.append((px, py))
            
        for i in range(1, len(points)):
            cv2.line(image, points[i-1], points[i], color, 2)

    def run(self):
        cap = cv2.VideoCapture(0)
        print("Starting rPPG with POS... Press 'ESC' to exit.")
        
        while cap.isOpened():
            success, image = cap.read()
            if not success:
                break
                
            image = cv2.flip(image, 1)
            rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            h, w, _ = image.shape
            
            results = self.face_mesh.process(rgb_image)
            
            if results.multi_face_landmarks:
                landmarks = results.multi_face_landmarks[0].landmark
                
                mr, mg, mb = self.get_roi_mean_rgb(image, landmarks)
                
                self.times.append(time.time())
                self.raw_r.append(mr)
                self.raw_g.append(mg)
                self.raw_b.append(mb)
                
                for region, indices in self.ROI_INDICES.items():
                    region_points = np.array([[landmarks[i].x * w, landmarks[i].y * h] for i in indices], dtype=np.int32)
                    hull = cv2.convexHull(region_points)
                    cv2.polylines(image, [hull], True, (255, 255, 0), 1)
                
                if len(self.raw_r) > self.fs * 2:
                    pos_signal = self.process_pos(list(self.raw_r), list(self.raw_g), list(self.raw_b))
                    detrended = signal.detrend(pos_signal)
                    filtered = bandpass_filter(detrended, self.fs)
                    self.filtered_signal = filtered
                    self.bpm, self.fft_freqs, self.fft_val = calculate_bpm(filtered, self.fs)
                
                # Display Info
                cv2.putText(image, f'BPM: {self.bpm:.1f}', (30, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.putText(image, 'Method: POS', (30, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                
                # Draw Signal Graph (Time Domain)
                if len(self.filtered_signal) > 0:
                    display_signal = self.filtered_signal[-150:] if len(self.filtered_signal) > 150 else self.filtered_signal
                    self.draw_signal(image, display_signal, 30, 100, 300, 100)
                    cv2.putText(image, 'Pulse Signal', (30, 95), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
                
                # Draw FFT Graph (Frequency Domain)
                if self.fft_freqs is not None:
                    self.draw_fft(image, self.fft_freqs, self.fft_val, 30, 230, 300, 100)
                    cv2.putText(image, 'Frequency Spectrum', (30, 225), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 255), 1)
            
            cv2.imshow('Real-time rPPG (POS)', image)
            if cv2.waitKey(5) & 0xFF == 27:
                break
                
        cap.release()
        cv2.destroyAllWindows()

In [None]:
if __name__ == "__main__":
    rppg = RealTimeRPPG()
    rppg.run()