Perf issues between HTTP vs FlightRPC v3 #26297
Hey @jules-ch, would you happen to have a minimal repro of how you're using the pyarrow client? Even just a small Python script to execute would help. That way we could make an equivalent Rust one to help rule out some possible perf bottlenecks.
Yeah, I'll try to have one soon.
OK, here is a minimal example:

```yaml
services:
  influxdb3:
    image: quay.io/influxdb/influxdb3-enterprise:5aed0d9d2cb1cba948f77869ff506230b2d0d668
    container_name: influxdb3
    volumes:
      - ./influxdb3/:/var/lib/influxdb3/
    environment:
      - INFLUXDB3_OBJECT_STORE=file
      - INFLUXDB3_DB_DIR=/var/lib/influxdb3
      - INFLUXDB3_ENTERPRISE_LICENSE_EMAIL=<YOUREMAIL>
      - INFLUXDB3_ENTERPRISE_MODE=all
      - INFLUXDB3_HTTP_BIND_ADDR=0.0.0.0:8181
      - INFLUXDB3_MAX_HTTP_REQUEST_SIZE=20971520
      - LOG_FILTER=info
      - INFLUXDB3_WAL_FLUSH_INTERVAL=1000ms
      - INFLUXDB3_NODE_IDENTIFIER_PREFIX=node-1
      - INFLUXDB3_ENTERPRISE_CLUSTER_ID=cluster-1
      - INFLUXDB3_WAL_SNAPSHOT_SIZE=10
    expose:
      - 8181
    ports:
      - 8181:8181
    healthcheck:
      test: ["CMD-SHELL", "bash -c ':> /dev/tcp/127.0.0.1/8181' || exit 1"]
      interval: 5s
      retries: 5
      start_period: 30s
      timeout: 10s
```

I reduced the WAL snapshot size in the compose file above.

Load data: 1000 devices with 120 minutes of data; the sample rate is one point per second.

```python
from datetime import datetime, timedelta
import numpy as np
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, write_client_options
TOKEN = "<INFLUXDB_TOKEN>"
DATABASE = "location"
tids = np.arange(0, 1000)
# One point per second per device over the last 120 minutes.
def generate_points():
    end_time = datetime.now()
    time_span = timedelta(minutes=120)
    start_time = end_time - time_span
    times = np.arange(
        int(start_time.timestamp() * 1e9), int(end_time.timestamp() * 1e9), int(1e9)
    )
    n_points = int(time_span.total_seconds())
    for tid in tids:
        lats = np.random.uniform(-90.0, 90.0, (n_points,))
        lons = np.random.uniform(-180, 180, (n_points,))
        for lat, lon, time_ in zip(lats, lons, times, strict=True):
            point = (
                Point("telemetry")
                .tag("tid", tid)
                .field("lat", lat)
                .field("lon", lon)
                .time(time_)
            )
            yield point


BATCH_SIZE = 50000


# Write callback: counts the lines (points) in each successfully flushed batch.
class BatchingCallback:
    def __init__(self):
        self.write_count = 0
        self.total = 0

    def success(self, conf, data: str):
        self.write_count += 1
        count = len(data.splitlines())
        self.total += count
        print(f"Written batch: {conf}, data: {count} items")
        print(f"Written {self.total} total")


callback = BatchingCallback()

write_options = WriteOptions(
    batch_size=BATCH_SIZE,
    flush_interval=10_000,
    jitter_interval=2_000,
    retry_interval=5_000,
    max_retries=5,
    max_retry_delay=30_000,
    exponential_base=2,
)

wco = write_client_options(
    success_callback=callback.success,
    write_options=write_options,
)

with InfluxDBClient3(
    host="http://127.0.0.1:8181",
    database=DATABASE,
    token=TOKEN,
    write_client_options=wco,
) as influxdb3_client:
    influxdb3_client.write(generate_points())
```

Then query with both FlightRPC and the V3 HTTP calls. I'm using aiohttp here so as not to wait on the response calls, but the InfluxDB server logs are the way to truly compare. Use:

```python
import asyncio
import json
import time
import aiohttp
from pyarrow.flight import FlightCallOptions, FlightClient, Ticket
TOKEN = "<INFLUXDB_TOKEN>"
flight_client = FlightClient(location="grpc+tcp://127.0.0.1:8181")
token = (b"authorization", f"Bearer {TOKEN}".encode("utf-8"))
query_1 = """
--sql
SELECT DISTINCT ON (tid) lat, lon, tid, time
FROM telemetry
WHERE tid IN ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
ORDER BY tid ASC, time DESC;
"""
query_2 = """
--sql
SELECT lat, lon, tid, time
FROM telemetry
ORDER BY time DESC
LIMIT 200
"""
query_3 = """
--sql
SELECT AVG(lat)
FROM telemetry
GROUP BY tid
"""
query_4 = """
--sql
SELECT
DATE_BIN(INTERVAL '10 minutes', time) AS time,
tid,
selector_max(lat, time)['value'] AS 'max lat',
selector_min(lat, time)['value'] AS 'min lat',
avg(lat) AS 'average lat'
FROM telemetry
GROUP BY 1, tid
ORDER BY tid, 1
"""
def flight_rpc_query(query_sql: str, flight_client: FlightClient):
    ticket_data = {
        "database": "location",
        "sql_query": query_sql,
        "query_type": "sql",
    }
    ticket = Ticket(json.dumps(ticket_data).encode("utf-8"))
    options = FlightCallOptions(timeout=10, headers=[token])
    start_time = time.perf_counter()
    reader = flight_client.do_get(ticket, options)
    table = reader.read_all()  # timed until the full result table has been read
    print(time.perf_counter() - start_time, "s")


async def v3_http_call(query: str):
    influxdb_url = "http://127.0.0.1:8181"
    headers = {
        "Authorization": f"Bearer {TOKEN}",
        "accept": "application/json",
        "Content-Type": "application/json",
    }
    content = json.dumps(
        {
            "db": "location",
            "q": query,
            "format": "json",
        },
    )
    async with aiohttp.ClientSession(headers=headers) as session:
        start_time = time.perf_counter()
        async with session.post(
            influxdb_url + "/api/v3/query_sql", data=content
        ) as response:
            print("Status:", response.status)
            # Note: timed up to the response headers only; the body is read afterwards.
            print(time.perf_counter() - start_time, "s")
            response = await response.json()


queries = [query_1, query_2, query_3, query_4]

for query in queries:
    print("\nSQL query: ", query)
    print("Calling FlightRPC\n-----------------")
    flight_rpc_query(query, flight_client)
    print("Calling HTTP V3\n-----------------")
    asyncio.run(v3_http_call(query))
```
Especially on aggregate queries, performance is 100x worse when using Flight.
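To help narrow down where that time goes, here is a minimal sketch, under the same setup as the repro above, that splits the Flight call into time-to-first-batch versus time to drain the whole stream; `flight_rpc_query_timed` is a hypothetical helper, not part of the original report:

```python
import json
import time

from pyarrow.flight import FlightCallOptions, FlightClient, Ticket

TOKEN = "<INFLUXDB_TOKEN>"
flight_client = FlightClient(location="grpc+tcp://127.0.0.1:8181")
token = (b"authorization", f"Bearer {TOKEN}".encode("utf-8"))


def flight_rpc_query_timed(query_sql: str, flight_client: FlightClient):
    # Same ticket payload as in the repro script above.
    ticket = Ticket(
        json.dumps(
            {"database": "location", "sql_query": query_sql, "query_type": "sql"}
        ).encode("utf-8")
    )
    options = FlightCallOptions(timeout=10, headers=[token])

    start = time.perf_counter()
    reader = flight_client.do_get(ticket, options)

    # First chunk: roughly query planning plus the first response from the server.
    first = reader.read_chunk()
    t_first = time.perf_counter() - start

    # Remaining chunks: result streaming / transfer.
    n_rows = first.data.num_rows if first.data is not None else 0
    while True:
        try:
            chunk = reader.read_chunk()
        except StopIteration:
            break
        if chunk.data is not None:
            n_rows += chunk.data.num_rows
    t_total = time.perf_counter() - start

    print(f"first batch after {t_first:.3f}s, {n_rows} rows drained in {t_total:.3f}s")
```

If most of the latency shows up before the first batch arrives, the overhead is likely on the query/planning side of the Flight path rather than in transferring the result.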
I'm still experiencing performance issues between calling the CLI (which calls the HTTP API under the hood) and FlightRPC with the pyarrow client.
I'm going to try to create a minimal example for this.
I'm using the v3.0.0 enterprise build, with compaction enabled.
In the meantime, here are the timings from the logs for the same SQL query:
Using the CLI: 32ms
Using FlightRPC: 617ms
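For a client-side number that is more directly comparable to the Flight timing (the aiohttp call in the repro stops its timer before the JSON body is read), here is a minimal sketch that times the same `/api/v3/query_sql` request including reading the full body; the use of `requests` and the helper name are assumptions, not taken from the report:

```python
import json
import time

import requests

TOKEN = "<INFLUXDB_TOKEN>"
URL = "http://127.0.0.1:8181"


def v3_http_query_timed(query: str):
    # Hypothetical helper: time the v3 SQL endpoint end to end, body included.
    headers = {
        "Authorization": f"Bearer {TOKEN}",
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    payload = json.dumps({"db": "location", "q": query, "format": "json"})

    start = time.perf_counter()
    response = requests.post(
        URL + "/api/v3/query_sql", headers=headers, data=payload, timeout=30
    )
    rows = response.json()  # read and parse the full response body
    elapsed = time.perf_counter() - start

    print(f"status={response.status_code}, rows={len(rows)}, took {elapsed:.3f}s")
```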
Expected behaviour:
The same performance between the two paths.
Actual behaviour:
FlightRPC queries are significantly slower than the same SQL over the HTTP API (see the timings above).
Environment info:
InfluxDB 3 Enterprise v3.0.0 build, compaction enabled; object store: file.
Config:

```yaml
- INFLUXDB3_OBJECT_STORE=file
- INFLUXDB3_DB_DIR=/var/lib/influxdb3
- INFLUXDB3_ENTERPRISE_LICENSE_EMAIL=
- INFLUXDB3_ENTERPRISE_MODE=all
- INFLUXDB3_HTTP_BIND_ADDR=0.0.0.0:8181
- INFLUXDB3_MAX_HTTP_REQUEST_SIZE=20971520
- LOG_FILTER=info
- INFLUXDB3_WAL_FLUSH_INTERVAL=1000ms
- INFLUXDB3_NODE_IDENTIFIER_PREFIX=node-1
- INFLUXDB3_ENTERPRISE_CLUSTER_ID=cluster-1
```

Logs:
Include a snippet of errors from the logs or stack traces here.
Sometimes you can get useful information by running the program with the `RUST_BACKTRACE=full` environment variable. Finally, the IOx server has a `-vv` flag for verbose logging.
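For completeness, a hedged compose snippet showing how these could be added to the environment block of the example above; `LOG_FILTER=debug` is an assumed value (the repro uses `info`):

```yaml
    environment:
      # Standard Rust env var: print full backtraces if the server panics.
      - RUST_BACKTRACE=full
      # Assumed more-verbose tracing filter; the repro above uses LOG_FILTER=info.
      - LOG_FILTER=debug
```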