Skip to content

Commit

Permalink
fixes sstone#3: ChannelOwner: bad wrapping of Channel methods that do…
Browse files Browse the repository at this point in the history
… not return anything
  • Loading branch information
sstone committed Oct 25, 2012
1 parent 7e07541 commit c7fb56b
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 64 deletions.
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ The latest snapshot (development) version is 1.0-SNAPSHOT, the latest tagged ver

## Library design

I guess that if you ended up here you already know a bit about AMQP 0.9.1. There are very nice tutorial on the
[RabbitMQ](http://www.rabbitmq.com/) website, and also [there](http://www.zeromq.org/whitepapers:amqp-analysis), and
probably many other...
This is a thin wrapper over the RabbitMQ java client, which tries to take advantage of the nice actor model provided
by the Akka library. There is no effort to "hide/encapsulate" the RabbitMQ library (and I don't really see the point anyway
since AMQP is a binary protocol spec, not an API spec).
So to make the most of this library you should first check the documentation for the RabbitMQ client, and learn a bit
about AMQP 0.9.1. There are very nice tutorial on the [RabbitMQ](http://www.rabbitmq.com/) website, and
also [there](http://www.zeromq.org/whitepapers:amqp-analysis), and probably many other...

### Connection and channel management

Expand All @@ -61,7 +64,7 @@ ConnectionOwner and ChannelOwner are implemened as Akka actors, using Akka super
* when a connection is lost, the connection owner will create a new connection and provide each of its children with a
new channel

YMMV, but using few connections (one per JVM) and many channels per connection is a common practice.
YMMV, but using few connections (one per JVM) and many channels per connection (one per thread) is a common practice.

### Basic usage

Expand Down
61 changes: 43 additions & 18 deletions src/main/scala/com/aphelia/amqp/Amqp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.rabbitmq.client.{Channel, Envelope}
import akka.actor.{Actor, Props, ActorRef, ActorSystem}
import akka.actor.FSM.{SubscribeTransitionCallBack, CurrentState, Transition}
import java.util.concurrent.CountDownLatch
import java.util.Date

object Amqp {

Expand Down Expand Up @@ -68,35 +67,61 @@ object Amqp {
*/
case class ChannelParameters(qos: Int)

case class DeclareQueue(queue: QueueParameters)
/**
* requests that can be sent to a ChannelOwner actor
*/

case class DeleteQueue(name: String, ifUnused: Boolean = false, ifEmpty: Boolean = false)
sealed trait Request

case class PurgeQueue(name: String)
case class DeclareQueue(queue: QueueParameters) extends Request

case class DeclareExchange(exchange: ExchangeParameters)
case class DeleteQueue(name: String, ifUnused: Boolean = false, ifEmpty: Boolean = false) extends Request

case class DeleteExchange(name: String, ifUnused: Boolean = false)
case class PurgeQueue(name: String) extends Request

case class QueueBind(queue: String, exchange: String, routing_key: String, args: Map[String, AnyRef] = Map.empty)
case class DeclareExchange(exchange: ExchangeParameters) extends Request

case class QueueUnbind(queue: String, exchange: String, routing_key: String, args: Map[String, AnyRef] = Map.empty)
case class DeleteExchange(name: String, ifUnused: Boolean = false) extends Request

case class Binding(exchange: ExchangeParameters, queue: QueueParameters, routingKey: String, autoack: Boolean)
case class QueueBind(queue: String, exchange: String, routing_key: String, args: Map[String, AnyRef] = Map.empty) extends Request

case class Delivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte])
case class QueueUnbind(queue: String, exchange: String, routing_key: String, args: Map[String, AnyRef] = Map.empty) extends Request

case class Publish(exchange: String, key: String, body: Array[Byte], properties: Option[BasicProperties] = None, mandatory: Boolean = true, immediate: Boolean = false)
case class Binding(exchange: ExchangeParameters, queue: QueueParameters, routingKey: String, autoack: Boolean) extends Request

case class ReturnedMessage(replyCode: Int, replyText: String, exchange: String, routingKey: String, properties: BasicProperties, body: Array[Byte])
case class Publish(exchange: String, key: String, body: Array[Byte], properties: Option[BasicProperties] = None, mandatory: Boolean = true, immediate: Boolean = false) extends Request

case class Ack(deliveryTag: Long)
case class Ack(deliveryTag: Long) extends Request

case class Reject(deliveryTag: Long, requeue: Boolean = true)
case class Reject(deliveryTag: Long, requeue: Boolean = true) extends Request

case class Transaction(publish: List[Publish])
case class Transaction(publish: List[Publish]) extends Request

case class Error(e: Throwable)
/**
* sent back by a publisher when the request was processed successfully but there is nothing more more meaningful to
* return
* @param request original request
*/
case class Ok(request:Request)

case class Error(request:Request, reason:Throwable)


/**
* AMQP delivery, which is sent to the actor that you register with a Consumer
* @param consumerTag AMQP consumer tag
* @param envelope AMQP envelope
* @param properties AMQP properties
* @param body message body
* @see [[com.aphelia.amqp.Consumer]]
*/
case class Delivery(consumerTag: String, envelope: Envelope, properties: BasicProperties, body: Array[Byte])

/**
* wrapper for returned, or undelivered, messages i.e. messages published with the immediate flag an and an
* (exchange, key) pair for which the broker could not find any destination
*/
case class ReturnedMessage(replyCode: Int, replyText: String, exchange: String, routingKey: String, properties: BasicProperties, body: Array[Byte])

/**executes a callback when a connection or channel actors is "connected" i.e. usable
* <ul>
Expand Down Expand Up @@ -124,8 +149,8 @@ object Amqp {
channelOrConnectionActor ! SubscribeTransitionCallBack(m)
}

/**wait until a number of connection or channel actors are connected
*
/**
* wait until a number of connection or channel actors are connected
* @param system actor system (will be used to create temporary watchers)
* @param actors set of reference to ConnectionOwner or ChannelOwner actors
* @return a CountDownLatch object you can wait on; its count will reach 0 when all actors are connected
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/aphelia/amqp/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ object App {
implicit val timeout = Timeout(2 seconds)
val check = Await.result(c.ask(DeclareQueue(QueueParameters("no_such_queue", passive = true))), timeout.duration)
check match {
case Amqp.Error(cause) => { println(cause) }
case Amqp.Error(_, cause) => throw cause
case uh => println(uh)
}
system.stop(conn)
Expand Down
67 changes: 34 additions & 33 deletions src/main/scala/com/aphelia/amqp/ChannelOwner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,14 @@ object ChannelOwner {

private[amqp] case class Connected(channel: com.rabbitmq.client.Channel) extends Data

def withChannel[T](channel: Channel)(f: Channel => T) = {
def withChannel[T](channel: Channel, request: Request)(f: Channel => T) = {
try {
f(channel)
}
catch {
case e: IOException => Amqp.Error(e)
case e: IOException => Amqp.Error(request, e)
}
}

def publishMessage(channel: Channel, publish: Publish) {
import publish._
val props = properties match {
case Some(p) => p
case None => new AMQP.BasicProperties.Builder().build()
}
channel.basicPublish(exchange, key, mandatory, immediate, props, body)
}
}

/**
Expand Down Expand Up @@ -109,54 +100,64 @@ class ChannelOwner(channelParams: Option[ChannelParameters] = None) extends Acto
* sent by the actor's parent when the AMQP connection is lost
*/
case Event(Shutdown(cause), _) => goto(Disconnected)
case Event(Publish(exchange, routingKey, body, properties, mandatory, immediate), Connected(channel)) => {
case Event(request@Publish(exchange, routingKey, body, properties, mandatory, immediate), Connected(channel)) => {
val props = properties getOrElse new AMQP.BasicProperties.Builder().build()
stay replying withChannel(channel)(c => c.basicPublish(exchange, routingKey, mandatory, immediate, props, body))
stay replying withChannel(channel, request)(c => {
c.basicPublish(exchange, routingKey, mandatory, immediate, props, body)
Ok(request)
})
}
case Event(Transaction(publish), Connected(channel)) => {
stay replying withChannel(channel) {
case Event(request@Transaction(publish), Connected(channel)) => {
stay replying withChannel(channel, request) {
c => {
c.txSelect()
publish.foreach(p => c.basicPublish(p.exchange, p.key, p.mandatory, p.immediate, new AMQP.BasicProperties.Builder().build(), p.body))
c.txCommit()
Ok(request)
}
}
}
case Event(Ack(deliveryTag), Connected(channel)) => {
case Event(request@Ack(deliveryTag), Connected(channel)) => {
log.debug("acking %d on %s".format(deliveryTag, channel))
stay replying withChannel(channel)(c => c.basicAck(deliveryTag, false))
stay replying withChannel(channel, request)(c => {
c.basicAck(deliveryTag, false)
Ok(request)
})
}
case Event(Reject(deliveryTag, requeue), Connected(channel)) => {
case Event(request@Reject(deliveryTag, requeue), Connected(channel)) => {
log.debug("rejecting %d on %s".format(deliveryTag, channel))
stay replying withChannel(channel)(c => c.basicReject(deliveryTag, requeue))
stay replying withChannel(channel, request)(c => {
c.basicReject(deliveryTag, requeue)
Ok(request)
})
}
case Event(DeclareExchange(exchange), Connected(channel)) => {
case Event(request@DeclareExchange(exchange), Connected(channel)) => {
log.debug("declaring exchange {}", exchange)
stay replying withChannel(channel)(c => declareExchange(c, exchange))
stay replying withChannel(channel, request)(c => declareExchange(c, exchange))
}
case Event(DeleteExchange(exchange, ifUnused), Connected(channel)) => {
case Event(request@DeleteExchange(exchange, ifUnused), Connected(channel)) => {
log.debug("deleting exchange {} ifUnused {}", exchange, ifUnused)
stay replying withChannel(channel)(c => c.exchangeDelete(exchange, ifUnused))
stay replying withChannel(channel, request)(c => c.exchangeDelete(exchange, ifUnused))
}
case Event(DeclareQueue(queue), Connected(channel)) => {
case Event(request@DeclareQueue(queue), Connected(channel)) => {
log.debug("declaring queue {}", queue)
stay replying withChannel(channel)(c => declareQueue(c, queue))
stay replying withChannel(channel, request)(c => declareQueue(c, queue))
}
case Event(PurgeQueue(queue), Connected(channel)) => {
case Event(request@PurgeQueue(queue), Connected(channel)) => {
log.debug("purging queue {}", queue)
stay replying withChannel(channel)(c => c.queuePurge(queue))
stay replying withChannel(channel, request)(c => c.queuePurge(queue))
}
case Event(DeleteQueue(queue, ifUnused, ifEmpty), Connected(channel)) => {
case Event(request@DeleteQueue(queue, ifUnused, ifEmpty), Connected(channel)) => {
log.debug("deleting queue {} ifUnused {} ifEmpty {}", queue, ifUnused, ifEmpty)
stay replying withChannel(channel)(c => c.queueDelete(queue, ifUnused, ifEmpty))
stay replying withChannel(channel, request)(c => c.queueDelete(queue, ifUnused, ifEmpty))
}
case Event(QueueBind(queue, exchange, routingKey, args), Connected(channel)) => {
case Event(request@QueueBind(queue, exchange, routingKey, args), Connected(channel)) => {
log.debug("binding queue {} to key {} on exchange {}", queue, routingKey, exchange)
stay replying withChannel(channel)(c => c.queueBind(queue, exchange, routingKey, args))
stay replying withChannel(channel, request)(c => c.queueBind(queue, exchange, routingKey, args))
}
case Event(QueueUnbind(queue, exchange, routingKey, args), Connected(channel)) => {
case Event(request@QueueUnbind(queue, exchange, routingKey, args), Connected(channel)) => {
log.debug("unbinding queue {} to key {} on exchange {}", queue, routingKey, exchange)
stay replying withChannel(channel)(c => c.queueUnbind(queue, exchange, routingKey, args))
stay replying withChannel(channel, request)(c => c.queueUnbind(queue, exchange, routingKey, args))
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/aphelia/amqp/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import com.rabbitmq.client.AMQP.BasicProperties

/**
* Create an AMQP consumer, which takes a list of AMQP bindings, a listener to forward messages to, and optional channel parameters.
* For each (Exchange, Queue, RoutingKey) biding, the consumer will
* For each (Exchange, Queue, RoutingKey) biding, the consumer will:
* <ul>
* <li>declare the exchange</li>
* <li>declare the queue</li>
* <li>bind the queue to the routing key on the exchange</li>
* <li>consume messages from the queue</li>
* <li>forward them to the listener actor</li>
* <li>forward them to the listener actor, wrapped in a [[com.aphelia.amqp.Amqp.Delivery]] instance</li>
* </ul>
* @param bindings list of bindings
* @param listener optional listener actor; if not set, self will be used instead
Expand Down
9 changes: 3 additions & 6 deletions src/test/scala/com/aphelia/amqp/ChannelOwnerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ChannelOwnerSpec extends BasicAmqpTestSpec {
println(check1)
check1 match {
case ok: Queue.DeclareOk => assert(ok.getMessageCount == 1)
case Amqp.Error(cause) => throw cause
case Amqp.Error(_, cause) => throw cause
}

// purge the queue
Expand All @@ -60,7 +60,7 @@ class ChannelOwnerSpec extends BasicAmqpTestSpec {
1 second)
check2 match {
case ok: Queue.DeclareOk => assert(ok.getMessageCount == 0)
case Amqp.Error(cause) => throw cause
case Amqp.Error(_, cause) => throw cause
}
// delete the queue
val check3 = Await.result(
Expand All @@ -69,10 +69,7 @@ class ChannelOwnerSpec extends BasicAmqpTestSpec {
println(check3)
check3 match {
case ok: Queue.DeleteOk => {}
case Amqp.Error(cause) => {
println(cause)
throw cause
}
case Amqp.Error(_, cause) => throw cause
}
system.stop(conn)
}
Expand Down

0 comments on commit c7fb56b

Please sign in to comment.