Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support intercepting and converting outgoing messages #1162

Closed
vladykin opened this issue Apr 17, 2021 · 13 comments
Closed

Support intercepting and converting outgoing messages #1162

vladykin opened this issue Apr 17, 2021 · 13 comments
Labels

Comments

@vladykin
Copy link

vladykin commented Apr 17, 2021

There a several cross-cutting concerns associated with emitting messages to outgoing channel:

  • serializing domain objects to wire format
  • logging the message that is about to be sent (and the result of sending)
  • updating metrics

Having to write this stuff in all places of application where something is emitted or returned from an @outgoing-annotated method is inconvenient. It would be very handy to have an interceptor that would process all outgoing messages and be able to log/count/convert/etc them.

For incoming messages there is MessageConverter which can be used for this purpose. For outgoing messages, AFAIK, these is nothing like that.

P.S. I'm using smallrye-reactive-messaging-amqp 2.9 with Quarkus 1.13.1.

@cescoffier
Copy link
Contributor

That's an interesting idea. One thing is that AMQP is a bit behind in term of feature (in comparison to Kafka which have been a priority for the last few months). I have a few questions (mostly to collect ideas):

serializing domain objects to wire format

That's generally protocol specific? Typically in Kafka you can configure your serializer. The AMQP protocol is strict in terms of type (AMQP Type system), however, for object (where we map the object into JSON at the moment) we can imagine a hook that would be something like:

public <T> byte[] serialize(T instance, OutgoingAmqpMetadata metadata);

Implementations would be able to serialize the instance as they want (producing a byte[]) as well as updating the outgoing message metadata (typically the content-type header).

I guess the HTTP connector can also benefit from such an interceptor (@michalszynkiewicz)

logging the message that is about to be sent (and the result of sending)

That's definitely something we should add. However, the API is going to be protocol-specific. In Kafka, we will call the listeners with a Record (going to be sent) and RecordMetadata (result). HTTP would use something like an HTTP Request and an HTTP Response.

The question is do we need multiple hooks or a single one receiving what has been sent and the result. Typically, what about:

public <I, O> void onSuccess(I sent, O response);
public <I, O> void onFailure(I sent, Exception e);

Note that the onFailure does not allow retry or anything. This is already handled by the connector.

updating metrics

We already expose metrics (at least for Kafka). We should add the AMQP metrics soon.

@Ladicek
Copy link
Collaborator

Ladicek commented Apr 19, 2021

Is this perhaps related to #1141? (If so, I guess we should keep this one open, as it has more details/ideas. I'd close #1141 as duplicate.)

@vladykin
Copy link
Author

serializing domain objects to wire format

That's generally protocol specific? Typically in Kafka you can configure your serializer. The AMQP protocol is strict in terms of type (AMQP Type system), however, for object (where we map the object into JSON at the moment) we can imagine a hook that would be something like: public <T> byte[] serialize(T instance, OutgoingAmqpMetadata metadata);

Well, MessageConverter used for incoming messages is not protocol-specific but works just fine.
What about OutgoingMessageConverter with a couple of methods like:

    boolean canConvert(Message<?> in);
    Message<?> convert(Message<?> in);

? Similar to MessageConverter, but without target type argument, which is to be decided internally by the converter.

Implementations can access protocol-specific metadata of Message, if they need to.

@vladykin
Copy link
Author

vladykin commented Apr 19, 2021

logging the message that is about to be sent (and the result of sending)

That's definitely something we should add. However, the API is going to be protocol-specific. In Kafka, we will call the listeners with a Record (going to be sent) and RecordMetadata (result). HTTP would use something like an HTTP Request and an HTTP Response.
The question is do we need multiple hooks or a single one receiving what has been sent and the result.
Typically, what about:
public <I, O> void onSuccess(I sent, O response);
public <I, O> void onFailure(I sent, Exception e);

I'd prefer a hook which would allow to intercept the following events: message about to be sent, message successfully sent (ack'ed by broker), message sending failed. And to reliably match all these events corresponding to a single message, e.g. to measure time between sending the message and getting ack for it.

@cescoffier
Copy link
Contributor

So, if I understand correctly, your MessageConverter is called before the connector is called. Because the first thing an outgoing connector does is to transform an incoming Message into its own protocol-specific structure.

@vladykin
Copy link
Author

Yes, OutgoingMessageConverter I described can be called in the message processing pipeline before the message is passed to the outgoing connector.
Maybe there are advanced usecases when access to low-level connector-specific stuff is needed. But for my usecase - serializing protobuf object to byte array - such connector-agnostic converter would be enough.

@scrocquesel
Copy link
Contributor

We have similar needs. We have use cases where we want to add some metadata as cross-cutting concerns (correlationId, authenticated principal name, ...).

@ylepikhov
Copy link

ylepikhov commented Dec 10, 2021

Hi. I need to work with XML and SOAP messages. At the moment I've written MessageConverter for incoming messages (it uses Apache Axiom). But there is no way to do something similar for outgoing messages. I suppose that OutgoingMessageConverter could be useful in my case. Actually, I think that there is no need in to introduce new interface and MessageConverter may be reused. Connector can specify second parameter (Type type) according to it needs. Or may be target type could be made configurable at channel level (like mp.messaging.outgoing.myqueue.outputtype=fully.qulified.class.Name)

@ylepikhov
Copy link

ylepikhov commented Apr 20, 2022

It's possible to implement such logic with io.smallrye.reactive.messaging.providers.PublisherDecorator (which is not documented). It has one method PublisherDecorator.decorate taking publisher instance and channel name as parameters and returning new decorated publisher instance. This method gets called once for each channel at startup. I've implemented such decorator to convert all outgoing messages to String (see Kotlin code below).

@ApplicationScoped
class AmqpSoapPublisherDecorator @Inject constructor(
    // inject all converters
    private val converters: Instance<MessageConverter>,
    // and config
    private val config: Config) : PublisherDecorator {

    // called once for each channel at startup
    override fun decorate(publisher: Multi<out Message<*>>, channelName: String): Multi<out Message<*>> {

        // examine outgoing channel configuration
        val connector = config.getOptionalValue("mp.messaging.outgoing.$channelName.connector", String::class.java)
        // if connector is amqp        
        if (connector.isPresent && "smallrye-amqp".equals(connector.get())) {
            // apply converters to convert to String (see also io.smallrye.reactive.messaging.providers.helpers.ConverterUtils) 
            return ConverterUtils.convert(publisher, converters, String::class.java)
        }
        // otherwise return unchanged publisher 
        return publisher
    }
}

So it's possible to decorate publisher based on channel configuration.

@knutwannheden
Copy link

@ylepikhov I tested your approach in a Quarkus application, where I want to "decorate" outgoing messages as well as extract information from incoming messages (both using the smallrye-kafka connector). I was however only able to get this to work for incoming channels, not outgoing channels.

Stepping through the code the decorate() method gets called for incoming channels from ConfiguredChannelFactory#createPublisher(). The corresponding method ConfiguredChannelFactory#createSubscriber() which gets called for outgoing channels does however not call decorate(), so I am a bit surprised that you were able to get this working for an outgoing channel. Can you maybe tell me what I am missing?

The only other place I see calling decorate() is AbstractMediator#decorate(), but this method never ends up getting called in my application.

Looking at the SmallRye Reactive Messaging 3.16.0 documentation (Quarkus 2.9.0 is on version 3.15.0) I can see documentation for the experimental feature Custom Emitter Implementations. This sounds like it could be another alternative, but I would still be interested in how to use PublisherDecorator for outgoing channels.

@ylepikhov
Copy link

ylepikhov commented Jul 8, 2022

@knutwannheden, sorry for delay. You're right. In my case there was message processor with both @Incoming and @Outgoing. In this case there will be implicit incoming channel for @Outgoing. But it does not work for regular emitters annotated with @Channel.

@ylepikhov
Copy link

Was able to implement such feature with custom emitters.
First, I've implemented an annotation to have a way to specify target class (to convert to).

@Qualifier
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class ConvertPayloadFor(
   val connector: String,
   val clazz: KClass<*>)

For example, to make SOAPMessage be converted to String for smallrye-ampq connector you must
have this:

@ApplicationScoped
class AmqpConversionConfig {

    // specify connector name and class to be converted
    @ConvertPayloadFor("smallrye-amqp", SOAPMessage::class)
    @Produces
    // function name does not really matter
    fun amqpConvertSOAPMessageTo() = String::class.java // target class
}

Actual conversion will be made by regular MessageConverter which can convert Message<SOAPMessage> to String (I've implemented such converter).

Finally, to send SOAPMessage you may have something like this:

@ApplicationScoped
class Test @Inject constructor(@param:Channel("test") val emitter: EmitterEx<SOAPMessage>) {
    
    suspend fun send(payload: SOAPMessage) {
        
        emitter.sendSuspending(payload)
    } 
}

Where EmitterEx is my custom emitter type (also I've implemented custom emitter implementation class and factory class).

But, it does not work for message processor (having both @Incoming and @Outgoint annotations).
Such logic shold be implemented differently to support all scenarios.

@ozangunalp
Copy link
Collaborator

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

7 participants