Perf issues between HTTP vs FlightRPC v3 #26297

Open

jules-ch opened this issue Apr 20, 2025 · 6 comments
@jules-ch

jules-ch commented Apr 20, 2025

I'm still seeing a performance gap between the CLI (which calls the HTTP API under the hood) and FlightRPC via the pyarrow client.
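
On the Flight side there is nothing exotic: the query goes through a plain pyarrow do_get with a JSON ticket, roughly like this (a minimal sketch; the token, database, and query are placeholders):

import json
from pyarrow.flight import FlightCallOptions, FlightClient, Ticket

TOKEN = "<INFLUXDB_TOKEN>"

client = FlightClient("grpc+tcp://127.0.0.1:8181")
ticket = Ticket(
    json.dumps(
        {"database": "<DATABASE>", "sql_query": "SELECT 1", "query_type": "sql"}
    ).encode("utf-8")
)
options = FlightCallOptions(
    timeout=10, headers=[(b"authorization", f"Bearer {TOKEN}".encode("utf-8"))]
)
# read_all() materializes the full Arrow table on the client
table = client.do_get(ticket, options).read_all()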

I'm going to try to put together a minimal example for this.

I'm using the v3.0.0 Enterprise build, with compaction enabled.

In the meantime, here are the server logs for the same SQL query issued via each path:

Using the CLI: 32 ms

influxdb3  | SELECT DISTINCT ON (tid) lat, lon, tid, battery, altitude, time
influxdb3  | FROM telemetry
influxdb3  | WHERE tid IN ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
influxdb3  | ORDER BY tid, time DESC;
influxdb3  |  query_params=Params { } issue_time=2025-04-20T17:52:56.673480632+00:00 partitions=8 parquet_files=16 deduplicated_partitions=3 deduplicated_parquet_files=11 plan_duration_secs=0.031543458 permit_duration_secs=0.000114208 execute_duration_secs=0.000655875 end2end_duration_secs=0.032463916 compute_duration_secs=0.00020381 max_memory=0 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false

Using FlightRPC: 617 ms

influxdb3  | SELECT DISTINCT ON (tid) lat, lon, tid, battery, altitude, time
influxdb3  | FROM telemetry
influxdb3  | WHERE tid IN ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
influxdb3  | ORDER BY tid, time DESC;
influxdb3  |  query_params=Params { } issue_time=2025-04-20T17:53:48.931954711+00:00 partitions=8 parquet_files=16 deduplicated_partitions=3 deduplicated_parquet_files=11 plan_duration_secs=0.060829459 permit_duration_secs=0.001200208 execute_duration_secs=0.555666209 end2end_duration_secs=0.617759376 compute_duration_secs=0.095443782 max_memory=25699024 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false

Expected behaviour:
Comparable performance between the two paths.

Actual behaviour:
The same query takes ~32 ms end-to-end over HTTP but ~617 ms over FlightRPC (see logs above).

Environment info:

  • System info: Darwin 24.3.0 arm64
  • I'm running influxdb3 using Docker with a volume; the object store is set up as file

Config:
- INFLUXDB3_OBJECT_STORE=file
- INFLUXDB3_DB_DIR=/var/lib/influxdb3
- INFLUXDB3_ENTERPRISE_LICENSE_EMAIL=
- INFLUXDB3_ENTERPRISE_MODE=all
- INFLUXDB3_HTTP_BIND_ADDR=0.0.0.0:8181
- INFLUXDB3_MAX_HTTP_REQUEST_SIZE=20971520
- LOG_FILTER=info
- INFLUXDB3_WAL_FLUSH_INTERVAL=1000ms
- INFLUXDB3_NODE_IDENTIFIER_PREFIX=node-1
- INFLUXDB3_ENTERPRISE_CLUSTER_ID=cluster-1


@jules-ch jules-ch changed the title Perf issues between HTTP vs FlightRPC Perf issues between HTTP vs FlightRPC v3 Apr 20, 2025
@jules-ch
Author

log_http.txt

log_rpc.txt

@hiltontj hiltontj added the v3 label Apr 21, 2025
@mgattozzi
Contributor

Hey @jules-ch, would you happen to have a minimal repro of how you're using the pyarrow client? Even just a small Python script to execute would help. That way we could write an equivalent Rust one to help rule out some possible perf bottlenecks.

@jules-ch
Author

Yeah, I'll try to have one soon.
Is it possible to trigger the compactor manually, or would past data spread over multiple periods be compacted to Parquet automatically? I'm asking just for reproducibility.

@jules-ch
Author

jules-ch commented Apr 27, 2025

OK, here is a minimal example:

services:
  influxdb3:
    image: quay.io/influxdb/influxdb3-enterprise:5aed0d9d2cb1cba948f77869ff506230b2d0d668
    container_name: influxdb3
    volumes:
      - ./influxdb3/:/var/lib/influxdb3/
    environment:
      - INFLUXDB3_OBJECT_STORE=file
      - INFLUXDB3_DB_DIR=/var/lib/influxdb3
      - INFLUXDB3_ENTERPRISE_LICENSE_EMAIL=<YOUREMAIL>
      - INFLUXDB3_ENTERPRISE_MODE=all
      - INFLUXDB3_HTTP_BIND_ADDR=0.0.0.0:8181
      - INFLUXDB3_MAX_HTTP_REQUEST_SIZE=20971520
      - LOG_FILTER=info
      - INFLUXDB3_WAL_FLUSH_INTERVAL=1000ms
      - INFLUXDB3_NODE_IDENTIFIER_PREFIX=node-1
      - INFLUXDB3_ENTERPRISE_CLUSTER_ID=cluster-1
      - INFLUXDB3_WAL_SNAPSHOT_SIZE=10
    expose:
      - 8181
    ports:
      - 8181:8181
    healthcheck:
      test: ["CMD-SHELL", "bash -c ':> /dev/tcp/127.0.0.1/8181' || exit 1"]
      interval: 5s
      retries: 5
      start_period: 30s
      timeout: 10s

I reduced INFLUXDB3_WAL_SNAPSHOT_SIZE to trigger compaction while loading data with the script.

Load data for 1000 devices with 120 minutes of data; the sample rate is one point per second.

from datetime import datetime, timedelta

import numpy as np
from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, write_client_options

TOKEN = "<INFLUXDB_TOKEN>"
DATABASE = "location"

tids = np.arange(0, 1000)


def generate_points():
    end_time = datetime.now()
    time_span = timedelta(minutes=120)
    start_time = end_time - time_span
    times = np.arange(
        int(start_time.timestamp() * 1e9), int(end_time.timestamp() * 1e9), int(1e9)
    )
    n_points = int(time_span.total_seconds())

    for tid in tids:
        lats = np.random.uniform(-90.0, 90.0, (n_points,))
        lons = np.random.uniform(-180, 180, (n_points,))
        for lat, lon, time_ in zip(lats, lons, times, strict=True):
            point = (
                Point("telemetry")
                .tag("tid", tid)
                .field("lat", lat)
                .field("lon", lon)
                .time(time_)
            )
            yield point


BATCH_SIZE = 50000


class BatchingCallback:
    def __init__(self):
        self.write_count = 0
        self.total = 0

    def success(self, conf, data: str):
        self.write_count += 1
        count = len(data.splitlines())
        self.total += count
        print(f"Written batch: {conf}, data: {len(data.splitlines())} items")
        print(f"Written {self.total} total")


callback = BatchingCallback()

write_options = WriteOptions(
    batch_size=BATCH_SIZE,
    flush_interval=10_000,
    jitter_interval=2_000,
    retry_interval=5_000,
    max_retries=5,
    max_retry_delay=30_000,
    exponential_base=2,
)
wco = write_client_options(
    success_callback=callback.success,
    write_options=write_options,
)
with InfluxDBClient3(
    host="http://127.0.0.1:8181",
    database=DATABASE,
    token=TOKEN,
    write_client_options=wco,
) as influxdb3_client:
    influxdb3_client.write(generate_points())

Then query with both FlightRPC and the V3 HTTP API. I'm using aiohttp here, so the client-side timings don't wait on the full response; the InfluxDB server logs are the way to truly compare end2end_duration.

Use docker compose logs influxdb3 -n0 -f | grep end2end_duration to compare durations between calls.

import asyncio
import json
import time

import aiohttp
from pyarrow.flight import FlightCallOptions, FlightClient, Ticket

TOKEN = "<INFLUXDB_TOKEN>"
flight_client = FlightClient(location="grpc+tcp://127.0.0.1:8181")

token = (b"authorization", f"Bearer {TOKEN}".encode("utf-8"))

query_1 = """
--sql
SELECT DISTINCT ON (tid) lat, lon, tid, time
FROM telemetry
WHERE tid IN ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
ORDER BY tid ASC, time DESC;
"""
query_2 = """
--sql
SELECT lat, lon, tid, time
FROM telemetry
ORDER BY time DESC
LIMIT 200
"""
query_3 = """
--sql
SELECT AVG(lat)
FROM telemetry
GROUP BY tid
"""

query_4 = """
--sql
SELECT
  DATE_BIN(INTERVAL '10 minutes', time) AS time,
  tid,
  selector_max(lat, time)['value'] AS 'max lat',
  selector_min(lat, time)['value'] AS 'min lat',
  avg(lat) AS 'average lat'
FROM telemetry
GROUP BY 1, tid
ORDER BY tid, 1
"""


def flight_rpc_query(query_sql: str, flight_client: FlightClient):
    ticket_data = {
        "database": "location",
        "sql_query": query_sql,
        "query_type": "sql",
    }

    ticket = Ticket(json.dumps(ticket_data).encode("utf-8"))
    options = FlightCallOptions(timeout=10, headers=[token])

    start_time = time.perf_counter()
    reader = flight_client.do_get(ticket, options)
    table = reader.read_all()
    print(time.perf_counter() - start_time, "s")


async def v3_http_call(query: str):
    influxdb_url = "http://127.0.0.1:8181"
    headers = {
        "Authorization": f"Bearer {TOKEN}",
        "accept": "application/json",
        "Content-Type": "application/json",
    }
    content = json.dumps(
        {
            "db": "location",
            "q": query,
            "format": "json",
        },
    )
    async with aiohttp.ClientSession(headers=headers) as session:
        start_time = time.perf_counter()
        async with session.post(
            influxdb_url + "/api/v3/query_sql", data=content
        ) as response:
            print("Status:", response.status)
            print(time.perf_counter() - start_time, "s")
            response = await response.json()


queries = [query_1, query_2, query_3, query_4]

for query in queries:
    print("\nSQL query: ", query)
    print("Calling FlightRPC\n-----------------")
    flight_rpc_query(query, flight_client)
    print("Calling HTTP V3\n-----------------")
    asyncio.run(v3_http_call(query))

@jules-ch
Author

jules-ch commented Apr 27, 2025



SQL query:  
--sql
SELECT DISTINCT ON (tid) lat, lon, tid, time
FROM telemetry
WHERE tid IN ('1', '2', '3', '4', '5', '6', '7', '8', '9', '10')
ORDER BY tid ASC, time DESC;

Calling FlightRPC
-----------------
0.05812533299831557 s
Calling HTTP V3
-----------------
Status: 200
0.030552708001778228 s

SQL query:  
--sql
SELECT lat, lon, tid, time
FROM telemetry
ORDER BY time DESC
LIMIT 200

Calling FlightRPC
-----------------
1.0316940000011527 s
Calling HTTP V3
-----------------
Status: 200
0.010956583999359282 s

SQL query:  
--sql
SELECT AVG(lat)
FROM telemetry
GROUP BY tid

Calling FlightRPC
-----------------
0.9687469999989844 s
Calling HTTP V3
-----------------
Status: 200
0.0137646670009417 s

SQL query:  
--sql
SELECT
  DATE_BIN(INTERVAL '10 minutes', time) AS time,
  tid,
  selector_max(lat, time)['value'] AS 'max lat',
  selector_min(lat, time)['value'] AS 'min lat',
  avg(lat) AS 'average lat'
FROM telemetry
GROUP BY 1, tid
ORDER BY tid, 1

Calling FlightRPC
-----------------
1.0136529580013303 s
Calling HTTP V3
-----------------
Status: 200
0.010635374997946201 s
influxdb3  |  query_params=Params { } issue_time=2025-04-27T22:09:23.012372595+00:00 partitions=8 parquet_files=30 deduplicated_partitions=6 deduplicated_parquet_files=28 plan_duration_secs=0.011326125 permit_duration_secs=0.000628417 execute_duration_secs=0.040893375 end2end_duration_secs=0.05298875 compute_duration_secs=0.050799921 max_memory=42873040 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3  |  query_params=Params { } issue_time=2025-04-27T22:09:23.067608428+00:00 partitions=8 parquet_files=30 deduplicated_partitions=6 deduplicated_parquet_files=28 plan_duration_secs=0.008610792 permit_duration_secs=6.3917e-5 execute_duration_secs=0.000350458 end2end_duration_secs=0.009155084 compute_duration_secs=0.000436696 max_memory=0 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3  |  query_params=Params { } issue_time=2025-04-27T22:09:23.126399512+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.006819791 permit_duration_secs=0.000935 execute_duration_secs=1.022507751 end2end_duration_secs=1.030344 compute_duration_secs=3.243217819 max_memory=1409936128 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3  |  query_params=Params { } issue_time=2025-04-27T22:09:24.158995429+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.007156542 permit_duration_secs=5.7166e-5 execute_duration_secs=0.001554 end2end_duration_secs=0.008937625 compute_duration_secs=0.001594963 max_memory=2972808 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3  |  query_params=Params { } issue_time=2025-04-27T22:09:25.166081888+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.006815625 permit_duration_secs=0.000289875 execute_duration_secs=0.960482625 end2end_duration_secs=0.967681 compute_duration_secs=3.563979224 max_memory=1256736474 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3  |  query_params=Params { } issue_time=2025-04-27T22:09:26.135655222+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.0061555 permit_duration_secs=5.5875e-5 execute_duration_secs=0.000161916 end2end_duration_secs=0.006390375 compute_duration_secs=2.3629e-5 max_memory=0 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3  |  query_params=Params { } issue_time=2025-04-27T22:09:26.933600930+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.008665917 permit_duration_secs=0.000477208 execute_duration_secs=0.993381945 end2end_duration_secs=1.002627861 compute_duration_secs=3.706687025 max_memory=1263053695 ingester_metrics=IngesterMetrics { latency_to_plan = 0ns, latency_to_full_data = 0ns, response_rows = 0, partition_count = 0, response_size = 0 } success=true running=false cancelled=false
influxdb3  |  query_params=Params { } issue_time=2025-04-27T22:09:27.940562291+00:00 partitions=11 parquet_files=35 deduplicated_partitions=11 deduplicated_parquet_files=35 plan_duration_secs=0.008619459 permit_duration_secs=4.7041e-5 execute_duration_secs=0.000263667 end2end_duration_secs=0.008

@jules-ch
Author

Especially on aggregate queries, performance is ~100x worse when using Flight.
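
To help narrow down where the Flight time goes, here is a sketch (untested against this setup) that splits the client-side measurement into time-to-first-batch vs. time to stream the whole result, using the same JSON ticket layout as the repro above:

import json
import time

from pyarrow.flight import FlightCallOptions, FlightClient, Ticket

TOKEN = "<INFLUXDB_TOKEN>"
client = FlightClient("grpc+tcp://127.0.0.1:8181")
options = FlightCallOptions(
    timeout=10, headers=[(b"authorization", f"Bearer {TOKEN}".encode("utf-8"))]
)

query = "SELECT AVG(lat) FROM telemetry GROUP BY tid"
ticket = Ticket(
    json.dumps(
        {"database": "location", "sql_query": query, "query_type": "sql"}
    ).encode("utf-8")
)

start = time.perf_counter()
reader = client.do_get(ticket, options)
first_batch_at = None
rows = 0
for chunk in reader:  # each chunk wraps one Arrow RecordBatch
    if first_batch_at is None:
        first_batch_at = time.perf_counter() - start
    if chunk.data is not None:
        rows += chunk.data.num_rows
total = time.perf_counter() - start
if first_batch_at is None:  # no batches returned
    first_batch_at = total
print(f"first batch after {first_batch_at:.3f}s, {rows} rows in {total:.3f}s total")

If most of the time is spent before the first batch arrives, the cost is on the server's execute path rather than in streaming/deserializing the Arrow data.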
