Rework CommitHashRing consensus message into CommitRead/CommitWrite/Finish #4417

Merged 9 commits on Jun 19, 2024
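
The PR replaces the single CommitHashRing consensus message with three stages. Below is a minimal, hypothetical sketch (not part of this diff) of how a resharding driver might call the three new `Collection` methods introduced here, in order; it assumes `collection` and a clonable `ReshardKey` are already in scope and omits all consensus plumbing and error recovery.

```rust
// Hypothetical driver sketch, not from this PR: walk the hash ring through the
// three new stages exposed on `Collection` (see lib/collection/src/collection/resharding.rs).
async fn drive_hashring_stages(
    collection: &Collection,
    reshard_key: ReshardKey,
) -> CollectionResult<()> {
    // Commit the *read* hash ring; per this diff, reads then start applying the
    // resharding filter (scroll, count, search, retrieve).
    collection.commit_read_hashring(reshard_key.clone()).await?;

    // Commit the *write* hash ring (the write path is not shown in this diff).
    collection.commit_write_hashring(reshard_key.clone()).await?;

    // Conclude resharding once both rings are committed.
    collection.finish_resharding(reshard_key).await?;

    Ok(())
}
```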
10 changes: 9 additions & 1 deletion lib/api/src/grpc/conversions.rs
@@ -962,7 +962,11 @@ fn conditions_helper_to_grpc(conditions: Option<Vec<segment::types::Condition>>)
if conditions.is_empty() {
vec![]
} else {
conditions.into_iter().map(|c| c.into()).collect()
conditions
.into_iter()
.filter(|c| !c.is_local_only()) // TODO(resharding)!?
.map(|c| c.into())
.collect()
}
}
}
@@ -1060,6 +1064,10 @@ impl From<segment::types::Condition> for Condition {
segment::types::Condition::Nested(nested) => {
ConditionOneOf::Nested(nested.nested.into())
}

segment::types::Condition::Resharding(_) => {
unimplemented!()
}
};

Self {
41 changes: 38 additions & 3 deletions lib/collection/src/collection/point_ops.rs
@@ -5,7 +5,7 @@ use futures::stream::FuturesUnordered;
use futures::{future, StreamExt as _, TryFutureExt, TryStreamExt as _};
use itertools::Itertools;
use segment::data_types::order_by::{Direction, OrderBy};
use segment::types::{ShardKey, WithPayload, WithPayloadInterface};
use segment::types::{Filter, ShardKey, WithPayload, WithPayloadInterface};
use validator::Validate as _;

use super::Collection;
@@ -207,10 +207,15 @@ impl Collection {

pub async fn scroll_by(
&self,
request: ScrollRequestInternal,
mut request: ScrollRequestInternal,
read_consistency: Option<ReadConsistency>,
shard_selection: &ShardSelectorInternal,
) -> CollectionResult<ScrollResult> {
merge_filters(
&mut request.filter,
self.shards_holder.read().await.resharding_filter(),
);

let default_request = ScrollRequestInternal::default();

let id_offset = request.offset;
@@ -333,10 +338,15 @@

pub async fn count(
&self,
request: CountRequestInternal,
mut request: CountRequestInternal,
read_consistency: Option<ReadConsistency>,
shard_selection: &ShardSelectorInternal,
) -> CollectionResult<CountResult> {
merge_filters(
&mut request.filter,
self.shards_holder.read().await.resharding_filter(),
);

let shards_holder = self.shards_holder.read().await;
let shards = shards_holder.select_shards(shard_selection)?;

@@ -374,8 +384,16 @@
.unwrap_or(&WithPayloadInterface::Bool(false));
let with_payload = WithPayload::from(with_payload_interface);
let request = Arc::new(request);

#[allow(unused_assignments)]
let mut resharding_filter = None;

let all_shard_collection_results = {
let shard_holder = self.shards_holder.read().await;

// Get the resharding filter while we hold the lock on the shard holder
resharding_filter = shard_holder.resharding_filter_impl();

let target_shards = shard_holder.select_shards(shard_selection)?;
let retrieve_futures = target_shards.into_iter().map(|(shard, shard_key)| {
let shard_key = shard_key.cloned();
@@ -397,15 +415,32 @@ Ok(records)
Ok(records)
})
});

future::try_join_all(retrieve_futures).await?
};

let mut covered_point_ids = HashSet::new();
let points = all_shard_collection_results
.into_iter()
.flatten()
// If resharding is in progress and the *read* hash ring is committed, filter out "resharded" points
.filter(|point| match &resharding_filter {
Some(filter) => filter.check(point.id),
None => true,
})
// Add each point only once, deduplicate point IDs
.filter(|point| covered_point_ids.insert(point.id))
.collect();

Ok(points)
}
}

fn merge_filters(filter: &mut Option<Filter>, resharding_filter: Option<Filter>) {
if let Some(resharding_filter) = resharding_filter {
*filter = Some(match filter.take() {
Some(filter) => filter.merge_owned(resharding_filter),
None => resharding_filter,
});
}
}
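
For reference, a standalone, simplified illustration of the merge pattern used by `merge_filters` above; the `Filter` type here is a stand-in, not qdrant's `segment::types::Filter`. When a resharding filter is present it is merged into the user-supplied filter (or used on its own), and when it is absent the request filter is left untouched.

```rust
// Simplified stand-in types to illustrate the behavior of `merge_filters`.
#[derive(Debug, PartialEq)]
struct Filter(Vec<&'static str>);

impl Filter {
    // Stand-in for `segment::types::Filter::merge_owned`: combine two filters.
    fn merge_owned(mut self, other: Filter) -> Filter {
        self.0.extend(other.0);
        self
    }
}

fn merge_filters(filter: &mut Option<Filter>, resharding_filter: Option<Filter>) {
    if let Some(resharding_filter) = resharding_filter {
        *filter = Some(match filter.take() {
            Some(filter) => filter.merge_owned(resharding_filter),
            None => resharding_filter,
        });
    }
}

fn main() {
    // User filter plus resharding filter: both clauses end up in the request.
    let mut f = Some(Filter(vec!["user_clause"]));
    merge_filters(&mut f, Some(Filter(vec!["resharding_clause"])));
    assert_eq!(f, Some(Filter(vec!["user_clause", "resharding_clause"])));

    // No resharding in progress: the request filter is left untouched.
    let mut g = Some(Filter(vec!["user_clause"]));
    merge_filters(&mut g, None);
    assert_eq!(g, Some(Filter(vec!["user_clause"])));
}
```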
21 changes: 19 additions & 2 deletions lib/collection/src/collection/resharding.rs
@@ -87,8 +87,25 @@ impl Collection {
Ok(())
}

pub async fn commit_hashring(&self, reshard: ReshardKey) -> CollectionResult<()> {
self.shards_holder.write().await.commit_hashring(reshard)
pub async fn commit_read_hashring(&self, resharding_key: ReshardKey) -> CollectionResult<()> {
self.shards_holder
.write()
.await
.commit_read_hashring(resharding_key)
}

pub async fn commit_write_hashring(&self, resharding_key: ReshardKey) -> CollectionResult<()> {
self.shards_holder
.write()
.await
.commit_write_hashring(resharding_key)
}

pub async fn finish_resharding(&self, resharding_key: ReshardKey) -> CollectionResult<()> {
self.shards_holder
.write()
.await
.finish_resharding(resharding_key)
}

pub async fn abort_resharding(&self, reshard_key: ReshardKey) -> CollectionResult<()> {
16 changes: 15 additions & 1 deletion lib/collection/src/collection/search.rs
@@ -125,11 +125,25 @@ impl Collection {

async fn do_core_search_batch(
&self,
request: CoreSearchRequestBatch,
mut request: CoreSearchRequestBatch,
read_consistency: Option<ReadConsistency>,
shard_selection: &ShardSelectorInternal,
timeout: Option<Duration>,
) -> CollectionResult<Vec<Vec<ScoredPoint>>> {
if let Some(resharding_filter) = self.shards_holder.read().await.resharding_filter() {
for search in &mut request.searches {
match &mut search.filter {
Some(filter) => {
*filter = filter.merge(&resharding_filter);
}

None => {
search.filter = Some(resharding_filter.clone());
}
}
}
}

let request = Arc::new(request);

let instant = Instant::now();
61 changes: 59 additions & 2 deletions lib/collection/src/hash_ring.rs
@@ -1,7 +1,9 @@
use std::fmt;
use std::hash::Hash;
use std::{any, fmt};

use itertools::Itertools as _;
use segment::index::field_index::CardinalityEstimation;
use segment::types::{PointIdType, ReshardingCondition};
use smallvec::SmallVec;

use crate::shards::shard::ShardId;
@@ -76,7 +78,7 @@ impl<T: Hash + Copy + PartialEq> HashRing<T> {
new.add(shard);
}

pub fn commit(&mut self) -> bool {
pub fn commit_resharding(&mut self) -> bool {
let Self::Resharding { new, .. } = self else {
log::warn!("committing resharding hashring, but hashring is not in resharding mode");
return false;
@@ -246,6 +248,61 @@ impl<T: Hash + Copy> Inner<T> {
Inner::Fair { ring, .. } => ring.is_empty(),
}
}

pub fn len(&self) -> usize {
match self {
Inner::Raw(ring) => ring.len(),
Inner::Fair { ring, scale } => ring.len() / *scale as usize,
}
}
}

#[derive(Clone, Debug, PartialEq)]
pub struct Filter<T = ShardId> {
ring: Inner<T>,
filter: T,
}

impl<T> Filter<T> {
pub fn new(ring: Inner<T>, filter: T) -> Self {
Self { ring, filter }
}

pub fn check(&self, point_id: PointIdType) -> bool
where
T: Hash + PartialEq + Copy,
{
self.ring.get(&point_id) == Some(&self.filter)
}
}

impl<T> ReshardingCondition for Filter<T>
where
T: fmt::Debug + Hash + PartialEq + Copy + 'static,
{
fn check(&self, point_id: PointIdType) -> bool {
self.check(point_id)
}

fn estimate_cardinality(&self, points: usize) -> CardinalityEstimation {
CardinalityEstimation {
primary_clauses: vec![],
min: 0,
exp: points / self.ring.len(),
max: points,
}
}

fn eq(&self, other: &dyn ReshardingCondition) -> bool {
match other.as_any().downcast_ref::<Self>() {
Some(other) => self == other,
None => false,
}
}

fn as_any(&self) -> &dyn any::Any {
self
}
}
Comment on lines +296 to +305 (Contributor Author):

Might be a bit of an overkill, but this is required to implement PartialEq on Condition.

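The pattern referred to above, sketched standalone with simplified names (not qdrant's actual `ReshardingCondition` trait): equality between trait objects is implemented by exposing `as_any` and downcasting to the concrete type inside `eq`, returning `false` when the other side is a different concrete type.

```rust
use std::any::Any;

// Simplified stand-in for a condition trait that needs object-safe equality.
trait Condition {
    fn eq(&self, other: &dyn Condition) -> bool;
    fn as_any(&self) -> &dyn Any;
}

#[derive(PartialEq)]
struct ShardFilter {
    shard_id: u32,
}

impl Condition for ShardFilter {
    fn eq(&self, other: &dyn Condition) -> bool {
        // Only equal if the other condition is also a `ShardFilter` with the same fields.
        match other.as_any().downcast_ref::<Self>() {
            Some(other) => self == other,
            None => false,
        }
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn main() {
    let a: Box<dyn Condition> = Box::new(ShardFilter { shard_id: 1 });
    let b: Box<dyn Condition> = Box::new(ShardFilter { shard_id: 1 });
    let c: Box<dyn Condition> = Box::new(ShardFilter { shard_id: 2 });

    assert!(a.eq(b.as_ref()));
    assert!(!a.eq(c.as_ref()));
}
```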

#[cfg(test)]
@@ -62,6 +62,7 @@ impl ShardReplicaSet {

let remotes_count = remotes.len();

// TODO(resharding): Handle resharded shard?
let active_remotes_count = remotes
.iter()
.filter(|remote| self.peer_is_active(&remote.peer_id))
@@ -179,6 +180,7 @@ impl ShardReplicaSet {
None
};

// TODO(resharding): Handle resharded shard?
let mut active_remotes: Vec<_> = remotes
.iter()
.filter(|remote| self.peer_is_active(&remote.peer_id))
2 changes: 2 additions & 0 deletions lib/collection/src/shards/resharding/mod.rs
@@ -34,6 +34,7 @@ pub struct ReshardState {
pub peer_id: PeerId,
pub shard_id: ShardId,
pub shard_key: Option<ShardKey>,
pub filter_read_operations: bool, // TODO(resharding): Add proper resharding state!
}

impl ReshardState {
@@ -42,6 +43,7 @@ impl ReshardState {
peer_id,
shard_id,
shard_key,
filter_read_operations: false,
}
}
