Add resharding status to API (#4319)
* Add resharding status to API

* Report human-readable resharding status in cluster info

* Fix delete stage not showing shard IDs

* Update OpenAPI spec

* Temporarily hide resharding operations in cluster info if empty

We'll enable it again, so it is always included, once resharding is released

* Remove resharding details from cluster info in gRPC until release

---------

Co-authored-by: timvisee <tim@visee.me>
JojiiOfficial and timvisee committed Jul 10, 2024
1 parent ed6f6b7 commit e8acf1e
Showing 9 changed files with 145 additions and 4 deletions.
18 changes: 18 additions & 0 deletions docs/grpc/docs.md
@@ -55,6 +55,7 @@
- [RenameAlias](#qdrant-RenameAlias)
- [Replica](#qdrant-Replica)
- [ReplicateShard](#qdrant-ReplicateShard)
- [ReshardingInfo](#qdrant-ReshardingInfo)
- [RestartTransfer](#qdrant-RestartTransfer)
- [ScalarQuantization](#qdrant-ScalarQuantization)
- [ShardKey](#qdrant-ShardKey)
@@ -1146,6 +1147,23 @@ Note: 1kB = 1 vector of size 256. |



<a name="qdrant-ReshardingInfo"></a>

### ReshardingInfo



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| shard_id | [uint32](#uint32) | | |
| peer_id | [uint64](#uint64) | | |
| shard_key | [ShardKey](#qdrant-ShardKey) | optional | |






<a name="qdrant-RestartTransfer"></a>

### RestartTransfer
42 changes: 42 additions & 0 deletions docs/redoc/master/openapi.json
@@ -8911,6 +8911,7 @@
"local_shards",
"peer_id",
"remote_shards",
"resharding_operations",
"shard_count",
"shard_transfers"
],
@@ -8947,6 +8948,13 @@
"items": {
"$ref": "#/components/schemas/ShardTransferInfo"
}
},
"resharding_operations": {
"description": "Resharding operations",
"type": "array",
"items": {
"$ref": "#/components/schemas/ReshardingInfo"
}
}
}
},
@@ -9108,6 +9116,40 @@
}
]
},
"ReshardingInfo": {
"type": "object",
"required": [
"peer_id",
"shard_id"
],
"properties": {
"shard_id": {
"type": "integer",
"format": "uint32",
"minimum": 0
},
"peer_id": {
"type": "integer",
"format": "uint64",
"minimum": 0
},
"shard_key": {
"anyOf": [
{
"$ref": "#/components/schemas/ShardKey"
},
{
"nullable": true
}
]
},
"comment": {
"description": "A human-readable report of the operation progress. Available only on the source peer.",
"type": "string",
"nullable": true
}
}
},
"TelemetryData": {
"type": "object",
"required": [
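
For orientation, here is a minimal sketch, not part of this commit, of parsing a `ReshardingInfo` entry shaped like the schema above. The struct below is a simplified stand-in (the real `shard_key` follows the `ShardKey` schema), and the IDs and comment text are illustrative only:

```rust
// Sketch only: a simplified mirror of the new `ReshardingInfo` schema, used to
// show the JSON shape a cluster-info response would carry. Requires the `serde`
// (with `derive`) and `serde_json` crates.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ReshardingInfoSketch {
    shard_id: u32,
    peer_id: u64,
    // Simplified stand-in for the `ShardKey` schema; optional and nullable.
    shard_key: Option<serde_json::Value>,
    // Human-readable progress report; only present on the source peer.
    comment: Option<String>,
}

fn main() {
    // Illustrative payload; IDs and comment text are hypothetical.
    let payload = r#"{
        "shard_id": 3,
        "peer_id": 7,
        "comment": "propagate deletes: deleting migrated points from shards [0, 1]"
    }"#;
    let info: ReshardingInfoSketch =
        serde_json::from_str(payload).expect("payload matches the sketched schema");
    println!("{info:?}");
}
```
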
8 changes: 8 additions & 0 deletions lib/api/src/grpc/proto/collections.proto
@@ -511,12 +511,20 @@ message ShardTransferInfo {
bool sync = 4; // If `true`, the transfer is a synchronization of replicas; if `false`, the transfer moves a shard from one peer to another
}

message ReshardingInfo {
uint32 shard_id = 1;
uint64 peer_id = 2;
optional ShardKey shard_key = 3;
}

message CollectionClusterInfoResponse {
uint64 peer_id = 1; // ID of this peer
uint64 shard_count = 2; // Total number of shards
repeated LocalShardInfo local_shards = 3; // Local shards
repeated RemoteShardInfo remote_shards = 4; // Remote shards
repeated ShardTransferInfo shard_transfers = 5; // Shard transfers
// TODO(resharding): enable on release:
// repeated ReshardingInfo resharding_operations = 6; // Resharding operations
}

message MoveShard {
11 changes: 11 additions & 0 deletions lib/api/src/grpc/qdrant.rs
@@ -914,6 +914,17 @@ pub struct ShardTransferInfo {
#[derive(serde::Serialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ReshardingInfo {
#[prost(uint32, tag = "1")]
pub shard_id: u32,
#[prost(uint64, tag = "2")]
pub peer_id: u64,
#[prost(message, optional, tag = "3")]
pub shard_key: ::core::option::Option<ShardKey>,
}
#[derive(serde::Serialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CollectionClusterInfoResponse {
/// ID of this peer
#[prost(uint64, tag = "1")]
3 changes: 3 additions & 0 deletions lib/collection/src/collection/collection_ops.rs
@@ -333,6 +333,8 @@ impl Collection {
}
let shard_transfers =
shards_holder.get_shard_transfer_info(&*self.transfer_tasks.lock().await);
let resharding_operations =
shards_holder.get_resharding_operations_info(&*self.reshard_tasks.lock().await);

// sort by shard_id
local_shards.sort_by_key(|k| k.shard_id);
@@ -344,6 +346,7 @@
local_shards,
remote_shards,
shard_transfers,
resharding_operations,
};
Ok(info)
}
21 changes: 19 additions & 2 deletions lib/collection/src/operations/conversions.rs
@@ -26,8 +26,9 @@ use tonic::Status;
use super::consistency_params::ReadConsistency;
use super::types::{
ContextExamplePair, CoreSearchRequest, Datatype, DiscoverRequestInternal, GroupsResult,
Modifier, PointGroup, RecommendExample, RecommendGroupsRequestInternal, SparseIndexParams,
SparseVectorParams, SparseVectorsConfig, VectorParamsDiff, VectorsConfigDiff,
Modifier, PointGroup, RecommendExample, RecommendGroupsRequestInternal, ReshardingInfo,
SparseIndexParams, SparseVectorParams, SparseVectorsConfig, VectorParamsDiff,
VectorsConfigDiff,
};
use crate::config::{
default_replication_factor, default_write_consistency_factor, CollectionConfig,
@@ -1562,6 +1563,16 @@ impl From<RemoteShardInfo> for api::grpc::qdrant::RemoteShardInfo {
}
}

impl From<ReshardingInfo> for api::grpc::qdrant::ReshardingInfo {
fn from(value: ReshardingInfo) -> Self {
Self {
shard_id: value.shard_id,
peer_id: value.peer_id,
shard_key: value.shard_key.map(convert_shard_key_to_grpc),
}
}
}

impl From<ShardTransferInfo> for api::grpc::qdrant::ShardTransferInfo {
fn from(value: ShardTransferInfo) -> Self {
Self {
@@ -1594,6 +1605,12 @@ impl From<CollectionClusterInfo> for api::grpc::qdrant::CollectionClusterInfoRes
.into_iter()
.map(|shard| shard.into())
.collect(),
// TODO(resharding): enable on release:
// resharding_operations: value
// .resharding_operations
// .into_iter()
// .map(|info| info.into())
// .collect(),
}
}
}
17 changes: 17 additions & 0 deletions lib/collection/src/operations/types.rs
@@ -203,6 +203,10 @@ pub struct CollectionClusterInfo {
pub remote_shards: Vec<RemoteShardInfo>,
/// Shard transfers
pub shard_transfers: Vec<ShardTransferInfo>,
/// Resharding operations
// TODO(resharding): remove this skip when releasing resharding
#[serde(skip_serializing_if = "Vec::is_empty")]
pub resharding_operations: Vec<ReshardingInfo>,
}

#[derive(Debug, Serialize, JsonSchema, Clone)]
@@ -231,6 +235,19 @@ pub struct ShardTransferInfo {
pub comment: Option<String>,
}

#[derive(Debug, Serialize, JsonSchema, Clone)]
pub struct ReshardingInfo {
pub shard_id: ShardId,

pub peer_id: PeerId,

pub shard_key: Option<ShardKey>,

/// A human-readable report of the operation progress. Available only on the source peer.
#[serde(skip_serializing_if = "Option::is_none")]
pub comment: Option<String>,
}

#[derive(Debug, Serialize, JsonSchema)]
#[serde(rename_all = "snake_case")]
pub struct LocalShardInfo {
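
The `skip_serializing_if = "Vec::is_empty"` attribute above is what temporarily hides the new field. A minimal, self-contained sketch of that behaviour, using simplified stand-in types rather than the real collection structs:

```rust
// Sketch only: demonstrates how `skip_serializing_if = "Vec::is_empty"` omits
// `resharding_operations` from cluster info until an operation is running.
// Requires the `serde` (with `derive`) and `serde_json` crates.
use serde::Serialize;

#[derive(Serialize)]
struct ReshardingInfoSketch {
    shard_id: u32,
    peer_id: u64,
}

#[derive(Serialize)]
struct ClusterInfoSketch {
    peer_id: u64,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    resharding_operations: Vec<ReshardingInfoSketch>,
}

fn main() {
    // No resharding in progress: the key is omitted entirely.
    let idle = ClusterInfoSketch { peer_id: 1, resharding_operations: vec![] };
    assert_eq!(serde_json::to_string(&idle).unwrap(), r#"{"peer_id":1}"#);

    // An operation in flight: the list shows up in the response.
    let active = ClusterInfoSketch {
        peer_id: 1,
        resharding_operations: vec![ReshardingInfoSketch { shard_id: 2, peer_id: 1 }],
    };
    println!("{}", serde_json::to_string_pretty(&active).unwrap());
}
```

As the TODO notes, dropping the attribute is all that is needed to always include the field once resharding is released.
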
2 changes: 1 addition & 1 deletion lib/collection/src/shards/resharding/driver.rs
@@ -135,7 +135,7 @@ impl DriverState {
Stage::S4_CommitHashring => "commit hash ring: switching reads and writes".into(),
Stage::S5_PropagateDeletes => format!(
"propagate deletes: deleting migrated points from shards {:?}",
self.shards_to_migrate().collect::<Vec<_>>(),
self.shards_to_delete().collect::<Vec<_>>(),
),
Stage::S6_Finalize => "finalize".into(),
Stage::Finished => "finished".into(),
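
For reference, a tiny standalone check (hypothetical shard IDs) of the status string the corrected `S5_PropagateDeletes` arm produces, now built from the shards being deleted from rather than the shards to migrate:

```rust
fn main() {
    // Hypothetical shard IDs; mirrors the `format!` call in the S5_PropagateDeletes arm.
    let shards_to_delete = vec![0, 1];
    let status = format!(
        "propagate deletes: deleting migrated points from shards {:?}",
        shards_to_delete,
    );
    assert_eq!(
        status,
        "propagate deletes: deleting migrated points from shards [0, 1]"
    );
}
```
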
27 changes: 26 additions & 1 deletion lib/collection/src/shards/shard_holder/mod.rs
@@ -14,6 +14,7 @@ use tokio::runtime::Handle;
use tokio::sync::{broadcast, RwLock};

use super::replica_set::AbortShardTransfer;
use super::resharding::tasks_pool::ReshardTasksPool;
use super::resharding::ReshardState;
use super::transfer::transfer_tasks_pool::TransferTasksPool;
use crate::collection::payload_index_schema::PayloadIndexSchema;
@@ -23,7 +24,9 @@ use crate::hash_ring::HashRing;
use crate::operations::shard_selector_internal::ShardSelectorInternal;
use crate::operations::shared_storage_config::SharedStorageConfig;
use crate::operations::snapshot_ops::SnapshotDescription;
use crate::operations::types::{CollectionError, CollectionResult, ShardTransferInfo};
use crate::operations::types::{
CollectionError, CollectionResult, ReshardingInfo, ShardTransferInfo,
};
use crate::operations::{OperationToShard, SplitByShard};
use crate::optimizers_builder::OptimizersConfig;
use crate::save_on_disk::SaveOnDisk;
@@ -403,6 +406,28 @@ impl ShardHolder {
shard_transfers
}

pub fn get_resharding_operations_info(
&self,
tasks_pool: &ReshardTasksPool,
) -> Vec<ReshardingInfo> {
let mut resharding_operations = vec![];

// We eventually expect to extend this to multiple concurrent operations, which is why
// we're using a list here
if let Some(resharding_state) = &*self.resharding_state.read() {
let status = tasks_pool.get_task_status(&resharding_state.key());
resharding_operations.push(ReshardingInfo {
shard_id: resharding_state.shard_id,
peer_id: resharding_state.peer_id,
shard_key: resharding_state.shard_key.clone(),
comment: status.map(|p| p.comment),
});
}

resharding_operations.sort_by_key(|k| k.shard_id);
resharding_operations
}

pub fn get_related_transfers(
&self,
shard_id: &ShardId,
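
Finally, a hypothetical consumer-side sketch, not part of this commit, of how the list returned by `get_resharding_operations_info` might be rendered for display; `InfoSketch` stands in for the real `ReshardingInfo`, and the progress comment is only available on the source peer:

```rust
// Illustration only: render resharding entries as human-readable lines.
struct InfoSketch {
    shard_id: u32,
    peer_id: u64,
    comment: Option<String>,
}

fn render(ops: &[InfoSketch]) -> Vec<String> {
    ops.iter()
        .map(|op| {
            // The progress comment is reported only by the source peer.
            let status = op
                .comment
                .as_deref()
                .unwrap_or("status unavailable on this peer");
            format!("shard {} on peer {}: {}", op.shard_id, op.peer_id, status)
        })
        .collect()
}

fn main() {
    // Hypothetical entry; the comment text mirrors one of the driver's stage descriptions.
    let ops = vec![InfoSketch {
        shard_id: 1,
        peer_id: 7,
        comment: Some("commit hash ring: switching reads and writes".into()),
    }];
    for line in render(&ops) {
        println!("{line}");
    }
}
```
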
