Skip to content

Commit

Permalink
Non-blocking by creating new actors and only using tell
Browse files Browse the repository at this point in the history
  • Loading branch information
Patrik Nordwall committed Sep 27, 2011
1 parent b8565af commit b32ecaf
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 11 deletions.
44 changes: 37 additions & 7 deletions web-to-backend/src/main/scala/sample/Backend.scala
Expand Up @@ -6,14 +6,17 @@ import akka.actor.ActorRef
import akka.dispatch.Dispatchers
import akka.routing.CyclicIterator
import akka.routing.Routing
import akka.actor.PoisonPill
import akka.config.Supervision
import akka.actor.ReceiveTimeout

object Backend {

case class TranslationRequest(text: String)
case class TranslationResponse(text: String, words: Int)

private val backendDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("backend-dispatcher")
.setCorePoolSize(4)
val backendDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("backend-dispatcher")
.setCorePoolSize(7)
.build

val translationService = loadBalanced(10, actorOf[TranslationService])
Expand All @@ -31,12 +34,39 @@ object Backend {

def receive = {
case TranslationRequest(text)
val future1 = (translator ? text)
val translatedText = future1.get.asInstanceOf[String]
val future2 = (counter ? text)
val words = future2.get.asInstanceOf[Int]
for (replyTo self.sender) {
val aggregator = actorOf(new Aggregator(replyTo)).start()
translator.tell(text, aggregator)
counter.tell(text, aggregator)
}
}
}

class Aggregator(replyTo: ActorRef) extends Actor {
self.dispatcher = backendDispatcher
self.lifeCycle = Supervision.Temporary
self.receiveTimeout = Some(1000)

var textResult: Option[String] = None
var lengthResult: Option[Int] = None

def receive = {
case text: String
textResult = Some(text)
replyWhenDone()
case length: Int
lengthResult = Some(length)
replyWhenDone()
case ReceiveTimeout
self.stop()
}

def replyWhenDone() {
for (text textResult; length lengthResult) {
replyTo ! TranslationResponse(text, length)
self.stop()
}

self.channel ! TranslationResponse(translatedText, words)
}
}

Expand Down
27 changes: 23 additions & 4 deletions web-to-backend/src/main/scala/sample/Frontend.scala
Expand Up @@ -13,15 +13,21 @@ import akka.routing.Routing
import sample.Backend.TranslationRequest
import sample.Backend.TranslationResponse
import sample.Backend.translationService
import akka.actor.Scheduler
import akka.actor.PoisonPill
import java.util.concurrent.TimeUnit.SECONDS
import akka.actor.ReceiveTimeout
import akka.config.Supervision

object Frontend {

object EndpointURI {
val Translate = "/translate"
}

// private val frontendDispatcher = Backend.backendDispatcher
private val frontendDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("frontend-dispatcher")
.setCorePoolSize(4)
.setCorePoolSize(1)
.build

private def loadBalanced(poolSize: Int, actor: ActorRef): ActorRef = {
Expand Down Expand Up @@ -57,14 +63,27 @@ object Frontend {
def receive = {
case get: Get
val text = get.request.getParameter("text")
val TranslationResponse(translatedText, words) = (translationService ? TranslationRequest(text)).
get.asInstanceOf[TranslationResponse]
get.OK("Translated %s words to: %s".format(words, translatedText))
val responseHandler = actorOf(new ResponseHandler(get)).start()
translationService.tell(TranslationRequest(text), responseHandler)
case other: RequestMethod
other.NotAllowed("Invalid method for this endpoint.")
}
}

class ResponseHandler(get: Get) extends Actor {
self.dispatcher = frontendDispatcher
self.lifeCycle = Supervision.Temporary
self.receiveTimeout = Some(1000)

def receive = {
case TranslationResponse(translatedText, words)
get.OK("Translated %s words to: %s".format(words, translatedText))
self.stop()
case ReceiveTimeout
get.Timeout("Timeout")
self.stop()
}
}
}

class Boot {
Expand Down

0 comments on commit b32ecaf

Please sign in to comment.