Add basic resharding types (#4216)
ffuugoo committed May 13, 2024
1 parent 5d83312 commit dae311e
Showing 9 changed files with 96 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/grpc/docs.md
@@ -1612,6 +1612,7 @@ Note: 1kB = 1 vector of size 256. |
| Listener | 4 | A shard which receives data, but is not used for search; Useful for backup shards |
| PartialSnapshot | 5 | Deprecated: snapshot shard transfer is in progress; Updates should not be sent to (and are ignored by) the shard |
| Recovery | 6 | Shard is undergoing recovery by an external node; Normally rejects updates, accepts updates if force is true |
| Resharding | 7 | Points are being migrated to this shard as part of resharding |



3 changes: 2 additions & 1 deletion lib/api/src/grpc/proto/collections.proto
@@ -474,6 +474,7 @@ enum ReplicaState {
Listener = 4; // A shard which receives data, but is not used for search; Useful for backup shards
PartialSnapshot = 5; // Deprecated: snapshot shard transfer is in progress; Updates should not be sent to (and are ignored by) the shard
Recovery = 6; // Shard is undergoing recovery by an external node; Normally rejects updates, accepts updates if force is true
Resharding = 7; // Points are being migrated to this shard as part of resharding
}

message ShardKey {
@@ -505,7 +506,7 @@ message ShardTransferInfo {
}

message CollectionClusterInfoResponse {
uint64 peer_id = 1; // ID of this peer
uint64 peer_id = 1; // ID of this peer
uint64 shard_count = 2; // Total number of shards
repeated LocalShardInfo local_shards = 3; // Local shards
repeated RemoteShardInfo remote_shards = 4; // Remote shards
4 changes: 4 additions & 0 deletions lib/api/src/grpc/qdrant.rs
@@ -1455,6 +1455,8 @@ pub enum ReplicaState {
PartialSnapshot = 5,
/// Shard is undergoing recovery by an external node; Normally rejects updates, accepts updates if force is true
Recovery = 6,
/// Points are being migrated to this shard as part of resharding
Resharding = 7,
}
impl ReplicaState {
/// String value of the enum field names used in the ProtoBuf definition.
@@ -1470,6 +1472,7 @@ impl ReplicaState {
ReplicaState::Listener => "Listener",
ReplicaState::PartialSnapshot => "PartialSnapshot",
ReplicaState::Recovery => "Recovery",
ReplicaState::Resharding => "Resharding",
}
}
/// Creates an enum from field names used in the ProtoBuf definition.
@@ -1482,6 +1485,7 @@ impl ReplicaState {
"Listener" => Some(Self::Listener),
"PartialSnapshot" => Some(Self::PartialSnapshot),
"Recovery" => Some(Self::Recovery),
"Resharding" => Some(Self::Resharding),
_ => None,
}
}
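For illustration, a hypothetical test (not part of this commit; the test placement and the api::grpc::qdrant path are assumptions) exercising the generated helpers above for the new variant:

#[test]
fn resharding_proto_roundtrip() {
    use api::grpc::qdrant::ReplicaState;

    // Wire value matches `Resharding = 7` in collections.proto.
    assert_eq!(ReplicaState::Resharding as i32, 7);

    // String round-trip through the generated helpers shown above.
    assert_eq!(ReplicaState::Resharding.as_str_name(), "Resharding");
    assert_eq!(
        ReplicaState::from_str_name("Resharding"),
        Some(ReplicaState::Resharding)
    );
}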
23 changes: 23 additions & 0 deletions lib/collection/src/collection/mod.rs
@@ -1,6 +1,7 @@
mod collection_ops;
pub mod payload_index_schema;
mod point_ops;
mod resharding;
mod search;
mod shard_transfer;
mod sharding_keys;
@@ -41,13 +42,17 @@ use crate::shards::transfer::{ShardTransfer, ShardTransferMethod};
use crate::shards::{replica_set, CollectionId};
use crate::telemetry::CollectionTelemetry;

const RESHARDING_STATE_FILE: &str = "resharding_state.json";

/// Collection's data is split into several shards.
#[allow(dead_code)]
pub struct Collection {
pub(crate) id: CollectionId,
pub(crate) shards_holder: Arc<LockedShardHolder>,
pub(crate) collection_config: Arc<RwLock<CollectionConfig>>,
pub(crate) shared_storage_config: Arc<SharedStorageConfig>,
pub(crate) payload_index_schema: SaveOnDisk<PayloadIndexSchema>,
resharding_state: SaveOnDisk<Option<resharding::State>>,
this_peer_id: PeerId,
path: PathBuf,
snapshots_path: PathBuf,
@@ -131,13 +136,15 @@ impl Collection {
collection_config.save(path)?;

let payload_index_schema = Self::load_payload_index_schema(path)?;
let resharding_state = Self::load_resharding_state(path)?;

Ok(Self {
id: name.clone(),
shards_holder: locked_shard_holder,
collection_config: shared_collection_config,
payload_index_schema,
shared_storage_config,
resharding_state,
this_peer_id,
path: path.to_owned(),
snapshots_path: snapshots_path.to_owned(),
@@ -229,12 +236,16 @@ impl Collection {
let payload_index_schema = Self::load_payload_index_schema(path)
.expect("Can't load or initialize payload index schema");

let resharding_state = Self::load_resharding_state(path)
.expect("Can't load or initialize resharding progress");

Self {
id: collection_id.clone(),
shards_holder: locked_shard_holder,
collection_config: shared_collection_config,
payload_index_schema,
shared_storage_config,
resharding_state,
this_peer_id,
path: path.to_owned(),
snapshots_path: snapshots_path.to_owned(),
@@ -252,6 +263,18 @@ impl Collection {
}
}

fn resharding_state_file(collection_path: &Path) -> PathBuf {
collection_path.join(RESHARDING_STATE_FILE)
}

fn load_resharding_state(
collection_path: &Path,
) -> CollectionResult<SaveOnDisk<Option<resharding::State>>> {
let resharding_state_file = Self::resharding_state_file(collection_path);
let resharding_state = SaveOnDisk::load_or_init(resharding_state_file)?;
Ok(resharding_state)
}

/// Check if the stored version is a consecutive version.
/// If the major version is different, then it is not compatible.
/// If the difference in consecutive versions is greater than 1 in patch,
16 changes: 16 additions & 0 deletions lib/collection/src/collection/resharding.rs
@@ -0,0 +1,16 @@
use serde::{Deserialize, Serialize};

use crate::shards::shard::{PeerId, ShardId};

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct State {
pub peer_id: PeerId,
pub shard_id: ShardId,
}

impl State {
#[allow(dead_code)]
pub fn new(peer_id: PeerId, shard_id: ShardId) -> Self {
Self { peer_id, shard_id }
}
}
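Since this State is persisted through SaveOnDisk<Option<resharding::State>> into resharding_state.json (see collection/mod.rs above), its on-disk shape should be plain serde JSON, as the .json file name suggests. A standalone sketch of that shape, with the integer widths of PeerId and ShardId assumed (u64/u32 are stand-ins; serde and serde_json are the only dependencies):

use serde::{Deserialize, Serialize};

// Stand-in for the `State` struct above.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct State {
    peer_id: u64,  // assumed width of `PeerId`
    shard_id: u32, // assumed width of `ShardId`
}

fn main() -> serde_json::Result<()> {
    // No resharding in progress: the `Option` wrapper serializes as `null`.
    assert_eq!(serde_json::to_string(&None::<State>)?, "null");

    // Resharding in progress: a plain JSON object.
    let state = Some(State { peer_id: 42, shard_id: 3 });
    assert_eq!(
        serde_json::to_string(&state)?,
        r#"{"peer_id":42,"shard_id":3}"#
    );
    Ok(())
}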
2 changes: 2 additions & 0 deletions lib/collection/src/operations/conversions.rs
@@ -1440,6 +1440,7 @@ impl From<api::grpc::qdrant::ReplicaState> for ReplicaState {
api::grpc::qdrant::ReplicaState::Listener => Self::Listener,
api::grpc::qdrant::ReplicaState::PartialSnapshot => Self::PartialSnapshot,
api::grpc::qdrant::ReplicaState::Recovery => Self::Recovery,
api::grpc::qdrant::ReplicaState::Resharding => Self::Resharding,
}
}
}
@@ -1454,6 +1455,7 @@ impl From<ReplicaState> for api::grpc::qdrant::ReplicaState {
ReplicaState::Listener => Self::Listener,
ReplicaState::PartialSnapshot => Self::PartialSnapshot,
ReplicaState::Recovery => Self::Recovery,
ReplicaState::Resharding => Self::Resharding,
}
}
}
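A hypothetical round-trip test (not in this commit): the paired From impls above should compose to the identity for every variant, including the new one:

#[test]
fn replica_state_grpc_roundtrip() {
    let grpc = api::grpc::qdrant::ReplicaState::Resharding;
    let domain = ReplicaState::from(grpc);
    assert_eq!(api::grpc::qdrant::ReplicaState::from(domain), grpc);
}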
15 changes: 12 additions & 3 deletions lib/collection/src/shards/replica_set/mod.rs
@@ -644,11 +644,13 @@ impl ShardReplicaSet {
self.set_local(local_shard, Some(state)).await?;
self.notify_peer_failure(peer_id);
}

ReplicaState::Dead
| ReplicaState::Partial
| ReplicaState::Initializing
| ReplicaState::PartialSnapshot
| ReplicaState::Recovery => {
| ReplicaState::Recovery
| ReplicaState::Resharding => {
self.set_local(local_shard, Some(state)).await?;
}
}
@@ -920,6 +922,9 @@ pub enum ReplicaState {
// Shard is undergoing recovery by an external node
// Normally rejects updates, accepts updates if force is true
Recovery,
// Points are being migrated to this shard as part of resharding
#[schemars(skip)]
Resharding,
}

impl ReplicaState {
@@ -933,15 +938,19 @@ impl ReplicaState {
| ReplicaState::Initializing
| ReplicaState::Partial
| ReplicaState::PartialSnapshot
| ReplicaState::Recovery => false,
| ReplicaState::Recovery
| ReplicaState::Resharding => false,
}
}

/// Check whether the replica state is partial or partial-like.
pub fn is_partial_or_recovery(self) -> bool {
// Use explicit match, to catch future changes to `ReplicaState`
match self {
ReplicaState::Partial | ReplicaState::PartialSnapshot | ReplicaState::Recovery => true,
ReplicaState::Partial
| ReplicaState::PartialSnapshot
| ReplicaState::Recovery
| ReplicaState::Resharding => true,

ReplicaState::Active
| ReplicaState::Dead
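A hypothetical assertion (not in the diff) pinning down the classification above: a resharding replica is grouped with Partial, PartialSnapshot and Recovery, so partial-like handling applies to it as well:

#[test]
fn resharding_is_partial_like() {
    // Matches the `is_partial_or_recovery` arm added above.
    assert!(ReplicaState::Resharding.is_partial_or_recovery());
}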
10 changes: 7 additions & 3 deletions lib/collection/src/shards/replica_set/update.rs
@@ -38,9 +38,12 @@ impl ShardReplicaSet {

if let Some(local_shard) = local.deref() {
match self.peer_state(&self.this_peer_id()) {
Some(ReplicaState::Active | ReplicaState::Partial | ReplicaState::Initializing) => {
Ok(Some(local_shard.get().update(operation, wait).await?))
}
Some(
ReplicaState::Active
| ReplicaState::Partial
| ReplicaState::Initializing
| ReplicaState::Resharding,
) => Ok(Some(local_shard.get().update(operation, wait).await?)),
Some(ReplicaState::Listener) => {
Ok(Some(local_shard.get().update(operation, false).await?))
}
@@ -405,6 +408,7 @@ impl ShardReplicaSet {
Some(ReplicaState::Listener) => true,
Some(ReplicaState::PartialSnapshot) => false,
Some(ReplicaState::Recovery) => false,
Some(ReplicaState::Resharding) => true,
None => false,
};
res && !self.is_locally_disabled(peer_id)
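A standalone sketch (mock enum, not qdrant's types) summarizing write acceptance after this change, per the two matches above: Resharding replicas keep accepting updates while points migrate, unlike Recovery or PartialSnapshot; states outside the shown hunks are assumed to keep rejecting writes:

#[derive(Clone, Copy, Debug, PartialEq)]
#[allow(dead_code)] // illustrative mirror of qdrant's ReplicaState
enum ReplicaState {
    Active,
    Dead,
    Partial,
    Initializing,
    Listener,
    PartialSnapshot,
    Recovery,
    Resharding,
}

fn accepts_updates(state: Option<ReplicaState>) -> bool {
    matches!(
        state,
        Some(
            ReplicaState::Active
                | ReplicaState::Partial
                | ReplicaState::Initializing
                | ReplicaState::Listener
                | ReplicaState::Resharding
        )
    )
}

fn main() {
    assert!(accepts_updates(Some(ReplicaState::Resharding)));
    assert!(!accepts_updates(Some(ReplicaState::Recovery)));
    assert!(!accepts_updates(None));
}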
36 changes: 29 additions & 7 deletions lib/collection/src/shards/shard_holder.rs
@@ -40,7 +40,7 @@ pub type ShardKeyMapping = HashMap<ShardKey, HashSet<ShardId>>;
pub struct ShardHolder {
shards: HashMap<ShardId, ShardReplicaSet>,
pub(crate) shard_transfers: SaveOnDisk<HashSet<ShardTransfer>>,
rings: HashMap<Option<ShardKey>, HashRing<ShardId>>,
rings: HashMap<RingsKey, HashRing<ShardId>>,
key_mapping: SaveOnDisk<ShardKeyMapping>,
// Duplicates the information from `key_mapping` for faster access
// Do not require locking
@@ -49,10 +49,32 @@ pub struct ShardHolder {

pub type LockedShardHolder = RwLock<ShardHolder>;

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub enum RingsKey {
Default,
ShardKey(ShardKey),
Resharding,
}

impl From<Option<ShardKey>> for RingsKey {
fn from(shard_key: Option<ShardKey>) -> Self {
match shard_key {
Some(shard_key) => shard_key.into(),
None => Self::Default,
}
}
}

impl From<ShardKey> for RingsKey {
fn from(shard_key: ShardKey) -> Self {
Self::ShardKey(shard_key)
}
}

impl ShardHolder {
pub fn new(collection_path: &Path) -> CollectionResult<Self> {
let mut rings = HashMap::new();
rings.insert(None, HashRing::fair(HASH_RING_SHARD_SCALE));
rings.insert(RingsKey::Default, HashRing::fair(HASH_RING_SHARD_SCALE));
let shard_transfers = SaveOnDisk::load_or_init(collection_path.join(SHARD_TRANSFERS_FILE))?;
let key_mapping: SaveOnDisk<ShardKeyMapping> =
SaveOnDisk::load_or_init(collection_path.join(SHARD_KEY_MAPPING_FILE))?;
@@ -104,7 +126,7 @@ impl ShardHolder {
) -> Result<(), CollectionError> {
self.shards.insert(shard_id, shard);
self.rings
.entry(shard_key.clone())
.entry(shard_key.clone().into())
.or_insert_with(|| HashRing::fair(HASH_RING_SHARD_SCALE))
.add(shard_id);

@@ -145,7 +167,7 @@ impl ShardHolder {
}
})?;

self.rings.remove(&Some(shard_key.clone()));
self.rings.remove(&shard_key.clone().into());
for shard_id in remove_shard_ids {
self.drop_and_remove_shard(shard_id).await?;
self.shard_id_to_key_mapping.remove(&shard_id);
@@ -155,12 +177,12 @@ impl ShardHolder {

fn rebuild_rings(&mut self) {
let mut rings = HashMap::new();
rings.insert(None, HashRing::fair(HASH_RING_SHARD_SCALE));
rings.insert(RingsKey::Default, HashRing::fair(HASH_RING_SHARD_SCALE));
let ids_to_key = self.get_shard_id_to_key_mapping();
for shard_id in self.shards.keys() {
let shard_key = ids_to_key.get(shard_id).cloned();
rings
.entry(shard_key)
.entry(shard_key.into())
.or_insert_with(|| HashRing::fair(HASH_RING_SHARD_SCALE))
.add(*shard_id);
}
@@ -220,7 +242,7 @@ impl ShardHolder {
operation: O,
shard_keys_selection: &Option<ShardKey>,
) -> CollectionResult<Vec<(&ShardReplicaSet, O)>> {
let Some(hashring) = self.rings.get(shard_keys_selection) else {
let Some(hashring) = self.rings.get(&shard_keys_selection.clone().into()) else {
return if let Some(shard_key) = shard_keys_selection {
Err(CollectionError::bad_input(format!(
"Shard key {shard_key} not found"
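Finally, a standalone sketch (mock types, not qdrant's) of the RingsKey resolution introduced above: None selects the default ring, Some(key) selects that key's ring, and the Resharding variant reserves a slot for a dedicated ring that the hunks shown here don't populate yet:

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
struct ShardKey(String); // stand-in for qdrant's `ShardKey`

#[derive(Clone, Debug, Eq, PartialEq, Hash)]
#[allow(dead_code)] // `Resharding` is not constructed in this sketch either
enum RingsKey {
    Default,
    ShardKey(ShardKey),
    Resharding,
}

impl From<Option<ShardKey>> for RingsKey {
    fn from(shard_key: Option<ShardKey>) -> Self {
        match shard_key {
            Some(shard_key) => RingsKey::ShardKey(shard_key),
            None => RingsKey::Default,
        }
    }
}

fn main() {
    // A request without a shard key resolves to the default hash ring.
    assert_eq!(RingsKey::from(None::<ShardKey>), RingsKey::Default);

    // A named shard key resolves to that key's own ring.
    assert_eq!(
        RingsKey::from(Some(ShardKey("tenant_a".into()))),
        RingsKey::ShardKey(ShardKey("tenant_a".into())),
    );
}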
