
feat(new sink): Initial rabbitmq sink implementation #1376

Closed

Jeffail wants to merge 2 commits into master from AlyHKafoury-rabbitmq-sink

Conversation

@Jeffail (Contributor) commented Dec 16, 2019

Supersedes #1078

I've refactored the config fields but there are two remaining problems I think we ought to discuss:

  1. If the connection is lost, the library we're using doesn't handle background reconnects the way librdkafka does. This means we need to implement our own mechanism (see the sketch after this list), otherwise users will be forced to restart the service.

  2. When it's known at poll time that an event has failed to send, we need to ensure that it is reattempted indefinitely. This ties into #1107 (Implement end-to-end record acknowledgement).
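For illustration, a minimal sketch of the kind of background reconnect loop item 1 calls for, assuming tokio for the timer; Connection and connect are hypothetical placeholders for the real client types, and a production version would also need to observe a shutdown signal so the loop can't block shutdown:

use std::time::Duration;

// Hypothetical placeholders for the real client types.
struct Connection;
async fn connect() -> Result<Connection, std::io::Error> {
    Ok(Connection)
}

// Retry indefinitely with capped exponential backoff so a transient
// broker outage doesn't force users to restart the service.
async fn reconnect_forever() -> Connection {
    let mut delay = Duration::from_millis(500);
    loop {
        match connect().await {
            Ok(conn) => return conn,
            Err(err) => {
                eprintln!("connect failed: {}, retrying in {:?}", err, delay);
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(Duration::from_secs(30));
            }
        }
    }
}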

AlyHKafoury and others added 2 commits December 16, 2019 10:32
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: Ashley Jeffs <ash@jeffail.uk>
@binarylogic (Contributor)
Nice! We definitely need to resolve both of those issues. @LucioFranco, do you mind chiming in on the best way to do that? I feel like you have the best understanding of the underlying networking code.

> When it's known at poll time that an event has failed to send we need to ensure that it is reattempted indefinitely

This should currently be the default for all of our HTTP sinks.

Comment thread: src/sinks/rabbitmq.rs

fn new(config: RabbitMQSinkConfig, acker: Acker) -> crate::Result<Self> {
    let channel = Client::connect(&config.uri, config.connection_properties())
        .and_then(|client| client.create_channel())
        .wait()?;
Contributor
It seems surprising that this works; I would assume it needs to do some IO?

Comment thread: src/sinks/rabbitmq.rs

Ok(Async::Ready(Some(((), seqno)))) => {
    if self.pending_acks.remove(&seqno) {
        self.acker.ack(1);
        trace!("published message to rabbitmq");
Contributor

It might make sense to add the seqno to these logs; you can use a span to do that.
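For illustration, a minimal sketch of attaching the sequence number via a span, assuming the tracing crate; the span name and the wrapping function are hypothetical:

use tracing::{trace, trace_span};

fn handle_ack(seqno: u64) {
    // Events emitted while the guard is live inherit the span's
    // `seqno` field, so the trace line below carries it automatically.
    let span = trace_span!("rabbitmq_publish", seqno = seqno);
    let _enter = span.enter();
    trace!("published message to rabbitmq");
}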

@Jeffail (Contributor, Author) commented Dec 17, 2019

Intention: This is very relevant to any streaming type of sink we might add in the future, so we ought to work out a solution here that we're happy to reuse.

Correct behavior regarding connection loss is to indefinitely attempt to re-establish it, but we need to be sure we don't block shutdown or any other mechanisms.

Regarding failed message sends, at a minimum we need to reattempt the message indefinitely (assuming the failure is temporary), and it makes sense to attempt this within the same mechanism as the connection recovery.

@LucioFranco (Contributor)

Ok, I took a long, deep look at the lapin library. I think our best bet for providing AMQP support is via this library. The library itself is a bit odd: it provides a futures interface but doesn't actually hook into any of the tokio primitives. It seems like it was originally created before things like tokio existed, but after mio. At a high level, the library provides its own reactor/driver and its own executor. After reading through the source code some more, I am happy with using this executor because 1) it only spawns one extra thread, and 2) if the client/channel types are dropped, the sender to the executor will drop and the executor will be cleaned up. This means that we will not leak threads if we decide to reconnect.

That said, we should move forward with the current library, but we should change how we implement the sink. I suggest that we drop the Sink trait implementation and instead use a tower::Service based one. The benefits of a tower implementation are twofold. 1) It is simpler: we only need to implement one future that ensures we are connected, dispatches the basic_publish, and ensures we got the ack. This can be done with a single future implementation that re-connects if basic_publish fails with the relevant error type. 2) We can take advantage of tower-retry and the retry policies we have already implemented.

So what I suggest is this:

  1. We implement a tower::Service that wraps basic_publish and holds an Option<Channel> (I think in this case we also want to hold onto the connection). When we get a tower request we first make sure that we already have a channel; if not, we create a new one, which is also how reconnection happens. Then we dispatch the request via basic_publish. If this returns an error we peek at it, and if it indicates the TCP connection is broken we clear out the option, because this channel is no longer usable. Then we return the error, which lets upstream tower services retry. (See the sketch after this list.)

  2. Layer the tower services so that we only allow one concurrent basic_publish and retry, in order to maintain ordering and sequencing.

  3. Implement a StreamingServiceSink, something similar to BatchServiceSink, that instead of batching items will submit them to the service in a streaming fashion.
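A hypothetical sketch of the service described in item 1. Channel, PublishError, connect_channel, and publish are placeholder names standing in for the real lapin API, not actual lapin calls, and clearing the stored channel is only illustrated in comments since it needs shared state in practice:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::Service;

// Placeholders for the real lapin types.
#[derive(Clone)]
struct Channel;
#[derive(Debug)]
struct PublishError {
    connection_broken: bool,
}

async fn connect_channel() -> Result<Channel, PublishError> {
    Ok(Channel)
}
async fn publish(_ch: Channel, _payload: Vec<u8>) -> Result<(), PublishError> {
    Ok(())
}

struct RabbitMQService {
    // `None` means the previous channel died and we must reconnect
    // before the next publish.
    channel: Option<Channel>,
}

impl Service<Vec<u8>> for RabbitMQService {
    type Response = ();
    type Error = PublishError;
    type Future = Pin<Box<dyn Future<Output = Result<(), PublishError>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, payload: Vec<u8>) -> Self::Future {
        let channel = self.channel.clone();
        Box::pin(async move {
            // Ensure we are connected before dispatching the publish.
            let ch = match channel {
                Some(ch) => ch,
                None => connect_channel().await?,
            };
            match publish(ch, payload).await {
                Ok(()) => Ok(()),
                Err(e) if e.connection_broken => {
                    // In the real service we would clear the stored channel
                    // here (via shared state), then return the error so an
                    // upstream retry layer re-drives the request.
                    Err(e)
                }
                Err(e) => Err(e),
            }
        })
    }
}

Item 2 would then fall out of tower's stock middleware: wrapping this service in a concurrency limit of one (e.g. tower's ConcurrencyLimit layer) plus a retry layer keeps publishes ordered while still retrying failures.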

This is just a high-level view of how I would go about implementing reconnect. Of course, we could continue with the current implementation, but we would repeat a lot of code that we have already implemented and tested.

@binarylogic (Contributor)

Nothing; this has been assigned to @LucioFranco to complete, although it is not high priority at the moment. Once @LucioFranco is done with the HTTP sink work, it is worth considering this next.

@binarylogic (Contributor)

Closing this for now since there are some substantial changes that we need to make to get this merged. We'll reopen when we get more user demand.

@binarylogic binarylogic closed this Mar 9, 2020
@binarylogic binarylogic deleted the AlyHKafoury-rabbitmq-sink branch April 24, 2020 20:37