Skip to content
This repository has been archived by the owner on Apr 22, 2024. It is now read-only.

Fixes/msgpack default #783

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 61 additions & 32 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from influxdb.line_protocol import make_lines, quote_ident, quote_literal
from influxdb.resultset import ResultSet
from .exceptions import InfluxDBClientError
from .exceptions import InfluxDBServerError


class InfluxDBClient(object):
Expand Down Expand Up @@ -69,6 +68,10 @@ class InfluxDBClient(object):
as a single file containing the private key and the certificate, or as
a tuple of both files’ paths, defaults to None
:type cert: str
:param use_msgpack: A bool indicating to use msgpack to retrieve query
results from InfluxDB. If False, the fallback will be JSON. This flag
sets the Accept header of the request. Defaults to True
:type use_msgpack: bool

:raises ValueError: if cert is provided but ssl is disabled (set to False)
"""
Expand All @@ -89,6 +92,7 @@ def __init__(self,
pool_size=10,
path='',
cert=None,
use_msgpack=True
):
"""Construct a new InfluxDBClient object."""
self.__host = host
Expand All @@ -110,7 +114,9 @@ def __init__(self,
)

if use_udp:
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
else:
self._udp_socket = None

if not path:
self.__path = ''
Expand Down Expand Up @@ -145,10 +151,16 @@ def __init__(self,
self._port,
self._path)

self._headers = {
'Content-Type': 'application/json',
'Accept': 'application/x-msgpack'
}
if use_msgpack:
self._headers = {
'Content-Type': 'application/json',
'Accept': 'application/x-msgpack'
}
else:
self._headers = {
'Content-Type': 'application/json',
'Accept': 'text/plain'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why text/plain and not application/json ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would have to ask the original implementor - this is the way it was implemented before moving the default to msgpack.

}

@property
def _baseurl(self):
Expand Down Expand Up @@ -243,14 +255,14 @@ def request(self, url, method='GET', params=None, data=None,
:param method: the HTTP method for the request, defaults to GET
:type method: str
:param params: additional parameters for the request, defaults to None
:type params: dict
:type params: dict, optional
:param data: the data of the request, defaults to None
:type data: str
:type data: str, optional
:param expected_response_code: the expected response code of
the request, defaults to 200
:type expected_response_code: int
:param headers: headers to add to the request
:type headers: dict
:type headers: dict, optional
:returns: the response from the request
:rtype: :class:`requests.Response`
:raises InfluxDBServerError: if the response code is any server error
Expand Down Expand Up @@ -285,6 +297,7 @@ def request(self, url, method='GET', params=None, data=None,
verify=self._verify_ssl,
timeout=self._timeout
)
response._msgpack = None
break
except (requests.exceptions.ConnectionError,
requests.exceptions.HTTPError,
Expand All @@ -297,30 +310,39 @@ def request(self, url, method='GET', params=None, data=None,
if not retry:
raise

type_header = response.headers and response.headers.get("Content-Type")
if type_header == "application/x-msgpack" and response.content:
response._msgpack = msgpack.unpackb(
packed=response.content,
ext_hook=_msgpack_parse_hook,
raw=False)
else:
response._msgpack = None
if self._is_msg_pack_response(response):
if response.content:
response._msgpack = msgpack.unpackb(
packed=response.content,
ext_hook=_msgpack_parse_hook,
raw=False)

def reformat_error(response):
if response._msgpack:
return json.dumps(response._msgpack, separators=(',', ':'))
else:
return response.content

# if there's not an error, there must have been a successful response
if 500 <= response.status_code < 600:
raise InfluxDBServerError(reformat_error(response))
elif response.status_code == expected_response_code:
if response.status_code == expected_response_code:
return response
else:
err_msg = reformat_error(response)
err_msg = self._reformat_msgpack_error(response)
raise InfluxDBClientError(err_msg, response.status_code)

@staticmethod
def _is_msg_pack_response(response):
if response is None:
return False

if response.headers is None:
return False

if "Content-Type" not in response.headers:
return False

content_type = response.headers["Content-Type"]
return content_type == "application/x-msgpack"

def _reformat_msgpack_error(self, _response):
if _response._msgpack is not None:
return json.dumps(_response._msgpack, separators=(',', ':'))
else:
return _response.content

def write(self, data, params=None, expected_response_code=204,
protocol='json'):
"""Write data to InfluxDB.
Expand Down Expand Up @@ -697,7 +719,7 @@ def create_retention_policy(self, name, duration, replication,
The minimum retention period is 1 hour.
:type duration: str
:param replication: the replication of the retention policy
:type replication: str
:type replication: int
:param database: the database for which the retention policy is
created. Defaults to current client's database
:type database: str
Expand All @@ -717,7 +739,7 @@ def create_retention_policy(self, name, duration, replication,
"CREATE RETENTION POLICY {0} ON {1} " \
"DURATION {2} REPLICATION {3} SHARD DURATION {4}".format(
quote_ident(name), quote_ident(database or self._database),
duration, replication, shard_duration)
duration, int(replication), shard_duration)

if default is True:
query_string += " DEFAULT"
Expand Down Expand Up @@ -1071,7 +1093,7 @@ def drop_continuous_query(self, name, database=None):
self.query(query_string)

def send_packet(self, packet, protocol='json', time_precision=None):
"""Send an UDP packet.
"""Send an UDP packet. Only valid when use_udp is True.

:param packet: the packet to be sent
:type packet: (if protocol is 'json') dict
Expand All @@ -1081,11 +1103,18 @@ def send_packet(self, packet, protocol='json', time_precision=None):
:param time_precision: Either 's', 'm', 'ms' or 'u', defaults to None
:type time_precision: str
"""
if not self._use_udp:
raise RuntimeError("Unable to send packet : use_udp set to False")

if protocol == 'json':
data = make_lines(packet, time_precision).encode('utf-8')
elif protocol == 'line':
data = ('\n'.join(packet) + '\n').encode('utf-8')
self.udp_socket.sendto(data, (self._host, self._udp_port))
else:
raise InfluxDBClientError("Invalid protocol name : "
"expected json or line")

self._udp_socket.sendto(data, (self._host, self._udp_port))

def close(self):
"""Close http session."""
Expand Down
4 changes: 2 additions & 2 deletions influxdb/resultset.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,9 @@ def __iter__(self):
yield list(self.__getitem__(key))

@staticmethod
def _tag_matches(tags, filter):
def _tag_matches(tags, _filter):
"""Check if all key/values in filter match in tags."""
for tag_name, tag_value in filter.items():
for tag_name, tag_value in _filter.items():
# using _sentinel as I'm not sure that "None"
# could be used, because it could be a valid
# series_tags value : when a series has no such tag
Expand Down
Loading