Implement a RabbitMQ-based AMQP connector #3546
Conversation
@satta not urgent, but when you get a chance I'd be curious to hear your feedback on the overall design. Also, I've exposed only a few basic tuning knobs so far. Settings like hostname, port, vhost, etc., are orthogonal and part of the plugin-specific configuration file, as my intuition is that these are mostly shared between operators. Finally, I thought about adding an optional positional argument that is a URL combining these settings.
I'm not sure how useful that is right now. Exposing it probably only makes sense when using multiple different RabbitMQ deployments, so I'd only add it if it's really needed and the configuration file turns out to be a pain to work with.
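For reference, AMQP connection URLs conventionally have the shape `amqp://user:pass@host:port/vhost`. A minimal, dependency-free sketch of how such a URL could be split into its components — the struct and function names here are illustrative, not part of the plugin:

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <string_view>

// Hypothetical holder for the parts of an amqp:// URL.
struct amqp_url {
  std::string user, password, host, vhost;
  int port = 5672; // default AMQP port
};

// Parse "amqp://user:pass@host:port/vhost"; returns nullopt if the
// scheme doesn't match. All components except the host are optional.
std::optional<amqp_url> parse_amqp_url(const std::string& input) {
  constexpr std::string_view scheme = "amqp://";
  if (input.rfind(scheme, 0) != 0)
    return std::nullopt;
  amqp_url result;
  auto rest = input.substr(scheme.size());
  // Split off the vhost (everything after the first '/').
  if (auto slash = rest.find('/'); slash != std::string::npos) {
    result.vhost = rest.substr(slash + 1);
    rest = rest.substr(0, slash);
  }
  // Split credentials from the host part.
  if (auto at = rest.find('@'); at != std::string::npos) {
    auto creds = rest.substr(0, at);
    rest = rest.substr(at + 1);
    if (auto colon = creds.find(':'); colon != std::string::npos) {
      result.user = creds.substr(0, colon);
      result.password = creds.substr(colon + 1);
    } else {
      result.user = creds;
    }
  }
  // Split host from port.
  if (auto colon = rest.find(':'); colon != std::string::npos) {
    result.host = rest.substr(0, colon);
    result.port = std::stoi(rest.substr(colon + 1));
  } else {
    result.host = rest;
  }
  return result;
}
```

A real implementation would also need percent-decoding of credentials; this sketch only shows the basic component split.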
Sounds good to me, but I guess I'll need to play around with it to get a better idea of how it feels in practice.
Yup, these are the most important ones. Definitely mandatory is some way of specifying whether a queue is to be temporary (i.e., removed when the client disconnects) or persistent (storing incoming deliveries until the client comes back). Both styles of behaviour can be useful in practice. This is usually configured with the `auto_delete` option, either when declaring a queue or via a server-side policy (which is based on a name pattern and does not require any client parameterization). I'll have to test whether this parameter can be specified on the client side. Also note that declaring queues only makes sense for sources; sinks (i.e., components that send to RabbitMQ) can only specify exchanges, and this should be handled appropriately. I haven't looked at the code yet, but that's one point that comes to mind.
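To make the temporary-vs-persistent distinction concrete, one could map a single operator-facing choice onto the AMQP queue-declaration flags. A sketch under assumed names — this is not the plugin's actual API, just an illustration of the two behaviours described above:

```cpp
#include <cassert>

// Illustrative operator-facing choice for queue lifetime.
enum class queue_lifetime {
  temporary, // auto-deleted when the consumer disconnects
  persistent // survives disconnects and stores incoming deliveries
};

// Flags as they would feed into a queue declaration.
struct declare_flags {
  bool durable;
  bool auto_delete;
};

// A temporary queue is auto-deleted and not durable; a persistent
// queue is durable and sticks around after the client disconnects.
constexpr declare_flags to_flags(queue_lifetime lifetime) {
  return lifetime == queue_lifetime::temporary
           ? declare_flags{false, true}
           : declare_flags{true, false};
}
```

The server-side policy route mentioned above would bypass this entirely, since the server then decides based on the queue name pattern.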
This is maybe a simplification that suggests a bit too obviously how the plugin should be used: it kind of assumes that there is one cluster per node that is usually interacted with. Might be the case for me, but maybe not for everyone.
That would probably be better (and also what many other client interfaces do). I'd prefer this, but I can see one also has to keep clarity for operators in mind. I'll try to get my build environment up and running to test it out and send/receive some data. It might also be helpful to allow access to message headers, e.g., to determine whether the payload needs to be decompressed. Not sure how such out-of-band values would fit into the pipeline pattern Tenzir uses.
FTR: In order to get the plugin to build with Debian's packaged rabbitmq-c headers, I had to apply this patch:

```diff
diff --git a/plugins/rabbitmq/src/plugin.cpp b/plugins/rabbitmq/src/plugin.cpp
index 79c91a06d9..bfe1d13c67 100644
--- a/plugins/rabbitmq/src/plugin.cpp
+++ b/plugins/rabbitmq/src/plugin.cpp
@@ -12,8 +12,8 @@
 #include <caf/expected.hpp>

-#include <rabbitmq-c/amqp.h>
-#include <rabbitmq-c/tcp_socket.h>
+#include <amqp.h>
+#include <amqp_tcp_socket.h>

 using namespace std::chrono_literals;
```
These are currently hard-coded for the consumer before I declare the queue:

```cpp
auto passive = amqp_boolean_t{0};
auto durable = amqp_boolean_t{0};
auto exclusive = amqp_boolean_t{0};
auto auto_delete = amqp_boolean_t{1};
// and before consume
auto no_local = amqp_boolean_t{0};
auto no_ack = amqp_boolean_t{1};
```

And this for the producer:

```cpp
auto mandatory = amqp_boolean_t{0};
auto immediate = amqp_boolean_t{0};
```

Which of those should I expose to the operator?
Yep, that's the way I've implemented it.
Okay, I'll make the URL an optional positional argument.
We can expose the list of headers simply in the schema layout.
I'd suggest all of them! They are required to implement various use cases in which the connecting client plays a specific role and needs to behave accordingly. It's surely fine to set the defaults as above (corresponding to a typical temporary consumer) but they should be adjustable IMHO. We'd also need a routing key for the consumer, in case one wants to bind to a topic exchange.
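Gathering the flags from the snippets above into one place, an options record with defaults matching the current hard-coded values (a typical temporary consumer) could look like this. The names are illustrative, not the operator's actual flags:

```cpp
#include <cassert>
#include <string>

// Sketch of an options record covering the flags discussed above.
// Defaults mirror the current hard-coded values; all of them would be
// adjustable by the operator.
struct amqp_options {
  // Queue declaration (consumer only):
  bool passive = false;
  bool durable = false;
  bool exclusive = false;
  bool auto_delete = true;
  // Consuming:
  bool no_local = false;
  bool no_ack = true;
  // Publishing (producer only):
  bool mandatory = false;
  bool immediate = false;
  // Binding to a topic exchange (consumer only):
  std::string routing_key; // empty = no explicit binding
};
```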
Great 👍🏻
The headers are, IIRC, indeed key-value pairs of strings. So that would work.
I have not yet been able to receive data from a local instance. The connection and channel are set up, but queue handling still seems to be a bit off; I'm not sure why yet. Also, if one is not using a temporary queue, one might not need to care about the exchange, since the binding has already been set up and one only needs the queue name. I'd suggest to:
I also tried:

```diff
diff --git a/plugins/rabbitmq/src/plugin.cpp b/plugins/rabbitmq/src/plugin.cpp
index 79c91a06d9..6584218b79 100644
--- a/plugins/rabbitmq/src/plugin.cpp
+++ b/plugins/rabbitmq/src/plugin.cpp
@@ -237,14 +237,14 @@ public:
   // TODO: need a better name for this function.
   auto consume(amqp_channel_t channel, std::string_view exchange,
                std::string_view queue) -> caf::error {
-    TENZIR_DEBUG("declaring queue");
+    TENZIR_DEBUG("declaring queue {}", queue);
     auto passive = amqp_boolean_t{0};
     auto durable = amqp_boolean_t{0};
     auto exclusive = amqp_boolean_t{0};
     auto auto_delete = amqp_boolean_t{1};
     auto arguments = amqp_empty_table;
     auto* declare
-      = amqp_queue_declare(conn_, channel, amqp_empty_bytes, passive, durable,
+      = amqp_queue_declare(conn_, channel, as_amqp_bytes(queue), passive, durable,
                            exclusive, auto_delete, arguments);
     if (auto err = to_error(amqp_get_rpc_reply(conn_)))
       return err;
```

but just got a segfault, apparently during error logging:
I hear you. I took this approach from the official example at https://github.com/alanxz/rabbitmq-c/blob/master/examples/amqp_consumer.c. It highly confused me as well. I thought "it's the way to do it," but I am not surprised it doesn't work. However, this worked for me locally after starting RabbitMQ:
I'm not quite sure what to make of it. At first I wanted to get the scaffold in place before going deeper. I guess it's now time to do that. :-)
@satta I exposed a bunch more options for saver and loader, plus added the ability to provide an optional URL. Mind taking a look at the Markdown file to see whether the new options work for you? I'll take a look at the routing-key vs. queue-name question next.
EDIT: I fixed the confusion of declared queues vs. routing keys. We now only allow setting routing keys for the publisher, and queue name plus routing key for the consumer. Moreover, the default queue name is the empty string, resulting in randomly generated queue names by the AMQP server.
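As background on why the consumer needs a routing key for topic exchanges: the AMQP server matches a message's routing key against the binding pattern, where `*` matches exactly one dot-separated word and `#` matches zero or more words. A small illustrative matcher — this logic lives in the server, not in the plugin; it's only here to make the semantics concrete:

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Split a routing key or binding pattern into dot-separated words.
static std::vector<std::string> split_words(const std::string& key) {
  std::vector<std::string> words;
  std::stringstream ss{key};
  for (std::string word; std::getline(ss, word, '.');)
    words.push_back(word);
  return words;
}

// AMQP topic matching: '*' matches exactly one word, '#' matches zero
// or more words. Recursive formulation for clarity.
static bool match_words(const std::vector<std::string>& pattern, std::size_t p,
                        const std::vector<std::string>& key, std::size_t k) {
  if (p == pattern.size())
    return k == key.size();
  if (pattern[p] == "#")
    // '#' either consumes zero words, or one word while staying on '#'.
    return match_words(pattern, p + 1, key, k)
           || (k < key.size() && match_words(pattern, p, key, k + 1));
  if (k == key.size())
    return false;
  if (pattern[p] == "*" || pattern[p] == key[k])
    return match_words(pattern, p + 1, key, k + 1);
  return false;
}

bool topic_matches(const std::string& pattern, const std::string& key) {
  return match_words(split_words(pattern), 0, split_words(key), 0);
}
```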
@satta we have a problem with injecting headers into the loader: the reason is that the loader only forwards blocks of bytes to a parser. However, the headers are structured data that we can't simply add in the current framework. For example, if the payload is CSV or JSON, there might be a chance to simply add header fields. But what do you do if the payload is a PCAP file? It's simply not possible to inject structured data into a stream of bytes. I'm not quite sure how we can solve this.
I would need to understand how important this is. Then we should discuss the solution space with @dominiklohmann. The only option I currently see is to make it possible to pass structured, per-chunk metadata when we communicate between connector and format.
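One shape the per-chunk metadata idea could take: byte chunks that carry an optional key-value map alongside the raw payload, which the format could consult before interpreting the bytes. This is purely a sketch of the solution space, not Tenzir's actual chunk API, and the `content-encoding` header is a hypothetical example:

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Sketch: a chunk of raw bytes plus out-of-band metadata, e.g., AMQP
// message headers (which are, per the discussion above, key-value
// pairs of strings).
struct chunk_with_metadata {
  std::vector<std::byte> data;
  std::map<std::string, std::string> metadata;
};

// Example policy a parser could apply: decide whether the payload
// needs decompression based on a (hypothetical) header.
bool needs_decompression(const chunk_with_metadata& chunk) {
  auto it = chunk.metadata.find("content-encoding");
  return it != chunk.metadata.end() && it->second == "gzip";
}
```

This sidesteps the PCAP problem: the structured values travel next to the byte stream instead of being injected into it.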
@tobim mind taking a quick look at the Nix build failure in CI? I can't make sense of the seemingly unrelated Perl linker errors.
That seems to be an error coming in via the `rabbitmq-c` documentation tooling; disabling `xmlto` for the static build fixes it:

```diff
diff --git a/nix/overlay.nix b/nix/overlay.nix
index 09fdf28ece..b77cebadf8 100644
--- a/nix/overlay.nix
+++ b/nix/overlay.nix
@@ -182,6 +182,13 @@ in {
     configureFlags = old.configureFlags ++ ["--enable-prof" "--enable-stats"];
     doCheck = !isStatic;
   });
+  rabbitmq-c =
+    if !isStatic
+    then prev.rabbitmq-c
+    else
+      prev.rabbitmq-c.override {
+        xmlto = null;
+      };
   tenzir-source = inputs.nix-filter.lib.filter {
     root = ./..;
     include = [
```
Co-authored-by: Tobias Mayer <tobim@fastmail.fm>
@satta there's a known issue with the AWS C++ SDK used in Arrow before version 13; can you double-check whether you're running the newest version of Arrow?
At least I'm on 13:
Disabling jemalloc didn't make a difference.
FYI regarding the segfault, removing
@Dakostu is there an upstream issue tracking this leak? |
@mavam @satta
For some reason, my static library doesn't get shown? Not available? Nonetheless, I was able to build this branch and launch it. The following pipeline works:

```shell
./tenzir 'from s3 sentinel-cogs/sentinel-s2-l2a-cogs/1/C/CV/2023/1/S2B_1CCV_20230101_0_L2A/tileinfo_metadata.json | write json'
```

```
[08:51:45.209] loaded configuration file: /home/dakostu/.config/tenzir/tenzir.yaml
{
  "path": "tiles/1/C/CV/2023/1/1/0",
  "timestamp": "2023-01-01T21:05:55.632000",
  "utmZone": 1,
(...)
```

@satta Just to make sure: did you install Arrow manually? What were the options for your Arrow installation?
I got the debs from their repo:
BTW, Arrow 14 seems to be available just now from the repo. Would it make sense to try that?
Our Dockerfile & CI on GitHub also use Debian packages to install Arrow (even the same version), and during automated tests the segfault does not happen. Something seems strange here.
OK I'll give it a try. Here we go:
Just in case it helps, here's a
I can't see anything that stands out, but I guess you know better what to expect ;)
Segfault still present:
FTR the issue was fixed by using a workaround in the FluentBit library, see fluent/fluent-bit#8011 |
I set up a basic RabbitMQ server and was able to send/receive events. I approve, with some comments left.
Force-pushed fa02274 to 3fbe680.
This PR adds a RabbitMQ connector, making it possible to produce and consume messages via AMQP.
Definition of Done
- `<url>` as optional positional argument
- `amqp` `--set` options (after merging the `fluent-bit` PR that factors the implementation)