Skip to content

Commit

Permalink
enhancement(aws_s3 source): batch SQS deletes
Browse files Browse the repository at this point in the history
Signed-off-by: Toby Lawrence <toby@nuclearfurnace.com>
  • Loading branch information
tobz committed Jun 22, 2021
1 parent 473b642 commit cbb1975
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 36 deletions.
89 changes: 76 additions & 13 deletions src/internal_events/aws_s3.rs
Expand Up @@ -4,7 +4,10 @@ pub mod source {
use crate::sources::aws_s3::sqs::ProcessingError;
use metrics::counter;
use rusoto_core::RusotoError;
use rusoto_sqs::{DeleteMessageError, ReceiveMessageError};
use rusoto_sqs::{
BatchResultErrorEntry, DeleteMessageBatchError, DeleteMessageBatchRequestEntry,
DeleteMessageBatchResultEntry, ReceiveMessageError,
};

#[derive(Debug)]
pub(crate) struct SqsS3EventReceived {
Expand Down Expand Up @@ -72,7 +75,7 @@ pub mod source {

impl<'a> InternalEvent for SqsMessageProcessingFailed<'a> {
fn emit_logs(&self) {
warn!(message = "Failed to process SQS.", %self.message_id, %self.error);
warn!(message = "Failed to process SQS message.", %self.message_id, %self.error);
}

fn emit_metrics(&self) {
Expand All @@ -81,33 +84,93 @@ pub mod source {
}

#[derive(Debug)]
pub(crate) struct SqsMessageDeleteSucceeded<'a> {
pub message_id: &'a str,
pub(crate) struct SqsMessageDeleteSucceeded {
pub message_ids: Vec<DeleteMessageBatchResultEntry>,
}

impl<'a> InternalEvent for SqsMessageDeleteSucceeded<'a> {
impl InternalEvent for SqsMessageDeleteSucceeded {
fn emit_logs(&self) {
trace!(message = "Deleted SQS message.", %self.message_id);
let message_ids = self
.message_ids
.iter()
.map(|x| x.id.to_string())
.collect::<Vec<_>>()
.join(", ");
trace!(message = "Deleted SQS message(s).", %message_ids);
}

fn emit_metrics(&self) {
counter!("sqs_message_delete_succeeded_total", 1);
counter!(
"sqs_message_delete_succeeded_total",
self.message_ids.len() as u64
);
}
}

#[derive(Debug)]
pub(crate) struct SqsMessageDeleteFailed<'a> {
pub message_id: &'a str,
pub error: &'a RusotoError<DeleteMessageError>,
enum MessageDeleteFailureState {
Complete(
Vec<DeleteMessageBatchRequestEntry>,
RusotoError<DeleteMessageBatchError>,
),
Partial(Vec<BatchResultErrorEntry>),
}

#[derive(Debug)]
pub(crate) struct SqsMessageDeleteFailed {
state: MessageDeleteFailureState,
}

impl SqsMessageDeleteFailed {
pub(crate) fn complete(
entries: Vec<DeleteMessageBatchRequestEntry>,
error: RusotoError<DeleteMessageBatchError>,
) -> Self {
SqsMessageDeleteFailed {
state: MessageDeleteFailureState::Complete(entries, error),
}
}

pub(crate) fn partial(entries: Vec<BatchResultErrorEntry>) -> Self {
SqsMessageDeleteFailed {
state: MessageDeleteFailureState::Partial(entries),
}
}
}

impl<'a> InternalEvent for SqsMessageDeleteFailed<'a> {
impl InternalEvent for SqsMessageDeleteFailed {
fn emit_logs(&self) {
warn!(message = "Deletion of SQS message failed.", %self.message_id, %self.error);
match self.state {
MessageDeleteFailureState::Complete(ref entries, ref error) => {
let message_ids = entries
.iter()
.map(|x| x.id.to_string())
.collect::<Vec<_>>()
.join(", ");

warn!(message = "Deletion of SQS message(s) failed.", %message_ids, %error);
}
MessageDeleteFailureState::Partial(ref entries) => {
let message_ids = entries
.iter()
.map(|x| format!("{}/{}", x.id, x.code))
.collect::<Vec<_>>()
.join(", ");

warn!(message = "Deletion of SQS message(s) failed.", %message_ids);
}
}
}

fn emit_metrics(&self) {
counter!("sqs_message_delete_failed_total", 1);
match self.state {
MessageDeleteFailureState::Complete(ref entries, _) => {
counter!("sqs_message_delete_failed_total", entries.len() as u64);
}
MessageDeleteFailureState::Partial(ref entries) => {
counter!("sqs_message_delete_failed_total", entries.len() as u64);
}
}
}
}

Expand Down
58 changes: 35 additions & 23 deletions src/sources/aws_s3/sqs.rs
Expand Up @@ -18,8 +18,8 @@ use lazy_static::lazy_static;
use rusoto_core::{Region, RusotoError};
use rusoto_s3::{GetObjectError, GetObjectRequest, S3Client, S3};
use rusoto_sqs::{
DeleteMessageError, DeleteMessageRequest, Message, ReceiveMessageError, ReceiveMessageRequest,
Sqs, SqsClient,
DeleteMessageBatchError, DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry,
DeleteMessageBatchResult, Message, ReceiveMessageError, ReceiveMessageRequest, Sqs, SqsClient,
};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -244,6 +244,7 @@ impl IngestorProcess {
})
.unwrap_or_default();

let mut delete_entries = Vec::new();
for message in messages {
let receipt_handle = match message.receipt_handle {
None => {
Expand All @@ -267,22 +268,10 @@ impl IngestorProcess {
message_id: &message_id
});
if self.state.delete_message {
// TODO: SQS supports DeleteMessageBatch, so we could collapse this from 10
// delete calls in serial to a single batch of 10 deletes. not sure how
// this would interact with E2E acknowledgements, though, in the future.
match self.delete_message(receipt_handle).await {
Ok(_) => {
emit!(SqsMessageDeleteSucceeded {
message_id: &message_id
});
}
Err(err) => {
emit!(SqsMessageDeleteFailed {
error: &err,
message_id: &message_id,
});
}
}
delete_entries.push(DeleteMessageBatchRequestEntry {
id: message_id,
receipt_handle,
});
}
}
Err(err) => {
Expand All @@ -293,6 +282,29 @@ impl IngestorProcess {
}
}
}

if !delete_entries.is_empty() {
// We need these for a correct error message if the batch fails overall.
let cloned_entries = delete_entries.clone();
match self.delete_messages(delete_entries).await {
Ok(result) => {
// Batch deletes can have partial successes/failures, so we have to check
// for both cases and emit accordingly.
if !result.successful.is_empty() {
emit!(SqsMessageDeleteSucceeded {
message_ids: result.successful,
});
}

if !result.failed.is_empty() {
emit!(SqsMessageDeleteFailed::partial(result.failed));
}
}
Err(err) => {
emit!(SqsMessageDeleteFailed::complete(cloned_entries, err));
}
}
}
}

async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
Expand Down Expand Up @@ -481,15 +493,15 @@ impl IngestorProcess {
.await
}

async fn delete_message(
async fn delete_messages(
&mut self,
receipt_handle: String,
) -> Result<(), RusotoError<DeleteMessageError>> {
entries: Vec<DeleteMessageBatchRequestEntry>,
) -> Result<DeleteMessageBatchResult, RusotoError<DeleteMessageBatchError>> {
self.state
.sqs_client
.delete_message(DeleteMessageRequest {
.delete_message_batch(DeleteMessageBatchRequest {
queue_url: self.state.queue_url.clone(),
receipt_handle,
entries,
})
.await
}
Expand Down

0 comments on commit cbb1975

Please sign in to comment.