Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Add support for messagepack #734

Merged
merged 4 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@
from __future__ import print_function
from __future__ import unicode_literals

import time
import random

import datetime
import json
import random
import socket
import struct
import time

import msgpack
import requests
import requests.exceptions
from six.moves import xrange
Expand Down Expand Up @@ -128,7 +131,7 @@ def __init__(self,

self._headers = {
'Content-Type': 'application/json',
'Accept': 'text/plain'
'Accept': 'application/x-msgpack'
}

@property
Expand Down Expand Up @@ -277,13 +280,30 @@ def request(self, url, method='GET', params=None, data=None,
time.sleep((2 ** _try) * random.random() / 100.0)
if not retry:
raise

type_header = response.headers and response.headers.get("Content-Type")
if type_header == "application/x-msgpack" and response.content:
response._msgpack = msgpack.unpackb(
packed=response.content,
ext_hook=_msgpack_parse_hook,
raw=False)
else:
response._msgpack = None

def reformat_error(response):
if response._msgpack:
return json.dumps(response._msgpack, separators=(',', ':'))
else:
return response.content

# if there's not an error, there must have been a successful response
if 500 <= response.status_code < 600:
raise InfluxDBServerError(response.content)
raise InfluxDBServerError(reformat_error(response))
elif response.status_code == expected_response_code:
return response
else:
raise InfluxDBClientError(response.content, response.status_code)
err_msg = reformat_error(response)
raise InfluxDBClientError(err_msg, response.status_code)

def write(self, data, params=None, expected_response_code=204,
protocol='json'):
Expand Down Expand Up @@ -434,10 +454,11 @@ def query(self,
expected_response_code=expected_response_code
)

if chunked:
return self._read_chunked_response(response)

data = response.json()
data = response._msgpack
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is msgpack meant to ignore chunked answers?

if not data:
if chunked:
return self._read_chunked_response(response)
data = response.json()

results = [
ResultSet(result, raise_errors=raise_errors)
Expand Down Expand Up @@ -1103,3 +1124,12 @@ def _parse_netloc(netloc):
'password': info.password or None,
'host': info.hostname or 'localhost',
'port': info.port or 8086}


def _msgpack_parse_hook(code, data):
if code == 5:
(epoch_s, epoch_ns) = struct.unpack(">QI", data)
timestamp = datetime.datetime.utcfromtimestamp(epoch_s)
timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000))
return timestamp.isoformat() + 'Z'
return msgpack.ExtType(code, data)
23 changes: 23 additions & 0 deletions influxdb/tests/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,29 @@ def test_query(self):
[{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]
)

def test_query_msgpack(self):
"""Test query method with a messagepack response."""
example_response = bytes(bytearray.fromhex(
"81a7726573756c74739182ac73746174656d656e745f696400a673657269"
"65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661"
"6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000"
))

with requests_mock.Mocker() as m:
m.register_uri(
requests_mock.GET,
"http://localhost:8086/query",
request_headers={"Accept": "application/x-msgpack"},
headers={"Content-Type": "application/x-msgpack"},
content=example_response
)
rs = self.cli.query('select * from a')

self.assertListEqual(
list(rs.get_points()),
[{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}]
)

def test_select_into_post(self):
"""Test SELECT.*INTO is POSTed."""
example_response = (
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ python-dateutil>=2.6.0
pytz
requests>=2.17.0
six>=1.10.0
msgpack==0.6.1