In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.stats import wasserstein_distance, ks_2samp, entropy
from scipy.stats import gaussian_kde
from matplotlib.animation import FuncAnimation, PillowWriter

# The bare 'seaborn' style sheet was deprecated in matplotlib 3.6 (renamed to
# 'seaborn-v0_8') and later removed, so use whichever name this installation
# actually ships and fall back to matplotlib's default style otherwise.
_SEABORN_STYLE = next(
    (name for name in ('seaborn-v0_8', 'seaborn') if name in plt.style.available),
    'default',
)
plt.style.use(_SEABORN_STYLE)
sns.set_palette("husl")
# White figure background so the figures are readable on any notebook theme.
plt.rcParams['figure.facecolor'] = 'white'

In [None]:
def create_test_distribution(change_type, param, size=1000):
    """Draw a "drifted" sample relative to a standard normal reference.

    Samples are drawn from NumPy's global random state (``np.random``).

    Parameters
    ----------
    change_type : str
        Kind of drift to simulate:

        * ``'std'``  -- normal with mean 0 and standard deviation ``1 + param``.
        * ``'mean'`` -- normal with mean ``param`` and standard deviation 1.
        * ``'tail'`` -- standard normal whose values above its own 90th
          percentile are multiplied by ``1 + param``; the remaining values
          are shifted by a constant so the sample sum (and mean) is unchanged.
    param : float
        Drift magnitude; ``0`` yields an undrifted standard-normal sample.
    size : int, optional
        Number of samples to draw.

    Returns
    -------
    numpy.ndarray
        Array of ``size`` samples.

    Raises
    ------
    ValueError
        If ``change_type`` is not one of the supported kinds.
    """
    if change_type == 'std':
        # std drift
        return np.random.normal(0, 1 + param, size)

    if change_type == 'mean':
        # mean drift
        return np.random.normal(param, 1, size)

    if change_type == 'tail':
        # tail stretching
        base = np.random.normal(0, 1, size)
        if param == 0:
            return base
        cutoff = np.percentile(base, 90)
        mask = base > cutoff
        modified = base.copy()
        modified[mask] *= (1 + param)
        # Stretching adds param * sum(base[mask]) to the sample sum; spread the
        # opposite amount evenly over the untouched points to keep the mean fixed.
        compensation = -np.sum(base[mask] * param) / np.sum(~mask)
        modified[~mask] += compensation
        return modified

    # Previously an unknown kind fell through and silently returned None.
    raise ValueError(
        f"Unknown change_type {change_type!r}; expected 'std', 'mean' or 'tail'"
    )

In [None]:
def empirical_cdf(data):
    """Return the empirical CDF of ``data`` as ``(sorted_values, probabilities)``.

    The i-th smallest value (1-based) is paired with the cumulative
    probability ``i / n``, where ``n`` is the number of samples.
    """
    ordered = np.sort(data)
    count = ordered.shape[0]
    # Ranks 1..n divided by n: the last value always maps to probability 1.
    ranks = np.arange(1, count + 1)
    return ordered, ranks / count

In [None]:
def _step_cdf_at(points, cdf_x, cdf_y):
    """Evaluate a right-continuous step CDF (sorted jumps ``cdf_x``, heights ``cdf_y``) at ``points``."""
    # Number of jump locations <= each point; 0 means "left of every sample".
    n_at_or_below = np.searchsorted(cdf_x, points, side='right')
    # Prepend 0 so that count k indexes the CDF height after the k-th jump
    # and count 0 maps to a CDF value of exactly 0.
    heights = np.concatenate(([0.0], np.asarray(cdf_y, dtype=float)))
    return heights[n_at_or_below]


def find_max_ks_distance(cdf1_x, cdf1_y, cdf2_x, cdf2_y):
    """Locate the largest vertical gap between two empirical CDFs.

    Both CDFs are treated as right-continuous step functions (the definition
    used by the Kolmogorov-Smirnov statistic) and compared at every distinct
    jump location of either one. Linear interpolation is deliberately not
    used: it underestimates the gap between jumps and reports ``cdf_y[0]``
    instead of 0 to the left of the smallest sample.

    Parameters
    ----------
    cdf1_x, cdf2_x : array-like
        Jump locations of each CDF, sorted in ascending order.
    cdf1_y, cdf2_y : array-like
        CDF height reached at each corresponding jump location.

    Returns
    -------
    tuple
        ``(max_diff, max_diff_x, cdf2_value, cdf1_value)``: the largest
        absolute difference, the x where it occurs, and the value of the
        second and of the first CDF at that x (in that order).

    Raises
    ------
    ValueError
        If either CDF has no points.
    """
    cdf1_x = np.asarray(cdf1_x)
    cdf2_x = np.asarray(cdf2_x)
    if cdf1_x.size == 0 or cdf2_x.size == 0:
        raise ValueError("both CDFs need at least one sample point")

    # np.unique already returns its result sorted.
    all_x = np.unique(np.concatenate([cdf1_x, cdf2_x]))

    cdf1_at_x = _step_cdf_at(all_x, cdf1_x, cdf1_y)
    cdf2_at_x = _step_cdf_at(all_x, cdf2_x, cdf2_y)

    differences = np.abs(cdf1_at_x - cdf2_at_x)
    max_diff_idx = np.argmax(differences)

    return (
        differences[max_diff_idx],
        all_x[max_diff_idx],
        cdf2_at_x[max_diff_idx],
        cdf1_at_x[max_diff_idx],
    )

In [None]:
def create_animation(change_type='std', output_filename='drift_animation.gif',
                     n_frames=50, fps=5):
    """Render a GIF showing how drift metrics react to a drifting test sample.

    A fixed standard-normal training sample (seeded with 42) is compared,
    frame by frame, against a freshly drawn test sample whose drift parameter
    grows linearly with the frame index. The figure has four stacked panels:
    KDE densities, empirical CDFs with the KS distance marked, the Wasserstein
    distance history and the KL divergence history.

    Parameters
    ----------
    change_type : str, optional
        Drift kind forwarded to ``create_test_distribution``; must be a key
        of the internal settings table (``'std'``, ``'mean'`` or ``'tail'``).
    output_filename : str, optional
        Path of the GIF file to write.
    n_frames : int, optional
        Number of animation frames (also the x-range of the metric panels).
    fps : int, optional
        Frames per second of the saved GIF.

    Raises
    ------
    KeyError
        If ``change_type`` is not a known drift kind.
    """
    # (start value, per-frame increment) of the drift parameter. Looked up
    # first so an unknown change_type fails before any figure is created.
    param_settings = {
        'std': (0.0, 0.04),
        'mean': (0.0, 0.02),
        'tail': (0.0, 0.02)
    }
    start_param, delta = param_settings[change_type]

    np.random.seed(42)
    train_data = np.random.normal(0, 1, 1000)
    kde_train = gaussian_kde(train_data)
    x_range = np.linspace(-5, 5, 1000)
    train_density = kde_train(x_range)

    # Setup figure with 4 subplots (top to bottom: density, CDF/KS,
    # Wasserstein, KL).
    fig, (ax1, ax4, ax3, ax2) = plt.subplots(4, 1, figsize=(12, 13))
    ax1.set_title(f'Distribution Drift - {change_type.upper()} shift', fontsize=14)

    # Distribution density plot setup
    ax1.set_xlim(-5, 5)
    ax1.set_ylim(0, 0.5)

    ax1.plot(x_range, train_density, label='Training', linewidth=2)
    line_test, = ax1.plot([], [], label='Test (Drifting)', linewidth=2)
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # KL divergence
    ax2.set_xlim(0, n_frames)
    ax2.set_ylim(0, 1)
    ax2.set_title("KL Divergence")
    kld_line, = ax2.plot([], [], color='blue', linewidth=2)
    ax2.grid(True, alpha=0.3)

    # Wasserstein distance
    ax3.set_xlim(0, n_frames)
    ax3.set_ylim(0, 1)
    ax3.set_title("Wasserstein Distance")
    wasserstein_line, = ax3.plot([], [], color='red', linewidth=2)
    ax3.grid(True, alpha=0.3)

    # KS test
    ax4.set_xlim(-5, 5)
    ax4.set_ylim(0, 1)
    train_sorted, train_cdf = empirical_cdf(train_data)
    ax4.plot(train_sorted, train_cdf, label='Training CDF', linewidth=2)
    cdf_test, = ax4.plot([], [], label='Test CDF (Drifting)', linewidth=2)

    # Both KS artists start invisible (alpha=0) and are revealed by update().
    max_dist_line = ax4.axvline(x=0, color='gray', linestyle='--', alpha=0)
    max_dist_marker, = ax4.plot([], [], 'ro', alpha=0)

    ax4.legend()
    ax4.grid(True, alpha=0.3)

    metrics = {'KLD': [], 'Wasserstein': [], 'KS': []}

    def update(frame):
        # Derive all state from the frame index instead of accumulating it:
        # FuncAnimation without an init_func calls update() on the first
        # frame an extra time, which used to advance the drift parameter and
        # the metric histories by one step too many.
        param = start_param + delta * (frame + 1)

        test_data = create_test_distribution(change_type, param)
        kde_test = gaussian_kde(test_data)
        test_density = kde_test(x_range)

        # Update density plot
        line_test.set_data(x_range, test_density)

        # Calculate metrics (the tiny offset keeps the KL ratio finite where
        # a density evaluates to zero).
        kld = entropy(train_density + 1e-20, test_density + 1e-20)
        wasser = wasserstein_distance(train_data, test_data)
        ks_stat, p_value = ks_2samp(train_data, test_data)

        for key, value in (('KLD', kld), ('Wasserstein', wasser), ('KS', ks_stat)):
            history = metrics[key]
            # Drop anything recorded at or after this frame so a repeated
            # call for the same frame overwrites instead of appending.
            del history[frame:]
            history.append(value)

        # Update KL
        kld_line.set_data(range(len(metrics['KLD'])), metrics['KLD'])

        # Update Wasserstein distance
        wasserstein_line.set_data(range(len(metrics['Wasserstein'])),
                                  metrics['Wasserstein'])

        # Grow the metric y-ranges when a curve would leave the initial
        # [0, 1] window (e.g. Wasserstein distance under the std drift).
        for axis, key in ((ax2, 'KLD'), (ax3, 'Wasserstein')):
            axis.set_ylim(0, max(1.0, 1.05 * max(metrics[key])))

        # Update KS
        test_sorted, test_cdf = empirical_cdf(test_data)
        cdf_test.set_data(test_sorted, test_cdf)

        _, max_diff_x, cdf2, cdf1 = find_max_ks_distance(train_sorted, train_cdf,
                                                         test_sorted, test_cdf)

        # axvline y-values are in axes coordinates; they coincide with data
        # coordinates here because ax4's y-limits are fixed to (0, 1).
        max_dist_line.set_xdata([max_diff_x, max_diff_x])
        max_dist_line.set_ydata([min(cdf2, cdf1), max(cdf2, cdf1)])
        max_dist_line.set_alpha(0.8)

        # cdf1 is the training CDF value at max_diff_x.
        max_dist_marker.set_data([max_diff_x], [cdf1])
        max_dist_marker.set_alpha(1)

        ax4.set_title(f"Kolmogorov-Smirnov Test\n" +
                      f"Maximum Vertical Distance (KS Distance): {ks_stat:.3f} at x={max_diff_x:.2f}\n" +
                      f"(p-value: {p_value:.3e})")

        return [line_test, kld_line, wasserstein_line, cdf_test,
                max_dist_line, max_dist_marker]

    # Create and save animation
    ani = FuncAnimation(fig, update, frames=n_frames, blit=True)
    try:
        fig.tight_layout()
        ani.save(output_filename, writer=PillowWriter(fps=fps))
    finally:
        # Close this specific figure even if saving fails, so repeated calls
        # do not accumulate open figures.
        plt.close(fig)

In [None]:
# Render one GIF per drift scenario, named after the scenario.
for change in ('mean', 'std', 'tail'):
    create_animation(change, 'drift_animation_{}.gif'.format(change))