Skip to content

Commit

Permalink
Merge pull request #21 from matthewwardrop/add_basis_splines
Browse files Browse the repository at this point in the history
Add basis_spline stateful transform.
  • Loading branch information
matthewwardrop committed Jul 27, 2020
2 parents b6702ff + b367270 commit 174db5c
Show file tree
Hide file tree
Showing 3 changed files with 325 additions and 0 deletions.
2 changes: 2 additions & 0 deletions formulaic/materializers/transforms/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from .basis_spline import basis_spline
from .center import center
from .identity import identity
from .encode_categorical import encode_categorical


TRANSFORMS = {
'bs': basis_spline,
'center': center,
'C': encode_categorical,
'I': identity,
Expand Down
164 changes: 164 additions & 0 deletions formulaic/materializers/transforms/basis_spline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
from collections import defaultdict
from enum import Enum
from typing import Iterable, Optional, Union

import numpy
import pandas

from formulaic.utils.stateful_transforms import stateful_transform


class SplineExtrapolation(Enum):
"""
Specification for how extrapolation should be performed during spline
computations.
"""
RAISE = 'raise'
CLIP = 'clip'
NA = 'na'
ZERO = 'zero'
EXTEND = 'extend'


@stateful_transform
def basis_spline(
x: Union[pandas.Series, numpy.ndarray],
df: Optional[int] = None,
knots: Optional[Iterable[float]] = None,
degree: int = 3,
include_intercept: bool = False,
lower_bound: Optional[float] = None,
upper_bound: Optional[float] = None,
extrapolation: Union[str, SplineExtrapolation] = 'raise',
state: dict = None,
):
"""
Evaluates the B-Spline basis vectors for given inputs `x`.
This is especially useful in the context of allowing non-linear fits to data
in linear regression. Except for the addition of the `extrapolation`
parameter, this implementation shares its API with `patsy.splines.bs`, and
should behave identically to both `patsy.splines.bs` and R's `splines::bs`
where functionality overlaps.
Args:
x: The vector for which the B-Spline basis should be computed.
df: The number of degrees of freedom to use for this spline. If
specified, `knots` will be automatically generated such that they
are `df` - `degree` (minus one if `include_intercept` is True)
equally spaced quantiles. You cannot specify both `df` and `knots`.
knots: The internal breakpoints of the B-Spline. If not specified, they
default to the empty list (unless `df` is specified), in which case
the ordinary polynomial (Bezier) basis is generated.
degree: The degree of the B-Spline (the highest degree of terms in the
resulting polynomial). Must be a non-negative integer.
include_intercept: Whether to return a complete (full-rank) basis. Note
that if `ensure_full_rank=True` is passed to the materializer, then
the intercept will (depending on context) nevertheless be omitted.
lower_bound: The lower bound for the domain for the B-Spline basis. If
not specified this is determined from `x`.
upper_bound: The upper bound for the domain for the B-Spline basis. If
not specified this is determined from `x`.
extrapolation: Selects how extrapolation should be performed when values
in `x` extend beyond the lower and upper bounds. Valid values are:
- 'raise': Raises a `ValueError` if there are any values in `x`
outside the B-Spline domain.
- 'clip': Any values above/below the domain are set to the
upper/lower bounds.
- 'na': Any values outside of bounds are set to `numpy.nan`.
- 'zero': Any values outside of bounds are set to `0`.
- 'extend': Any values outside of bounds are computed by extending
the polynomials of the B-Spline (this is the same as the default
in R).
Returns:
dict: A dictionary representing the encoded vectors ready for ingestion
by materializers.
Notes:
The implementation employed here uses a slightly generalised version of
the ["Cox-de Boor" algorithm](https://en.wikipedia.org/wiki/B-spline#Definition),
extended by this author to allow for extrapolations (although this
author doubts this is terribly novel). We have not used the `splev`
methods from `scipy` since in benchmarks this implementation outperforms
them for our use-cases.
If you would like to learn more about B-Splines, the primer put together
by Jeffrey Racine is an excellent resource:
https://cran.r-project.org/web/packages/crs/vignettes/spline_primer.pdf
As a stateful transform, we only keep track of `knots`, `lower_bound`
and `upper_bound`, which are sufficient given that all other information
must be explicitly specified.
"""
# Prepare and check arguments
if df is not None and knots is not None:
raise ValueError("You cannot specify both `df` and `knots`.")
lower_bound = numpy.min(x) if state.get('lower_bound', lower_bound) is None else lower_bound
upper_bound = numpy.max(x) if state.get('upper_bound', upper_bound) is None else upper_bound
extrapolation = SplineExtrapolation(extrapolation)
state['lower_bound'] = lower_bound
state['upper_bound'] = upper_bound

# Prepare data
if extrapolation is SplineExtrapolation.RAISE and numpy.any((x < lower_bound) | (x > upper_bound)):
raise ValueError(
"Some field values extend bound upper and/or lower bounds, which can result in ill-conditioned bases. "
"Pass a value for `extrapolation` to control how extrapolation should be performed."
)
if extrapolation is SplineExtrapolation.CLIP:
x = numpy.clip(x, lower_bound, upper_bound)
if extrapolation is SplineExtrapolation.NA:
x = numpy.where((x >= lower_bound) & (x <= upper_bound), x, numpy.nan)

# Prepare knots
if 'knots' not in state:
knots = knots or []
if df:
nknots = df - degree - (1 if include_intercept else 0)
if nknots < 0:
raise ValueError(f"Invalid value for `df`. `df` must be greater than {degree + (1 if include_intercept else 0)} [`degree` (+ 1 if `include_intercept` is `True`)].")
knots = list(numpy.quantile(x, numpy.linspace(0, 1, nknots + 2))[1:-1].ravel())
knots.insert(0, lower_bound)
knots.append(upper_bound)
knots = list(numpy.pad(knots, degree, mode='edge'))
state['knots'] = knots
knots = state['knots']
print(knots)

# Compute basis splines
# The following code is equivalent to [B(i, j=degree) for in range(len(knots)-d-1)], with B(i, j) as defined below.
# B = lambda i, j: ((x >= knots[i]) & (x < knots[i+1])).astype(float) if j == 0 else alpha(i, j, x) * B(i, j-1, x) + (1 - alpha(i+1, j, x)) * B(i+1, j-1, x)
# We don't directly use this recurrence relation so that we can memoise the B(i, j).
cache = defaultdict(dict)
alpha = lambda i, j: (x - knots[i]) / (knots[i + j] - knots[i]) if knots[i + j] != knots[i] else 0
for i in range(len(knots) - 1):
if extrapolation is SplineExtrapolation.EXTEND:
cache[0][i] = (
(x > (knots[i] if i != degree else -numpy.inf)) &
(x < (knots[i + 1] if i + 1 != len(knots) - degree - 1 else numpy.inf))
).astype(float)
else:
cache[0][i] = (
((x > knots[i]) if i != degree else (x >= knots[i])) &
((x < knots[i + 1]) if i + 1 != len(knots) - degree - 1 else (x <= knots[i + 1]))
).astype(float)
for d in range(1, degree + 1):
cache[d % 2].clear()
for i in range(len(knots) - d - 1):
cache[d % 2][i] = alpha(i, d) * cache[(d - 1) % 2][i] + (1 - alpha(i + 1, d)) * cache[(d - 1) % 2][i + 1]

# Prepare output
out = {
i: cache[degree % 2][i]
for i in sorted(cache[degree % 2])
if i > 0 or include_intercept
}
out.update({
'__kind__': 'numerical',
'__spans_intercept__': include_intercept,
'__drop_field__': 0,
'__format__': "{name}[{field}]",
'__encoded__': False,
})
return out
159 changes: 159 additions & 0 deletions tests/materializers/transforms/test_basis_spline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import numpy
import pytest

from formulaic.materializers.transforms.basis_spline import basis_spline


class TestBasisSpline:

@pytest.fixture(scope='session')
def data(self):
return numpy.linspace(0, 1, 21)

def test_basic(self, data):
V = basis_spline(data)

assert len([k for k in V if isinstance(k, int)]) == 3

# Comparison data copied from R output of:
# > library(splines)
# > data = seq(from=0, to=1, by=0.05)
# > bs(data)

assert numpy.allclose(V[1], [
0.000000, 0.135375, 0.243000, 0.325125, 0.384000, 0.421875, 0.441000,
0.443625, 0.432000, 0.408375, 0.375000, 0.334125, 0.288000, 0.238875,
0.189000, 0.140625, 0.096000, 0.057375, 0.027000, 0.007125, 0.000000,
])

assert numpy.allclose(V[2], [
0.000000, 0.007125, 0.027000, 0.057375, 0.096000, 0.140625, 0.189000,
0.238875, 0.288000, 0.334125, 0.375000, 0.408375, 0.432000, 0.443625,
0.441000, 0.421875, 0.384000, 0.325125, 0.243000, 0.135375, 0.000000,
])

assert numpy.allclose(V[3], [
0.000000, 0.000125, 0.001000, 0.003375, 0.008000, 0.015625, 0.027000,
0.042875, 0.064000, 0.091125, 0.125000, 0.166375, 0.216000, 0.274625,
0.343000, 0.421875, 0.512000, 0.614125, 0.729000, 0.857375, 1.000000,
])

def test_degree(self, data):
V = basis_spline(data, degree=1)

assert len([k for k in V if isinstance(k, int)]) == 1

# Comparison data copied from R output of:
# > library(splines)
# > data = seq(from=0, to=1, by=0.05)
# > bs(data, degree=1)

assert numpy.allclose(V[1], [
0.00, 0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50,
0.55, 0.60, 0.65, 0.70, 0.75, 0.80, 0.85, 0.90, 0.95, 1.00
])

def test_include_intercept(self, data):
V = basis_spline(data, degree=1, include_intercept=True)

assert len([k for k in V if isinstance(k, int)]) == 2

# Comparison data copied from R output of:
# > library(splines)
# > data = seq(from=0, to=1, by=0.05)
# > bs(data, degree=1, intercept=TRUE)

assert numpy.allclose(V[0], [
1.00, 0.95, 0.90, 0.85, 0.80, 0.75, 0.70, 0.65, 0.60, 0.55, 0.50,
0.45, 0.40, 0.35, 0.30, 0.25, 0.20, 0.15, 0.10, 0.05, 0.00
])

assert numpy.allclose(V[1], [
0.00, 0.05, 0.10, 0.15, 0.20, 0.25, 0.30, 0.35, 0.40, 0.45, 0.50,
0.55, 0.60, 0.65, 0.70, 0.75, 0.80, 0.85, 0.90, 0.95, 1.00
])

def test_df(self, data):
V = basis_spline(data, df=5)

assert len([k for k in V if isinstance(k, int)]) == 5

# Comparison data copied from R output of:
# > library(splines)
# > data = seq(from=0, to=1, by=0.05)
# > bs(data, df=5)

assert numpy.allclose(V[1], [
0.00000000, 0.35465625, 0.54225000, 0.59821875, 0.55800000, 0.45703125,
0.33075000, 0.21434375, 0.12800000, 0.06865625, 0.03125000, 0.01071875,
0.00200000, 0.00003125, 0.00000000, 0.00000000, 0.00000000, 0.00000000,
0.00000000, 0.00000000, 0.00000000,
])

assert numpy.allclose(V[2], [
0.00000000, 0.03065625, 0.11025000, 0.22021875, 0.34200000, 0.45703125,
0.54675000, 0.59278125, 0.58800000, 0.54271875, 0.46875000, 0.37790625,
0.28200000, 0.19284375, 0.12150000, 0.07031250, 0.03600000, 0.01518750,
0.00450000, 0.00056250, 0.00000000,
])

assert numpy.allclose(V[3], [
0.00000000, 0.00056250, 0.00450000, 0.01518750, 0.03600000, 0.07031250,
0.12150000, 0.19284375, 0.28200000, 0.37790625, 0.46875000, 0.54271875,
0.58800000, 0.59278125, 0.54675000, 0.45703125, 0.34200000, 0.22021875,
0.11025000, 0.03065625, 0.00000000,
])

assert numpy.allclose(V[4], [
0.00000000, 0.00000000, 0.00000000, 0.00000000, 0.00000000, 0.00000000,
0.00000000, 0.00003125, 0.00200000, 0.01071875, 0.03125000, 0.06865625,
0.12800000, 0.21434375, 0.33075000, 0.45703125, 0.55800000, 0.59821875,
0.54225000, 0.35465625, 0.00000000,
])

assert numpy.allclose(V[5], [
0.000000, 0.000000, 0.000000, 0.000000, 0.000000, 0.000000, 0.000000,
0.000000, 0.000000, 0.000000, 0.000000, 0.000000, 0.000000, 0.000000,
0.001000, 0.015625, 0.064000, 0.166375, 0.343000, 0.614125, 1.000000,
])

def test_extrapolation(self, data):
# Comparison data based on R output of:
# > library(splines)
# > data = seq(from=0, to=1, by=0.05)
# > bs(data, Boundary.knots=c(0.25, 0.75))

with pytest.raises(ValueError, match="Some field values extend bound upper and/or lower bounds"):
basis_spline(data, lower_bound=0.25, upper_bound=0.75)

V = basis_spline(data, lower_bound=0.25, upper_bound=0.75, extrapolation='clip')
assert numpy.allclose(V[3], [
0.000, 0.000, 0.000, 0.000, 0.000, 0.000, 0.001, 0.008, 0.027, 0.064,
0.125, 0.216, 0.343, 0.512, 0.729, 1.000, 1.000, 1.000, 1.000, 1.000, 1.000,
])

V2 = basis_spline(data, lower_bound=0.25, upper_bound=0.75, extrapolation='na')
assert numpy.allclose(V2[3], [
numpy.nan, numpy.nan, numpy.nan, numpy.nan, numpy.nan, 0.000, 0.001, 0.008, 0.027, 0.064,
0.125, 0.216, 0.343, 0.512, 0.729, 1.000, numpy.nan, numpy.nan, numpy.nan, numpy.nan, numpy.nan,
], equal_nan=True)

V3 = basis_spline(data, lower_bound=0.25, upper_bound=0.75, extrapolation='zero')
assert numpy.allclose(V3[3], [
0.000, 0.000, 0.000, 0.000, 0.000, 0.000, 0.001, 0.008, 0.027, 0.064,
0.125, 0.216, 0.343, 0.512, 0.729, 1.000, 0.000, 0.000, 0.000, 0.000, 0.000,
], equal_nan=True)


V4 = basis_spline(data, lower_bound=0.25, upper_bound=0.75, extrapolation='extend')
assert numpy.allclose(V4[3], [
-0.125, -0.064, -0.027, -0.008, -0.001, 0.000, 0.001, 0.008, 0.027, 0.064,
0.125, 0.216, 0.343, 0.512, 0.729, 1.000, 1.331, 1.728, 2.197, 2.744, 3.375,
])

def test_invalid_df(self, data):
with pytest.raises(ValueError, match="You cannot specify both `df` and `knots`."):
basis_spline(data, df=2, knots=[])

with pytest.raises(ValueError, match="Invalid value for `df`. `df` must be greater than 3"):
basis_spline(data, df=2)

0 comments on commit 174db5c

Please sign in to comment.