Compare the performance of models fitted by maximum likelihood estimation (MLE), MAP with a Laplace approximation (LA), and ELBO maximization (variational inference) under different kinds of dataset shift: covariate shift, prior probability shift, and concept shift. Plot how the performance depends on the magnitude of the dataset shift.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from scipy.stats import norm


In [None]:
def generate_target(x, weight, bias, std=0.07, shift=0):
  """Return noisy linear targets ``x * weight + bias + noise``.

  One zero-mean Gaussian noise value with standard deviation ``std`` is drawn
  per element of ``x`` from NumPy's global random state.

  Note: ``shift`` is accepted but is not used by this function.
  """
  n_points = len(x)
  linear_part = x * weight + bias
  return linear_part + np.random.normal(loc=0, scale=std, size=n_points)

def sample_from_mixture(mean1, mean2, std, size):
  """Draw ``size`` values from an equal-weight mixture of two Gaussians.

  Both components share the standard deviation ``std``. A full sample is
  drawn from each component, and a uniform draw per position decides which
  component's value is kept (below 0.5 keeps the first component).
  """
  first = np.random.normal(mean1, std, size)
  second = np.random.normal(mean2, std, size)
  take_second = np.random.rand(size) >= 0.5
  return np.where(take_second, second, first)

# Models

## MLE

Правдоподобие модели

$$p(y|X, a, b, \sigma) = \frac{1}{(\sqrt{2\pi\sigma^2})^n}\exp\left(-\frac{\sum_i (y_i - a - bx_i)^2}{2\sigma^2}\right)$$

Из принципа максимума правдоподобия получаем формулы для параметров модели:

$$b = \frac{\sum(x_i - \overline{x})(y_i - \overline{y})}{\sum(x_i - \overline{x})^2}; \quad a = \overline{y} - b\overline{x}$$


In [None]:
def mle_reg(x_train, y_train, x_test):
  """Fit a least-squares line on the training data and predict at ``x_test``.

  The slope is the ratio of the summed cross-deviations of x and y to the
  summed squared deviations of x; the intercept makes the line pass through
  the point of training means.
  """
  dx = x_train - np.mean(x_train)
  dy = y_train - np.mean(y_train)
  slope = np.sum(dx * dy) / np.sum(dx * dx)
  intercept = np.mean(y_train) - slope * np.mean(x_train)
  return slope * x_test + intercept

## LA

In [None]:
def la_reg(x_train, y_train, x_test):
  """Predict at ``x_test`` with the MAP weights of a linear model.

  The design matrix has columns ``[x, 1]``, so the weight vector is
  ``(slope, intercept)``. The weights solve ``(I + X^T X) w = X^T y``, which
  is the MAP estimate under an identity prior precision (presumably a
  N(0, I) prior with unit noise variance -- confirm against the intended
  model).

  Only the point estimate is used; no posterior covariance is computed here.

  Args:
    x_train: 1-D array of training inputs.
    y_train: 1-D array of training targets.
    x_test: 1-D array of inputs to predict at.

  Returns:
    1-D array of predictions, one per element of ``x_test``.
  """
  X_train = np.column_stack((x_train, np.ones(len(x_train))))
  # The prior precision must be a valid (positive-definite) matrix: use the
  # identity, not an all-ones matrix, which is singular and couples the
  # slope and intercept.
  H = np.eye(X_train.shape[1]) + X_train.T @ X_train
  # Solve the linear system directly instead of forming the explicit inverse.
  w = np.linalg.solve(H, X_train.T @ y_train)

  X_test = np.column_stack((x_test, np.ones(len(x_test))))
  return X_test @ w

## ELBO

https://link.springer.com/article/10.1007/s00357-010-9045-9


Пусть наша модель
$$\begin{aligned}
y|\beta,\sigma^2 \sim N(X\beta, \sigma^2 I_n) \\
\beta \sim N(\mu_0,\Sigma_0) \\
\sigma^2 \sim IG(a_0, b_0).
\end{aligned}$$

И пусть $q(\beta,\sigma^2;\xi) = q(\beta;\xi_\beta)q(\sigma^2;\xi_\sigma)$, где
$$
\begin{aligned}
q(\beta;\xi_\beta) = N(\beta|\mu_\beta, \Sigma_\beta) \\
q(\sigma^2;\xi_\sigma) = IG(\sigma^2|a_{\sigma^2}, b_{\sigma^2}).
\end{aligned}
$$

Тогда

$
\mathcal{L}(y|\theta;q) = $

  $ -\frac{1}{2}\left\{n\ln(2\pi) + n\left(\ln(b_{\sigma^2}) - \psi(a_{\sigma^2})\right) + \frac{a_{\sigma^2}}{b_{\sigma^2}}\left[(y-X\mu_\beta)^\top(y-X\mu_\beta)+\text{tr}(X^\top X\Sigma_\beta)\right]\right\} $

  $ -\frac{1}{2}\left\{d\ln(2\pi)+\ln|\Sigma_0| + (\mu_\beta-\mu_0)^\top\Sigma_0^{-1}(\mu_\beta-\mu_0) + \text{tr}(\Sigma_0^{-1}\Sigma_\beta)\right\} $

  $ + a_0\ln(b_0)-\ln\Gamma(a_0)-(a_0+1)(\ln(b_{\sigma^2}) - \psi(a_{\sigma^2}))- b_0\frac{a_{\sigma^2}}{b_{\sigma^2}}$
  $+ \frac{1}{2}\left[d(1 + \ln(2\pi)) + \ln|\Sigma_\beta|\right]$

  $+ a_{\sigma^2} + \ln(b_{\sigma^2}) + \ln\Gamma(a_{\sigma^2}) - (a_{\sigma^2} + 1)\psi(a_{\sigma^2})$

Тогда итоговые формулы для параметров распределения:

$$\begin{cases}
\Sigma_\beta \leftarrow \left(\frac{a_{\sigma^2}}{b_{\sigma^2}}X^\top X + \Sigma_0^{-1}\right)^{-1}\\
\mu_\beta \leftarrow \Sigma_\beta\left(\frac{a_{\sigma^2}}{b_{\sigma^2}}X^\top y + \Sigma_0^{-1}\mu_0\right)\\
a_{\sigma^2} \leftarrow a_0 + \frac{n}{2} \\
b_{\sigma^2} \leftarrow b_0 + \frac{(y-X\mu_\beta)^\top(y-X\mu_\beta) + \text{tr}(X^\top X \Sigma_\beta)}{2}
\end{cases}$$

In [None]:
import numpy as np
from scipy.special import digamma, gamma, gammaln

def mvn_ent(Sigma):
    """Return the entropy of a multivariate normal with covariance ``Sigma``.

    Computes ``(d * (1 + ln(2*pi)) + ln|Sigma|) / 2`` with ``d`` taken from
    the number of columns of ``Sigma``.
    """
    d = Sigma.shape[1]
    return (d * (1 + np.log(2 * np.pi)) + np.log(np.linalg.det(Sigma))) / 2


def ig_ent(a, b):
    """Return the entropy of an inverse-gamma distribution IG(a, b).

    Computes ``a + ln(b) + ln(Gamma(a)) - (a + 1) * digamma(a)``.
    """
    # The formula needs log-Gamma, not Gamma itself; gammaln also avoids
    # the overflow of Gamma(a) for large a.
    return a + np.log(b) + gammaln(a) - (a + 1) * digamma(a)


def elbo_reg(X, y, mu0=np.array([0,0]), Sigma0=np.array([[1,0], [0,1]]), a0=1e-2, b0=1e-2, maxiter=100, tol=1e-5, verbose=True):
    """Fit Bayesian linear regression by coordinate-ascent variational inference.

    Model: ``y ~ N(X beta, sigma^2 I)``, ``beta ~ N(mu0, Sigma0)``,
    ``sigma^2 ~ IG(a0, b0)``, with the factorised approximation
    ``q(beta) q(sigma^2) = N(mu, Sigma) * IG(a, b)``.

    Args:
        X: array of scalar inputs; it is reshaped to a column and a column of
            ones is appended, so ``beta = (slope, intercept)``.
        y: 1-D array of targets, one per input.
        mu0, Sigma0: prior mean and covariance of ``beta``.
        a0, b0: prior inverse-gamma parameters of ``sigma^2``.
        maxiter: maximum number of update sweeps.
        tol: stop once the lower bound changes by less than this between
            two consecutive sweeps.
        verbose: if true, print the lower bound after every sweep.

    Returns:
        dict with keys ``'lb'`` (lower bound per performed sweep), ``'mu'``,
        ``'Sigma'``, ``'a'`` and ``'b'`` (final variational parameters).
    """
    X = np.concatenate((X.reshape((-1, 1)), np.ones(len(X)).reshape((-1, 1))), axis=1)
    n, d = X.shape

    # Quantities that do not change across sweeps.
    invSigma0 = np.linalg.inv(Sigma0)
    invSigma0_x_mu0 = invSigma0 @ mu0
    logdet_Sigma0 = np.log(np.linalg.det(Sigma0))
    XtX = X.T @ X
    Xty = X.T @ y

    mu = mu0
    Sigma = Sigma0
    a = a0 + n / 2  # optimal value does not depend on the other factors
    b = b0
    lb = np.zeros(maxiter)
    i = 0
    converged = False
    # Strict '<': lb holds exactly maxiter entries, so at most maxiter sweeps.
    while i < maxiter and not converged:
        i += 1
        a_div_b = a / b

        # Update q(beta).
        Sigma = np.linalg.inv(a_div_b * XtX + invSigma0)
        mu = Sigma @ (a_div_b * Xty + invSigma0_x_mu0)

        # Update q(sigma^2).
        resid = y - X @ mu
        expected_sse = resid @ resid + np.trace(XtX @ Sigma)
        b = b0 + 0.5 * expected_sse

        # Lower bound L(q) = E[ln p(y|.)] + E[ln p(beta)] + E[ln p(sigma^2)]
        #                    + H[q(beta)] + H[q(sigma^2)].
        e_log_sigma2 = np.log(b) - digamma(a)  # E_q[ln sigma^2]
        dmu = mu - mu0
        exp_loglik = -0.5 * (n * np.log(2 * np.pi) + n * e_log_sigma2
                             + a / b * expected_sse)
        # The quadratic form uses the prior precision, not the covariance.
        exp_logprior_beta = -0.5 * (d * np.log(2 * np.pi) + logdet_Sigma0
                                    + dmu @ invSigma0 @ dmu
                                    + np.trace(invSigma0 @ Sigma))
        exp_logprior_sigma2 = (a0 * np.log(b0) - gammaln(a0)
                               - (a0 + 1) * e_log_sigma2 - b0 * a / b)
        lb[i-1] = (exp_loglik + exp_logprior_beta + exp_logprior_sigma2
                   + mvn_ent(Sigma) + ig_ent(a, b))

        if verbose:
            print(f"Iteration {i}, ELBO = {lb[i-1]}")

        if i > 1 and abs(lb[i-1] - lb[i-2]) < tol:
            converged = True

    return {'lb': lb[:i], 'mu': mu, 'Sigma': Sigma, 'a': a, 'b': b}

# Covariate shift

In [None]:
# Covariate shift: train and test inputs come from different distributions,
# while the same linear relation generates the targets.
N, scale = 50, 1
weight, bias = 0.1, 1

# Training inputs: a single Gaussian centred at 0.
train_x = np.random.normal(loc=0, scale=scale, size=N)
train_y = generate_target(train_x, weight, bias)

# Test inputs: mixture of Gaussians centred at -2 and 0.
test_x = sample_from_mixture(-2, 0, scale, N)
test_y = generate_target(test_x, weight, bias)


In [None]:
# Fit the variational posterior on the training data, then draw a single
# weight vector from q(beta) = N(mu, Sigma).
trained_elbo = elbo_reg(train_x, train_y, verbose=False)
posterior_mean, posterior_cov = trained_elbo['mu'], trained_elbo['Sigma']
w_elbo = np.random.multivariate_normal(posterior_mean, posterior_cov)


1.0218419974794224
1.0216614213537478


In [None]:
import plotly.graph_objects as go
from scipy.stats import norm

# Covariate-shift figure: input densities, both samples, the fitted lines
# of the three models and the true dependence.
x_lo = min(test_x.min(), train_x.min()) - 1
x_hi = max(test_x.max(), train_x.max()) + 1
grid = np.linspace(x_lo, x_hi, 1000)

train_pdf = norm(loc=0, scale=scale).pdf(grid)
# Average of the two component densities of the test inputs.
test_pdf = (norm(loc=-2, scale=scale).pdf(grid) + norm(loc=0, scale=scale).pdf(grid))/2

# Design matrix with columns [x, 1] for evaluating the sampled ELBO weights.
grid_design = np.column_stack((grid, np.ones(len(grid))))

fig = go.Figure()
for trace in (
    go.Scatter(x=grid, y=train_pdf, mode='lines', name='X train density'),
    go.Scatter(x=grid, y=test_pdf, mode='lines', name='X test density'),
    go.Scatter(x=train_x, y=train_y, mode='markers',
               marker={'symbol': 'circle', 'opacity': 0.8}, name='Train data'),
    go.Scatter(x=test_x, y=test_y, mode='markers',
               marker={'symbol': 'circle', 'opacity': 0.8}, name='Test data'),
    go.Scatter(x=grid, y=mle_reg(train_x, train_y, grid), mode='lines',
               name='Predicted MLE', line={'color': 'green', 'dash': 'solid'}),
    go.Scatter(x=grid, y=la_reg(train_x, train_y, grid), mode='lines',
               name='Predicted LA', line={'color': 'red', 'dash': 'solid'}),
    go.Scatter(x=grid, y=grid_design @ w_elbo, mode='lines',
               name='Predicted ELBO', line={'color': 'brown', 'dash': 'solid'}),
    go.Scatter(x=grid, y=grid*weight + bias, mode='lines',
               name='Real dependence', line={'color': 'black', 'dash': 'solid'}),
):
    fig.add_trace(trace)

# Legend anchored in the top-left corner of the plot area.
legend_layout = {'yanchor': "top", 'y': 0.99, 'xanchor': "left", 'x': 0.01}
fig.update_layout(
    height=700,
    xaxis_title='X',
    yaxis_title='y',
    legend=legend_layout,
    title='Covariate shift',
)

fig.show()

(2,)
(1000, 2)


# Prior probability shift

In [None]:
# Prior probability shift: the target distribution moves between train and
# test, and the inputs are generated from the targets.
N, scale = 50, 1
weight, bias = 0.1, 1

# Training targets centred at 0; inputs are a noisy linear function of them.
train_y = np.random.normal(loc=0, scale=scale, size=N)
train_x = generate_target(train_y, weight, bias)

# Test targets centred at 2, inputs generated the same way.
test_y = np.random.normal(loc=2, scale=scale, size=N)
test_x = generate_target(test_y, weight, bias)


In [None]:
# Fit the variational posterior on the training data, then draw a single
# weight vector from q(beta) = N(mu, Sigma).
trained_elbo = elbo_reg(train_x, train_y, verbose=False)
posterior_mean, posterior_cov = trained_elbo['mu'], trained_elbo['Sigma']
w_elbo = np.random.multivariate_normal(posterior_mean, posterior_cov)


89.94174869734456
42.028157472020695


In [None]:
# Prior-probability-shift figure: both samples, the fitted lines of the
# three models and the true dependence.
fig = go.Figure()

grid = np.linspace(min(test_x.min(), train_x.min()), max(test_x.max(), train_x.max()), 1000)
# Design matrix with columns [x, 1] for evaluating the sampled ELBO weights.
grid_design = np.column_stack((grid, np.ones(len(grid))))

fig.add_trace(go.Scatter(x=train_x, y=train_y, mode='markers', marker=dict(symbol='circle', opacity=0.8), name='Train data'))
fig.add_trace(go.Scatter(x=test_x, y=test_y, mode='markers', marker=dict(symbol='circle', opacity=0.8), name='Test data'))
# Labelled 'Predicted MLE' so the legend matches the other shift figures.
fig.add_trace(go.Scatter(x=grid, y=mle_reg(train_x, train_y, grid), mode='lines', name='Predicted MLE', line=dict(color='green', dash='solid')))
fig.add_trace(go.Scatter(x=grid, y=la_reg(train_x, train_y, grid), mode='lines', name='Predicted LA', line=dict(color='red', dash='dash', width=3)))
fig.add_trace(go.Scatter(x=grid, y=grid_design @ w_elbo, mode='lines', name='Predicted ELBO', line=dict(color='brown', dash='solid')))
# Inputs were generated as x = weight * y + bias + noise, so the noise-free
# relation expressed as y(x) is the inverse of that line.
fig.add_trace(go.Scatter(x=grid, y=(grid - bias)/weight, mode='lines', name='Real dependence', line=dict(color='black', dash='solid')))

fig.update_layout(
    height=700,
    xaxis_title='X',
    yaxis_title='y',
    legend=dict(
        yanchor="top",
        y=0.99,
        xanchor="left",
        x=0.01
    ),
    title='Prior probability shift'
)

fig.show()

(2,)
(1000, 2)


# Concept shift

In [None]:
# Concept shift: train and test inputs share one distribution, but the
# slope of the generating relation differs at test time.
N, scale = 50, 1
weight, bias = 0.1, 1

train_x = np.random.normal(loc=0, scale=scale, size=N)
train_y = generate_target(train_x, weight, bias)

test_x = np.random.normal(loc=0, scale=scale, size=N)
# Test targets use a slope increased by 0.1.
test_y = generate_target(test_x, weight + 0.1, bias)

In [None]:
# Fit the variational posterior on the training data, then draw a single
# weight vector from q(beta) = N(mu, Sigma).
trained_elbo = elbo_reg(train_x, train_y, verbose=False)
posterior_mean, posterior_cov = trained_elbo['mu'], trained_elbo['Sigma']
w_elbo = np.random.multivariate_normal(posterior_mean, posterior_cov)

1.027944058327892
1.0277573013176795


In [None]:
# Concept-shift figure: the shared input density, both samples, the train
# and test generating lines, and the fitted lines of the three models.
x_lo = min(test_x.min(), train_x.min()) - 1
x_hi = max(test_x.max(), train_x.max()) + 1
grid = np.linspace(x_lo, x_hi, 1000)
pdf = norm(loc=0, scale=scale).pdf(grid)

# Design matrix with columns [x, 1] for evaluating the sampled ELBO weights.
grid_design = np.column_stack((grid, np.ones(len(grid))))

fig = go.Figure()
for trace in (
    go.Scatter(x=grid, y=pdf, mode='lines', name='X density',
               line={'color': 'black'}),
    go.Scatter(x=train_x, y=train_y, mode='markers',
               marker={'symbol': 'circle', 'opacity': 0.8}, name='Train data'),
    go.Scatter(x=test_x, y=test_y, mode='markers',
               marker={'symbol': 'circle', 'opacity': 0.8}, name='Test data'),
    go.Scatter(x=grid, y=grid*weight + bias, mode='lines',
               name='Train dependence'),
    go.Scatter(x=grid, y=grid*(weight + 0.1) + bias, mode='lines',
               name='Test dependence'),
    go.Scatter(x=grid, y=mle_reg(train_x, train_y, grid), mode='lines',
               name='Predicted MLE', line={'color': 'green', 'dash': 'solid'}),
    go.Scatter(x=grid, y=la_reg(train_x, train_y, grid), mode='lines',
               name='Predicted LA',
               line={'color': 'red', 'dash': 'solid', 'width': 1}),
    go.Scatter(x=grid, y=grid_design @ w_elbo, mode='lines',
               name='Predicted ELBO', line={'color': 'brown', 'dash': 'solid'}),
):
    fig.add_trace(trace)

# Legend anchored in the top-left corner of the plot area.
legend_layout = {'yanchor': "top", 'y': 0.99, 'xanchor': "left", 'x': 0.01}
fig.update_layout(
    height=700,
    xaxis_title='X',
    yaxis_title='y',
    legend=legend_layout,
    title='Concept shift',
)

fig.show()

(2,)
(1000, 2)
