Skip to content

kafisatz/InfluxDBClient.jl

Repository files navigation

InfluxDBClient.jl

CI Testing Coverage Status

Scope and Purpose

  • This was developed for InfluxDB v2 OSS. In my case the InfluxDB is running on a machine in the local area network (docker container)
  • I wanted a Julia solution to write data (e.g. from a DataFrame) to InfluxDB v2

Usage

  • Below is an example snippet how to use the functions in this package.
  • Consider names(InfluxDBClient) for a list of methods of this package
  • You may want to consider the functions in runtests.jl to get an idea of the other functions and their arguments.

Configuration

There are three optoins to configure the database access (see function get_settings)

  1. environment variables
  • ENV["INFLUXDB_ORG"] the organization
  • ENV["INFLUXDB_TOKEN"] the token to access the InfluxDB
  • ENV["INFLUXDB_URL"] should include protocol and the port, e.g. "http://10.14.15.10:8086" or with https e.g. ENV["INFLUXDB_URL"]="https://us-east-1-1.aws.cloud2.influxdata.com:443"
  1. keyword argumetns to get_settings
  2. provide a space delimited file to get_settings

Limitations

  • Not all functions account for TimeZones in a proper manner! Notably DateTime in Julia Base/Core (Pkg Dates) cannot handle nanosecond precision. The Pkg NanoDates in contrast cannot handle TimeZones.
  • Some of the functions may be somewhat slow for large DataFrames. I am open for suggestions on how to improve my string handling in Julia.
  • Backslashes and special characters in strings may not (yet) be parsed correctly. https://docs.influxdata.com/influxdb/v2.4/reference/syntax/line-protocol/#integer
  • Some bucket management functions (get_buckets etc) assume that you have fewer than 100 buckets. Functions may fail otherwise. See keywords limit and offset.
  • When data is provided integer valued, InfluxDB will display the result as float, when an aggregation function (such as mean) is selected. Select 'last' or similar to show the data as is.

Ideas / Aspects not yet implemented

References

See https://docs.influxdata.com/influxdb/v2.4/reference/syntax/line-protocol/ for details of the line protocol. See also https://docs.influxdata.com/influxdb/cloud/api/.

Example

using InfluxDBClient
using Dates
using DataFrames

a_random_bucket_name = "test_InfluxDBClient.jl_asdfeafdfasefsIyxdFDYfadsfasdfa____l"

#isettings should return a NamedTuple similar to 
#(INFLUXDB_URL = "http://10.14.15.10:8086", INFLUXDB_ORG = "bk", INFLUXDB_TOKEN = "5Ixxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx==")
isettings = get_settings()

#check if the InfluxDB is reachable
bucket_names, json = get_buckets(isettings);

#bucket_names is a Vector{String} of the buckets in the database

#create or delete a bucket
create_bucket(isettings,a_random_bucket_name)
delete_bucket(isettings,a_random_bucket_name)
create_bucket(isettings,a_random_bucket_name)

#given a DataFrame, we can then write the data to the database
some_dt = DateTime(2022,9,30,15,59,33,0)
df = DataFrame(sensor_id = ["TLM0900","TLM0901","TLM0901"],other_tag=["m","m","x"] ,temperature = [70.1,11.2,99.3], humidity=[14.9,55.2,3], datetime = [some_dt,some_dt-Second(51),some_dt-Second(500)])

#get the line protocol string
lp = lineprotocol("my_meas",df,Symbol.(["temperature","humidity"]),tags=["sensor_id"], "datetime")
#here lp is a string (and thus readable by a human)
rs = write_data(isettings,a_random_bucket_name,lp,"ns")
#the value of RS must be 204 (HTTP return code after successful write)
@show rs

#Please note that by default the lineprotocol function assumes that your timestamps are in UTC
#if your timestamps are in a different TimeZone, consider the tzstr keyword as follows:
lp = lineprotocol("my_meas",df,Symbol.(["temperature","humidity"]),tags=["sensor_id"], "datetime", tzstr="Europe/Berlin")

#for lager dataframes you will want to use compression for the line protocol, using the keyword compress
lp_gzip_compressed = lineprotocol("my_meas",df,Symbol.(["temperature","humidity"]),tags=["sensor_id"], "datetime",compress = true)
#lp_gzip_compressed is now a Unit8 Vector
rs = write_data(isettings,a_random_bucket_name,lp_gzip_compressed,"ns")
@assert rs == 204

#we also provide a wrapper function to directly write a DataFrame to the database
rs,lp = write_dataframe(settings=isettings,bucket=a_random_bucket_name,measurement="xxmeasurment",data=df,fields=["humidity","temperature"],timestamp=:datetime,tags=String["sensor_id"],tzstr = "Europe/Berlin",compress=true);

#querying 
#note that the agg keyword is optional
#consider calls in runtests.jl for more exmaples (i.e. search this repository for "query_flux(")
agg = """   aggregateWindow(every: 20m, fn: mean, createEmpty: false)
                |> yield(name: "mean") """    
datetime_str = string(minimum(df.datetime),"+02:00")
df_result = query_flux(isettings,a_random_bucket_name,"xxmeasurment";tzstr = "Europe/Berlin",range=Dict("start"=>"$datetime_str"),fields=["temperature","humidity"],tags=Dict("sensor_id"=>"TLM0900"),aggregate=agg);

#deleting measurements
#see delete.jl in the tests folder

#deleting a bucket
delete_bucket(isettings,a_random_bucket_name)

Running tests

First make sure you have the following environment variables defined:

ENV["INFLUXDB_URL"]="http://localhost:8086"
ENV["INFLUXDB_ORG"]="<some org>"
ENV["INFLUXDB_TOKEN"]="5Ixxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx=="
ENV["INFLUXDB_USER"]="<admin user>"
ENV["INFLUXDB_PASSWORD"]="<password>"

With newer influx versions, it can be hard to get an admin user that can create and drop buckets sudo influx auth create --all-access -u <admin user> -o <some org> For more information, see: influxdata/influx-cli#231

The easiest is to run via ]test or Pkg.test()

But in order to run runtests.jl or individual tests manually you need to activate the test dependencies, and since this is still has Julia 1 compatibility, we need to use the following method:

]activate
using TestEnv
TestEnv.activate("InfluxDBClient")

For more information, see: https://discourse.julialang.org/t/activating-test-dependencies/48121/10 https://github.com/JuliaTesting/TestEnv.jl

also had to modify runtents.jl line 55 to not add "test/"