Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tag: heroku-devcent…
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 44 lines (35 sloc) 1.456 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
package com.typesafe.webwords.common

import akka.actor._
import akka.dispatch.Future
import akka.amqp
import akka.amqp.AMQP
import akka.amqp.rpc.RPC

/**
 * Worker-process side of the work queue.
 *
 * An RPC server is attached to the work-queue exchange; every request it
 * receives is forwarded to this actor as a [[WorkQueueRequest]] message and
 * the reply produced by `handleRequest` is sent back to the RPC caller.
 * Messages of any other type are delegated to the `receive` inherited from
 * `AbstractWorkQueueActor`.
 *
 * @param url handed unchanged to `AbstractWorkQueueActor`
 */
abstract class WorkQueueWorkerActor(url: Option[String] = None)
    extends AbstractWorkQueueActor(url) {

    // Handle of the RPC server created by createRpc; None when no server is held.
    private[this] var rpcServer: Option[RPC.RpcServerHandle] = None

    /**
     * Performs the work for a single request. Implemented by concrete workers.
     *
     * @param request the request to process
     * @return a future that is completed with the reply for `request`
     */
    protected def handleRequest(request: WorkQueueRequest): Future[WorkQueueReply]

    /**
     * Replies to work requests with the outcome of `handleRequest`; everything
     * else goes to the parent's `receive`.
     */
    override def receive: Receive = {
        val workRequests: Receive = {
            case request: WorkQueueRequest =>
                self.channel.replyWith(handleRequest(request))
        }
        // Compose with orElse instead of a catch-all `case m =>`: a catch-all
        // makes this partial function claim to handle every message, so an
        // unknown message would blow up as a MatchError inside super.receive
        // (unless the parent is total) rather than being reported as unhandled.
        workRequests orElse super.receive
    }

    /**
     * Starts the RPC server on `rpcExchangeName` and remembers its handle so
     * that `destroyRpc` can stop it later.
     *
     * @param connectionActor connection actor the RPC server is created on
     */
    override def createRpc(connectionActor: ActorRef): Unit = {
        val serializer =
            new RPC.RpcServerSerializer[WorkQueueRequest, WorkQueueReply](WorkQueueRequest.fromBinary, WorkQueueReply.toBinary)
        def requestHandler(request: WorkQueueRequest): WorkQueueReply = {
            // having to block here is not ideal
            // https://www.assembla.com/spaces/akka/tickets/1217
            //
            // Akka 1.x documents Future.as as returning None on timeout; fail
            // with a descriptive message instead of the bare "None.get". The
            // exception type is the one Option.get would have thrown, so
            // existing error handling is unaffected.
            (self ? request).as[WorkQueueReply] getOrElse {
                throw new NoSuchElementException(
                    "No WorkQueueReply received (timed out?) for request: " + request)
            }
        }
        // the need for poolSize>1 is an artifact of having to block in requestHandler above
        rpcServer = Some(RPC.newRpcServer(connectionActor, rpcExchangeName, serializer, requestHandler, poolSize = 8))
    }

    /**
     * Stops the RPC server, if one is held, and forgets its handle.
     * Does nothing when no server is held.
     */
    override def destroyRpc: Unit = {
        rpcServer foreach { _.stop }
        rpcServer = None
    }
}
Something went wrong with that request. Please try again.