New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(splunk_hec source): Flush messages #5069
Conversation
Fixes #5058 #4875 introduced buffering during `Pipeline.start_send`. Events were sent to the inner sink only on when `start_send` was called again or `poll_complete` is called. A side-effect of this was that when using `Sink.send_all` and `Stream.forward`, if an error was returned on a `poll()` of the stream being sent, the previously written event would never be sent to the inner sink as `poll_complete()` was never called. futures01 `Sink.send_all`: https://docs.rs/futures/0.1.30/src/futures/sink/send_all.rs.html#59-88 futures01 `Stream.forward`: https://docs.rs/futures/0.1.30/src/futures/stream/forward.rs.html#78-110 We can see in both cases it will return early if there is an error. If the stream is not ready or is finished, then it will call `poll_complete` / `close` on the sink. We use an errorable futures01 stream for the splunk_hec source: https://github.com/timberio/vector/blob/3bd8076be4097e4e9cf903a4f32e34a8b145a339/src/sources/splunk_hec.rs#L175-L189 which meant that, if there was an error processing a single event, the previous event was never flushed downstream. This adds a `Drop` implementation for `Pipeline` that flushes via `close()`. I'm not 100% on this idea over requiring usages of `Pipeline`s that use errorable streams to explicitly call `.close()`, but pushing up to get feedback. I also implement `close` to ensure that the inner sink also has `close()` called on it. Signed-off-by: Jesse Szwedko <jesse@szwedko.me>
src/pipeline.rs
Outdated
match self.close() { | ||
Ok(Async::Ready(())) => break, | ||
Ok(Async::NotReady) => {} | ||
Err(err) => panic!(err), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thoughts welcome on what to do with errors here.
Is it possible to test for this? Or is that what the broken test is doing? |
@@ -60,6 +60,23 @@ impl Sink for Pipeline { | |||
debug_assert!(self.enqueued.is_empty()); | |||
self.inner.poll_complete() | |||
} | |||
|
|||
fn close(&mut self) -> Poll<(), Self::SinkError> { | |||
self.poll_complete()?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this change alone fix it by chance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had the same thought before; it did not.
src/pipeline.rs
Outdated
loop { | ||
match self.close() { | ||
Ok(Async::Ready(())) => break, | ||
Ok(Async::NotReady) => {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could turn into a pretty tight loop if there's something inner
is waiting on before closing. That plus the error handling question below makes me think we'd be better off requiring the caller to use close
than doing this through Drop
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I'm happy to just call close()
in this one particular case, but do you have thoughts on ensuring that we call close()
everywhere forward
or send_all
is used?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(my concern is that we'll forget about this again)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will need to move away from the futures 0.1 send_all
/forward
methods before too long, so we could add our own method to Pipeline
directly that makes sure to flush before returning an error.
@binarylogic In this case, this existing test is saving us. :) |
Signed-off-by: Jesse Szwedko <jesse@szwedko.me>
@lukesteensen @Hoverbear I pushed another commit switching this to explicit flushing in the I'll do a survey tomorrow for other places we are using Given the switch to calling |
Noting that I'll do the survey separately from this PR; I'd like to merge this to unblock master. |
* fix(architecture): Ensure that Pipelines are flushed Fixes #5058 #4875 introduced buffering during `Pipeline.start_send`. Events were sent to the inner sink only on when `start_send` was called again or `poll_complete` is called. A side-effect of this was that when using `Sink.send_all` and `Stream.forward`, if an error was returned on a `poll()` of the stream being sent, the previously written event would never be sent to the inner sink as `poll_complete()` was never called. futures01 `Sink.send_all`: https://docs.rs/futures/0.1.30/src/futures/sink/send_all.rs.html#59-88 futures01 `Stream.forward`: https://docs.rs/futures/0.1.30/src/futures/stream/forward.rs.html#78-110 We can see in both cases it will return early if there is an error. If the stream is not ready or is finished, then it will call `poll_complete` / `close` on the sink. We use an errorable futures01 stream for the splunk_hec source: https://github.com/timberio/vector/blob/3bd8076be4097e4e9cf903a4f32e34a8b145a339/src/sources/splunk_hec.rs#L175-L189 which meant that, if there was an error processing a single event, the previous event was never flushed downstream. This change ensures that it flushes after the request is handled to send any additional buffered events. I also implement `close` to ensure that the inner sink also has `close()` called on it. Signed-off-by: Jesse Szwedko <jesse@szwedko.me> Signed-off-by: casserni <nicholascassera@gmail.com>
* fix(architecture): Ensure that Pipelines are flushed Fixes vectordotdev#5058 vectordotdev#4875 introduced buffering during `Pipeline.start_send`. Events were sent to the inner sink only on when `start_send` was called again or `poll_complete` is called. A side-effect of this was that when using `Sink.send_all` and `Stream.forward`, if an error was returned on a `poll()` of the stream being sent, the previously written event would never be sent to the inner sink as `poll_complete()` was never called. futures01 `Sink.send_all`: https://docs.rs/futures/0.1.30/src/futures/sink/send_all.rs.html#59-88 futures01 `Stream.forward`: https://docs.rs/futures/0.1.30/src/futures/stream/forward.rs.html#78-110 We can see in both cases it will return early if there is an error. If the stream is not ready or is finished, then it will call `poll_complete` / `close` on the sink. We use an errorable futures01 stream for the splunk_hec source: https://github.com/timberio/vector/blob/3bd8076be4097e4e9cf903a4f32e34a8b145a339/src/sources/splunk_hec.rs#L175-L189 which meant that, if there was an error processing a single event, the previous event was never flushed downstream. This change ensures that it flushes after the request is handled to send any additional buffered events. I also implement `close` to ensure that the inner sink also has `close()` called on it. Signed-off-by: Jesse Szwedko <jesse@szwedko.me> Signed-off-by: Brian Menges <brian.menges@anaplan.com>
Fixes #5058
#4875 introduced buffering
during
Pipeline.start_send
. Events were sent to the inner sink only onwhen
start_send
was called again orpoll_complete
is called.A side-effect of this was that when using
Sink.send_all
andStream.forward
, if an error was returned on apoll()
of the streambeing sent, the previously written event would never be sent to the
inner sink as
poll_complete()
was never called.futures01
Sink.send_all
:https://docs.rs/futures/0.1.30/src/futures/sink/send_all.rs.html#59-88
futures01
Stream.forward
:https://docs.rs/futures/0.1.30/src/futures/stream/forward.rs.html#78-110
We can see in both cases it will return early if there is an error. If
the stream is not ready or is finished, then it will call
poll_complete
/close
on the sink.We use an errorable futures01 stream for the splunk_hec source:
https://github.com/timberio/vector/blob/3bd8076be4097e4e9cf903a4f32e34a8b145a339/src/sources/splunk_hec.rs#L175-L189
which meant that, if there was an error processing a single event, the
previous event was never flushed downstream.
This adds a
Drop
implementation forPipeline
that flushes viaclose()
. I'm not 100% on this idea over requiring usages ofPipeline
s that use errorable streams to explicitly call.close()
,but pushing up to get feedback.
I also implement
close
to ensure that the inner sink also hasclose()
called on it.Signed-off-by: Jesse Szwedko jesse@szwedko.me