Skip to content

feat(new sink): Initial rabbitmq sink implementation#1078

Closed
AlyHKafoury wants to merge 12 commits intovectordotdev:masterfrom
AlyHKafoury:rabbitmq-sink
Closed

feat(new sink): Initial rabbitmq sink implementation#1078
AlyHKafoury wants to merge 12 commits intovectordotdev:masterfrom
AlyHKafoury:rabbitmq-sink

Conversation

@AlyHKafoury
Copy link
Copy Markdown
Contributor

Signed-off-by: AlyHKafoury aly.kafoury@gmail.com

@AlyHKafoury AlyHKafoury changed the title Feature: Rabbit Sink Feature(Rabbit Sink): Adding a RabbitMQ sink Oct 23, 2019
@AlyHKafoury
Copy link
Copy Markdown
Contributor Author

@LucioFranco I've created this as a draft and for you to follow the progress from early stages if it's ok to guide me early for a better path

Copy link
Copy Markdown
Contributor

@LucioFranco LucioFranco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking like a great start!! Let me know if you have any further questions.

Comment thread src/sinks/rabbitmq.rs Outdated
Comment thread src/sinks/rabbitmq.rs Outdated
@binarylogic binarylogic changed the title Feature(Rabbit Sink): Adding a RabbitMQ sink feat(new sink): Initial rabbitmq sink implementation Oct 24, 2019
@AlyHKafoury
Copy link
Copy Markdown
Contributor Author

AlyHKafoury commented Oct 27, 2019

@LucioFranco the acker in the integration test gets out a strange number:

thread 'sinks::rabbitmq::integration_test::publish_messages' panicked at 'assertion failed: (left == right)
left: 499500,
right: 1000', src/sinks/rabbitmq.rs:329:5

assert_eq!(
            ack_counter.load(std::sync::atomic::Ordering::Relaxed),
            number_of_events
        );

I don't understand where this value comes from as the sequence number is correct in the sink

@LucioFranco
Copy link
Copy Markdown
Contributor

@AlyHKafoury so I think there might be something wrong when you decide to ack. It seems that you may be calling https://github.com/timberio/vector/blob/master/src/buffers/mod.rs#L143 more than you think you are? I would assume the code where you call ack just might be getting called a lot more than expected?

@AlyHKafoury AlyHKafoury marked this pull request as ready for review October 30, 2019 00:57
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
@LucioFranco
Copy link
Copy Markdown
Contributor

@AlyHKafoury were you able to figure it out?

@AlyHKafoury
Copy link
Copy Markdown
Contributor Author

@AlyHKafoury were you able to figure it out?

@LucioFranco yea, I was acking the sequence number of the messages instead the count of the messages that was the wrong issue the number 499500 is the summation of all the numbers from 0 to 999 :D :D !!

@LucioFranco
Copy link
Copy Markdown
Contributor

@AlyHKafoury so I think the code mostly looks good! I think the next step is to add a docker container that we can run the tests aginst. We have a docker feature flag you can add your feature flag to that. Then we just need to add that docker contianer to docker-compose and to circleci test-stable job.

Let me know if you have any trouble, I am happy to help!

Signed-off-by: AlyHKafoury <aly.kafoury@gmail.com>
@AlyHKafoury
Copy link
Copy Markdown
Contributor Author

@LucioFranco Can you please check the latest commit is there something I am missing yet ?

@binarylogic binarylogic requested a review from Jeffail November 2, 2019 16:04
@binarylogic
Copy link
Copy Markdown
Contributor

@Jeffail tagging you since you have experience with RabbitMQ. 😄

@Jeffail
Copy link
Copy Markdown
Contributor

Jeffail commented Nov 5, 2019

Hey @AlyHKafoury, I think it might be worth refactoring the configuration parameters here slightly.

When defining an AMQP sink I would only expect to specify an exchange and a routing key. It's often the case that the routing key will be the same as the target queue, but sometimes the queue bindings will be defined separately and only known to consumers.

I would therefore not expect to see any queue based fields in a sink config, including the ability to declare a queue. However, I would want to be able to declare the target exchange, but this should be optional.

@binarylogic
Copy link
Copy Markdown
Contributor

@AlyHKafoury thanks again for doing this. Do you plan on making the suggested changes? We'd really appreciate it. If not, we can assign someone to finish this off.

@AlyHKafoury
Copy link
Copy Markdown
Contributor Author

@binarylogic Sure I will I just missed the comments.

@binarylogic
Copy link
Copy Markdown
Contributor

@AlyHKafoury thanks, and no problem if you can't finish this off. We're planning to have @Jeffail work on it this week, so feel free to chime if you plan to do it yourself.

@Jeffail
Copy link
Copy Markdown
Contributor

Jeffail commented Dec 12, 2019

Hey @AlyHKafoury, got the changes we need on this branch: https://github.com/timberio/vector/tree/AlyHKafoury-rabbitmq-sink, I'm just finishing up (adding some extra integration tests) and then I'll get it merged.

@Jeffail
Copy link
Copy Markdown
Contributor

Jeffail commented Dec 16, 2019

Superseded by: #1376

@Jeffail Jeffail closed this Dec 16, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants