Skip to content

Commit

Permalink
Bugfix Assign shard logic.
Browse files Browse the repository at this point in the history
If no shard is removed, we just kill the pipeline.
If shards are added we reinitiate the shards.
The pipeline supervisor keeps track of the shard, and reassigns them if
the pipeline is respawned.

Closes #4184
  • Loading branch information
fulmicoton committed Dec 2, 2023
1 parent 0b47eab commit 9541f3e
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 44 deletions.
18 changes: 12 additions & 6 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand All @@ -37,10 +38,10 @@ use quickwit_proto::indexing::IndexingPipelineId;
use quickwit_proto::metastore::{
IndexMetadataRequest, MetastoreError, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::ShardId;
use quickwit_storage::{Storage, StorageResolver};
use tokio::sync::Semaphore;
use tracing::{debug, error, info, instrument};
use quickwit_proto::types::ShardId;

use super::MergePlanner;
use crate::actors::doc_processor::DocProcessor;
Expand All @@ -51,7 +52,9 @@ use crate::actors::uploader::UploaderType;
use crate::actors::{Indexer, Packager, Publisher, Uploader};
use crate::merge_policy::MergePolicy;
use crate::models::IndexingStatistics;
use crate::source::{quickwit_supported_sources, AssignShards, SourceActor, SourceRuntimeArgs, Assignment};
use crate::source::{
quickwit_supported_sources, AssignShards, Assignment, SourceActor, SourceRuntimeArgs,
};
use crate::split_store::IndexingSplitStore;
use crate::SplitsUpdateMailbox;

Expand Down Expand Up @@ -123,7 +126,7 @@ pub struct IndexingPipeline {
// The set of shard is something that can change dynamically without necessarily
// requiring a respawn of the pipeline.
// We keep the list of shards here however, to reassign them after a respawn.
shard_ids: Vec<ShardId>,
shard_ids: BTreeSet<ShardId>,
}

#[async_trait]
Expand Down Expand Up @@ -164,7 +167,7 @@ impl IndexingPipeline {
handles_opt: None,
kill_switch: KillSwitch::default(),
statistics: IndexingStatistics::default(),
shard_ids: Vec::new(),
shard_ids: Default::default(),
}
}

Expand Down Expand Up @@ -561,9 +564,12 @@ impl Handler<AssignShards> for IndexingPipeline {
if let Some(handles) = &mut self.handles_opt {
info!(
shard_ids=?assign_shards_message.0.shard_ids,
"assigning shards to indexing pipeline."
"assigning shards to indexing pipeline"
);
handles.source_mailbox.send_message(assign_shards_message).await?;
handles
.source_mailbox
.send_message(assign_shards_message)
.await?;
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-indexing/src/actors/indexing_service.rs
Expand Up @@ -523,7 +523,7 @@ impl IndexingService {
continue;
};
let assignment = Assignment {
shard_ids: task.shard_ids.clone(),
shard_ids: task.shard_ids.iter().copied().collect(),
};
let message = AssignShards(assignment);

Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-indexing/src/actors/publisher.rs
Expand Up @@ -23,7 +23,7 @@ use fail::fail_point;
use quickwit_actors::{Actor, ActorContext, Handler, Mailbox, QueueCapacity};
use quickwit_proto::metastore::{MetastoreService, MetastoreServiceClient, PublishSplitsRequest};
use serde::Serialize;
use tracing::{info, instrument};
use tracing::{info, instrument, warn};

use crate::actors::MergePlanner;
use crate::models::{NewSplits, SplitsUpdate};
Expand Down Expand Up @@ -156,12 +156,15 @@ impl Handler<SplitsUpdate> for Publisher {
// considered an error. For instance, if the source is a
// FileSource, it will terminate upon EOF and drop its
// mailbox.
let _ = ctx
let suggest_truncate_res = ctx
.send_message(
source_mailbox,
SuggestTruncate(checkpoint.source_delta.get_source_checkpoint()),
)
.await;
if let Err(send_truncate_err) = suggest_truncate_res {
warn!(error=?send_truncate_err, "failed to send truncate message from publisher to source");
}
}
}

Expand Down
80 changes: 52 additions & 28 deletions quickwit/quickwit-indexing/src/source/ingest/mod.rs
Expand Up @@ -17,14 +17,14 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::collections::BTreeSet;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context};
use async_trait::async_trait;
use fnv::FnvHashMap;
use itertools::Itertools;
use quickwit_actors::{ActorExitStatus, Mailbox};
use quickwit_common::pubsub::EventBroker;
use quickwit_common::retry::RetryParams;
Expand All @@ -48,7 +48,7 @@ use tracing::{debug, error, info, warn};
use ulid::Ulid;

use super::{
Assignment, BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
BatchBuilder, Source, SourceContext, SourceRuntimeArgs, TypedSourceFactory,
BATCH_NUM_BYTES_LIMIT, EMIT_BATCHES_TIMEOUT,
};
use crate::actors::DocProcessor;
Expand Down Expand Up @@ -116,10 +116,10 @@ impl ClientId {
enum IndexingStatus {
#[default]
Active,
// We have emitted all documents of the pipeline until EOF.
// Disclaimer: a complete status does not mean that all documents have been indexed.
// Some document might be still travelling in the pipeline, and may not have been published
// yet.
// We have received all documents from the stream. Note they
// are not necessarily published yet.
EofReached,
// All documents have been indexed AND published.
Complete,
Error,
}
Expand Down Expand Up @@ -217,7 +217,7 @@ impl IngestSource {
batch_builder.force_commit();
}
MRecord::Eof => {
assigned_shard.status = IndexingStatus::Complete;
assigned_shard.status = IndexingStatus::EofReached;
break;
}
MRecord::Unknown => {
Expand All @@ -239,7 +239,9 @@ impl IngestSource {

fn process_fetch_stream_error(&mut self, fetch_stream_error: FetchStreamError) {
if let Some(shard) = self.assigned_shards.get_mut(&fetch_stream_error.shard_id) {
if shard.status != IndexingStatus::Complete {
if shard.status != IndexingStatus::Complete
|| shard.status != IndexingStatus::EofReached
{
shard.status = IndexingStatus::Error;
}
}
Expand All @@ -250,8 +252,21 @@ impl IngestSource {
self.client_id.source_uid.clone(),
truncate_positions.clone(),
);

// Let's record all shards that have reached Eof as complete.
for (shard, truncate_position) in &truncate_positions {
if truncate_position == &Position::Eof {
if let Some(assigned_shard) = self.assigned_shards.get_mut(shard) {
assigned_shard.status = IndexingStatus::Complete;
}
}
}

// We publish the event to the event broker.
self.event_broker.publish(shard_positions_update);

// Finally, we push the information to ingesters in a best effort manner.
// If the request fail, we just log an error.
let mut per_ingester_truncate_subrequests: FnvHashMap<
&NodeId,
Vec<TruncateShardsSubrequest>,
Expand Down Expand Up @@ -376,7 +391,7 @@ impl Source for IngestSource {

async fn assign_shards(
&mut self,
assignment: Assignment,
new_assigned_shard_ids: BTreeSet<ShardId>,
doc_processor_mailbox: &Mailbox<DocProcessor>,
ctx: &SourceContext,
) -> anyhow::Result<()> {
Expand All @@ -385,15 +400,27 @@ impl Source for IngestSource {
.assigned_shards
.keys()
.copied()
.sorted()
.collect::<Vec<ShardId>>();

let mut new_assigned_shard_ids: Vec<ShardId> = assignment.shard_ids;
new_assigned_shard_ids.sort();
.collect::<BTreeSet<ShardId>>();

// The set of shard is unchanged. There is nothing to do.
if current_assigned_shard_ids == new_assigned_shard_ids {
return Ok(());
}

// If some shard have been removed before reaching EOF,
// we just fail the pipeline.
// Upon restart the shard will be assigned again by the pipeline.
//
// In the future, we will tweak the control plane to avoid moving shard around.
// Instead, we will close the shard and open a new shard.
for removed_shard_id in current_assigned_shard_ids.difference(&new_assigned_shard_ids) {
if let Some(removed_shard) = self.assigned_shards.get(removed_shard_id) {
if removed_shard.status != IndexingStatus::Complete {
bail!("shard `{removed_shard_id}` has been removed before reaching EOF",);
}
}
}

info!("new shard assignment: `{:?}`", new_assigned_shard_ids);

self.assigned_shards.clear();
Expand All @@ -417,7 +444,7 @@ impl Source for IngestSource {
let acquire_shards_subrequest = AcquireShardsSubrequest {
index_uid: self.client_id.source_uid.index_uid.to_string(),
source_id: self.client_id.source_uid.source_id.clone(),
shard_ids: new_assigned_shard_ids,
shard_ids: new_assigned_shard_ids.into_iter().collect(),
publish_token: self.publish_token.clone(),
};
let acquire_shards_request = AcquireShardsRequest {
Expand Down Expand Up @@ -492,7 +519,6 @@ impl Source for IngestSource {
let shard_id = partition_id.as_u64().expect("shard ID should be a u64");
truncate_positions.push((shard_id, position));
}

self.truncate(truncate_positions).await;
Ok(())
}
Expand All @@ -512,9 +538,11 @@ impl Source for IngestSource {

#[cfg(test)]
mod tests {
use std::iter::once;
use std::path::PathBuf;

use bytes::Bytes;
use itertools::Itertools;
use quickwit_actors::{ActorContext, Universe};
use quickwit_common::ServiceStream;
use quickwit_config::{SourceConfig, SourceParams};
Expand Down Expand Up @@ -640,14 +668,12 @@ mod tests {
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);

// In this scenario, the indexer will only be able to acquire shard 1.
let assignment = Assignment {
shard_ids: vec![1, 2],
};
let shard_ids: BTreeSet<ShardId> = (1..3).collect();

let publish_lock = source.publish_lock.clone();
// let publish_token = source.publish_token.clone();

source
.assign_shards(assignment, &doc_processor_mailbox, &ctx)
.assign_shards(shard_ids, &doc_processor_mailbox, &ctx)
.await
.unwrap();

Expand Down Expand Up @@ -785,10 +811,10 @@ mod tests {
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);

// In this scenario, the indexer will only be able to acquire shard 1.
let assignment = Assignment { shard_ids: vec![1] };
let shard_ids: BTreeSet<ShardId> = once(1).collect();

source
.assign_shards(assignment, &doc_processor_mailbox, &ctx)
.assign_shards(shard_ids, &doc_processor_mailbox, &ctx)
.await
.unwrap();

Expand Down Expand Up @@ -941,17 +967,15 @@ mod tests {
ActorContext::for_test(&universe, source_mailbox, observable_state_tx);

// In this scenario, the indexer will only be able to acquire shard 1.
let assignment = Assignment {
shard_ids: vec![1, 2],
};
let shard_ids: BTreeSet<ShardId> = (1..3).collect();

assert_eq!(
shard_positions_update_rx.try_recv().unwrap_err(),
TryRecvError::Empty
);

source
.assign_shards(assignment, &doc_processor_mailbox, &ctx)
.assign_shards(shard_ids, &doc_processor_mailbox, &ctx)
.await
.unwrap();

Expand Down Expand Up @@ -1091,7 +1115,7 @@ mod tests {
.await
.unwrap();
let shard = source.assigned_shards.get(&2).unwrap();
assert_eq!(shard.status, IndexingStatus::Complete);
assert_eq!(shard.status, IndexingStatus::EofReached);

fetch_response_tx
.send(Err(FetchStreamError {
Expand Down
11 changes: 6 additions & 5 deletions quickwit/quickwit-indexing/src/source/mod.rs
Expand Up @@ -72,6 +72,7 @@ mod source_factory;
mod vec_source;
mod void_source;

use std::collections::BTreeSet;
use std::path::PathBuf;
use std::time::Duration;

Expand Down Expand Up @@ -238,7 +239,7 @@ pub trait Source: Send + 'static {
/// plane.
async fn assign_shards(
&mut self,
_assignement: Assignment,
_shard_ids: BTreeSet<ShardId>,
_doc_processor_mailbox: &Mailbox<DocProcessor>,
_ctx: &SourceContext,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -299,7 +300,7 @@ struct Loop;

#[derive(Debug)]
pub struct Assignment {
pub shard_ids: Vec<ShardId>,
pub shard_ids: BTreeSet<ShardId>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -367,12 +368,12 @@ impl Handler<AssignShards> for SourceActor {

async fn handle(
&mut self,
message: AssignShards,
assign_shards_message: AssignShards,
ctx: &SourceContext,
) -> Result<(), ActorExitStatus> {
let AssignShards(assignment) = message;
let AssignShards(Assignment { shard_ids }) = assign_shards_message;
self.source
.assign_shards(assignment, &self.doc_processor_mailbox, ctx)
.assign_shards(shard_ids, &self.doc_processor_mailbox, ctx)
.await?;
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-metastore/src/checkpoint.rs
Expand Up @@ -29,7 +29,7 @@ use quickwit_proto::types::{Position, SourceId};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{info, warn};
use tracing::{debug, warn};

/// A `PartitionId` uniquely identifies a partition for a given source.
#[derive(Clone, Debug, Default, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
Expand Down Expand Up @@ -327,7 +327,7 @@ impl SourceCheckpoint {
delta: SourceCheckpointDelta,
) -> Result<(), IncompatibleCheckpointDelta> {
self.check_compatibility(&delta)?;
info!(delta=?delta, checkpoint=?self, "applying delta to checkpoint");
debug!(delta=?delta, checkpoint=?self, "applying delta to checkpoint");

for (partition_id, partition_position) in delta.per_partition {
self.per_partition
Expand Down

0 comments on commit 9541f3e

Please sign in to comment.