Python client for InfluxDB following the SansIO principle.
InfluxDB is an open-source distributed time series database. Find more about InfluxDB at http://influxdata.com/
To install the latest release:
$ pip install influx-sansio
The library is still in beta, so you may also want to install the latest version from the development branch:
$ pip install git+https://github.com/miracle2k/influx-sansio@dev
The library supports Python 3.5+.
There is one optional third-party dependency: pandas, for DataFrame reading/writing support.
The concrete IO implementations have additional dependencies.
The module has these parts:
- Low-level utilities that implement generating and parsing the InfluxDB line protocol (for writing data), and some helpers for generating queries. This part is Sans-IO, and you can use it to implement your own client.
- An abstract base class that provides an easy-to-use client interface, which lets you do client.query() or client.write() calls.
- Concrete implementations of this base class for various IO backends, currently the asks library which supports both trio and curio.
Sans-IO (low-level utilities) ~~~~~~~~~~~
See the modules influx_sansio.serialization and influx_sansio.http.
import trio
from influx_sansio.asks import InfluxDBClient

point = dict(time='2009-11-10T23:00:00Z',
             measurement='cpu_load_short',
             tags={'host': 'server01',
                   'region': 'us-west'},
             fields={'value': 0.64})


async def main():
    client = InfluxDBClient(db='testdb')
    # Run the calls in order: the database must exist before the write,
    # and the write must finish before the query can see the point.
    results = [await client.create_database(db='testdb'),
               await client.write(point),
               await client.query('SELECT value FROM cpu_load_short')]
    for result in results:
        print(result)

# The asks backend runs on trio or curio, so trio drives the coroutines here
# instead of an asyncio event loop.
trio.run(main)
Input data can be:
- A string properly formatted in InfluxDB's line protocol
- A dictionary containing the following keys: measurement, time, tags, fields
- A Pandas DataFrame with a DatetimeIndex
- An iterable of one of the above
Input data in formats 2-4 is serialized into the line protocol before being written to InfluxDB. Beware that serialization is not highly optimized (cythonization PRs are welcome!) and may become a bottleneck depending on your application.
The write method returns True when successful and raises an InfluxDBError otherwise.
We accept any dictionary-like object (mapping) as input. However, that dictionary must be properly formatted and contain the following keys:
- measurement: Optional. Must be a string-like object. If omitted, must be specified when calling InfluxDBClient.write by passing a measurement argument.
- time: Optional. The value can be datetime.datetime, a date-like string (e.g., 2017-01-01, 2009-11-10T23:00:00Z) or anything else that can be parsed by Pandas' Timestamp class initializer.
- tags: Optional. This must contain another mapping of tag names and values. Both tag keys and values should be strings.
- fields: Mandatory. This must contain another mapping of field names and values. Field keys should be strings. Field values can be float, int, str, or bool or any equivalent type (e.g. Numpy types).
Any keys other than the above will be ignored when writing data to InfluxDB.
A typical dictionary-like point would look something like the following:
{'time': '2009-11-10T23:00:00Z',
'measurement': 'cpu_load_short',
'tags': {'host': 'server01', 'region': 'us-west'},
'fields': {'value1': 0.64, 'value2': True, 'value3': 10}}
We also accept Pandas dataframes as input. The only requirement for the dataframe is that the index must be of type DatetimeIndex. Also, any column whose dtype is object will be converted to a string representation.
A typical dataframe input should look something like the following:
                                       LUY       BEM       AJW tag
2017-06-24 08:45:17.929097+00:00  2.545409  5.173134  5.532397   B
2017-06-24 10:15:17.929097+00:00 -0.306673 -1.132941 -2.130625   E
2017-06-24 11:45:17.929097+00:00  0.894738 -0.561979 -1.487940   B
2017-06-24 13:15:17.929097+00:00 -1.799512 -1.722805 -2.308823   D
2017-06-24 14:45:17.929097+00:00  0.390137 -0.016709 -0.667895   E
The measurement name must be specified with the measurement argument when calling InfluxDBClient.write. Additional tags can also be passed using arbitrary keyword arguments.
Example:
client = InfluxDBClient(db='testdb')
client.write(df, measurement='prices', tag_columns=['tag'], asset_class='equities')
In the example above, df is the dataframe we are trying to write to InfluxDB and measurement is the measurement we are writing to.
tag_columns is an optional iterable telling which of the dataframe columns should be parsed as tag values. If tag_columns is not explicitly passed, all columns in the dataframe will be treated as InfluxDB field values.
Any other keyword arguments passed to InfluxDBClient.write are treated as extra tags which will be attached to the data being written to InfluxDB. Any string which is a valid InfluxDB identifier and valid Python identifier can be used as an extra tag key (with the exception of the strings data, measurement and tag_columns).
See the InfluxDBClient.write docstring for details.
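The split between tag columns, field columns and extra keyword tags can be illustrated without pandas. The sketch below treats one dataframe row as a plain dictionary; row_to_point is a hypothetical helper written for this illustration, not a function of the library, and converting tag values with str() is an assumption based on the rule that tag values should be strings.

```python
def row_to_point(time, row, measurement, tag_columns=(), **extra_tags):
    """Hypothetical helper: turn one row (column name -> value) into a point dict."""
    tag_columns = set(tag_columns)
    # Columns listed in tag_columns become tags; tag values are strings
    tags = {name: str(value) for name, value in row.items() if name in tag_columns}
    # Extra keyword arguments are attached as additional tags
    tags.update({name: str(value) for name, value in extra_tags.items()})
    # Every remaining column is treated as a field
    fields = {name: value for name, value in row.items() if name not in tag_columns}
    return {'time': time, 'measurement': measurement, 'tags': tags, 'fields': fields}


row = {'LUY': 2.545409, 'BEM': 5.173134, 'AJW': 5.532397, 'tag': 'B'}
example_point = row_to_point('2017-06-24T08:45:17Z', row, 'prices',
                             tag_columns=['tag'], asset_class='equities')
print(example_point)
```

With tag_columns left empty, the tag column would end up among the fields, which matches the default behaviour described above.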
Querying data is as simple as passing an InfluxDB query string to InfluxDBClient.query:
client.query('SELECT myfield FROM mymeasurement')
The result (in blocking and async modes) is a dictionary containing the parsed JSON data returned by the InfluxDB HTTP API:
{'results': [{'series': [{'columns': ['time', 'Price', 'Volume'],
'name': 'mymeasurement',
'values': [[1491963424224703000, 5783, 100],
[1491963424375146000, 5783, 200],
[1491963428374895000, 5783, 100],
[1491963429645478000, 5783, 1100],
[1491963429655289000, 5783, 100],
[1491963437084443000, 5783, 100],
[1491963442274656000, 5783, 900],
[1491963442274657000, 5782, 5500],
[1491963442274658000, 5781, 3200],
[1491963442314710000, 5782, 100]]}],
'statement_id': 0}]}
When the client is in dataframe mode, InfluxDBClient.query will return a Pandas DataFrame:
                                  Price  Volume
2017-04-12 02:17:04.224703+00:00   5783     100
2017-04-12 02:17:04.375146+00:00   5783     200
2017-04-12 02:17:08.374895+00:00   5783     100
2017-04-12 02:17:09.645478+00:00   5783    1100
2017-04-12 02:17:09.655289+00:00   5783     100
2017-04-12 02:17:17.084443+00:00   5783     100
2017-04-12 02:17:22.274656+00:00   5783     900
2017-04-12 02:17:22.274657+00:00   5782    5500
2017-04-12 02:17:22.274658+00:00   5781    3200
2017-04-12 02:17:22.314710+00:00   5782     100
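The raw JSON result carries timestamps as integer nanoseconds since the Unix epoch, while the dataframe shows them as UTC datetimes. A minimal standard-library sketch of that conversion follows; ns_to_datetime is a hypothetical name, and because datetime only resolves microseconds, the last three digits of the nanosecond value are dropped.

```python
from datetime import datetime, timedelta, timezone


def ns_to_datetime(nanoseconds):
    """Hypothetical helper: epoch nanoseconds -> timezone-aware UTC datetime."""
    seconds, remainder = divmod(nanoseconds, 10**9)
    # datetime has microsecond resolution, so sub-microsecond digits are truncated
    return (datetime.fromtimestamp(seconds, tz=timezone.utc)
            + timedelta(microseconds=remainder // 1000))


first = ns_to_datetime(1491963424224703000)
print(first.isoformat(sep=' '))
```

Splitting the value with divmod keeps the arithmetic in integers, so no precision is lost to floating point before the truncation step.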
Mode can be chosen not only during object instantiation but also by simply changing the mode attribute.
The library supports InfluxDB chunked queries. Passing chunked=True when calling InfluxDBClient.query returns an AsyncGenerator object, which can be iterated asynchronously. Using chunked requests allows response processing to begin before the full response is retrieved, which can reduce overall query time.
chunks = await client.query("SELECT * FROM mymeasurement", chunked=True)
async for chunk in chunks:
    # do something
    await process_chunk(...)
For Python 3.5, this relies on the async_generator (https://github.com/python-trio/async_generator) library.
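For readers new to async generators, the consumption pattern can be tried without a running InfluxDB server. In the sketch below, fake_chunked_query is a hypothetical stand-in for a chunked query, and the standard-library asyncio runner is used only to keep the example dependency-free (the library's asks backend runs on trio or curio instead).

```python
import asyncio


async def fake_chunked_query(rows, chunk_size):
    """Hypothetical stand-in for a chunked query: yields rows in slices."""
    for start in range(0, len(rows), chunk_size):
        # Yield control to the event loop, as a real network read would
        await asyncio.sleep(0)
        yield rows[start:start + chunk_size]


async def collect_chunk_sizes(rows, chunk_size):
    """Consume the async generator and record how many rows each chunk held."""
    sizes = []
    async for chunk in fake_chunked_query(rows, chunk_size):
        # Each chunk can be processed here before the next one arrives
        sizes.append(len(chunk))
    return sizes


print(asyncio.run(collect_chunk_sizes(list(range(10)), 4)))
```

Each loop iteration receives one chunk as soon as it is available, which is what lets processing overlap with the remaining transfer.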
InfluxDBClient.query returns a parsed JSON response from InfluxDB. In order to easily iterate over that JSON response point by point, we provide the iter_resp generator:
from influx_sansio import iter_resp

r = await client.query('SELECT * from h2o_quality LIMIT 10')
for i in iter_resp(r):
    print(i)
[1439856000000000000, 41, 'coyote_creek', '1']
[1439856000000000000, 99, 'santa_monica', '2']
[1439856360000000000, 11, 'coyote_creek', '3']
[1439856360000000000, 56, 'santa_monica', '2']
[1439856720000000000, 65, 'santa_monica', '3']
iter_resp can also be used with chunked responses:
chunks = await client.query('SELECT * from h2o_quality', chunked=True)
async for chunk in chunks:
    for point in iter_resp(chunk):
        # do something
        print(point)
By default, iter_resp yields a plain list of values without doing any expensive parsing. However, in case a specific format is needed, an optional parser argument can be passed. parser is a function that takes the raw value list for each data point and an additional metadata dictionary containing all or a subset of the following: {'columns', 'name', 'tags', 'statement_id'}.
r = await client.query('SELECT * from h2o_quality LIMIT 5')
for i in iter_resp(r, lambda x, meta: dict(zip(meta['columns'], x))):
    print(i)
{'time': 1439856000000000000, 'index': 41, 'location': 'coyote_creek', 'randtag': '1'}
{'time': 1439856000000000000, 'index': 99, 'location': 'santa_monica', 'randtag': '2'}
{'time': 1439856360000000000, 'index': 11, 'location': 'coyote_creek', 'randtag': '3'}
{'time': 1439856360000000000, 'index': 56, 'location': 'santa_monica', 'randtag': '2'}
{'time': 1439856720000000000, 'index': 65, 'location': 'santa_monica', 'randtag': '3'}
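To show what iterating point by point involves, here is a small standalone generator that walks the results/series/values nesting of a parsed response and applies an optional parser. It is a sketch of the idea only: iter_points is a hypothetical name, and the real iter_resp may differ in details such as error handling.

```python
def iter_points(resp, parser=None):
    """Hypothetical sketch: yield each data point from a parsed query response."""
    for statement in resp.get('results', []):
        for series in statement.get('series', []):
            # Metadata handed to the parser alongside each raw value list
            meta = {'columns': series['columns'],
                    'name': series.get('name'),
                    'statement_id': statement.get('statement_id')}
            if 'tags' in series:
                meta['tags'] = series['tags']
            for values in series.get('values', []):
                yield values if parser is None else parser(values, meta)


resp = {'results': [{'series': [{'columns': ['time', 'Price', 'Volume'],
                                 'name': 'mymeasurement',
                                 'values': [[1491963424224703000, 5783, 100],
                                            [1491963424375146000, 5783, 200]]}],
                     'statement_id': 0}]}

for row in iter_points(resp, lambda x, meta: dict(zip(meta['columns'], x))):
    print(row)
```

Because the parser only runs when a point is actually consumed, callers that stop iterating early pay nothing for the points they skip.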
The library provides a wrapping mechanism around InfluxDBClient.query in order to provide convenient access to commonly used query patterns.
Query patterns are query strings containing optional named "replacement fields" surrounded by curly braces {}, just as in str.format(). Replacement field values are defined by keyword arguments when calling the method associated with the query pattern. Unlike plain str.format(), positional arguments are also supported and can be mixed with keyword arguments.
Built-in query patterns are defined on the class. Users can also dynamically define additional query patterns by using the _ helper function. User-defined query patterns have the disadvantage of not being shown for auto-completion in IDEs such as PyCharm. However, they do show up in dynamic environments such as Jupyter. If you have a query pattern that you think will be used by many people and should be built-in, please submit a PR.
Built-in query pattern examples:
client.create_database(db='foo')  # CREATE DATABASE {db}
client.drop_measurement('bar')  # DROP MEASUREMENT {measurement}
client.show_users()  # SHOW USERS
# Positional and keyword arguments can be mixed
client.show_tag_values_from('bar', key='spam')  # SHOW TAG VALUES FROM {measurement} WITH key = "{key}"
Please refer to InfluxDB documentation for further query-related information.
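One way to let named replacement fields accept positional arguments is to read the field names from the pattern and pair them with the positional values in order of appearance. The sketch below does this with string.Formatter from the standard library; render_pattern is a hypothetical function for illustration and is not necessarily how the library implements its query patterns.

```python
from string import Formatter


def render_pattern(pattern, *args, **kwargs):
    """Hypothetical sketch: fill named fields from positional and keyword arguments."""
    # Field names in order of appearance; literal-only segments have no name
    names = [name for _, name, _, _ in Formatter().parse(pattern) if name]
    if len(args) > len(names):
        raise TypeError('too many positional arguments for this pattern')
    # Positional values are matched to field names from left to right
    values = dict(zip(names, args))
    duplicated = values.keys() & kwargs.keys()
    if duplicated:
        raise TypeError(f'got multiple values for: {sorted(duplicated)}')
    values.update(kwargs)
    return pattern.format(**values)


query = render_pattern('SHOW TAG VALUES FROM {measurement} WITH key = "{key}"',
                       'bar', key='spam')
print(query)
```

Note that this kind of plain string substitution performs no quoting or escaping, so values from untrusted sources should be validated before being formatted into a query.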
The library supports basic HTTP authentication. Simply pass username and password when instantiating InfluxDBClient:
client = InfluxDBClient(username='user', password='pass')
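For reference, HTTP basic authentication in general works by sending the credentials as a base64-encoded Authorization header. The sketch below shows that encoding with the standard library; basic_auth_header is a hypothetical helper, and whether the client builds the header this way or delegates it to its HTTP backend is not stated here.

```python
import base64


def basic_auth_header(username, password):
    """Hypothetical helper: build an HTTP Basic Authorization header."""
    credentials = f'{username}:{password}'.encode('utf-8')
    token = base64.b64encode(credentials).decode('ascii')
    return {'Authorization': f'Basic {token}'}


header = basic_auth_header('user', 'pass')
print(header)
```

Base64 is an encoding, not encryption, so basic authentication should be combined with HTTPS whenever the connection crosses an untrusted network.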
If your InfluxDB server uses UNIX domain sockets you can use unix_socket when instantiating InfluxDBClient:
client = InfluxDBClient(unix_socket='/path/to/socket')
The library uses HTTP by default, but HTTPS can be used by passing ssl=True when instantiating InfluxDBClient:
client = InfluxDBClient(host='my.host.io', ssl=True)
After the InfluxDBClient object has been instantiated, the database can be switched by changing the db attribute:
client = InfluxDBClient(db='db1')
client.db = 'db2'
Beware that, unlike some NoSQL databases (such as MongoDB), InfluxDB requires that a database be explicitly created (by using the CREATE DATABASE query) before doing any operations on it.
Since InfluxDB exposes all its functionality through an HTTP API, InfluxDBClient tries to be nothing more than a thin and simple wrapper around that API.
The InfluxDB HTTP API exposes exactly three endpoints/functions: ping, write and query.
InfluxDBClient merely wraps these three functions and provides some parsing functionality for generating line protocol data (when writing) and parsing JSON responses (when querying).
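As an illustration of how thin such a wrapper can be, the request URL for each of the three endpoints can be assembled with the standard library alone. In this sketch endpoint_url is a hypothetical helper; the localhost host and port 8086 defaults reflect InfluxDB's usual setup and are assumptions about the client, not taken from its source.

```python
from urllib.parse import urlencode, urlunsplit

ENDPOINTS = ('ping', 'write', 'query')


def endpoint_url(endpoint, host='localhost', port=8086, ssl=False, **params):
    """Hypothetical helper: build the URL for one of the three HTTP endpoints."""
    if endpoint not in ENDPOINTS:
        raise ValueError(f'unknown endpoint: {endpoint!r}')
    scheme = 'https' if ssl else 'http'
    # Query parameters such as db and q are URL-encoded into the query string
    return urlunsplit((scheme, f'{host}:{port}', f'/{endpoint}', urlencode(params), ''))


print(endpoint_url('ping'))
print(endpoint_url('query', db='testdb', q='SHOW USERS'))
print(endpoint_url('write', host='my.host.io', ssl=True, db='testdb'))
```

Everything else the client does, such as serializing points or parsing JSON, happens before the request is sent or after the response arrives.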
Additionally, partials are used in order to provide convenient access to commonly used query patterns. See the Query patterns section for details.
Forked from aioinflux.