Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Repeating Basis Functions #171

Merged
merged 14 commits into from
Aug 22, 2019
131 changes: 131 additions & 0 deletions sklego/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pandas as pd
from patsy import dmatrix, build_design_matrices, PatsyError
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.utils import check_array, check_X_y
from sklearn.utils.validation import FLOAT_DTYPES, check_random_state, check_is_fitted

Expand Down Expand Up @@ -609,3 +610,133 @@ def _check_X_for_type(X):
"""Checks if input of the Selector is of the required dtype"""
if not isinstance(X, pd.DataFrame):
raise TypeError("Provided variable X is not of type pandas.DataFrame")

class RepeatingBasisFunction(TransformerMixin, BaseEstimator):
    """
    This is a transformer for features with some form of circularity.

    E.g. for days of the week you might face the problem that, conceptually, day 7 is as
    close to day 6 as it is to day 1, while numerically their distances are different.
    This transformer solves that problem.

    The transformer selects a column and transforms it with a given number of repeating
    (radial) basis functions. Radial basis functions are bell-curve shaped functions which
    take the original data as input. The basis functions are equally spaced over the input
    data range (specified with floor and ceil). The key feature of repeating basis functions
    is that they are continuous when moving from the max back to the min. As a result each
    output column expresses how "close" a value is to the center of the respective basis
    function, with floor and ceil treated as the same position.

    :type column: int or str, default=0
    :param column: Indexes the data on its second axis. Integers are interpreted as
        positional columns, while strings can reference DataFrame columns by name.

    :type remainder: {'drop', 'passthrough'}, default="passthrough"
    :param remainder: What to do with the non-specified columns. With ``'passthrough'``
        all remaining columns are passed through and concatenated with the output of the
        transformer. With ``'drop'`` only the basis-function columns are returned.

    :type n_periods: int, default=12
    :param n_periods: number of basis functions to create, i.e., the number of columns the
        selected column is expanded into.

    :type floor: float or 'min', default='min'
    :param floor: lower bound of the input range, mapped to 0 when scaling the input data
        to a 0-1 range. ``'min'`` uses the minimum of the column seen during ``fit``.

    :type ceil: float or 'max', default='max'
    :param ceil: upper bound of the input range, mapped to 1 when scaling the input data
        to a 0-1 range. ``'max'`` uses the maximum of the column seen during ``fit``.
    """

    def __init__(
        self, column=0, remainder="passthrough", n_periods=12, floor="min", ceil="max"
    ):
        self.column = column
        # Store the caller's choice; it used to be hard-coded to "passthrough",
        # which silently ignored remainder="drop".
        self.remainder = remainder
        self.n_periods = n_periods
        self.floor = floor
        self.ceil = ceil
        # Holds the fitted ColumnTransformer; stays None until fit() has been called.
        self.pipeline = None

    def fit(self, X, y=None):
        """
        Build and fit the underlying ColumnTransformer.

        :param X: array-like or DataFrame containing the column to transform.
        :param y: ignored by the basis functions, forwarded to the ColumnTransformer.
        :return: self
        """
        self.pipeline = ColumnTransformer(
            [
                (
                    "repeatingbasis",
                    _RepeatingBasisFunction(
                        n_periods=self.n_periods, floor=self.floor, ceil=self.ceil
                    ),
                    # A one-element list makes ColumnTransformer hand over a 2-D slice.
                    [self.column],
                )
            ],
            remainder=self.remainder,
        )

        self.pipeline.fit(X, y)

        return self

    def transform(self, X):
        """
        Apply the fitted ColumnTransformer to X.

        :param X: array-like or DataFrame with the same columns as used in ``fit``.
        :return: the basis-function columns, followed by the remaining columns when
            ``remainder='passthrough'``.
        :raises NotFittedError: if ``fit`` has not been called yet.
        """
        # Local import: only needed to report the unfitted state.
        from sklearn.exceptions import NotFittedError

        check_is_fitted(self, ["pipeline"])
        # ``pipeline`` exists from __init__ onwards (as None), so the attribute check
        # above cannot detect an unfitted instance on its own.
        if self.pipeline is None:
            raise NotFittedError(
                f"This {type(self).__name__} instance is not fitted yet. "
                "Call 'fit' with appropriate arguments before using this estimator."
            )
        return self.pipeline.transform(X)


class _RepeatingBasisFunction(TransformerMixin, BaseEstimator):
    """
    Expand a single numeric column into ``n_periods`` repeating radial basis functions.

    The column is min-max scaled to a 0-1 range using ``floor`` and ``ceil``; 0 and 1 are
    treated as the same position, so the basis functions wrap around.

    :type n_periods: int, default=12
    :param n_periods: number of basis functions, i.e. number of output columns.

    :type floor: float or 'min', default='min'
    :param floor: value mapped to 0; ``'min'`` uses the minimum seen during ``fit``.

    :type ceil: float or 'max', default='max'
    :param ceil: value mapped to 1; ``'max'`` uses the maximum seen during ``fit``.
    """

    def __init__(self, n_periods: int = 12, floor="min", ceil="max"):
        self.n_periods = n_periods
        self.floor = floor
        self.ceil = ceil

    def fit(self, X, y=None):
        """
        Resolve the scaling bounds and lay out the basis-function centers.

        :param X: array-like with exactly one column.
        :param y: ignored.
        :return: self
        :raises ValueError: if X does not have exactly one column, or if the resolved
            floor and ceil are equal.
        """
        X = check_array(X, estimator=self)
        self._check_single_column(X)

        # Resolve the bounds into fitted attributes instead of overwriting the
        # hyper-parameters, so that refitting on new data recomputes min/max.
        use_min = isinstance(self.floor, str) and self.floor == "min"
        use_max = isinstance(self.ceil, str) and self.ceil == "max"
        self.floor_ = X.min() if use_min else self.floor
        self.ceil_ = X.max() if use_max else self.ceil

        if self.ceil_ == self.floor_:
            raise ValueError(
                "floor and ceil must differ to scale the input, "
                f"both are: {self.floor_}"
            )

        # exclude the last value because it's identical to the first for repeating basis functions
        self.bases_ = np.linspace(0, 1, self.n_periods + 1)[:-1]

        # curves should be narrower (wider) when we have more (fewer) basis functions
        self.width_ = 1 / self.n_periods

        return self

    def transform(self, X):
        """
        Scale X to the 0-1 range and evaluate every basis function on it.

        :param X: array-like with exactly one column.
        :return: array with one row per sample and one column per basis function.
        :raises ValueError: if X does not have exactly one column.
        """
        X = check_array(X, estimator=self, ensure_2d=True)
        check_is_fitted(self, ["bases_", "width_", "floor_", "ceil_"])
        self._check_single_column(X)

        # MinMax Scale to 0-1
        X = (X - self.floor_) / (self.ceil_ - self.floor_)

        base_distances = self._array_bases_distances(X, self.bases_)

        # apply rbf function to series for each basis
        return self._rbf(base_distances)

    @staticmethod
    def _check_single_column(X):
        """Raise a ValueError unless the 2-D array X has exactly one column."""
        # X is 2-D after check_array, so the column count is the reliable test.
        if X.shape[1] != 1:
            raise ValueError(f"X should have exactly one column, it has: {X.shape[1]}")

    def _array_base_distance(self, arr: np.ndarray, base: float) -> np.ndarray:
        """Calculates the distances between all array values and the base,
        where 0 and 1 are assumed to be at the same position"""
        # The modulo wraps values outside the 0-1 range back onto the circle;
        # differences inside the range are left unchanged.
        abs_diff = np.abs(arr - base).reshape(-1) % 1
        return np.minimum(abs_diff, 1 - abs_diff)

    def _array_bases_distances(self, array, bases):
        """Calculates the distances between all combinations of array and bases values"""
        array = array.reshape(-1, 1)
        bases = bases.reshape(1, -1)

        # Broadcasting a column against a row gives every (value, base) pair at once.
        abs_diff = np.abs(array - bases) % 1
        return np.minimum(abs_diff, 1 - abs_diff)

    def _rbf(self, arr):
        """Gaussian-shaped bump: 1 at distance 0, decaying with distance / width_."""
        return np.exp(-(arr / self.width_) ** 2)
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import pytest
import numpy as np
import pandas as pd

from sklego.preprocessing import RepeatingBasisFunction


@pytest.fixture()
def df():
    """Six-row frame with two numeric, two string and one binary column."""
    columns = {
        "a": [1, 2, 3, 4, 5, 6],
        "b": np.log([10, 9, 8, 7, 6, 5]),
        "c": ["a", "b", "a", "b", "c", "c"],
        "d": ["b", "a", "a", "b", "a", "b"],
        "e": [0, 1, 0, 1, 0, 1],
    }
    return pd.DataFrame(columns)


def test_int_indexing(df):
    """Selecting the column by position yields 4 basis columns plus 3 passthrough columns."""
    features = df[["a", "b", "c", "d"]]
    target = df[["e"]]
    transformer = RepeatingBasisFunction(
        column=0, n_periods=4, remainder="passthrough"
    )
    transformed = transformer.fit(features, target).transform(features)
    assert transformed.shape == (6, 7)

def test_str_indexing(df):
    """Selecting the column by name yields 4 basis columns plus 3 passthrough columns."""
    features = df[["a", "b", "c", "d"]]
    target = df[["e"]]
    transformer = RepeatingBasisFunction(
        column="b", n_periods=4, remainder="passthrough"
    )
    transformed = transformer.fit(features, target).transform(features)
    assert transformed.shape == (6, 7)

def test_dataframe_equals_array(df):
    """A DataFrame and its underlying array must be transformed to the same result."""
    features = df[["a", "b", "c", "d"]]
    target = df[["e"]]
    transformer = RepeatingBasisFunction(
        column=1, n_periods=4, remainder="passthrough"
    )

    # The same transformer instance is refitted, first on the frame, then on the raw array.
    from_frame = transformer.fit(features, target).transform(features)
    raw_values = features.values
    from_array = transformer.fit(raw_values, target).transform(raw_values)

    np.testing.assert_array_equal(from_frame, from_array)