Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ env:
install:
- sudo pip install tox
- sudo pip install coveralls
- wget http://get.influxdb.org/influxdb_0.9.0_amd64.deb && sudo dpkg -i influxdb_0.9.0_amd64.deb
- wget https://s3.amazonaws.com/influxdb/influxdb_0.9.1_amd64.deb && sudo dpkg -i influxdb_0.9.1_amd64.deb
script:
- travis_wait 30 tox -e $TOX_ENV
after_success:
Expand Down
93 changes: 53 additions & 40 deletions influxdb/client.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import requests.exceptions
from sys import version_info

from influxdb.line_protocol import make_lines
from influxdb.resultset import ResultSet
from .exceptions import InfluxDBClientError
from .exceptions import InfluxDBServerError
Expand Down Expand Up @@ -96,7 +97,8 @@ def __init__(self,

self._headers = {
'Content-type': 'application/json',
'Accept': 'text/plain'}
'Accept': 'text/plain'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason for this new line?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, this was included in a Formating improvements commit.

}

@staticmethod
def from_DSN(dsn, **kwargs):
Expand Down Expand Up @@ -182,7 +184,7 @@ def switch_user(self, username, password):
self._password = password

def request(self, url, method='GET', params=None, data=None,
expected_response_code=200):
expected_response_code=200, headers=None):
"""Make a HTTP request to the InfluxDB API.

:param url: the path of the HTTP request, e.g. write, query, etc.
Expand All @@ -203,17 +205,13 @@ def request(self, url, method='GET', params=None, data=None,
"""
url = "{0}/{1}".format(self._baseurl, url)

if headers is None:
headers = self._headers

if params is None:
params = {}

auth = {
'u': self._username,
'p': self._password
}

params.update(auth)

if data is not None and not isinstance(data, str):
if isinstance(data, (dict, list)):
data = json.dumps(data)

# Try to send the request a maximum of three times. (see #103)
Expand All @@ -223,9 +221,10 @@ def request(self, url, method='GET', params=None, data=None,
response = self._session.request(
method=method,
url=url,
auth=(self._username, self._password),
params=params,
data=data,
headers=self._headers,
headers=headers,
verify=self._verify_ssl,
timeout=self._timeout
)
Expand Down Expand Up @@ -254,18 +253,24 @@ def write(self, data, params=None, expected_response_code=204):
:returns: True, if the write operation is successful
:rtype: bool
"""

headers = self._headers
headers['Content-type'] = 'application/octet-stream'

self.request(
url="write",
method='POST',
params=params,
data=data,
expected_response_code=expected_response_code
data=make_lines(data).encode('utf-8'),
expected_response_code=expected_response_code,
headers=headers
)
return True

def query(self,
query,
params={},
epoch=None,
expected_response_code=200,
database=None,
raise_errors=True):
Expand Down Expand Up @@ -294,6 +299,9 @@ def query(self,
params['q'] = query
params['db'] = database or self._database

if epoch is not None:
params['epoch'] = epoch

response = self.request(
url="query",
method='GET',
Expand Down Expand Up @@ -391,22 +399,25 @@ def _write_points(self,
'points': points
}

if time_precision:
data['precision'] = time_precision
if tags is not None:
data['tags'] = tags

if retention_policy:
data['retentionPolicy'] = retention_policy
params = {
'db': database or self._database
}

if tags:
data['tags'] = tags
if time_precision is not None:
params['precision'] = time_precision

data['database'] = database or self._database
if retention_policy is not None:
params['rp'] = retention_policy

if self.use_udp:
self.send_packet(data)
else:
self.write(
data=data,
params=params,
expected_response_code=204
)

Expand Down Expand Up @@ -578,15 +589,20 @@ def get_list_users(self):
"""
return list(self.query("SHOW USERS").get_points())

def create_user(self, username, password):
def create_user(self, username, password, admin=False):
"""Create a new user in InfluxDB

:param username: the new username to create
:type username: str
:param password: the password for the new user
:type password: str
:param admin: whether the user should have cluster administration
privileges or not
:type admin: boolean
"""
text = "CREATE USER {} WITH PASSWORD '{}'".format(username, password)
if admin:
text += ' WITH ALL PRIVILEGES'
self.query(text)

def drop_user(self, username):
Expand All @@ -609,29 +625,27 @@ def set_user_password(self, username, password):
text = "SET PASSWORD FOR {} = '{}'".format(username, password)
self.query(text)

def delete_series(self, id, database=None):
"""Delete series from a database.
def delete_series(self, database=None, measurement=None, tags=None):
"""Delete series from a database. Series can be filtered by
measurement and tags.

:param id: the id of the series to be deleted
:type id: int
:param measurement: Delete all series from a measurement
:type id: string
:param tags: Delete all series that match given tags
:type id: dict
:param database: the database from which the series should be
deleted, defaults to client's current database
:type database: str
"""
database = database or self._database
self.query('DROP SERIES %s' % id, database=database)

def grant_admin_privileges(self, username):
"""Grant cluster administration privileges to an user.

:param username: the username to grant privileges to
:type username: str
query_str = 'DROP SERIES'
if measurement:
query_str += ' FROM "{}"'.format(measurement)

.. note:: Only a cluster administrator can create/ drop databases
and manage users.
"""
text = "GRANT ALL PRIVILEGES TO {}".format(username)
self.query(text)
if tags:
query_str += ' WHERE ' + ' and '.join(["{}='{}'".format(k, v)
for k, v in tags.items()])
self.query(query_str, database=database)

def revoke_admin_privileges(self, username):
"""Revoke cluster administration privileges from an user.
Expand Down Expand Up @@ -683,9 +697,8 @@ def send_packet(self, packet):
:param packet: the packet to be sent
:type packet: dict
"""
data = json.dumps(packet)
byte = data.encode('utf-8')
self.udp_socket.sendto(byte, (self._host, self.udp_port))
data = make_lines(packet).encode('utf-8')
self.udp_socket.sendto(data, (self._host, self.udp_port))


class InfluxDBClusterClient(object):
Expand Down
103 changes: 103 additions & 0 deletions influxdb/line_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals

from calendar import timegm
from copy import copy
from datetime import datetime

from dateutil.parser import parse
from pytz import utc
from six import binary_type, text_type


def _convert_timestamp(timestamp):
if isinstance(timestamp, int):
return timestamp
if isinstance(_force_text(timestamp), text_type):
timestamp = parse(timestamp)
if isinstance(timestamp, datetime):
if timestamp.tzinfo:
timestamp = timestamp.astimezone(utc)
timestamp.replace(tzinfo=None)
return (
timegm(timestamp.timetuple()) * 1e9 +
timestamp.microsecond * 1e3
)
raise ValueError(timestamp)


def _escape_tag(tag):
return tag.replace(
"\\", "\\\\"
).replace(
" ", "\\ "
).replace(
",", "\\,"
).replace(
"=", "\\="
)


def _escape_value(value):
value = _force_text(value)
if isinstance(value, text_type):
return "\"{}\"".format(value.replace(
"\"", "\\\""
))
else:
return str(value)


def _force_text(data):
"""
Try to return a text aka unicode object from the given data.
"""
if isinstance(data, binary_type):
return data.decode('utf-8')
else:
return data


def make_lines(data):
"""
Extracts the points from the given dict and returns a Unicode string
matching the line protocol introduced in InfluxDB 0.9.0.
"""
lines = ""
static_tags = data.get('tags', None)
for point in data['points']:
# add measurement name
lines += _escape_tag(_force_text(
point.get('measurement', data.get('measurement'))
)) + ","

# add tags
if static_tags is None:
tags = point.get('tags', {})
else:
tags = copy(static_tags)
tags.update(point.get('tags', {}))
# tags should be sorted client-side to take load off server
for tag_key in sorted(tags.keys()):
lines += "{key}={value},".format(
key=_escape_tag(tag_key),
value=_escape_tag(tags[tag_key]),
)
lines = lines[:-1] + " " # strip the trailing comma

# add fields
for field_key in sorted(point['fields'].keys()):
lines += "{key}={value},".format(
key=_escape_tag(field_key),
value=_escape_value(point['fields'][field_key]),
)
lines = lines[:-1] # strip the trailing comma

# add timestamp
if 'time' in point:
lines += " " + _force_text(str(int(
_convert_timestamp(point['time'])
)))

lines += "\n"
return lines
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
python-dateutil>=2.0.0
pytz
requests>=1.0.3
six==1.9.0
six==1.9.0
Loading