-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Message Containers #5077
Message Containers #5077
Conversation
c6b709c
to
73ba623
Compare
2250180
to
0e9a1a0
Compare
016ec10
to
35601e7
Compare
6da41dc
to
79a737c
Compare
dbc9b84
to
65dbaa1
Compare
aecad86
to
de0db4a
Compare
when converting between MQTT 5.0 and AMQP 1.0
Convert to a text representation, if possible, and indicate to MQTT client that the payload is UTF-8 encoded. This way, the MQTT client will be able to parse the payload. If conversion to text representation is not possible, encode the payload using the AMQP 1.0 type system and indiate the encoding via Content-Type message/vnd.rabbitmq.amqp. This Content-Type is not registered. Type "message" makes sense since it's a message. Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not registered.
Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice versa. The Response Topic must be a UTF-8 encoded string. This commit re-uses the already defined RabbitMQ target addresses: ``` "/topic/" RK Publish to amq.topic with routing key RK "/exchange/" X "/" RK Publish to exchange X with routing key RK ``` By default, the MQTT topic exchange is configure dto be amq.topic using the 1st target address. When an operator modifies the mqtt.exchange, the 2nd target address is used.
and fix formatting Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
When hashing on a header value.
Test receiving from and sending to MQTT 5.0 and * AMQP 0.9.1 * AMQP 1.0 * STOMP * Streams
I'm not exactly sure how this happened, but gazell seems to have been run with an older version of the rules_erlang gazelle extension at some point. This caused generation of a structure that is no longer used. This commit updates the structure to the current pattern.
Just in case they are included. Also use iolist_to_iovec to create flat list of binaries when converting from amqp with amqp encoded payload.
value_to_hash(undefined, Msg) -> | ||
mc:get_annotation(routing_keys, Msg); | ||
value_to_hash({header, Header}, Msg0) -> | ||
maps:get(Header, mc:routing_headers(Msg0, [x_headers])); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@kjnilsson hi, this breaks consistent_hash_exchange
and possibly others because Header
may not exist in the message in which case undefined
should be returned as it was previously.
maybe changing it to unwrap(mc:x_header(Header, Msg0));
is more appropriate
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
submit a PR with a test case?
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at point of reception.
Currently all non AMQP 0.9.1 originating messages are converted into a AMQP 0.9.1 flavoured
basic_message
record before sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1 but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy even if consuming from the originating protocol.This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (
mc
). The container module exposes an API to get common message details such as size and various properties (ttl
,priority
etc) directly from the source data type. Each protocol needs to implement themc
behaviour such that when a message originating form one protocol is consumed by another protocol we convert it to the target protocol at that point.The message container also contains annotations, dead letter records and other meta data we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.
Find a way to make the changes made to the exchange type behaviour somehow backwards compatibledurable
annotation,combiningexchange
androuting_keys
into a singleroute
annotation