Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basis_spline stateful transform. #21

Merged
merged 1 commit into from
Jul 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions formulaic/materializers/transforms/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from .basis_spline import basis_spline
from .center import center
from .identity import identity
from .encode_categorical import encode_categorical


TRANSFORMS = {
'bs': basis_spline,
'center': center,
'C': encode_categorical,
'I': identity,
Expand Down
164 changes: 164 additions & 0 deletions formulaic/materializers/transforms/basis_spline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
from collections import defaultdict
from enum import Enum
from typing import Iterable, Optional, Union

import numpy
import pandas

from formulaic.utils.stateful_transforms import stateful_transform


class SplineExtrapolation(Enum):
"""
Specification for how extrapolation should be performed during spline
computations.
"""
RAISE = 'raise'
CLIP = 'clip'
NA = 'na'
ZERO = 'zero'
EXTEND = 'extend'


@stateful_transform
def basis_spline(
x: Union[pandas.Series, numpy.ndarray],
df: Optional[int] = None,
knots: Optional[Iterable[float]] = None,
degree: int = 3,
include_intercept: bool = False,
lower_bound: Optional[float] = None,
upper_bound: Optional[float] = None,
extrapolation: Union[str, SplineExtrapolation] = 'raise',
state: dict = None,
):
"""
Evaluates the B-Spline basis vectors for given inputs `x`.

This is especially useful in the context of allowing non-linear fits to data
in linear regression. Except for the addition of the `extrapolation`
parameter, this implementation shares its API with `patsy.splines.bs`, and
should behave identically to both `patsy.splines.bs` and R's `splines::bs`
where functionality overlaps.

Args:
x: The vector for which the B-Spline basis should be computed.
df: The number of degrees of freedom to use for this spline. If
specified, `knots` will be automatically generated such that they
are `df` - `degree` (minus one if `include_intercept` is True)
equally spaced quantiles. You cannot specify both `df` and `knots`.
knots: The internal breakpoints of the B-Spline. If not specified, they
default to the empty list (unless `df` is specified), in which case
the ordinary polynomial (Bezier) basis is generated.
degree: The degree of the B-Spline (the highest degree of terms in the
resulting polynomial). Must be a non-negative integer.
include_intercept: Whether to return a complete (full-rank) basis. Note
that if `ensure_full_rank=True` is passed to the materializer, then
the intercept will (depending on context) nevertheless be omitted.
lower_bound: The lower bound for the domain for the B-Spline basis. If
not specified this is determined from `x`.
upper_bound: The upper bound for the domain for the B-Spline basis. If
not specified this is determined from `x`.
extrapolation: Selects how extrapolation should be performed when values
in `x` extend beyond the lower and upper bounds. Valid values are:
- 'raise': Raises a `ValueError` if there are any values in `x`
outside the B-Spline domain.
- 'clip': Any values above/below the domain are set to the
upper/lower bounds.
- 'na': Any values outside of bounds are set to `numpy.nan`.
- 'zero': Any values outside of bounds are set to `0`.
- 'extend': Any values outside of bounds are computed by extending
the polynomials of the B-Spline (this is the same as the default
in R).

Returns:
dict: A dictionary representing the encoded vectors ready for ingestion
by materializers.

Notes:
The implementation employed here uses a slightly generalised version of
the ["Cox-de Boor" algorithm](https://en.wikipedia.org/wiki/B-spline#Definition),
extended by this author to allow for extrapolations (although this
author doubts this is terribly novel). We have not used the `splev`
methods from `scipy` since in benchmarks this implementation outperforms
them for our use-cases.

If you would like to learn more about B-Splines, the primer put together
by Jeffrey Racine is an excellent resource:
https://cran.r-project.org/web/packages/crs/vignettes/spline_primer.pdf

As a stateful transform, we only keep track of `knots`, `lower_bound`
and `upper_bound`, which are sufficient given that all other information
must be explicitly specified.
"""
# Prepare and check arguments
if df is not None and knots is not None:
raise ValueError("You cannot specify both `df` and `knots`.")
lower_bound = numpy.min(x) if state.get('lower_bound', lower_bound) is None else lower_bound
upper_bound = numpy.max(x) if state.get('upper_bound', upper_bound) is None else upper_bound
extrapolation = SplineExtrapolation(extrapolation)
state['lower_bound'] = lower_bound
state['upper_bound'] = upper_bound

# Prepare data
if extrapolation is SplineExtrapolation.RAISE and numpy.any((x < lower_bound) | (x > upper_bound)):
raise ValueError(
"Some field values extend bound upper and/or lower bounds, which can result in ill-conditioned bases. "
"Pass a value for `extrapolation` to control how extrapolation should be performed."
)
if extrapolation is SplineExtrapolation.CLIP:
x = numpy.clip(x, lower_bound, upper_bound)
if extrapolation is SplineExtrapolation.NA:
x = numpy.where((x >= lower_bound) & (x <= upper_bound), x, numpy.nan)

# Prepare knots
if 'knots' not in state:
knots = knots or []
if df:
nknots = df - degree - (1 if include_intercept else 0)
if nknots < 0:
raise ValueError(f"Invalid value for `df`. `df` must be greater than {degree + (1 if include_intercept else 0)} [`degree` (+ 1 if `include_intercept` is `True`)].")
knots = list(numpy.quantile(x, numpy.linspace(0, 1, nknots + 2))[1:-1].ravel())
knots.insert(0, lower_bound)
knots.append(upper_bound)
knots = list(numpy.pad(knots, degree, mode='edge'))
state['knots'] = knots
knots = state['knots']
print(knots)

# Compute basis splines
# The following code is equivalent to [B(i, j=degree) for in range(len(knots)-d-1)], with B(i, j) as defined below.
# B = lambda i, j: ((x >= knots[i]) & (x < knots[i+1])).astype(float) if j == 0 else alpha(i, j, x) * B(i, j-1, x) + (1 - alpha(i+1, j, x)) * B(i+1, j-1, x)
# We don't directly use this recurrence relation so that we can memoise the B(i, j).
cache = defaultdict(dict)
alpha = lambda i, j: (x - knots[i]) / (knots[i + j] - knots[i]) if knots[i + j] != knots[i] else 0
for i in range(len(knots) - 1):
if extrapolation is SplineExtrapolation.EXTEND:
cache[0][i] = (
(x > (knots[i] if i != degree else -numpy.inf)) &
(x < (knots[i + 1] if i + 1 != len(knots) - degree - 1 else numpy.inf))
).astype(float)
else:
cache[0][i] = (
((x > knots[i]) if i != degree else (x >= knots[i])) &
((x < knots[i + 1]) if i + 1 != len(knots) - degree - 1 else (x <= knots[i + 1]))
).astype(float)
for d in range(1, degree + 1):
cache[d % 2].clear()
for i in range(len(knots) - d - 1):
cache[d % 2][i] = alpha(i, d) * cache[(d - 1) % 2][i] + (1 - alpha(i + 1, d)) * cache[(d - 1) % 2][i + 1]

# Prepare output
out = {
i: cache[degree % 2][i]
for i in sorted(cache[degree % 2])
if i > 0 or include_intercept
}
out.update({
'__kind__': 'numerical',
'__spans_intercept__': include_intercept,
'__drop_field__': 0,
'__format__': "{name}[{field}]",
'__encoded__': False,
})
return out
159 changes: 159 additions & 0 deletions tests/materializers/transforms/test_basis_spline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import numpy
import pytest

from formulaic.materializers.transforms.basis_spline import basis_spline


class TestBasisSpline:

@pytest.fixture(scope='session')
def data(self):
return numpy.linspace(0, 1, 21)

def test_basic(self, data):
V = basis_spline(data)

assert len([k for k in V if isinstance(k, int)]) == 3

# Comparison data copied from R output of:
# > library(splines)
# > data = seq(from=0, to=1, by=0.05)
# > bs(data)

assert numpy.allclose(V[1], [
0.000000, 0.135375, 0.243000, 0.325125, 0.384000, 0.421875, 0.441000,
0.443625, 0.432000, 0.408375, 0.375000, 0.334125, 0.288000, 0.238875,
0.189000, 0.140625, 0.096000, 0.057375, 0.027000, 0.007125, 0.000000,
])

assert numpy.allclose(V[2], [
0.000000, 0.007125, 0.027000, 0.057375, 0.096000, 0.140625, 0.189000,
0.238875, 0.288000, 0.334125, 0.375000, 0.408375, 0.432000, 0.443625,
0.441000, 0.421875, 0.384000, 0.325125, 0.243000, 0.135375, 0.000000,
])

assert numpy.allclose(V[3], [
0.000000, 0.000125, 0.001000, 0.003375, 0.008000, 0.015625, 0.027000,
0.042875, 0.064000, 0.091125, 0.125000, 0.166375, 0.216000, 0.274625,
0.343000, 0.421875, 0.512000, 0.614125, 0.729000, 0.857375, 1.000000,
])

def test_degree(self, data):
V = basis_spline(data, degree=1)

assert len([k for k in V if isinstance(k, int)]) == 1

# Comparison data copied from R output of:
# > library(splines)
# > data = seq(from=0, to=1, by=0.05)
# > bs(data, degree=1)

assert numpy.allclose(V[1], [
0.00, 0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50,
0.55, 0.60, 0.65, 0.70, 0.75, 0.80, 0.85, 0.90, 0.95, 1.00
])

def test_include_intercept(self, data):
V = basis_spline(data, degree=1, include_intercept=True)

assert len([k for k in V if isinstance(k, int)]) == 2

# Comparison data copied from R output of:
# > library(splines)
# > data = seq(from=0, to=1, by=0.05)
# > bs(data, degree=1, intercept=TRUE)

assert numpy.allclose(V[0], [
1.00, 0.95, 0.90, 0.85, 0.80, 0.75, 0.70, 0.65, 0.60, 0.55, 0.50,
0.45, 0.40, 0.35, 0.30, 0.25, 0.20, 0.15, 0.10, 0.05, 0.00
])

assert numpy.allclose(V[1], [
0.00, 0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50,
0.55, 0.60, 0.65, 0.70, 0.75, 0.80, 0.85, 0.90, 0.95, 1.00
])

def test_df(self, data):
V = basis_spline(data, df=5)

assert len([k for k in V if isinstance(k, int)]) == 5

# Comparison data copied from R output of:
# > library(splines)
# > data = seq(from=0, to=1, by=0.05)
# > bs(data, df=5)

assert numpy.allclose(V[1], [
0.00000000, 0.35465625, 0.54225000, 0.59821875, 0.55800000, 0.45703125,
0.33075000, 0.21434375, 0.12800000, 0.06865625, 0.03125000, 0.01071875,
0.00200000, 0.00003125, 0.00000000, 0.00000000, 0.00000000, 0.00000000,
0.00000000, 0.00000000, 0.00000000,
])

assert numpy.allclose(V[2], [
0.00000000, 0.03065625, 0.11025000, 0.22021875, 0.34200000, 0.45703125,
0.54675000, 0.59278125, 0.58800000, 0.54271875, 0.46875000, 0.37790625,
0.28200000, 0.19284375, 0.12150000, 0.07031250, 0.03600000, 0.01518750,
0.00450000, 0.00056250, 0.00000000,
])

assert numpy.allclose(V[3], [
0.00000000, 0.00056250, 0.00450000, 0.01518750, 0.03600000, 0.07031250,
0.12150000, 0.19284375, 0.28200000, 0.37790625, 0.46875000, 0.54271875,
0.58800000, 0.59278125, 0.54675000, 0.45703125, 0.34200000, 0.22021875,
0.11025000, 0.03065625, 0.00000000,
])

assert numpy.allclose(V[4], [
0.00000000, 0.00000000, 0.00000000, 0.00000000, 0.00000000, 0.00000000,
0.00000000, 0.00003125, 0.00200000, 0.01071875, 0.03125000, 0.06865625,
0.12800000, 0.21434375, 0.33075000, 0.45703125, 0.55800000, 0.59821875,
0.54225000, 0.35465625, 0.00000000,
])

assert numpy.allclose(V[5], [
0.000000, 0.000000, 0.000000, 0.000000, 0.000000, 0.000000, 0.000000,
0.000000, 0.000000, 0.000000, 0.000000, 0.000000, 0.000000, 0.000000,
0.001000, 0.015625, 0.064000, 0.166375, 0.343000, 0.614125, 1.000000,
])

def test_extrapolation(self, data):
# Comparison data based on R output of:
# > library(splines)
# > data = seq(from=0, to=1, by=0.05)
# > bs(data, Boundary.knots=c(0.25, 0.75))

with pytest.raises(ValueError, match="Some field values extend bound upper and/or lower bounds"):
basis_spline(data, lower_bound=0.25, upper_bound=0.75)

V = basis_spline(data, lower_bound=0.25, upper_bound=0.75, extrapolation='clip')
assert numpy.allclose(V[3], [
0.000, 0.000, 0.000, 0.000, 0.000, 0.000, 0.001, 0.008, 0.027, 0.064,
0.125, 0.216, 0.343, 0.512, 0.729, 1.000, 1.000, 1.000, 1.000, 1.000, 1.000,
])

V2 = basis_spline(data, lower_bound=0.25, upper_bound=0.75, extrapolation='na')
assert numpy.allclose(V2[3], [
numpy.nan, numpy.nan, numpy.nan, numpy.nan, numpy.nan, 0.000, 0.001, 0.008, 0.027, 0.064,
0.125, 0.216, 0.343, 0.512, 0.729, 1.000, numpy.nan, numpy.nan, numpy.nan, numpy.nan, numpy.nan,
], equal_nan=True)

V3 = basis_spline(data, lower_bound=0.25, upper_bound=0.75, extrapolation='zero')
assert numpy.allclose(V3[3], [
0.000, 0.000, 0.000, 0.000, 0.000, 0.000, 0.001, 0.008, 0.027, 0.064,
0.125, 0.216, 0.343, 0.512, 0.729, 1.000, 0.000, 0.000, 0.000, 0.000, 0.000,
], equal_nan=True)


V4 = basis_spline(data, lower_bound=0.25, upper_bound=0.75, extrapolation='extend')
assert numpy.allclose(V4[3], [
-0.125, -0.064, -0.027, -0.008, -0.001, 0.000, 0.001, 0.008, 0.027, 0.064,
0.125, 0.216, 0.343, 0.512, 0.729, 1.000, 1.331, 1.728, 2.197, 2.744, 3.375,
])

def test_invalid_df(self, data):
with pytest.raises(ValueError, match="You cannot specify both `df` and `knots`."):
basis_spline(data, df=2, knots=[])

with pytest.raises(ValueError, match="Invalid value for `df`. `df` must be greater than 3"):
basis_spline(data, df=2)