In [1]:
import sys
sys.path.append('../src')  # Add source directory to path

In [2]:
from signal_processing.base_signal_processor import BaseSignalProcessor
from signal_processing.motion_artifact_detector import MotionArtifactDetector
from signal_processing.adaptive_filter import AdaptiveFilter
from signal_processing.kalman_filter import KalmanFilter
from signal_processing.wavelet_denoiser import WaveletDenoiser
from signal_processing.pipeline import SignalProcessingPipeline

### Loading Unified Data

In [3]:
processor = BaseSignalProcessor(data_path="../data/processed/cleaned_unified_dataset.parquet")
dataset = processor.load_data()
print(dataset.head())

Loading unified dataset...
                                 bvp  label  subject_id    dataset  \
2020-01-03 08:00:00+00:00   5.673109      0           2  physionet   
2020-01-03 08:00:00+00:00   7.687833      0           2  physionet   
2020-01-03 08:00:00+00:00   1.509560      0           2  physionet   
2020-01-03 08:00:00+00:00  12.999866      0           2  physionet   
2020-01-03 08:00:00+00:00  20.798602      0           2  physionet   

                                 device skin_tone  noise_level     acc_x  \
2020-01-03 08:00:00+00:00   apple_watch      V-VI      0.05088 -0.817685   
2020-01-03 08:00:00+00:00   apple_watch      I-II      0.07712 -0.973498   
2020-01-03 08:00:00+00:00   apple_watch    III-IV      0.06400 -1.054134   
2020-01-03 08:00:00+00:00  galaxy_watch    III-IV      0.09600 -1.000000   
2020-01-03 08:00:00+00:00  galaxy_watch      V-VI      0.07632 -1.000000   

                               acc_y     acc_z  
2020-01-03 08:00:00+00:00 -62.628226  4.996602

In [4]:
import numpy as np

In [5]:
def _robust_normalize(data: np.ndarray) -> np.ndarray:
    """Enhanced normalization with fallback"""
    data = np.nan_to_num(data, nan=np.median(data))
        
    # Fallback to std if IQR is zero
    q75, q25 = np.percentile(data, [75, 25])
    iqr = q75 - q25
    if iqr < 1e-6:
        std = np.std(data) + 1e-6
        normalized = (data - np.mean(data)) / std
    else:
        normalized = (data - np.median(data)) / iqr
        
    # Secondary clipping
    return np.clip(normalized, -3, 3)

# Compute and normalize accelerometer magnitude
dataset['acc_mag'] = np.sqrt(dataset['acc_x']**2 + dataset['acc_y']**2 + dataset['acc_z']**2)
dataset['acc_mag'] = _robust_normalize(dataset['acc_mag'].values)

### Motion Artifact Detection

In [6]:
detector = MotionArtifactDetector()
dataset = detector.detect_motion_bursts(dataset)
print(dataset[['acc_x', 'acc_y', 'acc_z', 'motion_burst']].head())

                              acc_x      acc_y     acc_z  motion_burst
2020-01-03 08:00:00+00:00 -0.817685 -62.628226  4.996602           0.0
2020-01-03 08:00:00+00:00 -0.973498 -62.739436  5.184150           0.0
2020-01-03 08:00:00+00:00 -1.054134 -62.992483  5.020381           0.0
2020-01-03 08:00:00+00:00 -1.000000 -69.300000  5.000000           0.0
2020-01-03 08:00:00+00:00 -1.000000 -69.300000  5.000000           0.0


In [7]:
num_unique_motion_bursts = dataset['motion_burst'].nunique()
motion_burst_counts = dataset['motion_burst'].value_counts()
print(f"Value counts of motion_burst:\n{motion_burst_counts}")

# Check motion burst distribution
motion_burst_counts = dataset['motion_burst'].value_counts(normalize=True) * 100
print(f"Motion Burst Distribution:\n{motion_burst_counts}")


Value counts of motion_burst:
motion_burst
0.0    6397175
1.0     157578
Name: count, dtype: int64
Motion Burst Distribution:
motion_burst
0.0    97.595973
1.0     2.404027
Name: proportion, dtype: float64


In [8]:
print(f"New artifact density: {dataset['motion_burst'].mean() * 100:.2f}%")
print(dataset['acc_mag'].describe())

New artifact density: 2.40%
count    6.554753e+06
mean     4.667615e-01
std      9.818329e-01
min     -8.811038e-01
25%     -4.866879e-02
50%      0.000000e+00
75%      9.513312e-01
max      1.122975e+01
Name: acc_mag, dtype: float64


In [9]:
# import matplotlib.pyplot as plt
# # Visualize results
# plt.figure(figsize=(12, 6))
# plt.plot(dataset['acc_mag'], label="Accelerometer Magnitude")
# plt.plot(dataset['motion_burst'] * dataset['acc_mag'].max(), label="Motion Bursts", linestyle='--')
# plt.legend()
# plt.title("Motion Burst Detection")
# plt.show()

### Adaptive Filtering for Motion Artifact Removal

In [10]:
adaptive_filter = AdaptiveFilter()
cleaned_bvp = adaptive_filter.apply_adaptive_filter(
    noisy_signal=dataset['bvp'].values,
    reference_signal=dataset['acc_mag'].values,
    motion_burst=dataset['motion_burst'].values
)
dataset['bvp_cleaned'] = cleaned_bvp

### Apply kalman filter

In [11]:
kalman_filter = KalmanFilter()
bvp_smoothed = kalman_filter.apply_kalman_filter(
    signal=dataset['bvp_cleaned'].values,
    motion_burst=dataset['motion_burst'].values
)
dataset['bvp_smoothed'] = bvp_smoothed

### Wavelet Denoising

In [12]:


wavelet_denoiser = WaveletDenoiser()

denoised_bvp = wavelet_denoiser.apply_wavelet_denoising(
    dataset['bvp_smoothed'].values,
    motion_burst=dataset['motion_burst'].values,
    skin_tone=dataset['skin_tone'].iloc[0],
    noise_level=dataset['noise_level'].median()  # Add noise level from dataset
)

# Verify lengths match before assignment
assert len(denoised_bvp) == len(dataset), "Denoised signal length mismatch"

dataset['bvp_denoised'] = denoised_bvp

## Runnig the Pipeline

In [10]:
import pandas as pd

In [11]:
dataset_ = pd.read_parquet("../data/processed/cleaned_unified_dataset.parquet")
pipeline = SignalProcessingPipeline()
processed_df = pipeline.process_signal(dataset_)



In [12]:
processed_df.head()

Unnamed: 0,bvp,bvp_cleaned,bvp_smoothed,bvp_denoised,motion_burst,acc_mag,device,skin_tone,noise_level,label,subject_id
2020-01-03 08:00:00+00:00,5.673109,5.559646,3.808351,3.900794,0.0,-0.023022,apple_watch,V-VI,0.05088,0,2
2020-01-03 08:00:00+00:00,7.687833,7.534076,4.133103,4.18534,0.0,-0.021271,apple_watch,I-II,0.07712,0,2
2020-01-03 08:00:00+00:00,1.50956,1.479369,4.258502,4.282481,0.0,-0.01799,apple_watch,III-IV,0.064,0,2
2020-01-03 08:00:00+00:00,12.999866,12.739869,4.32637,4.328632,0.0,1.016824,galaxy_watch,III-IV,0.096,0,2
2020-01-03 08:00:00+00:00,20.798602,20.38263,4.369558,4.357221,0.0,1.016824,galaxy_watch,V-VI,0.07632,0,2


In [13]:
print(processed_df[['bvp', 'bvp_cleaned', 'bvp_smoothed']].std())

bvp             28.719013
bvp_cleaned      2.112163
bvp_smoothed     1.719293
dtype: float64


In [14]:
import numpy as np
from scipy.signal import find_peaks
from scipy.signal import correlate
from scipy.signal import periodogram

### Signal Quality Metrics

In [15]:
class SignalQualityMetrics:
    def compute_snr(self, cleaned: np.ndarray, original: np.ndarray, 
                   noise_level: np.ndarray = None, fs: int = 30) -> float:
        """
        Compute noise-aware SNR with spectral weighting.
        
        Parameters:
            cleaned (np.ndarray): Processed signal
            original (np.ndarray): Original noisy signal
            noise_level (np.ndarray): Array of noise levels from dataset
            fs (int): Sampling frequency
            
        Returns:
            float: SNR in dB
        """
        # 1. Spectral alignment
        freqs, psd_clean = periodogram(cleaned, fs=fs)
        _, psd_noisy = periodogram(original, fs=fs)
        
        # 2. Noise-floor estimation using dataset's noise level
        if noise_level is not None:
            # Resample noise level to match PSD bins
            noise_weights = np.interp(freqs, 
                                    np.linspace(0, fs/2, len(noise_level)),
                                    noise_level)
            noise_floor = np.percentile(psd_noisy, 10) * noise_weights
        else:
            noise_floor = np.percentile(psd_noisy, 10)
        
        # 3. Frequency-band specific SNR calculation
        bands = {
            'cardiac': (0.8, 4),    # Typical heart rate range
            'respiration': (0.1, 0.5),
            'motion': (0.5, 10)
        }
        
        snr_values = []
        for band_name, (low, high) in bands.items():
            band_mask = (freqs >= low) & (freqs <= high)
            signal_power = np.trapz(psd_clean[band_mask], freqs[band_mask])
            noise_power = np.trapz(noise_floor[band_mask], freqs[band_mask])
            
            # Add epsilon to avoid division by zero
            snr = 10 * np.log10((signal_power + 1e-9) / (noise_power + 1e-9))
            snr_values.append(snr)
        
        # 4. Weighted composite SNR (emphasize cardiac band)
        weights = np.array([0.6, 0.2, 0.2])  # Cardiac, Respiration, Motion
        return np.dot(snr_values, weights)

    def compute_artifact_density(self, motion_burst: np.ndarray,
                                noise_level: np.ndarray) -> float:
        """
        Compute noise-weighted artifact density.
        
        Parameters:
            motion_burst (np.ndarray): Binary motion artifact indicators
            noise_level (np.ndarray): Associated noise levels
            
        Returns:
            float: Weighted artifact density percentage
        """
        # Weight artifacts by their noise level contribution
        weighted_artifacts = motion_burst * noise_level
        return 100 * np.sum(weighted_artifacts) / np.sum(noise_level)

    def temporal_consistency(self, signal: np.ndarray) -> float:
        """
        Measure signal continuity using autocorrelation.
        
        Parameters:
            signal (np.ndarray): Processed signal
            
        Returns:
            float: Consistency metric (higher = more consistent)
        """
        autocorr = np.correlate(signal, signal, mode='full')
        autocorr /= autocorr.max()
        mid = len(autocorr) // 2
        return np.trapz(autocorr[mid:mid+300])  # Integrate first 10s correlation

In [16]:
# Compute metrics using original BVP as reference
metrics = SignalQualityMetrics()
snr = metrics.compute_snr(processed_df['bvp_cleaned'], 
                         processed_df['bvp'],
                         processed_df['noise_level'])

artifact_density = metrics.compute_artifact_density(processed_df['motion_burst'].values, processed_df['noise_level'].values)

print(f"SNR: {snr:.2f} dB, Artifact Density: {artifact_density:.2f}%")

SNR: 2.23 dB, Artifact Density: 2.37%


## Saving cleaned data

In [20]:
pipeline.save_cleaned_dataset(dataset, "../data/cleaned_signal_dataset")

Cleaned dataset saved to ../data/cleaned_signal_dataset
