Skip to content
This repository has been archived by the owner on Feb 15, 2018. It is now read-only.

Commit

Permalink
Merge pull request #23 from helium/feature/live-ports
Browse files Browse the repository at this point in the history
Add port filtering support for live timeseries
  • Loading branch information
madninja committed Dec 15, 2016
2 parents e59987c + 262b6d8 commit d1eeb54
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 7 deletions.
4 changes: 3 additions & 1 deletion helium/adapter/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,14 @@ def delete(self, url, callback, json=None): # noqa: D102
json=json)

def live(self, session, url,
resource_class, resource_args): # noqa: D102
resource_class, resource_args,
params=None): # noqa: D102
headers = {
'Accept': 'text/event-stream',
}
response = super(Adapter, self).get(url,
read_until_eof=False,
params=params,
headers=headers)
return LiveIterator(response, session, resource_class, resource_args)

Expand Down
7 changes: 5 additions & 2 deletions helium/adapter/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,14 @@ def datapoints(self, timeseries): # noqa: D102
def take(self, iter, n): # noqa: D102
return list(islice(iter, n))

def live(self, session, url, resource_class, resource_args): # noqa: D102
def live(self, session, url, resource_class, resource_args, params=None): # noqa: D102
headers = {
'Accept': 'text/event-stream',
}
response = super(Adapter, self).get(url, stream=True, headers=headers)
response = super(Adapter, self).get(url,
stream=True,
headers=headers,
params=params)
# Validate the response code
CB.boolean(200)(Response(response.status_code, response.headers, None,
response.request.method, url))
Expand Down
9 changes: 7 additions & 2 deletions helium/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def delete(self, url, callback, json=None):
def datapoints(self, timeseries):
return self.adapter.datapoints(timeseries)

def live(self, url, resource_class, resource_args):
def live(self, url, resource_class, resource_args, params=None):
"""Get a live endpoint.
Args:
Expand All @@ -275,6 +275,10 @@ def live(self, url, resource_class, resource_args):
resource_args(dict): Additional arguments to pass to the
`resource_class` constructor
Keyword Args:
params(dict): Request parameters for the live url
Returns:
An iterator over the live endpoint. Depending on the
Expand All @@ -283,7 +287,8 @@ def live(self, url, resource_class, resource_args):
iterating over the response of this method.
"""
return self.adapter.live(self, url, resource_class, resource_args)
return self.adapter.live(self, url, resource_class, resource_args,
params=params)

def _build_url(self, *args, **kwargs):
parts = [kwargs.get('base_url', self.base_url)]
Expand Down
8 changes: 6 additions & 2 deletions helium/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from . import to_iso_date
from . import build_request_body
from collections import Iterable, namedtuple, OrderedDict
import sys
from future.utils import iteritems


AggregateValue = namedtuple('agg', ['min', 'max', 'avg'])
AggregateValue.__new__.__defaults__ = (None,) * len(AggregateValue._fields)
Expand Down Expand Up @@ -293,9 +294,12 @@ def live(self):
"""
session = self._session
url = "{}/live".format(self._base_url)
supported_params = frozenset(['filter[port]'])
params = {k: v for k, v in iteritems(self._params)
if k in supported_params}
return session.live(url, self._datapoint_class, {
'is_aggregate': self._is_aggregate
})
}, params=params)


def timeseries():
Expand Down

0 comments on commit d1eeb54

Please sign in to comment.