diff --git a/src/main/scala/com/aphelia/amqp/Amqp.scala b/src/main/scala/com/aphelia/amqp/Amqp.scala index ee536f5..1285398 100644 --- a/src/main/scala/com/aphelia/amqp/Amqp.scala +++ b/src/main/scala/com/aphelia/amqp/Amqp.scala @@ -98,12 +98,21 @@ object Amqp { case class Transaction(publish: List[Publish]) extends Request /** - * sent back by a publisher when the request was processed successfully but there is nothing more more meaningful to - * return + * sent back by a publisher when the request was processed successfully * @param request original request + * @param result optional result. Each request maps directly to a RabbitMQ Channel method: DeclareQueue maps to + * Channel.queueDeclare(), Publish maps to Channel.basicPublish() ... + * When the Channel methods returns something, result wraps that something, otherwise it is empty + * For example: + * */ - case class Ok(request:Request) + case class Ok(request:Request, result:Option[AnyRef] = None) + /** + * sent back by a publisher when the request was not processed successfully + * @param request original request + * @param reason whatever error that was thrown when the request was processed + */ case class Error(request:Request, reason:Throwable) diff --git a/src/main/scala/com/aphelia/amqp/ChannelOwner.scala b/src/main/scala/com/aphelia/amqp/ChannelOwner.scala index 1c1e768..decdd80 100644 --- a/src/main/scala/com/aphelia/amqp/ChannelOwner.scala +++ b/src/main/scala/com/aphelia/amqp/ChannelOwner.scala @@ -4,11 +4,10 @@ import collection.JavaConversions._ import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client._ import com.aphelia.amqp.ChannelOwner.{Data, State} -import akka.actor.{ActorRef, Actor, FSM} +import akka.actor.{Actor, FSM} import com.aphelia.amqp.ConnectionOwner.Shutdown import com.aphelia.amqp.Amqp._ import java.io.IOException -import com.aphelia.amqp.RpcServer.ProcessResult object ChannelOwner { @@ -133,31 +132,31 @@ class ChannelOwner(channelParams: Option[ChannelParameters] = None) extends Acto } case Event(request@DeclareExchange(exchange), Connected(channel)) => { log.debug("declaring exchange {}", exchange) - stay replying withChannel(channel, request)(c => declareExchange(c, exchange)) + stay replying withChannel(channel, request)(c => Ok(request, Some(declareExchange(c, exchange)))) } case Event(request@DeleteExchange(exchange, ifUnused), Connected(channel)) => { log.debug("deleting exchange {} ifUnused {}", exchange, ifUnused) - stay replying withChannel(channel, request)(c => c.exchangeDelete(exchange, ifUnused)) + stay replying withChannel(channel, request)(c => Ok(request, Some(c.exchangeDelete(exchange, ifUnused)))) } case Event(request@DeclareQueue(queue), Connected(channel)) => { log.debug("declaring queue {}", queue) - stay replying withChannel(channel, request)(c => declareQueue(c, queue)) + stay replying withChannel(channel, request)(c => Ok(request, Some(declareQueue(c, queue)))) } case Event(request@PurgeQueue(queue), Connected(channel)) => { log.debug("purging queue {}", queue) - stay replying withChannel(channel, request)(c => c.queuePurge(queue)) + stay replying withChannel(channel, request)(c => Ok(request, Some(c.queuePurge(queue)))) } case Event(request@DeleteQueue(queue, ifUnused, ifEmpty), Connected(channel)) => { log.debug("deleting queue {} ifUnused {} ifEmpty {}", queue, ifUnused, ifEmpty) - stay replying withChannel(channel, request)(c => c.queueDelete(queue, ifUnused, ifEmpty)) + stay replying withChannel(channel, request)(c => Ok(request, Some(c.queueDelete(queue, ifUnused, ifEmpty)))) } case Event(request@QueueBind(queue, exchange, routingKey, args), Connected(channel)) => { log.debug("binding queue {} to key {} on exchange {}", queue, routingKey, exchange) - stay replying withChannel(channel, request)(c => c.queueBind(queue, exchange, routingKey, args)) + stay replying withChannel(channel, request)(c => Ok(request, Some(c.queueBind(queue, exchange, routingKey, args)))) } case Event(request@QueueUnbind(queue, exchange, routingKey, args), Connected(channel)) => { log.debug("unbinding queue {} to key {} on exchange {}", queue, routingKey, exchange) - stay replying withChannel(channel, request)(c => c.queueUnbind(queue, exchange, routingKey, args)) + stay replying withChannel(channel, request)(c => Ok(request, Some(c.queueUnbind(queue, exchange, routingKey, args)))) } } diff --git a/src/test/scala/com/aphelia/amqp/ChannelOwnerSpec.scala b/src/test/scala/com/aphelia/amqp/ChannelOwnerSpec.scala index a43406c..603d030 100644 --- a/src/test/scala/com/aphelia/amqp/ChannelOwnerSpec.scala +++ b/src/test/scala/com/aphelia/amqp/ChannelOwnerSpec.scala @@ -4,7 +4,7 @@ import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import akka.testkit.TestProbe import akka.util.duration._ -import akka.actor.{PoisonPill, Props} +import akka.actor.Props import java.util.concurrent.{TimeUnit, Executors} import akka.dispatch.Future import akka.dispatch.ExecutionContext @@ -43,34 +43,22 @@ class ChannelOwnerSpec extends BasicAmqpTestSpec { instance ! QueueBind(queue, "amq.direct", "my_test_key") instance ! Publish("amq.direct", "my_test_key", "yo!".getBytes) // check that there is 1 message in the queue - val check1 = Await.result( + val Amqp.Ok(_, Some(check1:Queue.DeclareOk)) = Await.result( instance.ask(DeclareQueue(QueueParameters(queue, passive = true))), 1 second) - println(check1) - check1 match { - case ok: Queue.DeclareOk => assert(ok.getMessageCount == 1) - case Amqp.Error(_, cause) => throw cause - } + assert(check1.getMessageCount === 1) // purge the queue instance ! PurgeQueue(queue) // check that there are no more messages in the queue - val check2 = Await.result( + val Amqp.Ok(_, Some(check2:Queue.DeclareOk)) = Await.result( instance.ask(DeclareQueue(QueueParameters(queue, passive = true))), 1 second) - check2 match { - case ok: Queue.DeclareOk => assert(ok.getMessageCount == 0) - case Amqp.Error(_, cause) => throw cause - } + assert(check2.getMessageCount === 0) // delete the queue - val check3 = Await.result( + val Amqp.Ok(_, Some(check3:Queue.DeleteOk)) = Await.result( instance.ask(DeleteQueue(queue)), 1 second) - println(check3) - check3 match { - case ok: Queue.DeleteOk => {} - case Amqp.Error(_, cause) => throw cause - } system.stop(conn) } "implement basic error handling" in { @@ -83,7 +71,7 @@ class ChannelOwnerSpec extends BasicAmqpTestSpec { instance.ask(DeclareQueue(QueueParameters("no_such_queue", passive = true))), 1 second) println(check1) - assert(check1.getClass == classOf[Amqp.Error]) + assert(check1.getClass === classOf[Amqp.Error]) system.stop(conn) } }