Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tag: heroku-devcent…
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 41 lines (33 sloc) 1.28 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
package com.typesafe.webwords.common

import akka.actor._
import akka.amqp
import akka.amqp.AMQP
import akka.amqp.rpc.RPC

/**
 * This actor wraps the work queue on the "client" side (in the web process).
 *
 * Each incoming [[WorkQueueRequest]] is forwarded through the RPC client that
 * `createRpc` builds. The reply, or an exception when no reply arrives, is
 * sent back on the channel the request came from. Every other message is
 * delegated to the superclass `receive`.
 *
 * @param url handed unchanged to `AbstractWorkQueueActor`; defaults to `None`
 */
class WorkQueueClientActor(url: Option[String] = None)
    extends AbstractWorkQueueActor(url) {

    // Assigned by createRpc and cleared by destroyRpc; None whenever no RPC
    // client currently exists.
    private[this] var rpcClient: Option[RPC.RpcClient[WorkQueueRequest, WorkQueueReply]] = None

    override def receive = {
        case request: WorkQueueRequest =>
            // Capture the channel before issuing the call; presumably the
            // callback runs asynchronously, after self.channel has moved on.
            val savedChannel = self.channel
            rpcClient match {
                case Some(client) =>
                    // NOTE(review): timeout is presumably in milliseconds (60s) - confirm
                    // against the akka-amqp RPC API.
                    client.callAsync(request, timeout = 60 * 1000)({
                        case Some(reply) =>
                            savedChannel.tryTell(reply)(self)
                        case None =>
                            savedChannel.sendException(new Exception("no reply to: " + request))
                    })
                case None =>
                    // No client has been created yet (or it was destroyed): report the
                    // failure to the requester instead of blowing up on Option.get.
                    savedChannel.sendException(
                        new IllegalStateException("work queue RPC client is not available for: " + request))
            }

        case m =>
            super.receive.apply(m)
    }

    /**
     * Creates the RPC client on top of the given connection actor and stores
     * it in `rpcClient`. Requests are serialized with
     * `WorkQueueRequest.toBinary` and replies are deserialized with
     * `WorkQueueReply.fromBinary`.
     *
     * @param connectionActor the actor passed to `RPC.newRpcClient`
     */
    override def createRpc(connectionActor: ActorRef) = {
        val serializer =
            new RPC.RpcClientSerializer[WorkQueueRequest, WorkQueueReply](WorkQueueRequest.toBinary, WorkQueueReply.fromBinary)
        rpcClient = Some(RPC.newRpcClient(connectionActor, rpcExchangeName, serializer))
    }

    /**
     * Stops the current RPC client, if there is one, and forgets it so that
     * later requests are answered with an error rather than using a stopped
     * client.
     */
    override def destroyRpc = {
        rpcClient foreach { _.stop }
        rpcClient = None
    }
}
Something went wrong with that request. Please try again.