Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions influxdb/client.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import requests.exceptions
from sys import version_info

from influxdb.line_protocol import make_lines
from influxdb.resultset import ResultSet

try:
Expand Down Expand Up @@ -197,7 +198,7 @@ def switch_user(self, username, password):
self._password = password

def request(self, url, method='GET', params=None, data=None,
expected_response_code=200):
expected_response_code=200, headers=None):
"""Make a HTTP request to the InfluxDB API.

:param url: the path of the HTTP request, e.g. write, query, etc.
Expand All @@ -218,17 +219,13 @@ def request(self, url, method='GET', params=None, data=None,
"""
url = "{0}/{1}".format(self._baseurl, url)

if headers is None:
headers = self._headers

if params is None:
params = {}

auth = {
'u': self._username,
'p': self._password
}

params.update(auth)

if data is not None and not isinstance(data, str):
if isinstance(data, dict) or isinstance(data, list):
data = json.dumps(data)

# Try to send the request a maximum of three times. (see #103)
Expand All @@ -238,9 +235,10 @@ def request(self, url, method='GET', params=None, data=None,
response = self._session.request(
method=method,
url=url,
auth=(self._username, self._password),
params=params,
data=data,
headers=self._headers,
headers=headers,
verify=self._verify_ssl,
timeout=self._timeout
)
Expand Down Expand Up @@ -269,12 +267,17 @@ def write(self, data, params=None, expected_response_code=204):
:returns: True, if the write operation is successful
:rtype: bool
"""

headers = self._headers
headers['Content-type'] = 'application/octet-stream'

self.request(
url="write",
method='POST',
params=params,
data=data,
expected_response_code=expected_response_code
data=make_lines(data).encode('utf-8'),
expected_response_code=expected_response_code,
headers=headers
)
return True

Expand Down Expand Up @@ -387,22 +390,25 @@ def _write_points(self,
'points': points
}

if time_precision:
data['precision'] = time_precision

if retention_policy:
data['retentionPolicy'] = retention_policy

if tags:
data['tags'] = tags

data['database'] = database or self._database
params = {
'db': database or self._database
}

if time_precision:
params['precision'] = time_precision

if retention_policy:
params['rp'] = retention_policy

if self.use_udp:
self.send_packet(data)
else:
self.write(
data=data,
params=params,
expected_response_code=204
)

Expand Down Expand Up @@ -679,9 +685,8 @@ def send_packet(self, packet):
:param packet: the packet to be sent
:type packet: dict
"""
data = json.dumps(packet)
byte = data.encode('utf-8')
self.udp_socket.sendto(byte, (self._host, self.udp_port))
data = make_lines(packet).encode('utf-8')
self.udp_socket.sendto(data, (self._host, self.udp_port))


class InfluxDBClusterClient(object):
Expand Down
103 changes: 103 additions & 0 deletions influxdb/line_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from calendar import timegm
from copy import copy
from datetime import datetime

from dateutil.parser import parse
from pytz import utc
from six import binary_type, text_type


def _convert_timestamp(timestamp):
if isinstance(timestamp, int):
return timestamp
if isinstance(_force_text(timestamp), text_type):
timestamp = parse(timestamp)
if isinstance(timestamp, datetime):
if timestamp.tzinfo:
timestamp = timestamp.astimezone(utc)
timestamp.replace(tzinfo=None)
return (
timegm(timestamp.timetuple()) * 1e9 +
timestamp.microsecond * 1e3
)
raise ValueError(timestamp)


def _escape_tag(tag):
return tag.replace(
"\\", "\\\\"
).replace(
" ", "\\ "
).replace(
",", "\\,"
).replace(
"=", "\\="
)


def _escape_value(value):
value = _force_text(value)
if isinstance(value, text_type):
return "\"{}\"".format(value.replace(
"\"", "\\\""
))
else:
return str(value)


def _force_text(data):
"""
Try to return a text aka unicode object from the given data.
"""
if isinstance(data, binary_type):
return data.decode('utf-8', 'replace')
else:
return data


def make_lines(data):
"""
Extracts the points from the given dict and returns a Unicode string
matching the line protocol introduced in InfluxDB 0.9.0.
"""
lines = ""
static_tags = data.get('tags', None)
for point in data['points']:
# add measurement name
lines += _escape_tag(_force_text(
point.get('measurement', data.get('measurement'))
)) + ","

# add tags
if static_tags is None:
tags = point.get('tags', {})
else:
tags = copy(static_tags)
tags.update(point.get('tags', {}))
# tags should be sorted client-side to take load off server
for tag_key in sorted(tags.keys()):
lines += "{key}={value},".format(
key=_escape_tag(tag_key),
value=_escape_tag(tags[tag_key]),
)
lines = lines[:-1] + " " # strip the trailing comma

# add fields
for field_key in sorted(point['fields'].keys()):
lines += "{key}={value},".format(
key=_escape_tag(field_key),
value=_escape_value(point['fields'][field_key]),
)
lines = lines[:-1] # strip the trailing comma

# add timestamp
if 'time' in point:
lines += " " + _force_text(str(int(
_convert_timestamp(point['time'])
)))

lines += "\n"
return lines
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
python-dateutil>=2.0.0
pytz
requests>=1.0.3
six==1.9.0
six==1.9.0
51 changes: 17 additions & 34 deletions tests/influxdb/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def setUp(self):
"host": "server01",
"region": "us-west"
},
"timestamp": "2009-11-10T23:00:00Z",
"time": "2009-11-10T23:00:00Z",
"fields": {
"value": 0.64
}
Expand Down Expand Up @@ -149,19 +149,13 @@ def test_write(self):
"points": [{"measurement": "cpu_load_short",
"tags": {"host": "server01",
"region": "us-west"},
"timestamp": "2009-11-10T23:00:00Z",
"time": "2009-11-10T23:00:00Z",
"fields": {"value": 0.64}}]}
)

self.assertEqual(
json.loads(m.last_request.body),
{"database": "mydb",
"retentionPolicy": "mypolicy",
"points": [{"measurement": "cpu_load_short",
"tags": {"host": "server01",
"region": "us-west"},
"timestamp": "2009-11-10T23:00:00Z",
"fields": {"value": 0.64}}]}
m.last_request.body,
b"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n",
)

def test_write_points(self):
Expand All @@ -176,12 +170,9 @@ def test_write_points(self):
cli.write_points(
self.dummy_points,
)
self.assertDictEqual(
{
"database": "db",
"points": self.dummy_points,
},
json.loads(m.last_request.body)
self.assertEqual(
"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n",
m.last_request.body.decode('utf-8'),
)

def test_write_points_toplevel_attributes(self):
Expand All @@ -199,31 +190,26 @@ def test_write_points_toplevel_attributes(self):
tags={"tag": "hello"},
retention_policy="somepolicy"
)
self.assertDictEqual(
{
"database": "testdb",
"tags": {"tag": "hello"},
"points": self.dummy_points,
"retentionPolicy": "somepolicy"
},
json.loads(m.last_request.body)
self.assertEqual(
"cpu_load_short,host=server01,region=us-west,tag=hello value=0.64 1257894000000000000\n",
m.last_request.body.decode('utf-8'),
)

def test_write_points_batch(self):
dummy_points = [
{"measurement": "cpu_usage", "tags": {"unit": "percent"},
"timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}},
"time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}},
{"measurement": "network", "tags": {"direction": "in"},
"timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}},
"time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}},
{"measurement": "network", "tags": {"direction": "out"},
"timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}
"time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}
]
expected_last_body = {"tags": {"host": "server01",
"region": "us-west"},
"database": "db",
"points": [{"measurement": "network",
"tags": {"direction": "out"},
"timestamp": "2009-11-10T23:00:00Z",
"time": "2009-11-10T23:00:00Z",
"fields": {"value": 12.00}}]}
with requests_mock.Mocker() as m:
m.register_uri(requests_mock.POST,
Expand Down Expand Up @@ -294,12 +280,9 @@ def test_write_points_with_precision(self):
time_precision='n'
)

self.assertDictEqual(
{'points': self.dummy_points,
'database': 'db',
'precision': 'n',
},
json.loads(m.last_request.body)
self.assertEqual(
b"cpu_load_short,host=server01,region=us-west value=0.64 1257894000000000000\n",
m.last_request.body,
)

def test_write_points_bad_precision(self):
Expand Down
7 changes: 4 additions & 3 deletions tests/influxdb/client_test_with_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,10 @@ class CommonTests(ManyTestCasesWithServerMixin,
influxdb_template_conf = os.path.join(THIS_DIR, 'influxdb.conf.template')

def test_write(self):
new_dummy_point = dummy_point[0].copy()
new_dummy_point['database'] = 'db'
self.assertIs(True, self.cli.write(new_dummy_point))
self.assertIs(True, self.cli.write(
{'points': dummy_point},
params={'db': 'db'},
))

@unittest.skip("fail against real server instance, "
"don't know if it should succeed actually..")
Expand Down
Loading