InfluxDB Connector Issue #8

Closed
gefaila opened this issue Nov 5, 2020 · 34 comments · Fixed by #9

gefaila commented Nov 5, 2020

Very excited to try this connector.
I have a bucket with a lot of data in it, and it seems to make the connector 'fall over':

Community Connector Error
There was an error caused by the community connector. Please report the issue to the provider of this community connector if this issue persists.

Connector details
"GetFields from: https://eu-central-1-1.aws.cloud2.influxdata.com" returned an error:Exception: Request failed for https://eu-central-1-1.aws.cloud2.influxdata.com returned code 400. Truncated server response: {"code":"invalid","message":"runtime error @1:110-1:143: drop: schema collision detected: column "_value" is both of type int and float"} (use muteHttpExceptions option to examine full response)

Error ID: fb5b13d2

I'm quite experienced with InfluxDB and there is nothing fundamentally wrong with having _value be int for some _field values and float for others. In fact it's fairly fundamental. There must be something the connector is assuming about InfluxDB 2.0 data that is (in general) not always true.

Any ideas how I'd move forward to use this excellent tool?


bednar commented Nov 5, 2020

Hi @gefaila,

I'm quite experienced with InfluxDB and there is nothing fundamentally wrong with having _value be int for some _field values and float for others.

Yes, but Google Data Studio requires a static schema. The connector internally uses the pivot() function to determine the schema for GDS.

Schema query:

from(bucket: "my-bucket") 
    |> range(start: time(v: 1)) 
    |> filter(fn: (r) => r["_measurement"] == "circleci") 
    |> drop(columns: ["tag1", "tag2", "tag3"]) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_time", "_measurement"]) 
    |> limit(n:1)

You could prepare a Task that normalizes your data into a new Measurement, as sketched below. See - https://github.com/influxdata/influxdb-gds-connector/tree/master/examples#performance
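For illustration, a minimal sketch of such a normalization task (the task name, schedule, bucket, and target measurement are hypothetical, and it assumes casting every field to float is acceptable for your data):

// Hypothetical task: periodically rewrite "circleci" points into a new
// measurement with every value cast to float, so the schema query's pivot()
// sees a single type per column.
option task = {name: "normalize-circleci", every: 1h}

from(bucket: "my-bucket")
    |> range(start: -task.every)
    |> filter(fn: (r) => r["_measurement"] == "circleci")
    |> toFloat()
    |> set(key: "_measurement", value: "circleci_normalized")
    |> to(bucket: "my-bucket")

You would then point the connector at circleci_normalized instead of circleci.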

Regards


gefaila commented Nov 9, 2020

Hi @bednar,
Thanks for engaging here! 👍

The schema query runs fine on the data in my bucket.
It produces a table for each of the hosts I have in the database.

2020-11-09-10-50_chronograf_data.xlsx

Is there anything wrong with the data it's returning to GDS?

If not, are there any documented guidelines for what this data should conform to?

I checked out the link you sent above

You could prepare a Task that normalizes your data into a new Measurement. See - https://github.com/influxdata/influxdb-gds-connector/tree/master/examples#performance

However, there is no actual definition of acceptable and unacceptable data, just a single example of a very long Flux query that apparently returns something that is OK for GDS.

Maybe some clarification is needed for the general case?
Thanks


bednar commented Nov 9, 2020

Hi @gefaila,

The schema query should produce only one table, because GDS needs a consistent schema.

Based on your export 2020-11-09-10-50_chronograf_data.xlsx, you missed dropping the tags. Try this:

bucket = "my-bucket"
measurement = "my-measurement"
tags = ["host", "method"]

from(bucket: bucket) 
    |> range(start: time(v: 1)) 
    |> filter(fn: (r) => r["_measurement"] == measurement) 
    |> drop(columns: tags) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_time", "_measurement"]) 
    |> limit(n:1)

You could find all your tags by:

import "influxdata/influxdb/v1"

bucket = "my-bucket"
measurement = "gds"

v1.tagKeys(
  bucket: bucket,
  predicate: (r) => r._measurement == measurement,
  start: duration(v: uint(v: 1970-01-01) - uint(v: now()))
)
|> filter(fn: (r) => r._value != "_start" and r._value != "_stop" and r._value != "_measurement" and r._value != "_field")

However, there is no actual definition of acceptable and unacceptable data, just a single example of a very long Flux query that apparently returns something that is OK for GDS.

Maybe some clarification is needed for the general case?

Yeah, you are right. We need to clarify what the data should look like to be usable in GDS.

Could you export the result of the schema query to CSV?

Regards.


gefaila commented Nov 9, 2020

In that case I'd like to create a filter in the GDS connection page that allows me to select the tag filters I'm interested in. That way I can drop the tags.
Otherwise it's unpredictable which 'host' I'd end up with, and that's not useful.

Furthermore, querying InfluxDB data with
from(bucket: "my-bucket") |> range(start: time(v: 1))
is not great. Our database is large and there's no need for this. It would be far more useful to additionally select a time range that I'm interested in (e.g. last month, last year, etc.). But that query scans all schemas and data since 1970! Not a very sane default behaviour.
But it's a great tool and I'm looking forward to being able to create dashboards into my data.
👍


gefaila commented Nov 9, 2020

The schema query should produce only one table, because GDS needs a consistent schema.

A good way of giving the user the ability to make sure the data returned is suitable for GDS would be to allow the user to write a Flux query that returns data in the format GDS needs, e.g. one table with an agreed structure.

Presumably, users of the tool are able to do this, otherwise they wouldn't be connecting InfluxDB?
:-)


bednar commented Nov 10, 2020

Furthermore, querying InfluxDB data with from(bucket: "my-bucket") |> range(start: time(v: 1)) is not great.

The query with this range(start: time(v: 1)) is only used to determine the schema.

For getting data we use range specified in report - https://support.google.com/datastudio/answer/9272806?hl=en. By default, the date range provided will be the last 28 days excluding today. If a user applies a date range filter for a report, then the date range provided will reflect the user selection.

A good way of giving the user the ability to make sure the data returned is suitable for GDS would be to allow the user to write a Flux query that returns data in the format GDS needs, e.g. one table with an agreed structure.

We use two types of queries:

GetSchema

  • call only once
from(bucket:"my-bucket") 
    |> range(start: time(v: 1)) 
    |> filter(fn: (r) => r["_measurement"] == "my-measurement") 
    |> drop(columns: tags) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_time", "_measurement"]) 
    |> limit(n:1)

GetData

  • range is determined by GDS Filter
from(bucket: "my-bucket") 
   |> range(start: 2020-04-20T00:00:00Z, stop: 2020-05-20T23:59:59Z)
   |> filter(fn: (r) => r["_measurement"] == "my-measurement") 
   |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

What do you think about the ability to specify a Flux filter?

You would be able to specify something like r["host"] == "R1901_1742", and this Flux filter would be appended after |> filter(fn: (r) => r["_measurement"] == "my-measurement") in the GetSchema and GetData queries.
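For example, a sketch of how the GetData query might look with such a user-supplied filter appended (the host value is hypothetical):

from(bucket: "my-bucket") 
   |> range(start: 2020-04-20T00:00:00Z, stop: 2020-05-20T23:59:59Z)
   |> filter(fn: (r) => r["_measurement"] == "my-measurement") 
   |> filter(fn: (r) => r["host"] == "R1901_1742")  // user-supplied Flux filter
   |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")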

Regards


gefaila commented Nov 11, 2020

What do you think about the ability to specify a Flux filter?

You would be able to specify something like r["host"] == "R1901_1742", and this Flux filter would be appended after |> filter(fn: (r) => r["_measurement"] == "my-measurement") in the GetSchema and GetData queries.

I quite like that.
The 'Data Explorer' feature already does what I think GDS needs to do,
i.e. it allows the user to select filters for tags that deliver a data set in the form that GDS needs.

[image: Data Explorer tag filter UI]


gefaila commented Nov 11, 2020

However, stepping back a bit, I see no reason why I would not want to build a dashboard that allowed the filtering to happen 'afterwards'. Again, this is parallel to the Chronograf dashboards with a variable.

On one of my dashboards I can select the host that I'm interested in viewing.
[image: Chronograf dashboard with host selector]

To me that seems much more sensible and usable functionality to aim for in GDS.

What do you think?
Possible?
👍


bednar commented Nov 12, 2020

What do you think?
Possible?

Yeah, of course. We use this type of filter here: https://datastudio.google.com/s/p19vh-b82Sw - "Country Filter".

Do you think that updating our docs to clarify the required schema for GDS would be enough? Something like:

Required schema

Google Data Studio requires a known schema for your data. Each column requires a data type that is consistent across the whole table (= measurement in InfluxDB). For that reason the InfluxDB Connector needs to determine the schema from your InfluxDB with this Flux query:

import "influxdata/influxdb/v1"

bucket = "my-bucket"
measurement = "my-measurement"

tags = v1.tagKeys(
  bucket: bucket,
  predicate: (r) => r._measurement == measurement,
  start: duration(v: uint(v: 1970-01-01) - uint(v: now()))
) |> filter(fn: (r) => r._value != "_start" and r._value != "_stop" and r._value != "_measurement" and r._value != "_field")
  |> findColumn(fn: (key) => true, column: "_value")

from(bucket: bucket) 
  |> range(start: time(v: 1)) 
  |> filter(fn: (r) => r["_measurement"] == measurement) 
  |> drop(fn: (column) => contains(value: column, set: tags))
  |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
  |> drop(columns: ["_start", "_stop", "_time", "_measurement"]) 
  |> limit(n:1)

Please ensure that the query above runs successfully against your InfluxDB.

Regards


gefaila commented Nov 12, 2020

What do you think about the ability to specify a Flux filter?

You would be able to specify something like r["host"] == "R1901_1742", and this Flux filter would be appended after |> filter(fn: (r) => r["_measurement"] == "my-measurement") in the GetSchema and GetData queries.

Well, I don't see why it's necessary for the user to specify Flux filters.

You've already confirmed that filtering is possible after building the dashboard:

Yeah, of course. We use this type of filter here: https://datastudio.google.com/s/p19vh-b82Sw - "Country Filter".

So why compel the user to filter so that you can even make a connection!

The fact is that a normal bucket will contain data with multiple tags. Therefore your schema query will not normally work.
What the user would obviously like is to be able to build a GDS dashboard from the data they have.
They don't want to have to create copies of the data in other buckets, simply because GDS can't cope with multiple tags.


gefaila commented Nov 12, 2020

Do you think that updating our docs to clarify the required schema for GDS would be enough? Something like:

Required schema
Google Data Studio requires a known schema for your data. Each column requires a data type that is consistent across the whole table (= measurement in InfluxDB). For that reason the InfluxDB Connector needs to determine the schema from your InfluxDB with this Flux query:

No, not really.
I'd just like GDS to work on my InfluxDB tables as-is.
The functionality you are implementing is awesome, but it's going to be limited in application, because it forces users to create copies of their data in other buckets that conform to the GDS requirements, simply because GDS can't cope with _fields being of different types for different tag sets.

I think the real problem is this:

Google Data Studio requires a known schema for your data. Each column requires a data type that is consistent across the whole table (= measurement in InfluxDB)

Actually, InfluxDB only requires that data types are consistent within tables as defined by a set of tags.
So it's acceptable to InfluxDB that:
Valve_Open is a boolean when _measurement="data",host="1643" but is an integer when _measurement="data",host="1235"

The root assumption, that the data type should be consistent for all tables returned where _measurement = "data", is not what InfluxDB assumes at all.
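For instance, both of these writes (hypothetical values) are accepted by InfluxDB, because the types only have to be consistent per series, not per measurement:

data,host=1643 Valve_Open=true
data,host=1235 Valve_Open=42i

But once the connector drops the host tag and pivots, the two series land in the same table and the Valve_Open column has two types, which is exactly the kind of schema collision the connector reports.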


gefaila commented Nov 12, 2020

What I don't quite understand is which actual tags GDS doesn't like in my data tables. And I can't run the Flux query you specify, for the reasons I give above, i.e. you are forcing a search through all data from 1970 to the current day. Apart from the fact that this takes longer than the timeout, the query will in most cases run out of memory. It's impossible in the general case, where people are using InfluxDB for what it's designed for: GBs of data every day.


gefaila commented Nov 12, 2020

I can only get the Flux query to run by narrowing the time range:

import "influxdata/influxdb/v1"

bucket = "PLC_Router_Data"
measurement = "data"

tags = v1.tagKeys(
bucket: bucket,
predicate: (r) => r._measurement == measurement,
start: -3d
) |> filter(fn: (r) => r._value != "_start" and r._value != "_stop" and r._value != "_measurement" and r._value != "_field")
|> findColumn(fn: (key) => true, column: "_value")

from(bucket: bucket)
|> range(start: -3d)
|> filter(fn: (r) => r["_measurement"] == measurement)
|> drop(fn: (column) => contains(value: column, set: tags))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> drop(columns: ["_start", "_stop", "_time", "_measurement"])
|> limit(n:1)

This then delivers a single table.

Even querying 30d causes InfluxDB Cloud to run out of memory. This will be the case for most customers who have significant data in their tables.


gefaila commented Nov 12, 2020

Querying 30 days runs out of memory.
Imagine querying 300 days.
You can forget 3000 days.
But your Flux query goes through 18,578 days!!!!
You've got to be kidding.


bednar commented Nov 12, 2020

What I don't quite understand is which actual tags GDS doesn't like in my data tables.

The schema collision detected: column "_value" is both of type int and float error is InfluxDB's response to the schema query.

And I can't run the Flux query you specify, for the reasons I give above, i.e. you are forcing a search through all data from 1970 to the current day. Apart from the fact that this takes longer than the timeout, the query will in most cases run out of memory. It's impossible in the general case, where people are using InfluxDB for what it's designed for: GBs of data every day.

The key problem here is that Flux doesn't have an equivalent of InfluxQL's SHOW FIELD KEYS. Our schema query is just a workaround for this.

Querying 30 days runs out of memory.
Imagine querying 300 days.
You can forget 3000 days.
But your Flux query goes through 18,578 days!!!!
You've got to be kidding.

Yes, it scans your whole measurement :(

Even querying 30d causes InfluxDB Cloud to run out of memory.

Try inserting these line protocol records into a database:

my-measurement,tag1=a field1=10 1
my-measurement,tag2=a field2=10 1605189244

If we specify the range as -30d, we lose field1.

This will be the case for most customers who have significant data in their tables.

So we have to add an advanced option to the Connector configuration that will limit the range of the schema query, as sketched below.
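A minimal sketch of how the bounded schema query might look, assuming a configurable schema range of -30d (bucket, measurement, and tag names are placeholders):

from(bucket: "my-bucket") 
    |> range(start: -30d)  // configurable schema range instead of time(v: 1)
    |> filter(fn: (r) => r["_measurement"] == "my-measurement") 
    |> drop(columns: ["tag1", "tag2"]) 
    |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") 
    |> drop(columns: ["_start", "_stop", "_time", "_measurement"]) 
    |> limit(n:1)

The trade-off is the one shown above: any field that was not written inside the configured range disappears from the schema.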


bednar commented Nov 12, 2020

New option in the Configuration screen:

The range that is used to determine your InfluxDB data schema. - https://todo_link_to_doc
Schema Range
-30d

What do you think?


gefaila commented Nov 12, 2020

It's the

pivot()

that's killing the query time and memory.

But the good news is you don't need it.
The following Flux query returns the necessary data, and it completes in only a few seconds while scanning all the data.

import "influxdata/influxdb/v1"
bucket = "my_bucket"
measurement = "my_measurement"
filter_tags = ["host","_field","_value"] // in addition to _field and _value allow the user to keep some tags to filter by later
from(bucket: bucket)
|> range(start: -100000d)
|> filter(fn: (r) => r["_measurement"] == measurement)
|> keep(columns: filter_tags)
|> unique(column: "_field")

It contains tables that show the data types for everything in the database: _field, _value, #datatype, tags.

Can you work with that to build the schema you need?


bednar commented Nov 13, 2020

@gefaila nice catch 👍 Thanks!

I think the following query could be a solution:

import "influxdata/influxdb/v1"

bucket = "my-bucket"
measurement = "my-measurement"
start_range = duration(v: uint(v: 1970-01-01) - uint(v: now()))

v1.tagKeys(
  bucket: bucket,
  predicate: (r) => r._measurement == measurement,
  start: start_range
) |> filter(fn: (r) => r._value != "_start" and r._value != "_stop" and r._value != "_measurement" and r._value != "_field")
  |> yield(name: "tags")

from(bucket: bucket)
  |> range(start: start_range)
  |> filter(fn: (r) => r["_measurement"] == measurement)
  |> keep(fn: (column) => column == "_field" or column == "_value")
  |> unique(column: "_field")
  |> yield(name: "fields")

Could you try it with your data? If you could share the result of the query, that would be awesome.


gefaila commented Nov 13, 2020

It works! 👍
But there is no need for the

start_range = duration(v: uint(v: 1970-01-01) - uint(v: now()))

you only need

start_range = -1000y

For me this returns the following

2020-11-13-10-12_chronograf_data.zip


gefaila commented Nov 17, 2020

Hi @bednar
How are things progressing?
Were you able to use the returned data of that query to construct a usable schema?


bednar commented Nov 18, 2020

Hi @gefaila,

I want to start work on this as soon as possible ... probably on Thursday or Friday.

Thanks a lot for your help, the query is fine and we will use it 👍

Regards

bednar mentioned this issue Nov 20, 2020

bednar commented Nov 20, 2020

Hi @gefaila,

You can track progress at #9.

Regards


gefaila commented Nov 22, 2020 via email


bednar commented Nov 23, 2020

Hi @gefaila,

there is a test version of the Connector ready:

https://datastudio.google.com/u/0/datasources/create?connectorId=AKfycbySDF4eD7wmA_awZ6aoCwENuXs1Opw_T0DIJ8F-MVI

Could you test it with your schema?


gefaila commented Nov 23, 2020 via email


bednar commented Nov 24, 2020

It is caused by a schema that is too large. We have to change how we cache the produced schema.


gefaila commented Nov 25, 2020

It is caused by a schema that is too large. We have to change how we cache the produced schema.
...

I see.
I think my schema size is probably average for someone who is using InfluxDB in a real-world context.


bednar commented Nov 26, 2020

Hi @gefaila,

the commit 118edfe fixes the Argument too large: value error

Could you try the fixed version?

https://datastudio.google.com/u/0/datasources/create?connectorId=AKfycbySDF4eD7wmA_awZ6aoCwENuXs1Opw_T0DIJ8F-MVI

I think my schema size is probably average for someone who is using InfluxDB in a real-world context.

It is true, but Google Data Studio is like other visualisation and analytics tools - they love well-structured data.

Here is a nice article from Tableau - https://www.tableau.com/learn/get-started/data-structure


gefaila commented Nov 26, 2020

I think InfluxDB tables fulfill all of that.

Can you explain what GDS needs the InfluxDB data to conform to?

Do you have an example InfluxDB bucket that conforms to this?

Can you give us a list that outlines in plain language the restrictions on InfluxDB buckets?
E.g.

  1. all records must have .....
  2. ...


gefaila commented Nov 26, 2020

I'm failing to see how our industrial IIoT data doesn't conform to this. Is there a specific requirement that our data doesn't meet that you can point to and clarify?


bednar commented Nov 27, 2020

From my experience with GDS there are these constraints:

1. Avoid unnecessary data

2. Avoid null values

3. Avoid data blending

4. Data range

  • the default data range is -28 days
  • How does your data look for this range? This query will be used by the connector:
bucket = "my-bucket"
measurement = "my-repository"
  
start = -28d
stop = now()

from(bucket: bucket) 
 |> range(start: start, stop: stop) 
 |> filter(fn: (r) => r["_measurement"] == measurement) 
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

Thank you @gefaila for helping us improve our connector. Thanks again 👍


gefaila commented Oct 12, 2021

Hi @bednar! We meet again. You are very active on Influx!
So I've come back to try the InfluxDB connector. I was wondering if you had resolved the issue here.
It seems it's not resolved:
[image: connector schema error]

As before, we have IoT data, and of course some of it is boolean, some is float, and some is integer.
Our use is normal for InfluxDB and our bucket is normal for InfluxDB. So we can say our database is "normal".

But the connector doesn't like this "normal" Bucket.

But I'd really like to use your connector.

Can we fix it?


gefaila commented Oct 12, 2021

I'm sure you may ask what data is yielded by the query above ....

bucket = "my-bucket"
measurement = "my-repository"
  
start = -28d
stop = now()

from(bucket: bucket) 
 |> range(start: start, stop: stop) 
 |> filter(fn: (r) => r["_measurement"] == measurement) 
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

Gives the following file.
2021-10-12_09_59_influxdb_data.zip

It would be better if you could let the customer write the Flux so that the data comes in the form your connector needs.
Is that possible?


bednar commented Oct 12, 2021

Hi @bednar! We meet again. You are very active on Influx! So I've come back to try the InfluxDB connector. I was wondering if you had resolved the issue here. It seems it's not resolved.

#9 improves the schema query and is currently awaiting approval. You can try this version via the following link: https://datastudio.google.com/u/0/datasources/create?connectorId=AKfycbySDF4eD7wmA_awZ6aoCwENuXs1Opw_T0DIJ8F-MVI

As before, we have IoT data, and of course some of it is boolean, some is float, and some is integer. Our use is normal for InfluxDB and our bucket is normal for InfluxDB. So we can say our database is "normal".

But the connector doesn't like this "normal" Bucket.

But I'd really like to use your connector.

Can we fix it?

There is a problem with the requirements from GDS. GDS expects a static tabular schema, so we are not able to support a schema where a field has different types.

I'm sure you may ask what data is yielded by the query above ....

bucket = "my-bucket"
measurement = "my-repository"
  
start = -28d
stop = now()

from(bucket: bucket) 
 |> range(start: start, stop: stop) 
 |> filter(fn: (r) => r["_measurement"] == measurement) 
 |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")

Gives the following file. 2021-10-12_09_59_influxdb_data.zip

It would be better if you could let the customer write the Flux so that the data comes in the form your connector needs. Is that possible?

Currently we don't support this type of configuration. What would your query look like for the data you provided - 2021-10-12_09_59_influxdb_data.zip?

bednar closed this as completed in #9 Oct 14, 2021
bednar added this to the 2021.10 milestone Oct 14, 2021