From e32c1e725ed9177f61c30022d526cb4df528e5cb Mon Sep 17 00:00:00 2001 From: Arnaud Gourlay Date: Tue, 12 May 2026 13:06:01 +0200 Subject: [PATCH 1/2] Wire timeout flag to requests --- src/args.rs | 2 +- src/query.rs | 14 +++++++------- src/scroll.rs | 4 ++++ src/search.rs | 10 ++++++++-- src/upsert.rs | 6 ++++++ 5 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/args.rs b/src/args.rs index 87f31e9..c664bf7 100644 --- a/src/args.rs +++ b/src/args.rs @@ -310,7 +310,7 @@ pub struct Args { #[clap(long)] pub read_consistency: Option, - /// Timeout for requests in seconds + /// Timeout for requests in seconds (applied as both the client channel deadline and the server-side request timeout where supported). #[clap(long, value_parser = parse_number)] pub timeout: Option, diff --git a/src/query.rs b/src/query.rs index 15fa962..e2b7699 100644 --- a/src/query.rs +++ b/src/query.rs @@ -42,13 +42,13 @@ async fn get_uuids(args: &Args, client: &Qdrant) -> Result> { } // Retrieve existing UUIDs - let res = client - .scroll( - ScrollPointsBuilder::new(&args.collection_name) - .with_payload(true) - .limit(args.num_vectors as u32), - ) - .await?; + let mut scroll_builder = ScrollPointsBuilder::new(&args.collection_name) + .with_payload(true) + .limit(args.num_vectors as u32); + if let Some(timeout) = args.timeout { + scroll_builder = scroll_builder.timeout(timeout as u64); + } + let res = client.scroll(scroll_builder).await?; let uuids: Vec<_> = res .result .iter() diff --git a/src/scroll.rs b/src/scroll.rs index 576f174..0fe2c24 100644 --- a/src/scroll.rs +++ b/src/scroll.rs @@ -91,6 +91,10 @@ impl ScrollProcessor { request_builder = request_builder.read_consistency(read_consistency); } + if let Some(timeout) = self.args.timeout { + request_builder = request_builder.timeout(timeout as u64); + } + let request = request_builder.build(); let res = retry_with_clients(&self.clients, args, |client| client.scroll(request.clone())) .await?; diff --git a/src/search.rs b/src/search.rs index 9e1ea3b..3ec59ca 100644 --- a/src/search.rs +++ b/src/search.rs @@ -256,8 +256,11 @@ impl SearchProcessor { (None, query_points) }; - let batch_request_builder = + let mut batch_request_builder = QueryBatchPointsBuilder::new(self.args.collection_name.clone(), batch_query_points); + if let Some(timeout) = self.args.timeout { + batch_request_builder = batch_request_builder.timeout(timeout as u64); + } let request = batch_request_builder.build(); let res_batch = retry_with_clients(&self.clients, args, |client| { @@ -315,8 +318,11 @@ impl SearchProcessor { for point in &mut exact_query_points { point.params = Some(exact_search); } - let exact_request_builder = + let mut exact_request_builder = QueryBatchPointsBuilder::new(self.args.collection_name.clone(), exact_query_points); + if let Some(timeout) = self.args.timeout { + exact_request_builder = exact_request_builder.timeout(timeout as u64); + } let exact_request = exact_request_builder.build(); let exact_res = retry_with_clients(&self.clients, args, |client| { diff --git a/src/upsert.rs b/src/upsert.rs index 37e66bc..6ed608c 100644 --- a/src/upsert.rs +++ b/src/upsert.rs @@ -182,6 +182,9 @@ impl UpsertProcessor { if let Some(shard_key) = &args.shard_key { request = request.shard_key_selector(vec![Key::Keyword(shard_key.to_string())]); } + if let Some(timeout) = self.args.timeout { + request = request.timeout(timeout as u64); + } let request = request.build(); let res = retry_with_clients(&self.clients, args, |client| { @@ -209,6 +212,9 @@ impl UpsertProcessor { if let Some(ordering) = self.args.write_ordering { request_builder = request_builder.ordering(ordering); } + if let Some(timeout) = self.args.timeout { + request_builder = request_builder.timeout(timeout as u64); + } let request = request_builder.build(); From c62cefb31bf16e3e0443c211b4de61caa4bd0c9e Mon Sep 17 00:00:00 2001 From: Arnaud Gourlay Date: Tue, 12 May 2026 13:19:49 +0200 Subject: [PATCH 2/2] fix the race over the transport deadline --- src/client.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index 1f7662b..db5c4af 100644 --- a/src/client.rs +++ b/src/client.rs @@ -24,8 +24,11 @@ pub fn get_config(args: &Args) -> Vec { let api_key = std::env::var("QDRANT_API_KEY").ok(); if let Some(timeout) = args.timeout { - config.set_timeout(Duration::from_secs(timeout as u64)); - config.set_connect_timeout(Duration::from_secs(timeout as u64)); + // Give the channel a small cushion over the server-side request timeout + // so the server's structured timeout error wins the race over a transport deadline. + let channel_timeout = Duration::from_secs(timeout as u64 + 5); + config.set_timeout(channel_timeout); + config.set_connect_timeout(channel_timeout); } if let Some(api_key) = api_key {