Simple no-op refactoring.
Bugfix: IndexPipeline remembers list of shards and applies it after respawn

Bugfix: assign-shards logic.

If a shard is removed, we just kill the pipeline.
If shards are added, we reassign the new set of shards.
The pipeline supervisor keeps track of the shards, and reassigns them if
the pipeline is respawned.

Closes #4184
Closes #4174
fulmicoton committed Dec 4, 2023
1 parent b31bf72 commit d2c9c2d
Showing 7 changed files with 217 additions and 104 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-cluster/src/cluster.rs
@@ -447,7 +447,7 @@ pub fn parse_indexing_tasks(node_state: &NodeState) -> Vec<IndexingTask> {
///
/// If previous indexing tasks were present in the node state but were not in the given tasks, they
/// are marked for deletion.
-pub fn set_indexing_tasks_in_node_state(
+pub(crate) fn set_indexing_tasks_in_node_state(
indexing_tasks: &[IndexingTask],
node_state: &mut NodeState,
) {
27 changes: 23 additions & 4 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
@@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

+use std::collections::BTreeSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -37,6 +38,7 @@ use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::{
IndexMetadataRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
};
+use quickwit_proto::types::ShardId;
use quickwit_storage::{Storage, StorageResolver};
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};
@@ -50,7 +52,9 @@ use crate::actors::uploader::UploaderType;
use crate::actors::{Indexer, Packager, Publisher, Uploader};
use crate::merge_policy::MergePolicy;
use crate::models::IndexingStatistics;
-use crate::source::{quickwit_supported_sources, AssignShards, SourceActor, SourceRuntimeArgs};
+use crate::source::{
+    quickwit_supported_sources, AssignShards, Assignment, SourceActor, SourceRuntimeArgs,
+};
use crate::split_store::IndexingSplitStore;
use crate::SplitsUpdateMailbox;

@@ -119,6 +123,10 @@ pub struct IndexingPipeline {
handles_opt: Option<IndexingPipelineHandles>,
// Killswitch used for the actors in the pipeline. This is not the supervisor killswitch.
kill_switch: KillSwitch,
+    // The set of shards can change dynamically without necessarily requiring
+    // a respawn of the pipeline. We nevertheless keep the list of shards here,
+    // so that we can reassign them after a respawn.
+    shard_ids: BTreeSet<ShardId>,
}

#[async_trait]
@@ -153,12 +161,13 @@ impl Actor for IndexingPipeline {

impl IndexingPipeline {
pub fn new(params: IndexingPipelineParams) -> Self {
-        Self {
+        IndexingPipeline {
params,
previous_generations_statistics: Default::default(),
handles_opt: None,
kill_switch: KillSwitch::default(),
statistics: IndexingStatistics::default(),
+            shard_ids: Default::default(),
}
}

@@ -258,6 +267,7 @@ impl IndexingPipeline {
.set_num_spawn_attempts(self.statistics.num_spawn_attempts);
let pipeline_metrics_opt = handles.indexer.last_observation().pipeline_metrics_opt;
self.statistics.pipeline_metrics_opt = pipeline_metrics_opt;
+        self.statistics.shard_ids = self.shard_ids.clone();
ctx.observe(self);
}

@@ -453,6 +463,10 @@ impl IndexingPipeline {
.set_mailboxes(source_mailbox, source_inbox)
.set_kill_switch(self.kill_switch.clone())
.spawn(actor_source);
+        let assign_shard_message = AssignShards(Assignment {
+            shard_ids: self.shard_ids.clone(),
+        });
+        source_mailbox.send_message(assign_shard_message).await?;

// Increment generation once we are sure there will be no spawning error.
self.previous_generations_statistics = self.statistics.clone();
@@ -543,18 +557,23 @@ impl Handler<AssignShards> for IndexingPipeline {
async fn handle(
&mut self,
assign_shards_message: AssignShards,
-        _ctx: &ActorContext<Self>,
+        ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
+        self.shard_ids = assign_shards_message.0.shard_ids.clone();
+        // If the pipeline is running, we forward the message to its source.
+        // If it is not, it will soon be respawned, and the shards will be assigned after that.
if let Some(handles) = &mut self.handles_opt {
info!(
shard_ids=?assign_shards_message.0.shard_ids,
"assigning shards to indexing pipeline."
"assigning shards to indexing pipeline"
);
handles
.source_mailbox
.send_message(assign_shards_message)
.await?;
}
+        // We observe the pipeline to make sure the set of shard ids is up to date.
+        self.perform_observe(ctx);
Ok(())
}
}
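
The heart of the fix is easier to see outside the diff. Below is a minimal, self-contained sketch of the remember-and-reassign pattern in plain Rust. `Pipeline`, `SourceMailbox`, and `respawn` are simplified stand-ins for illustration, not the actual Quickwit actor API:

use std::collections::BTreeSet;

type ShardId = u64;

// Stand-in for the source actor: it only records the last assignment it received.
#[derive(Default)]
struct SourceMailbox {
    assigned: BTreeSet<ShardId>,
}

// Stand-in for `IndexingPipeline`: the shard set is remembered by the pipeline
// itself, independently of the (possibly dead) source actor.
#[derive(Default)]
struct Pipeline {
    shard_ids: BTreeSet<ShardId>,
    source: Option<SourceMailbox>,
}

impl Pipeline {
    // Mirrors `Handler<AssignShards>`: record the assignment, then forward it
    // to the source if the pipeline is currently running.
    fn assign_shards(&mut self, shard_ids: BTreeSet<ShardId>) {
        self.shard_ids = shard_ids;
        if let Some(source) = &mut self.source {
            source.assigned = self.shard_ids.clone();
        }
    }

    // Mirrors the respawn logic: a freshly spawned source immediately receives
    // the remembered shard set instead of starting empty (the bug fixed here).
    fn respawn(&mut self) {
        self.source = Some(SourceMailbox {
            assigned: self.shard_ids.clone(),
        });
    }
}

fn main() {
    let mut pipeline = Pipeline::default();
    pipeline.assign_shards(BTreeSet::from([1, 2, 3]));
    pipeline.source = None; // Simulate a crash of the pipeline's actors.
    pipeline.respawn();
    assert_eq!(pipeline.source.unwrap().assigned, BTreeSet::from([1, 2, 3]));
}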
122 changes: 68 additions & 54 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
@@ -466,38 +466,34 @@ impl IndexingService {
Ok(merge_planner_mailbox)
}

-    /// Applies the indexing plan by:
-    /// - Stopping the running pipelines not present in the provided plan.
-    /// - Starting the pipelines that are not running.
-    /// Note: the indexing is a list of `IndexingTask` and has no ordinal
-    /// like a pipeline. We assign an ordinal for each `IndexingTask` from
-    /// [0, n) with n the number of indexing tasks given a (index_id, source_id).
-    async fn apply_indexing_plan(
-        &mut self,
-        ctx: &ActorContext<Self>,
-        physical_indexing_plan_request: ApplyIndexingPlanRequest,
-    ) -> Result<ApplyIndexingPlanResponse, IndexingError> {
-        let pipelines_uid_in_plan: FnvHashSet<PipelineUid> = physical_indexing_plan_request
-            .indexing_tasks
+    async fn find_and_shutdown_decommissioned_pipelines(&mut self, tasks: &[IndexingTask]) {
+        let pipeline_uids_in_plan: FnvHashSet<PipelineUid> = tasks
.iter()
.map(|indexing_task| indexing_task.pipeline_uid())
.collect();
-        let pipeline_to_add: FnvHashSet<&IndexingTask> = physical_indexing_plan_request
-            .indexing_tasks
-            .iter()
-            .filter(|indexing_task| {
-                let pipeline_uid = indexing_task.pipeline_uid();
-                !self.indexing_pipelines.contains_key(&pipeline_uid)
-            })
-            .collect::<FnvHashSet<_>>();
-        let pipeline_uid_to_remove: Vec<PipelineUid> = self

+        let pipeline_uids_to_remove: Vec<PipelineUid> = self
.indexing_pipelines
.keys()
.cloned()
-            .filter(|pipeline_uid| !pipelines_uid_in_plan.contains(pipeline_uid))
+            .filter(|pipeline_uid| !pipeline_uids_in_plan.contains(pipeline_uid))
.collect::<Vec<_>>();
-        let indexing_pipeline_ids_to_add: Vec<IndexingPipelineId> = pipeline_to_add

+        // Shut down currently running pipelines that are missing in the new plan.
+        self.shutdown_pipelines(&pipeline_uids_to_remove).await;
+    }

+    async fn find_and_spawn_new_pipelines(
+        &mut self,
+        tasks: &[IndexingTask],
+        ctx: &ActorContext<Self>,
+    ) -> Result<Vec<IndexingPipelineId>, IndexingError> {
+        let pipeline_ids_to_add: Vec<IndexingPipelineId> = tasks
.iter()
+            .filter(|indexing_task| {
+                let pipeline_uid = indexing_task.pipeline_uid();
+                !self.indexing_pipelines.contains_key(&pipeline_uid)
+            })
.flat_map(|indexing_task| {
let pipeline_uid = indexing_task.pipeline_uid();
let index_uid = IndexUid::parse(indexing_task.index_uid.clone()).ok()?;
@@ -509,51 +505,64 @@ impl IndexingService {
})
})
.collect();
+        self.spawn_pipelines(&pipeline_ids_to_add[..], ctx).await
+    }

-        // Spawn new pipeline in the new plan that are not currently running
-        let failed_spawning_pipeline_ids = self
-            .spawn_pipelines(ctx, &indexing_pipeline_ids_to_add[..])
-            .await?;

-        // TODO: Temporary hack to assign shards to pipelines.
-        for indexing_task in &physical_indexing_plan_request.indexing_tasks {
-            if indexing_task.shard_ids.is_empty() {
+    /// For all Ingest V2 pipelines, assigns the set of shards they should be working on.
+    /// This is done regardless of whether there has been a change in their shard list
+    /// or not.
+    ///
+    /// If a pipeline actor has failed, this function just logs an error.
+    async fn assign_shards_to_pipelines(&mut self, tasks: &[IndexingTask]) {
+        for task in tasks {
+            if task.shard_ids.is_empty() {
continue;
}
-            let pipeline_uid = indexing_task.pipeline_uid();
+            let pipeline_uid = task.pipeline_uid();
let Some(pipeline_handle) = self.indexing_pipelines.get(&pipeline_uid) else {
continue;
};
let assignment = Assignment {
-                shard_ids: indexing_task.shard_ids.clone(),
+                shard_ids: task.shard_ids.iter().copied().collect(),
};
let message = AssignShards(assignment);

if let Err(error) = pipeline_handle.mailbox.send_message(message).await {
error!(error=%error, "failed to assign shards to indexing pipeline");
}
}
}

-        // Shut down currently running pipelines that are missing in the new plan.
-        self.shutdown_pipelines(&pipeline_uid_to_remove).await;

+    /// Applies the indexing plan by:
+    /// - Stopping the running pipelines not present in the provided plan.
+    /// - Starting the pipelines that are not running.
+    /// Note: the indexing plan is a list of `IndexingTask`s and has no ordinal
+    /// like a pipeline. We assign an ordinal to each `IndexingTask` from
+    /// [0, n), with n the number of indexing tasks for a given (index_id, source_id).
+    async fn apply_indexing_plan(
+        &mut self,
+        apply_plan_req: ApplyIndexingPlanRequest,
+        ctx: &ActorContext<Self>,
+    ) -> Result<(), IndexingError> {
+        let tasks: &[IndexingTask] = &apply_plan_req.indexing_tasks[..];
+        self.find_and_shutdown_decommissioned_pipelines(tasks).await;
+        let failed_spawning_pipeline_ids = self.find_and_spawn_new_pipelines(tasks, ctx).await?;
+        self.assign_shards_to_pipelines(tasks).await;
self.update_cluster_running_indexing_tasks_in_chitchat()
.await;

if !failed_spawning_pipeline_ids.is_empty() {
return Err(IndexingError::SpawnPipelinesError {
pipeline_ids: failed_spawning_pipeline_ids,
});
}

-        Ok(ApplyIndexingPlanResponse {})
+        Ok(())
}

/// Spawns the pipelines with supplied ids and returns a list of failed pipelines.
async fn spawn_pipelines(
&mut self,
-        ctx: &ActorContext<Self>,
added_pipeline_ids: &[IndexingPipelineId],
+        ctx: &ActorContext<Self>,
) -> Result<Vec<IndexingPipelineId>, IndexingError> {
// We fetch the new indexes metadata.
let indexes_metadata_futures = added_pipeline_ids
@@ -645,21 +654,23 @@ impl IndexingService {
}

async fn update_cluster_running_indexing_tasks_in_chitchat(&self) {
-        let indexing_tasks = self
+        let mut indexing_tasks: Vec<IndexingTask> = self
.indexing_pipelines
.values()
-            .map(|pipeline_handle| &pipeline_handle.indexing_pipeline_id)
-            .map(|pipeline_id| IndexingTask {
-                index_uid: pipeline_id.index_uid.to_string(),
-                source_id: pipeline_id.source_id.clone(),
-                pipeline_uid: Some(pipeline_id.pipeline_uid),
-                shard_ids: Vec::new(),
-            })
-            // Sort indexing tasks so it's more readable for debugging purpose.
-            .sorted_by(|left, right| {
-                (&left.index_uid, &left.source_id).cmp(&(&right.index_uid, &right.source_id))
+            .map(|handle| IndexingTask {
+                index_uid: handle.indexing_pipeline_id.index_uid.to_string(),
+                source_id: handle.indexing_pipeline_id.source_id.clone(),
+                pipeline_uid: Some(handle.indexing_pipeline_id.pipeline_uid),
+                shard_ids: handle
+                    .handle
+                    .last_observation()
+                    .shard_ids
+                    .iter()
+                    .copied()
+                    .collect(),
})
-            .collect_vec();
+            .collect();
+        indexing_tasks.sort_unstable_by_key(|task| task.pipeline_uid);

if let Err(error) = self
.cluster
@@ -820,7 +831,10 @@ impl Handler<ApplyIndexingPlanRequest> for IndexingService {
plan_request: ApplyIndexingPlanRequest,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
-        Ok(self.apply_indexing_plan(ctx, plan_request).await)
+        Ok(self
+            .apply_indexing_plan(plan_request, ctx)
+            .await
+            .map(|_| ApplyIndexingPlanResponse {}))
}
}

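
After the refactoring above, `apply_indexing_plan` is a thin driver over three phases: shut down decommissioned pipelines, spawn new ones, then (re)assign shards. The sketch below condenses that control flow with simplified stand-in types (`Service` and plain maps instead of actors); only the phase ordering mirrors the real code:

use std::collections::{BTreeSet, HashMap};

type PipelineUid = u64;
type ShardId = u64;

struct IndexingTask {
    pipeline_uid: PipelineUid,
    shard_ids: BTreeSet<ShardId>,
}

#[derive(Default)]
struct Service {
    // Pipeline uid -> shards currently assigned to that pipeline.
    pipelines: HashMap<PipelineUid, BTreeSet<ShardId>>,
}

impl Service {
    fn apply_indexing_plan(&mut self, tasks: &[IndexingTask]) {
        // Phase 1: shut down running pipelines that are absent from the new plan.
        let uids_in_plan: BTreeSet<PipelineUid> =
            tasks.iter().map(|task| task.pipeline_uid).collect();
        self.pipelines.retain(|uid, _| uids_in_plan.contains(uid));

        // Phase 2: spawn the planned pipelines that are not running yet.
        for task in tasks {
            self.pipelines.entry(task.pipeline_uid).or_default();
        }

        // Phase 3: (re)assign shards to every pipeline with a non-empty shard
        // list, whether or not that list changed; the message is idempotent.
        for task in tasks {
            if task.shard_ids.is_empty() {
                continue;
            }
            if let Some(shards) = self.pipelines.get_mut(&task.pipeline_uid) {
                *shards = task.shard_ids.clone();
            }
        }
    }
}

fn main() {
    let mut service = Service::default();
    service.apply_indexing_plan(&[IndexingTask {
        pipeline_uid: 7,
        shard_ids: BTreeSet::from([1, 2]),
    }]);
    assert_eq!(service.pipelines[&7], BTreeSet::from([1, 2]));
}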
7 changes: 5 additions & 2 deletions quickwit/quickwit-indexing/src/actors/publisher.rs
@@ -23,7 +23,7 @@ use fail::fail_point;
use quickwit_actors::{Actor, ActorContext, Handler, Mailbox, QueueCapacity};
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, PublishSplitsRequest};
use serde::Serialize;
-use tracing::{info, instrument};
+use tracing::{info, instrument, warn};

use crate::actors::MergePlanner;
use crate::models::{NewSplits, SplitsUpdate};
@@ -156,12 +156,15 @@ impl Handler<SplitsUpdate> for Publisher {
// considered an error. For instance, if the source is a
// FileSource, it will terminate upon EOF and drop its
// mailbox.
-            let _ = ctx
+            let suggest_truncate_res = ctx
.send_message(
source_mailbox,
SuggestTruncate(checkpoint.source_delta.get_source_checkpoint()),
)
.await;
+            if let Err(send_truncate_err) = suggest_truncate_res {
+                warn!(error=?send_truncate_err, "failed to send truncate message from publisher to source");
+            }
}
}

5 changes: 5 additions & 0 deletions quickwit/quickwit-indexing/src/models/indexing_statistics.rs
@@ -17,9 +17,11 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

+use std::collections::BTreeSet;
use std::sync::atomic::Ordering;

use quickwit_proto::indexing::PipelineMetrics;
+use quickwit_proto::types::ShardId;
use serde::Serialize;

use crate::actors::{DocProcessorCounters, IndexerCounters, PublisherCounters, UploaderCounters};
@@ -51,6 +53,9 @@ pub struct IndexingStatistics {
pub num_spawn_attempts: usize,
// Pipeline metrics.
pub pipeline_metrics_opt: Option<PipelineMetrics>,
+    // List of shard ids.
+    #[schema(value_type = Vec<u64>)]
+    pub shard_ids: BTreeSet<ShardId>,
}

impl IndexingStatistics {
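
A note on the `#[schema(value_type = Vec<u64>)]` override above: OpenAPI has no set type, but serde serializes a `BTreeSet` as a plain sequence in ascending order, so clients observe a sorted, de-duplicated JSON array. A minimal sketch, assuming `ShardId` serializes as a `u64` (as the override suggests) and using the `serde_json` crate:

use std::collections::BTreeSet;

fn main() {
    // Duplicates collapse and iteration order is ascending in a BTreeSet.
    let shard_ids: BTreeSet<u64> = [3, 1, 2, 2].into_iter().collect();
    let json = serde_json::to_string(&shard_ids).expect("u64 sets always serialize");
    assert_eq!(json, "[1,2,3]");
}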
