In [1]:
import numpy as np
import pandas as pd

In [12]:
def _psi(expected: np.ndarray, actual: np.ndarray, bucket_type: str = "bins", n_bins: int = 10) -> float:
    """Calculate PSI metric for two arrays.
    
    Parameters
    ----------
        expected : list-like
            Array of expected values
        actual : list-like
            Array of actual values
        bucket_type : str
            Binning strategy. Accepts two options: 'bins' and 'quantiles'. Defaults to 'bins'.
            'bins': input arrays are splitted into bins with equal
                and fixed steps based on 'expected' array
            'quantiles': input arrays are binned according to 'expected' array
                with given number of n_bins
        n_bins : int
            Number of buckets for binning. Defaults to 10.

    Returns
    -------
        A single float number
    """
    breakpoints = np.arange(0, n_bins + 1) / (n_bins) * 100
    if bucket_type == "bins":
        breakpoints = np.histogram(expected, n_bins)[1]
    elif bucket_type == "quantiles":
        breakpoints = np.percentile(expected, breakpoints)

    # Calculate frequencies
    expected_percents = np.histogram(expected, breakpoints)[0] / len(expected)
    actual_percents = np.histogram(actual, breakpoints)[0] / len(actual)
    # Clip freaquencies to avoid zero division
    expected_percents = np.clip(expected_percents, a_min=0.0001, a_max=None)
    actual_percents = np.clip(actual_percents, a_min=0.0001, a_max=None)
    # Calculate PSI
    psi_value = (expected_percents - actual_percents) * np.log(expected_percents / actual_percents)
    psi_value = sum(psi_value)

    return psi_value


def calculate_psi(
        expected: np.ndarray, actual: np.ndarray, bucket_type: str = "bins", n_bins: int = 10, axis: int = 0
) -> np.ndarray:
    """Apply the PSI calculation to two 1-d or two 2-d arrays.

    Parameters
    ----------
    expected : list-like
        Array of expected values
    actual : list-like
        Array of actual values
    bucket_type : str
        Binning strategy. Accepts two options: 'bins' and 'quantiles'. Defaults to 'bins'.
            'bins' - input arrays are split into bins with equal
                and fixed steps based on the 'expected' array
            'quantiles' - input arrays are binned according to the 'expected' array
                with the given number of n_bins
    n_bins : int
        Number of buckets for binning. Defaults to 10.
    axis : int
        Only used for 2-d input. 0 (default): each column is one variable
        (observations run down the rows). 1: each row is one variable.

    Returns
    -------
        np.ndarray
            For 1-d input a 0-d array holding the single PSI value; for 2-d
            input a 1-d array with one PSI value per variable.

    Raises
    ------
    ValueError
        If the inputs are not both 1-d or both 2-d, if ``axis`` is not 0 or 1
        for 2-d input, or if the two 2-d inputs hold a different number of
        variables.
    """
    expected = np.asarray(expected)
    actual = np.asarray(actual)

    if expected.ndim == 1:
        # Single variable: 'axis' is irrelevant.
        return np.array(_psi(expected, actual, bucket_type, n_bins))

    if expected.ndim != 2 or actual.ndim != 2:
        raise ValueError("'expected' and 'actual' must both be 1-d or both be 2-d")
    if axis not in (0, 1):
        raise ValueError(f"axis must be 0 or 1, got {axis!r}")

    # Normalise the layout so that variables are always columns.
    if axis == 1:
        expected, actual = expected.T, actual.T
    if expected.shape[1] != actual.shape[1]:
        raise ValueError(
            "'expected' and 'actual' must contain the same number of variables"
        )

    # One PSI value per variable; the result is returned only after every
    # variable has been processed.
    return np.array(
        [_psi(expected[:, i], actual[:, i], bucket_type, n_bins) for i in range(expected.shape[1])],
        dtype=float,
    )

In [13]:
# Load the GOOGL daily price history; the path is relative to the working directory.
df = pd.read_csv('googl_daily_prices.csv')

In [14]:
# Preview the first rows to check the column names and value formats.
df.head()

Unnamed: 0,date,1. open,2. high,3. low,4. close,5. volume
0,2025-05-30,171.35,172.205,167.44,171.74,50912792.0
1,2025-05-29,174.0,174.4193,170.63,171.86,29373803.0
2,2025-05-28,173.16,175.265,171.9107,172.36,34783997.0
3,2025-05-27,170.16,173.17,170.0,172.9,37995670.0
4,2025-05-23,169.055,169.96,167.89,168.47,35211439.0


In [15]:
# Parse the ISO-formatted date strings and derive the calendar year
# that the later cells use to split the data into yearly samples.
df = df.assign(
    date=lambda frame: pd.to_datetime(frame['date'], format='%Y-%m-%d'),
    year=lambda frame: frame['date'].dt.year,
)

In [16]:
# Expected sample: the '5. volume' values of every row dated 2024.
df_expected = df.loc[df['year'].eq(2024), '5. volume']

In [17]:
# Summary statistics of the expected sample (count, spread and min/max range).
df_expected.describe()

count    2.520000e+02
mean     2.744196e+07
std      1.129977e+07
min      1.024213e+07
25%      2.048537e+07
50%      2.403958e+07
75%      3.136293e+07
max      7.191004e+07
Name: 5. volume, dtype: float64

In [18]:
# Actual sample: the '5. volume' values of every row dated 2023.
df_actual = df.loc[df['year'].eq(2023), '5. volume']

In [19]:
# Summary statistics of the actual sample, for comparison with the expected one.
df_actual.describe()

count    2.500000e+02
mean     3.272150e+07
std      1.321764e+07
min      1.251432e+07
25%      2.508057e+07
50%      2.925293e+07
75%      3.623504e+07
max      1.194550e+08
Name: 5. volume, dtype: float64

In [20]:
# PSI of the yearly volume distributions using 10 equal-width bins built from the expected sample.
# NOTE(review): the later year (2024) is the expected/reference sample and 2023 the actual one —
# confirm this direction is intended.
calculate_psi(df_expected, df_actual, bucket_type="bins", n_bins=10, axis=0)

array(0.46948006)