Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix GC actor integration #584

Merged
merged 3 commits into from Oct 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions quickwit-core/src/index.rs
Expand Up @@ -89,7 +89,8 @@ pub async fn delete_index(
.list_splits(index_id, SplitState::ScheduledForDeletion, None, &[])
.await?;
let deletion_stats =
delete_splits_with_files(index_id, storage, metastore.clone(), splits_to_delete).await?;
delete_splits_with_files(index_id, storage, metastore.clone(), splits_to_delete, None)
.await?;
metastore.delete_index(index_id).await?;
Ok(deletion_stats.deleted_entries)
}
Expand Down Expand Up @@ -123,6 +124,7 @@ pub async fn garbage_collect_index(
// to be deleted.
Duration::ZERO,
dry_run,
None,
)
.await?;
if dry_run {
Expand Down Expand Up @@ -158,7 +160,7 @@ pub async fn reset_index(
.await?;

let garbage_removal_result =
delete_splits_with_files(index_id, storage, metastore.clone(), splits).await;
delete_splits_with_files(index_id, storage, metastore.clone(), splits, None).await;
if garbage_removal_result.is_err() {
warn!(metastore_uri = %metastore.uri(), "All split files could not be removed during garbage collection.");
}
Expand Down
1 change: 1 addition & 0 deletions quickwit-indexing/src/actors/garbage_collector.rs
Expand Up @@ -110,6 +110,7 @@ impl AsyncActor for GarbageCollector {
STAGED_GRACE_PERIOD,
DELETION_GRACE_PERIOD,
false,
Some(ctx),
)
.await?;

Expand Down
52 changes: 37 additions & 15 deletions quickwit-indexing/src/actors/pipeline_supervisor.rs
Expand Up @@ -122,6 +122,7 @@ impl IndexingPipelineSupervisor {
&handlers.packager,
&handlers.uploader,
&handlers.publisher,
&handlers.garbage_collector,
&handlers.merge_planner,
&handlers.merge_split_downloader,
&handlers.merge_executor,
Expand Down Expand Up @@ -255,9 +256,23 @@ impl IndexingPipelineSupervisor {
.set_kill_switch(self.kill_switch.clone())
.spawn_sync();

// Garbage collector
let garbage_collector = GarbageCollector::new(
self.params.index_id.clone(),
index_storage.clone(),
self.params.metastore.clone(),
);
let (garbage_collector_mailbox, garbage_collector_handler) = ctx
.spawn_actor(garbage_collector)
.set_kill_switch(self.kill_switch.clone())
.spawn_async();

// Publisher
let publisher =
Publisher::new(self.params.metastore.clone(), merge_planner_mailbox.clone());
let publisher = Publisher::new(
self.params.metastore.clone(),
merge_planner_mailbox.clone(),
garbage_collector_mailbox,
);
let (publisher_mailbox, publisher_handler) = ctx
.spawn_actor(publisher)
.set_kill_switch(self.kill_switch.clone())
Expand All @@ -267,7 +282,7 @@ impl IndexingPipelineSupervisor {
// Uploader
let uploader = Uploader::new(
self.params.metastore.clone(),
index_storage.clone(),
index_storage,
publisher_mailbox,
);
let (uploader_mailbox, uploader_handler) = ctx
Expand Down Expand Up @@ -307,23 +322,13 @@ impl IndexingPipelineSupervisor {
.set_kill_switch(self.kill_switch.clone())
.spawn_async();

let garbage_collector = GarbageCollector::new(
self.params.index_id.clone(),
index_storage,
self.params.metastore.clone(),
);
let (_garbage_collect_mailbox, garbage_collect_handler) = ctx
.spawn_actor(garbage_collector)
.set_kill_switch(self.kill_switch.clone())
.spawn_async();

self.handlers = Some(IndexingPipelineHandler {
source: source_handler,
indexer: indexer_handler,
packager: packager_handler,
uploader: uploader_handler,
publisher: publisher_handler,
garbage_collector: garbage_collect_handler,
garbage_collector: garbage_collector_handler,

merge_planner: merge_planner_handler,
merge_split_downloader: merge_split_downloader_handler,
Expand Down Expand Up @@ -367,6 +372,18 @@ impl IndexingPipelineSupervisor {
.send_exit_with_success(handlers.merge_planner.mailbox())
.await;
}

// When the publisher is dead, try to stop the garbage collector if not
// already.
if handlers.publisher.state() != ActorState::Running
&& handlers.garbage_collector.state() == ActorState::Running
{
info!("Stopping the garbage collector since the publisher is dead.");
// It's fine for the message to fail.
let _ = ctx
.send_exit_with_success(handlers.garbage_collector.mailbox())
.await;
}
}
}
Health::FailureOrUnhealthy => {
Expand Down Expand Up @@ -454,8 +471,13 @@ mod tests {
let mut metastore = MockMetastore::default();
metastore
.expect_list_splits()
.times(1)
.times(3)
.returning(|_, _, _, _| Ok(Vec::new()));
metastore
.expect_mark_splits_for_deletion()
.times(1)
.returning(|_, _| Ok(()));

metastore
.expect_index_metadata()
.withf(|index_id| index_id == "test-index")
Expand Down
33 changes: 31 additions & 2 deletions quickwit-indexing/src/actors/publisher.rs
Expand Up @@ -36,17 +36,20 @@ pub struct PublisherCounters {
/// Actor that records split publication in the metastore and coordinates the
/// downstream actors of the indexing pipeline.
pub struct Publisher {
    // Metastore handle used to publish splits (see the `AsyncActor` impl).
    metastore: Arc<dyn Metastore>,
    // Mailbox of the merge planner, notified as part of the publish flow.
    merge_planner_mailbox: Mailbox<MergePlannerMessage>,
    // Mailbox of the garbage collector; used in `finalize` to stop it when
    // the publisher exits, since the collector otherwise runs indefinitely.
    garbage_collector_mailbox: Mailbox<()>,
    // Running counters for this actor (starts from `Default`).
    counters: PublisherCounters,
}

impl Publisher {
pub fn new(
metastore: Arc<dyn Metastore>,
merge_planner_mailbox: Mailbox<MergePlannerMessage>,
garbage_collector_mailbox: Mailbox<()>,
) -> Publisher {
Publisher {
metastore,
merge_planner_mailbox,
garbage_collector_mailbox,
counters: PublisherCounters::default(),
}
}
Expand Down Expand Up @@ -142,6 +145,22 @@ impl AsyncActor for Publisher {
fail_point!("publisher:after");
Ok(())
}

    /// Actor teardown hook: runs once when the publisher exits, regardless of
    /// the exit status.
    async fn finalize(
        &mut self,
        _exit_status: &quickwit_actors::ActorExitStatus,
        ctx: &ActorContext<Self::Message>,
    ) -> anyhow::Result<()> {
        // The `garbage_collector` actor runs forever, periodically scheduling
        // new messages for itself.
        //
        // The publisher actor being the last standing actor of the pipeline,
        // its end of life should also mean the end of life of never-stopping
        // actors: once the publisher is stopped, there shouldn't be anything
        // left to process.
        // The send result is deliberately ignored: it's fine if the garbage
        // collector is already dead.
        let _ = ctx.send_exit_with_success_blocking(&self.garbage_collector_mailbox);
        Ok(())
    }
}

#[cfg(test)]
Expand Down Expand Up @@ -178,7 +197,12 @@ mod tests {
.times(1)
.returning(|_, _, _| Ok(()));
let (merge_planner_mailbox, _merge_planner_inbox) = create_test_mailbox();
let publisher = Publisher::new(Arc::new(mock_metastore), merge_planner_mailbox);
let (garbage_collector_mailbox, _garbage_collector_inbox) = create_test_mailbox();
let publisher = Publisher::new(
Arc::new(mock_metastore),
merge_planner_mailbox,
garbage_collector_mailbox,
);
let universe = Universe::new();
let (publisher_mailbox, publisher_handle) = universe.spawn_actor(publisher).spawn_async();
let (split_future_tx1, split_future_rx1) = oneshot::channel::<PublisherMessage>();
Expand Down Expand Up @@ -236,7 +260,12 @@ mod tests {
.times(1)
.returning(|_, _, _| Ok(()));
let (merge_planner_mailbox, merge_planner_inbox) = create_test_mailbox();
let publisher = Publisher::new(Arc::new(mock_metastore), merge_planner_mailbox);
let (garbage_collector_mailbox, _garbage_collector_inbox) = create_test_mailbox();
let publisher = Publisher::new(
Arc::new(mock_metastore),
merge_planner_mailbox,
garbage_collector_mailbox,
);
let universe = Universe::new();
let (publisher_mailbox, publisher_handle) = universe.spawn_actor(publisher).spawn_async();
let (split_future_tx, split_future_rx) = oneshot::channel::<PublisherMessage>();
Expand Down
12 changes: 12 additions & 0 deletions quickwit-indexing/src/garbage_collection.rs
Expand Up @@ -22,6 +22,7 @@ use std::sync::Arc;
use std::time::Duration;

use futures::StreamExt;
use quickwit_actors::ActorContext;
use quickwit_metastore::{Metastore, SplitMetadataAndFooterOffsets, SplitState};
use quickwit_storage::Storage;
use tantivy::chrono::Utc;
Expand Down Expand Up @@ -66,13 +67,15 @@ impl From<&SplitMetadataAndFooterOffsets> for FileEntry {
/// * `deletion_grace_period` - Threshold period after which a marked as deleted split can be
/// safely deleted.
/// * `dry_run` - Should this only return a list of affected files without performing deletion.
/// * `ctx_opt` - A context for reporting progress (only useful within quickwit actor).
pub async fn run_garbage_collect(
index_id: &str,
storage: Arc<dyn Storage>,
metastore: Arc<dyn Metastore>,
staged_grace_period: Duration,
deletion_grace_period: Duration,
dry_run: bool,
ctx_opt: Option<&ActorContext<()>>,
) -> anyhow::Result<SplitDeletionStats> {
// Select staged splits with staging timestamp older than grace period timestamp.
let grace_period_timestamp = Utc::now().timestamp() - staged_grace_period.as_secs() as i64;
Expand All @@ -84,6 +87,9 @@ pub async fn run_garbage_collect(
// TODO: Update metastore API and push this filter down.
.filter(|meta| meta.split_metadata.update_timestamp < grace_period_timestamp)
.collect();
if let Some(ctx) = ctx_opt {
ctx.record_progress();
}

if dry_run {
let mut scheduled_for_delete_splits = metastore
Expand Down Expand Up @@ -125,6 +131,7 @@ pub async fn run_garbage_collect(
storage.clone(),
metastore.clone(),
splits_to_delete,
ctx_opt,
)
.await?;

Expand All @@ -138,11 +145,13 @@ pub async fn run_garbage_collect(
/// * `storage` - The storage managing the target index.
/// * `metastore` - The metastore managing the target index.
/// * `splits` - The list of splits to delete.
/// * `ctx_opt` - A context for reporting progress (only useful within quickwit actor).
pub async fn delete_splits_with_files(
index_id: &str,
storage: Arc<dyn Storage>,
metastore: Arc<dyn Metastore>,
splits: Vec<SplitMetadataAndFooterOffsets>,
ctx_opt: Option<&ActorContext<()>>,
) -> anyhow::Result<SplitDeletionStats> {
let mut deletion_stats = SplitDeletionStats::default();
let mut deleted_split_ids: Vec<String> = Vec::new();
Expand All @@ -154,6 +163,9 @@ pub async fn delete_splits_with_files(
async move {
let file_entry = FileEntry::from(&meta);
let delete_split_res = storage_clone.delete(Path::new(&file_entry.file_name)).await;
if let Some(ctx) = ctx_opt {
ctx.record_progress();
}
(meta.split_metadata.split_id, file_entry, delete_split_res)
}
})
Expand Down