Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Consumer API improvements #3

Closed
carlhoerberg opened this Issue · 21 comments

2 participants

@carlhoerberg

Today if I want to manually ack and nack messages in a subscription I have to do something like this:

(lhcons/subscribe 
  channel queue (fn [delivery headers payload] 
                  (let [envelope (.getEnvelope delivery)
                        delivery-tag (.getDeliveryTag envelope)]
                    (try 
                      (message-handler delivery headers payload)
                      (lhb/ack channel delivery-tag)
                      (catch Exception e
                        (prn (.getMessage e) (.printStackTrace e))
                        (lhb/nack channel delivery-tag false true))))))

which isn't very pretty, but it's because you don't have access to the channel in the message-handler. Should the channel be sent to the message-handler? How do we do that without breaking back-compatibility? As a new function maybe..

Also I think that delivery should be replaced with envelope as it's the only thing which isn't extracted from delivery already (http://www.rabbitmq.com/releases/rabbitmq-java-client/v2.8.1/rabbitmq-java-client-javadoc-2.8.1/com/rabbitmq/client/QueueingConsumer.Delivery.html).

I also find it weird that auto-ack is false by default, now when it's so cumbersome to manually ack..

@michaelklishin

Passing channel in is a good idea. However, in my code channels are always stored in vars and I never use anonymous functions with langohr.consumers/subscribe so adding one more parameter is not going to change much in this case. One more thing we can do is to turn message properties into a Clojure map so it can be easily destructured:

(fn [envelope {:keys [type content-type]} payload]
  ())
@michaelklishin

I also have ideas for more sophisticated consumers (that won't be single functions; instead, we will reify the consumer interface) that can perform deserialization or dispatch messages based on message type. Although the latter can also be well served by multimethods.

@michaelklishin

After giving it some thought, I think we can change handler signature to be

(fn [channel metadata payload] …)

where metadata will combine envelope and message properties into a single Clojure map, much like Ruby clients do. I don't think there are any significant advantages to having a single argument in Ruby clients but in Langohr it will allow you to use destructuring and that would make resulting code more idiomatic at effectively no performance cost.

Thoughts?

@carlhoerberg

is storing channel in vars really a good idea? i use one thread per subscription and would then share the channel between multiple threads *gulp

the idea of having a combined "metadata" map is excellent!

@carlhoerberg

oh, you included the channel in the signature now, that's perfect :)

@carlhoerberg

i see that you silently return on InterruptedException, is that wise? shouldn't it be up to the developer to decide what to do? i for instance would like it to retry to connect..

@michaelklishin

Having channel in the function argument list does not change the fact that it may be shared between threads. It will be the same channel your consumer was registered on, not a new one. If you just consume and ack individual messages, it is not a problem. What is dangerous is concurrent publishing. When you publish an empty message (for example), it causes two
frames to be sent down the wire:

[basic.publish method frame] [message metadata frame]

and for messages with some payload, it looks like this

[basic.publish method frame] [message metadata frame] [message body frame]+

so with concurrent publishes sharing a channel, you may get frames delivered to RabbitMQ out of order. This is why consumers that publish back should use a separate channel and it's common to have "one channel per purpose". I am not sure Langohr can or should enforce this.

About the InterruptedException, I am open to ideas and Langohr currently does not have any opinionated way of error handling. But we also can change things because it is still 1.0.0-beta stage and we haven't "marketed" Langohr at all, so I don't expect there are many users besides myself.

@carlhoerberg

i'm not 100% up to clojure's terminology and interpreted "in my code channels are always stored in vars" as:

(def channel
 (.createChannel connection))

but thanks for the throughout explanation!

@michaelklishin

You interpreted it exactly right. We will see what we can do about this and error recovery, lets finish message handler signature changes first.

@carlhoerberg

alright. exactly, do you want me to jump on it?

@michaelklishin

I am going to push signature changes in a few hours, with a new snapshot. I'd be happy if you can try it and see if it is any easier to use for you. If it is, I will cut the next beta and upgrade my apps later this week.

@carlhoerberg
@michaelklishin

New 1.0.0-SNAPSHOT is up on clojars.org, please give it a try. I really like this change so far.

@carlhoerberg
@carlhoerberg

super nice! i really like the merged map of headers, thanks!

@michaelklishin

Can we close this issue and release the next beta? Then figure out error handling (obviously, not in a few hours). Langohr was literally my first real world Clojure library so it has plenty of things that need to be improved or completely reworked (but most of them won't break existing API). Now is the time to do it.

@michaelklishin

I am adding new issues for things I have in mind. Feel free to add your ideas, too.

@carlhoerberg

would you like to release the next beta? an updated snapshot gave a lot of troubles at Heroku (due to heavy jar caching)

@michaelklishin

Within an hour

@carlhoerberg

Thanks :)

@michaelklishin

It's out

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Something went wrong with that request. Please try again.