In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

def generate_data_stream(size=1000, period=50, amplitude=5, noise_factor=1.0,
                         seasonal_period=24, seasonal_amplitude=2, seed=None):
    """
    Simulate a data stream with a regular pattern, a seasonal element, and random noise.

    Each sample i is
        amplitude * sin(2*pi*i / period)
        + seasonal_amplitude * sin(2*pi*i / seasonal_period)
        + noise_factor * (standard normal draw)

    Parameters:
    - size: Number of data points to generate (values <= 0 give an empty array).
    - period: Period of the regular pattern.
    - amplitude: Amplitude of the regular pattern.
    - noise_factor: Magnitude of random noise.
    - seasonal_period: Period of the seasonal element (default 24, e.g. daily fluctuations).
    - seasonal_amplitude: Amplitude of the seasonal element (default 2).
    - seed: Optional seed for a dedicated NumPy Generator so the noise is reproducible.
      When None, noise is drawn from NumPy's global RNG (np.random.randn).

    Returns:
    - A NumPy array representing the generated data stream.

    Raises:
    - ZeroDivisionError: if a period is zero while at least one sample is requested.
    """
    # Clamp so that a negative size yields an empty stream, like range(size) would.
    time_index = np.arange(max(size, 0))
    sample_count = time_index.size

    # Fail loudly instead of letting NumPy silently produce nan/inf values.
    if sample_count and (period == 0 or seasonal_period == 0):
        raise ZeroDivisionError("period and seasonal_period must be non-zero")

    # Regular pattern
    regular_pattern = amplitude * np.sin(2 * np.pi * time_index / period)

    # Seasonal element (e.g., daily fluctuations)
    seasonal_element = seasonal_amplitude * np.sin(2 * np.pi * time_index / seasonal_period)

    # Random noise
    if seed is None:
        standard_normal_draws = np.random.randn(sample_count)
    else:
        standard_normal_draws = np.random.default_rng(seed).standard_normal(sample_count)
    random_noise = noise_factor * standard_normal_draws

    # Combine components to generate the data stream
    return regular_pattern + seasonal_element + random_noise

# To inspect the output, print the first 10 generated data points, e.g.:
# print(generate_data_stream()[:10])

class StreamingMovingAverage:
    """Base class for streaming detectors that record every value they are fed.

    Derived classes are expected to override ``detect_anomaly``; the version
    defined here returns ``None`` for every value.
    """

    def __init__(self) -> None:
        # Every value seen so far, in arrival order.
        self.data_streaming = list()

    def add_data_point(self, value):
        """Store ``value`` at the end of the recorded history."""
        self.data_streaming.append(value)

    def detect_anomaly(self, value):
        """Detection hook for derived classes; this base version returns ``None``."""
        return None

    def stream_data_and_detect_anomalies(self, data_stream):
        """Record and score each value of ``data_stream`` in order.

        Every value is stored first and then passed to ``detect_anomaly``.
        Returns the list of ``detect_anomaly`` results, one per input value.
        """
        def ingest(sample):
            self.add_data_point(sample)
            return self.detect_anomaly(sample)

        return [ingest(sample) for sample in data_stream]

class StreamingMovingTrimean(StreamingMovingAverage):
    """Anomaly detector that compares each value with the trimean of the stored history.

    A value is flagged (label 1) when its absolute distance from the trimean of
    everything in ``self.data_streaming`` exceeds ``threshold`` times the
    root-mean-square deviation of that history from the same trimean.
    No window is applied in this class: the whole stored history is used.
    """

    def __init__(self, threshold=1.5) -> None:
        """
        Parameters:
        - threshold: Multiplier applied to the spread of the history; larger
          values make the detector less sensitive.
        """
        super().__init__()
        # Parameters
        self.max_deviation_from_expected = threshold

    def _enough_data(self) -> bool:
        '''Check if there is enough data (at least one stored value)'''
        return len(self.data_streaming) > 0

    def _history(self) -> np.ndarray:
        '''Return the stored history as a float array'''
        return np.asarray(self.data_streaming, dtype=float)

    def _standard_deviation(self, center=None) -> float:
        '''Return the root-mean-square deviation of the history from its trimean.

        ``center`` may be passed when the trimean is already known, so that it
        is not recomputed; when omitted it is computed from the history.
        '''
        data = self._history()
        if center is None:
            center = trimean(data)
        return float(np.sqrt(np.mean((center - data) ** 2)))

    def _expected_value(self, timestamp: int) -> float:
        '''Return the expected value (trimean of the history); ``timestamp`` is unused'''
        return trimean(self._history())

    def detect_anomaly(self, value):
        '''Return 1 if ``value`` is anomalous with respect to the history, else 0'''
        if not self._enough_data():
            return 0  # No anomaly

        expected_value = self._expected_value(len(self.data_streaming))
        deviation = abs(value - expected_value)
        # Reuse the trimean computed above instead of deriving it a second time.
        allowed_deviation = (
            self.max_deviation_from_expected * self._standard_deviation(expected_value)
        )
        if deviation > allowed_deviation:
            return 1  # Anomaly detected
        return 0  # No anomaly

def trimean(values):
    """Return the trimean of ``values``: (Q1 + 2 * median + Q3) / 4."""
    lower_quartile, median, upper_quartile = (
        np.quantile(values, fraction) for fraction in (0.25, 0.50, 0.75)
    )
    weighted_total = lower_quartile + (2 * median) + upper_quartile
    return weighted_total / 4

def plot_anomalies(data_streams, algorithm, parameters):
    """
    Run an anomaly detector over each data stream and plot the flagged points.

    Parameters:
    - data_streams: Iterable of data streams; each stream is an iterable of numeric values.
    - algorithm: Detector class (or factory) called as algorithm(**parameters); the
      resulting object must provide stream_data_and_detect_anomalies(values).
    - parameters: Dict of keyword arguments forwarded to `algorithm`.

    Returns:
    - None. The figure is displayed with plt.show().
    """
    fig, ax = plt.subplots(figsize=(10, 6))

    for i, data_stream in enumerate(data_streams):
        # Materialize once so the stream can be both fed to the detector and plotted.
        values = list(data_stream)

        # A fresh detector per stream keeps one stream's history out of another's.
        anomaly_detector = algorithm(**parameters)
        anomaly_labels = anomaly_detector.stream_data_and_detect_anomalies(values)

        # Labels come back as a plain list, so select flagged positions element-wise
        # (comparing the list itself to 1 would always be False).
        anomaly_positions = [pos for pos, label in enumerate(anomaly_labels) if label == 1]
        anomaly_values = [values[pos] for pos in anomaly_positions]

        ax.plot(values, label=f'Data Stream {i + 1}')
        # zorder keeps the markers drawn on top of the line.
        ax.scatter(anomaly_positions, anomaly_values, color='red',
                   label=f'Anomalies {i + 1}', zorder=3)

    ax.set_title('Streaming Moving Trimean Anomaly Detection')
    ax.set_xlabel('Time')
    ax.set_ylabel('Value')
    ax.legend()
    plt.show()

# Example usage:
# Seed NumPy's global RNG so the simulated noise (and therefore the plot) is
# the same on every run of this cell.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

stream_size = 200
generated_stream = generate_data_stream(size=stream_size, period=30, amplitude=8, noise_factor=2)

# plot_anomalies takes a collection of streams, so wrap the single stream in a list
# (the stream itself is converted to a plain list for demonstration purposes)
data_streams = [list(generated_stream)]

# Example parameters for the StreamingMovingTrimean algorithm
algorithm = StreamingMovingTrimean
parameters = {'threshold': 2.0}

# Plot anomalies
plot_anomalies(data_streams, algorithm, parameters)


ModuleNotFoundError: No module named 'numpy'

In [3]:
import time
from typing import List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import clear_output, display  # Used for updating the plot in Jupyter Notebooks

def generate_data_point(period=50, amplitude=5, noise_factor=1.0, time_index=None):
    """
    Generate a single data point: regular pattern + seasonal element + random noise.

    Parameters:
    - period: Period of the regular pattern.
    - amplitude: Amplitude of the regular pattern.
    - noise_factor: Magnitude of random noise (drawn from NumPy's global RNG).
    - time_index: Position of the point in the stream. When None, the current
      length of the module-level `data_stream` list is used, so that list must
      exist and must grow for the signal to advance.

    Returns:
    - The generated value for the given time index.
    """
    if time_index is None:
        # Backward-compatible default: derive the time step from the global stream.
        time_index = len(data_stream)

    # Regular pattern
    regular_pattern = amplitude * np.sin(2 * np.pi * time_index / period)

    # Seasonal element (e.g., daily fluctuations)
    seasonal_element = 2 * np.sin(2 * np.pi * time_index / 24)

    # Random noise
    random_noise = noise_factor * np.random.randn()

    # Combine components to generate data point
    return regular_pattern + seasonal_element + random_noise

class StreamingMovingAverage:
    """Base class for streaming detectors that record every value they are fed.

    Derived classes are expected to override ``detect_anomaly``; the version
    defined here returns ``None`` for every value.
    """

    def __init__(self) -> None:
        # Every value seen so far, in arrival order.
        self.data_streaming = list()

    def add_data_point(self, value):
        """Store ``value`` at the end of the recorded history."""
        self.data_streaming.append(value)

    def detect_anomaly(self, value):
        """Detection hook for derived classes; this base version returns ``None``."""
        return None

    def stream_data_and_detect_anomalies(self, data_stream):
        """Record and score each value of ``data_stream`` in order.

        Every value is stored first and then passed to ``detect_anomaly``.
        Returns the list of ``detect_anomaly`` results, one per input value.
        """
        def ingest(sample):
            self.add_data_point(sample)
            return self.detect_anomaly(sample)

        return [ingest(sample) for sample in data_stream]

class StreamingMovingTrimean(StreamingMovingAverage):
    """Anomaly detector that compares each value with the trimean of the stored history.

    A value is flagged (label 1) when its absolute distance from the trimean of
    everything in ``self.data_streaming`` exceeds ``threshold`` times the
    root-mean-square deviation of that history from the same trimean.
    No window is applied in this class: the whole stored history is used.
    """

    def __init__(self, threshold=1.5) -> None:
        """
        Parameters:
        - threshold: Multiplier applied to the spread of the history; larger
          values make the detector less sensitive.
        """
        super().__init__()
        # Parameters
        self.max_deviation_from_expected = threshold

    def _enough_data(self) -> bool:
        '''Check if there is enough data (at least one stored value)'''
        return len(self.data_streaming) > 0

    def _history(self) -> np.ndarray:
        '''Return the stored history as a float array'''
        return np.asarray(self.data_streaming, dtype=float)

    def _standard_deviation(self, center=None) -> float:
        '''Return the root-mean-square deviation of the history from its trimean.

        ``center`` may be passed when the trimean is already known, so that it
        is not recomputed; when omitted it is computed from the history.
        '''
        data = self._history()
        if center is None:
            center = trimean(data)
        return float(np.sqrt(np.mean((center - data) ** 2)))

    def _expected_value(self, timestamp: int) -> float:
        '''Return the expected value (trimean of the history); ``timestamp`` is unused'''
        return trimean(self._history())

    def detect_anomaly(self, value):
        '''Return 1 if ``value`` is anomalous with respect to the history, else 0'''
        if not self._enough_data():
            return 0  # No anomaly

        expected_value = self._expected_value(len(self.data_streaming))
        deviation = abs(value - expected_value)
        # Reuse the trimean computed above instead of deriving it a second time.
        allowed_deviation = (
            self.max_deviation_from_expected * self._standard_deviation(expected_value)
        )
        if deviation > allowed_deviation:
            return 1  # Anomaly detected
        return 0  # No anomaly

def trimean(values):
    """Return the trimean of ``values``: (Q1 + 2 * median + Q3) / 4."""
    lower_quartile, median, upper_quartile = (
        np.quantile(values, fraction) for fraction in (0.25, 0.50, 0.75)
    )
    weighted_total = lower_quartile + (2 * median) + upper_quartile
    return weighted_total / 4

def continuous_plot_anomalies(algorithm, parameters, iterations=200, pause_duration=0.1):
    """
    Generate points one at a time, score each with a detector, and redraw a live plot.

    Relies on two module-level names: `generate_data_point()` to produce the next
    value and the list `data_stream`, to which every generated value is appended
    (its length is what advances the generator's time index).

    Parameters:
    - algorithm: Detector class (or factory) called as algorithm(**parameters); the
      resulting object must provide stream_data_and_detect_anomalies(values).
    - parameters: Dict of keyword arguments forwarded to `algorithm`.
    - iterations: Number of data points to generate and plot.
    - pause_duration: Seconds to wait between plot updates.

    Returns:
    - None. The figure is re-rendered in the notebook output on every iteration.
    """
    # Initialize the algorithm
    anomaly_detector = algorithm(**parameters)

    # One figure is reused for the whole run and redrawn in place.
    fig, ax = plt.subplots(figsize=(10, 6))

    # Positions and values of every point flagged so far.
    anomaly_times = []
    anomaly_values = []

    try:
        # Continuously stream data, detect anomalies, and update the plot
        for _ in range(iterations):
            # Generate before appending: the generator reads len(data_stream) as "now".
            data_point = generate_data_point()
            time_index = len(data_stream)
            data_stream.append(data_point)

            # stream_data_and_detect_anomalies already stores the point in the
            # detector, so it must not be added a second time here.
            label = anomaly_detector.stream_data_and_detect_anomalies([data_point])[0]
            if label == 1:
                anomaly_times.append(time_index)
                anomaly_values.append(data_point)

            # Redraw from scratch so lines and legend entries do not pile up.
            ax.clear()
            ax.plot(data_stream, label='Data Stream')
            ax.scatter(anomaly_times, anomaly_values, color='red', label='Anomalies', zorder=3)
            ax.set_title('Streaming Moving Trimean Anomaly Detection')
            ax.set_xlabel('Time')
            ax.set_ylabel('Value')
            # Fixed location so the legend does not jump between frames.
            ax.legend(loc='upper left')

            # For Jupyter Notebooks, replace the previous frame with the updated figure
            clear_output(wait=True)
            display(fig)
            time.sleep(pause_duration)
    finally:
        # Close the figure so the notebook does not render it once more at cell end.
        plt.close(fig)

# Seed NumPy's global RNG so the simulated noise is the same on every run of this cell.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Initialize an empty data stream (read by generate_data_point and continuous_plot_anomalies)
data_stream = []

# Example parameters for the StreamingMovingTrimean algorithm
algorithm = StreamingMovingTrimean
parameters = {'threshold': 2.0}

# Run the continuous streaming and visualization
continuous_plot_anomalies(algorithm, parameters)


ModuleNotFoundError: No module named 'numpy'