In [12]:
import numpy as np
from typing import Tuple
from oxytrace.src.utils.custom_logger import LOGGER


class StatisticalDetector:
    """Detect point anomalies using statistical methods.

    Two rules are combined: a Z-score rule (distance from the fitted mean in
    units of the fitted standard deviation) and an IQR rule (distance outside
    the fitted quartile fences). A point is anomalous if either rule fires.

    Typical usage: call ``fit`` on reference data, then ``detect`` on new data.
    """

    def __init__(self, z_threshold: float = 3.0, iqr_multiplier: float = 1.5):
        """
        Initialize statistical detector.

        Args:
            z_threshold: Z-score threshold for anomaly detection
            iqr_multiplier: IQR multiplier for outlier detection
        """
        self.z_threshold = z_threshold
        self.iqr_multiplier = iqr_multiplier
        # Fitted parameters; they stay None until fit() has been called.
        self.mean = None
        self.std = None
        self.q1 = None
        self.q3 = None
        self.iqr = None

    def fit(self, values: np.ndarray):
        """
        Learn normal distribution parameters.

        NaN entries are dropped before the statistics are computed, because a
        single NaN would otherwise turn every fitted parameter into NaN and
        make detect() silently flag nothing.

        Args:
            values: Array-like of numeric reference values (any shape; it is
                flattened).

        Raises:
            ValueError: If no non-NaN values remain to fit on.
        """
        samples = np.asarray(values, dtype=float).ravel()
        samples = samples[~np.isnan(samples)]
        if samples.size == 0:
            raise ValueError(
                "Cannot fit StatisticalDetector: no non-NaN values provided"
            )

        self.mean = np.mean(samples)
        # np.std defaults to the population standard deviation (ddof=0).
        self.std = np.std(samples)
        self.q1 = np.percentile(samples, 25)
        self.q3 = np.percentile(samples, 75)
        self.iqr = self.q3 - self.q1

        # NOTE(review): LOGGER is called with keyword fields, so it is
        # presumably a structlog-style logger - confirm against custom_logger.
        LOGGER.info(
            "Statistical detector fitted",
            mean=float(self.mean),
            std=float(self.std),
            q1=float(self.q1),
            q3=float(self.q3)
        )

    def detect(self, values: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Detect anomalies using Z-score and IQR methods.

        Args:
            values: Array-like of numeric values to score.

        Returns:
            Tuple of (is_anomaly, anomaly_scores). ``is_anomaly`` is a boolean
            array that is True where either rule fires. ``anomaly_scores`` is
            derived from the Z-score only and clipped to 0-100, so a point
            flagged solely by the IQR rule can have a score below 100.
            NaN inputs are never flagged and receive a NaN score.

        Raises:
            RuntimeError: If called before fit().
        """
        fitted_params = (self.mean, self.std, self.q1, self.q3, self.iqr)
        if any(param is None for param in fitted_params):
            raise RuntimeError(
                "StatisticalDetector.detect() called before fit()"
            )

        values = np.asarray(values, dtype=float)

        # Z-score based detection; the epsilon avoids division by zero when
        # the fitted data was constant (std == 0).
        z_scores = np.abs((values - self.mean) / (self.std + 1e-8))
        z_anomalies = z_scores > self.z_threshold

        # IQR based detection
        lower_bound = self.q1 - self.iqr_multiplier * self.iqr
        upper_bound = self.q3 + self.iqr_multiplier * self.iqr
        iqr_anomalies = (values < lower_bound) | (values > upper_bound)

        # Combine both methods
        is_anomaly = z_anomalies | iqr_anomalies

        # Score is normalized z-score (0-100 scale); 100 means the Z-score is
        # at or beyond z_threshold.
        anomaly_scores = np.clip((z_scores / self.z_threshold) * 100, 0, 100)

        return is_anomaly, anomaly_scores

In [42]:
import os
import pandas as pd


# Demo: fit the statistical detector on the first rows of the curated dataset
# and report the anomaly scores of the held-out rows that were flagged.

# Tunable settings for this experiment.
OXYGEN_COLUMN = "Oxygen[%sat]"
N_ROWS = 1000   # rows read from the CSV
N_TRAIN = 800   # leading rows used for fitting; the rest are scored

dataset_path = os.path.join("oxytrace", "dataset", "curated_dataset.csv")

df = pd.read_csv(dataset_path, nrows=N_ROWS)

# Positional split: order is preserved, so no shuffling happens here.
train_data = df.iloc[:N_TRAIN]
test_data = df.iloc[N_TRAIN:]

stat_detector = StatisticalDetector()
stat_detector.fit(values=train_data[OXYGEN_COLUMN].to_numpy())

anomaly_detection_results = stat_detector.detect(test_data[OXYGEN_COLUMN].to_numpy())
is_anomaly, anomaly_scores = anomaly_detection_results

# Boolean-mask selection replaces the element-by-element loop; tolist()
# yields plain Python floats so the printed list stays compact.
anomalies = anomaly_scores[is_anomaly].tolist()

print(anomalies)

# print([
#     (anomaly_detection_results[0][index], anomaly_detection_results[1][index])
#         for index, anomaly_status in enumerate(anomaly_detection_results[0]) 
#             if anomaly_detection_results[0][index] is False])



2025-11-15 23:51:28,578 - INFO - {'mean': 91.21122540473938, 'std': 1.4643869749288108, 'q1': 90.3380832672119, 'q3': 92.32570266723633, 'logger': 'default_logger', 'level': 'info', 'timestamp': '2025-11-15T17:51:28.578020Z', 'func_name': 'fit', 'lineno': 33, 'pathname': '/var/folders/6b/qgp33bqn7knbrt8q3tccmck40000gp/T/ipykernel_48439/3580856483.py', 'message': 'Statistical detector fitted'}
[2m2025-11-15T17:51:28.578020Z[0m [[32m[1minfo     [0m] [1mStatistical detector fitted   [0m [[0m[1m[34mdefault_logger[0m][0m [36mfunc_name[0m=[35mfit[0m [36mlineno[0m=[35m33[0m [36mmean[0m=[35m91.21122540473938[0m [36mpathname[0m=[35m/var/folders/6b/qgp33bqn7knbrt8q3tccmck40000gp/T/ipykernel_48439/3580856483.py[0m [36mq1[0m=[35m90.3380832672119[0m [36mq3[0m=[35m92.32570266723633[0m [36mstd[0m=[35m1.4643869749288108[0m


[100.0, 100.0, 100.0, 99.63814126031195, 100.0, 92.20561532946289, 96.00923192223489, 91.9779401695175, 96.93990406727484, 98.02079666491035, 100.0, 100.0, 100.0, 95.17459666769822, 93.19759131238288, 95.17459666769822, 100.0, 91.83570831902529, 100.0, 97.1606326166956, 100.0, 88.39557312195558]
