Skip to content

Commit

Permalink
Add support for messagepack (influxdata#734)
Browse files Browse the repository at this point in the history
* Add support for messagepack

* Remove unnecessary blank line

Fixes https://github.com/influxdata/influxdb-python/pull/734/files/57daf8ccd5027c796a2fd3934b8e88d3982d300e#r302769403

* Small code reorganization

* Small code reorganization

Fixes influxdata#734 (comment)
  • Loading branch information
lovasoa authored and ocworld committed Feb 7, 2020
1 parent 3d61f1f commit 946314d
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 10 deletions.
50 changes: 40 additions & 10 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
from __future__ import print_function
from __future__ import unicode_literals

import time
import random

import datetime
import json
import random
import socket
import struct
import time

import msgpack
import requests
import requests.exceptions
from six.moves import xrange
Expand Down Expand Up @@ -144,7 +147,7 @@ def __init__(self,

self._headers = {
'Content-Type': 'application/json',
'Accept': 'text/plain'
'Accept': 'application/x-msgpack'
}

@property
Expand Down Expand Up @@ -293,13 +296,30 @@ def request(self, url, method='GET', params=None, data=None,
time.sleep((2 ** _try) * random.random() / 100.0)
if not retry:
raise

type_header = response.headers and response.headers.get("Content-Type")
if type_header == "application/x-msgpack" and response.content:
response._msgpack = msgpack.unpackb(
packed=response.content,
ext_hook=_msgpack_parse_hook,
raw=False)
else:
response._msgpack = None

def reformat_error(response):
if response._msgpack:
return json.dumps(response._msgpack, separators=(',', ':'))
else:
return response.content

# if there's not an error, there must have been a successful response
if 500 <= response.status_code < 600:
raise InfluxDBServerError(response.content)
raise InfluxDBServerError(reformat_error(response))
elif response.status_code == expected_response_code:
return response
else:
raise InfluxDBClientError(response.content, response.status_code)
err_msg = reformat_error(response)
raise InfluxDBClientError(err_msg, response.status_code)

def write(self, data, params=None, expected_response_code=204,
protocol='json'):
Expand Down Expand Up @@ -450,10 +470,11 @@ def query(self,
expected_response_code=expected_response_code
)

if chunked:
return self._read_chunked_response(response)

data = response.json()
data = response._msgpack
if not data:
if chunked:
return self._read_chunked_response(response)
data = response.json()

results = [
ResultSet(result, raise_errors=raise_errors)
Expand Down Expand Up @@ -1119,3 +1140,12 @@ def _parse_netloc(netloc):
'password': info.password or None,
'host': info.hostname or 'localhost',
'port': info.port or 8086}


def _msgpack_parse_hook(code, data):
if code == 5:
(epoch_s, epoch_ns) = struct.unpack(">QI", data)
timestamp = datetime.datetime.utcfromtimestamp(epoch_s)
timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000))
return timestamp.isoformat() + 'Z'
return msgpack.ExtType(code, data)
23 changes: 23 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,29 @@ def test_query(self):
[{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]
)

def test_query_msgpack(self):
"""Test query method with a messagepack response."""
example_response = bytes(bytearray.fromhex(
"81a7726573756c74739182ac73746174656d656e745f696400a673657269"
"65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661"
"6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000"
))

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
request_headers={"Accept": "application/x-msgpack"},
headers={"Content-Type": "application/x-msgpack"},
content=example_response
)
rs = self.cli.query('select * from a')

self.assertListEqual(
list(rs.get_points()),
[{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}]
)

def test_select_into_post(self):
"""Test SELECT.*INTO is POSTed."""
example_response = (
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ python-dateutil>=2.6.0
pytz
requests>=2.17.0
six>=1.10.0
msgpack==0.6.1

0 comments on commit 946314d

Please sign in to comment.