In [1]:
# import packages

import matplotlib.animation as animation
import matplotlib.pyplot as plt
from matplotlib.widgets import Button, Slider

from scipy.stats import beta
import numpy as np
import pandas as pd
from random import gauss

from src.change_detector.corrected_change_detector import PDFChangeDetector

In [2]:
# Select the ipympl backend so the figures created below are interactive widgets.
%matplotlib ipympl

def _beta_pdf_curve(a, b, n_points=1000):
    """Sample the Beta(a, b) pdf between its 0.1% and 99.9% quantiles.

    Returns a pair ``(x, y)``; both are empty lists when ``a`` or ``b`` is None.
    """
    if a is None or b is None:
        return [], []
    x = np.linspace(beta.ppf(0.001, a, b), beta.ppf(0.999, a, b), n_points)
    return x, beta.pdf(x, a, b)


def _u1_segment(u1, a, b):
    """Data-coordinate endpoints of the vertical line from y=0 up to the Beta(a, b) pdf at ``u1``.

    Returns empty lists when ``u1`` or either distribution parameter is None,
    so that nothing is drawn instead of calling ``beta.pdf`` with None.
    """
    if u1 is None or a is None or b is None:
        return [], []
    return [u1, u1], [0, beta.pdf(u1, a, b)]


def _threshold_point(value):
    """Coordinates of a threshold marker sitting on the x axis (empty when ``value`` is None)."""
    if value is None:
        return [], []
    return [value], [0]


def plot_slider_animation(alpha_estimates_list, 
                        beta_estimates_list, 
                        min_u1s_list, 
                        min_u2s_list, 
                        min_u3s_list, 
                        current_u1s_list,
                        first_frame, last_frame,
                        y_min=0, y_max=100, x_min=0, x_max=0.3):
    """Show an interactive plot of the estimated beta distribution over time.

    A slider selects the time step; for the selected step the figure shows the
    Beta(alpha, beta) pdf, a vertical line at the current u1 value reaching up
    to the pdf, and three markers on the x axis for the u1/u2/u3 thresholds.
    A "Reset" button puts the slider back to the first step.

    Parameters
    ----------
    alpha_estimates_list, beta_estimates_list :
        Per-step beta distribution parameters; an entry may be None, in which
        case no curve is drawn for that step.
    min_u1s_list, min_u2s_list, min_u3s_list :
        Per-step threshold values; a None entry hides the matching marker.
    current_u1s_list :
        Per-step current u1 value; None hides the vertical line.
    first_frame, last_frame : int
        Start (inclusive) and end (exclusive) index of the steps to show.
    y_min, y_max, x_min, x_max :
        Fixed axis limits of the main plot.

    Raises
    ------
    IndexError
        If the requested frame range selects no data.
    """
    framed_time = slice(first_frame, last_frame)

    # One tuple per displayed step; the last element is the absolute step index.
    frames = list(zip(alpha_estimates_list[framed_time], beta_estimates_list[framed_time],
                      min_u1s_list[framed_time], min_u2s_list[framed_time], min_u3s_list[framed_time],
                      current_u1s_list[framed_time],
                      range(first_frame, last_frame)))
    if not frames:
        raise IndexError(
            f"no frames to plot for first_frame={first_frame}, last_frame={last_frame}")

    a_process, b_process, min_u1, min_u2, min_u3, current_u1, start_time = frames[0]
    end_time = frames[-1][-1]

    fig, ax = plt.subplots(figsize=(15, 7))

    line_process = ax.plot(*_beta_pdf_curve(a_process, b_process), 'b-',
                           label='distance to reference distribution')[0]

    # A plain Line2D in data coordinates is used instead of axvline: axvline's
    # ymin/ymax are fractions of the axes height, so a pdf value could not be
    # used as the upper end of the line.
    u1_process = ax.plot(*_u1_segment(current_u1, a_process, b_process),
                         color="blue", linestyle="dashdot", label='current u1')[0]

    threshold_styles = (("green", "u1_threshold"),
                        ("orange", "u2_threshold"),
                        ("red", "u3_threshold"))
    threshold_markers = [
        ax.plot(*_threshold_point(value), marker='^', markersize=12,
                color=color, alpha=0.5, label=label)[0]
        for value, (color, label) in zip((min_u1, min_u2, min_u3), threshold_styles)
    ]

    ax.set(xlim=(x_min, x_max), xlabel='Values of Random Variable X (0, 1)',
        ylim=(y_min,y_max), ylabel='Probability density', title='Beta Distribution of pdf distances to reference distribution')
    time_legend = ax.text(0.9, 0.95, f'time: {start_time}', ha='left', va='top', transform=ax.transAxes,
                    bbox=dict(facecolor='none', edgecolor='black', boxstyle='round,pad=0.5'))
    ax.legend(loc='lower right')

    # adjust the main plot to make room for the slider
    fig.subplots_adjust(left=0.25, bottom=0.25)

    # Horizontal slider selecting the time step; valstep=1 snaps it to whole steps.
    axfreq = fig.add_axes([0.25, 0.1, 0.65, 0.03])
    freq_slider = Slider(
        ax=axfreq,
        label='Time step',
        valmin=start_time,
        valmax=end_time,
        valinit=start_time,
        valstep=1,
    )

    def update(val):
        """Redraw all artists for the step currently selected on the slider."""
        # Clamp so rounding at the slider ends can never index out of range.
        idx = max(0, min(int(round(val - start_time)), len(frames) - 1))
        a, b, u1_min, u2_min, u3_min, u1_now, time = frames[idx]

        line_process.set_data(*_beta_pdf_curve(a, b))
        u1_process.set_data(*_u1_segment(u1_now, a, b))
        for marker, value in zip(threshold_markers, (u1_min, u2_min, u3_min)):
            marker.set_data(*_threshold_point(value))

        time_legend.set_text(f'time: {time}')

        fig.canvas.draw_idle()

    # register the update function with the slider
    freq_slider.on_changed(update)

    # Create a `matplotlib.widgets.Button` to reset the slider to its initial value.
    resetax = fig.add_axes([0.8, 0.025, 0.1, 0.04])
    button = Button(resetax, 'Reset', hovercolor='0.975')

    def reset(event):
        freq_slider.reset()

    button.on_clicked(reset)

    # Widgets stop responding once they are garbage-collected, and nothing
    # else holds a strong reference to them after this function returns, so
    # tie their lifetime to the figure.
    fig._slider_animation_widgets = (freq_slider, button)

    plt.show()

In [3]:
def gaussian_histogram(mu: float, sigma: float, sample_size: int):
    """Build a density histogram of ``sample_size`` draws from ``gauss(mu, sigma)``.

    The draws are binned on 21 evenly spaced edges from -10 to 10 (20 bins)
    with ``density=True``; the bin values are returned as a ``pd.Series``.
    """
    samples = []
    for _ in range(sample_size):
        samples.append(gauss(mu, sigma))

    edges = np.linspace(-10, 10, 21)
    densities, _unused_edges = np.histogram(samples, bins=edges, density=True)
    return pd.Series(densities)

In [4]:
def hist_f():
    """Return a fresh histogram from ``gaussian_histogram(2, 1, 10000)``."""
    return gaussian_histogram(2, 1, 10000)

In [5]:
# Scenario table: each leaf dict holds a duration, a mean, a var and the
# callable used to generate histograms.
# NOTE(review): the scenarios are not shaped uniformly - "base_interval" and
# "test_mean_a" are flat parameter dicts, "test_mean_b" nests only an
# "interval_2", and "test_mean_c" nests "interval_1" and "interval_2".
# Confirm which schema the consumer expects.
# NOTE(review): every entry uses hist_f, which takes no arguments, so the
# "mean"/"var" values here do not influence the generated histograms.
test_context = dict(
    base_interval=dict(duration=1000, mean=0, var=1, histogram_function=hist_f),
    test_mean_a=dict(duration=500, mean=1, var=1, histogram_function=hist_f),
    test_mean_b=dict(
        interval_2=dict(duration=500, mean=0.1, var=1, histogram_function=hist_f),
    ),
    test_mean_c=dict(
        interval_1=dict(duration=1000, mean=0, var=1, histogram_function=hist_f),
        interval_2=dict(duration=500, mean=0.01, var=1, histogram_function=hist_f),
    ),
)

In [6]:
def _run_detection_phase(change_detector, n_steps, histogram_function, results, verbose):
    """Feed ``n_steps`` generated histograms to the detector and record every response in ``results``."""
    for _ in range(n_steps):
        detect_response = change_detector.detect_change(histogram_function())

        results["detection_status_list"].append(detect_response.state.name)
        results["alpha_estimates_list"].append(detect_response.alpha_estimate)
        results["beta_estimates_list"].append(detect_response.beta_estimate)
        results["u1_min_list"].append(detect_response.min_u1)
        results["u2_min_list"].append(detect_response.min_u2)
        results["u3_min_list"].append(detect_response.min_u3)
        results["current_u1_list"].append(detect_response.current_u1)

        if verbose:
            print("Status: ", detect_response.state.name, 
                "| Alpha :", detect_response.alpha_estimate, 
                "| Beta", detect_response.beta_estimate, 
                "\n***\n")


def run_analysis(duration_1:int, mean_1:float, var_1:float, histogram_function_1,
                duration_2:int, mean_2:float, var_2:float, histogram_function_2,
                method:str, verbose:bool=True):
    """Run a two-phase change-detection experiment and collect the detector output.

    A fresh ``PDFChangeDetector(reference_size=30)`` is fed ``duration_1``
    histograms produced by ``histogram_function_1`` followed by ``duration_2``
    histograms produced by ``histogram_function_2``.

    Parameters
    ----------
    duration_1, duration_2 : int
        Number of detector steps in the first / second phase.
    histogram_function_1, histogram_function_2 :
        Zero-argument callables returning the histogram for one step. If a
        value is not callable, the generator that used to be hard-coded for
        that phase is used instead (``gaussian_histogram(2, 1, 10000)`` for
        phase 1, ``gaussian_histogram(2.1, 1, 10000)`` for phase 2).
    mean_1, var_1, mean_2, var_2, method :
        Accepted for interface compatibility; not used by this function.
    verbose : bool
        When True (default) print status, alpha and beta after every step.

    Returns
    -------
    dict
        Lists with one entry per step, keyed by "detection_status_list",
        "alpha_estimates_list", "beta_estimates_list", "u1_min_list",
        "u2_min_list", "u3_min_list" and "current_u1_list".
    """
    # Previously the durations and histogram functions were ignored in favour
    # of hard-coded values; keep those values only as a fallback.
    if not callable(histogram_function_1):
        histogram_function_1 = lambda: gaussian_histogram(2, 1, 10000)
    if not callable(histogram_function_2):
        histogram_function_2 = lambda: gaussian_histogram(2.1, 1, 10000)

    change_detector = PDFChangeDetector(reference_size=30)

    results = {
        "detection_status_list": [],
        "alpha_estimates_list": [],
        "beta_estimates_list": [],
        "u1_min_list": [],
        "u2_min_list": [],
        "u3_min_list": [],
        "current_u1_list": [],
    }

    # The same detector instance sees both phases back to back.
    _run_detection_phase(change_detector, duration_1, histogram_function_1, results, verbose)
    _run_detection_phase(change_detector, duration_2, histogram_function_2, results, verbose)

    return results