-
Notifications
You must be signed in to change notification settings - Fork 1
/
QueueListener.scala
60 lines (43 loc) · 1.5 KB
/
QueueListener.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package io.ticofab.scalarabbitmqexample.actor
import akka.actor.{Actor, ActorLogging, Props}
import com.spingo.op_rabbit.Directives._
import com.spingo.op_rabbit.PlayJsonSupport._
import com.spingo.op_rabbit.{RabbitControl, Subscription, SubscriptionRef}
import com.typesafe.config.ConfigFactory
import io.ticofab.scalarabbitmqexample.actor.QueueListener.{CloseYourEars, Listen}
import io.ticofab.scalarabbitmqexample.model.MyObject
import scala.concurrent.ExecutionContext.Implicits.global
object QueueListener {
case object Listen
case object CloseYourEars
def props = Props(new QueueListener)
}
class QueueListener extends Actor with ActorLogging {
// read info from configuration
val conf = ConfigFactory.load()
val QUEUE = conf.getString("op-rabbit.my-queue")
// instantiate a rabbit mq controller
val RABBIT_CONTROL = context.actorOf(Props[RabbitControl])
// references to the queue subscriptions
var myQueueSubscription: Option[SubscriptionRef] = None
override def receive: Receive = {
case Listen =>
// initialize a queue subscription
myQueueSubscription = Some(
Subscription.run(RABBIT_CONTROL) {
channel(qos = 3) {
consume(queue(QUEUE)) {
body(as[MyObject]) {
(obj) =>
log.debug(s"received my object $obj")
ack
}
}
}
}
)
case CloseYourEars =>
// close the subscription
myQueueSubscription.foreach(_.close())
}
}