In [3]:
import numpy as np
import pandas as pd

In [4]:
# Render floats in displayed frames with thousands separators and four decimals.
pd.set_option("display.float_format", "{:,.4f}".format)

In [5]:
__version__ = "1.01"
import numpy as np

def _psi(expected: np.ndarray, actual: np.ndarray, bucket_type: str = "bins", n_bins: int = 10) -> float:
    """Calculate PSI metric for two arrays.

    Parameters
    ----------
        expected : list-like
            Array of expected values
        actual : list-like
            Array of actual values
        bucket_type : str
            Binning strategy. Accepts two options: 'bins' and 'quantiles'. Defaults to 'bins'.
            'bins': input arrays are splitted into bins with equal
                and fixed steps based on 'expected' array
            'quantiles': input arrays are binned according to 'expected' array
                with given number of n_bins
        n_bins : int
            Number of buckets for binning. Defaults to 10.

    Returns
    -------
        A single float number
    """
    breakpoints = np.arange(0, n_bins + 1) / (n_bins) * 100
    if bucket_type == "bins":
        breakpoints = np.histogram(expected, n_bins)[1]
    elif bucket_type == "quantiles":
        breakpoints = np.percentile(expected, breakpoints)

    # Calculate frequencies
    expected_percents = np.histogram(expected, breakpoints)[0] / len(expected)
    actual_percents = np.histogram(actual, breakpoints)[0] / len(actual)
    # Clip freaquencies to avoid zero division
    expected_percents = np.clip(expected_percents, a_min=0.0001, a_max=None)
    actual_percents = np.clip(actual_percents, a_min=0.0001, a_max=None)
    # Calculate PSI
    psi_value = (expected_percents - actual_percents) * np.log(expected_percents / actual_percents)
    psi_value = sum(psi_value)

    return psi_value


def calculate_psi(
        expected: np.ndarray, actual: np.ndarray, bucket_type: str = "bins", n_bins: int = 10, axis: int = 0
) -> np.ndarray:
    """Apply the PSI calculation to a pair of 1-d or 2-d arrays.

    Parameters
    ----------
    expected : list-like
        Array of expected values
    actual : list-like
        Array of actual values
    bucket_type : str
        Binning strategy. Accepts two options: 'bins' and 'quantiles'. Defaults to 'bins'.
            'bins' - input arrays are split into equal-width bins
                based on the 'expected' array
            'quantiles' - input arrays are binned at the percentiles of the
                'expected' array with the given number of n_bins
    n_bins : int
        Number of buckets for binning. Defaults to 10.
    axis : int
        Only used for 2-d input. 0 (default) treats every column as one
        variable (PSI of ``expected[:, i]`` vs ``actual[:, i]``); 1 treats
        every row as one variable (``expected[i, :]`` vs ``actual[i, :]``).

    Returns
    -------
        np.ndarray
            For 1-d input, a 0-d array holding the single PSI value.
            For 2-d input, a 1-d array with one PSI value per variable.

    Raises
    ------
        ValueError
            If the inputs are not 1-d or 2-d, if `axis` is not 0 or 1 for
            2-d input, or if the two inputs hold a different number of
            variables.
    """
    expected = np.asarray(expected)
    actual = np.asarray(actual)

    # 1-d input: a single PSI value, wrapped in a 0-d array as before.
    if expected.ndim == 1:
        return np.array(_psi(expected, actual, bucket_type, n_bins))

    if expected.ndim != 2 or actual.ndim != 2:
        raise ValueError("'expected' and 'actual' must both be 1-d or both be 2-d")
    if axis not in (0, 1):
        raise ValueError(f"axis must be 0 or 1, got {axis!r}")

    # Variables lie along the axis that is NOT sliced over: with axis=0 the
    # loop walks over columns, so the result has shape[1] entries.
    n_variables = expected.shape[1 - axis]
    if actual.shape[1 - axis] != n_variables:
        raise ValueError("'expected' and 'actual' must contain the same number of variables")

    psi_values = np.empty(n_variables)
    for i in range(n_variables):
        if axis == 0:
            psi_values[i] = _psi(expected[:, i], actual[:, i], bucket_type, n_bins)
        else:
            psi_values[i] = _psi(expected[i, :], actual[i, :], bucket_type, n_bins)

    # Return only after every variable has been processed.
    return psi_values

if __name__ == "__main__":
    # Self-check on seeded synthetic data: both entry points must reproduce
    # the reference PSI value recorded for this seed and sample size.
    np.random.seed(44)
    SAMPLE_SIZE = 100
    EXPECTED_PSI = 0.2315847887596773
    data_control = -np.random.normal(1, 1, SAMPLE_SIZE)
    data_pilot = -np.random.normal(1.2, 1, SAMPLE_SIZE)
    a = calculate_psi(data_control, data_pilot, bucket_type="bins", n_bins=10, axis=0)
    print(type(a))
    # Compare with a tight relative tolerance instead of `==`: exact float
    # equality can break on last-bit differences between platforms/versions.
    assert np.isclose(a, EXPECTED_PSI, rtol=1e-9, atol=0.0), "The PSI value is incorrect"
    assert np.isclose(
        _psi(data_control, data_pilot, bucket_type="bins", n_bins=10), EXPECTED_PSI, rtol=1e-9, atol=0.0
    ), "The PSI value is incorrect"

<class 'numpy.ndarray'>


In [6]:
# Load the flight-activity table; the path is relative to the notebook's working directory.
df = pd.read_csv("Customer_Flight_Activity.csv")

In [10]:
# Reference sample: the "Total Flights" values of the rows whose Year is 2017.
df_expected = df.loc[df["Year"] == 2017, "Total Flights"]

In [11]:
# Summary statistics of the 2017 "Total Flights" values.
df_expected.describe()

Unnamed: 0,Total Flights
count,191100.0
mean,1.1683
std,1.6295
min,0.0
25%,0.0
50%,0.0
75%,2.0
max,7.0


In [12]:
# Comparison sample: the "Total Flights" values of the rows whose Year is 2018.
df_actual = df.loc[df["Year"] == 2018, "Total Flights"]

In [13]:
# Number of observations in the reference and comparison samples.
print(df_expected.shape[0], df_actual.shape[0])

191100 201836


In [15]:
# PSI of 2018 "Total Flights" (actual) against 2017 (expected), using 100
# equal-width bins derived from the 2017 values.
calculate_psi(df_expected, df_actual, bucket_type="bins", n_bins=100, axis=0)

array(0.01204838)

In [16]:
# Sanity check: comparing the 2017 sample with itself should give a PSI of 0.
calculate_psi(df_expected, df_expected, bucket_type="bins", n_bins=100, axis=0)

array(0.)