diff --git a/src/java/com/novemberain/langohr/Channel.java b/src/java/com/novemberain/langohr/Channel.java new file mode 100644 index 0000000..1c3784b --- /dev/null +++ b/src/java/com/novemberain/langohr/Channel.java @@ -0,0 +1,940 @@ +package com.novemberain.langohr; + +import com.rabbitmq.client.*; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +public class Channel { + private com.rabbitmq.client.Channel delegate; + private Connection connection; + + public Channel(Connection connection, com.rabbitmq.client.Channel channel) { + this.connection = connection; + this.delegate = channel; + } + + /** + * Abort this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code + * and message 'OK'. + * + * Forces the channel to close and waits for the close operation to complete. + * Any encountered exceptions in the close operation are silently discarded. + */ + public void abort() throws IOException { + delegate.abort(); + } + + /** + * Start a non-nolocal, non-exclusive consumer, with + * explicit acknowledgement and a server-generated consumerTag. + * @param queue the name of the queue + * @param callback an interface to the consumer object + * @return the consumerTag generated by the server + * @throws java.io.IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicAck + * @see #basicConsume(String, boolean, String, boolean, boolean, java.util.Map, com.rabbitmq.client.Consumer) + */ + public String basicConsume(String queue, Consumer callback) throws IOException { + return delegate.basicConsume(queue, callback); + } + + /** + * Declare a queue + * @see com.rabbitmq.client.AMQP.Queue.Declare + * @see com.rabbitmq.client.AMQP.Queue.DeclareOk + * @param queue the name of the queue + * @param durable true if we are declaring a durable queue (the queue will survive a server restart) + * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) + * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) + * @param arguments other properties (construction arguments) for the queue + * @return a declaration-confirm method to indicate the queue was successfully declared + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments) throws IOException { + return delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments); + } + + /** + * Publish a message + * @see com.rabbitmq.client.AMQP.Basic.Publish + * @param exchange the exchange to publish the message to + * @param routingKey the routing key + * @param mandatory true if the 'mandatory' flag is to be set + * @param props other properties for the message - routing headers etc + * @param body the message body + * @throws java.io.IOException if an error is encountered + */ + public void basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body) throws IOException { + delegate.basicPublish(exchange, routingKey, mandatory, props, body); + } + + /** + * Bind an exchange to an exchange. + * @see com.rabbitmq.client.AMQP.Exchange.Bind + * @see com.rabbitmq.client.AMQP.Exchange.BindOk + * @param destination the name of the exchange to which messages flow across the binding + * @param source the name of the exchange from which messages flow across the binding + * @param routingKey the routine key to use for the binding + * @param arguments other properties (binding parameters) + * @return a binding-confirm method if the binding was successfully created + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map arguments) throws IOException { + return delegate.exchangeBind(destination, source, routingKey, arguments); + } + + /** + * Enables TX mode on this channel. + * @see com.rabbitmq.client.AMQP.Tx.Select + * @see com.rabbitmq.client.AMQP.Tx.SelectOk + * @return a transaction-selection method to indicate the transaction was successfully initiated + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Tx.SelectOk txSelect() throws IOException { + return delegate.txSelect(); + } + + /** + * Declare an exchange passively; that is, check if the named exchange exists. + * @param name check the existence of an exchange named this + * @throws java.io.IOException the server will raise a 404 channel exception if the named exchange does not exist. + */ + public AMQP.Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException { + return delegate.exchangeDeclarePassive(name); + } + + /** + * Abort this channel. + * + * Forces the channel to close and waits for the close operation to complete. + * Any encountered exceptions in the close operation are silently discarded. + */ + public void abort(int closeCode, String closeMessage) throws IOException { + delegate.abort(closeCode, closeMessage); + } + + /** + * Ask the broker to resend unacknowledged messages. In 0-8 + * basic.recover is asynchronous; in 0-9-1 it is synchronous, and + * the new, deprecated method basic.recover_async is asynchronous. + * @param requeue If true, messages will be requeued and possibly + * delivered to a different consumer. If false, messages will be + * redelivered to the same consumer. + */ + public AMQP.Basic.RecoverOk basicRecover(boolean requeue) throws IOException { + return delegate.basicRecover(requeue); + } + + /** + * Reject one or several received messages. + * + * Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} + * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected. + * @see com.rabbitmq.client.AMQP.Basic.Nack + * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} + * @param multiple true to reject all messages up to and including + * the supplied delivery tag; false to reject just the supplied + * delivery tag. + * @param requeue true if the rejected message(s) should be requeued rather + * than discarded/dead-lettered + * @throws java.io.IOException if an error is encountered + */ + public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException { + delegate.basicNack(deliveryTag, multiple, requeue); + } + + /** + * Delete an exchange, without regard for whether it is in use or not + * @see com.rabbitmq.client.AMQP.Exchange.Delete + * @see com.rabbitmq.client.AMQP.Exchange.DeleteOk + * @param exchange the name of the exchange + * @return a deletion-confirm method to indicate the exchange was successfully deleted + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Exchange.DeleteOk exchangeDelete(String exchange) throws IOException { + return delegate.exchangeDelete(exchange); + } + + /** + * Start a consumer. Calls the consumer's {@link com.rabbitmq.client.Consumer#handleConsumeOk} + * method. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param consumerTag a client-generated consumer tag to establish context + * @param noLocal true if the server should not deliver to this consumer + * messages published on this channel's connection + * @param exclusive true if this is an exclusive consumer + * @param callback an interface to the consumer object + * @param arguments a set of arguments for the consume + * @return the consumerTag associated with the new consumer + * @throws java.io.IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + */ + public String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, Consumer callback) throws IOException { + return delegate.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, callback); + } + + /** + * Determine whether the component is currently open. + * Will return false if we are currently closing. + * Checking this method should be only for information, + * because of the race conditions - state can change after the call. + * Instead just execute and try to catch ShutdownSignalException + * and IOException + * + * @return true when component is open, false otherwise + */ + public boolean isOpen() { + return delegate.isOpen(); + } + + /** + * Actively declare a non-autodelete exchange with no extra arguments + * @see com.rabbitmq.client.AMQP.Exchange.Declare + * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk + * @param exchange the name of the exchange + * @param type the exchange type + * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) + * @throws java.io.IOException if an error is encountered + * @return a declaration-confirm method to indicate the exchange was successfully declared + */ + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException { + return delegate.exchangeDeclare(exchange, type, durable); + } + + /** + * Bind an exchange to an exchange, with no extra arguments. + * @see com.rabbitmq.client.AMQP.Exchange.Bind + * @see com.rabbitmq.client.AMQP.Exchange.BindOk + * @param destination the name of the exchange to which messages flow across the binding + * @param source the name of the exchange from which messages flow across the binding + * @param routingKey the routine key to use for the binding + * @return a binding-confirm method if the binding was successfully created + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException { + return delegate.exchangeBind(destination, source, routingKey); + } + + /** + * Delete a queue, without regard for whether it is in use or has messages on it + * @see com.rabbitmq.client.AMQP.Queue.Delete + * @see com.rabbitmq.client.AMQP.Queue.DeleteOk + * @param queue the name of the queue + * @return a deletion-confirm method to indicate the queue was successfully deleted + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException { + return delegate.queueDelete(queue); + } + + /** + * Remove all {@link com.rabbitmq.client.ReturnListener}s. + */ + public void clearReturnListeners() { + delegate.clearReturnListeners(); + } + + /** + * Unbinds a queue from an exchange, with no extra arguments. + * @see com.rabbitmq.client.AMQP.Queue.Unbind + * @see com.rabbitmq.client.AMQP.Queue.UnbindOk + * @param queue the name of the queue + * @param exchange the name of the exchange + * @param routingKey the routine key to use for the binding + * @return an unbinding-confirm method if the binding was successfully deleted + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException { + return delegate.queueUnbind(queue, exchange, routingKey); + } + + /** + * Get the current default consumer. @see setDefaultConsumer for rationale. + * @return an interface to the current default consumer. + */ + public Consumer getDefaultConsumer() { + return delegate.getDefaultConsumer(); + } + + /** + * Request a specific prefetchCount "quality of service" settings + * for this channel. + * + * @see #basicQos(int, int, boolean) + * @param prefetchCount maximum number of messages that the server + * will deliver, 0 if unlimited + * @throws java.io.IOException if an error is encountered + */ + public void basicQos(int prefetchCount) throws IOException { + delegate.basicQos(prefetchCount); + } + + /** + * Remove a {@link com.rabbitmq.client.ConfirmListener}. + * @param listener the listener to remove + * @return true if the listener was found and removed, + * false otherwise + */ + public boolean removeConfirmListener(ConfirmListener listener) { + return delegate.removeConfirmListener(listener); + } + + /** + * When in confirm mode, returns the sequence number of the next + * message to be published. + * @return the sequence number of the next message to be published + */ + public long getNextPublishSeqNo() { + return delegate.getNextPublishSeqNo(); + } + + /** + * Bind a queue to an exchange. + * @see com.rabbitmq.client.AMQP.Queue.Bind + * @see com.rabbitmq.client.AMQP.Queue.BindOk + * @param queue the name of the queue + * @param exchange the name of the exchange + * @param routingKey the routine key to use for the binding + * @param arguments other properties (binding parameters) + * @return a binding-confirm method if the binding was successfully created + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map arguments) throws IOException { + return delegate.queueBind(queue, exchange, routingKey, arguments); + } + + /** + * Wait until all messages published since the last call have been + * either ack'd or nack'd by the broker; or until timeout elapses. + * If the timeout expires a TimeoutException is thrown. When + * called on a non-Confirm channel, waitForConfirms returns true + * immediately. + * @return whether all the messages were ack'd (and none were nack'd) + */ + public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException { + return delegate.waitForConfirms(timeout); + } + + /** + * Purges the contents of the given queue. + * @see com.rabbitmq.client.AMQP.Queue.Purge + * @see com.rabbitmq.client.AMQP.Queue.PurgeOk + * @param queue the name of the queue + * @return a purge-confirm method if the purge was executed succesfully + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Queue.PurgeOk queuePurge(String queue) throws IOException { + return delegate.queuePurge(queue); + } + + /** + * Declare an exchange. + * @see com.rabbitmq.client.AMQP.Exchange.Declare + * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk + * @param exchange the name of the exchange + * @param type the exchange type + * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) + * @param autoDelete true if the server should delete the exchange when it is no longer in use + * @param arguments other properties (construction arguments) for the exchange + * @return a declaration-confirm method to indicate the exchange was successfully declared + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map arguments) throws IOException { + return delegate.exchangeDeclare(exchange, type, durable, autoDelete, arguments); + } + + /** + * Add a {@link com.rabbitmq.client.ConfirmListener}. + * @param listener the listener to add + */ + public void addConfirmListener(ConfirmListener listener) { + delegate.addConfirmListener(listener); + } + + /** + * Wait until all messages published since the last call have been + * either ack'd or nack'd by the broker. Note, when called on a + * non-Confirm channel, waitForConfirms returns true immediately. + * @return whether all the messages were ack'd (and none were nack'd) + */ + public boolean waitForConfirms() throws InterruptedException { + return delegate.waitForConfirms(); + } + + /** + * Start a non-nolocal, non-exclusive consumer, with + * a server-generated consumerTag. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param callback an interface to the consumer object + * @return the consumerTag generated by the server + * @throws java.io.IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicConsume(String, boolean, String, boolean, boolean, java.util.Map, com.rabbitmq.client.Consumer) + */ + public String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException { + return delegate.basicConsume(queue, autoAck, callback); + } + + /** + * Close this channel. + * + * @param closeCode the close code (See under "Reply Codes" in the AMQP specification) + * @param closeMessage a message indicating the reason for closing the connection + * @throws java.io.IOException if an error is encountered + */ + public void close(int closeCode, String closeMessage) throws IOException { + try { + delegate.close(closeCode, closeMessage); + } finally { + this.connection.unregisterChannel(this); + } + } + + /** + * Rolls back a TX transaction on this channel. + * @see com.rabbitmq.client.AMQP.Tx.Rollback + * @see com.rabbitmq.client.AMQP.Tx.RollbackOk + * @return a transaction-rollback method to indicate the transaction was successfully rolled back + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Tx.RollbackOk txRollback() throws IOException { + return delegate.txRollback(); + } + + /** + * Retrieve the connection which carries this channel. + * @return the underlying {@link com.rabbitmq.client.Connection} + */ + public Connection getConnection() { + return connection; + } + + /** + * Add a {@link com.rabbitmq.client.FlowListener}. + * @param listener the listener to add + */ + public void addFlowListener(FlowListener listener) { + delegate.addFlowListener(listener); + } + + /** + * Unbind an exchange from an exchange, with no extra arguments. + * @see com.rabbitmq.client.AMQP.Exchange.Bind + * @see com.rabbitmq.client.AMQP.Exchange.BindOk + * @param destination the name of the exchange to which messages flow across the binding + * @param source the name of the exchange from which messages flow across the binding + * @param routingKey the routine key to use for the binding + * @return a binding-confirm method if the binding was successfully created + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException { + return delegate.exchangeUnbind(destination, source, routingKey); + } + + /** + * Declare an exchange, via an interface that allows the complete set of + * arguments. + * @see com.rabbitmq.client.AMQP.Exchange.Declare + * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk + * @param exchange the name of the exchange + * @param type the exchange type + * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) + * @param autoDelete true if the server should delete the exchange when it is no longer in use + * @param internal true if the exchange is internal, i.e. can't be directly + * published to by a client. + * @param arguments other properties (construction arguments) for the exchange + * @return a declaration-confirm method to indicate the exchange was successfully declared + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map arguments) throws IOException { + return delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments); + } + + /** + * Protected API - notify the listeners attached to the component + * @see com.rabbitmq.client.ShutdownListener + */ + public void notifyListeners() { + delegate.notifyListeners(); + } + + /** + * Publish a message + * @see com.rabbitmq.client.AMQP.Basic.Publish + * @param exchange the exchange to publish the message to + * @param routingKey the routing key + * @param mandatory true if the 'mandatory' flag is to be set + * @param immediate true if the 'immediate' flag is to be + * set. Note that the RabbitMQ server does not support this flag. + * @param props other properties for the message - routing headers etc + * @param body the message body + * @throws java.io.IOException if an error is encountered + */ + public void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body) throws IOException { + delegate.basicPublish(exchange, routingKey, mandatory, immediate, props, body); + } + + /** + * Remove all {@link com.rabbitmq.client.FlowListener}s. + */ + public void clearFlowListeners() { + delegate.clearFlowListeners(); + } + + /** + * Publish a message + * @see com.rabbitmq.client.AMQP.Basic.Publish + * @param exchange the exchange to publish the message to + * @param routingKey the routing key + * @param props other properties for the message - routing headers etc + * @param body the message body + * @throws java.io.IOException if an error is encountered + */ + public void basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) throws IOException { + delegate.basicPublish(exchange, routingKey, props, body); + } + + /** + * Retrieve this channel's channel number. + * @return the channel number + */ + public int getChannelNumber() { + return delegate.getChannelNumber(); + } + + /** + * Remove shutdown listener for the component. + * + * @param listener {@link com.rabbitmq.client.ShutdownListener} to be removed + */ + public void removeShutdownListener(ShutdownListener listener) { + delegate.removeShutdownListener(listener); + } + + /** + * Set the current default consumer. + * + * Under certain circumstances it is possible for a channel to receive a + * message delivery which does not match any consumer which is currently + * set up via basicConsume(). This will occur after the following sequence + * of events: + * + * ctag = basicConsume(queue, consumer); // i.e. with explicit acks + * // some deliveries take place but are not acked + * basicCancel(ctag); + * basicRecover(false); + * + * Since requeue is specified to be false in the basicRecover, the spec + * states that the message must be redelivered to "the original recipient" + * - i.e. the same channel / consumer-tag. But the consumer is no longer + * active. + * + * In these circumstances, you can register a default consumer to handle + * such deliveries. If no default consumer is registered an + * IllegalStateException will be thrown when such a delivery arrives. + * + * Most people will not need to use this. + * + * @param consumer the consumer to use, or null indicating "don't use one". + */ + public void setDefaultConsumer(Consumer consumer) { + delegate.setDefaultConsumer(consumer); + } + + /** + * Bind a queue to an exchange, with no extra arguments. + * @see com.rabbitmq.client.AMQP.Queue.Bind + * @see com.rabbitmq.client.AMQP.Queue.BindOk + * @param queue the name of the queue + * @param exchange the name of the exchange + * @param routingKey the routine key to use for the binding + * @return a binding-confirm method if the binding was successfully created + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException { + return delegate.queueBind(queue, exchange, routingKey); + } + + /** + * Actively declare a server-named exclusive, autodelete, non-durable queue. + * The name of the new queue is held in the "queue" field of the {@link com.rabbitmq.client.AMQP.Queue.DeclareOk} result. + * @see com.rabbitmq.client.AMQP.Queue.Declare + * @see com.rabbitmq.client.AMQP.Queue.DeclareOk + * @return a declaration-confirm method to indicate the queue was successfully declared + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Queue.DeclareOk queueDeclare() throws IOException { + return delegate.queueDeclare(); + } + + /** + * Cancel a consumer. Calls the consumer's {@link com.rabbitmq.client.Consumer#handleCancelOk} + * method. + * @param consumerTag a client- or server-generated consumer tag to establish context + * @throws java.io.IOException if an error is encountered, or if the consumerTag is unknown + * @see com.rabbitmq.client.AMQP.Basic.Cancel + * @see com.rabbitmq.client.AMQP.Basic.CancelOk + */ + public void basicCancel(String consumerTag) throws IOException { + delegate.basicCancel(consumerTag); + } + + /** + * Unbind a queue from an exchange. + * @see com.rabbitmq.client.AMQP.Queue.Unbind + * @see com.rabbitmq.client.AMQP.Queue.UnbindOk + * @param queue the name of the queue + * @param exchange the name of the exchange + * @param routingKey the routine key to use for the binding + * @param arguments other properties (binding parameters) + * @return an unbinding-confirm method if the binding was successfully deleted + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map arguments) throws IOException { + return delegate.queueUnbind(queue, exchange, routingKey, arguments); + } + + /** + * Get the shutdown reason object + * @return ShutdownSignalException if component is closed, null otherwise + */ + public ShutdownSignalException getCloseReason() { + return delegate.getCloseReason(); + } + + /** + * Enables publisher acknowledgements on this channel. + * @see com.rabbitmq.client.AMQP.Confirm.Select + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Confirm.SelectOk confirmSelect() throws IOException { + return delegate.confirmSelect(); + } + + /** + * Ask the broker to resend unacknowledged messages. In 0-8 + * basic.recover is asynchronous; in 0-9-1 it is synchronous, and + * the new, deprecated method basic.recover_async is asynchronous + * and deprecated. + * @param requeue If true, messages will be requeued and possibly + * delivered to a different consumer. If false, messages will be + * redelivered to the same consumer. + */ + @Deprecated + public void basicRecoverAsync(boolean requeue) throws IOException { + delegate.basicRecoverAsync(requeue); + } + + /** + * Remove a {@link com.rabbitmq.client.FlowListener}. + * @param listener the listener to remove + * @return true if the listener was found and removed, + * false otherwise + */ + public boolean removeFlowListener(FlowListener listener) { + return delegate.removeFlowListener(listener); + } + + /** Wait until all messages published since the last call have + * been either ack'd or nack'd by the broker; or until timeout elapses. + * If the timeout expires a TimeoutException is thrown. If any of the + * messages were nack'd, waitForConfirmsOrDie will throw an + * IOException. When called on a non-Confirm channel, it will + * return immediately. */ + public void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException { + delegate.waitForConfirmsOrDie(timeout); + } + + /** Wait until all messages published since the last call have + * been either ack'd or nack'd by the broker. If any of the + * messages were nack'd, waitForConfirmsOrDie will throw an + * IOException. When called on a non-Confirm channel, it will + * return immediately. */ + public void waitForConfirmsOrDie() throws IOException, InterruptedException { + delegate.waitForConfirmsOrDie(); + } + + /** + * Request specific "quality of service" settings. + * + * These settings impose limits on the amount of data the server + * will deliver to consumers before requiring acknowledgements. + * Thus they provide a means of consumer-initiated flow control. + * @see com.rabbitmq.client.AMQP.Basic.Qos + * @param prefetchSize maximum amount of content (measured in + * octets) that the server will deliver, 0 if unlimited + * @param prefetchCount maximum number of messages that the server + * will deliver, 0 if unlimited + * @param global true if the settings should be applied to the + * entire connection rather than just the current channel + * @throws java.io.IOException if an error is encountered + */ + public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException { + delegate.basicQos(prefetchSize, prefetchCount, global); + } + + /** + * Close this channel with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code + * and message 'OK'. + * + * @throws java.io.IOException if an error is encountered + */ + public void close() throws IOException { + try { + delegate.close(); + } finally { + this.connection.unregisterChannel(this); + } + } + + /** + * Delete an exchange + * @see com.rabbitmq.client.AMQP.Exchange.Delete + * @see com.rabbitmq.client.AMQP.Exchange.DeleteOk + * @param exchange the name of the exchange + * @param ifUnused true to indicate that the exchange is only to be deleted if it is unused + * @return a deletion-confirm method to indicate the exchange was successfully deleted + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException { + return delegate.exchangeDelete(exchange, ifUnused); + } + + /** + * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get} + * @see com.rabbitmq.client.AMQP.Basic.Get + * @see com.rabbitmq.client.AMQP.Basic.GetOk + * @see com.rabbitmq.client.AMQP.Basic.GetEmpty + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @return a {@link com.rabbitmq.client.GetResponse} containing the retrieved message data + * @throws java.io.IOException if an error is encountered + */ + public GetResponse basicGet(String queue, boolean autoAck) throws IOException { + return delegate.basicGet(queue, autoAck); + } + + /** + * Remove a {@link com.rabbitmq.client.ReturnListener}. + * @param listener the listener to remove + * @return true if the listener was found and removed, + * false otherwise + */ + public boolean removeReturnListener(ReturnListener listener) { + return delegate.removeReturnListener(listener); + } + + /** + * Ask the broker to resend unacknowledged messages. In 0-8 + * basic.recover is asynchronous; in 0-9-1 it is synchronous, and + * the new, deprecated method basic.recover_async is asynchronous. + *

+ * Equivalent to calling basicRecover(true), messages + * will be requeued and possibly delivered to a different consumer. + * @see #basicRecover(boolean) + */ + public AMQP.Basic.RecoverOk basicRecover() throws IOException { + return delegate.basicRecover(); + } + + /** + * Add shutdown listener. + * If the component is already closed, handler is fired immediately + * + * @param listener {@link com.rabbitmq.client.ShutdownListener} to the component + */ + public void addShutdownListener(ShutdownListener listener) { + delegate.addShutdownListener(listener); + } + + /** + * Set flow on the channel + * + * @param active if true, the server is asked to start sending. If false, the server is asked to stop sending. + * @throws java.io.IOException + */ + public AMQP.Channel.FlowOk flow(boolean active) throws IOException { + return delegate.flow(active); + } + + /** + * Asynchronously send a method over this channel. + * @param method method to transmit over this channel. + * @throws java.io.IOException Problem transmitting method. + */ + public void asyncRpc(Method method) throws IOException { + delegate.asyncRpc(method); + } + + /** + * Return the current Channel.Flow settings. + */ + public AMQP.Channel.FlowOk getFlow() { + return delegate.getFlow(); + } + + /** + * Declare a queue passively; i.e., check if it exists. In AMQP + * 0-9-1, all arguments aside from nowait are ignored; and sending + * nowait makes this method a no-op, so we default it to false. + * @see com.rabbitmq.client.AMQP.Queue.Declare + * @see com.rabbitmq.client.AMQP.Queue.DeclareOk + * @param queue the name of the queue + * @return a declaration-confirm method to indicate the queue exists + * @throws java.io.IOException if an error is encountered, + * including if the queue does not exist and if the queue is + * exclusively owned by another connection. + */ + public AMQP.Queue.DeclareOk queueDeclarePassive(String queue) throws IOException { + return delegate.queueDeclarePassive(queue); + } + + /** + * Commits a TX transaction on this channel. + * @see com.rabbitmq.client.AMQP.Tx.Commit + * @see com.rabbitmq.client.AMQP.Tx.CommitOk + * @return a transaction-commit method to indicate the transaction was successfully committed + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Tx.CommitOk txCommit() throws IOException { + return delegate.txCommit(); + } + + /** + * Add a {@link com.rabbitmq.client.ReturnListener}. + * @param listener the listener to add + */ + public void addReturnListener(ReturnListener listener) { + delegate.addReturnListener(listener); + } + + /** + * Actively declare a non-autodelete, non-durable exchange with no extra arguments + * @see com.rabbitmq.client.AMQP.Exchange.Declare + * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk + * @param exchange the name of the exchange + * @param type the exchange type + * @return a declaration-confirm method to indicate the exchange was successfully declared + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException { + return delegate.exchangeDeclare(exchange, type); + } + + /** + * Acknowledge one or several received + * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} + * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method + * containing the received message being acknowledged. + * @see com.rabbitmq.client.AMQP.Basic.Ack + * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} + * @param multiple true to acknowledge all messages up to and + * including the supplied delivery tag; false to acknowledge just + * the supplied delivery tag. + * @throws java.io.IOException if an error is encountered + */ + public void basicAck(long deliveryTag, boolean multiple) throws IOException { + delegate.basicAck(deliveryTag, multiple); + } + + /** + * Synchronously send a method over this channel. + * @param method method to transmit over this channel. + * @return command response to method. Caller should cast as appropriate. + * @throws java.io.IOException Problem transmitting method. + */ + public Command rpc(Method method) throws IOException { + return delegate.rpc(method); + } + + /** + * Start a non-nolocal, non-exclusive consumer. + * @param queue the name of the queue + * @param autoAck true if the server should consider messages + * acknowledged once delivered; false if the server should expect + * explicit acknowledgements + * @param consumerTag a client-generated consumer tag to establish context + * @param callback an interface to the consumer object + * @return the consumerTag associated with the new consumer + * @throws java.io.IOException if an error is encountered + * @see com.rabbitmq.client.AMQP.Basic.Consume + * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk + * @see #basicConsume(String, boolean, String, boolean, boolean, java.util.Map, com.rabbitmq.client.Consumer) + */ + public String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException { + return delegate.basicConsume(queue, autoAck, consumerTag, callback); + } + + /** + * Remove all {@link com.rabbitmq.client.ConfirmListener}s. + */ + public void clearConfirmListeners() { + delegate.clearConfirmListeners(); + } + + /** + * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk} + * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method + * containing the received message being rejected. + * @see com.rabbitmq.client.AMQP.Basic.Reject + * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver} + * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered + * @throws java.io.IOException if an error is encountered + */ + public void basicReject(long deliveryTag, boolean requeue) throws IOException { + delegate.basicReject(deliveryTag, requeue); + } + + /** + * Delete a queue + * @see com.rabbitmq.client.AMQP.Queue.Delete + * @see com.rabbitmq.client.AMQP.Queue.DeleteOk + * @param queue the name of the queue + * @param ifUnused true if the queue should be deleted only if not in use + * @param ifEmpty true if the queue should be deleted only if empty + * @return a deletion-confirm method to indicate the queue was successfully deleted + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException { + return delegate.queueDelete(queue, ifUnused, ifEmpty); + } + + /** + * Unbind an exchange from an exchange. + * @see com.rabbitmq.client.AMQP.Exchange.Bind + * @see com.rabbitmq.client.AMQP.Exchange.BindOk + * @param destination the name of the exchange to which messages flow across the binding + * @param source the name of the exchange from which messages flow across the binding + * @param routingKey the routine key to use for the binding + * @param arguments other properties (binding parameters) + * @return a binding-confirm method if the binding was successfully created + * @throws java.io.IOException if an error is encountered + */ + public AMQP.Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map arguments) throws IOException { + return delegate.exchangeUnbind(destination, source, routingKey, arguments); + } + + public com.rabbitmq.client.Channel getDelegate() { + return delegate; + } + + public void automaticallyRecover(Connection connection, com.rabbitmq.client.Connection delegate) throws IOException { + this.connection = connection; + + this.delegate = delegate.createChannel(this.getChannelNumber()); + } +} diff --git a/src/java/com/novemberain/langohr/Connection.java b/src/java/com/novemberain/langohr/Connection.java new file mode 100644 index 0000000..067a0a9 --- /dev/null +++ b/src/java/com/novemberain/langohr/Connection.java @@ -0,0 +1,380 @@ +package com.novemberain.langohr; + +import clojure.lang.IPersistentMap; +import clojure.lang.Keyword; +import clojure.lang.PersistentHashMap; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.ShutdownListener; +import com.rabbitmq.client.ShutdownSignalException; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; + +public class Connection { + private static final IPersistentMap DEFAULT_OPTIONS = buildDefaultOptions(); + public static final String AUTOMATICALLY_RECOVER_KEYWORD_NAME = "automatically-recover"; + public static final Keyword AUTOMATICALLY_RECOVER_KEYWORD = Keyword.intern(null, AUTOMATICALLY_RECOVER_KEYWORD_NAME); + private static final long DEFAULT_NETWORK_RECOVERY_PERIOD = 5000; + private final IPersistentMap options; + private final ConcurrentSkipListSet shutdownHooks; + private com.rabbitmq.client.Connection delegate; + private ShutdownListener automaticRecoveryListener; + private Map channels; + + private static IPersistentMap buildDefaultOptions() { + Map m = new HashMap(); + m.put(AUTOMATICALLY_RECOVER_KEYWORD, true); + + return PersistentHashMap.create(m); + } + + private final ConnectionFactory cf; + + public Connection(ConnectionFactory cf) { + this(cf, DEFAULT_OPTIONS); + } + + public Connection(ConnectionFactory cf, IPersistentMap options) { + this.cf = cf; + this.options = options; + + this.channels = new ConcurrentHashMap(); + this.shutdownHooks = new ConcurrentSkipListSet(); + } + + public Connection init() throws IOException { + this.delegate = cf.newConnection(); + + if (this.automaticRecoveryEnabled()) { + this.addAutomaticRecoveryHook(); + } + + return this; + } + + private void addAutomaticRecoveryHook() { + final Connection c = this; + automaticRecoveryListener = new ShutdownListener() { + public void shutdownCompleted(ShutdownSignalException cause) { + try { + c.beginAutomaticRecovery(); + } catch (InterruptedException e) { + // no-op, we cannot really do anything useful here, + // doing nothing will prevent automatic recovery + // from continuing. MK. + } catch (IOException e) { + // no-op, see above + // TODO: exponential backoff on how long we wait + } + } + }; + + this.shutdownHooks.add(automaticRecoveryListener); + this.delegate.addShutdownListener(automaticRecoveryListener); + } + + private void beginAutomaticRecovery() throws InterruptedException, IOException { + Thread.sleep(DEFAULT_NETWORK_RECOVERY_PERIOD); + + this.recoverConnection(); + this.recoverShutdownHooks(); + this.recoverChannels(); + } + + private void recoverChannels() throws IOException { + Iterator it = this.channels.entrySet().iterator(); + while(it.hasNext()) { + Map.Entry e =(Map.Entry)it.next(); + Channel ch = e.getValue(); + + ch.automaticallyRecover(this, this.delegate); + } + } + + private void recoverShutdownHooks() { + for (ShutdownListener sh : this.shutdownHooks) { + this.delegate.addShutdownListener(sh); + } + } + + private void recoverConnection() throws IOException { + this.delegate = this.cf.newConnection(); + } + + public boolean automaticRecoveryEnabled() { + return this.options.containsKey(AUTOMATICALLY_RECOVER_KEYWORD); + } + + /** + * Abort this connection and all its channels + * with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code + * and message 'OK'. + *

+ * Forces the connection to close. + * Any encountered exceptions in the close operations are silently discarded. + */ + public void abort() { + delegate.abort(); + } + + /** + * Remove shutdown listener for the component. + * + * @param listener {@link com.rabbitmq.client.ShutdownListener} to be removed + */ + public void removeShutdownListener(ShutdownListener listener) { + delegate.removeShutdownListener(listener); + } + + /** + * Get the negotiated maximum channel number. Usable channel + * numbers range from 1 to this number, inclusive. + * + * @return the maximum channel number permitted for this connection. + */ + public int getChannelMax() { + return delegate.getChannelMax(); + } + + /** + * Add shutdown listener. + * If the component is already closed, handler is fired immediately + * + * @param listener {@link com.rabbitmq.client.ShutdownListener} to the component + */ + public void addShutdownListener(ShutdownListener listener) { + delegate.addShutdownListener(listener); + } + + /** + * Close this connection and all its channels. + *

+ * Waits with the given timeout for all the close operations to complete. + * When timeout is reached the socket is forced to close. + * + * @param closeCode the close code (See under "Reply Codes" in the AMQP specification) + * @param closeMessage a message indicating the reason for closing the connection + * @param timeout timeout (in milliseconds) for completing all the close-related + * operations, use -1 for infinity + * @throws java.io.IOException if an I/O problem is encountered + */ + public void close(int closeCode, String closeMessage, int timeout) throws IOException { + delegate.close(closeCode, closeMessage, timeout); + } + + /** + * Retrieve the port number. + * + * @return the port number of the peer we're connected to. + */ + public int getPort() { + return delegate.getPort(); + } + + /** + * Close this connection and all its channels + * with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code + * and message 'OK'. + *

+ * This method behaves in a similar way as {@link #close()}, with the only difference + * that it waits with a provided timeout for all the close operations to + * complete. When timeout is reached the socket is forced to close. + * + * @param timeout timeout (in milliseconds) for completing all the close-related + * operations, use -1 for infinity + * @throws java.io.IOException if an I/O problem is encountered + */ + public void close(int timeout) throws IOException { + delegate.close(timeout); + } + + /** + * Create a new channel, using the specified channel number if possible. + * + * @param channelNumber the channel number to allocate + * @return a new channel descriptor, or null if this channel number is already in use + * @throws java.io.IOException if an I/O problem is encountered + */ + public Channel createChannel(int channelNumber) throws IOException { + final Channel channel = new Channel(this, delegate.createChannel(channelNumber)); + this.registerChannel(channel); + + return channel; + } + + private void registerChannel(Channel channel) { + this.channels.put(channel.getChannelNumber(), channel); + } + + public void unregisterChannel(Channel channel) { + this.channels.remove(channel.getChannelNumber()); + } + + /** + * Abort this connection and all its channels + * with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code + * and message 'OK'. + *

+ * This method behaves in a similar way as {@link #abort()}, with the only difference + * that it waits with a provided timeout for all the close operations to + * complete. When timeout is reached the socket is forced to close. + * + * @param timeout timeout (in milliseconds) for completing all the close-related + * operations, use -1 for infinity + */ + public void abort(int timeout) { + delegate.abort(timeout); + } + + public com.rabbitmq.client.Connection getDelegate() { + return delegate; + } + + /** + * Create a new channel, using an internally allocated channel number. + * + * @return a new channel descriptor, or null if none is available + * @throws java.io.IOException if an I/O problem is encountered + */ + public Channel createChannel() throws IOException { + return new Channel(this, delegate.createChannel()); + } + + /** + * Abort this connection and all its channels. + *

+ * Forces the connection to close and waits for all the close operations to complete. + * Any encountered exceptions in the close operations are silently discarded. + * + * @param closeCode the close code (See under "Reply Codes" in the AMQP specification) + * @param closeMessage a message indicating the reason for closing the connection + */ + public void abort(int closeCode, String closeMessage) { + delegate.abort(closeCode, closeMessage); + } + + /** + * Close this connection and all its channels + * with the {@link com.rabbitmq.client.AMQP#REPLY_SUCCESS} close code + * and message 'OK'. + *

+ * Waits for all the close operations to complete. + * + * @throws java.io.IOException if an I/O problem is encountered + */ + public void close() throws IOException { + delegate.close(); + } + + /** + * Retrieve the server properties. + * + * @return a map of the server properties. This typically includes the product name and version of the server. + */ + public Map getServerProperties() { + return delegate.getServerProperties(); + } + + /** + * Get a copy of the map of client properties sent to the server + * + * @return a copy of the map of client properties + */ + public Map getClientProperties() { + return delegate.getClientProperties(); + } + + /** + * Close this connection and all its channels. + *

+ * Waits for all the close operations to complete. + * + * @param closeCode the close code (See under "Reply Codes" in the AMQP specification) + * @param closeMessage a message indicating the reason for closing the connection + * @throws java.io.IOException if an I/O problem is encountered + */ + public void close(int closeCode, String closeMessage) throws IOException { + delegate.close(closeCode, closeMessage); + } + + /** + * Get the shutdown reason object + * + * @return ShutdownSignalException if component is closed, null otherwise + */ + public ShutdownSignalException getCloseReason() { + return delegate.getCloseReason(); + } + + /** + * Retrieve the host. + * + * @return the hostname of the peer we're connected to. + */ + public InetAddress getAddress() { + return delegate.getAddress(); + } + + /** + * Get the negotiated heartbeat interval. + * + * @return the heartbeat interval, in seconds; zero if none + */ + public int getHeartbeat() { + return delegate.getHeartbeat(); + } + + /** + * Determine whether the component is currently open. + * Will return false if we are currently closing. + * Checking this method should be only for information, + * because of the race conditions - state can change after the call. + * Instead just execute and try to catch ShutdownSignalException + * and IOException + * + * @return true when component is open, false otherwise + */ + public boolean isOpen() { + return delegate.isOpen(); + } + + /** + * Protected API - notify the listeners attached to the component + * + * @see com.rabbitmq.client.ShutdownListener + */ + public void notifyListeners() { + delegate.notifyListeners(); + } + + /** + * Get the negotiated maximum frame size. + * + * @return the maximum frame size, in octets; zero if unlimited + */ + public int getFrameMax() { + return delegate.getFrameMax(); + } + + /** + * Abort this connection and all its channels. + *

+ * Forces the connection to close and waits with the given timeout + * for all the close operations to complete. When timeout is reached + * the socket is forced to close. + * Any encountered exceptions in the close operations are silently discarded. + * + * @param closeCode the close code (See under "Reply Codes" in the AMQP specification) + * @param closeMessage a message indicating the reason for closing the connection + * @param timeout timeout (in milliseconds) for completing all the close-related + * operations, use -1 for infinity + */ + public void abort(int closeCode, String closeMessage, int timeout) { + delegate.abort(closeCode, closeMessage, timeout); + } +}