diff --git a/rust/src/encodings/plain.rs b/rust/src/encodings/plain.rs index 9e366e57f68..b388cac3d82 100644 --- a/rust/src/encodings/plain.rs +++ b/rust/src/encodings/plain.rs @@ -48,6 +48,11 @@ use crate::Error; use super::Decoder; +/// Parallelism factor decides how many parallel I/O requests are issued per CPU core. +/// This is a heuristic value, based on the assumption that NVMe and S3/GCS can +/// handle a large amount of parallel I/O and a large disk queue. +const PARALLELISM_FACTOR: usize = 4; + /// Encoder for plain encoding. /// pub struct PlainEncoder<'a> { @@ -362,7 +367,7 @@ impl<'a> Decoder for PlainDecoder<'a> { let adjusted_offsets = subtract_scalar(request, start)?; Ok::(take(&array, &adjusted_offsets, None)?) }) - .buffered(8) + .buffered(num_cpus::get() * PARALLELISM_FACTOR) .try_collect::>() .await?; let references = arrays.iter().map(|a| a.as_ref()).collect::>(); diff --git a/rust/src/index/vector/pq.rs b/rust/src/index/vector/pq.rs index ae3e7fe654c..4707c0b3c95 100644 --- a/rust/src/index/vector/pq.rs +++ b/rust/src/index/vector/pq.rs @@ -108,21 +108,25 @@ impl PQIndex { distance_table.extend(distances.values()); } - Ok(Arc::new(Float32Array::from_iter( - self.code - .as_ref() - .unwrap() - .values() - .chunks_exact(self.num_sub_vectors) - .map(|c| { - c.iter() - .enumerate() - .map(|(sub_vec_idx, centroid)| { - distance_table[sub_vec_idx * 256 + *centroid as usize] - }) - .sum::() - }), - ))) + Ok(Arc::new(unsafe { + Float32Array::from_trusted_len_iter( + self.code + .as_ref() + .unwrap() + .values() + .chunks_exact(self.num_sub_vectors) + .map(|c| { + Some( + c.iter() + .enumerate() + .map(|(sub_vec_idx, centroid)| { + distance_table[sub_vec_idx * 256 + *centroid as usize] + }) + .sum::(), + ) + }), + ) + })) } fn cosine_scores(&self, key: &Float32Array) -> Result {