From 2034bd1a9b2d85cb0699834c7812bee476cb7571 Mon Sep 17 00:00:00 2001 From: Ophir LOJKINE Date: Wed, 10 Jul 2019 18:34:23 +0200 Subject: [PATCH] Add support for messagepack --- influxdb/client.py | 43 +++++++++++++++++++++++++++++------ influxdb/tests/client_test.py | 23 +++++++++++++++++++ requirements.txt | 2 ++ 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 8ac557d3..00abada9 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -10,7 +10,10 @@ import random import json +import struct +import datetime import socket +import msgpack import requests import requests.exceptions from six.moves import xrange @@ -128,7 +131,7 @@ def __init__(self, self._headers = { 'Content-Type': 'application/json', - 'Accept': 'text/plain' + 'Accept': 'application/x-msgpack' } @property @@ -277,13 +280,22 @@ def request(self, url, method='GET', params=None, data=None, time.sleep((2 ** _try) * random.random() / 100.0) if not retry: raise + + def reformat_error(response): + err = self._parse_msgpack(response) + if err: + return json.dumps(err, separators=(',', ':')) + else: + return response.content + # if there's not an error, there must have been a successful response if 500 <= response.status_code < 600: - raise InfluxDBServerError(response.content) + raise InfluxDBServerError(reformat_error(response)) elif response.status_code == expected_response_code: return response else: - raise InfluxDBClientError(response.content, response.status_code) + err_msg = reformat_error(response) + raise InfluxDBClientError(err_msg, response.status_code) def write(self, data, params=None, expected_response_code=204, protocol='json'): @@ -342,6 +354,22 @@ def _read_chunked_response(response, raise_errors=True): _key, []).extend(result[_key]) return ResultSet(result_set, raise_errors=raise_errors) + @staticmethod + def _parse_msgpack(response): + """Return the decoded response if it is encoded as msgpack.""" + + def hook(code, data): + if code == 5: + (epoch_s, epoch_ns) = struct.unpack(">QI", data) + time = datetime.datetime.utcfromtimestamp(epoch_s) + time += datetime.timedelta(microseconds=(epoch_ns / 1000)) + return time.isoformat() + 'Z' + return msgpack.ExtType(code, data) + + headers = response.headers + if headers and headers["Content-Type"] == "application/x-msgpack": + return msgpack.unpackb(response.content, ext_hook=hook, raw=False) + def query(self, query, params=None, @@ -434,10 +462,11 @@ def query(self, expected_response_code=expected_response_code ) - if chunked: - return self._read_chunked_response(response) - - data = response.json() + data = self._parse_msgpack(response) + if not data: + if chunked: + return self._read_chunked_response(response) + data = response.json() results = [ ResultSet(result, raise_errors=raise_errors) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index e4cc7e11..3b89a03d 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -465,6 +465,29 @@ def test_query(self): [{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}] ) + def test_query_msgpack(self): + """Test query method with a messagepack response.""" + example_response = bytes(bytearray.fromhex( + "81a7726573756c74739182ac73746174656d656e745f696400a673657269" + "65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661" + "6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000" + )) + + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + request_headers={"Accept": "application/x-msgpack"}, + headers={"Content-Type": "application/x-msgpack"}, + content=example_response + ) + rs = self.cli.query('select * from a') + + self.assertListEqual( + list(rs.get_points()), + [{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}] + ) + def test_select_into_post(self): """Test SELECT.*INTO is POSTed.""" example_response = ( diff --git a/requirements.txt b/requirements.txt index db5f6f85..46f6cae7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,5 @@ python-dateutil>=2.6.0 pytz requests>=2.17.0 six>=1.10.0 +msgpack==0.6.1 +