Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Student's t-distribution #5050

Merged
merged 19 commits into from Aug 24, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
166 changes: 166 additions & 0 deletions sktime/proba/t.py
@@ -0,0 +1,166 @@
# copyright: sktime developers, BSD-3-Clause License (see LICENSE file)
"""Student's t-distribution."""

__author__ = ["Alex-JG3"]

import numpy as np
import pandas as pd
from scipy.special import betaincinv, gamma, hyp2f1, loggamma

from sktime.proba.base import BaseDistribution


class TDistribution(BaseDistribution):
"""Student's t-distribution (sktime native).

Parameters
----------
mean : float or array of float (1D or 2D)
mean of the t-distribution distribution
sd : float or array of float (1D or 2D), must be positive
standard deviation of the t-distribution distribution
df : float or array of float (1D or 2D), must be positive
Degrees of freedom of the t-distribution
index : pd.Index, optional, default = RangeIndex
columns : pd.Index, optional, default = RangeIndex

Example
-------
>>> from sktime.proba.t import TDistribution

>>> n = TDistribution(mu=[[0, 1], [2, 3], [4, 5]], sigma=1, df=10)
"""

_tags = {
"capabilities:approx": ["pdfnorm", "energy"],
"capabilities:exact": ["mean", "var", "pdf", "log_pdf", "cdf", "ppf"],
"distr:measuretype": "continuous",
}

def __init__(self, mu, sigma, df=1, index=None, columns=None):
self.mu = mu
self.sigma = sigma
self.df = df
self.index = index
self.columns = columns

self._mu, self._sigma, self._df = self._get_bc_params()
shape = self._mu.shape

if index is None:
index = pd.RangeIndex(shape[0])

if columns is None:
columns = pd.RangeIndex(shape[1])

super().__init__(index=index, columns=columns)

def _get_bc_params(self):
fkiraly marked this conversation as resolved.
Show resolved Hide resolved
"""Fully broadcast parameters of self, given param shapes and index, columns."""
to_broadcast = [self.mu, self.sigma, self.df]
if hasattr(self, "index") and self.index is not None:
to_broadcast += [self.index.to_numpy().reshape(-1, 1)]
if hasattr(self, "columns") and self.columns is not None:
to_broadcast += [self.columns.to_numpy()]
bc = np.broadcast_arrays(*to_broadcast)
return bc[0], bc[1], bc[2]

def mean(self):
r"""Return expected value of the distribution.

Let :math:`X` be a random variable with the distribution of `self`.
Returns the expectation :math:`\mathbb{E}[X]`. The expectation,
:math:`\mathbb{E}[X]`, as infinite if :math:`\nu \le 1`.

Returns
-------
pd.DataFrame with same rows, columns as `self`
expected value of distribution (entry-wise)
"""
mean_arr = self._mu.copy()
if (self._df <= 1).any():
mean_arr = mean_arr.astype(np.float32)
mean_arr[self._df <= 1] = np.inf
return pd.DataFrame(mean_arr, index=self.index, columns=self.columns)

def var(self):
r"""Return element/entry-wise variance of the distribution.

Let :math:`X` be a random variable with the distribution of `self`.
Returns,

.. math::
\mathbb{V}[X] = \begin{cases}
\frac{\nu}{\nu - 2} & \text{if} \nu > 2, \\
\infty & \text{if} \nu \le 2, \\
\begin{cases}

Where :math:`\nu` is the degrees of freedom of the t-distribution.

Returns
-------
pd.DataFrame with same rows, columns as `self`
variance of distribution (entry-wise)
"""
df_arr = self._df.copy()
df_arr = df_arr.astype(np.float32)
df_arr[df_arr <= 2] = np.inf
mask = (df_arr > 2) & (df_arr != np.inf)
df_arr[mask] = df_arr[mask] / (df_arr[mask] - 2)
return pd.DataFrame(df_arr, index=self.index, columns=self.columns)

def pdf(self, x):
"""Probability density function."""
d = self.loc[x.index, x.columns]
pdf_arr = gamma((d._df + 1) / 2)
pdf_arr = pdf_arr / (np.sqrt(np.pi * d._df) * gamma(d._df / 2))
pdf_arr = pdf_arr * (1 + x**2 / d._df) ** (-(d._df + 1) / 2)
return pd.DataFrame(pdf_arr, index=x.index, columns=x.columns)

def log_pdf(self, x):
fkiraly marked this conversation as resolved.
Show resolved Hide resolved
"""Logarithmic probability density function."""
d = self.loc[x.index, x.columns]
lpdf_arr = loggamma((d._df + 1) / 2)
lpdf_arr = lpdf_arr - 0.5 * np.log(d._df * np.pi)
lpdf_arr = lpdf_arr - loggamma(d._df / 2)
lpdf_arr = lpdf_arr - ((d._df + 1) / 2) * np.log(1 + x**2 / d._df)
return pd.DataFrame(lpdf_arr, index=x.index, columns=x.columns)

def cdf(self, x):
"""Cumulative distribution function."""
d = self.loc[x.index, x.columns]
cdf_arr = x * gamma((d._df + 1) / 2)
cdf_arr = cdf_arr * hyp2f1(0.5, (d._df + 1) / 2, 3 / 2, -(x**2) / d._df)
cdf_arr = 0.5 + cdf_arr / (np.sqrt(np.pi * d._df) * gamma(d._df / 2))
return pd.DataFrame(cdf_arr, index=x.index, columns=x.columns)

def ppf(self, p):
"""Quantile function = percent point function = inverse cdf."""
d = self.loc[p.index, p.columns]
ppf_arr = p.to_numpy(copy=True)
ppf_arr[p.values == 0.5] = 0.0
ppf_arr[p.values <= 0] = -np.inf
ppf_arr[p.values >= 1] = np.inf

mask1 = (p.values < 0.5) & (p.values > 0)
mask2 = (p.values < 1) & (p.values > 0.5)
ppf_arr[mask1] = 1 / betaincinv(0.5 * d._df[mask1], 0.5, 2 * ppf_arr[mask1])
ppf_arr[mask2] = 1 / betaincinv(
0.5 * d._df[mask2], 0.5, 2 * (1 - ppf_arr[mask2])
)
ppf_arr[mask1 | mask2] = np.sqrt(ppf_arr[mask1 | mask2] - 1)
ppf_arr[mask1 | mask2] = np.sqrt(d._df[mask1 | mask2]) * ppf_arr[mask1 | mask2]
ppf_arr[mask1] = -ppf_arr[mask1]
return pd.DataFrame(ppf_arr, index=p.index, columns=p.columns)

@classmethod
def get_test_params(cls, parameter_set="default"):
"""Return testing parameter settings for the estimator."""
params1 = {"mu": [[0, 1], [2, 3], [4, 5]], "sigma": 1}
params2 = {
"mu": 0,
"sigma": 1,
"index": pd.Index([1, 2, 5]),
"columns": pd.Index(["a", "b"]),
}
return [params1, params2]