Skip to content

Commit

Permalink
Rework CommitHashRing consensus message into CommitRead/`CommitWr…
Browse files Browse the repository at this point in the history
…ite`/`Finish` (#4417)

* Refactor `CommitHashRing` into `CommitRead`/`CommitWrite`/`Finish`

* Add `Resharding` filter condition

* Filter "resharded" points from search, scroll by, count and retrieve request results

* fixup! Refactor `CommitHashRing` into `CommitRead`/`CommitWrite`/`Finish`

`cargo clippy --fix`

* Apply suggestions from code review

* fixup! Filter "resharded" points from search, scroll by, count and retrieve request results

Add `Condition::is_local_only` method

* fixup! Add `Resharding` filter condition

* fixup! Filter "resharded" points from search, scroll by, count and retrieve request results

Clarified a few `TODO`s

* Fix clippy suggestions

---------

Co-authored-by: Tim Visée <tim+github@visee.me>
Co-authored-by: timvisee <tim@visee.me>
  • Loading branch information
3 people authored and generall committed Jun 21, 2024
1 parent 70c7c4c commit ed4e2fd
Show file tree
Hide file tree
Showing 16 changed files with 312 additions and 40 deletions.
10 changes: 9 additions & 1 deletion lib/api/src/grpc/conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,11 @@ fn conditions_helper_to_grpc(conditions: Option<Vec<segment::types::Condition>>)
if conditions.is_empty() {
vec![]
} else {
conditions.into_iter().map(|c| c.into()).collect()
conditions
.into_iter()
.filter(|c| !c.is_local_only()) // TODO(resharding)!?
.map(|c| c.into())
.collect()
}
}
}
Expand Down Expand Up @@ -1060,6 +1064,10 @@ impl From<segment::types::Condition> for Condition {
segment::types::Condition::Nested(nested) => {
ConditionOneOf::Nested(nested.nested.into())
}

segment::types::Condition::Resharding(_) => {
unimplemented!()
}
};

Self {
Expand Down
41 changes: 38 additions & 3 deletions lib/collection/src/collection/point_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use futures::stream::FuturesUnordered;
use futures::{future, StreamExt as _, TryFutureExt, TryStreamExt as _};
use itertools::Itertools;
use segment::data_types::order_by::{Direction, OrderBy};
use segment::types::{ShardKey, WithPayload, WithPayloadInterface};
use segment::types::{Filter, ShardKey, WithPayload, WithPayloadInterface};
use validator::Validate as _;

use super::Collection;
Expand Down Expand Up @@ -207,10 +207,15 @@ impl Collection {

pub async fn scroll_by(
&self,
request: ScrollRequestInternal,
mut request: ScrollRequestInternal,
read_consistency: Option<ReadConsistency>,
shard_selection: &ShardSelectorInternal,
) -> CollectionResult<ScrollResult> {
merge_filters(
&mut request.filter,
self.shards_holder.read().await.resharding_filter(),
);

let default_request = ScrollRequestInternal::default();

let id_offset = request.offset;
Expand Down Expand Up @@ -333,10 +338,15 @@ impl Collection {

pub async fn count(
&self,
request: CountRequestInternal,
mut request: CountRequestInternal,
read_consistency: Option<ReadConsistency>,
shard_selection: &ShardSelectorInternal,
) -> CollectionResult<CountResult> {
merge_filters(
&mut request.filter,
self.shards_holder.read().await.resharding_filter(),
);

let shards_holder = self.shards_holder.read().await;
let shards = shards_holder.select_shards(shard_selection)?;

Expand Down Expand Up @@ -374,8 +384,16 @@ impl Collection {
.unwrap_or(&WithPayloadInterface::Bool(false));
let with_payload = WithPayload::from(with_payload_interface);
let request = Arc::new(request);

#[allow(unused_assignments)]
let mut resharding_filter = None;

let all_shard_collection_results = {
let shard_holder = self.shards_holder.read().await;

// Get resharding filter, while we hold the lock to shard holder
resharding_filter = shard_holder.resharding_filter_impl();

let target_shards = shard_holder.select_shards(shard_selection)?;
let retrieve_futures = target_shards.into_iter().map(|(shard, shard_key)| {
let shard_key = shard_key.cloned();
Expand All @@ -397,15 +415,32 @@ impl Collection {
Ok(records)
})
});

future::try_join_all(retrieve_futures).await?
};

let mut covered_point_ids = HashSet::new();
let points = all_shard_collection_results
.into_iter()
.flatten()
// If resharding is in progress, and *read* hash-ring is committed, filter-out "resharded" points
.filter(|point| match &resharding_filter {
Some(filter) => filter.check(point.id),
None => true,
})
// Add each point only once, deduplicate point IDs
.filter(|point| covered_point_ids.insert(point.id))
.collect();

Ok(points)
}
}

/// Merge an optional resharding filter into a request's optional filter, in place.
///
/// If there is no resharding filter, the request filter is left untouched.
/// Otherwise the resharding filter is combined with the existing filter via
/// `merge_owned`, or becomes the filter when the request had none.
fn merge_filters(filter: &mut Option<Filter>, resharding_filter: Option<Filter>) {
    let Some(resharding_filter) = resharding_filter else {
        return;
    };

    let merged = match filter.take() {
        Some(existing) => existing.merge_owned(resharding_filter),
        None => resharding_filter,
    };

    *filter = Some(merged);
}
21 changes: 19 additions & 2 deletions lib/collection/src/collection/resharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,25 @@ impl Collection {
Ok(())
}

pub async fn commit_hashring(&self, reshard: ReshardKey) -> CollectionResult<()> {
self.shards_holder.write().await.commit_hashring(reshard)
/// Commit the *read* hash ring for the given resharding operation,
/// delegating to the shard holder under a write lock.
pub async fn commit_read_hashring(&self, resharding_key: ReshardKey) -> CollectionResult<()> {
    let mut shards_holder = self.shards_holder.write().await;
    shards_holder.commit_read_hashring(resharding_key)
}

/// Commit the *write* hash ring for the given resharding operation,
/// delegating to the shard holder under a write lock.
pub async fn commit_write_hashring(&self, resharding_key: ReshardKey) -> CollectionResult<()> {
    let mut shards_holder = self.shards_holder.write().await;
    shards_holder.commit_write_hashring(resharding_key)
}

/// Finalize the given resharding operation, delegating to the shard holder
/// under a write lock.
pub async fn finish_resharding(&self, resharding_key: ReshardKey) -> CollectionResult<()> {
    let mut shards_holder = self.shards_holder.write().await;
    shards_holder.finish_resharding(resharding_key)
}

pub async fn abort_resharding(&self, reshard_key: ReshardKey) -> CollectionResult<()> {
Expand Down
16 changes: 15 additions & 1 deletion lib/collection/src/collection/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,25 @@ impl Collection {

async fn do_core_search_batch(
&self,
request: CoreSearchRequestBatch,
mut request: CoreSearchRequestBatch,
read_consistency: Option<ReadConsistency>,
shard_selection: &ShardSelectorInternal,
timeout: Option<Duration>,
) -> CollectionResult<Vec<Vec<ScoredPoint>>> {
if let Some(resharding_filter) = self.shards_holder.read().await.resharding_filter() {
for search in &mut request.searches {
match &mut search.filter {
Some(filter) => {
*filter = filter.merge(&resharding_filter);
}

None => {
search.filter = Some(resharding_filter.clone());
}
}
}
}

let request = Arc::new(request);

let instant = Instant::now();
Expand Down
61 changes: 59 additions & 2 deletions lib/collection/src/hash_ring.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::fmt;
use std::hash::Hash;
use std::{any, fmt};

use itertools::Itertools as _;
use segment::index::field_index::CardinalityEstimation;
use segment::types::{PointIdType, ReshardingCondition};
use smallvec::SmallVec;

use crate::shards::shard::ShardId;
Expand Down Expand Up @@ -76,7 +78,7 @@ impl<T: Hash + Copy + PartialEq> HashRing<T> {
new.add(shard);
}

pub fn commit(&mut self) -> bool {
pub fn commit_resharding(&mut self) -> bool {
let Self::Resharding { new, .. } = self else {
log::warn!("committing resharding hashring, but hashring is not in resharding mode");
return false;
Expand Down Expand Up @@ -246,6 +248,61 @@ impl<T: Hash + Copy> Inner<T> {
Inner::Fair { ring, .. } => ring.is_empty(),
}
}

/// Number of distinct shards on the ring.
///
/// A `Fair` ring holds `scale` entries per shard (hence the division);
/// a `Raw` ring holds each shard once.
pub fn len(&self) -> usize {
    let (raw_len, scale) = match self {
        Inner::Raw(ring) => (ring.len(), 1),
        Inner::Fair { ring, scale } => (ring.len(), *scale as usize),
    };
    raw_len / scale
}
}

/// Hash-ring-backed point filter.
///
/// `check` accepts exactly the points that `ring` routes to the tracked
/// shard `filter`. Used during resharding to keep or drop "resharded"
/// points in read operations.
#[derive(Clone, Debug, PartialEq)]
pub struct Filter<T = ShardId> {
    /// Hash ring used to route point IDs to shards.
    ring: Inner<T>,
    /// Target shard: only points routed to this shard pass the filter.
    filter: T,
}

impl<T> Filter<T> {
    /// Create a filter that accepts points routed by `ring` to the shard `filter`.
    pub fn new(ring: Inner<T>, filter: T) -> Self {
        Self { ring, filter }
    }

    /// Returns `true` if `point_id` is routed to this filter's target shard.
    pub fn check(&self, point_id: PointIdType) -> bool
    where
        T: Hash + PartialEq + Copy,
    {
        self.ring
            .get(&point_id)
            .map_or(false, |shard| *shard == self.filter)
    }
}

impl<T> ReshardingCondition for Filter<T>
where
    T: fmt::Debug + Hash + PartialEq + Copy + 'static,
{
    /// Returns `true` if the point is routed to this filter's target shard.
    fn check(&self, point_id: PointIdType) -> bool {
        self.check(point_id)
    }

    /// Rough cardinality estimate, assuming points are distributed uniformly
    /// over the ring: about `points / ring.len()` are expected to match.
    /// `min` stays 0 and `max` stays `points`, since the real distribution
    /// is unknown.
    fn estimate_cardinality(&self, points: usize) -> CardinalityEstimation {
        // Guard against an empty ring: `points / 0` would panic, and an empty
        // ring routes nothing to the target shard, so nothing can match.
        let exp = match self.ring.len() {
            0 => 0,
            shards => points / shards,
        };

        CardinalityEstimation {
            primary_clauses: vec![],
            min: 0,
            exp,
            max: points,
        }
    }

    /// Dynamic equality: equal iff `other` is a `Filter` of the same concrete
    /// type and compares equal field-by-field.
    fn eq(&self, other: &dyn ReshardingCondition) -> bool {
        other
            .as_any()
            .downcast_ref::<Self>()
            .map_or(false, |other| self == other)
    }

    fn as_any(&self) -> &dyn any::Any {
        self
    }
}

#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl ShardReplicaSet {

let remotes_count = remotes.len();

// TODO(resharding): Handle resharded shard?
let active_remotes_count = remotes
.iter()
.filter(|remote| self.peer_is_active(&remote.peer_id))
Expand Down Expand Up @@ -179,6 +180,7 @@ impl ShardReplicaSet {
None
};

// TODO(resharding): Handle resharded shard?
let mut active_remotes: Vec<_> = remotes
.iter()
.filter(|remote| self.peer_is_active(&remote.peer_id))
Expand Down
2 changes: 2 additions & 0 deletions lib/collection/src/shards/resharding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub struct ReshardState {
pub peer_id: PeerId,
pub shard_id: ShardId,
pub shard_key: Option<ShardKey>,
pub filter_read_operations: bool, // TODO(resharding): Add proper resharding state!
}

impl ReshardState {
Expand All @@ -42,6 +43,7 @@ impl ReshardState {
peer_id,
shard_id,
shard_key,
filter_read_operations: false,
}
}

Expand Down
Loading

0 comments on commit ed4e2fd

Please sign in to comment.