Skip to content

Commit

Permalink
Bugfix Assign shard logic.
Browse files Browse the repository at this point in the history
If no shard is removed, we just kill the pipeline.
If shards are added, we reinitiate the shards.
The pipeline supervisor keeps track of the shards and reassigns them if
the pipeline is respawned.

Closes #4184
Closes #4174
  • Loading branch information
fulmicoton committed Dec 2, 2023
1 parent 0b47eab commit 35cc8ab
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 66 deletions.
5 changes: 1 addition & 4 deletions quickwit/quickwit-cluster/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,7 @@ pub fn parse_indexing_tasks(node_state: &NodeState) -> Vec<IndexingTask> {
///
/// If previous indexing tasks were present in the node state but were not in the given tasks, they
/// are marked for deletion.
pub fn set_indexing_tasks_in_node_state(
indexing_tasks: &[IndexingTask],
node_state: &mut NodeState,
) {
fn set_indexing_tasks_in_node_state(indexing_tasks: &[IndexingTask], node_state: &mut NodeState) {
let mut current_indexing_tasks_keys: HashSet<String> = node_state
.iter_prefix(INDEXING_TASK_PREFIX)
.map(|(key, _)| key.to_string())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,9 @@ impl IndexingScheduler {
let mut indexers = self.get_indexers_from_indexer_pool();
let running_indexing_tasks_by_node_id: FnvHashMap<String, Vec<IndexingTask>> = indexers
.iter()
.map(|indexer| (indexer.0.clone(), indexer.1.indexing_tasks.clone()))
.map(|&(ref indexer_id, ref indexer_node_info)| {
(indexer_id.clone(), indexer_node_info.indexing_tasks.clone())
})
.collect();

let indexing_plans_diff = get_indexing_plans_diff(
Expand Down
29 changes: 19 additions & 10 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand All @@ -37,10 +38,10 @@ use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::{
IndexMetadataRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::ShardId;
use quickwit_storage::{Storage, StorageResolver};
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};
use quickwit_proto::types::ShardId;

use super::MergePlanner;
use crate::actors::doc_processor::DocProcessor;
Expand All @@ -51,7 +52,9 @@ use crate::actors::uploader::UploaderType;
use crate::actors::{Indexer, Packager, Publisher, Uploader};
use crate::merge_policy::MergePolicy;
use crate::models::IndexingStatistics;
use crate::source::{quickwit_supported_sources, AssignShards, SourceActor, SourceRuntimeArgs, Assignment};
use crate::source::{
quickwit_supported_sources, AssignShards, Assignment, SourceActor, SourceRuntimeArgs,
};
use crate::split_store::IndexingSplitStore;
use crate::SplitsUpdateMailbox;

Expand Down Expand Up @@ -123,7 +126,7 @@ pub struct IndexingPipeline {
// The set of shards can change dynamically without necessarily
// requiring a respawn of the pipeline.
// We still keep track of the shards here, however, so we can reassign them after a respawn.
shard_ids: Vec<ShardId>,
shard_ids: BTreeSet<ShardId>,
}

#[async_trait]
Expand Down Expand Up @@ -151,7 +154,7 @@ impl Actor for IndexingPipeline {
) -> anyhow::Result<()> {
// We update the observation to ensure our last "black box" observation
// is up to date.
self.perform_observe(ctx).await;
self.perform_observe(ctx);
Ok(())
}
}
Expand All @@ -164,7 +167,7 @@ impl IndexingPipeline {
handles_opt: None,
kill_switch: KillSwitch::default(),
statistics: IndexingStatistics::default(),
shard_ids: Vec::new(),
shard_ids: Default::default(),
}
}

Expand Down Expand Up @@ -243,7 +246,7 @@ impl IndexingPipeline {
self.statistics.generation
}

async fn perform_observe(&mut self, ctx: &ActorContext<Self>) {
fn perform_observe(&mut self, ctx: &ActorContext<Self>) {
let Some(handles) = &self.handles_opt else {
return;
};
Expand All @@ -264,6 +267,7 @@ impl IndexingPipeline {
.set_num_spawn_attempts(self.statistics.num_spawn_attempts);
let pipeline_metrics_opt = handles.indexer.last_observation().pipeline_metrics_opt;
self.statistics.pipeline_metrics_opt = pipeline_metrics_opt;
self.statistics.shard_ids = self.shard_ids.clone();
ctx.observe(self);
}

Expand Down Expand Up @@ -504,7 +508,7 @@ impl Handler<SuperviseLoop> for IndexingPipeline {
supervise_loop_token: SuperviseLoop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.perform_observe(ctx).await;
self.perform_observe(ctx);
self.perform_health_check(ctx).await?;
ctx.schedule_self_msg(SUPERVISE_INTERVAL, supervise_loop_token)
.await;
Expand Down Expand Up @@ -553,18 +557,23 @@ impl Handler<AssignShards> for IndexingPipeline {
async fn handle(
&mut self,
assign_shards_message: AssignShards,
_ctx: &ActorContext<Self>,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
self.shard_ids = assign_shards_message.0.shard_ids.clone();
// If the pipeline is running, we forward the message to its source.
// If it is not, it will soon be respawned, and the shards will be assigned after that.
if let Some(handles) = &mut self.handles_opt {
info!(
shard_ids=?assign_shards_message.0.shard_ids,
"assigning shards to indexing pipeline."
"assigning shards to indexing pipeline"
);
handles.source_mailbox.send_message(assign_shards_message).await?;
handles
.source_mailbox
.send_message(assign_shards_message)
.await?;
}
// We perform observe to make sure the set of shard ids is up to date.
self.perform_observe(ctx);
Ok(())
}
}
Expand Down
29 changes: 15 additions & 14 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ pub struct IndexingService {
}

impl Debug for IndexingService {
fn fmt(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
fn fmt(&self, formatter: &mut Formatter) -> std::fmt::Result {
formatter
.debug_struct("IndexingService")
.field("cluster_id", &self.cluster.cluster_id())
Expand Down Expand Up @@ -523,7 +523,7 @@ impl IndexingService {
continue;
};
let assignment = Assignment {
shard_ids: task.shard_ids.clone(),
shard_ids: task.shard_ids.iter().copied().collect(),
};
let message = AssignShards(assignment);

Expand Down Expand Up @@ -655,21 +655,22 @@ impl IndexingService {

/// Updates running indexing tasks in chitchat cluster state.
async fn update_cluster_running_indexing_tasks_to_chitchat(&self) {
let indexing_tasks = self
let mut indexing_tasks: Vec<IndexingTask> = self
.indexing_pipelines
.values()
.map(|pipeline_handle| &pipeline_handle.indexing_pipeline_id)
.map(|pipeline_id| IndexingTask {
index_uid: pipeline_id.index_uid.to_string(),
source_id: pipeline_id.source_id.clone(),
pipeline_uid: Some(pipeline_id.pipeline_uid),
shard_ids: Vec::new(),
})
// Sort indexing tasks so they are more readable for debugging purposes.
.sorted_by(|left, right| {
(&left.index_uid, &left.source_id).cmp(&(&right.index_uid, &right.source_id))
.map(|handle| IndexingTask {
index_uid: handle.indexing_pipeline_id.index_uid.to_string(),
source_id: handle.indexing_pipeline_id.source_id.clone(),
pipeline_uid: Some(handle.indexing_pipeline_id.pipeline_uid),
shard_ids: handle
.handle
.last_observation()
.shard_ids
.into_iter()
.collect(),
})
.collect_vec();
.collect();
indexing_tasks.sort_unstable_by_key(|task| task.pipeline_uid);

if let Err(error) = self
.cluster
Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-indexing/src/actors/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use fail::fail_point;
use quickwit_actors::{Actor, ActorContext, Handler, Mailbox, QueueCapacity};
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, PublishSplitsRequest};
use serde::Serialize;
use tracing::{info, instrument};
use tracing::{info, instrument, warn};

use crate::actors::MergePlanner;
use crate::models::{NewSplits, SplitsUpdate};
Expand Down Expand Up @@ -156,12 +156,15 @@ impl Handler<SplitsUpdate> for Publisher {
// considered an error. For instance, if the source is a
// FileSource, it will terminate upon EOF and drop its
// mailbox.
let _ = ctx
let suggest_truncate_res = ctx
.send_message(
source_mailbox,
SuggestTruncate(checkpoint.source_delta.get_source_checkpoint()),
)
.await;
if let Err(send_truncate_err) = suggest_truncate_res {
warn!(error=?send_truncate_err, "failed to send truncate message from publisher to source");
}
}
}

Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-indexing/src/models/indexing_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeSet;
use std::sync::atomic::Ordering;

use quickwit_proto::indexing::PipelineMetrics;
use quickwit_proto::types::ShardId;
use serde::Serialize;

use crate::actors::{DocProcessorCounters, IndexerCounters, PublisherCounters, UploaderCounters};
Expand Down Expand Up @@ -51,6 +53,8 @@ pub struct IndexingStatistics {
pub num_spawn_attempts: usize,
// Pipeline metrics.
pub pipeline_metrics_opt: Option<PipelineMetrics>,
// List of shard ids.
pub shard_ids: BTreeSet<ShardId>,
}

impl IndexingStatistics {
Expand Down

0 comments on commit 35cc8ab

Please sign in to comment.