Skip to content

Commit

Permalink
feat: propagate s3 events to indexed queue
Browse files Browse the repository at this point in the history
Slightly change the semantics of the indexed queue so that it includes the full event metadata, and rather than indicating a successful indexing, it contains the full event. This allows chaining other processes on the indexed queue that
want to react to events that are processed by the indexer (even if failed).
  • Loading branch information
Ulf Lilleengen committed Oct 2, 2023
1 parent 83c2c5a commit 5346fb4
Showing 1 changed file with 10 additions and 12 deletions.
22 changes: 10 additions & 12 deletions indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ impl<'a, INDEX: Index> Indexer<'a, INDEX> {
let mut interval = tokio::time::interval(self.sync_interval);
let mut writer = Some(block_in_place(|| self.index.writer())?);
let consumer = self.bus.subscribe("indexer", &[self.stored_topic]).await?;
let mut uncommitted_events = Vec::new();
let mut processed_events = Vec::new();
let mut events = 0;
let mut indexed = Vec::new();

*self.status.lock().await = IndexerStatus::Running;
loop {
Expand Down Expand Up @@ -136,7 +135,7 @@ impl<'a, INDEX: Index> Indexer<'a, INDEX> {
EventType::Put => {
match self.storage.get_for_event(&data, true).await {
Ok(res) => {
if let Err(e) = self.index_doc(self.index.index(), writer.as_mut().unwrap(), &res.key, &res.data, &mut indexed).await {
if let Err(e) = self.index_doc(self.index.index(), writer.as_mut().unwrap(), &res.key, &res.data).await {
log::warn!("(Ignored) Internal error when indexing {}: {:?}", res.key, e);
}
events += 1;
Expand All @@ -162,7 +161,7 @@ impl<'a, INDEX: Index> Indexer<'a, INDEX> {
} else {
log::warn!("No event for payload, skipping");
}
uncommitted_events.push(event);
processed_events.push(event);
}
Ok(None) => {
log::debug!("Polling returned no events, retrying");
Expand All @@ -180,20 +179,21 @@ impl<'a, INDEX: Index> Indexer<'a, INDEX> {
match self.index.snapshot(writer.take().unwrap(), &self.storage, events > 0).await {
Ok(_) => {
log::trace!("Index updated successfully");
match consumer.commit(&uncommitted_events[..]).await {
match consumer.commit(&processed_events[..]).await {
Ok(_) => {
log::trace!("Event committed successfully");
uncommitted_events.clear();
}
Err(e) => {
log::warn!("Error committing event: {:?}", e)
}
}
events = 0;

for key in indexed.drain(..) {
if let Err(e) = self.bus.send(self.indexed_topic, key.as_bytes()).await {
log::warn!("(Ignored) Error sending key {} to indexed topic {}: {:?}", key, self.indexed_topic, e);
for event in processed_events.drain(..) {
if let Some(payload) = event.payload() {
if let Err(e) = self.bus.send(self.indexed_topic, payload).await {
log::warn!("(Ignored) Error sending event to indexed topic {}: {:?}", self.indexed_topic, e);
}
}
}

Expand Down Expand Up @@ -230,7 +230,7 @@ impl<'a, INDEX: Index> Indexer<'a, INDEX> {
let key = path.key();
log::info!("Reindexing {:?}", key);
// Not sending notifications for reindexing
if let Err(e) = self.index_doc(self.index.index(), writer.as_mut().unwrap(), key, &obj, &mut Vec::new()).await {
if let Err(e) = self.index_doc(self.index.index(), writer.as_mut().unwrap(), key, &obj).await {
log::warn!("(Ignored) Internal error when indexing {}: {:?}", key, e);
} else {
progress += 1;
Expand Down Expand Up @@ -268,12 +268,10 @@ impl<'a, INDEX: Index> Indexer<'a, INDEX> {
writer: &mut IndexWriter,
key: &str,
data: &[u8],
indexed: &mut Vec<String>,
) -> Result<(), anyhow::Error> {
match block_in_place(|| writer.add_document(index, key, data)) {
Ok(_) => {
log::debug!("Inserted entry '{key}' into index");
indexed.push(key.to_string());
}
Err(e) => {
let failure = serde_json::json!( {
Expand Down

0 comments on commit 5346fb4

Please sign in to comment.