fix(splunk_hec source): Flush messages (#5069)
* fix(architecture): Ensure that Pipelines are flushed

Fixes #5058

#4875 introduced buffering
during `Pipeline.start_send`. Events were sent to the inner sink only
when `start_send` was called again or `poll_complete` was called.
A side-effect of this was that, when using `Sink.send_all` or
`Stream.forward`, if a `poll()` of the stream being sent returned an
error, the previously written event was never sent to the inner sink
because `poll_complete()` was never called.
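For context, the buffering contract looks roughly like the following
sketch (illustrative only, not Vector's actual `Pipeline`; `futures01`
is futures 0.1, which Vector imports under that alias):

use futures01::{Async, AsyncSink, Poll, Sink, StartSend};
use std::collections::VecDeque;

struct BufferingSink<S: Sink> {
    inner: S,
    enqueued: VecDeque<S::SinkItem>,
}

impl<S: Sink> Sink for BufferingSink<S> {
    type SinkItem = S::SinkItem;
    type SinkError = S::SinkError;

    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
        // Drain events buffered by earlier calls into the inner sink.
        while let Some(queued) = self.enqueued.pop_front() {
            if let AsyncSink::NotReady(queued) = self.inner.start_send(queued)? {
                self.enqueued.push_front(queued);
                break;
            }
        }
        // Accept the new item into the buffer. It only reaches `inner` on a
        // later `start_send` or `poll_complete` call; if neither happens, it
        // is stranded here.
        self.enqueued.push_back(item);
        Ok(AsyncSink::Ready)
    }

    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
        // Flush the buffer, then flush the inner sink.
        while let Some(queued) = self.enqueued.pop_front() {
            if let AsyncSink::NotReady(queued) = self.inner.start_send(queued)? {
                self.enqueued.push_front(queued);
                return Ok(Async::NotReady);
            }
        }
        self.inner.poll_complete()
    }
}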

futures01 `Sink.send_all`:
https://docs.rs/futures/0.1.30/src/futures/sink/send_all.rs.html#59-88
futures01 `Stream.forward`:
https://docs.rs/futures/0.1.30/src/futures/stream/forward.rs.html#78-110

We can see that in both cases the combinator returns early if there is
an error; `poll_complete` / `close` is called on the sink only when the
stream is not ready or has finished.
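Here is a lightly condensed paraphrase of the `Forward::poll` loop from
the linked futures 0.1 source (`expect` calls and buffered-item handling
elided), annotated to show where the early return happens:

fn poll(&mut self) -> Poll<(T, U), T::Error> {
    loop {
        match self.stream_mut().poll()? {
            // The `?` above is the early return: a stream error propagates
            // immediately, so neither `poll_complete` nor `close` runs.
            Async::Ready(Some(item)) => try_ready!(self.try_start_send(item)),
            Async::Ready(None) => {
                // Stream finished cleanly: flush and close the sink.
                try_ready!(self.sink_mut().close());
                return Ok(Async::Ready(self.take_result()));
            }
            Async::NotReady => {
                // Stream not ready: flush what we have, then yield.
                try_ready!(self.sink_mut().poll_complete());
                return Ok(Async::NotReady);
            }
        }
    }
}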

We use a fallible futures01 stream for the splunk_hec source:
https://github.com/timberio/vector/blob/3bd8076be4097e4e9cf903a4f32e34a8b145a339/src/sources/splunk_hec.rs#L175-L189
which meant that, if processing a single event failed, the previous
event was never flushed downstream.
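To make the failure mode concrete, here is a hypothetical repro (not
from Vector's test suite) reusing the `BufferingSink` sketch above. The
stream errors on its second item, `forward` bails out early, and the
first item is stranded in `enqueued` because `poll_complete` never ran:

use futures01::{stream, Future, Sink, Stream};

fn main() {
    let (tx, rx) = futures01::sync::mpsc::unbounded::<&str>();
    let sink = BufferingSink {
        inner: tx.sink_map_err(|_| "inner sink closed"),
        enqueued: Default::default(),
    };

    let events = stream::iter_result(vec![Ok("first"), Err("malformed event")]);
    // `forward` hits the error before ever calling `poll_complete`.
    assert!(events.forward(sink).wait().is_err());

    // The sink (and the buffered "first") has been dropped; nothing was
    // ever flushed to the receiving side.
    assert_eq!(rx.wait().next(), None);
}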

This change flushes after each request is handled so that any remaining
buffered events are sent. I also implement `close` to ensure that the
inner sink has `close()` called on it.

Signed-off-by: Jesse Szwedko <jesse@szwedko.me>
jszwedko committed Nov 17, 2020
1 parent 4e33fb6 commit 6bce5ac
Showing 2 changed files with 38 additions and 17 deletions.
5 changes: 5 additions & 0 deletions src/pipeline.rs
@@ -60,6 +60,11 @@ impl Sink for Pipeline {
         debug_assert!(self.enqueued.is_empty());
         self.inner.poll_complete()
     }
+
+    fn close(&mut self) -> Poll<(), Self::SinkError> {
+        self.poll_complete()?;
+        self.inner.close()
+    }
 }
 
 impl Pipeline {
50 changes: 33 additions & 17 deletions src/sources/splunk_hec.rs
@@ -171,23 +171,7 @@ impl SplunkSource {
                     host: Option<String>,
                     gzip: bool,
                     body: Bytes| {
-                        let out = out.clone();
-                        async move {
-                            // Construct event parser
-                            if gzip {
-                                EventStream::new(GzDecoder::new(body.reader()), channel, host)
-                                    .forward(out.clone().sink_map_err(|_| ApiError::ServerShutdown))
-                                    .map(|_| ())
-                                    .compat()
-                                    .await
-                            } else {
-                                EventStream::new(body.reader(), channel, host)
-                                    .forward(out.clone().sink_map_err(|_| ApiError::ServerShutdown))
-                                    .map(|_| ())
-                                    .compat()
-                                    .await
-                            }
-                        }
+                        process_service_request(out.clone(), channel, host, gzip, body)
                     },
                 )
                 .map(finish_ok)
@@ -319,6 +303,38 @@ impl SplunkSource {
     }
 }
 
+async fn process_service_request(
+    out: Pipeline,
+    channel: Option<String>,
+    host: Option<String>,
+    gzip: bool,
+    body: Bytes,
+) -> Result<(), Rejection> {
+    use futures::compat::Sink01CompatExt;
+    use futures::compat::Stream01CompatExt;
+    use futures::{SinkExt, StreamExt};
+
+    let mut out = out
+        .sink_map_err(|_| Rejection::from(ApiError::ServerShutdown))
+        .sink_compat();
+
+    let reader: Box<dyn Read + Send> = if gzip {
+        Box::new(GzDecoder::new(body.reader()))
+    } else {
+        Box::new(body.reader())
+    };
+
+    let stream = EventStream::new(reader, channel, host).compat();
+
+    let res = stream.forward(&mut out).await;
+
+    out.flush()
+        .map_err(|_| Rejection::from(ApiError::ServerShutdown))
+        .await?;
+
+    res.map(|_| ())
+}
+
 /// Constructs one or more events from the JSON coming from the reader.
 /// If an error occurs, the stream is done with its input.
 struct EventStream<R: Read> {
