# Various InfluxDB Query/Write Function Testing

In [2]:
import pandas as pd
import influxdb_client
import datetime
from pytz import UTC
import matplotlib.pyplot as plt
%matplotlib inline

## Look at various query types that can be done from InfluxDB using the influxdb_client

The purpose of this is just to explore InfluxDB queries and the results they return.

In [3]:
# Define a few variables with the name of your bucket, organization, and token.
bucket = "SKYSPARK"
org = "UBC"
# UDL provides public users READ access to the InfluxDB 2.0 instance via this token
token = "omUybYZ3QkGvuXXy0VwT-7hoO2SEFzhckXJ5k32K_GvG47yHQAi9JzZ1bii6r1HD5NKux3ZhHlKAyUfj6i61bA=="
url="http://206.12.92.81:8086/"

In [4]:
# Instantiate the client.
client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)

# Create a Flux query.
    #Flux documentation https://docs.influxdata.com/influxdb/v2.0/reference/flux/
    #You can generate Flux query using the Query Builder on http://206.12.92.81:8086/
query = '''from(bucket: "SKYSPARK")
  |> range(start: 2020-01-01T00:00:00Z, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "READINGS")
  |> filter(fn: (r) => r["siteRef"] == "Campus Energy Centre")
  |> filter(fn: (r) => r["_field"] == "val_num")'''

##Instantiate the query client. Specify org and query.
result = client.query_api().query_data_frame(org=org, query=query)

In [5]:
# Check unique IDs in returned query
result.uniqueID.unique()

array(['r:p:ubcv:r:205b0343-70d7c00c Campus Energy Centre Campus HW Main Meter Energy',
       'r:p:ubcv:r:205b17b3-76f4a76d Campus Energy Centre Campus HW Main Meter Entering Water Temperature',
       'r:p:ubcv:r:205b03d6-b9859d31 Campus Energy Centre Campus HW Main Meter Flow',
       'r:p:ubcv:r:205b1697-84986d73 Campus Energy Centre Campus HW Main Meter Leaving Water Temperature',
       'r:p:ubcv:r:205b0392-31f31280 Campus Energy Centre Campus HW Main Meter Power'],
      dtype=object)

In [6]:
result.head()

Unnamed: 0,result,table,_start,_stop,_time,_value,_field,_measurement,equipRef,groupRef,navName,siteRef,typeRef,uniqueID,unit
0,_result,0,2020-01-01 00:00:00+00:00,2021-05-18 02:14:12.343793+00:00,2020-06-23 01:30:00+00:00,0.5,val_num,READINGS,Campus HW Main Meter,Campus Energy Centre Utilities,Energy,Campus Energy Centre,Energy,r:p:ubcv:r:205b0343-70d7c00c Campus Energy Cen...,MWh
1,_result,0,2020-01-01 00:00:00+00:00,2021-05-18 02:14:12.343793+00:00,2020-06-23 01:45:00+00:00,0.5,val_num,READINGS,Campus HW Main Meter,Campus Energy Centre Utilities,Energy,Campus Energy Centre,Energy,r:p:ubcv:r:205b0343-70d7c00c Campus Energy Cen...,MWh
2,_result,0,2020-01-01 00:00:00+00:00,2021-05-18 02:14:12.343793+00:00,2020-06-23 02:00:00+00:00,0.59375,val_num,READINGS,Campus HW Main Meter,Campus Energy Centre Utilities,Energy,Campus Energy Centre,Energy,r:p:ubcv:r:205b0343-70d7c00c Campus Energy Cen...,MWh
3,_result,0,2020-01-01 00:00:00+00:00,2021-05-18 02:14:12.343793+00:00,2020-06-23 02:15:00+00:00,0.40625,val_num,READINGS,Campus HW Main Meter,Campus Energy Centre Utilities,Energy,Campus Energy Centre,Energy,r:p:ubcv:r:205b0343-70d7c00c Campus Energy Cen...,MWh
4,_result,0,2020-01-01 00:00:00+00:00,2021-05-18 02:14:12.343793+00:00,2020-06-23 02:30:00+00:00,0.59375,val_num,READINGS,Campus HW Main Meter,Campus Energy Centre Utilities,Energy,Campus Energy Centre,Energy,r:p:ubcv:r:205b0343-70d7c00c Campus Energy Cen...,MWh


In [40]:
# Look at date ranges and number of null values for each uniqueID
for uid,dfm in result.groupby('uniqueID'):
    print(dfm._value.isnull().sum(), dfm._time.min(), dfm._time.max())

0 2020-06-23 01:30:00+00:00 2020-09-09 21:50:00+00:00
0 2020-06-23 01:40:33+00:00 2020-09-09 22:00:00+00:00
0 2020-06-23 01:40:00+00:00 2020-09-09 22:01:06+00:00
0 2020-06-23 01:40:00+00:00 2020-09-09 21:53:52+00:00
0 2020-06-23 01:40:00+00:00 2020-09-09 21:58:25+00:00


In [54]:
# Look at using the non-DataFrame query
result2 = client.query_api().query(org=org, query=query)

In [55]:
result2

[<influxdb_client.client.flux_table.FluxTable at 0x22e7ab04790>,
 <influxdb_client.client.flux_table.FluxTable at 0x22e2f02b970>,
 <influxdb_client.client.flux_table.FluxTable at 0x22e01738430>,
 <influxdb_client.client.flux_table.FluxTable at 0x22e085bf100>,
 <influxdb_client.client.flux_table.FluxTable at 0x22e09570910>]

In [56]:
# Check how many tables are returnd
count = 0
for table in result2:
    count += 1
    print(count)

1
2
3
4
5


In [20]:
table

<influxdb_client.client.flux_table.FluxTable at 0x22e41e5ab50>

In [21]:
# look at an indiviual record from one of the tables
for record in table.records:
    print(record)
    break

FluxRecord() table: 4, {'result': '_result', 'table': 4, '_start': datetime.datetime(2020, 1, 1, 0, 0, tzinfo=tzutc()), '_stop': datetime.datetime(2021, 5, 16, 15, 20, 16, 428474, tzinfo=tzutc()), '_time': datetime.datetime(2020, 6, 23, 1, 40, 33, tzinfo=tzutc()), '_value': 2.2, '_field': 'val_num', '_measurement': 'READINGS', 'equipRef': 'Campus HW Main Meter', 'groupRef': 'Campus Energy Centre Utilities', 'navName': 'Power', 'siteRef': 'Campus Energy Centre', 'typeRef': 'Power', 'uniqueID': 'r:p:ubcv:r:205b0392-31f31280 Campus Energy Centre Campus HW Main Meter Power', 'unit': 'MW'}


In [24]:
# see what happens when passing to a dataframe
pd.DataFrame(table)

Unnamed: 0,0
0,"FluxRecord() table: 4, {'result': '_result', '..."
1,"FluxRecord() table: 4, {'result': '_result', '..."
2,"FluxRecord() table: 4, {'result': '_result', '..."
3,"FluxRecord() table: 4, {'result': '_result', '..."
4,"FluxRecord() table: 4, {'result': '_result', '..."
...,...
39952,"FluxRecord() table: 4, {'result': '_result', '..."
39953,"FluxRecord() table: 4, {'result': '_result', '..."
39954,"FluxRecord() table: 4, {'result': '_result', '..."
39955,"FluxRecord() table: 4, {'result': '_result', '..."


In [45]:
# look at the format of the `query_data_frame_stream()` function
result3 = client.query_api().query_data_frame_stream(org=org, query=query)

In [46]:
count = 0
for table in result3:
    count += 1

In [47]:
count

1

It appears to just return the same format as the non-streaming dataframe query.

In [60]:
# look at the streaming query
result4 = client.query_api().query_stream(org=org, query=query)

In [61]:
count = 0
for table in result4:
    count += 1

In [62]:
count

146425

It returns a generator for the entire query - this could be useful if there's issues loading into memory for predictions on historical data

## Look at queries with aggregate windows

In [9]:
# Create a Flux query.
    #Flux documentation https://docs.influxdata.com/influxdb/v2.0/reference/flux/
    #You can generate Flux query using the Query Builder on http://206.12.92.81:8086/
query_agg = '''from(bucket: "SKYSPARK")
  |> range(start: 2020-01-01T00:00:00Z, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "READINGS")
  |> filter(fn: (r) => r["siteRef"] == "Campus Energy Centre")
  |> filter(fn: (r) => r["_field"] == "val_num")
  |> aggregateWindow(every: 60m, fn: mean, createEmpty: true)'''

##Instantiate the query client. Specify org and query.
result_agg = client.query_api().query_data_frame(org=org, query=query_agg)

In [10]:
for uid,dfm in result_agg.groupby('uniqueID'):
    break

In [11]:
dfm.head()

Unnamed: 0,result,table,_start,_stop,_time,_value,_field,_measurement,equipRef,groupRef,navName,siteRef,typeRef,uniqueID,unit
0,_result,0,2020-01-01 00:00:00+00:00,2021-05-18 02:29:17.799309+00:00,2020-01-01 01:00:00+00:00,,val_num,READINGS,Campus HW Main Meter,Campus Energy Centre Utilities,Energy,Campus Energy Centre,Energy,r:p:ubcv:r:205b0343-70d7c00c Campus Energy Cen...,MWh
1,_result,0,2020-01-01 00:00:00+00:00,2021-05-18 02:29:17.799309+00:00,2020-01-01 02:00:00+00:00,,val_num,READINGS,Campus HW Main Meter,Campus Energy Centre Utilities,Energy,Campus Energy Centre,Energy,r:p:ubcv:r:205b0343-70d7c00c Campus Energy Cen...,MWh
2,_result,0,2020-01-01 00:00:00+00:00,2021-05-18 02:29:17.799309+00:00,2020-01-01 03:00:00+00:00,,val_num,READINGS,Campus HW Main Meter,Campus Energy Centre Utilities,Energy,Campus Energy Centre,Energy,r:p:ubcv:r:205b0343-70d7c00c Campus Energy Cen...,MWh
3,_result,0,2020-01-01 00:00:00+00:00,2021-05-18 02:29:17.799309+00:00,2020-01-01 04:00:00+00:00,,val_num,READINGS,Campus HW Main Meter,Campus Energy Centre Utilities,Energy,Campus Energy Centre,Energy,r:p:ubcv:r:205b0343-70d7c00c Campus Energy Cen...,MWh
4,_result,0,2020-01-01 00:00:00+00:00,2021-05-18 02:29:17.799309+00:00,2020-01-01 05:00:00+00:00,,val_num,READINGS,Campus HW Main Meter,Campus Energy Centre Utilities,Energy,Campus Energy Centre,Energy,r:p:ubcv:r:205b0343-70d7c00c Campus Energy Cen...,MWh


Note that if you use `createEmpty: true` in the query, it will fill all values with NaN that don't have a values in the window between the start and end dates of the query. This can result in a very large query returned if the start and end date are well before/after the beginning of the actual data.

## Write to InfluxDB testing

uses a local instance of Telegraf and InfluxDB running and reading/writing cpu and mem data (for examples from /code/docker-files/two-telegraf)

In [28]:
#Define a few variables with the name of your bucket, organization, and token.
write_bucket = "mybucket"
write_org = "myorg"
write_token = "mytoken"
write_url="http://localhost:8086/"

In [29]:
#Instantiate the client.
write_client = influxdb_client.InfluxDBClient(url=write_url, token=write_token, org=write_org)

# Create a Flux query.
    #Flux documentation https://docs.influxdata.com/influxdb/v2.0/reference/flux/
    #You can generate Flux query using the Query Builder on http://206.12.92.81:8086/
write_query = '''from(bucket: "mybucket")
  |> range(start: -10m, stop: now())
  |> filter(fn: (r) => r["_measurement"] == "cpu")
  |> filter(fn: (r) => r["_field"] == "usage_idle")'''

## Instantiate the query client. Specify org and query.
write_result = write_client.query_api().query_data_frame(org=write_org, query=write_query)

In [31]:
write_result.head()

Unnamed: 0,result,table,_start,_stop,_time,_value,_field,_measurement,cpu,host,user
0,_result,0,2021-05-18 00:45:51.614984+00:00,2021-05-18 00:55:51.614984+00:00,2021-05-18 00:46:00+00:00,99.766725,usage_idle,cpu,cpu-total,057e2ff70b33,test
1,_result,0,2021-05-18 00:45:51.614984+00:00,2021-05-18 00:55:51.614984+00:00,2021-05-18 00:46:10+00:00,99.741537,usage_idle,cpu,cpu-total,057e2ff70b33,test
2,_result,0,2021-05-18 00:45:51.614984+00:00,2021-05-18 00:55:51.614984+00:00,2021-05-18 00:46:20+00:00,99.508865,usage_idle,cpu,cpu-total,057e2ff70b33,test
3,_result,0,2021-05-18 00:45:51.614984+00:00,2021-05-18 00:55:51.614984+00:00,2021-05-18 00:46:30+00:00,99.683439,usage_idle,cpu,cpu-total,057e2ff70b33,test
4,_result,0,2021-05-18 00:45:51.614984+00:00,2021-05-18 00:55:51.614984+00:00,2021-05-18 00:46:40+00:00,99.816682,usage_idle,cpu,cpu-total,057e2ff70b33,test


In [48]:
# transform the queried df in the format to write to InfluxDB
output_new = write_result.copy()
output_new.set_index("_time", drop=True, inplace=True)
output_new["anomaly"] = False
replace_name = output_new['_field'].unique()[0]
output_new.rename(columns={'_value':replace_name}, inplace=True)
output_new = output_new.loc[:, [replace_name, 'user']]
output_new['anomaly'] = True
output_new.head()

Unnamed: 0_level_0,usage_idle,user,anomaly
_time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2021-05-18 00:46:00+00:00,99.766725,test,True
2021-05-18 00:46:10+00:00,99.741537,test,True
2021-05-18 00:46:20+00:00,99.508865,test,True
2021-05-18 00:46:30+00:00,99.683439,test,True
2021-05-18 00:46:40+00:00,99.816682,test,True


In [54]:
# Write to InfluxDB
write_client.write_api().write(write_bucket, write_org, record=output_new, data_frame_measurement_name="CHECK_ANOMALY3", data_frame_tag_columns=["user"])

The above code results in a successful write in the format expected.