Skip to content

Commit

Permalink
enhancement(kafka sink): Add acknowledgements support (#7677)
Browse files Browse the repository at this point in the history
* Add acknowledgements support to `kafka` sink

Signed-off-by: Bruce Guenter <bruce.guenter@datadoghq.com>
  • Loading branch information
Bruce Guenter committed Jun 16, 2021
1 parent e4b142d commit 024f758
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 21 deletions.
8 changes: 8 additions & 0 deletions lib/vector-core/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ impl Event {
}
}

/// Destroy the event and return the metadata.
pub fn into_metadata(self) -> EventMetadata {
match self {
Self::Log(log) => log.into_parts().1,
Self::Metric(metric) => metric.into_parts().2,
}
}

pub fn add_batch_notifier(&mut self, batch: Arc<BatchNotifier>) {
let finalizer = EventFinalizer::new(batch);
match self {
Expand Down
61 changes: 40 additions & 21 deletions src/sinks/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
buffers::Acker,
config::{log_schema, DataType, GenerateConfig, SinkConfig, SinkContext, SinkDescription},
event::Event,
internal_events::TemplateRenderingFailed,
kafka::{KafkaAuthConfig, KafkaCompression, KafkaStatisticsContext},
serde::to_string,
Expand Down Expand Up @@ -31,6 +30,7 @@ use std::{
task::{Context, Poll},
};
use tokio::time::{sleep, Duration};
use vector_core::event::{Event, EventMetadata, EventStatus};

// Maximum number of futures blocked by [send_result](https://docs.rs/rdkafka/0.24.0/rdkafka/producer/future_producer/struct.FutureProducer.html#method.send_result)
const SEND_RESULT_LIMIT: usize = 5;
Expand Down Expand Up @@ -84,9 +84,18 @@ pub struct KafkaSink {
topic: Template,
key_field: Option<String>,
encoding: EncodingConfig<Encoding>,
delivery_fut: FuturesUnordered<BoxFuture<'static, (usize, Result<DeliveryFuture, KafkaError>)>>,
delivery_fut: FuturesUnordered<
BoxFuture<'static, (usize, Result<DeliveryFuture, KafkaError>, EventMetadata)>,
>,
in_flight: FuturesUnordered<
BoxFuture<'static, (usize, Result<Result<(i32, i64), KafkaError>, Canceled>)>,
BoxFuture<
'static,
(
usize,
Result<Result<(i32, i64), KafkaError>, Canceled>,
EventMetadata,
),
>,
>,

acker: Acker,
Expand Down Expand Up @@ -247,7 +256,8 @@ impl KafkaSink {
fn poll_delivery_fut(&mut self, cx: &mut Context<'_>) -> Poll<()> {
while !self.delivery_fut.is_empty() {
let result = Pin::new(&mut self.delivery_fut).poll_next(cx);
let (seqno, result) = ready!(result).expect("`delivery_fut` is endless stream");
let (seqno, result, metadata) =
ready!(result).expect("`delivery_fut` is endless stream");
self.in_flight.push(Box::pin(async move {
let result = match result {
Ok(fut) => {
Expand All @@ -257,7 +267,7 @@ impl KafkaSink {
Err(error) => Ok(Err(error)),
};

(seqno, result)
(seqno, result, metadata)
}));
}

Expand Down Expand Up @@ -297,7 +307,7 @@ impl Sink<Event> for KafkaSink {
Event::Metric(metric) => metric.timestamp(),
}
.map(|ts| ts.timestamp_millis());
let (key, body) = encode_event(item, &self.key_field, &self.encoding);
let (key, body, metadata) = encode_event(item, &self.key_field, &self.encoding);

let seqno = self.seq_head;
self.seq_head += 1;
Expand Down Expand Up @@ -327,7 +337,7 @@ impl Sink<Event> for KafkaSink {
}
};

(seqno, result)
(seqno, result, metadata)
}));

Ok(())
Expand All @@ -339,13 +349,17 @@ impl Sink<Event> for KafkaSink {
let this = Pin::into_inner(self);
while !this.in_flight.is_empty() {
match ready!(Pin::new(&mut this.in_flight).poll_next(cx)) {
Some((seqno, Ok(result))) => {
Some((seqno, Ok(result), metadata)) => {
match result {
Ok((partition, offset)) => {
trace!(message = "Produced message.", ?partition, ?offset)
metadata.update_status(EventStatus::Delivered);
trace!(message = "Produced message.", ?partition, ?offset);
}
Err(error) => {
metadata.update_status(EventStatus::Errored);
error!(message = "Kafka error.", %error);
}
Err(error) => error!(message = "Kafka error.", %error),
};
}

this.pending_acks.insert(seqno);

Expand All @@ -356,8 +370,9 @@ impl Sink<Event> for KafkaSink {
}
this.acker.ack(num_to_ack);
}
Some((_, Err(Canceled))) => {
Some((_, Err(Canceled), metadata)) => {
error!(message = "Request canceled.");
metadata.update_status(EventStatus::Errored);
return Poll::Ready(Err(()));
}
None => break,
Expand Down Expand Up @@ -406,7 +421,7 @@ fn encode_event(
mut event: Event,
key_field: &Option<String>,
encoding: &EncodingConfig<Encoding>,
) -> (Vec<u8>, Vec<u8>) {
) -> (Vec<u8>, Vec<u8>, EventMetadata) {
let key = key_field
.as_ref()
.and_then(|f| match &event {
Expand All @@ -420,7 +435,7 @@ fn encode_event(

encoding.apply_rules(&mut event);

let body = match event {
let body = match &event {
Event::Log(log) => match encoding.codec() {
Encoding::Json => serde_json::to_vec(&log).unwrap(),
Encoding::Text => log
Expand All @@ -434,7 +449,8 @@ fn encode_event(
},
};

(key, body)
let metadata = event.into_metadata();
(key, body, metadata)
}

#[cfg(test)]
Expand All @@ -453,7 +469,7 @@ mod tests {
crate::test_util::trace_init();
let key = "";
let message = "hello world".to_string();
let (key_bytes, bytes) = encode_event(
let (key_bytes, bytes, _metadata) = encode_event(
message.clone().into(),
&None,
&EncodingConfig::from(Encoding::Text),
Expand All @@ -471,7 +487,7 @@ mod tests {
event.as_mut_log().insert("key", "value");
event.as_mut_log().insert("foo", "bar");

let (key, bytes) = encode_event(
let (key, bytes, _metadata) = encode_event(
event,
&Some("key".into()),
&EncodingConfig::from(Encoding::Json),
Expand All @@ -492,7 +508,7 @@ mod tests {
MetricKind::Absolute,
MetricValue::Counter { value: 0.0 },
);
let (key_bytes, bytes) = encode_event(
let (key_bytes, bytes, _metadata) = encode_event(
metric.clone().into(),
&None,
&EncodingConfig::from(Encoding::Text),
Expand All @@ -509,7 +525,7 @@ mod tests {
MetricKind::Absolute,
MetricValue::Counter { value: 0.0 },
);
let (key_bytes, bytes) = encode_event(
let (key_bytes, bytes, _metadata) = encode_event(
metric.clone().into(),
&None,
&EncodingConfig::from(Encoding::Json),
Expand All @@ -528,7 +544,7 @@ mod tests {
let mut event = Event::from("hello");
event.as_mut_log().insert("key", "value");

let (key, bytes) = encode_event(
let (key, bytes, _metadata) = encode_event(
event,
&Some("key".into()),
&EncodingConfig {
Expand Down Expand Up @@ -563,6 +579,7 @@ mod integration_test {
Message, Offset, TopicPartitionList,
};
use std::{future::ready, thread, time::Duration};
use vector_core::event::{BatchNotifier, BatchStatus};

#[tokio::test]
async fn healthcheck() {
Expand Down Expand Up @@ -791,8 +808,10 @@ mod integration_test {
let sink = KafkaSink::new(config, acker).unwrap();

let num_events = 1000;
let (input, events) = random_lines_with_stream(100, num_events, None);
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (input, events) = random_lines_with_stream(100, num_events, Some(batch));
events.map(Ok).forward(sink).await.unwrap();
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

// read back everything from the beginning
let mut client_config = rdkafka::ClientConfig::new();
Expand Down

0 comments on commit 024f758

Please sign in to comment.