Skip to content

Concurrent Dataset::open against the same URI rebuilds ObjectStore N times under a shared Session #6838

@LuciferYang

Description

@LuciferYang

Repro

A standalone reproducer (drop into rust/lance/examples/concurrent_open_bench.rs, source inlined below) drives 144 concurrent DatasetBuilder::from_uri(uri).with_session(s).load() against an S3-backed dataset, semaphore-gated to 12 in flight:

BENCH_URI=s3://bucket/foo.lance \
BENCH_SHARED_SESSION=1 \
BENCH_CONCURRENCY=12 BENCH_TOTAL=144 \
BENCH_STORAGE_OPTS='aws_endpoint=...,aws_access_key_id=...,...' \
cargo run --release --example concurrent_open_bench

Observed on ad9f38227:

| scenario | wall | p50 | registry hits / misses |
| --- | --- | --- | --- |
| shared Session, 144 opens | 9.50 s | 784 ms | 0 / 144 |

Every concurrent open rebuilds the credential chain + HTTP client.

Cause

ObjectStoreRegistry caches Weak<Arc<ObjectStore>>. The dedup is post-build: a Weak is only inserted after build() returns, so N tasks racing the same key before the first builder finishes all fall through to a fresh build.

Fix

Add a per-key build lock in get_store so that concurrent cold builds for the same CacheKey serialize behind a single in-flight build(). After the fix, the same bench reports hits=143 / misses=1, and wall time drops from 9.50 s to 0.18 s (~53× on this workload).

PR follows.

concurrent_open_bench.rs (drop into rust/lance/examples/)
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Concurrent `Dataset::open` reproducer
//!
//! Reproduces the JNI default-open path bottleneck: when many threads call
//! `Dataset::open(uri)` concurrently with no shared `Session`, every call
//! constructs a fresh `ObjectStoreRegistry` and a cold object-store client,
//! which dominates wall time even when most opens hit the same URI.
//!
//! Usage:
//! ```bash
//! BENCH_URI=s3://bucket/foo.lance \
//!   BENCH_CONCURRENCY=12 BENCH_TOTAL=144 \
//!   BENCH_STORAGE_OPTS='aws_endpoint=http://localhost:9000,aws_access_key_id=...' \
//!   cargo run --release --example concurrent_open_bench
//! ```
//!
//! Set `BENCH_SHARED_SESSION=1` to thread one `Arc<Session>` through every
//! `DatasetBuilder` — i.e. the candidate fix — and compare wall time + the
//! registry's hit/miss counters against the baseline.

#![allow(clippy::print_stdout)]

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};

use lance::dataset::builder::DatasetBuilder;
use lance::session::Session;
use tokio::sync::Semaphore;

/// Parses a comma-separated list of `key=value` entries into a map.
///
/// Every entry is trimmed before inspection. Blank entries and entries that
/// contain no `=` are skipped. Only the first `=` in an entry separates the
/// key from the value, so a value may itself contain `=`. Keys and values are
/// trimmed individually; when a key repeats, the last occurrence wins.
fn parse_storage_opts(s: &str) -> HashMap<String, String> {
    let mut opts = HashMap::new();
    for entry in s.split(',') {
        let entry = entry.trim();
        if entry.is_empty() {
            continue;
        }
        // Entries without a separator are ignored rather than treated as errors.
        if let Some((key, value)) = entry.split_once('=') {
            opts.insert(key.trim().to_string(), value.trim().to_string());
        }
    }
    opts
}

/// Returns the `p`-th percentile (nearest-rank, with rounding) of `latencies`.
///
/// The input is copied and sorted; the caller's slice is left untouched.
/// An empty slice yields [`Duration::ZERO`].
///
/// `p` is expected to lie in `0.0..=100.0`. Values outside that range are
/// clamped to the nearest valid rank instead of panicking: anything above 100
/// returns the maximum, and anything below 0 (or NaN) returns the minimum.
fn percentile(latencies: &[Duration], p: f64) -> Duration {
    if latencies.is_empty() {
        return Duration::ZERO;
    }
    let mut sorted = latencies.to_vec();
    // Durations that compare equal are interchangeable, so stability is not needed.
    sorted.sort_unstable();
    let last = sorted.len() - 1;
    // Float-to-usize `as` casts saturate (negative/NaN -> 0, huge -> usize::MAX),
    // so only the upper bound needs an explicit clamp.
    let rank = ((p / 100.0) * last as f64).round() as usize;
    sorted[rank.min(last)]
}

/// Reads environment variable `key` as a `usize`.
///
/// Falls back to `default` when the variable is unset, is not valid Unicode,
/// or does not parse as a `usize`.
fn env_usize(key: &str, default: usize) -> usize {
    match std::env::var(key) {
        Ok(raw) => raw.parse().unwrap_or(default),
        Err(_) => default,
    }
}

/// Benchmark entry point: opens the dataset at `BENCH_URI` `BENCH_TOTAL` times,
/// with at most `BENCH_CONCURRENCY` opens in flight, and prints wall time,
/// per-open latency percentiles, and (when a shared session is used) the
/// object-store registry counters.
///
/// Environment variables read here:
/// - `BENCH_URI` (required): dataset URI.
/// - `BENCH_CONCURRENCY` (default 12): semaphore permits; must be at least 1.
/// - `BENCH_TOTAL` (default 144): number of opens to spawn.
/// - `BENCH_STORAGE_OPTS` (optional): comma-separated `key=value` list.
/// - `BENCH_SHARED_SESSION` (presence only): share one `Session` across opens.
/// - `BENCH_WARM` (presence only): perform one open before the timed run.
///
/// # Errors
///
/// Returns an error when `BENCH_URI` is unset, when `BENCH_CONCURRENCY` is 0,
/// when the warm-up open fails, or when a spawned task cannot be joined.
/// Failures of individual timed opens are collected and reported, not returned.
#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let uri = std::env::var("BENCH_URI").map_err(|_| "BENCH_URI must be set")?;
    let concurrency = env_usize("BENCH_CONCURRENCY", 12);
    // A semaphore with zero permits would park every task forever and the
    // benchmark would hang without any output; fail fast instead.
    if concurrency == 0 {
        return Err("BENCH_CONCURRENCY must be at least 1".into());
    }
    let total = env_usize("BENCH_TOTAL", 144);
    let storage_opts = std::env::var("BENCH_STORAGE_OPTS")
        .map(|s| parse_storage_opts(&s))
        .unwrap_or_default();
    // Both flags are presence-based: any value, including "0", enables them.
    let shared_session = std::env::var("BENCH_SHARED_SESSION").is_ok();
    let warm = std::env::var("BENCH_WARM").is_ok();

    println!("=== concurrent_open_bench ===");
    println!("URI               = {}", uri);
    println!("concurrency       = {}", concurrency);
    println!("total opens       = {}", total);
    // Only the option keys are printed, not their values.
    println!(
        "storage_opts keys = {:?}",
        storage_opts.keys().collect::<Vec<_>>()
    );
    println!("shared_session    = {}", shared_session);
    println!("warm (1 prep open)= {}", warm);
    println!();

    let shared = if shared_session {
        Some(Arc::new(Session::default()))
    } else {
        None
    };

    // Hold the warm dataset alive so its Arc<ObjectStore> keeps the registry's
    // weak entry upgradeable for the entire concurrent run — otherwise the first
    // open's strong Arc dies before the next acquires the permit.
    let _warm_dataset_keepalive = if warm {
        let mut b = DatasetBuilder::from_uri(&uri).with_storage_options(storage_opts.clone());
        if let Some(s) = shared.as_ref() {
            b = b.with_session(s.clone());
        }
        let t0 = Instant::now();
        let ds = b.load().await?;
        println!("warmup open = {:?}", t0.elapsed());
        println!();
        Some(ds)
    } else {
        None
    };

    let semaphore = Arc::new(Semaphore::new(concurrency));
    let mut handles = Vec::with_capacity(total);
    let wall_start = Instant::now();

    // Spawn every task up front; the semaphore limits how many opens run at once.
    for i in 0..total {
        let uri = uri.clone();
        let storage_opts = storage_opts.clone();
        let semaphore = semaphore.clone();
        let shared = shared.clone();
        handles.push(tokio::spawn(async move {
            let _permit = semaphore.acquire_owned().await.unwrap();
            // The timer starts after the permit is acquired, so the reported
            // latency excludes time spent queued behind the semaphore.
            let t0 = Instant::now();
            let mut builder = DatasetBuilder::from_uri(&uri).with_storage_options(storage_opts);
            if let Some(s) = shared {
                builder = builder.with_session(s);
            }
            let res = builder.load().await;
            let elapsed = t0.elapsed();
            (i, res.is_ok(), elapsed, res.err().map(|e| e.to_string()))
        }));
    }

    // Latencies are recorded for failed opens as well as successful ones.
    let mut latencies = Vec::with_capacity(total);
    let mut errors: Vec<(usize, String)> = Vec::new();
    for h in handles {
        let (i, ok, elapsed, err) = h.await?;
        latencies.push(elapsed);
        if !ok {
            errors.push((i, err.unwrap_or_default()));
        }
    }
    let wall = wall_start.elapsed();

    println!("--- results ---");
    println!("wall            = {:?}", wall);
    println!("succeeded       = {} / {}", total - errors.len(), total);
    println!("p50 per-open    = {:?}", percentile(&latencies, 50.0));
    println!("p95 per-open    = {:?}", percentile(&latencies, 95.0));
    println!("p99 per-open    = {:?}", percentile(&latencies, 99.0));
    println!(
        "max per-open    = {:?}",
        latencies.iter().max().copied().unwrap_or_default()
    );

    if let Some(s) = shared {
        let stats = s.store_registry().stats();
        println!("registry hits   = {}", stats.hits);
        println!("registry misses = {}", stats.misses);
        println!("registry active = {}", stats.active_stores);
    } else {
        println!("(no shared session — registry stats unavailable)");
    }

    if !errors.is_empty() {
        println!();
        println!("--- first 5 errors ---");
        for (i, e) in errors.iter().take(5) {
            println!("  task {}: {}", i, e);
        }
    }
    Ok(())
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions