Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ pub struct Args {
#[clap(long)]
pub read_consistency: Option<ReadConsistency>,

/// Timeout for requests in seconds
/// Timeout for requests in seconds (applied as both the client channel deadline and the server-side request timeout where supported).
#[clap(long, value_parser = parse_number)]
pub timeout: Option<usize>,

Expand Down
7 changes: 5 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ pub fn get_config(args: &Args) -> Vec<QdrantConfig> {
let api_key = std::env::var("QDRANT_API_KEY").ok();

if let Some(timeout) = args.timeout {
config.set_timeout(Duration::from_secs(timeout as u64));
config.set_connect_timeout(Duration::from_secs(timeout as u64));
// Give the channel a small cushion over the server-side request timeout
// so the server's structured timeout error wins the race over a transport deadline.
let channel_timeout = Duration::from_secs(timeout as u64 + 5);
config.set_timeout(channel_timeout);
config.set_connect_timeout(channel_timeout);
}

if let Some(api_key) = api_key {
Expand Down
14 changes: 7 additions & 7 deletions src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ async fn get_uuids(args: &Args, client: &Qdrant) -> Result<Vec<String>> {
}

// Retrieve existing UUIDs
let res = client
.scroll(
ScrollPointsBuilder::new(&args.collection_name)
.with_payload(true)
.limit(args.num_vectors as u32),
)
.await?;
let mut scroll_builder = ScrollPointsBuilder::new(&args.collection_name)
.with_payload(true)
.limit(args.num_vectors as u32);
if let Some(timeout) = args.timeout {
scroll_builder = scroll_builder.timeout(timeout as u64);
}
let res = client.scroll(scroll_builder).await?;
let uuids: Vec<_> = res
.result
.iter()
Expand Down
4 changes: 4 additions & 0 deletions src/scroll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ impl ScrollProcessor {
request_builder = request_builder.read_consistency(read_consistency);
}

if let Some(timeout) = self.args.timeout {
request_builder = request_builder.timeout(timeout as u64);
}

let request = request_builder.build();
let res = retry_with_clients(&self.clients, args, |client| client.scroll(request.clone()))
.await?;
Expand Down
10 changes: 8 additions & 2 deletions src/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,11 @@ impl SearchProcessor {
(None, query_points)
};

let batch_request_builder =
let mut batch_request_builder =
QueryBatchPointsBuilder::new(self.args.collection_name.clone(), batch_query_points);
if let Some(timeout) = self.args.timeout {
batch_request_builder = batch_request_builder.timeout(timeout as u64);
}

let request = batch_request_builder.build();
let res_batch = retry_with_clients(&self.clients, args, |client| {
Expand Down Expand Up @@ -315,8 +318,11 @@ impl SearchProcessor {
for point in &mut exact_query_points {
point.params = Some(exact_search);
}
let exact_request_builder =
let mut exact_request_builder =
QueryBatchPointsBuilder::new(self.args.collection_name.clone(), exact_query_points);
if let Some(timeout) = self.args.timeout {
exact_request_builder = exact_request_builder.timeout(timeout as u64);
}
let exact_request = exact_request_builder.build();

let exact_res = retry_with_clients(&self.clients, args, |client| {
Expand Down
6 changes: 6 additions & 0 deletions src/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ impl UpsertProcessor {
if let Some(shard_key) = &args.shard_key {
request = request.shard_key_selector(vec![Key::Keyword(shard_key.to_string())]);
}
if let Some(timeout) = self.args.timeout {
request = request.timeout(timeout as u64);
}

let request = request.build();
let res = retry_with_clients(&self.clients, args, |client| {
Expand Down Expand Up @@ -209,6 +212,9 @@ impl UpsertProcessor {
if let Some(ordering) = self.args.write_ordering {
request_builder = request_builder.ordering(ordering);
}
if let Some(timeout) = self.args.timeout {
request_builder = request_builder.timeout(timeout as u64);
}

let request = request_builder.build();

Expand Down
Loading