From 765eb637a3f66f641d047ea382c8fe0c0754b386 Mon Sep 17 00:00:00 2001
From: Jakub Bednar
Date: Fri, 28 Feb 2020 09:49:41 +0100
Subject: [PATCH] fix: Correctly parse CSV where multiple results include multiple tables

---
 CHANGELOG.md                              |   3 +
 influxdb_client/client/flux_csv_parser.py |  11 +-
 tests/test_FluxCSVParser.py               | 128 ++++++++++++++++++++++
 3 files changed, 139 insertions(+), 3 deletions(-)
 create mode 100644 tests/test_FluxCSVParser.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 55090f0b..1b8a8568 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,9 @@
 ### Features
 1. [#59](https://github.com/influxdata/influxdb-client-python/issues/59): Set User-Agent to influxdb-client-python/VERSION for all requests
 
+### Bugs
+1. [#61](https://github.com/influxdata/influxdb-client-python/issues/61): Correctly parse CSV where multiple results include multiple tables
+
 ## 1.4.0 [2020-02-14]
 
 ### Features
diff --git a/influxdb_client/client/flux_csv_parser.py b/influxdb_client/client/flux_csv_parser.py
index 924121ad..c5eb617d 100644
--- a/influxdb_client/client/flux_csv_parser.py
+++ b/influxdb_client/client/flux_csv_parser.py
@@ -49,6 +49,7 @@ def generator(self):
 
     def _parse_flux_response(self):
         table_index = 0
+        table_id = -1
         start_new_table = False
         table = None
         parsing_state_error = False
@@ -83,6 +84,7 @@
                 table = FluxTable()
                 self._insert_table(table, table_index)
                 table_index = table_index + 1
+                table_id = -1
 
             elif table is None:
                 raise FluxCsvParserException("Unable to parse CSV response. FluxTable definition was not found.")
@@ -111,15 +113,18 @@
                 continue
 
             # to int conversions todo
-            current_index = int(csv[2])
+            current_id = int(csv[2])
+            if table_id == -1:
+                table_id = current_id
 
-            if current_index > (table_index - 1):
+            if table_id != current_id:
                 # create new table with previous column headers settings
                 flux_columns = table.columns
                 table = FluxTable()
                 table.columns.extend(flux_columns)
                 self._insert_table(table, table_index)
                 table_index = table_index + 1
+                table_id = current_id
 
             flux_record = self.parse_record(table_index - 1, table, csv)
 
@@ -217,4 +222,4 @@ def add_column_names_and_tags(table, csv):
 
     def _insert_table(self, table, table_index):
         if self._serialization_mode is FluxSerializationMode.tables:
-            self.tables.insert(table_index, table)
+            self.tables.insert(table_index, table)
\ No newline at end of file
diff --git a/tests/test_FluxCSVParser.py b/tests/test_FluxCSVParser.py
new file mode 100644
index 00000000..86b66934
--- /dev/null
+++ b/tests/test_FluxCSVParser.py
@@ -0,0 +1,128 @@
+from io import BytesIO
+
+from urllib3 import HTTPResponse
+
+from influxdb_client.client.flux_csv_parser import FluxCsvParser, FluxSerializationMode
+from tests.base_test import BaseTest
+
+
+class FluxCsvParserTest(BaseTest):
+
+    def test_one_table(self):
+        data = "#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,string,string,string,string,long,long,string\n" \
+               "#group,false,false,true,true,true,true,true,true,false,false,false\n" \
+               "#default,_result,,,,,,,,,,\n" \
+               ",result,table,_start,_stop,_field,_measurement,host,region,_value2,value1,value_str\n" \
+               ",,0,1677-09-21T00:12:43.145224192Z,2018-07-16T11:21:02.547596934Z,free,mem,A,west,121,11,test\n"
+
+        tables = self._parse_to_tables(data=data)
+        self.assertEqual(1, len(tables))
+        self.assertEqual(11, len(tables[0].columns))
+        self.assertEqual(1, len(tables[0].records))
+
+    def test_more_tables(self):
"#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,string,string,string,string,long,long,string\n" \ + "#group,false,false,true,true,true,true,true,true,false,false,false\n" \ + "#default,_result,,,,,,,,,,\n" \ + ",result,table,_start,_stop,_field,_measurement,host,region,_value2,value1,value_str\n" \ + ",,0,1677-09-21T00:12:43.145224192Z,2018-07-16T11:21:02.547596934Z,free,mem,A,west,121,11,test\n" \ + ",,1,1677-09-21T00:12:43.145224192Z,2018-07-16T11:21:02.547596934Z,free,mem,B,west,484,22,test\n" \ + ",,2,1677-09-21T00:12:43.145224192Z,2018-07-16T11:21:02.547596934Z,usage_system,cpu,A,west,1444,38,test\n" \ + ",,3,1677-09-21T00:12:43.145224192Z,2018-07-16T11:21:02.547596934Z,user_usage,cpu,A,west,2401,49,test" + + tables = self._parse_to_tables(data=data) + self.assertEqual(4, tables.__len__()) + self.assertEqual(11, tables[0].columns.__len__()) + self.assertEqual(1, tables[0].records.__len__()) + self.assertEqual(11, tables[1].columns.__len__()) + self.assertEqual(1, tables[1].records.__len__()) + self.assertEqual(11, tables[2].columns.__len__()) + self.assertEqual(1, tables[2].records.__len__()) + self.assertEqual(11, tables[3].columns.__len__()) + self.assertEqual(1, tables[3].records.__len__()) + + def test_multiple_queries(self): + data = "#datatype,string,long,string,string,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string\n" \ + "#group,false,false,true,true,true,true,false,false,true\n" \ + "#default,t1,,,,,,,,\n" \ + ",result,table,_field,_measurement,_start,_stop,_time,_value,tag\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:20:00Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:21:40Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:23:20Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:25:00Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:26:40Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:28:20Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:30:00Z,2,test1\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:20:00Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:21:40Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:23:20Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:25:00Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:26:40Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:28:20Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:30:00Z,2,test2\n" \ + "\n" \ + "#datatype,string,long,string,string,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string\n" \ + "#group,false,false,true,true,true,true,false,false,true\n" \ + "#default,t2,,,,,,,,\n" \ + 
",result,table,_field,_measurement,_start,_stop,_time,_value,tag\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:20:00Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:21:40Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:23:20Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:25:00Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:26:40Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:28:20Z,2,test1\n" \ + ",,0,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:30:00Z,2,test1\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:20:00Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:21:40Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:23:20Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:25:00Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:26:40Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:28:20Z,2,test2\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:30:00Z,2,test2" + + tables = self._parse_to_tables(data=data) + self.assertEqual(4, tables.__len__()) + self.assertEqual(9, tables[0].columns.__len__()) + self.assertEqual(7, tables[0].records.__len__()) + self.assertEqual(9, tables[1].columns.__len__()) + self.assertEqual(7, tables[1].records.__len__()) + self.assertEqual(9, tables[2].columns.__len__()) + self.assertEqual(7, tables[2].records.__len__()) + self.assertEqual(9, tables[3].columns.__len__()) + self.assertEqual(7, tables[3].records.__len__()) + + def test_table_index_not_start_at_zero(self): + data = "#datatype,string,long,string,string,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string\n" \ + "#group,false,false,true,true,true,true,false,false,true\n" \ + "#default,t1,,,,,,,,\n" \ + ",result,table,_field,_measurement,_start,_stop,_time,_value,tag\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:20:00Z,2,test1\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:21:40Z,2,test1\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:23:20Z,2,test1\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:25:00Z,2,test1\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:26:40Z,2,test1\n" \ + ",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:28:20Z,2,test1\n" \ + 
",,1,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:30:00Z,2,test1\n" \ + ",,2,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:20:00Z,2,test2\n" \ + ",,2,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:21:40Z,2,test2\n" \ + ",,2,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:23:20Z,2,test2\n" \ + ",,2,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:25:00Z,2,test2\n" \ + ",,2,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:26:40Z,2,test2\n" \ + ",,2,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:28:20Z,2,test2\n" \ + ",,2,value,python_client_test,2010-02-27T04:48:32.752600083Z,2020-02-27T16:48:32.752600083Z,2020-02-27T16:30:00Z,2,test2\n" + + tables = self._parse_to_tables(data=data) + self.assertEqual(2, tables.__len__()) + self.assertEqual(9, tables[0].columns.__len__()) + self.assertEqual(7, tables[0].records.__len__()) + self.assertEqual(9, tables[1].columns.__len__()) + self.assertEqual(7, tables[1].records.__len__()) + + @staticmethod + def _parse_to_tables(data: str): + fp = BytesIO(str.encode(data)) + _parser = FluxCsvParser(response=HTTPResponse(fp, preload_content=False), + serialization_mode=FluxSerializationMode.tables) + list(_parser.generator()) + tables = _parser.tables + return tables