Skip to content

Commit

Permalink
feat: support different reindexing modes
Browse files Browse the repository at this point in the history
The reindexing behavior is currently hardcoded to always or never, but this introduces another mode where reindexing can happen only if the index fails to load, which is typical for schema changes.
  • Loading branch information
Ulf Lilleengen committed Sep 22, 2023
1 parent d652456 commit a48704e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 16 deletions.
12 changes: 4 additions & 8 deletions bombastic/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::sync::{mpsc, Mutex};
use tokio::task::block_in_place;
use trustification_event_bus::EventBusConfig;
use trustification_index::{IndexConfig, IndexStore};
use trustification_indexer::{actix::configure, Indexer, IndexerStatus};
use trustification_indexer::{actix::configure, Indexer, IndexerStatus, ReindexMode};
use trustification_infrastructure::{Infrastructure, InfrastructureConfig};
use trustification_storage::{Storage, StorageConfig};

Expand All @@ -24,9 +24,8 @@ pub struct Run {
#[arg(long = "devmode", default_value_t = false)]
pub devmode: bool,

/// Reindex all documents at startup
#[arg(long = "reindex", default_value_t = false)]
pub reindex: bool,
#[arg(long = "reindex", default_value_t = ReindexMode::OnFailure)]
pub reindex: ReindexMode,

#[command(flatten)]
pub bus: EventBusConfig,
Expand Down Expand Up @@ -68,10 +67,6 @@ impl Run {
bus.create(&[self.stored_topic.as_str()]).await?;
}

if self.reindex {
let _ = c.send(trustification_indexer::IndexerCommand::Reindex).await;
}

let mut indexer = Indexer {
index,
storage,
Expand All @@ -83,6 +78,7 @@ impl Run {
status: s.clone(),
commands: command_receiver,
command_sender: c,
reindex: self.reindex,
};
indexer.run().await
},
Expand Down
29 changes: 29 additions & 0 deletions indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use core::fmt;
use std::time::Duration;

use futures::pin_mut;
Expand Down Expand Up @@ -25,6 +26,26 @@ pub enum IndexerCommand {
Reindex,
}

#[derive(clap::ValueEnum, Clone, Debug, PartialEq)]
pub enum ReindexMode {
#[clap(name = "always")]
Always,
#[clap(name = "on-failure")]
OnFailure,
#[clap(name = "never")]
Never,
}

impl fmt::Display for ReindexMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ReindexMode::Always => write!(f, "always"),
ReindexMode::OnFailure => write!(f, "on-failure"),
ReindexMode::Never => write!(f, "never"),
}
}
}

pub struct Indexer<'a, INDEX: Index> {
pub stored_topic: &'a str,
pub indexed_topic: &'a str,
Expand All @@ -36,13 +57,21 @@ pub struct Indexer<'a, INDEX: Index> {
pub status: Arc<Mutex<IndexerStatus>>,
pub commands: Receiver<IndexerCommand>,
pub command_sender: Sender<IndexerCommand>,
pub reindex: ReindexMode,
}

impl<'a, INDEX: Index> Indexer<'a, INDEX> {
pub async fn run(&mut self) -> Result<(), anyhow::Error> {
// Load initial index from storage.
if let Err(e) = self.index.sync(&self.storage).await {
log::info!("Error loading initial index: {:?}", e);
if self.reindex == ReindexMode::OnFailure || self.reindex == ReindexMode::Always {
self.command_sender.send(IndexerCommand::Reindex).await?;
}
} else {
if self.reindex == ReindexMode::Always {
self.command_sender.send(IndexerCommand::Reindex).await?;
}
}

let mut interval = tokio::time::interval(self.sync_interval);
Expand Down
12 changes: 4 additions & 8 deletions vexination/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::sync::{mpsc, Mutex};
use tokio::task::block_in_place;
use trustification_event_bus::EventBusConfig;
use trustification_index::{IndexConfig, IndexStore};
use trustification_indexer::{actix::configure, Indexer, IndexerStatus};
use trustification_indexer::{actix::configure, Indexer, IndexerStatus, ReindexMode};
use trustification_infrastructure::{Infrastructure, InfrastructureConfig};
use trustification_storage::{Storage, StorageConfig};
use vexination_index::Index;
Expand All @@ -25,9 +25,8 @@ pub struct Run {
#[arg(long = "devmode", default_value_t = false)]
pub devmode: bool,

/// Reindex all documents at startup
#[arg(long = "reindex", default_value_t = false)]
pub reindex: bool,
#[arg(long = "reindex", default_value_t = ReindexMode::OnFailure)]
pub reindex: ReindexMode,

#[command(flatten)]
pub bus: EventBusConfig,
Expand Down Expand Up @@ -64,10 +63,6 @@ impl Run {
bus.create(&[self.stored_topic.as_str()]).await?;
}

if self.reindex {
let _ = c.send(trustification_indexer::IndexerCommand::Reindex).await;
}

let mut indexer = Indexer {
index,
storage,
Expand All @@ -79,6 +74,7 @@ impl Run {
status: s.clone(),
commands: command_receiver,
command_sender: c,
reindex: self.reindex,
};
indexer.run().await
},
Expand Down

0 comments on commit a48704e

Please sign in to comment.