Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions pandas/core/kernels.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import ctypes

import numpy as np
import numba


# NOTE: no ``parallel=True`` here. This is a scalar helper with no loop or
# array expression for Numba to parallelize, so the flag only produced a
# performance warning and extra compilation work.
@numba.jit(nopython=True, nogil=True)
def add_mean_numba(val, nobs, sum_x, neg_ct, compensation):
    """
    Fold one observation into the running state of a windowed mean.

    NaN observations are ignored and leave the state untouched.

    Parameters
    ----------
    val : float
        Observation entering the window.
    nobs : number
        Count of non-NaN observations currently in the window.
    sum_x : float
        Running sum of the non-NaN observations.
    neg_ct : int
        Count of strictly negative observations currently in the window.
    compensation : float
        Error term carried between calls by the compensated summation.

    Returns
    -------
    tuple
        Updated ``(nobs, sum_x, neg_ct, compensation)``.
    """
    if not np.isnan(val):
        nobs += 1
        # Compensated (Kahan-style) summation: ``compensation`` holds the
        # low-order bits lost by the previous addition and is fed back in.
        y = val - compensation
        t = sum_x + y
        compensation = t - sum_x - y
        sum_x = t
        if val < 0:
            neg_ct += 1
    return nobs, sum_x, neg_ct, compensation


# NOTE: no ``parallel=True`` here. This is a scalar helper with no loop or
# array expression for Numba to parallelize, so the flag only produced a
# performance warning and extra compilation work.
@numba.jit(nopython=True, nogil=True)
def remove_mean_numba(val, nobs, sum_x, neg_ct, compensation):
    """
    Remove one observation from the running state of a windowed mean.

    NaN observations are ignored and leave the state untouched.

    Parameters
    ----------
    val : float
        Observation leaving the window.
    nobs : number
        Count of non-NaN observations currently in the window.
    sum_x : float
        Running sum of the non-NaN observations.
    neg_ct : int
        Count of strictly negative observations currently in the window.
    compensation : float
        Error term carried between calls by the compensated summation.

    Returns
    -------
    tuple
        Updated ``(nobs, sum_x, neg_ct, compensation)``.
    """
    if not np.isnan(val):
        nobs -= 1
        # Compensated (Kahan-style) summation of ``-val``: ``compensation``
        # holds the low-order bits lost by the previous subtraction.
        y = -val - compensation
        t = sum_x + y
        compensation = t - sum_x - y
        sum_x = t
        if val < 0:
            neg_ct -= 1
    return nobs, sum_x, neg_ct, compensation


# NOTE: compiled without ``parallel=True``. The loop below carries running
# state from one window to the next, so it is inherently sequential and there
# is nothing for Numba to parallelize.
@numba.jit(nopython=True, nogil=True)
def sliding_mean_pure_numba(
    values: np.ndarray,
    start: np.ndarray,
    end: np.ndarray,
    min_periods: int,
):
    """
    Compute the mean of ``values[start[i]:end[i]]`` for every window ``i``.

    When the bounds are monotonically increasing, each window is derived from
    the previous one by removing the observations that left and adding the
    ones that entered. Otherwise every window is recomputed from scratch,
    because the incremental update is only valid for forward-moving bounds.

    Parameters
    ----------
    values : np.ndarray
        1-D array of observations; NaN entries are skipped.
    start : np.ndarray
        Integer array with the inclusive start position of each window.
    end : np.ndarray
        Integer array with the exclusive end position of each window.
    min_periods : int
        Minimum number of non-NaN observations a window needs to produce a
        value.

    Returns
    -------
    np.ndarray
        float64 array of length ``len(start)``. An entry is NaN when the
        window holds fewer than ``min_periods`` non-NaN observations or none
        at all.
    """
    N = len(start)
    nobs = 0.0
    sum_x = 0.0
    neg_ct = 0
    compensation_add = 0.0
    compensation_remove = 0.0
    output = np.empty(N, dtype=np.float64)

    # The add/remove update assumes both bounds only ever move forward.
    monotonic_bounds = True
    for i in range(1, N):
        if start[i] < start[i - 1] or end[i] < end[i - 1]:
            monotonic_bounds = False
            break

    for i in range(N):
        s = start[i]
        e = end[i]
        if i == 0 or not monotonic_bounds:
            # Build the window from scratch (a no-op reset for i == 0).
            nobs = 0.0
            sum_x = 0.0
            neg_ct = 0
            compensation_add = 0.0
            compensation_remove = 0.0
            for j in range(s, e):
                val = values[j]
                nobs, sum_x, neg_ct, compensation_add = add_mean_numba(
                    val, nobs, sum_x, neg_ct, compensation_add
                )
        else:
            # Drop the observations that fell off the left edge ...
            for j in range(start[i - 1], s):
                val = values[j]
                nobs, sum_x, neg_ct, compensation_remove = remove_mean_numba(
                    val, nobs, sum_x, neg_ct, compensation_remove
                )

            # ... and take in the ones that entered on the right edge.
            for j in range(end[i - 1], e):
                val = values[j]
                nobs, sum_x, neg_ct, compensation_add = add_mean_numba(
                    val, nobs, sum_x, neg_ct, compensation_add
                )

        if nobs >= min_periods and nobs > 0:
            result = sum_x / nobs
            # A mean whose sign contradicts the signs of all its inputs can
            # only come from accumulated floating-point error; clamp to zero.
            if neg_ct == 0 and result < 0:
                result = 0.0
            elif neg_ct == nobs and result > 0:
                result = 0.0
        else:
            result = np.nan

        output[i] = result

    return output


@numba.jit(nopython=True, nogil=True, parallel=True)
def np_nanmean_pure_numba(
    values: np.ndarray,
    start: np.ndarray,
    end: np.ndarray,
    min_periods: int,
):
    """
    Compute the NaN-skipping mean of ``values[start[i]:end[i]]`` per window.

    Each window is evaluated independently, so the loop over windows uses
    ``numba.prange``.

    Parameters
    ----------
    values : np.ndarray
        1-D array of observations.
    start : np.ndarray
        Integer array with the inclusive start position of each window.
    end : np.ndarray
        Integer array with the exclusive end position of each window.
    min_periods : int
        Minimum number of non-NaN observations a window needs to produce a
        value.

    Returns
    -------
    np.ndarray
        Array of length ``len(start)``. An entry is NaN when the window holds
        fewer than ``min_periods`` non-NaN observations or none at all.
    """
    result = np.empty(len(start))
    for i in numba.prange(len(start)):
        window = values[start[i]:end[i]]
        # min_periods constrains the number of *valid* observations, not the
        # number of missing ones.
        valid_obs = len(window) - np.sum(np.isnan(window))
        if valid_obs >= min_periods and valid_obs > 0:
            result[i] = np.nanmean(window)
        else:
            result[i] = np.nan
    return result

"""
addr = numba.extending.get_cython_function_address("pandas", "_libs", "window", "aggregations")
functype = ctypes.CFUNCTYPE(ctypes.c_double, ctypes.c_double, ctypes.c_int64)
myexp = functype(addr)

def cython_rolling_mean(
values,
start,
end,
min_periods
):
pass
"""
60 changes: 60 additions & 0 deletions pandas/core/shared_executer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import numba
import numpy as np

from pandas.core.window import indexers

# Compiled column loopers, keyed by (kernel function, kernel type, operation).
CACHE = {}


def generate_numba_func(func, kernel, op):
    """
    Return a jitted function that applies ``func`` to every column of a 2-D array.

    The looper is memoized in ``CACHE`` under ``(func, kernel, op)``, so
    repeated requests for the same combination reuse one jitted function
    instead of building a new one each time.

    Parameters
    ----------
    func : callable
        Kernel called as ``func(column, start, end, min_periods)`` from inside
        the jitted loop; it must return one value per window.
    kernel : hashable
        Label for the kernel implementation; only used in the cache key.
    op : hashable
        Label for the calling operation; only used in the cache key.

    Returns
    -------
    callable
        ``column_looper(df_values, start, end, min_periods)`` returning an
        array of shape ``(len(start), df_values.shape[1])``.
    """
    cache_key = (func, kernel, op)
    if cache_key in CACHE:
        return CACHE[cache_key]

    @numba.jit(nopython=True, nogil=True, parallel=True)
    def column_looper(
        df_values,
        start,
        end,
        min_periods,
    ):
        result = np.empty((len(start), df_values.shape[1]))
        # Columns are independent of each other, so they can run in parallel.
        for i in numba.prange(df_values.shape[1]):
            result[:, i] = func(df_values[:, i], start, end, min_periods)
        return result

    # Store the looper here so memoization does not depend on every caller
    # remembering to populate the cache afterwards.
    CACHE[cache_key] = column_looper
    return column_looper


def rolling_mean(df, window, min_periods, kernel_func, kernel_type):
    """
    Apply ``kernel_func`` over fixed-size rolling windows of every column of ``df``.

    Parameters
    ----------
    df : object exposing ``to_numpy()``
        Data to aggregate; the resulting array is indexed as 2-D by the
        column looper.
    window : int
        Window size handed to ``FixedWindowIndexer``.
    min_periods : int
        Forwarded unchanged to the kernel.
    kernel_func : callable
        Per-column kernel, see ``generate_numba_func``.
    kernel_type : hashable
        Label used together with ``kernel_func`` and ``"rolling"`` as cache key.

    Returns
    -------
    np.ndarray
        Output of the column looper, one row per window and one column per
        input column.
    """
    data = df.to_numpy()
    bounds_indexer = indexers.FixedWindowIndexer(window_size=window)
    window_start, window_end = bounds_indexer.get_window_bounds(
        num_values=len(data)
    )
    looper = generate_numba_func(kernel_func, kernel_type, "rolling")
    out = looper(data, window_start, window_end, min_periods)
    # Remember the looper so the next call with the same kernel reuses it.
    CACHE[(kernel_func, kernel_type, "rolling")] = looper
    return out


def groupby_mean(df, groupby_key, min_periods, kernel_func, kernel_type):
    """
    Apply ``kernel_func`` once per group of ``df.groupby(groupby_key)``.

    Parameters
    ----------
    df : object exposing ``groupby()``
        Data to aggregate.
    groupby_key : object
        Passed straight to ``df.groupby``.
    min_periods : int
        Forwarded unchanged to the kernel.
    kernel_func : callable
        Per-column kernel, see ``generate_numba_func``.
    kernel_type : hashable
        Label used together with ``kernel_func`` and ``"groupby"`` as cache key.

    Returns
    -------
    np.ndarray
        Output of the column looper, one row per group bound and one column
        per column of the grouped object's values.
    """
    grouped = df.groupby(groupby_key)
    bounds_indexer = indexers.GroupbyIndexer(groupby_indicies=grouped.indices)
    group_start, group_end = bounds_indexer.get_window_bounds()
    # The values come from the groupby's selected object, not from ``df``.
    data = grouped._selected_obj.to_numpy()
    looper = generate_numba_func(kernel_func, kernel_type, "groupby")
    out = looper(data, group_start, group_end, min_periods)
    # Remember the looper so the next call with the same kernel reuses it.
    CACHE[(kernel_func, kernel_type, "groupby")] = looper
    return out


def mean(df, min_periods, kernel_func, kernel_type):
    """
    Apply ``kernel_func`` to each column of ``df`` over one window spanning all rows.

    Parameters
    ----------
    df : object exposing ``to_numpy()`` and ``len()``
        Data to aggregate.
    min_periods : int
        Forwarded unchanged to the kernel.
    kernel_func : callable
        Per-column kernel, see ``generate_numba_func``.
    kernel_type : hashable
        Label used together with ``kernel_func`` and ``"mean"`` as cache key.

    Returns
    -------
    np.ndarray
        Output of the column looper: a single row with one entry per column.
    """
    # TODO: Make an indexer for this
    # A single window covering positions [0, len(df)).
    whole_start = np.array([0])
    whole_end = np.array([len(df)])
    data = df.to_numpy()
    looper = generate_numba_func(kernel_func, kernel_type, "mean")
    out = looper(data, whole_start, whole_end, min_periods)
    # Remember the looper so the next call with the same kernel reuses it.
    CACHE[(kernel_func, kernel_type, "mean")] = looper
    return out
8 changes: 4 additions & 4 deletions pandas/core/window/ewm.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from pandas.core.window.indexers import (
BaseIndexer,
ExponentialMovingWindowIndexer,
GroupbyIndexer,
GroupbyRollingIndexer,
)
from pandas.core.window.numba_ import (
generate_ewma_numba_table_func,
Expand Down Expand Up @@ -714,15 +714,15 @@ def __init__(self, obj, *args, _grouper=None, **kwargs):
self.halflife,
)

def _get_window_indexer(self) -> GroupbyIndexer:
def _get_window_indexer(self) -> GroupbyRollingIndexer:
"""
Return an indexer class that will compute the window start and end bounds

Returns
-------
GroupbyIndexer
GroupbyRollingIndexer
"""
window_indexer = GroupbyIndexer(
window_indexer = GroupbyRollingIndexer(
groupby_indicies=self._grouper.indices,
window_indexer=ExponentialMovingWindowIndexer,
)
Expand Down
8 changes: 4 additions & 4 deletions pandas/core/window/expanding.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from pandas.core.window.indexers import (
BaseIndexer,
ExpandingIndexer,
GroupbyIndexer,
GroupbyRollingIndexer,
)
from pandas.core.window.rolling import (
BaseWindowGroupby,
Expand Down Expand Up @@ -675,15 +675,15 @@ class ExpandingGroupby(BaseWindowGroupby, Expanding):

_attributes = Expanding._attributes + BaseWindowGroupby._attributes

def _get_window_indexer(self) -> GroupbyIndexer:
def _get_window_indexer(self) -> GroupbyRollingIndexer:
"""
Return an indexer class that will compute the window start and end bounds

Returns
-------
GroupbyIndexer
GroupbyRollingIndexer
"""
window_indexer = GroupbyIndexer(
window_indexer = GroupbyRollingIndexer(
groupby_indicies=self._grouper.indices,
window_indexer=ExpandingIndexer,
)
Expand Down
48 changes: 48 additions & 0 deletions pandas/core/window/indexers.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,54 @@ def get_window_bounds(


class GroupbyIndexer(BaseIndexer):
    """Calculate a single (start, end) bound per group of a groupby."""

    def __init__(
        self,
        index_array: np.ndarray | None = None,
        window_size: int = 0,
        groupby_indicies: dict | None = None,
        **kwargs,
    ):
        """
        Parameters
        ----------
        index_array : np.ndarray or None
            np.ndarray of the index of the original object that we are performing
            a chained groupby operation over. This index has been pre-sorted relative to
            the groups
        window_size : int
            window size during the windowing operation
        groupby_indicies : dict or None
            dict of {group label: [positional index of rows belonging to the group]}
        **kwargs :
            keyword arguments that will be available when get_window_bounds is called
        """
        self.groupby_indicies = groupby_indicies or {}
        super().__init__(index_array, window_size, **kwargs)

    @Appender(get_window_bounds_doc)
    def get_window_bounds(
        self,
        num_values: int = 0,
        min_periods: int | None = None,
        center: bool | None = None,
        closed: str | None = None,
    ) -> tuple[np.ndarray, np.ndarray]:
        # One bound pair per group, in the iteration order of groupby_indicies.
        # Allocating with an explicit integer dtype keeps the bounds usable as
        # positions even when there are no groups (np.array([]) is float64).
        num_groups = len(self.groupby_indicies)
        start = np.empty(num_groups, dtype=np.int64)
        end = np.empty(num_groups, dtype=np.int64)
        for i, indices in enumerate(self.groupby_indicies.values()):
            # NOTE: this assumes the rows of a group are contiguous, so the
            # group spans [first position, first position + group size).
            # TODO: handle groups whose rows are not contiguous.
            first = indices[0]
            start[i] = first
            end[i] = first + len(indices)
        return start, end


class GroupbyRollingIndexer(BaseIndexer):
"""Calculate bounds to compute groupby rolling, mimicking df.groupby().rolling()"""

def __init__(
Expand Down
8 changes: 4 additions & 4 deletions pandas/core/window/rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
from pandas.core.window.indexers import (
BaseIndexer,
FixedWindowIndexer,
GroupbyIndexer,
GroupbyRollingIndexer,
VariableWindowIndexer,
)
from pandas.core.window.numba_ import (
Expand Down Expand Up @@ -2312,13 +2312,13 @@ class RollingGroupby(BaseWindowGroupby, Rolling):

_attributes = Rolling._attributes + BaseWindowGroupby._attributes

def _get_window_indexer(self) -> GroupbyIndexer:
def _get_window_indexer(self) -> GroupbyRollingIndexer:
"""
Return an indexer class that will compute the window start and end bounds

Returns
-------
GroupbyIndexer
GroupbyRollingIndexer
"""
rolling_indexer: type[BaseIndexer]
indexer_kwargs: dict[str, Any] | None = None
Expand All @@ -2336,7 +2336,7 @@ def _get_window_indexer(self) -> GroupbyIndexer:
else:
rolling_indexer = FixedWindowIndexer
window = self.window
window_indexer = GroupbyIndexer(
window_indexer = GroupbyRollingIndexer(
index_array=index_array,
window_size=window,
groupby_indicies=self._grouper.indices,
Expand Down