From a808148dd2f6174b2ae70163d6c3bcde0ec2f771 Mon Sep 17 00:00:00 2001 From: Marc Nijdam Date: Mon, 26 Sep 2016 16:51:41 -0700 Subject: [PATCH] Fix aggregation in a way that allows for other values --- helium/live_session.py | 6 ++++-- helium/timeseries.py | 17 ++++++++++++++--- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/helium/live_session.py b/helium/live_session.py index d119d9e..c4b0926 100644 --- a/helium/live_session.py +++ b/helium/live_session.py @@ -15,10 +15,11 @@ class LiveSession(Iterable): """ _FIELD_SEPARATOR = ':' - def __init__(self, response, session, resource_class): + def __init__(self, response, session, resource_class, **resource_args): self._response = response self._session = session self._resource_class = resource_class + self._resource_args = resource_args def _read(self, response): data = "" @@ -30,6 +31,7 @@ def _read(self, response): def __iter__(self): resource_class = self._resource_class + resource_args = self._resource_args session = self._session response = self._response @@ -52,7 +54,7 @@ def __iter__(self): continue event_data = load_json(event_data).get('data') - yield resource_class(event_data, session) + yield resource_class(event_data, session, **resource_args) def close(self): """Close the live session.""" diff --git a/helium/timeseries.py b/helium/timeseries.py index 7200a56..a76f970 100644 --- a/helium/timeseries.py +++ b/helium/timeseries.py @@ -28,12 +28,16 @@ class DataPoint(Resource): reading was taken """ + def __init__(self, json, session, is_aggregate=False): + self._is_aggregate = is_aggregate + super(DataPoint, self).__init__(json, session) + @classmethod def _resource_type(cls): return "timeseries" def _promote_json_attribute(self, attribute, value): - if attribute == 'value' and isinstance(value, dict): + if attribute == 'value' and self._is_aggregate: value = AggregateValue(**value) super(DataPoint, self)._promote_json_attribute(attribute, value) @@ -156,6 +160,7 @@ def __init__(self, session, resource_class, resource_id, 'timeseries') self._resource_id = resource_id self._direction = direction + self._is_aggregate = False params = {} if datapoint_id is not None: @@ -169,8 +174,10 @@ def __init__(self, session, resource_class, resource_id, if end is not None: params['filter[end]'] = end if agg_type is not None: + self._is_aggregate = True params['agg[type]'] = agg_type if agg_size is not None: + self._is_aggregate = True params['agg[size]'] = agg_size self._params = params @@ -180,6 +187,7 @@ def __iter__(self): datapoint_class = self._datapoint_class direction = self._direction params = self._params + is_aggregate = self._is_aggregate def _get_json(url): json = response_json(session.get(url, params=params), 200, @@ -194,7 +202,8 @@ def _get_json(url): finished = False while not finished: for entry in data: - datapoint = datapoint_class(entry, session) + datapoint = datapoint_class(entry, session, + is_aggregate=is_aggregate) yield datapoint if url is None: @@ -270,13 +279,15 @@ def live(self): """ session = self._session datapoint_class = self._datapoint_class + is_aggregate = self._is_aggregate url = "{}/live".format(self._base_url) headers = { 'Accept': 'text/event-stream' } response = session.get(url, stream=True, headers=headers) response_boolean(response, 200) - return LiveSession(response, session, datapoint_class) + return LiveSession(response, session, datapoint_class, + is_aggregate=is_aggregate) def timeseries():