Skip to content

Commit

Permalink
TMP: Tweak API and resharding code for easy CommitRead manual testing
Browse files Browse the repository at this point in the history
This commit will be reverted before merging into `dev`!

- Add `commit_read` collection cluster update operation to manually
  trigger `CommitRead` consensus message
- Remove code that starts resharding worker during `Resharding::Start`
  consensus message processing
  • Loading branch information
ffuugoo committed Jun 17, 2024
1 parent cb73fd3 commit 2cfc12a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 36 deletions.
36 changes: 0 additions & 36 deletions lib/collection/src/collection/resharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,42 +47,6 @@ impl Collection {

shard_holder.start_resharding_unchecked(reshard_key.clone(), replica_set)?;

// If this peer is responsible for driving the resharding, start the task for it
if reshard_key.peer_id == self.this_peer_id {
// Stop any already active resharding task to allow starting a new one
let mut active_reshard_tasks = self.reshard_tasks.lock().await;
let task_result = active_reshard_tasks.stop_task(&reshard_key).await;
debug_assert!(task_result.is_none(), "Reshard task already exists");

let shard_holder = self.shards_holder.clone();
let collection_id = self.id.clone();
let collection_config = Arc::clone(&self.collection_config);
let channel_service = self.channel_service.clone();
let progress = Arc::new(Mutex::new(ReshardTaskProgress::new()));
let spawned_task = resharding::spawn_resharding_task(
shard_holder,
progress.clone(),
reshard_key.clone(),
consensus,
collection_id,
self.path.clone(),
collection_config,
channel_service,
temp_dir,
on_finish,
on_error,
);

active_reshard_tasks.add_task(
reshard_key,
ReshardTaskItem {
task: spawned_task,
started_at: chrono::Utc::now(),
progress,
},
);
}

Ok(())
}

Expand Down
13 changes: 13 additions & 0 deletions lib/collection/src/operations/cluster_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ pub enum ClusterOperations {
// Abort resharding
#[schemars(skip)]
AbortResharding(AbortReshardingOperation),

// Force `CommitRead` operation
#[schemars(skip)]
CommitRead(CommitReadOperation),
}

#[derive(Debug, Deserialize, Serialize, JsonSchema, Validate, Clone)]
Expand Down Expand Up @@ -110,6 +114,7 @@ impl Validate for ClusterOperations {
ClusterOperations::RestartTransfer(op) => op.validate(),
ClusterOperations::StartResharding(op) => op.validate(),
ClusterOperations::AbortResharding(op) => op.validate(),
ClusterOperations::CommitRead(op) => op.validate(),
}
}
}
Expand Down Expand Up @@ -152,6 +157,11 @@ pub struct AbortReshardingOperation {
pub abort_resharding: AbortResharding,
}

#[derive(Clone, Debug, Deserialize, Serialize, Validate)]
pub struct CommitReadOperation {
pub commit_read: CommitRead,
}

#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)]
#[serde(rename_all = "snake_case")]
pub struct ReplicateShard {
Expand Down Expand Up @@ -213,3 +223,6 @@ pub struct StartResharding {

#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema, Validate)]
pub struct AbortResharding {}

#[derive(Clone, Debug, Deserialize, Serialize, Validate)]
pub struct CommitRead {}
23 changes: 23 additions & 0 deletions src/common/collections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,29 @@ pub async fn do_update_collection_cluster(
)
.await
}

ClusterOperations::CommitRead(_) => {
let Some(state) = collection.resharding_state().await else {
return Err(StorageError::bad_request(format!(
"resharding is not in progress for collection {collection_name}"
)));
};

dispatcher
.submit_collection_meta_op(
CollectionMetaOperations::Resharding(
collection_name.clone(),
ReshardingOperation::CommitRead(ReshardKey {
peer_id: state.peer_id,
shard_id: state.shard_id,
shard_key: state.shard_key.clone(),
}),
),
access,
wait_timeout,
)
.await
}
}
}

Expand Down

0 comments on commit 2cfc12a

Please sign in to comment.