Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

online ewma #46

Closed
jreback opened this issue Feb 8, 2021 · 12 comments
Closed

online ewma #46

jreback opened this issue Feb 8, 2021 · 12 comments

Comments

@jreback
Copy link
Collaborator

jreback commented Feb 8, 2021

This is an implementation of online EWMA. It's self-contained, mostly copying the pandas ewma implementation (and exposing the outputs).

code

import typing
import numba
import numpy as np
from numpy import nan
def ewma(
    x: np.ndarray,
    *,
    alpha: float,
    min_periods: int,
    ignore_na: bool,
) -> typing.Tuple[
    np.ndarray,
    np.ndarray,
    np.ndarray
]:
    """Compute an exponentially weighted moving average from scratch.

    The first element seeds the recursion (count of 0 or 1, weight 1.0,
    mean equal to ``x[0]``); every following element is processed by
    ``_inc_ewma`` starting at index 1.

    Parameters
    ----------
    x
        One-dimensional ``float64`` array; NaN entries are treated as
        missing observations.
    alpha
        Smoothing factor; each step decays the accumulated weight by
        ``1 - alpha``.
    min_periods
        Minimum number of non-NaN observations required before the mean is
        reported; earlier positions of the mean output are NaN.
    ignore_na
        Forwarded to ``_inc_ewma``; when true, missing values do not decay
        the accumulated weight.

    Returns
    -------
    tuple of (n, w, y)
        ``n`` is the running count of non-NaN observations, ``w`` the
        running accumulated weight, and ``y`` the running mean masked by
        ``min_periods``. All three have the same length as ``x``.
    """
    assert isinstance(x, np.ndarray)
    assert x.ndim == 1
    assert x.dtype == np.float64
    counts: np.ndarray = np.empty_like(x, dtype=np.int_)
    weights: np.ndarray = np.empty_like(x)
    means: np.ndarray = np.empty_like(x)

    # Nothing to seed or iterate over: hand back the three empty buffers.
    if not x.shape[0]:
        return counts, weights, means

    first: float = x[0]
    # NaN is the only float that is not equal to itself.
    seen: int = int(first == first)
    first_weight: float = 1.0

    counts[0] = seen
    weights[0] = first_weight
    means[0] = first if seen >= min_periods else nan

    # Continue the recursion from the second element using the seed state.
    _inc_ewma(
        x,
        counts,
        weights,
        means,
        alpha,
        min_periods,
        ignore_na,
        seen,
        first_weight,
        first,
        1
    )
    return counts, weights, means
def inc_ewma(
    x: np.ndarray,
    *,
    alpha: float,
    min_periods: int,
    ignore_na: bool,
    n_minus_1: int,
    w_minus_1: float,
    y_minus_1: float
) -> typing.Tuple[
    np.ndarray,
    np.ndarray,
    np.ndarray
]:
    """Continue an exponentially weighted moving average over new data.

    Every element of ``x`` is processed by ``_inc_ewma`` starting from the
    state ``(n_minus_1, w_minus_1, y_minus_1)``, i.e. the count, weight and
    mean that were produced for the element immediately preceding ``x``.

    Parameters
    ----------
    x
        One-dimensional ``float64`` array of new values; NaN entries are
        treated as missing observations.
    alpha
        Smoothing factor; each step decays the accumulated weight by
        ``1 - alpha``.
    min_periods
        Minimum number of non-NaN observations (including those counted in
        ``n_minus_1``) required before the mean is reported.
    ignore_na
        When true, missing values do not decay the accumulated weight.
    n_minus_1, w_minus_1, y_minus_1
        Previous observation count, accumulated weight and mean.

    Returns
    -------
    tuple of (n, w, y)
        Running count, accumulated weight and ``min_periods``-masked mean
        for each element of ``x``.

    Notes
    -----
    A NaN ``y_minus_1`` is interpreted by ``_inc_ewma`` as "no mean yet",
    so the mean restarts at the next observation. Mean outputs that were
    masked to NaN by ``min_periods`` therefore do not carry the underlying
    mean forward when used as ``y_minus_1``.
    """
    assert isinstance(x, np.ndarray)
    assert x.ndim == 1
    assert x.dtype == np.float64
    counts: np.ndarray = np.empty_like(x, dtype=np.int_)
    weights: np.ndarray = np.empty_like(x)
    means: np.ndarray = np.empty_like(x)
    # Start at index 0: the caller-provided state stands in for the seed.
    _inc_ewma(
        x,
        counts,
        weights,
        means,
        alpha,
        min_periods,
        ignore_na,
        n_minus_1,
        w_minus_1,
        y_minus_1,
        0
    )
    return counts, weights, means
@numba.njit  # type: ignore
def _inc_ewma(
    x: np.ndarray,
    n: np.ndarray,
    w: np.ndarray,
    y: np.ndarray,
    alpha: float,
    min_periods: int,
    ignore_na: bool,
    n_i: int,
    w_i: float,
    y_i: float,
    i: int
) -> None:
    """Advance the EWMA recursion over ``x[i:]``, filling ``n``, ``w``, ``y``.

    ``n_i``, ``w_i`` and ``y_i`` are the observation count, accumulated
    weight and mean carried in from the element before index ``i``. For
    each processed position the updated count and weight are stored in
    ``n`` and ``w``; ``y`` receives the updated mean, or NaN while the
    count is below ``min_periods``. Positions before ``i`` are not touched.
    """
    decay: float = 1.0 - alpha
    for pos in range(i, len(x)):
        sample: float = x[pos]
        # NaN is the only value that compares unequal to itself.
        observed = sample == sample
        n_i += observed
        if y_i != y_i:
            # No mean yet: the first real observation becomes the mean.
            if observed:
                y_i = sample
        elif observed or not ignore_na:
            # Age the accumulated weight (also on gaps unless ignore_na).
            w_i *= decay
            if observed:
                # avoid numerical errors on constant series
                if y_i != sample:
                    y_i = ((w_i * y_i) + sample) / (w_i + 1.0)
                w_i += 1.0
        if n_i >= min_periods:
            y[pos] = y_i
        else:
            y[pos] = nan
        n[pos] = n_i
        w[pos] = w_i

tests

import unittest
import numpy as np
import pandas as pd
from incewma import ewma, inc_ewma
class IncEwmaTestCase(unittest.TestCase):
    """Check ``ewma`` / ``inc_ewma`` against ``pandas.Series.ewm(...).mean()``."""

    def test_ewma(self) -> None:
        """Full run matches pandas; resuming over the last 3 values matches too."""
        params = dict(alpha=0.5, min_periods=3, ignore_na=False)
        series = pd.Series(np.arange(10, dtype=np.float64) + 1.7)
        expected: np.ndarray = series.ewm(adjust=True, **params).mean().to_numpy()

        n_all, w_all, y_all = ewma(series.to_numpy(), **params)
        # No NaNs in the input, so the count is simply 1..len(series).
        np.testing.assert_equal(np.arange(1, series.shape[0] + 1), n_all)
        np.testing.assert_allclose(expected, y_all)

        tail = 3
        # Index of the state produced just before the resumed tail.
        prev = -(tail + 1)
        n_k, w_k, y_k = inc_ewma(
            series.to_numpy()[-tail:],
            n_minus_1=n_all[prev],
            w_minus_1=w_all[prev],
            y_minus_1=y_all[prev],
            **params
        )
        np.testing.assert_equal(n_all[-tail:], n_k)
        np.testing.assert_allclose(w_all[-tail:], w_k)
        np.testing.assert_allclose(expected[-tail:], y_k)

    def test_ewma_empty(self) -> None:
        """Empty input yields empty outputs from pandas, ``ewma`` and ``inc_ewma``."""
        params = dict(alpha=0.5, min_periods=3, ignore_na=False)
        empty_float = np.empty((0,), dtype=np.float64)
        empty_int = np.empty((0,), dtype=np.int_)
        series = pd.Series(np.empty((0,), dtype=np.float64))

        expected: np.ndarray = series.ewm(adjust=True, **params).mean().to_numpy()
        np.testing.assert_allclose(empty_float, expected)

        n_all, w_all, y_all = ewma(series.to_numpy(), **params)
        np.testing.assert_equal(empty_int, n_all)
        np.testing.assert_allclose(empty_float, w_all)
        np.testing.assert_allclose(empty_float, y_all)

        n_k, w_k, y_k = inc_ewma(
            series.to_numpy(),
            n_minus_1=1,
            w_minus_1=1.0,
            y_minus_1=1.0,
            **params
        )
        np.testing.assert_equal(empty_int, n_k)
        np.testing.assert_allclose(empty_float, w_k)
        np.testing.assert_allclose(empty_float, y_k)
@jreback jreback mentioned this issue Feb 8, 2021
32 tasks
@mroeschke
Copy link
Collaborator

API Proposal

  • Add an online argument to rolling/expanding/ewm constructors
ewm = df.ewm(2, online=True)
  • Add an update method to prime the object with the next observations
def update(self, observations):
    """Prime the object with the next observations and return it.

    Parameters
    ----------
    observations
        The next batch of values to be consumed; stored on the object as
        ``self.observations``.

    Returns
    -------
    The object itself, so that an aggregation can be chained onto the call.

    Raises
    ------
    ValueError
        If the object was not created with ``online=True``, or if
        observations have already been stored (``self.observations`` is
        not ``None``).
    """
    if not self.online:
        raise ValueError("Update must be called with online=True")
    # Compare against None rather than relying on truthiness: a stored
    # multi-element NumPy array has no unambiguous truth value, so the
    # bare ``if self.observations:`` would raise an unrelated ValueError.
    if self.observations is not None:
        raise ValueError("Already called update")
    # other data validations
    self.observations = observations
    return self

So the flow would be:

ewm = df.ewm(2, online=True)

# ewm object updated with last_weight and last_result
result = ewm.mean()

# len(next_results) == 3 and ewm updated with last_weight and last_result 
next_results = ewm.update(np.array([1, 2, 3])).mean() 

@jreback
Copy link
Collaborator Author

jreback commented Mar 15, 2021

what if we do this

ewm_stateful = df.ewm(2).online()

# get the mean
ewm_stateful.mean()

# get the std
ewm_stateful.std()

# update some values
ewm_stateful.update(.....)

# mean and std are both saved
ewm_stateful.mean() 
ewm_stateful.std()
ewm_stateful.agg(['mean', 'std'])
ewm_stateful.count()

@jreback
Copy link
Collaborator Author

jreback commented Mar 15, 2021

@jreback
Copy link
Collaborator Author

jreback commented Mar 15, 2021

@mroeschke
Copy link
Collaborator

xref python-streamz/streamz#412

@mroeschke
Copy link
Collaborator

xref #48

@mroeschke
Copy link
Collaborator

Last week we were discussing how to pass the update data, and I think the two options we're still discussing boil down to:

  1. New method
def update(self, data, deltas=None):
    """Store the data (and optional deltas) for the next online update.

    Parameters
    ----------
    data
        New observations; stored as ``self.update_data``.
    deltas
        Optional companion values for ``data``; stored as
        ``self.update_deltas``. Defaults to ``None`` so the method can be
        called with the data alone.
    """
    # validation
    self.update_data = data
    self.update_deltas = deltas

-----

# Maybe a new object
ewm = df.ewm(...)
ewm.update(df_new)
ewm.mean()
# Continued weights from df_new
ewm.update(df_new2)
ewm.mean()
ewm.reset_update()
  2. New keyword argument
# numba only feature
def mean(engine="numba", update=None, update_delta=None):
    # API sketch only: the computation is elided below, and `result` is not
    # defined anywhere in this snippet.
    # NOTE(review): `update` / `update_delta` are presumably the new data and
    # its deltas used to continue the previous weights - confirm against the
    # final design.
    ... 
    return result

-----

ewm = df.ewm(...)
ewm.mean(engine="numba")
ewm.mean(engine="numba", update=df_new)
# Continued weights from df_new
ewm.mean(engine="numba", update=df_new2)
ewm.reset_update()

I still prefer 2 over 1, because:

  1. Directly evident which methods (future looking) support update or not.
  2. Nicely scoped which methods need which update variables, e.g. mean can take an update df and update_times, std can take an update df

@jreback
Copy link
Collaborator Author

jreback commented May 3, 2021

what about option 3

ewm = df.ewm()
online = ewm.online(engine='numba')

....

online.mean(update=df_new)
online.var(update=df_new)
# return current mean, no updates
online.mean(update=None)
...

what about agg

online.agg(['mean', 'var'], update=....)

another way

online.update()
online.mean()

or in concise notation

updated = online.update(.....)
updated.mean()

@jreback
Copy link
Collaborator Author

jreback commented May 3, 2021

option 4

ewm = df.ewm()
online = ewm.online(engine='numba')
meaner = online.mean()
meaner.update(....)
meaner =df.ewm().online(engine='numba').mean()
meaner.update(...)

@jreback
Copy link
Collaborator Author

jreback commented May 3, 2021

option 5

ewm = df.ewm()
meaner = ewm.online(engine='numba').mean()
scalar = meaner.update(.....)
scalar = meaner.get_value()
...

@mroeschke
Copy link
Collaborator

Method 3 Demo'd in #49

@mroeschke
Copy link
Collaborator

Closed by pandas-dev#41888

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants