Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(aws_s3 source): batch SQS deletes #7992

Merged
merged 5 commits into from Jun 23, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
89 changes: 76 additions & 13 deletions src/internal_events/aws_s3.rs
Expand Up @@ -4,7 +4,10 @@ pub mod source {
use crate::sources::aws_s3::sqs::ProcessingError;
use metrics::counter;
use rusoto_core::RusotoError;
use rusoto_sqs::{DeleteMessageError, ReceiveMessageError};
use rusoto_sqs::{
BatchResultErrorEntry, DeleteMessageBatchError, DeleteMessageBatchRequestEntry,
DeleteMessageBatchResultEntry, ReceiveMessageError,
};

#[derive(Debug)]
pub(crate) struct SqsS3EventReceived {
Expand Down Expand Up @@ -72,7 +75,7 @@ pub mod source {

impl<'a> InternalEvent for SqsMessageProcessingFailed<'a> {
fn emit_logs(&self) {
warn!(message = "Failed to process SQS.", %self.message_id, %self.error);
warn!(message = "Failed to process SQS message.", %self.message_id, %self.error);
tobz marked this conversation as resolved.
Show resolved Hide resolved
}

fn emit_metrics(&self) {
Expand All @@ -81,33 +84,93 @@ pub mod source {
}

#[derive(Debug)]
pub(crate) struct SqsMessageDeleteSucceeded<'a> {
pub message_id: &'a str,
pub(crate) struct SqsMessageDeleteSucceeded {
pub message_ids: Vec<DeleteMessageBatchResultEntry>,
}

impl<'a> InternalEvent for SqsMessageDeleteSucceeded<'a> {
impl InternalEvent for SqsMessageDeleteSucceeded {
fn emit_logs(&self) {
trace!(message = "Deleted SQS message.", %self.message_id);
let message_ids = self
.message_ids
.iter()
.map(|x| x.id.to_string())
.collect::<Vec<_>>()
.join(", ");
trace!(message = "Deleted SQS message(s).", %message_ids);
tobz marked this conversation as resolved.
Show resolved Hide resolved
}

fn emit_metrics(&self) {
counter!("sqs_message_delete_succeeded_total", 1);
counter!(
"sqs_message_delete_succeeded_total",
self.message_ids.len() as u64
);
}
}

#[derive(Debug)]
pub(crate) struct SqsMessageDeleteFailed<'a> {
pub message_id: &'a str,
pub error: &'a RusotoError<DeleteMessageError>,
enum MessageDeleteFailureState {
Complete(
Vec<DeleteMessageBatchRequestEntry>,
RusotoError<DeleteMessageBatchError>,
),
Partial(Vec<BatchResultErrorEntry>),
}

#[derive(Debug)]
pub(crate) struct SqsMessageDeleteFailed {
state: MessageDeleteFailureState,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code paths for Complete and Partial below seem to be completely disjoint. What is the benefit of making this an enum instead of two separate event structs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Continuity in downstream systems that currently look at these events was the thought... although maybe that doesn't really matter?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how. There are already two separate code paths that emit the same warning and metric (albeit in the same function). I don't think separating them into separate methods/structs will make that big a deal (assuming I understand your concern correctly).

}

impl SqsMessageDeleteFailed {
pub(crate) fn complete(
entries: Vec<DeleteMessageBatchRequestEntry>,
error: RusotoError<DeleteMessageBatchError>,
) -> Self {
SqsMessageDeleteFailed {
state: MessageDeleteFailureState::Complete(entries, error),
}
}

pub(crate) fn partial(entries: Vec<BatchResultErrorEntry>) -> Self {
SqsMessageDeleteFailed {
state: MessageDeleteFailureState::Partial(entries),
}
}
}

impl<'a> InternalEvent for SqsMessageDeleteFailed<'a> {
impl InternalEvent for SqsMessageDeleteFailed {
fn emit_logs(&self) {
warn!(message = "Deletion of SQS message failed.", %self.message_id, %self.error);
match self.state {
MessageDeleteFailureState::Complete(ref entries, ref error) => {
let message_ids = entries
.iter()
.map(|x| x.id.to_string())
.collect::<Vec<_>>()
.join(", ");

warn!(message = "Deletion of SQS message(s) failed.", %message_ids, %error);
}
MessageDeleteFailureState::Partial(ref entries) => {
let message_ids = entries
.iter()
.map(|x| format!("{}/{}", x.id, x.code))
.collect::<Vec<_>>()
.join(", ");

warn!(message = "Deletion of SQS message(s) failed.", %message_ids);
}
}
}

fn emit_metrics(&self) {
counter!("sqs_message_delete_failed_total", 1);
match self.state {
MessageDeleteFailureState::Complete(ref entries, _) => {
counter!("sqs_message_delete_failed_total", entries.len() as u64);
}
MessageDeleteFailureState::Partial(ref entries) => {
counter!("sqs_message_delete_failed_total", entries.len() as u64);
}
}
}
}

Expand Down
58 changes: 35 additions & 23 deletions src/sources/aws_s3/sqs.rs
Expand Up @@ -18,8 +18,8 @@ use lazy_static::lazy_static;
use rusoto_core::{Region, RusotoError};
use rusoto_s3::{GetObjectError, GetObjectRequest, S3Client, S3};
use rusoto_sqs::{
DeleteMessageError, DeleteMessageRequest, Message, ReceiveMessageError, ReceiveMessageRequest,
Sqs, SqsClient,
DeleteMessageBatchError, DeleteMessageBatchRequest, DeleteMessageBatchRequestEntry,
DeleteMessageBatchResult, Message, ReceiveMessageError, ReceiveMessageRequest, Sqs, SqsClient,
};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use snafu::{ResultExt, Snafu};
Expand Down Expand Up @@ -244,6 +244,7 @@ impl IngestorProcess {
})
.unwrap_or_default();

let mut delete_entries = Vec::new();
for message in messages {
let receipt_handle = match message.receipt_handle {
None => {
Expand All @@ -267,22 +268,10 @@ impl IngestorProcess {
message_id: &message_id
});
if self.state.delete_message {
// TODO: SQS supports DeleteMessageBatch, so we could collapse this from 10
// delete calls in serial to a single batch of 10 deletes. not sure how
// this would interact with E2E acknowledgements, though, in the future.
match self.delete_message(receipt_handle).await {
Ok(_) => {
emit!(SqsMessageDeleteSucceeded {
message_id: &message_id
});
}
Err(err) => {
emit!(SqsMessageDeleteFailed {
error: &err,
message_id: &message_id,
});
}
}
delete_entries.push(DeleteMessageBatchRequestEntry {
id: message_id,
receipt_handle,
});
}
}
Err(err) => {
Expand All @@ -293,6 +282,29 @@ impl IngestorProcess {
}
}
}

if !delete_entries.is_empty() {
// We need these for a correct error message if the batch fails overall.
let cloned_entries = delete_entries.clone();
match self.delete_messages(delete_entries).await {
Comment on lines +288 to +289
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too bad you can't pass this by reference. This clone is otherwise pretty pointless (since the internal events can be written to use a reference and not need a clone).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it's annoying. :(

Ok(result) => {
// Batch deletes can have partial successes/failures, so we have to check
// for both cases and emit accordingly.
if !result.successful.is_empty() {
emit!(SqsMessageDeleteSucceeded {
message_ids: result.successful,
});
}

if !result.failed.is_empty() {
emit!(SqsMessageDeleteFailed::partial(result.failed));
}
}
Err(err) => {
emit!(SqsMessageDeleteFailed::complete(cloned_entries, err));
}
}
}
}

async fn handle_sqs_message(&mut self, message: Message) -> Result<(), ProcessingError> {
Expand Down Expand Up @@ -481,15 +493,15 @@ impl IngestorProcess {
.await
}

async fn delete_message(
async fn delete_messages(
&mut self,
receipt_handle: String,
) -> Result<(), RusotoError<DeleteMessageError>> {
entries: Vec<DeleteMessageBatchRequestEntry>,
) -> Result<DeleteMessageBatchResult, RusotoError<DeleteMessageBatchError>> {
self.state
.sqs_client
.delete_message(DeleteMessageRequest {
.delete_message_batch(DeleteMessageBatchRequest {
queue_url: self.state.queue_url.clone(),
receipt_handle,
entries,
})
.await
}
Expand Down