# Reading the data

#### The data used is the anolamly detection database called "Numenta Anomaly Benchmark" (NAB) which  is a novel benchmark for evaluating algorithms for anomaly detection in streaming, real-time applications. 



In [None]:
# All Imports
import os
import pandas as pd
import json
import time
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score # for evaluation only


In [21]:
# Directory path where the CSV files are stored
base_dir = 'data/numenta NAB master data/'

# List to store tuples of (file name, DataFrame)
dfs = []

# Walk through all subdirectories and files
for root, dirs, files in os.walk(base_dir):
    for file in files:
        if file.endswith('.csv'):  # Check if the file is a CSV
            # Construct full file path
            file_path = os.path.join(root, file)
            # Read the CSV file and store it in a DataFrame
            df = pd.read_csv(file_path)
            
            # Append the tuple (file name, DataFrame) to the list
            dfs.append((file_path.split('/')[-1].replace('\\', '/'), df))

# Now `dfs` contains tuples of (file name, DataFrame) from all CSV files


In [23]:
# Load the JSON file
with open('data/combined_labels.json', 'r') as f:
    labels = json.load(f)

# Iterate over the list of (file name, DataFrame) tuples
for file_name, df in dfs:
    # Get the corresponding file name's anomalies from the JSON file
    anomaly_times = labels.get(file_name, [])

    # Convert the DataFrame's time column to datetime if not already
    if 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'])
    else:
        raise ValueError(f"DataFrame for {file_name} does not have a 'timestamp' column.")

    # Create the 'anomaly' column and mark anomalies based on the timestamps in the JSON
    df['anomaly'] = df['timestamp'].apply(lambda x: 1 if str(x) in anomaly_times else 0)

    # If you'd like to print or check the updated DataFrame
    print(f"Updated DataFrame for {file_name}:")
    print(df.head())  # Print the first few rows to confirm



Updated DataFrame for artificialWithAnomaly/art_daily_flatmiddle.csv:
            timestamp      value  anomaly
0 2014-04-01 00:00:00 -21.048383        0
1 2014-04-01 00:05:00 -20.295477        0
2 2014-04-01 00:10:00 -18.127229        0
3 2014-04-01 00:15:00 -20.171665        0
4 2014-04-01 00:20:00 -21.223762        0
Updated DataFrame for artificialWithAnomaly/art_daily_jumpsdown.csv:
            timestamp      value  anomaly
0 2014-04-01 00:00:00  18.090486        0
1 2014-04-01 00:05:00  20.359843        0
2 2014-04-01 00:10:00  21.105470        0
3 2014-04-01 00:15:00  21.151585        0
4 2014-04-01 00:20:00  18.137141        0
Updated DataFrame for artificialWithAnomaly/art_daily_jumpsup.csv:
            timestamp      value  anomaly
0 2014-04-01 00:00:00  19.761252        0
1 2014-04-01 00:05:00  20.500833        0
2 2014-04-01 00:10:00  19.961641        0
3 2014-04-01 00:15:00  21.490266        0
4 2014-04-01 00:20:00  20.187739        0
Updated DataFrame for artificialWithAn

# Testing the Algorithms

##### I have read and researched many algorithms, However the following were choosen because of their efficent implementation using only Numpy, and their diversity

In [25]:

class OMLADStreaming:
    def __init__(self, threshold=3):
        """
        Streaming anomaly detection using a rolling Z-score.

        Parameters:
        threshold (float): Z-score threshold to classify anomalies.
        """
        self.threshold = threshold
        self.n = 0  # Counter for the number of observations
        self.mean = 0  # Initial mean
        self.m2 = 0  # Sum of squares of differences from the mean
        self.std = 0  # Initial standard deviation
        self.anomalies = []  # To store anomaly detection results

    def update_stats(self, new_value):
        """
        Update the rolling mean and standard deviation with a new value.
        
        Parameters:
        new_value (float): The new value from the time series.
        """
        self.n += 1
        delta = new_value - self.mean
        self.mean += delta / self.n
        delta2 = new_value - self.mean
        self.m2 += delta * delta2

        if self.n > 1:
            self.std = np.sqrt(self.m2 / (self.n - 1))
        else:
            self.std = 0  # Avoid division by zero for the first point

    def update(self, new_value):
        """
        Detect whether a new value is an anomaly based on the Z-score.
        
        Parameters:
        new_value (float): The new value from the time series.

        Returns:
        int: 1 if the value is an anomaly, 0 otherwise.
        """
        if self.n < 2:
            # Not enough data points to compute Z-score yet
            self.anomalies.append(0)
            self.update_stats(new_value)
            return 0
        
        # Calculate Z-score for the new value
        z_score = (new_value - self.mean) / self.std if self.std > 0 else 0
        
        # Check if the Z-score exceeds the threshold
        if abs(z_score) > self.threshold:
            anomaly = 1
        else:
            anomaly = 0
        
        # Update rolling statistics after checking for an anomaly
        self.update_stats(new_value)
        
        # Append result to anomaly list
        self.anomalies.append(anomaly)
        
        return anomaly


def apply_omlad_streaming(df, threshold=3):
    """
    Apply OMLADStreaming to a DataFrame, detect anomalies, and record time taken per point using time.perf_counter().
    
    Parameters:
    df (pd.DataFrame): DataFrame with 'timestamp' and 'value' columns.
    threshold (float): Z-score threshold for anomaly detection.
    
    Returns:
    pd.DataFrame: DataFrame with additional columns 'predicted_anomaly' and 'time_taken_per_point'.
    """
    omlad = OMLADStreaming(threshold=threshold)
    
    times_taken = []
    anomalies = []
    
    for value in df['value']:
        start_time = time.perf_counter()  # High precision timing
        
        # Detect anomaly
        anomaly = omlad.update(value)
        anomalies.append(anomaly)
        
        end_time = time.perf_counter()  # High precision timing
        times_taken.append(end_time - start_time)
    
    # Add the results to the DataFrame
    df['predicted_anomaly_OMLADStreaming'] = anomalies
    df['time_taken_per_point_OMLADStreaming'] = times_taken
    
    return df

In [26]:
class OMLADEWMAStreaming:
    def __init__(self, threshold=3, alpha=0.3):
        """
        Streaming anomaly detection using EWMA for mean and standard deviation.
        
        Parameters:
        threshold (float): Z-score threshold to classify anomalies.
        alpha (float): Smoothing factor for EWMA. A higher value gives more weight to recent observations.
        """
        self.threshold = threshold
        self.alpha = alpha
        self.ewma_mean = None  # EWMA for mean
        self.ewma_var = None   # EWMA for variance
        self.std = None        # EWMA for standard deviation
        self.anomalies = []    # To store anomaly detection results

    def update_ewma(self, new_value):
        """
        Update the EWMA mean and variance with a new value.
        
        Parameters:
        new_value (float): The new value from the time series.
        """
        if self.ewma_mean is None:
            # Initialize EWMA mean and variance with the first value
            self.ewma_mean = new_value
            self.ewma_var = 0
            self.std = 0
        else:
            # Update EWMA mean
            self.ewma_mean = self.alpha * new_value + (1 - self.alpha) * self.ewma_mean

            # Update EWMA variance (mean square error)
            self.ewma_var = self.alpha * (new_value - self.ewma_mean)**2 + (1 - self.alpha) * self.ewma_var

            # Calculate EWMA standard deviation
            self.std = np.sqrt(self.ewma_var)

    def update(self, new_value):
        """
        Detect whether a new value is an anomaly based on the EWMA Z-score.
        
        Parameters:
        new_value (float): The new value from the time series.

        Returns:
        int: 1 if the value is an anomaly, 0 otherwise.
        """
        if self.std is None or self.std == 0:
            # Not enough data to calculate standard deviation, can't detect anomalies
            self.update_ewma(new_value)
            self.anomalies.append(0)
            return 0

        # Calculate Z-score based on EWMA mean and standard deviation
        z_score = (new_value - self.ewma_mean) / self.std
        
        # Check if the Z-score exceeds the threshold
        if abs(z_score) > self.threshold:
            anomaly = 1
        else:
            anomaly = 0

        # Update EWMA stats after checking for an anomaly
        self.update_ewma(new_value)

        # Append result to anomaly list
        self.anomalies.append(anomaly)
        
        return anomaly

def apply_omladewma_streaming(df, threshold=3, alpha=0.3):
    """
    Apply OMLADEWMAStreaming to a DataFrame, detect anomalies, and record time taken per point using time.perf_counter().
    
    Parameters:
    df (pd.DataFrame): DataFrame with 'timestamp' and 'value' columns.
    threshold (float): Z-score threshold for anomaly detection.
    alpha (float): Smoothing factor for EWMA.
    
    Returns:
    pd.DataFrame: DataFrame with an additional column 'predicted_anomaly' and 'time_taken_per_point_OMLADEWMAStreaming'.
    """
    ewma = OMLADEWMAStreaming(threshold=threshold, alpha=alpha)
    
    times_taken = []
    anomalies = []
    
    for value in df['value']:
        start_time = time.perf_counter()  # High precision timing
        
        # Detect anomaly
        anomaly = ewma.update(value)
        anomalies.append(anomaly)
        
        end_time = time.perf_counter()  # High precision timing
        times_taken.append(end_time - start_time)
    
    # Add the results to the DataFrame
    df['predicted_anomaly_OMLADEWMAStreaming'] = anomalies
    df['time_taken_per_point_OMLADEWMAStreaming'] = times_taken
    
    return df



In [27]:
class StreamingHampelFilter:
    def __init__(self, window_size=7, n_sigmas=3):
        """
        Streaming Hampel Filter for anomaly detection.
        
        Parameters:
        window_size (int): The size of the sliding window (number of points to include on each side).
        n_sigmas (int): The number of median absolute deviations to use for the threshold.
        """
        self.window_size = window_size
        self.n_sigmas = n_sigmas
        self.window = []  # Sliding window to hold the recent data points

    def update(self, new_value):
        """
        Update the Hampel filter with a new value and detect if it's an anomaly.
        
        Parameters:
        new_value (float): New value from the data stream.
        
        Returns:
        int: 1 if the value is an anomaly, 0 otherwise.
        """
        # Append the new value to the sliding window
        self.window.append(new_value)

        # Ensure the window size is maintained
        if len(self.window) > 2 * self.window_size + 1:
            self.window.pop(0)

        # If the window is not full yet, return 0 (no anomaly)
        if len(self.window) < 2 * self.window_size + 1:
            return 0

        # Calculate the median and MAD (Median Absolute Deviation) of the window
        median = np.median(self.window)
        mad = np.median(np.abs(self.window - median))

        # Define the threshold based on the MAD
        threshold = self.n_sigmas * mad

        # Check if the current value is an anomaly
        if np.abs(new_value - median) > threshold:
            return 1  # Anomaly detected
        else:
            return 0  # No anomaly detected


def apply_streaming_hampel_to_df(df, window_size=7, n_sigmas=3):
    """
    Apply the Streaming Hampel filter to a time series data in a DataFrame and return a DataFrame with predicted anomalies and time taken per point.
    
    Parameters:
    df (pd.DataFrame): DataFrame with 'timestamp' and 'value' columns.
    window_size (int): The size of the sliding window for anomaly detection.
    n_sigmas (int): Number of MADs used for the threshold.
    
    Returns:
    pd.DataFrame: DataFrame with additional columns 'predicted_anomaly_StreamingHampelFilter' and 'time_taken_per_point_StreamingHampelFilter'.
    """
    hampel_filter = StreamingHampelFilter(window_size=window_size, n_sigmas=n_sigmas)

    # Lists to store time taken and anomaly detection results
    times_taken = []
    anomalies = []
    
    for value in df['value']:
        start_time = time.perf_counter()  # Start high-precision timing

        # Detect anomaly
        anomaly = hampel_filter.update(value)
        anomalies.append(anomaly)

        end_time = time.perf_counter()  # End high-precision timing
        times_taken.append(end_time - start_time)
    
    # Add the results to the DataFrame
    df['predicted_anomaly_StreamingHampelFilter'] = anomalies
    df['time_taken_per_point_StreamingHampelFilter'] = times_taken
    
    return df


In [28]:
class ADWIN:
    def __init__(self, delta=0.001):
        """
        ADWIN algorithm for adaptive windowing and change detection in data streams.

        Parameters:
        delta (float): Confidence level. Lower values make the algorithm more sensitive to changes.
        """
        self.delta = delta
        self.window = []
        self.width = 0  # Size of the window
        self.total = 0  # Sum of the elements in the window
        self.total_squared = 0  # Sum of squares of the elements in the window

    def _variance(self, mean):
        """
        Calculate variance of the window.
        
        Parameters:
        mean (float): The mean of the window.

        Returns:
        float: Variance of the window.
        """
        if self.width == 0:
            return 0
        return (self.total_squared / self.width) - mean**2

    def _cut_point(self, mean, mean_left, var_left, mean_right, var_right, n_left, n_right):
        """
        Compute whether there is a significant difference between the two sub-windows.

        Parameters:
        mean (float): Mean of the entire window.
        mean_left (float): Mean of the left sub-window.
        mean_right (float): Mean of the right sub-window.
        var_left (float): Variance of the left sub-window.
        var_right (float): Variance of the right sub-window.
        n_left (int): Number of points in the left sub-window.
        n_right (int): Number of points in the right sub-window.

        Returns:
        bool: True if change is detected, False otherwise.
        """
        diff = abs(mean_left - mean_right)
        epsilon = np.sqrt(
            (1 / (2 * n_left)) * var_left +
            (1 / (2 * n_right)) * var_right
        ) + np.sqrt(self.delta / (2 * n_left)) + np.sqrt(self.delta / (2 * n_right))

        return diff > epsilon

    def update(self, value):
        """
        Update the window with a new value and check for change points.

        Parameters:
        value (float): New value from the data stream.

        Returns:
        bool: True if a change is detected, False otherwise.
        """
        # Add new value to the window
        self.window.append(value)
        self.width += 1
        self.total += value
        self.total_squared += value ** 2

        mean = self.total / self.width
        change_detected = False

        for i in range(1, self.width):
            left_window = self.window[:i]
            right_window = self.window[i:]

            # Calculate statistics for the left and right sub-windows
            n_left, n_right = len(left_window), len(right_window)
            mean_left = np.mean(left_window)
            mean_right = np.mean(right_window)
            var_left = np.var(left_window)
            var_right = np.var(right_window)

            # Check if a change occurred
            if self._cut_point(mean, mean_left, var_left, mean_right, var_right, n_left, n_right):
                # Change detected, shrink the window
                self.window = self.window[i:]
                self.width = len(self.window)
                self.total = np.sum(self.window)
                self.total_squared = np.sum(np.square(self.window))
                change_detected = True
                break

        return change_detected

def apply_adwin_to_df(df, delta=0.002):
    """
    Apply ADWIN to a time series data in a DataFrame and return a DataFrame with predicted anomalies and time taken per point.
    
    Parameters:
    df (pd.DataFrame): DataFrame with 'timestamp' and 'value' columns.
    delta (float): Confidence level for ADWIN.

    Returns:
    pd.DataFrame: DataFrame with additional columns 'predicted_anomaly_ADWIN' and 'time_taken_per_point_ADWIN'.
    """
    adwin = ADWIN(delta=delta)
    
    times_taken = []
    anomalies = []
    
    for value in df['value']:
        start_time = time.perf_counter()  # Start high-precision timing
        
        # Detect anomaly
        anomaly = adwin.update(value)
        anomalies.append(1 if anomaly else 0)
        
        end_time = time.perf_counter()  # End high-precision timing
        times_taken.append(end_time - start_time)
    
    # Add the results to the DataFrame
    df['predicted_anomaly_ADWIN'] = anomalies
    df['time_taken_per_point_ADWIN'] = times_taken
    
    return df



In [29]:
class SlidingWindowDensityEstimation:
    def __init__(self, window_size=50, bandwidth=1.0, density_threshold=0.01):
        """
        Sliding Window Density Estimation for anomaly detection.
        
        Parameters:
        window_size (int): Size of the sliding window to hold recent data points.
        bandwidth (float): Bandwidth parameter for the Gaussian kernel.
        density_threshold (float): Probability density threshold below which the point is considered an anomaly.
        """
        self.window_size = window_size
        self.bandwidth = bandwidth
        self.density_threshold = density_threshold
        self.window = []

    def gaussian_kernel(self, distance):
        """
        Gaussian kernel function.
        
        Parameters:
        distance (float): Distance between points.

        Returns:
        float: The Gaussian kernel value.
        """
        return (1 / (np.sqrt(2 * np.pi) * self.bandwidth)) * np.exp(-0.5 * (distance / self.bandwidth) ** 2)

    def kernel_density_estimate(self, value):
        """
        Estimate the density of the value using a Gaussian kernel based on the sliding window.

        Parameters:
        value (float): The new value for which to estimate the density.

        Returns:
        float: The estimated density.
        """
        distances = np.abs(np.array(self.window) - value)
        kernel_values = self.gaussian_kernel(distances)
        density = np.sum(kernel_values) / (len(self.window) * self.bandwidth)
        return density

    def update(self, value):
        """
        Update the sliding window with a new value and detect anomalies using kernel density estimation.

        Parameters:
        value (float): New value from the data stream.

        Returns:
        int: 1 if the value is an anomaly, 0 otherwise.
        """
        # Add the new value to the sliding window
        self.window.append(value)

        # If the window exceeds the window_size, remove the oldest value
        if len(self.window) > self.window_size:
            self.window.pop(0)

        # If there are fewer points than needed for density estimation, return 0 (non-anomalous)
        if len(self.window) < 2:
            return 0

        # Estimate the density of the new value
        density = self.kernel_density_estimate(value)

        # Detect anomaly based on the density threshold
        if density < self.density_threshold:
            return 1  # Anomaly detected
        else:
            return 0  # No anomaly detected

def apply_streaming_swde_to_df(df, window_size=50, bandwidth=1.0, density_threshold=0.01):
    """
    Apply Streaming Sliding Window Density Estimation to a time series data in a DataFrame 
    and return a DataFrame with predicted anomalies and time taken per point.
    
    Parameters:
    df (pd.DataFrame): DataFrame with 'timestamp' and 'value' columns.
    window_size (int): Size of the sliding window for anomaly detection.
    bandwidth (float): Bandwidth parameter for Gaussian kernel.
    density_threshold (float): Probability density threshold for anomaly detection.

    Returns:
    pd.DataFrame: DataFrame with additional columns 'predicted_anomaly_SWDE' and 'time_taken_per_point_SWDE'.
    """
    swde = SlidingWindowDensityEstimation(window_size=window_size, bandwidth=bandwidth, density_threshold=density_threshold)

    times_taken = []
    anomalies = []
    
    for value in df['value']:
        start_time = time.perf_counter()  # Start high-precision timing

        # Detect anomaly
        anomaly = swde.update(value)
        anomalies.append(anomaly)

        end_time = time.perf_counter()  # End high-precision timing
        times_taken.append(end_time - start_time)
    
    # Add the results to the DataFrame
    df['predicted_anomaly_SWDE'] = anomalies
    df['time_taken_per_point_SWDE'] = times_taken
    
    return df


In [30]:
class EWMA:
    def __init__(self, alpha=0.3, threshold=3):
        """
        Exponential Weighted Moving Average (EWMA) for anomaly detection.
        
        Parameters:
        alpha (float): Smoothing factor for the EWMA, ranges between 0 and 1.
        threshold (float): Z-score threshold for detecting anomalies.
        """
        self.alpha = alpha
        self.threshold = threshold
        self.ewma_value = None
        self.ewma_std = None
        self.n = 0  # Keeps track of the number of points processed
        self.mean = 0
        self.m2 = 0  # To track the variance

    def update(self, value):
        """
        Update the EWMA with a new value and detect anomalies.
        
        Parameters:
        value (float): New value from the data stream.
        
        Returns:
        int: 1 if the value is an anomaly, 0 otherwise.
        """
        if self.ewma_value is None:
            self.ewma_value = value
            self.ewma_std = 0
            return 0

        # Update EWMA
        self.ewma_value = self.alpha * value + (1 - self.alpha) * self.ewma_value

        # Update the variance using Welford's online algorithm
        self.n += 1
        delta = value - self.mean
        self.mean += delta / self.n
        delta2 = value - self.mean
        self.m2 += delta * delta2
        variance = self.m2 / (self.n - 1) if self.n > 1 else 0
        self.ewma_std = np.sqrt(variance)

        # Calculate the Z-score
        z_score = (value - self.ewma_value) / self.ewma_std if self.ewma_std > 0 else 0

        # Detect anomaly
        return 1 if np.abs(z_score) > self.threshold else 0

def apply_ewma_to_df(df, alpha=0.3, threshold=3):
    ewma = EWMA(alpha=alpha, threshold=threshold)

    times_taken = []
    anomalies = []

    for value in df['value']:
        start_time = time.perf_counter()
        anomaly = ewma.update(value)
        anomalies.append(anomaly)
        end_time = time.perf_counter()
        times_taken.append(end_time - start_time)

    df['predicted_anomaly_EWMA'] = anomalies
    df['time_taken_per_point_EWMA'] = times_taken
    
    return df


In [31]:
class RankedSlidingWindowAnomalyDetection:
    def __init__(self, window_size=50, rank_threshold=0.95):
        """
        Ranked Sliding Window Anomaly Detection (RSWAD).
        
        Parameters:
        window_size (int): Size of the sliding window to hold recent data points.
        rank_threshold (float): Rank threshold for detecting anomalies (e.g., 0.95 means the top 5% ranked points are anomalies).
        """
        self.window_size = window_size
        self.rank_threshold = rank_threshold
        self.window = []

    def update(self, value):
        """
        Update the sliding window with a new value and detect anomalies using rank-based detection.
        
        Parameters:
        value (float): New value from the data stream.
        
        Returns:
        int: 1 if the value is an anomaly, 0 otherwise.
        """
        self.window.append(value)

        if len(self.window) > self.window_size:
            self.window.pop(0)

        if len(self.window) < self.window_size:
            return 0

        # Calculate median and deviations
        median = np.median(self.window)
        deviations = np.abs(np.array(self.window) - median)

        # Rank the deviations
        ranked_deviations = np.argsort(deviations)

        # Rank the current value's deviation
        current_deviation = np.abs(value - median)
        current_rank = np.searchsorted(deviations[ranked_deviations], current_deviation)

        # Calculate the normalized rank
        normalized_rank = current_rank / len(self.window)

        # Detect anomaly
        return 1 if normalized_rank > self.rank_threshold else 0

def apply_rswad_to_df(df, window_size=50, rank_threshold=0.95):
    rswad = RankedSlidingWindowAnomalyDetection(window_size=window_size, rank_threshold=rank_threshold)

    times_taken = []
    anomalies = []

    for value in df['value']:
        start_time = time.perf_counter()
        anomaly = rswad.update(value)
        anomalies.append(anomaly)
        end_time = time.perf_counter()
        times_taken.append(end_time - start_time)

    df['predicted_anomaly_RSWAD'] = anomalies
    df['time_taken_per_point_RSWAD'] = times_taken

    return df



In [32]:
class MovingZScore:
    def __init__(self, window_size=20, threshold=3):
        """
        Moving Z-Score for anomaly detection.
        
        Parameters:
        window_size (int): Size of the sliding window to hold recent data points.
        threshold (float): Z-score threshold for detecting anomalies.
        """
        self.window_size = window_size
        self.threshold = threshold
        self.window = []

    def update(self, value):
        """
        Update the sliding window with a new value and detect anomalies using Z-Score.
        
        Parameters:
        value (float): New value from the data stream.
        
        Returns:
        int: 1 if the value is an anomaly, 0 otherwise.
        """
        # Add the new value to the sliding window
        self.window.append(value)

        # If the window exceeds the window_size, remove the oldest value
        if len(self.window) > self.window_size:
            self.window.pop(0)

        # If there are fewer points than the window size, return 0 (no anomaly detected)
        if len(self.window) < self.window_size:
            return 0

        # Calculate mean and standard deviation of the window
        mean = np.mean(self.window)
        std = np.std(self.window)

        # If the standard deviation is 0, treat the point as non-anomalous
        if std == 0:
            return 0

        # Calculate the Z-score of the new value
        z_score = (value - mean) / std

        # Detect anomaly based on the Z-score threshold
        if abs(z_score) > self.threshold:
            return 1  # Anomaly detected
        else:
            return 0  # No anomaly detected

def apply_moving_zscore_to_df(df, window_size=20, threshold=3):
    """
    Apply Moving Z-Score to a time series data in a DataFrame and return a DataFrame with predicted anomalies and time taken per point.

    Parameters:
    df (pd.DataFrame): DataFrame with 'timestamp' and 'value' columns.
    window_size (int): Size of the sliding window for anomaly detection.
    threshold (float): Z-score threshold for anomaly detection.

    Returns:
    pd.DataFrame: DataFrame with additional columns 'predicted_anomaly' (0: normal, 1: anomaly) and 'time_taken_per_point_MovingZScore'.
    """
    mzs = MovingZScore(window_size=window_size, threshold=threshold)

    times_taken = []
    anomalies = []

    for value in df['value']:
        start_time = time.perf_counter()  # Start high-precision timing
        anomaly = mzs.update(value)
        anomalies.append(anomaly)
        end_time = time.perf_counter()  # End high-precision timing
        times_taken.append(end_time - start_time)

    df['predicted_anomaly_MovingZScore'] = anomalies
    df['time_taken_per_point_MovingZScore'] = times_taken
    
    return df



# Testing the Algorithms 

In [33]:
result_dfs = []
for i in range(len(dfs)):
    df = dfs[i][1]
    df = apply_omlad_streaming(df, threshold=3)
    df = apply_omladewma_streaming(df, threshold=3, alpha=0.3)
    df = apply_streaming_hampel_to_df(df, window_size=7, n_sigmas=3)
    df = apply_adwin_to_df(df, delta=0.002)
    df = apply_streaming_swde_to_df(df, window_size=50, bandwidth=1.0, density_threshold=0.01)
    df = apply_ewma_to_df(df, alpha=0.3, threshold=3)
    df = apply_rswad_to_df(df, window_size=50, rank_threshold=0.95)
    df = apply_moving_zscore_to_df(df, window_size=20, threshold=3)
    result_dfs.append(df)

##### Th concept of tolerance is crucial, as detecting the exact ground turth anomaly point could be challenging in a streamed enviroment. Hence, a 2% tolerance would consider detecting the anomaly within that range from the ground truth and award that in the evaluation

In [34]:


# Function to apply 1% tolerance window and calculate metrics
def apply_tolerance_and_evaluate(df, tolerance=0.01):
    num_points = len(df)
    tolerance_window = int(tolerance * num_points)  # Calculate the window size as 1% of the total points
    
    # Create a binary array for tolerance windows
    tolerance_array = np.zeros(len(df), dtype=int)
    anomaly_indices = df.index[df['anomaly'] == 1].tolist()
    
    for idx in anomaly_indices:
        start = max(0, idx - tolerance_window)
        end = min(num_points, idx + tolerance_window + 1)
        tolerance_array[start:end] = 1  # Mark all points within the tolerance window as 'anomaly'
    
    # Add the tolerance window as a new column to the DataFrame
    df['tolerance_window'] = tolerance_array
    
    methods = [col for col in df.columns if col.startswith('predicted_anomaly_')]
    results = {}
    
    for method in methods:
        precision = precision_score(df['tolerance_window'], df[method])
        recall = recall_score(df['tolerance_window'], df[method])
        f1 = f1_score(df['tolerance_window'], df[method])
        
        results[method] = {
            'precision': precision,
            'recall': recall,
            'f1_score': f1
        }
    
    return results

# Function to calculate average time taken per point for each method
def calculate_average_time(df):
    time_cols = [col for col in df.columns if col.startswith('time_taken_per_point_')]
    avg_time_results = {}
    
    for time_col in time_cols:
        avg_time = df[time_col].mean()
        avg_time_results[time_col] = avg_time
    
    return avg_time_results

final_results_list = []
for ele in result_dfs:
    # Apply the tolerance and evaluate metrics
    metrics = apply_tolerance_and_evaluate(ele, tolerance=0.02)

    # Calculate average time taken per point
    average_times = calculate_average_time(ele)

    # Convert metrics to a DataFrame
    metrics_df = pd.DataFrame(metrics).T  # Transpose to have methods as rows

    # Convert average times to a DataFrame
    times_df = pd.DataFrame(list(average_times.items()), columns=['method', 'average_time'])

    # Clean the 'method' column to match between both DataFrames
    metrics_df = metrics_df.reset_index()  # Reset index to make method names a column
    metrics_df['index'] = metrics_df['index'].apply(lambda x: x.split('_')[-1])  # Extract method names

    times_df['method'] = times_df['method'].apply(lambda x: x.split('_')[-1])  # Extract method names

    # Merge both DataFrames on the method name
    final = pd.merge(metrics_df, times_df, left_on='index', right_on='method', how='inner')

    # Drop unnecessary columns
    final = final.drop(columns=['method'])

    # append to final_results_list
    final_results_list.append(final)








  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [35]:
final_results_concat = pd.concat(final_results_list)

# calculate the mean for each metric
mean_metrics = final_results_concat.groupby('index').mean()

mean_metrics

Unnamed: 0_level_0,precision,recall,f1_score,average_time
index,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
ADWIN,0.054949,0.822629,0.098765,0.000451
EWMA,0.036809,0.018331,0.024366,3e-06
MovingZScore,0.054703,0.013978,0.020132,3.1e-05
OMLADEWMAStreaming,0.048432,0.049748,0.047863,2e-06
OMLADStreaming,0.204724,0.098451,0.119517,2e-06
RSWAD,0.080794,0.070311,0.062563,3.9e-05
SWDE,0.057087,0.024431,0.028602,1.5e-05
StreamingHampelFilter,0.056354,0.124949,0.0744,5e-05


- **ADWIN** achieved the highest recall (0.822629), indicating its effectiveness in identifying a large proportion of actual anomalies, but has low precision.
- **OMLADStreaming** has the best precision (0.204724), meaning it has the highest ratio of true positives among the detected anomalies.
- **OMLADStreaming** also has the highest F1 score (0.119517), reflecting a better balance between precision and recall compared to other methods.
- All methods maintained low average processing times, suggesting efficient performance.
- The nature of the datasets varies, which may affect the results.
- No hyperparameter tuning was performed due to time constraints, which could lead to suboptimal performance of the algorithms.
- The nature of the actual data is unknown, aside from having drift and seasonality, making this evaluation primarily a demonstration of the methods rather than definitive conclusions on their effectiveness.
- This process serves to illustrate that further examination and analysis are essential for selecting the best anomaly detection algorithm.
