Skip to content
This repository has been archived by the owner on Feb 15, 2018. It is now read-only.

Commit

Permalink
Fix aggregation in a way that allows for other values
Browse files Browse the repository at this point in the history
  • Loading branch information
madninja committed Sep 26, 2016
1 parent 2879376 commit a808148
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
6 changes: 4 additions & 2 deletions helium/live_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ class LiveSession(Iterable):
"""
_FIELD_SEPARATOR = ':'

def __init__(self, response, session, resource_class):
def __init__(self, response, session, resource_class, **resource_args):
self._response = response
self._session = session
self._resource_class = resource_class
self._resource_args = resource_args

def _read(self, response):
data = ""
Expand All @@ -30,6 +31,7 @@ def _read(self, response):

def __iter__(self):
resource_class = self._resource_class
resource_args = self._resource_args
session = self._session
response = self._response

Expand All @@ -52,7 +54,7 @@ def __iter__(self):
continue

event_data = load_json(event_data).get('data')
yield resource_class(event_data, session)
yield resource_class(event_data, session, **resource_args)

def close(self):
"""Close the live session."""
Expand Down
17 changes: 14 additions & 3 deletions helium/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ class DataPoint(Resource):
reading was taken
"""
def __init__(self, json, session, is_aggregate=False):
self._is_aggregate = is_aggregate
super(DataPoint, self).__init__(json, session)

@classmethod
def _resource_type(cls):
return "timeseries"

def _promote_json_attribute(self, attribute, value):
if attribute == 'value' and isinstance(value, dict):
if attribute == 'value' and self._is_aggregate:
value = AggregateValue(**value)
super(DataPoint, self)._promote_json_attribute(attribute, value)

Expand Down Expand Up @@ -156,6 +160,7 @@ def __init__(self, session, resource_class, resource_id,
'timeseries')
self._resource_id = resource_id
self._direction = direction
self._is_aggregate = False

params = {}
if datapoint_id is not None:
Expand All @@ -169,8 +174,10 @@ def __init__(self, session, resource_class, resource_id,
if end is not None:
params['filter[end]'] = end
if agg_type is not None:
self._is_aggregate = True
params['agg[type]'] = agg_type
if agg_size is not None:
self._is_aggregate = True
params['agg[size]'] = agg_size
self._params = params

Expand All @@ -180,6 +187,7 @@ def __iter__(self):
datapoint_class = self._datapoint_class
direction = self._direction
params = self._params
is_aggregate = self._is_aggregate

def _get_json(url):
json = response_json(session.get(url, params=params), 200,
Expand All @@ -194,7 +202,8 @@ def _get_json(url):
finished = False
while not finished:
for entry in data:
datapoint = datapoint_class(entry, session)
datapoint = datapoint_class(entry, session,
is_aggregate=is_aggregate)
yield datapoint

if url is None:
Expand Down Expand Up @@ -270,13 +279,15 @@ def live(self):
"""
session = self._session
datapoint_class = self._datapoint_class
is_aggregate = self._is_aggregate
url = "{}/live".format(self._base_url)
headers = {
'Accept': 'text/event-stream'
}
response = session.get(url, stream=True, headers=headers)
response_boolean(response, 200)
return LiveSession(response, session, datapoint_class)
return LiveSession(response, session, datapoint_class,
is_aggregate=is_aggregate)


def timeseries():
Expand Down

0 comments on commit a808148

Please sign in to comment.