Skip to content
Merged
21 changes: 15 additions & 6 deletions streamz/dataframe/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from numbers import Number

import numpy as np
import pandas as pd
from .utils import is_series_like, is_index_like, get_dataframe_package


Expand Down Expand Up @@ -202,10 +203,10 @@ def diff_loc(dfs, new, window=None):
"""
dfs = deque(dfs)
dfs.append(new)
mx = max(df.index.max() for df in dfs)
mx = pd.Timestamp(max(df.index.max() for df in dfs))
mn = mx - window
old = []
while dfs[0].index.min() < mn:
while pd.Timestamp(dfs[0].index.min()) < mn:
o = dfs[0].loc[:mn]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious as to how casting to a pandas timestamp helps here.

Copy link
Contributor Author

@chinmaychandak chinmaychandak Aug 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, cudf's df.index.min() (or max()) returns a numpy.datetime64 as opposed to Pandas dataframes returning a pandas._libs.tslibs.timestamps.Timestamp. Hence the explicit cast to make it compatible with Pandas Timedelta, which is required for these operations.

The statements modified for this purpose would be redundant for Pandas, since the types are compatible with Pandas Timedelta.

old.append(o) # TODO: avoid copy if fully lost
dfs[0] = dfs[0].iloc[len(o):]
Expand Down Expand Up @@ -347,8 +348,6 @@ def windowed_groupby_accumulator(acc, new, diff=None, window=None, agg=None, gro
for o, og in zip(old, old_groupers):
if 'groupers' in acc:
assert len(o) == len(og)
if hasattr(og, 'index'):
assert (o.index == og.index).all()
if len(o):
state, result = agg.on_old(state, o, grouper=og)
size_state, _ = size.on_old(size_state, o, grouper=og)
Expand Down Expand Up @@ -407,11 +406,13 @@ class GroupbySum(GroupbyAggregation):
def on_new(self, acc, new, grouper=None):
g = self.grouped(new, grouper=grouper)
result = acc.add(g.sum(), fill_value=0)
result.index.name = acc.index.name
return result, result

def on_old(self, acc, old, grouper=None):
g = self.grouped(old, grouper=grouper)
result = acc.sub(g.sum(), fill_value=0)
result.index.name = acc.index.name
return result, result

def initial(self, new, grouper=None):
Expand All @@ -427,12 +428,14 @@ def on_new(self, acc, new, grouper=None):
g = self.grouped(new, grouper=grouper)
result = acc.add(g.count(), fill_value=0)
result = result.astype(int)
result.index.name = acc.index.name
return result, result

def on_old(self, acc, old, grouper=None):
g = self.grouped(old, grouper=grouper)
result = acc.sub(g.count(), fill_value=0)
result = result.astype(int)
result.index.name = acc.index.name
return result, result

def initial(self, new, grouper=None):
Expand All @@ -448,12 +451,14 @@ def on_new(self, acc, new, grouper=None):
g = self.grouped(new, grouper=grouper)
result = acc.add(g.size(), fill_value=0)
result = result.astype(int)
result.index.name = acc.index.name
return result, result

def on_old(self, acc, old, grouper=None):
g = self.grouped(old, grouper=grouper)
result = acc.sub(g.size(), fill_value=0)
result = result.astype(int)
result.index.name = acc.index.name
return result, result

def initial(self, new, grouper=None):
Expand All @@ -467,10 +472,12 @@ def initial(self, new, grouper=None):
class ValueCounts(Aggregation):
def on_new(self, acc, new, grouper=None):
result = acc.add(new.value_counts(), fill_value=0).astype(int)
result.index.name = acc.index.name
return result, result

def on_old(self, acc, new, grouper=None):
result = acc.sub(new.value_counts(), fill_value=0).astype(int)
result.index.name = acc.index.name
return result, result

def initial(self, new, grouper=None):
Expand All @@ -483,15 +490,17 @@ def on_new(self, acc, new, grouper=None):
g = self.grouped(new, grouper=grouper)
totals = totals.add(g.sum(), fill_value=0)
counts = counts.add(g.count(), fill_value=0)

totals.index.name = acc[0].index.name
counts.index.name = acc[1].index.name
return (totals, counts), totals / counts

def on_old(self, acc, old, grouper=None):
totals, counts = acc
g = self.grouped(old, grouper=grouper)
totals = totals.sub(g.sum(), fill_value=0)
counts = counts.sub(g.count(), fill_value=0)

totals.index.name = acc[0].index.name
counts.index.name = acc[1].index.name
return (totals, counts), totals / counts

def initial(self, new, grouper=None):
Expand Down
Loading