Description
Steps to reproduce:
I wrote code that generates points from a few fields and tags, and I simulate the timestamps by passing the loop counter as the seconds parameter of datetime.timedelta(seconds=...). These 480 points are appended to a list and then written in batching mode with batch_size=1000 and all other parameters left at their defaults. However, writing these 480 points takes 2 min 25 s, which is extremely slow for this number of points. I use the success_callback to confirm the batching and uploading of data, but the total time seems to increase linearly with the number of points. I'm not sure why this is occurring.
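To make the point count explicit, here is a minimal sketch of the timestamp simulation on its own, using plain dictionaries and no InfluxDB client. It assumes one point per company per timestamp (120 x 4 = 480), each carrying all three fields. The names `build_records`, `steps`, and `base_time` are hypothetical, and unlike my actual code it fixes the base time once instead of calling the clock on every iteration.

```python
import datetime
import random

FIELDS = ["Price", "Volume", "RSI"]
COMPANIES = ["AAPL", "NFLX", "MSFT", "GOOGL"]


def build_records(steps=120, base_time=None):
    """Build one plain-dict record per (timestamp, company) pair."""
    if base_time is None:
        # Assumption: a single base time, captured once, in UTC.
        base_time = datetime.datetime.now(datetime.timezone.utc)
    records = []
    for offset in range(steps):
        # Shift the base time by `offset` seconds to simulate one sample per second.
        simulated_time = base_time + datetime.timedelta(seconds=offset)
        for company in COMPANIES:
            records.append({
                "measurement": "macro",
                "tags": {"stock": company},
                "fields": {name: random.random() for name in FIELDS},
                "time": simulated_time,
            })
    return records


records = build_records()
print(len(records))  # 120 timestamps x 4 companies = 480
```

Each dictionary has the same shape that `Point.from_dict` receives in the reproduction code, so the list could be converted to points in a separate step.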
import random
import copy
import datetime
from influxdb_client import InfluxDBClient, Point, WriteOptions, WritePrecision
from influxdb_client.client.exceptions import InfluxDBError
import time


def generate_list_dictionary():
    # Just fields to modify the template
    fields = ["Price", "Volume", "RSI"]
    companies = ["AAPL", "NFLX", "MSFT", "GOOGL"]
    template = {
        "measurement": "macro",
        "tags": {},
        "fields": {}
    }
    list_points = []
    for timestamp in range(120):
        for company_name in companies:
            for value in fields:
                # Note: this aliases the shared template; the deepcopy below isolates each point
                temp = template
                temp["tags"]["stock"] = company_name
                temp["fields"][value] = random.random()
            # Generating simulated timestamps increasing with each outermost for loop
            simulated_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=timestamp)
            temp["time"] = simulated_time
            dictionary_copy = copy.deepcopy(temp)
            point = Point.from_dict(dictionary_copy, WritePrecision.MS)  # Converting it to point
            list_points.append(point)
    return list_points


class BatchingCallback(object):

    def success(self, conf: (str, str, str), data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


def write_data(url, token, bucket, org):
    with InfluxDBClient(url=url, token=token, org=org) as client:
        callback = BatchingCallback()
        with client.write_api(write_options=WriteOptions(batch_size=1000),
                              success_callback=callback.success,
                              error_callback=callback.error,
                              retry_callback=callback.retry) as write_api:
            data = generate_list_dictionary()  # Generating the data
            # Writing the data in batches
            start = time.perf_counter()
            write_api.write(bucket=bucket, org=org, record=data)
            end = time.perf_counter()
            print(f"Writing time is: {end-start}")


if __name__ == '__main__':
    url = ""
    token = ""
    bucket = ""
    org = ""
    write_data(url, token, bucket, org)
Expected behavior:
I would expect the success_callback to print each batch as it is written to the database, and the whole upload to finish within a couple of seconds.
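To check this expectation with numbers rather than by eye, a callback could record when each batch is confirmed. The following is only a sketch: `TimedBatchingCallback` and `total_lines` are hypothetical names, and it assumes the `data` argument is line protocol with one point per line (as `str` or `bytes`). The last lines simulate one callback invocation by hand instead of talking to a server.

```python
import time


class TimedBatchingCallback:
    """Records when each batch is confirmed, relative to construction time."""

    def __init__(self):
        self.started = time.perf_counter()
        self.batches = []  # list of (elapsed_seconds, line_count)

    def success(self, conf, data):
        elapsed = time.perf_counter() - self.started
        # Assumption: `data` is line protocol, one point per line.
        text = data.decode("utf-8") if isinstance(data, bytes) else data
        line_count = len(text.splitlines())
        self.batches.append((elapsed, line_count))
        print(f"Batch {conf} confirmed after {elapsed:.3f}s with {line_count} lines")

    def total_lines(self):
        """Total number of lines confirmed so far across all batches."""
        return sum(count for _, count in self.batches)


# Simulated invocation with two hand-written line protocol records.
callback = TimedBatchingCallback()
callback.success(("bucket", "org", "ms"),
                 "macro,stock=AAPL Price=0.1 1\nmacro,stock=NFLX Price=0.2 2")
print(callback.total_lines())
```

Passing `callback.success` as the `success_callback` would then show how the confirmed line count grows over time, which is the number to compare against the 480 points sent.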
Actual behavior:
The success_callback does print what I expect, but the timing in batching mode is clearly off. When I tested a simple write in SYNCHRONOUS mode, it seemed to work fine:

In batching mode, however, if I wait 30 seconds and run a simple Flux query in the UI or client, a lot of the data still hasn't been uploaded. I also tested with 1000 points, and it took 5 min to upload the data, which seems quite long.
I'm not sure whether I'm misunderstanding the batching functionality or whether something else is wrong with my code, but 2 min seems an awfully long time for just 480 points. Any guidance would be greatly appreciated.
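For comparison with the synchronous test mentioned above, this is a minimal sketch of splitting the list into chunks and timing the writes. The helpers `chunked` and `write_in_chunks` are hypothetical, and `write_fn` is a stand-in for whatever actually sends a chunk; here it only collects chunk sizes so the example runs without a server.

```python
import time


def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def write_in_chunks(points, write_fn, chunk_size=1000):
    """Call `write_fn(chunk)` per chunk and return (chunk_count, elapsed_seconds)."""
    start = time.perf_counter()
    count = 0
    for chunk in chunked(points, chunk_size):
        write_fn(chunk)
        count += 1
    return count, time.perf_counter() - start


# Stand-in writer that only records how many items each chunk holds.
sizes = []
count, elapsed = write_in_chunks(list(range(480)),
                                 lambda chunk: sizes.append(len(chunk)))
print(count, sizes)  # 1 [480]
```

With the real client, `write_fn` could be something like `lambda chunk: write_api.write(bucket=bucket, org=org, record=chunk)` on a write API created with `write_options=SYNCHRONOUS` (imported from `influxdb_client.client.write_api`), which would give a baseline to compare against the batching numbers.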
Specifications:
- Client Version: 2.0 - 1.3.0 build
- InfluxDB Version: 2.3.0
- Platform: Windows