From b32ecaf06fa6f69906f721ad8d79a412c40bdea1 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 27 Sep 2011 12:06:30 +0200 Subject: [PATCH] Non-blocking by creating new actors and only using tell --- .../src/main/scala/sample/Backend.scala | 44 ++++++++++++++++--- .../src/main/scala/sample/Frontend.scala | 27 ++++++++++-- 2 files changed, 60 insertions(+), 11 deletions(-) diff --git a/web-to-backend/src/main/scala/sample/Backend.scala b/web-to-backend/src/main/scala/sample/Backend.scala index c4199b1..e28d731 100644 --- a/web-to-backend/src/main/scala/sample/Backend.scala +++ b/web-to-backend/src/main/scala/sample/Backend.scala @@ -6,14 +6,17 @@ import akka.actor.ActorRef import akka.dispatch.Dispatchers import akka.routing.CyclicIterator import akka.routing.Routing +import akka.actor.PoisonPill +import akka.config.Supervision +import akka.actor.ReceiveTimeout object Backend { case class TranslationRequest(text: String) case class TranslationResponse(text: String, words: Int) - private val backendDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("backend-dispatcher") - .setCorePoolSize(4) + val backendDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("backend-dispatcher") + .setCorePoolSize(7) .build val translationService = loadBalanced(10, actorOf[TranslationService]) @@ -31,12 +34,39 @@ object Backend { def receive = { case TranslationRequest(text) ⇒ - val future1 = (translator ? text) - val translatedText = future1.get.asInstanceOf[String] - val future2 = (counter ? text) - val words = future2.get.asInstanceOf[Int] + for (replyTo ← self.sender) { + val aggregator = actorOf(new Aggregator(replyTo)).start() + translator.tell(text, aggregator) + counter.tell(text, aggregator) + } + } + } + + class Aggregator(replyTo: ActorRef) extends Actor { + self.dispatcher = backendDispatcher + self.lifeCycle = Supervision.Temporary + self.receiveTimeout = Some(1000) + + var textResult: Option[String] = None + var lengthResult: Option[Int] = None + + def receive = { + case text: String ⇒ + textResult = Some(text) + replyWhenDone() + case length: Int ⇒ + lengthResult = Some(length) + replyWhenDone() + case ReceiveTimeout ⇒ + self.stop() + } + + def replyWhenDone() { + for (text ← textResult; length ← lengthResult) { + replyTo ! TranslationResponse(text, length) + self.stop() + } - self.channel ! TranslationResponse(translatedText, words) } } diff --git a/web-to-backend/src/main/scala/sample/Frontend.scala b/web-to-backend/src/main/scala/sample/Frontend.scala index 433c979..afa764e 100644 --- a/web-to-backend/src/main/scala/sample/Frontend.scala +++ b/web-to-backend/src/main/scala/sample/Frontend.scala @@ -13,6 +13,11 @@ import akka.routing.Routing import sample.Backend.TranslationRequest import sample.Backend.TranslationResponse import sample.Backend.translationService +import akka.actor.Scheduler +import akka.actor.PoisonPill +import java.util.concurrent.TimeUnit.SECONDS +import akka.actor.ReceiveTimeout +import akka.config.Supervision object Frontend { @@ -20,8 +25,9 @@ object Frontend { val Translate = "/translate" } + // private val frontendDispatcher = Backend.backendDispatcher private val frontendDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("frontend-dispatcher") - .setCorePoolSize(4) + .setCorePoolSize(1) .build private def loadBalanced(poolSize: Int, actor: ⇒ ActorRef): ActorRef = { @@ -57,14 +63,27 @@ object Frontend { def receive = { case get: Get ⇒ val text = get.request.getParameter("text") - val TranslationResponse(translatedText, words) = (translationService ? TranslationRequest(text)). - get.asInstanceOf[TranslationResponse] - get.OK("Translated %s words to: %s".format(words, translatedText)) + val responseHandler = actorOf(new ResponseHandler(get)).start() + translationService.tell(TranslationRequest(text), responseHandler) case other: RequestMethod ⇒ other.NotAllowed("Invalid method for this endpoint.") } } + class ResponseHandler(get: Get) extends Actor { + self.dispatcher = frontendDispatcher + self.lifeCycle = Supervision.Temporary + self.receiveTimeout = Some(1000) + + def receive = { + case TranslationResponse(translatedText, words) ⇒ + get.OK("Translated %s words to: %s".format(words, translatedText)) + self.stop() + case ReceiveTimeout ⇒ + get.Timeout("Timeout") + self.stop() + } + } } class Boot {