Skip to content
Browse files

fixes #3: ChannelOwner: bad wrapping of Channel methods that do not r…

…eturn anything
  • Loading branch information...
1 parent 4fd41f8 commit 98e540795e1fa815b02fc81b2c29444d8bfe2d48 @sstone sstone committed with pm47
View
11 README.md
@@ -45,9 +45,12 @@ The latest snapshot (development) version is 1.0-SNAPSHOT, the latest tagged ver
## Library design
-I guess that if you ended up here you already know a bit about AMQP 0.9.1. There are very nice tutorial on the
-[RabbitMQ](http://www.rabbitmq.com/) website, and also [there](http://www.zeromq.org/whitepapers:amqp-analysis), and
-probably many other...
+This is a thin wrapper over the RabbitMQ java client, which tries to take advantage of the nice actor model provided
+by the Akka library. There is no effort to "hide/encapsulate" the RabbitMQ library (and I don't really see the point anyway
+since AMQP is a binary protocol spec, not an API spec).
+So to make the most of this library you should first check the documentation for the RabbitMQ client, and learn a bit
+about AMQP 0.9.1. There are very nice tutorial on the [RabbitMQ](http://www.rabbitmq.com/) website, and
+also [there](http://www.zeromq.org/whitepapers:amqp-analysis), and probably many other...
### Connection and channel management
@@ -61,7 +64,7 @@ ConnectionOwner and ChannelOwner are implemened as Akka actors, using Akka super
* when a connection is lost, the connection owner will create a new connection and provide each of its children with a
new channel
-YMMV, but using few connections (one per JVM) and many channels per connection is a common practice.
+YMMV, but using few connections (one per JVM) and many channels per connection (one per thread) is a common practice.
### Basic usage
View
61 src/main/scala/com/aphelia/amqp/Amqp.scala
@@ -6,7 +6,6 @@ import com.rabbitmq.client.{Channel, Envelope}
import akka.actor.{Actor, Props, ActorRef, ActorSystem}
import akka.actor.FSM.{SubscribeTransitionCallBack, CurrentState, Transition}
import java.util.concurrent.CountDownLatch
-import java.util.Date
object Amqp {
@@ -68,35 +67,61 @@ object Amqp {
*/
case class ChannelParameters(qos: Int)
- case class DeclareQueue(queue: QueueParameters)
+ /**
+ * requests that can be sent to a ChannelOwner actor
+ */
- case class DeleteQueue(name: String, ifUnused: Boolean = false, ifEmpty: Boolean = false)
+ sealed trait Request
- case class PurgeQueue(name: String)
+ case class DeclareQueue(queue: QueueParameters) extends Request
- case class DeclareExchange(exchange: ExchangeParameters)
+ case class DeleteQueue(name: String, ifUnused: Boolean = false, ifEmpty: Boolean = false) extends Request
- case class DeleteExchange(name: String, ifUnused: Boolean = false)
+ case class PurgeQueue(name: String) extends Request
- case class QueueBind(queue: String, exchange: String, routing_key: String, args: Map[String, AnyRef] = Map.empty)
+ case class DeclareExchange(exchange: ExchangeParameters) extends Request
- case class QueueUnbind(queue: String, exchange: String, routing_key: String, args: Map[String, AnyRef] = Map.empty)
+ case class DeleteExchange(name: String, ifUnused: Boolean = false) extends Request
- case class Binding(exchange: ExchangeParameters, queue: QueueParameters, routingKey: String, autoack: Boolean)
+ case class QueueBind(queue: String, exchange: String, routing_key: String, args: Map[String, AnyRef] = Map.empty) extends Request
- case class Delivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte])
+ case class QueueUnbind(queue: String, exchange: String, routing_key: String, args: Map[String, AnyRef] = Map.empty) extends Request
- case class Publish(exchange: String, key: String, body: Array[Byte], properties: Option[BasicProperties] = None, mandatory: Boolean = true, immediate: Boolean = false)
+ case class Binding(exchange: ExchangeParameters, queue: QueueParameters, routingKey: String, autoack: Boolean) extends Request
- case class ReturnedMessage(replyCode: Int, replyText: String, exchange: String, routingKey: String, properties: BasicProperties, body: Array[Byte])
+ case class Publish(exchange: String, key: String, body: Array[Byte], properties: Option[BasicProperties] = None, mandatory: Boolean = true, immediate: Boolean = false) extends Request
- case class Ack(deliveryTag: Long)
+ case class Ack(deliveryTag: Long) extends Request
- case class Reject(deliveryTag: Long, requeue: Boolean = true)
+ case class Reject(deliveryTag: Long, requeue: Boolean = true) extends Request
- case class Transaction(publish: List[Publish])
+ case class Transaction(publish: List[Publish]) extends Request
- case class Error(e: Throwable)
+ /**
+ * sent back by a publisher when the request was processed successfully but there is nothing more more meaningful to
+ * return
+ * @param request original request
+ */
+ case class Ok(request:Request)
+
+ case class Error(request:Request, reason:Throwable)
+
+
+ /**
+ * AMQP delivery, which is sent to the actor that you register with a Consumer
+ * @param consumerTag AMQP consumer tag
+ * @param envelope AMQP envelope
+ * @param properties AMQP properties
+ * @param body message body
+ * @see [[com.aphelia.amqp.Consumer]]
+ */
+ case class Delivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte])
+
+ /**
+ * wrapper for returned, or undelivered, messages i.e. messages published with the immediate flag an and an
+ * (exchange, key) pair for which the broker could not find any destination
+ */
+ case class ReturnedMessage(replyCode: Int, replyText: String, exchange: String, routingKey: String, properties: BasicProperties, body: Array[Byte])
/**executes a callback when a connection or channel actors is "connected" i.e. usable
* <ul>
@@ -124,8 +149,8 @@ object Amqp {
channelOrConnectionActor ! SubscribeTransitionCallBack(m)
}
- /**wait until a number of connection or channel actors are connected
- *
+ /**
+ * wait until a number of connection or channel actors are connected
* @param system actor system (will be used to create temporary watchers)
* @param actors set of reference to ConnectionOwner or ChannelOwner actors
* @return a CountDownLatch object you can wait on; its count will reach 0 when all actors are connected
View
2 src/main/scala/com/aphelia/amqp/App.scala
@@ -29,7 +29,7 @@ object App {
implicit val timeout = Timeout(2 seconds)
val check = Await.result(c.ask(DeclareQueue(QueueParameters("no_such_queue", passive = true))), timeout.duration)
check match {
- case Amqp.Error(cause) => { println(cause) }
+ case Amqp.Error(_, cause) => throw cause
case uh => println(uh)
}
system.stop(conn)
View
67 src/main/scala/com/aphelia/amqp/ChannelOwner.scala
@@ -24,23 +24,14 @@ object ChannelOwner {
private[amqp] case class Connected(channel: com.rabbitmq.client.Channel) extends Data
- def withChannel[T](channel: Channel)(f: Channel => T) = {
+ def withChannel[T](channel: Channel, request: Request)(f: Channel => T) = {
try {
f(channel)
}
catch {
- case e: IOException => Amqp.Error(e)
+ case e: IOException => Amqp.Error(request, e)
}
}
-
- def publishMessage(channel: Channel, publish: Publish) {
- import publish._
- val props = properties match {
- case Some(p) => p
- case None => new AMQP.BasicProperties.Builder().build()
- }
- channel.basicPublish(exchange, key, mandatory, immediate, props, body)
- }
}
/**
@@ -109,54 +100,64 @@ class ChannelOwner(channelParams: Option[ChannelParameters] = None) extends Acto
* sent by the actor's parent when the AMQP connection is lost
*/
case Event(Shutdown(cause), _) => goto(Disconnected)
- case Event(Publish(exchange, routingKey, body, properties, mandatory, immediate), Connected(channel)) => {
+ case Event(request@Publish(exchange, routingKey, body, properties, mandatory, immediate), Connected(channel)) => {
val props = properties getOrElse new AMQP.BasicProperties.Builder().build()
- stay replying withChannel(channel)(c => c.basicPublish(exchange, routingKey, mandatory, immediate, props, body))
+ stay replying withChannel(channel, request)(c => {
+ c.basicPublish(exchange, routingKey, mandatory, immediate, props, body)
+ Ok(request)
+ })
}
- case Event(Transaction(publish), Connected(channel)) => {
- stay replying withChannel(channel) {
+ case Event(request@Transaction(publish), Connected(channel)) => {
+ stay replying withChannel(channel, request) {
c => {
c.txSelect()
publish.foreach(p => c.basicPublish(p.exchange, p.key, p.mandatory, p.immediate, new AMQP.BasicProperties.Builder().build(), p.body))
c.txCommit()
+ Ok(request)
}
}
}
- case Event(Ack(deliveryTag), Connected(channel)) => {
+ case Event(request@Ack(deliveryTag), Connected(channel)) => {
log.debug("acking %d on %s".format(deliveryTag, channel))
- stay replying withChannel(channel)(c => c.basicAck(deliveryTag, false))
+ stay replying withChannel(channel, request)(c => {
+ c.basicAck(deliveryTag, false)
+ Ok(request)
+ })
}
- case Event(Reject(deliveryTag, requeue), Connected(channel)) => {
+ case Event(request@Reject(deliveryTag, requeue), Connected(channel)) => {
log.debug("rejecting %d on %s".format(deliveryTag, channel))
- stay replying withChannel(channel)(c => c.basicReject(deliveryTag, requeue))
+ stay replying withChannel(channel, request)(c => {
+ c.basicReject(deliveryTag, requeue)
+ Ok(request)
+ })
}
- case Event(DeclareExchange(exchange), Connected(channel)) => {
+ case Event(request@DeclareExchange(exchange), Connected(channel)) => {
log.debug("declaring exchange {}", exchange)
- stay replying withChannel(channel)(c => declareExchange(c, exchange))
+ stay replying withChannel(channel, request)(c => declareExchange(c, exchange))
}
- case Event(DeleteExchange(exchange, ifUnused), Connected(channel)) => {
+ case Event(request@DeleteExchange(exchange, ifUnused), Connected(channel)) => {
log.debug("deleting exchange {} ifUnused {}", exchange, ifUnused)
- stay replying withChannel(channel)(c => c.exchangeDelete(exchange, ifUnused))
+ stay replying withChannel(channel, request)(c => c.exchangeDelete(exchange, ifUnused))
}
- case Event(DeclareQueue(queue), Connected(channel)) => {
+ case Event(request@DeclareQueue(queue), Connected(channel)) => {
log.debug("declaring queue {}", queue)
- stay replying withChannel(channel)(c => declareQueue(c, queue))
+ stay replying withChannel(channel, request)(c => declareQueue(c, queue))
}
- case Event(PurgeQueue(queue), Connected(channel)) => {
+ case Event(request@PurgeQueue(queue), Connected(channel)) => {
log.debug("purging queue {}", queue)
- stay replying withChannel(channel)(c => c.queuePurge(queue))
+ stay replying withChannel(channel, request)(c => c.queuePurge(queue))
}
- case Event(DeleteQueue(queue, ifUnused, ifEmpty), Connected(channel)) => {
+ case Event(request@DeleteQueue(queue, ifUnused, ifEmpty), Connected(channel)) => {
log.debug("deleting queue {} ifUnused {} ifEmpty {}", queue, ifUnused, ifEmpty)
- stay replying withChannel(channel)(c => c.queueDelete(queue, ifUnused, ifEmpty))
+ stay replying withChannel(channel, request)(c => c.queueDelete(queue, ifUnused, ifEmpty))
}
- case Event(QueueBind(queue, exchange, routingKey, args), Connected(channel)) => {
+ case Event(request@QueueBind(queue, exchange, routingKey, args), Connected(channel)) => {
log.debug("binding queue {} to key {} on exchange {}", queue, routingKey, exchange)
- stay replying withChannel(channel)(c => c.queueBind(queue, exchange, routingKey, args))
+ stay replying withChannel(channel, request)(c => c.queueBind(queue, exchange, routingKey, args))
}
- case Event(QueueUnbind(queue, exchange, routingKey, args), Connected(channel)) => {
+ case Event(request@QueueUnbind(queue, exchange, routingKey, args), Connected(channel)) => {
log.debug("unbinding queue {} to key {} on exchange {}", queue, routingKey, exchange)
- stay replying withChannel(channel)(c => c.queueUnbind(queue, exchange, routingKey, args))
+ stay replying withChannel(channel, request)(c => c.queueUnbind(queue, exchange, routingKey, args))
}
}
View
4 src/main/scala/com/aphelia/amqp/Consumer.scala
@@ -7,13 +7,13 @@ import com.rabbitmq.client.AMQP.BasicProperties
/**
* Create an AMQP consumer, which takes a list of AMQP bindings, a listener to forward messages to, and optional channel parameters.
- * For each (Exchange, Queue, RoutingKey) biding, the consumer will
+ * For each (Exchange, Queue, RoutingKey) biding, the consumer will:
* <ul>
* <li>declare the exchange</li>
* <li>declare the queue</li>
* <li>bind the queue to the routing key on the exchange</li>
* <li>consume messages from the queue</li>
- * <li>forward them to the listener actor</li>
+ * <li>forward them to the listener actor, wrapped in a [[com.aphelia.amqp.Amqp.Delivery]] instance</li>
* </ul>
* @param bindings list of bindings
* @param listener optional listener actor; if not set, self will be used instead
View
9 src/test/scala/com/aphelia/amqp/ChannelOwnerSpec.scala
@@ -49,7 +49,7 @@ class ChannelOwnerSpec extends BasicAmqpTestSpec {
println(check1)
check1 match {
case ok: Queue.DeclareOk => assert(ok.getMessageCount == 1)
- case Amqp.Error(cause) => throw cause
+ case Amqp.Error(_, cause) => throw cause
}
// purge the queue
@@ -60,7 +60,7 @@ class ChannelOwnerSpec extends BasicAmqpTestSpec {
1 second)
check2 match {
case ok: Queue.DeclareOk => assert(ok.getMessageCount == 0)
- case Amqp.Error(cause) => throw cause
+ case Amqp.Error(_, cause) => throw cause
}
// delete the queue
val check3 = Await.result(
@@ -69,10 +69,7 @@ class ChannelOwnerSpec extends BasicAmqpTestSpec {
println(check3)
check3 match {
case ok: Queue.DeleteOk => {}
- case Amqp.Error(cause) => {
- println(cause)
- throw cause
- }
+ case Amqp.Error(_, cause) => throw cause
}
system.stop(conn)
}

0 comments on commit 98e5407

Please sign in to comment.
Something went wrong with that request. Please try again.