
Query stops when encountering a field containing a string (query_csv) #533

Closed
Olgidos opened this issue Nov 24, 2022 · 11 comments · Fixed by #536
Labels
enhancement New feature or request
Comments

Olgidos commented Nov 24, 2022

Specifications

  • Client Version: 1.30.00
  • InfluxDB Version: 2.4
  • Platform: Windows

Code sample to reproduce problem


# You can generate a Token from the "Tokens Tab" in the UI
token = token
org = "my-org"
bucket = "bucket"

with InfluxDBClient(url="http://192.168.1.1:8086", token=token, org=org) as client:
    query = """option v = {timeRangeStart: -1m, timeRangeStop: now()}

from(bucket: "bucket") |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r["_measurement"] == "computer1")
    |> filter(fn: (r) => r["device"] == "device1")"""
    csv = client.query_api().query_csv(query, org=org)
    input_list = list(csv)
    for row in input_list:
        if len(row) < 2:
            break
        print(row)

Expected behavior

When I query this online, I get 9 tables: 7 of them have doubles/ints as _value and 2 have strings. The same works when I query the above using Python and the following code:

tables = client.query_api().query(query, org=org)    
for table in tables:
    for record in table.records:
        print(record)

Actual behavior

However when I query with

 client.query_api().query_csv(query, org=org)

I get only 7 tables, without any error. The two string tables are not present anymore.

Additional info

No response

@Olgidos Olgidos added the bug Something isn't working label Nov 24, 2022

bednar commented Nov 25, 2022

Hi @Olgidos,

thanks for using our client.

Can you share a debug output of the client for both types of query? You can enable debug by: with InfluxDBClient(url="http://192.168.1.1:8086", token=token, org=org, debug=True) as client:.

You can also replace {timeRangeStart: -1m, timeRangeStop: now()} with concrete dates to be sure that you query the same data.

Regards

@bednar bednar added the question Further information is requested label Nov 25, 2022

Olgidos commented Nov 28, 2022

I am now using:

    token = token
    org = "my-org"
    bucket = "bucket"

    with InfluxDBClient(url="http://192.168.1.1:8086", token=token, org=org) as client:
        query = """option v = {timeRangeStart: 2022-11-24T15:00:00Z, timeRangeStop: 2022-11-24T15:00:10Z}
    from(bucket: "bucket") |> range(start: 2022-11-24T15:00:00Z, stop: 2022-11-24T15:00:10Z)
        |> filter(fn: (r) => r["_measurement"] == "computer1")
        |> filter(fn: (r) => r["device"] == "device1")
        |> aggregateWindow(every: 10s, fn: last, createEmpty: true)"""
        csv = client.query_api().query_csv(query, org=org)
        input_list = list(csv)
        for row in input_list:
            if len(row) < 2:
                break
            print(row)

Output:

        send: b'POST /api/v2/query?org=my-org HTTP/1.1\r\nHost: 192.168.1.1:8086\r\nAccept-Encoding: identity\r\nContent-Length: 592\r\nAccept: application/json\r\nContent-Type: application/json\r\nAuthorization: Token XXX==\r\nUser-Agent: influxdb-client-python/1.30.0\r\n\r\n'
        send: b'{"extern": {"imports": [], "body": []}, "query": "option v = {timeRangeStart: 2022-11-24T15:00:00Z, timeRangeStop: 2022-11-24T15:00:10Z}\\n\\nfrom(bucket: \\"bucket\\") |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\\n    |> filter(fn: (r) => r[\\"_measurement\\"] == \\"computer1\\")\\n    |> filter(fn: (r) => r[\\"device\\"] == \\"device1\\")\\n    |> aggregateWindow(every: 10s, fn: last, createEmpty: true)", "dialect": {"header": true, "delimiter": ",", "annotations": ["datatype", "group", "default"], "commentPrefix": "#", "dateTimeFormat": "RFC3339"}}'
        reply: 'HTTP/1.1 200 OK\r\n'
        header: Content-Type: text/csv; charset=utf-8
        header: Vary: Accept-Encoding
        header: X-Influxdb-Build: OSS
        header: X-Influxdb-Version: v2.4.0
        header: Date: Mon, 28 Nov 2022 10:17:43 GMT
        header: Transfer-Encoding: chunked

Still receiving 7 of the 9 data sets.

When using the method from "expected behavior":

    send: b'POST /api/v2/query?org=my-org HTTP/1.1\r\nHost: 192.168.1.1:8086\r\nAccept-Encoding: identity\r\nContent-Length: 601\r\nAccept: application/json\r\nContent-Type: application/json\r\nAuthorization: Token XXX==\r\nUser-Agent: influxdb-client-python/1.30.0\r\n\r\n'
    send: b'{"extern": {"imports": [], "body": []}, "query": "option v = {timeRangeStart: 2022-11-24T15:00:00Z, timeRangeStop: 2022-11-24T15:00:10Z}\\n\\nfrom(bucket: \\"bucket\\") |> range(start: 2022-11-24T15:00:00Z, stop: 2022-11-24T15:00:10Z)\\n    |> filter(fn: (r) => r[\\"_measurement\\"] == \\"computer1\\")\\n    |> filter(fn: (r) => r[\\"device\\"] == \\"device1\\")\\n    |> aggregateWindow(every: 10s, fn: last, createEmpty: true)", "dialect": {"header": true, "delimiter": ",", "annotations": ["datatype", "group", "default"], "commentPrefix": "#", "dateTimeFormat": "RFC3339"}}'       
    reply: 'HTTP/1.1 200 OK\r\n'
    header: Content-Type: text/csv; charset=utf-8
    header: Vary: Accept-Encoding
    header: X-Influxdb-Build: OSS
    header: X-Influxdb-Version: v2.4.0
    header: Date: Mon, 28 Nov 2022 10:25:43 GMT
    header: Transfer-Encoding: chunked

Receiving all 9 data sets.


bednar commented Nov 28, 2022

Can you also share the response HTTP body? It looks like it is related to your data... Can you also update to the latest client version?


Olgidos commented Nov 28, 2022

How can I access the HTTP body, or are you talking about the printout from my code?


bednar commented Nov 28, 2022

How can I access the HTTP body, or are you talking about the printout from my code?

The printout from your code.


Olgidos commented Nov 28, 2022

So I updated the lib.

Here is the debug output for debug = True and .query_csv. I dropped some rows for readability:

    >>> Request: 'POST http://192.168.1.1:8086/api/v2/query?org=my-org'
    >>> Accept: application/json
    >>> Content-Type: application/json
    >>> Authorization: ***
    >>> User-Agent: influxdb-client-python/1.34.0
    >>> Body: {'extern': {'imports': [], 'body': []}, 'query': 'option v = {timeRangeStart: 2022-11-24T15:00:00Z, timeRangeStop: 2022-11-24T15:00:10Z}\nfrom(bucket: "bucket") |> range(start: 2022-11-24T10:00:00Z, stop: 2022-11-24T10:00:10Z)\n    |> filter(fn: (r) => r["_measurement"] == "computer1")\n    |> filter(fn: (r) => r["device"] == "device1")\n    |> aggregateWindow(every: 10s, fn: last, createEmpty: true)\n    |> drop(\n    columns: [\n        "_start",\n        "_stop",\n        "device",\n        "channel_name",\n        "test",\n        "_measurement",\n    ])', 'dialect': {'header': True, 'delimiter': ',', 'annotations': ['datatype', 'group', 'default'], 'commentPrefix': '#', 'dateTimeFormat': 'RFC3339'}}
    <<< Response: 200
    <<< Content-Type: text/csv; charset=utf-8
    <<< Vary: Accept-Encoding
    <<< X-Influxdb-Build: OSS
    <<< X-Influxdb-Version: v2.4.0
    <<< Date: Mon, 28 Nov 2022 12:40:13 GMT
    <<< Transfer-Encoding: chunked
    <<< Body: b'#datatype,string,long,dateTime:RFC3339,double,string\r\n#group,false,false,false,false,true\r\n#default,_result,,,,\r\n,result,table,_time,_value,_field\r\n,,0,2022-11-24T10:00:10Z,0.1,_1_current_(mA)\r\n,,1,2022-11-24T10:00:10Z,4,_1_current_limit_(mA)\r\n,,2,2022-11-24T10:00:10Z,1,_1_voltage_(V)\r\n,,3,2022-11-24T10:00:10Z,1,_1_voltage_limit_(V)\r\n,,4,2022-11-24T10:00:10Z,0,_2_current_(mA)\r\n,,5,2022-11-24T10:00:10Z,0,_2_current_limit_(mA)\r\n,,6,2022-11-24T10:00:10Z,0,_2_voltage_(V)\r\n,,7,2022-11-24T10:00:10Z,0,_2_voltage_limit_(V)\r\n\r\n#datatype,string,long,dateTime:RFC3339,string,string\r\n#group,false,false,false,false,true\r\n#default,_result,,,,\r\n,result,table,_time,_value,_field\r\n,,8,2022-11-24T10:00:10Z,K,type\r\n,,9,2022-11-24T10:00:10Z,,type2\r\n\r\n'

When I remove debug = True, I get my normal output back (it is not printed with the debug flag):

    ['#datatype', 'string', 'long', 'dateTime:RFC3339', 'double', 'string']
    ['#group', 'false', 'false', 'false', 'false', 'true']
    ['#default', '_result', '', '', '', '']
    ['', 'result', 'table', '_time', '_value', '_field']
    ['', '', '0', '2022-11-24T10:00:10Z', '0.1', '_1_current_(mA)']
    ['', '', '1', '2022-11-24T10:00:10Z', '4', '_1_current_limit_(mA)']
    ['', '', '2', '2022-11-24T10:00:10Z', '1', '_1_voltage_(V)']
    ['', '', '3', '2022-11-24T10:00:10Z', '1', '_1_voltage_limit_(V)']
    ['', '', '4', '2022-11-24T10:00:10Z', '0', '_2_current_(mA)']
    ['', '', '5', '2022-11-24T10:00:10Z', '0', '_2_current_limit_(mA)']
    ['', '', '6', '2022-11-24T10:00:10Z', '0', '_2_voltage_(V)']
    ['', '', '7', '2022-11-24T10:00:10Z', '0', '_2_voltage_limit_(V)']

It seems the HTTP body contains the string values, but they are then not included in the CSV list.
For the .query version with debug=True I get:

    >>> Request: 'POST http://192.168.1.1:8086/api/v2/query?org=my-org'
    >>> Accept: application/json
    >>> Content-Type: application/json
    >>> Authorization: ***
    >>> User-Agent: influxdb-client-python/1.34.0
    >>> Body: {'extern': {'imports': [], 'body': []}, 'query': 'option v = {timeRangeStart: 2022-11-24T15:00:00Z, timeRangeStop: 2022-11-24T15:00:10Z}\nfrom(bucket: "_bucker") |> range(start: 2022-11-24T10:00:00Z, stop: 2022-11-24T10:00:10Z)\n    |> filter(fn: (r) => r["_measurement"] == "computer1")\n    |> filter(fn: (r) => r["device"] == "device1")\n    |> aggregateWindow(every: 10s, fn: last, createEmpty: true)\n    |> drop(\n    columns: [\n        "_start",\n        "_stop",\n        "device",\n        "channel_name",\n        "test",\n        "_measurement",\n    ])', 'dialect': {'header': True, 'delimiter': ',', 'annotations': ['datatype', 'group', 'default'], 'commentPrefix': '#', 'dateTimeFormat': 'RFC3339'}}
    <<< Response: 200
    <<< Content-Type: text/csv; charset=utf-8
    <<< Vary: Accept-Encoding
    <<< X-Influxdb-Build: OSS
    <<< X-Influxdb-Version: v2.4.0
    <<< Date: Mon, 28 Nov 2022 12:51:45 GMT
    <<< Transfer-Encoding: chunked
    <<< Body: b'#datatype,string,long,dateTime:RFC3339,double,string\r\n#group,false,false,false,false,true\r\n#default,_result,,,,\r\n,result,table,_time,_value,_field\r\n,,0,2022-11-24T10:00:10Z,0.1,_1_current_(mA)\r\n,,1,2022-11-24T10:00:10Z,4,_1_current_limit_(mA)\r\n,,2,2022-11-24T10:00:10Z,1,_1_voltage_(V)\r\n,,3,2022-11-24T10:00:10Z,1,_1_voltage_limit_(V)\r\n,,4,2022-11-24T10:00:10Z,0,_2_current_(mA)\r\n,,5,2022-11-24T10:00:10Z,0,_2_current_limit_(mA)\r\n,,6,2022-11-24T10:00:10Z,0,_2_voltage_(V)\r\n,,7,2022-11-24T10:00:10Z,0,_2_voltage_limit_(V)\r\n\r\n#datatype,string,long,dateTime:RFC3339,string,string\r\n#group,false,false,false,false,true\r\n#default,_result,,,,\r\n,result,table,_time,_value,_field\r\n,,8,2022-11-24T10:00:10Z,K,type\r\n,,9,2022-11-24T10:00:10Z,,type2\r\n\r\n'
    FluxRecord() table: 0, {'result': '_result', 'table': 0, '_time': datetime.datetime(2022, 11, 24, 10, 0, 10, tzinfo=tzutc()), '_value': 0.1, '_field': '_1_current_(mA)'}
    FluxRecord() table: 1, {'result': '_result', 'table': 1, '_time': datetime.datetime(2022, 11, 24, 10, 0, 10, tzinfo=tzutc()), '_value': 4.0, '_field': '_1_current_limit_(mA)'}
    FluxRecord() table: 2, {'result': '_result', 'table': 2, '_time': datetime.datetime(2022, 11, 24, 10, 0, 10, tzinfo=tzutc()), '_value': 1, '_field': '_1_voltage_(V)'}
    FluxRecord() table: 3, {'result': '_result', 'table': 3, '_time': datetime.datetime(2022, 11, 24, 10, 0, 10, tzinfo=tzutc()), '_value': 1.0, '_field': '_1_voltage_limit_(V)'}
    FluxRecord() table: 4, {'result': '_result', 'table': 4, '_time': datetime.datetime(2022, 11, 24, 10, 0, 10, tzinfo=tzutc()), '_value': 0.0, '_field': '_2_current_(mA)'}
    FluxRecord() table: 5, {'result': '_result', 'table': 5, '_time': datetime.datetime(2022, 11, 24, 10, 0, 10, tzinfo=tzutc()), '_value': 0.0, '_field': '_2_current_limit_(mA)'}
    FluxRecord() table: 6, {'result': '_result', 'table': 6, '_time': datetime.datetime(2022, 11, 24, 10, 0, 10, tzinfo=tzutc()), '_value': 0.0, '_field': '_2_voltage_(V)'}
    FluxRecord() table: 7, {'result': '_result', 'table': 7, '_time': datetime.datetime(2022, 11, 24, 10, 0, 10, tzinfo=tzutc()), '_value': 0.0, '_field': '_2_voltage_limit_(V)'}
    FluxRecord() table: 8, {'result': '_result', 'table': 8, '_time': datetime.datetime(2022, 11, 24, 10, 0, 10, tzinfo=tzutc()), '_value': 'K', '_field': 'type'}
    FluxRecord() table: 9, {'result': '_result', 'table': 9, '_time': datetime.datetime(2022, 11, 24, 10, 0, 10, tzinfo=tzutc()), '_value': None, '_field': 'type2'}


bednar commented Nov 29, 2022

So I updated the lib

Here is the data output for debug = True and .query_csv Furthermore I dropped some rows for readability


The raw output is:

#datatype,string,long,dateTime:RFC3339,double,string
#group,false,false,false,false,true
#default,_result,,,,
,result,table,_time,_value,_field
,,0,2022-11-24T10:00:10Z,0.1,_1_current_(mA)
,,1,2022-11-24T10:00:10Z,4,_1_current_limit_(mA)
,,2,2022-11-24T10:00:10Z,1,_1_voltage_(V)
,,3,2022-11-24T10:00:10Z,1,_1_voltage_limit_(V)
,,4,2022-11-24T10:00:10Z,0,_2_current_(mA)
,,5,2022-11-24T10:00:10Z,0,_2_current_limit_(mA)
,,6,2022-11-24T10:00:10Z,0,_2_voltage_(V)
,,7,2022-11-24T10:00:10Z,0,_2_voltage_limit_(V)

#datatype,string,long,dateTime:RFC3339,string,string
#group,false,false,false,false,true
#default,_result,,,,
,result,table,_time,_value,_field
,,8,2022-11-24T10:00:10Z,K,type
,,9,2022-11-24T10:00:10Z,,type2


It looks like the csv_parser is not able to parse a response containing multiple tables - see https://docs.influxdata.com/influxdb/v2.5/reference/syntax/annotated-csv/.

As a workaround you can add |> group() to your query.
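[Editor's note] Not part of the thread, but for illustration: the annotated-CSV body shown in the debug output contains two tables separated by a blank line, and a client can also split them apart itself. A minimal sketch using Python's standard csv module and a shortened sample of the rows from this issue (not the client's API):

```python
import csv
from io import StringIO

# Shortened sample of the two-table annotated-CSV body from the debug output.
body = (
    "#datatype,string,long,dateTime:RFC3339,double,string\r\n"
    ",result,table,_time,_value,_field\r\n"
    ",,0,2022-11-24T10:00:10Z,0.1,_1_current_(mA)\r\n"
    "\r\n"
    "#datatype,string,long,dateTime:RFC3339,string,string\r\n"
    ",result,table,_time,_value,_field\r\n"
    ",,8,2022-11-24T10:00:10Z,K,type\r\n"
)

# Tables in annotated CSV are separated by blank lines: split on them,
# then parse each table chunk independently.
tables = [chunk for chunk in body.split("\r\n\r\n") if chunk.strip()]
parsed = [list(csv.reader(StringIO(chunk))) for chunk in tables]
print(len(parsed))  # -> 2
```

This is only a client-side workaround sketch; the `|> group()` approach above keeps the splitting on the server side instead.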


Olgidos commented Nov 29, 2022

ok

  1. I have not yet worked with |> group(). Could you give me a short example for my specific problem?

  2. In the end I need the output as a list like this:

       ['', 'result', 'table', '_time', '_value', '_field']
       ['', '', '0', '2022-11-24T10:00:10Z', '0.1', '_1_current_(mA)']
       ['', '', '1', '2022-11-24T10:00:10Z', '4', '_1_current_limit_(mA)']
       ['', '', '2', '2022-11-24T10:00:10Z', '1', '_1_voltage_(V)']
       ['', '', '3', '2022-11-24T10:00:10Z', '1', '_1_voltage_limit_(V)']
       ['', '', '4', '2022-11-24T10:00:10Z', '0', '_2_current_(mA)']
       ['', '', '5', '2022-11-24T10:00:10Z', '0', '_2_current_limit_(mA)']
       ['', '', '6', '2022-11-24T10:00:10Z', '0', '_2_voltage_(V)']
       ['', '', '7', '2022-11-24T10:00:10Z', '0', '_2_voltage_limit_(V)']
       ['', '', '8', '2022-11-24T10:00:10Z', 'K', 'type']
       ['', '', '9', '2022-11-24T10:00:10Z', 'K', 'type2']
    

Is there another way without using .query_csv? At the start I tested .query, which was much slower than .query_csv.
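[Editor's note] For what it's worth, the desired list-of-rows shape can be produced from a raw CSV string with the standard csv module, skipping the `#`-prefixed annotation lines. A sketch using sample data from this thread, not a live query:

```python
import csv
from io import StringIO

# Sample raw CSV as it would arrive in a response body (annotations included).
raw = (
    "#datatype,string,long,dateTime:RFC3339,double,string\r\n"
    ",result,table,_time,_value,_field\r\n"
    ",,0,2022-11-24T10:00:10Z,0.1,_1_current_(mA)\r\n"
    ",,8,2022-11-24T10:00:10Z,K,type\r\n"
)

# Keep only non-empty rows that are not annotation lines.
rows = [row for row in csv.reader(StringIO(raw))
        if row and not row[0].startswith("#")]
for row in rows:
    print(row)
```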


bednar commented Nov 29, 2022

I did not yet work with |> group() could you give me a short example for my specific problem?

Just add |> group() to the end of your query; the result will then contain only one table.

is there another way not using .query_csv; at the start I tested .query, which however was much slower then .query_csv;

The .query method parses values into Python types, which costs time. You can install ciso8601 to speed up timestamp parsing (if you don't already have it). For more info see https://github.com/influxdata/influxdb-client-python#installation

You can also iterate over the response with something like:

import codecs

from influxdb_client import Dialect

response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)',
                               dialect=Dialect(header=True, annotations=[]))
for line in response.data.splitlines():
    print(codecs.decode(line).split(','))

@bednar bednar removed the question Further information is requested label Dec 6, 2022

bednar commented Dec 6, 2022

Hi @Olgidos,

the problem is caused by the response from InfluxDB: the response contains empty lines between tables. Your code contains break:

if len(row) < 2:
    break
print(row)

This breaks the loop as soon as it hits an empty line. Instead of break you have to use continue:

if len(row) < 2:
    continue
print(row)
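[Editor's note] The difference can be seen with a small self-contained loop over sample rows that include an empty separator row, as in the multi-table response above:

```python
# Sample rows: two data tables separated by an empty row, as in an
# annotated-CSV response containing multiple tables.
rows = [
    ["", "result", "table"],
    ["", "", "0"],
    [],                      # empty separator between tables
    ["", "", "8"],
]

kept_break, kept_continue = [], []

for row in rows:
    if len(row) < 2:
        break                # stops at the separator: the later table is lost
    kept_break.append(row)

for row in rows:
    if len(row) < 2:
        continue             # skips the separator: the later table survives
    kept_continue.append(row)

print(len(kept_break), len(kept_continue))  # -> 2 3
```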

I will prepare a PR that improves our handling of empty lines.

Regards

@bednar bednar added enhancement New feature or request and removed bug Something isn't working labels Dec 6, 2022
@bednar bednar added this to the 1.35.0 milestone Dec 7, 2022

Olgidos commented Dec 7, 2022

thanks! :)
