Skip to content

Commit

Permalink
fixes sstone#3: ChannelOwner: bad wrapping of Channel methods that do…
Browse files Browse the repository at this point in the history
… not return anything
  • Loading branch information
sstone committed Oct 26, 2012
1 parent c7fb56b commit 25163b2
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 31 deletions.
15 changes: 12 additions & 3 deletions src/main/scala/com/aphelia/amqp/Amqp.scala
Expand Up @@ -98,12 +98,21 @@ object Amqp {
case class Transaction(publish: List[Publish]) extends Request

/**
* sent back by a publisher when the request was processed successfully but there is nothing more more meaningful to
* return
* sent back by a publisher when the request was processed successfully
* @param request original request
* @param result optional result. Each request maps directly to a RabbitMQ Channel method: DeclareQueue maps to
* Channel.queueDeclare(), Publish maps to Channel.basicPublish() ...
* When the Channel methods returns something, result wraps that something, otherwise it is empty
* For example:
*
*/
case class Ok(request:Request)
case class Ok(request:Request, result:Option[AnyRef] = None)

/**
* sent back by a publisher when the request was not processed successfully
* @param request original request
* @param reason whatever error that was thrown when the request was processed
*/
case class Error(request:Request, reason:Throwable)


Expand Down
17 changes: 8 additions & 9 deletions src/main/scala/com/aphelia/amqp/ChannelOwner.scala
Expand Up @@ -4,11 +4,10 @@ import collection.JavaConversions._
import com.rabbitmq.client.AMQP.BasicProperties
import com.rabbitmq.client._
import com.aphelia.amqp.ChannelOwner.{Data, State}
import akka.actor.{ActorRef, Actor, FSM}
import akka.actor.{Actor, FSM}
import com.aphelia.amqp.ConnectionOwner.Shutdown
import com.aphelia.amqp.Amqp._
import java.io.IOException
import com.aphelia.amqp.RpcServer.ProcessResult

object ChannelOwner {

Expand Down Expand Up @@ -133,31 +132,31 @@ class ChannelOwner(channelParams: Option[ChannelParameters] = None) extends Acto
}
case Event(request@DeclareExchange(exchange), Connected(channel)) => {
log.debug("declaring exchange {}", exchange)
stay replying withChannel(channel, request)(c => declareExchange(c, exchange))
stay replying withChannel(channel, request)(c => Ok(request, Some(declareExchange(c, exchange))))
}
case Event(request@DeleteExchange(exchange, ifUnused), Connected(channel)) => {
log.debug("deleting exchange {} ifUnused {}", exchange, ifUnused)
stay replying withChannel(channel, request)(c => c.exchangeDelete(exchange, ifUnused))
stay replying withChannel(channel, request)(c => Ok(request, Some(c.exchangeDelete(exchange, ifUnused))))
}
case Event(request@DeclareQueue(queue), Connected(channel)) => {
log.debug("declaring queue {}", queue)
stay replying withChannel(channel, request)(c => declareQueue(c, queue))
stay replying withChannel(channel, request)(c => Ok(request, Some(declareQueue(c, queue))))
}
case Event(request@PurgeQueue(queue), Connected(channel)) => {
log.debug("purging queue {}", queue)
stay replying withChannel(channel, request)(c => c.queuePurge(queue))
stay replying withChannel(channel, request)(c => Ok(request, Some(c.queuePurge(queue))))
}
case Event(request@DeleteQueue(queue, ifUnused, ifEmpty), Connected(channel)) => {
log.debug("deleting queue {} ifUnused {} ifEmpty {}", queue, ifUnused, ifEmpty)
stay replying withChannel(channel, request)(c => c.queueDelete(queue, ifUnused, ifEmpty))
stay replying withChannel(channel, request)(c => Ok(request, Some(c.queueDelete(queue, ifUnused, ifEmpty))))
}
case Event(request@QueueBind(queue, exchange, routingKey, args), Connected(channel)) => {
log.debug("binding queue {} to key {} on exchange {}", queue, routingKey, exchange)
stay replying withChannel(channel, request)(c => c.queueBind(queue, exchange, routingKey, args))
stay replying withChannel(channel, request)(c => Ok(request, Some(c.queueBind(queue, exchange, routingKey, args))))
}
case Event(request@QueueUnbind(queue, exchange, routingKey, args), Connected(channel)) => {
log.debug("unbinding queue {} to key {} on exchange {}", queue, routingKey, exchange)
stay replying withChannel(channel, request)(c => c.queueUnbind(queue, exchange, routingKey, args))
stay replying withChannel(channel, request)(c => Ok(request, Some(c.queueUnbind(queue, exchange, routingKey, args))))
}
}

Expand Down
26 changes: 7 additions & 19 deletions src/test/scala/com/aphelia/amqp/ChannelOwnerSpec.scala
Expand Up @@ -4,7 +4,7 @@ import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import akka.testkit.TestProbe
import akka.util.duration._
import akka.actor.{PoisonPill, Props}
import akka.actor.Props
import java.util.concurrent.{TimeUnit, Executors}
import akka.dispatch.Future
import akka.dispatch.ExecutionContext
Expand Down Expand Up @@ -43,34 +43,22 @@ class ChannelOwnerSpec extends BasicAmqpTestSpec {
instance ! QueueBind(queue, "amq.direct", "my_test_key")
instance ! Publish("amq.direct", "my_test_key", "yo!".getBytes)
// check that there is 1 message in the queue
val check1 = Await.result(
val Amqp.Ok(_, Some(check1:Queue.DeclareOk)) = Await.result(
instance.ask(DeclareQueue(QueueParameters(queue, passive = true))),
1 second)
println(check1)
check1 match {
case ok: Queue.DeclareOk => assert(ok.getMessageCount == 1)
case Amqp.Error(_, cause) => throw cause
}
assert(check1.getMessageCount === 1)

// purge the queue
instance ! PurgeQueue(queue)
// check that there are no more messages in the queue
val check2 = Await.result(
val Amqp.Ok(_, Some(check2:Queue.DeclareOk)) = Await.result(
instance.ask(DeclareQueue(QueueParameters(queue, passive = true))),
1 second)
check2 match {
case ok: Queue.DeclareOk => assert(ok.getMessageCount == 0)
case Amqp.Error(_, cause) => throw cause
}
assert(check2.getMessageCount === 0)
// delete the queue
val check3 = Await.result(
val Amqp.Ok(_, Some(check3:Queue.DeleteOk)) = Await.result(
instance.ask(DeleteQueue(queue)),
1 second)
println(check3)
check3 match {
case ok: Queue.DeleteOk => {}
case Amqp.Error(_, cause) => throw cause
}
system.stop(conn)
}
"implement basic error handling" in {
Expand All @@ -83,7 +71,7 @@ class ChannelOwnerSpec extends BasicAmqpTestSpec {
instance.ask(DeclareQueue(QueueParameters("no_such_queue", passive = true))),
1 second)
println(check1)
assert(check1.getClass == classOf[Amqp.Error])
assert(check1.getClass === classOf[Amqp.Error])
system.stop(conn)
}
}
Expand Down

0 comments on commit 25163b2

Please sign in to comment.