Utilities for 1D Gaussian mixture model (#16)
* Add new requirements

* Sketch of a Gaussian mixture

* Add Bayes rule

* Gaussian mixture model

* Add a Gaussian mixture model.
pawel-czyz committed Oct 5, 2022
1 parent aceb0b0 commit 9d01f52
Showing 5 changed files with 263 additions and 0 deletions.
79 changes: 79 additions & 0 deletions labelshift/algorithms/gaussian_mixture_model.py
@@ -0,0 +1,79 @@
"""This is a toy Bayesian quantification algorithm,
which assumes that the data were generated according
to a mixture of Gaussians.
Note:
This algorithm models the data in 1D.
"""
from typing import Optional, Sequence, Tuple

import numpy as np
import pymc as pm
from numpy.typing import ArrayLike

MEANS: str = "mu"
SIGMAS: str = "sigma"
P_UNLABELED_Y: str = "P_unlabeled(Y)"


def build_model(
labeled_data: Sequence[ArrayLike],
unlabeled_data: ArrayLike,
mean_params: Tuple[float, float] = (0.0, 1.0),
sigma_param: float = 1.0,
alpha: Optional[ArrayLike] = None,
) -> pm.Model:
"""Builds a PyMC model for Bayesian quantification for 1D data
assumed to be sampled from a mixture of normals.
Args:
        labeled_data: a list of length n_components. The ith entry
            should contain the X samples from the ith component,
            as an array of shape (n_samples_in_component_i,);
            the arrays may have different lengths.
unlabeled_data: the X samples from the unlabeled
data distribution. Shape (n_unlabeled,)
mean_params: used to initialize the prior on the component means
sigma_param: used to initialize the prior on the component sigmas
alpha: used to initialize the Dirichlet prior on P_unlabeled(Y).
Shape (n_components,)
Returns:
a PyMC model with the following variables:
`MEANS`: the vector with the mean of each component
`SIGMAS`: the vector with the sigma of each component
`P_UNLABELED_Y`: the prevalence vector
"""
unlabeled_data = np.asarray(unlabeled_data)
# We only support 1D mixtures
assert unlabeled_data.shape == (len(unlabeled_data),)

n_y = len(labeled_data)
if alpha is None:
alpha = np.ones(n_y)
else:
alpha = np.asarray(alpha)

if alpha.shape != (n_y,):
raise ValueError(f"Shape mismatch: {alpha.shape} != ({n_y},).")

with pm.Model() as gaussian_mixture:
        mu = pm.Normal(MEANS, mu=mean_params[0], sigma=mean_params[1], shape=n_y)
        sigma = pm.HalfNormal(SIGMAS, sigma=sigma_param, shape=n_y)

# For each component we have some samples, which can be used
# to constrain the mean and sigma of this distribution
for i in range(n_y):
pm.Normal(
f"X_labeled{i}", mu=mu[i], sigma=sigma[i], observed=labeled_data[i]
)

        p_unlabeled_y = pm.Dirichlet(P_UNLABELED_Y, alpha)
# We sample the data points
pm.NormalMixture(
"X_unlabeled", w=p_unlabeled_y, mu=mu, sigma=sigma, observed=unlabeled_data
)

return gaussian_mixture
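
A minimal usage sketch of the model above (not part of the commit): the synthetic data, sampler settings, and printed summary are illustrative assumptions.

import numpy as np
import pymc as pm

from labelshift.algorithms.gaussian_mixture_model import P_UNLABELED_Y, build_model

rng = np.random.default_rng(42)
# Labeled samples from two 1D components with different means.
labeled = [rng.normal(-1.0, 0.5, size=50), rng.normal(2.0, 0.5, size=80)]
# Unlabeled pool drawn mostly from the second component.
unlabeled = np.concatenate(
    [rng.normal(-1.0, 0.5, size=20), rng.normal(2.0, 0.5, size=180)]
)

model = build_model(labeled_data=labeled, unlabeled_data=unlabeled)
with model:
    idata = pm.sample(draws=1000, tune=1000, chains=2)

# Posterior mean of the prevalence vector P_unlabeled(Y);
# it should be close to (0.1, 0.9) for the data above.
print(idata.posterior[P_UNLABELED_Y].mean(dim=("chain", "draw")).values)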
152 changes: 152 additions & 0 deletions labelshift/datasets/gaussian_mixture.py
@@ -0,0 +1,152 @@
"""Model used for working with exact probabilities
in the Gaussian mixture model."""
from typing import Protocol

import numpy as np
from numpy.typing import ArrayLike
from scipy import stats


class PXGivenY(Protocol):
"""This is a protocol defining the P(X|Y) distribution,
where X is a vector-valued random variable and Y is a discrete variable.
"""

@property
def dim_x(self) -> int:
"""Dimension of the vector space in which X takes values."""
raise NotImplementedError

@property
def number_y(self) -> int:
"""The number of possible labels Y."""
raise NotImplementedError

def p_x_cond_y_fixed(self, xs: ArrayLike, y: int) -> np.ndarray:
"""Calculates the probability density function p(x|y).
Args:
xs: vector of points `x`, shape (n_points, dim_x)
y: label `y`
        Returns:
vector of probability density function values p(X=x | Y=y),
for each point `x`. Shape (n_points,)
"""
raise NotImplementedError

def p_x_cond_y(self, xs: ArrayLike) -> np.ndarray:
"""Calculates the probability density function p(x|y)
for all possible `y`.
Args:
xs: vector of points `x`, shape (n_points, dim_x)
Returns:
probability density function values p(X=x | Y=y),
for each point `x` and label `y`.
Shape (n_points, number_y).
"""
raise NotImplementedError

def sample_p_x_cond_y(self, ys: ArrayLike, rng) -> np.ndarray:
"""Samples from P(X|Y) for given Y.
Args:
ys: labels `y`, shape (n_points,)
rng: random number generator
Returns:
a random vector from P(X|Y=y), for each y in `ys`.
Shape (n_points, dim_x)
"""
raise NotImplementedError


class GaussianMixturePXGivenY(PXGivenY):
"""Generative process in which each P(X|Y)
is a multivariate normal distribution.
"""

def __init__(self, means: ArrayLike, covariances: ArrayLike) -> None:
"""
Args:
means: mean of the Gaussian associated to each label,
shape (number_y, dim_x)
covariances: covariance of the Gaussian associated to each label,
shape (number_y, dim_x, dim_x)
"""
means = np.asarray(means)
covariances = np.asarray(covariances)
self._dim_x = means.shape[1]
self._number_y = means.shape[0]

if covariances.shape != (means.shape[0], means.shape[1], means.shape[1]):
raise ValueError(f"Covariance matrix has wrong shape: {covariances.shape}")

self._distributions = [
stats.multivariate_normal(
mean=means[y, :],
cov=covariances[y, :, :],
allow_singular=False,
)
for y in range(self._number_y)
]

@property
def number_y(self) -> int:
"""The number of possible labels Y."""
return self._number_y

@property
def dim_x(self) -> int:
"""Dimension of the vector space in which X takes values."""
return self._dim_x

def p_x_cond_y_fixed(self, xs: ArrayLike, y: int) -> np.ndarray:
"""See the parent class."""
xs = np.asarray(xs)
dist = self._distributions[y]
return dist.pdf(xs)

def p_x_cond_y(self, xs: ArrayLike) -> np.ndarray:
"""See the parent class."""
xs = np.asarray(xs)
return np.vstack(
[self.p_x_cond_y_fixed(xs, y=y) for y in range(self.number_y)]
).T

def sample_p_x_cond_y(self, ys: ArrayLike, rng) -> np.ndarray:
"""See the parent class."""
rng = np.random.default_rng(rng)
return np.vstack([self._distributions[y].rvs(random_state=rng) for y in ys])


def posterior_probabilities(
p_y: ArrayLike, xs: ArrayLike, p_x_cond_y: PXGivenY
) -> np.ndarray:
"""Bayes rule for inference of P(Y|X).
Args:
        p_y: prior probabilities P(Y), shape (number_y,)
        xs: covariates, shape (n_points, dim_x)
        p_x_cond_y: a model of P(X|Y). Its `number_y` and `dim_x`
            must match the shapes of the other arguments.
Returns:
posterior probability P(Y|X) for each data point.
Shape (n_points, number_y)
"""
p_y = np.asarray(p_y)
xs = np.asarray(xs)
assert p_y.shape == (p_x_cond_y.number_y,)
assert xs.shape[1] == p_x_cond_y.dim_x

# P(X=x, Y=y) = P(X = x | Y = y) * P(Y=y)
# calculated pointwise.
# Shape (n_points, number_y)
p_x_and_y = p_x_cond_y.p_x_cond_y(xs) * p_y[None, :]

    # Normalize P(x, y) over y to obtain P(y|x). Shape (n_points, number_y).
    return p_x_and_y / p_x_and_y.sum(axis=1, keepdims=True)
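
The two pieces above compose via Bayes' rule: P(Y=y | X=x) = p(x|y) P(y) / sum_y' p(x|y') P(y'). A short sketch with illustrative numbers (assumptions, not part of the commit):

import numpy as np

import labelshift.datasets.gaussian_mixture as gm

# Two well-separated 1D components with unit variance.
dist = gm.GaussianMixturePXGivenY(
    means=[[-2.0], [2.0]],
    covariances=[np.eye(1), np.eye(1)],
)

xs = np.array([[-2.0], [0.0], [2.0]])
posterior = gm.posterior_probabilities(p_y=[0.5, 0.5], xs=xs, p_x_cond_y=dist)

# Each row sums to one; near x = -2 the first component dominates,
# and at x = 0 the posterior is an even 50/50 split.
print(posterior)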
2 changes: 2 additions & 0 deletions requirements.txt
@@ -1,6 +1,8 @@
arviz
numpy
pydantic
scikit-learn
scipy

# Code quality tools
black
2 changes: 2 additions & 0 deletions setup.cfg
@@ -25,6 +25,8 @@ install_requires =
numpy >= 1.12,<2
pydantic
pymc
scikit-learn
scipy

[options.packages.find]
where = labelshift
28 changes: 28 additions & 0 deletions tests/datasets/test_gaussian_mixture.py
@@ -0,0 +1,28 @@
"""Tests of the Gaussian mixture module."""
import numpy as np
import pytest

import labelshift.datasets.gaussian_mixture as gm


class TestPosteriorProbabilities:
    """Tests for posterior_probabilities."""
@pytest.mark.parametrize("n_components", [1, 3])
@pytest.mark.parametrize("dim", [1, 4])
def test_sums_up_to_one(self, n_components: int, dim: int, seed: int = 0) -> None:
"""Tests whether the posterior P(Y|X=x) sums up to one."""
rng = np.random.default_rng(seed)
means = rng.uniform(size=(n_components, dim))
sigmas = [np.eye(dim) for _ in range(n_components)]

distribution = gm.GaussianMixturePXGivenY(means=means, covariances=sigmas)

posterior = gm.posterior_probabilities(
p_y=np.ones(n_components) / n_components,
xs=rng.uniform(size=(1, dim)),
p_x_cond_y=distribution,
)
assert posterior.shape == (1, n_components), f"Shape is wrong: {posterior}"
assert posterior.sum() == pytest.approx(1.0), f"Sum is wrong: {posterior}"


# TODO(Pawel): Add more tests.
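
A possible next test for the TODO above (a sketch, not part of the commit): with well-separated components, the posterior P(Y | X = mean_k) should concentrate on component k.

import numpy as np

import labelshift.datasets.gaussian_mixture as gm


def test_concentrates_on_nearest_component(dim: int = 2) -> None:
    """Checks that the posterior puts almost all mass on the nearest component."""
    means = np.stack([np.zeros(dim), 10.0 * np.ones(dim)])
    covariances = [np.eye(dim), np.eye(dim)]
    distribution = gm.GaussianMixturePXGivenY(means=means, covariances=covariances)

    posterior = gm.posterior_probabilities(
        p_y=np.array([0.5, 0.5]),
        xs=means,  # evaluate the posterior at each component's mean
        p_x_cond_y=distribution,
    )
    assert posterior[0, 0] > 0.99
    assert posterior[1, 1] > 0.99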
