Skip to content

Commit

Permalink
feat: support using s3 directly as index backing store
Browse files Browse the repository at this point in the history
* Add a runtime config option to enable an S3-directory-backed index.
* Implement the tantivy `Directory` trait on top of S3 storage to bypass the local filesystem and syncing.

Issue #333
  • Loading branch information
Ulf Lilleengen committed Aug 14, 2023
1 parent 1c0816e commit fe2d820
Show file tree
Hide file tree
Showing 15 changed files with 620 additions and 157 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 7 additions & 10 deletions bombastic/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use actix_web_prom::PrometheusMetricsBuilder;
use anyhow::anyhow;
use prometheus::Registry;
use tokio::sync::RwLock;
use tokio::task::block_in_place;
use trustification_auth::authenticator::Authenticator;
use trustification_auth::swagger_ui::{SwaggerUiOidc, SwaggerUiOidcConfig};
use trustification_auth::{authenticator::config::AuthenticatorConfig, authorizer::Authorizer};
Expand Down Expand Up @@ -111,13 +112,13 @@ impl Run {
}

fn configure(
index: IndexConfig,
index_config: IndexConfig,
mut storage: StorageConfig,
registry: &Registry,
devmode: bool,
) -> anyhow::Result<Arc<AppState>> {
let sync_interval: Duration = index.sync_interval.into();
let index = IndexStore::new(&index, bombastic_index::Index::new(), registry)?;
let index =
block_in_place(|| IndexStore::new(&storage, &index_config, bombastic_index::Index::new(), registry))?;
let storage = storage.create("bombastic", devmode, registry)?;

let state = Arc::new(AppState {
Expand All @@ -126,6 +127,7 @@ impl Run {
});

let sinker = state.clone();
let sync_interval = index_config.sync_interval.into();
tokio::task::spawn(async move {
loop {
if sinker.sync_index().await.is_ok() {
Expand Down Expand Up @@ -159,14 +161,9 @@ pub(crate) type SharedState = Arc<AppState>;

impl AppState {
async fn sync_index(&self) -> Result<(), anyhow::Error> {
let data = {
let storage = self.storage.read().await;
storage.get_index().await?
};

let storage = self.storage.read().await;
let mut index = self.index.write().await;
index.reload(&data[..])?;
log::debug!("Bombastic index reloaded");
index.sync(&storage).await?;
Ok(())
}
}
12 changes: 7 additions & 5 deletions bombastic/api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl error::ResponseError for Error {
match self {
Self::Storage(StorageError::NotFound) => StatusCode::NOT_FOUND,
Self::InvalidContentType | Self::InvalidContentEncoding => StatusCode::BAD_REQUEST,
Self::Index(IndexError::Parser(_)) => StatusCode::BAD_REQUEST,
Self::Index(IndexError::QueryParser(_)) => StatusCode::BAD_REQUEST,
e => {
log::error!("{e:?}");
StatusCode::INTERNAL_SERVER_ERROR
Expand Down Expand Up @@ -224,10 +224,12 @@ async fn search_sbom(

log::info!("Querying SBOM: '{}'", params.q);

let index = state.index.read().await;
let (result, total) = index
.search(&params.q, params.offset, params.limit, (&params).into())
.map_err(Error::Index)?;
let (result, total) = actix_web::web::block(move || {
let index = state.index.blocking_read();
index.search(&params.q, params.offset, params.limit, (&params).into())
})
.await?
.map_err(Error::Index)?;

Ok(HttpResponse::Ok().json(SearchResult { total, result }))
}
Expand Down
2 changes: 1 addition & 1 deletion bombastic/index/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ impl trustification_index::Index for Index {
});
}

let mut query = Packages::parse(q).map_err(|err| SearchError::Parser(err.to_string()))?;
let mut query = Packages::parse(q).map_err(|err| SearchError::QueryParser(err.to_string()))?;

query.term = query.term.compact();

Expand Down
19 changes: 13 additions & 6 deletions bombastic/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::process::ExitCode;

use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio::task::block_in_place;
use trustification_event_bus::EventBusConfig;
use trustification_index::{IndexConfig, IndexStore};
use trustification_indexer::{actix::configure, Indexer, IndexerStatus};
Expand Down Expand Up @@ -30,14 +31,14 @@ pub struct Run {
#[command(flatten)]
pub bus: EventBusConfig,

#[command(flatten)]
pub index: IndexConfig,

#[command(flatten)]
pub storage: StorageConfig,

#[command(flatten)]
pub infra: InfrastructureConfig,

#[command(flatten)]
pub index: IndexConfig,
}

impl Run {
Expand All @@ -50,10 +51,16 @@ impl Run {
.run_with_config(
"bombastic-indexer",
|metrics| async move {
let index = IndexStore::new(&self.index, bombastic_index::Index::new(), metrics.registry())?;
let index = block_in_place(|| {
IndexStore::new(
&self.storage,
&self.index,
bombastic_index::Index::new(),
metrics.registry(),
)
})?;
let storage = self.storage.create("bombastic", self.devmode, metrics.registry())?;

let interval = self.index.sync_interval.into();
let bus = self.bus.create(metrics.registry()).await?;
if self.devmode {
bus.create(&[self.stored_topic.as_str()]).await?;
Expand All @@ -70,7 +77,7 @@ impl Run {
stored_topic: self.stored_topic.as_str(),
indexed_topic: self.indexed_topic.as_str(),
failed_topic: self.failed_topic.as_str(),
sync_interval: interval,
sync_interval: self.index.sync_interval.into(),
status: s.clone(),
commands: command_receiver,
command_sender: c,
Expand Down
6 changes: 5 additions & 1 deletion index/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ tantivy = "0.19.2"
tar = "0.4"
time = "0.3"
zstd = "0.12"

rust-s3 = { git = "https://github.com/jcrossley3/rust-s3.git", branch = "issue-352", features = ["blocking"] }
crc32fast = "1.3.2"
async-trait = "0.1"
trustification-storage = { path = "../storage"}
thiserror = "1"
trustification-api = { path = "../api"}

[dev-dependencies]
Expand Down

0 comments on commit fe2d820

Please sign in to comment.