Merged
streamz/collection.py: 28 changes (16 additions, 12 deletions)
@@ -1,4 +1,5 @@
import operator
import types

from streamz import Stream, core

@@ -41,10 +42,9 @@ def map_partitions(func, *args, **kwargs):
if not isinstance(arg, Streaming)]
stream = s.stream.map(partial_by_order, function=func, other=other,
**kwargs)

for typ, s_type in _stream_types[stream_type]:
if isinstance(example, typ):
return s_type(stream, example)
s_type = get_stream_type(example, stream_type)
if s_type:
return s_type(stream, example)
return Streaming(stream, example, stream_type=stream_type)


@@ -204,11 +204,11 @@ def accumulate_partitions(self, func, *args, **kwargs):
if returns_state:
_, example = example
stream = self.stream.accumulate(func, *args, start=start,
returns_state=returns_state, **kwargs)
returns_state=returns_state, **kwargs)

for typ, s_type in _stream_types[stream_type]:
if isinstance(example, typ):
return s_type(stream, example)
s_type = get_stream_type(example, stream_type)
if s_type:
return s_type(stream, example)
return Streaming(stream, example, stream_type=stream_type)

def __repr__(self):
@@ -242,12 +242,16 @@ def verify(self, x):
(self._subtype, type(x)))


def stream_type(example, stream_type='streaming'):
def get_stream_type(example, stream_type='streaming'):
for typ, s_type in _stream_types[stream_type]:
if isinstance(example, typ):
if isinstance(typ, types.FunctionType):
"""For Frame like objects we use utility functions to check type.
i.e, DataFrame like objects are checked using is_dataframe_like."""
if typ(example):
return s_type
elif isinstance(example, typ):
return s_type
raise TypeError("No streaming equivalent found for type %s" %
type(example).__name__)
return None


def partial_by_order(*args, **kwargs):
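The predicate entries used by get_stream_type (is_dataframe_like, is_series_like, is_index_like) are defined in streamz/dataframe/utils.py, which this diff does not include. A plausible duck-typing sketch of those helpers, modeled on dask's equivalents (an assumption, not the PR's actual code):

import pandas as pd  # only needed for the usage checks below

def is_dataframe_like(df):
    # DataFrame-like: has frame methods plus dtypes/columns, but none of
    # the scalar-column attributes (name, dtype) a Series would have.
    typ = type(df)
    return (all(hasattr(typ, name) for name in ('groupby', 'head', 'merge', 'mean'))
            and all(hasattr(df, name) for name in ('dtypes', 'columns'))
            and not any(hasattr(typ, name) for name in ('name', 'dtype')))

def is_series_like(s):
    # Series-like: frame methods plus both dtype and name.
    typ = type(s)
    return (all(hasattr(typ, name) for name in ('groupby', 'head', 'mean'))
            and all(hasattr(s, name) for name in ('dtype', 'name'))
            and 'index' not in typ.__name__.lower())

def is_index_like(s):
    # Index-like: dtype and name, and the type calls itself an index.
    typ = type(s)
    return (all(hasattr(s, name) for name in ('name', 'dtype'))
            and 'index' in typ.__name__.lower())

assert is_dataframe_like(pd.DataFrame({'x': [1]}))
assert is_series_like(pd.Series([1], name='x'))
assert is_index_like(pd.Index([1]))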
streamz/dataframe/aggregations.py: 13 changes (7 additions, 6 deletions)
@@ -5,6 +5,7 @@

import numpy as np
import pandas as pd
from .utils import is_series_like, is_index_like


class Aggregation(object):
@@ -324,7 +325,7 @@ def windowed_groupby_accumulator(acc, new, diff=None, window=None, agg=None, gro
acc = {'dfs': [],
'state': agg.initial(new, grouper=grouper),
'size-state': size.initial(new, grouper=grouper)}
if isinstance(grouper, (pd.Series, pd.Index, np.ndarray)):
if isinstance(grouper, np.ndarray) or is_series_like(grouper) or is_index_like(grouper):
acc['groupers'] = deque([])

dfs = acc['dfs']
@@ -416,7 +417,7 @@ def on_old(self, acc, old, grouper=None):
def initial(self, new, grouper=None):
if hasattr(grouper, 'iloc'):
grouper = grouper.iloc[:0]
if isinstance(grouper, (pd.Index, np.ndarray)):
if isinstance(grouper, np.ndarray) or is_index_like(grouper):
grouper = grouper[:0]
return self.grouped(new.iloc[:0], grouper=grouper).sum()

@@ -437,7 +438,7 @@ def on_old(self, acc, old, grouper=None):
def initial(self, new, grouper=None):
if hasattr(grouper, 'iloc'):
grouper = grouper.iloc[:0]
if isinstance(grouper, (pd.Index, np.ndarray)):
if isinstance(grouper, np.ndarray) or is_index_like(grouper):
grouper = grouper[:0]
return self.grouped(new.iloc[:0], grouper=grouper).count()

@@ -458,7 +459,7 @@ def on_old(self, acc, old, grouper=None):
def initial(self, new, grouper=None):
if hasattr(grouper, 'iloc'):
grouper = grouper.iloc[:0]
if isinstance(grouper, (pd.Index, np.ndarray)):
if isinstance(grouper, np.ndarray) or is_index_like(grouper):
grouper = grouper[:0]
return self.grouped(new.iloc[:0], grouper=grouper).size()

@@ -496,7 +497,7 @@ def on_old(self, acc, old, grouper=None):
def initial(self, new, grouper=None):
if hasattr(grouper, 'iloc'):
grouper = grouper.iloc[:0]
if isinstance(grouper, (pd.Index, np.ndarray)):
if isinstance(grouper, np.ndarray) or is_index_like(grouper):
grouper = grouper[:0]
g = self.grouped(new.iloc[:0], grouper=grouper)
return (g.sum(), g.count())
@@ -532,7 +533,7 @@ def on_old(self, acc, old, grouper=None):
def initial(self, new, grouper=None):
if hasattr(grouper, 'iloc'):
grouper = grouper.iloc[:0]
if isinstance(grouper, (pd.Index, np.ndarray)):
if isinstance(grouper, np.ndarray) or is_index_like(grouper):
grouper = grouper[:0]

new = new.iloc[:0]
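Every initial() hunk in this file makes the same substitution: the pandas-specific isinstance(grouper, (pd.Index, np.ndarray)) check becomes a duck-typed one, with np.ndarray kept as an explicit case because a bare array exposes none of the frame-like surface the predicates look for. A hypothetical helper that would collapse the repetition (not part of this PR):

import numpy as np
from streamz.dataframe.utils import is_index_like

def empty_grouper(grouper):
    # Hypothetical: normalize a grouper to its zero-length prefix,
    # regardless of backend.
    if hasattr(grouper, 'iloc'):
        return grouper.iloc[:0]
    if isinstance(grouper, np.ndarray) or is_index_like(grouper):
        return grouper[:0]
    return grouper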
streamz/dataframe/core.py: 94 changes (61 additions, 33 deletions)
@@ -1,9 +1,8 @@
from __future__ import division, print_function

from collections import OrderedDict
import operator
from collections import OrderedDict
from time import time

import numpy as np
import pandas as pd
import toolz
@@ -14,6 +13,7 @@
from ..sources import Source
from ..utils import M
from . import aggregations
from .utils import is_dataframe_like, is_series_like, is_index_like, get_base_frame_type


class BaseFrame(Streaming):
@@ -245,14 +245,16 @@ def __setitem__(self, key, value):
else:
example = self.example.copy()
example[key] = value
result = self.map_partitions(pd.DataFrame.assign, self, **{key: value})
df_type = type(self.example)
result = self.map_partitions(df_type.assign, self, **{key: value})

self.stream = result.stream
self.example = result.example
return self

def query(self, expr, **kwargs):
return self.map_partitions(pd.DataFrame.query, self, expr, **kwargs)
df_type = type(self.example)
return self.map_partitions(df_type.query, self, expr, **kwargs)


class DataFrame(Frame, _DataFrameMixin):
@@ -266,10 +268,9 @@ class DataFrame(Frame, _DataFrameMixin):
--------
Series
"""
_subtype = pd.DataFrame

def __init__(self, *args, **kwargs):
# {'x': sdf.x + 1, 'y': sdf.y - 1}
# {'x': sdf.x + 1, 'y': sdf.y - 1} - works only with pandas
if len(args) == 1 and not kwargs and isinstance(args[0], dict):
def concat(tup, columns=None):
result = pd.concat(tup, axis=1)
@@ -283,7 +284,14 @@ def concat(tup, columns=None):
for k, v in args[0].items()})
DataFrame.__init__(self, stream, example)
else:
return super(DataFrame, self).__init__(*args, **kwargs)
example = None
if "example" in kwargs:
example = kwargs.get('example')
elif len(args) > 1:
example = args[1]
self._subtype = get_base_frame_type(self.__class__.__name__,
is_dataframe_like, example)
super(DataFrame, self).__init__(*args, **kwargs)

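With _subtype now derived from the example instead of hard-coded to pd.DataFrame, construction dispatches on whatever frame-like backend the example comes from. A usage sketch (the cudf remark is an assumption about a backend whose frames satisfy is_dataframe_like):

import pandas as pd
from streamz import Stream
from streamz.dataframe import DataFrame

sdf = DataFrame(Stream(), example=pd.DataFrame({'x': [], 'y': []}))
# sdf._subtype is pd.DataFrame; constructed from a cudf.DataFrame example
# it would be cudf.DataFrame, with no pandas isinstance checks involved.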
def verify(self, x):
""" Verify consistency of elements that pass through this stream """
@@ -314,7 +322,20 @@ class Series(Frame, _SeriesMixin):
--------
DataFrame
"""
_subtype = pd.Series

def __init__(self, *args, **kwargs):
example = None
if "example" in kwargs:
example = kwargs.get('example')
elif len(args) > 1:
example = args[1]
if isinstance(self, Index):
self._subtype = get_base_frame_type(self.__class__.__name__,
is_index_like, example)
else:
self._subtype = get_base_frame_type(self.__class__.__name__,
is_series_like, example)
super(Series, self).__init__(*args, **kwargs)

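get_base_frame_type, shared by the DataFrame and Series constructors above, lives in streamz/dataframe/utils.py and is not shown in this diff. A plausible sketch of its contract, inferred from the call sites (an assumption, not the PR's actual code):

def get_base_frame_type(frame_name, is_frame_like, example=None):
    # Validate the example against the duck-type predicate and return its
    # concrete type, which becomes the stream's _subtype.
    if example is None:
        raise ValueError("Missing required argument 'example' for %s" % frame_name)
    if not is_frame_like(example):
        raise TypeError("%s expects a frame-like example, got %s"
                        % (frame_name, type(example).__name__))
    return type(example)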
def value_counts(self):
return self.accumulate_partitions(aggregations.accumulator,
@@ -324,7 +345,7 @@ def value_counts(self):


class Index(Series):
_subtype = pd.Index
pass


class DataFrames(Frames, _DataFrameMixin):
@@ -362,6 +383,7 @@ class Rolling(object):
>>> sdf.rolling(10).x.mean() # doctest: +SKIP
>>> sdf.rolling('100ms').x.mean() # doctest: +SKIP
"""

def __init__(self, sdf, window, min_periods):
self.root = sdf
if not isinstance(window, int):
@@ -382,12 +404,12 @@ def __getattr__(self, key):

def _known_aggregation(self, op, *args, **kwargs):
return self.root.accumulate_partitions(rolling_accumulator,
window=self.window,
op=op,
args=args,
kwargs=kwargs,
start=(),
returns_state=True)
window=self.window,
op=op,
args=args,
kwargs=kwargs,
start=(),
returns_state=True)

def sum(self):
""" Rolling sum """
@@ -440,6 +462,7 @@ class Window(OperatorMixin):
--------
DataFrame.window: contains full docstring
"""

def __init__(self, sdf, n=None, value=None):
if value is None and isinstance(n, (str, pd.Timedelta)):
value = n
@@ -492,12 +515,12 @@ def aggregate(self, agg):
diff = aggregations.diff_loc
window = self.value
return self.root.accumulate_partitions(aggregations.window_accumulator,
diff=diff,
window=window,
agg=agg,
start=None,
returns_state=True,
stream_type='updating')
diff=diff,
window=window,
agg=agg,
start=None,
returns_state=True,
stream_type='updating')

def full(self):
return self.aggregate(aggregations.Full())
@@ -573,6 +596,7 @@ def _accumulate_size(accumulator, new):

class GroupBy(object):
""" Groupby aggregations on streaming dataframes """

def __init__(self, root, grouper, index=None):
self.root = root
self.grouper = grouper
@@ -603,7 +627,7 @@ def _accumulate(self, Agg, **kwargs):
state = agg.initial(self.root.example, grouper=grouper_example)
if hasattr(grouper_example, 'iloc'):
grouper_example = grouper_example.iloc[:0]
elif isinstance(grouper_example, (np.ndarray, pd.Index)):
elif isinstance(grouper_example, np.ndarray) or is_index_like(grouper_example):
grouper_example = grouper_example[:0]
_, example = agg.on_new(state,
self.root.example.iloc[:0],
@@ -614,8 +638,9 @@
start=None,
returns_state=True)

for typ, s_type in _stream_types[stream_type]:
if isinstance(example, typ):
for fn, s_type in _stream_types[stream_type]:
"""Function checks if example is of a specific frame type"""
if fn(example):
return s_type(outstream, example)
return Streaming(outstream, example, stream_type=stream_type)

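This loop repeats the predicate walk that collection.py now exposes as get_stream_type, so the tail of the method is equivalent to the simplification below (a sketch, not code from the PR, assuming get_stream_type and Streaming are imported from streamz.collection):

s_type = get_stream_type(example, stream_type)
if s_type:
    return s_type(outstream, example)
return Streaming(outstream, example, stream_type=stream_type)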
@@ -646,6 +671,7 @@ def var(self, ddof=1):

class WindowedGroupBy(GroupBy):
""" Groupby aggregations over a window of data """

def __init__(self, root, grouper, index=None, n=None, value=None):
self.root = root
self.grouper = grouper
@@ -678,7 +704,7 @@ def _accumulate(self, Agg, **kwargs):
state = agg.initial(self.root.example, grouper=grouper_example)
if hasattr(grouper_example, 'iloc'):
grouper_example = grouper_example.iloc[:0]
elif isinstance(grouper_example, (np.ndarray, pd.Index)):
elif isinstance(grouper_example, np.ndarray) or is_index_like(grouper_example):
grouper_example = grouper_example[:0]
_, example = agg.on_new(state,
self.root.example.iloc[:0],
@@ -698,8 +724,9 @@
diff=diff,
window=window)

for typ, s_type in _stream_types[stream_type]:
if isinstance(example, typ):
for fn, s_type in _stream_types[stream_type]:
"""Function checks if example is of a specific frame type"""
if fn(example):
return s_type(outstream, example)
return Streaming(outstream, example, stream_type=stream_type)

@@ -712,7 +739,7 @@ def _random_df(tup):
df = pd.DataFrame({'x': np.random.random(len(index)),
'y': np.random.poisson(size=len(index)),
'z': np.random.normal(0, 1, size=len(index))},
index=index)
index=index)
return df


@@ -737,6 +764,7 @@ class Random(DataFrame):
-------
>>> source = Random(freq='100ms', interval='1s') # doctest: +SKIP
"""

def __init__(self, freq='100ms', interval='500ms', dask=False):
if dask:
from streamz.dask import DaskStream
@@ -775,8 +803,8 @@ def _cb(interval, freq, source, continue_):
last = now


_stream_types['streaming'].append((pd.DataFrame, DataFrame))
_stream_types['streaming'].append((pd.Index, Index))
_stream_types['streaming'].append((pd.Series, Series))
_stream_types['updating'].append((pd.DataFrame, DataFrames))
_stream_types['updating'].append((pd.Series, Seriess))
_stream_types['streaming'].append((is_dataframe_like, DataFrame))
_stream_types['streaming'].append((is_index_like, Index))
_stream_types['streaming'].append((is_series_like, Series))
_stream_types['updating'].append((is_dataframe_like, DataFrames))
_stream_types['updating'].append((is_series_like, Seriess))
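The registry now pairs duck-type predicates with wrapper classes instead of pandas types, so the first predicate that accepts an example decides its streaming wrapper. A minimal lookup sketch against this registry (assuming _stream_types is importable from streamz.collection):

import pandas as pd
import streamz.dataframe  # noqa: F401 -- importing populates the registry
from streamz.collection import _stream_types

example = pd.DataFrame({'x': [1.0]})
wrapper = next(s_type for predicate, s_type in _stream_types['streaming']
               if predicate(example))
# wrapper is the streaming DataFrame class; a cudf example would resolve
# to the same class (assumption: cudf frames satisfy is_dataframe_like).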