Permalink
Browse files

Merge branch 'master' of github.com:jdegoes/blueeyes

Conflicts:
	src/main/scala/blueeyes/actor/ActorMPimps.scala
	src/main/scala/blueeyes/actor/ActorPimps.scala
  • Loading branch information...
2 parents 0cb03c1 + 93dde7c commit d4ce6401a4ee030a2d4d0daf5645c7782bcaf66a @jdegoes committed Jan 3, 2012
Showing with 1,997 additions and 2,877 deletions.
  1. +10 −8 build.sbt
  2. +1 −1 project/build.properties
  3. +2 −2 src/main/scala/blueeyes/BlueEyesServiceBuilder.scala
  4. +5 −4 src/main/scala/blueeyes/actor/ActorPimps.scala
  5. +1 −1 src/main/scala/blueeyes/actor/package.scala
  6. +21 −33 src/main/scala/blueeyes/benchmark/Benchmark.scala
  7. +18 −20 src/main/scala/blueeyes/benchmark/BenchmarkServer.scala
  8. +46 −13 src/main/scala/blueeyes/bkka/Stoppable.scala
  9. +20 −0 src/main/scala/blueeyes/bkka/package.scala
  10. +0 −578 src/main/scala/blueeyes/concurrent/Future.scala
  11. +21 −0 src/main/scala/blueeyes/concurrent/FutureDispatch.scala
  12. +3 −5 src/main/scala/blueeyes/concurrent/ReadWriteLock.scala
  13. +0 −138 src/main/scala/blueeyes/concurrent/ScheduledExecutor.scala
  14. +30 −36 src/main/scala/blueeyes/concurrent/test/FutureMatchers.scala
  15. +17 −14 src/main/scala/blueeyes/core/data/AggregatedByteChunk.scala
  16. +7 −5 src/main/scala/blueeyes/core/data/BijectionsChunkByteArray.scala
  17. +9 −7 src/main/scala/blueeyes/core/data/BijectionsChunkJson.scala
  18. +7 −5 src/main/scala/blueeyes/core/data/BijectionsChunkString.scala
  19. +6 −4 src/main/scala/blueeyes/core/data/BijectionsChunkXML.scala
  20. +1 −1 src/main/scala/blueeyes/core/data/Chunk.scala
  21. +20 −19 src/main/scala/blueeyes/core/data/CompressedByteChunk.scala
  22. +32 −28 src/main/scala/blueeyes/core/data/{FileChunk.scala → FileSource.scala}
  23. +1 −0 src/main/scala/blueeyes/core/http/HttpResponse.scala
  24. +3 −1 src/main/scala/blueeyes/core/http/HttpStatusCode.scala
  25. +6 −6 src/main/scala/blueeyes/core/http/combinators/HttpRequestCombinators.scala
  26. +37 −0 src/main/scala/blueeyes/core/http/test/HttpRequestMatchers.scala
  27. +10 −18 src/main/scala/blueeyes/core/service/ConfigurableHttpClient.scala
  28. +3 −5 src/main/scala/blueeyes/core/service/HttpClient.scala
  29. +6 −14 src/main/scala/blueeyes/core/service/HttpClientByteChunk.scala
  30. +1 −1 src/main/scala/blueeyes/core/service/HttpRequestHandlerCombinators.scala
  31. +66 −42 src/main/scala/blueeyes/core/service/HttpRequestLogger.scala
  32. +6 −5 src/main/scala/blueeyes/core/service/HttpResponseHelpers.scala
  33. +55 −100 src/main/scala/blueeyes/core/service/HttpServer.scala
  34. +13 −11 src/main/scala/blueeyes/core/service/HttpServices.scala
  35. 0 src/main/scala/blueeyes/core/service/{HttpServiceStatus.scala → RunningStatus.scala}
  36. +3 −3 src/main/scala/blueeyes/core/service/ServerHealthMonitorService.scala
  37. +10 −7 src/main/scala/blueeyes/core/service/{HttpServiceBuilder.scala → ServiceBuilder.scala}
  38. 0 src/main/scala/blueeyes/core/service/{HttpServiceContext.scala → ServiceContext.scala}
  39. +10 −14 src/main/scala/blueeyes/core/service/{HttpServiceDescriptor.scala → ServiceDescriptor.scala}
  40. +39 −24 ...rvice/{HttpServiceDescriptorFactoryCombinators.scala → ServiceDescriptorFactoryCombinators.scala}
  41. 0 src/main/scala/blueeyes/core/service/{HttpServiceVersion.scala → ServiceVersion.scala}
  42. +10 −20 src/main/scala/blueeyes/core/service/engines/AbstractNettyEngine.scala
  43. +40 −39 src/main/scala/blueeyes/core/service/engines/HttpClientEngines.scala
  44. +18 −13 src/main/scala/blueeyes/core/service/engines/HttpNettyChunkedRequestHandler.scala
  45. +1 −1 src/main/scala/blueeyes/core/service/engines/HttpNettyConverters.scala
  46. +60 −43 src/main/scala/blueeyes/core/service/engines/HttpNettyRequestHandler.scala
  47. +4 −4 src/main/scala/blueeyes/core/service/engines/HttpNettyServers.scala
  48. +1 −1 src/main/scala/blueeyes/core/service/engines/NettyEngine.scala
  49. +3 −3 src/main/scala/blueeyes/core/service/engines/NettyServer.scala
  50. +9 −11 src/main/scala/blueeyes/core/service/engines/security/JavaKeyTool.scala
  51. +1 −1 src/main/scala/blueeyes/core/service/package.scala
  52. +22 −30 src/main/scala/blueeyes/core/service/test/BlueEyesServiceSpecification.scala
  53. +14 −25 src/main/scala/blueeyes/demo/BlueEyesClientDemo.scala
  54. +16 −14 src/main/scala/blueeyes/demo/BlueEyesDemo.scala
  55. +4 −3 src/main/scala/blueeyes/health/CompositeHealthMonitor.scala
  56. +1 −1 src/main/scala/blueeyes/health/ExportedStatistic.scala
  57. +6 −8 src/main/scala/blueeyes/health/FunctionsMonitor.scala
  58. +10 −8 src/main/scala/blueeyes/health/HealthMonitor.scala
  59. +24 −18 src/main/scala/blueeyes/health/IntervalHealthMonitor.scala
  60. +19 −7 src/main/scala/blueeyes/health/metrics/Statistic.scala
  61. +0 −31 src/main/scala/blueeyes/health/metrics/TimedAverageStat.scala
  62. +0 −17 src/main/scala/blueeyes/health/metrics/TimedCountStat.scala
  63. +0 −17 src/main/scala/blueeyes/health/metrics/TimedErrorStat.scala
  64. +11 −8 src/main/scala/blueeyes/health/metrics/TimedNumbersSample.scala
  65. +33 −15 src/main/scala/blueeyes/health/metrics/TimedSample.scala
  66. +119 −0 src/main/scala/blueeyes/health/metrics/TimedStats.scala
  67. +0 −49 src/main/scala/blueeyes/health/metrics/TimedTimerStat.scala
  68. +4 −4 src/main/scala/blueeyes/health/metrics/Timer.scala
  69. +0 −69 src/main/scala/blueeyes/json/Json2.scala
  70. +0 −2 src/main/scala/blueeyes/json/xschema/JodaSerialization.scala
  71. +34 −19 src/main/scala/blueeyes/persistence/cache/Stage.scala
  72. +9 −8 src/main/scala/blueeyes/persistence/mongo/ConfigurableMongo.scala
  73. +13 −6 src/main/scala/blueeyes/persistence/mongo/Mongo.scala
  74. BIN src/main/scala/blueeyes/persistence/mongo/MongoDemo.scala
  75. +6 −5 src/main/scala/blueeyes/persistence/mongo/MongoPatches.scala
  76. +5 −2 src/main/scala/blueeyes/persistence/mongo/MongoStage.scala
  77. +59 −46 src/main/scala/blueeyes/persistence/mongo/RealMongoImplementation.scala
  78. +22 −15 src/main/scala/blueeyes/persistence/mongo/mock/MockMongoImplementation.scala
  79. +16 −21 src/main/scala/blueeyes/persistence/mongo/mock/MockTest.scala
  80. +1 −1 src/main/scala/blueeyes/util/Clock.scala
  81. +0 −23 src/main/scala/blueeyes/util/logging/LoggingHelper.scala
  82. +5 −3 src/main/scala/blueeyes/util/logging/RequestLogger.scala
  83. +14 −0 src/test/resources/application.conf
  84. +2 −2 src/test/scala/blueeyes/actor/ActorMSpec.scala
  85. +2 −2 src/test/scala/blueeyes/actor/ActorSpec.scala
  86. +13 −5 src/test/scala/blueeyes/bkka/StoppableSpec.scala
  87. +0 −55 src/test/scala/blueeyes/concurrent/FutureImplicitsSpec.scala
  88. +0 −218 src/test/scala/blueeyes/concurrent/FutureSpec.scala
  89. +0 −86 src/test/scala/blueeyes/concurrent/ScheduledExecutorSpec.scala
  90. +9 −7 src/test/scala/blueeyes/core/data/AggregatedByteChunkSpec.scala
  91. +11 −6 src/test/scala/blueeyes/core/data/BijectionsChunkByteArraySpec.scala
  92. +42 −33 src/test/scala/blueeyes/core/data/FileChunkSpec.scala
  93. +10 −8 src/test/scala/blueeyes/core/data/GZIPByteChunkSpec.scala
  94. +9 −8 src/test/scala/blueeyes/core/data/ZLIBByteChunkSpec.scala
  95. +8 −13 src/test/scala/blueeyes/core/http/combinators/HttpRequestCombinatorsSpec.scala
  96. +14 −7 src/test/scala/blueeyes/core/service/HttpClientByteChunkSpec.scala
  97. +3 −3 src/test/scala/blueeyes/core/service/HttpClientSpec.scala
  98. +102 −70 src/test/scala/blueeyes/core/service/HttpRequestHandlerCombinatorsSpec.scala
  99. +28 −23 src/test/scala/blueeyes/core/service/HttpRequestLoggerSpec.scala
  100. +29 −13 src/test/scala/blueeyes/core/service/HttpResponseHelpersSpec.scala
  101. +28 −15 src/test/scala/blueeyes/core/service/HttpServerSpec.scala
  102. +8 −7 src/test/scala/blueeyes/core/service/HttpServiceBuilderSpec.scala
  103. +24 −33 src/test/scala/blueeyes/core/service/HttpServiceDescriptorFactoryCombinatorsSpec.scala
  104. +6 −5 src/test/scala/blueeyes/core/service/MetadataSpec.scala
  105. +15 −16 src/test/scala/blueeyes/core/service/ServerHealthMonitorServiceSpec.scala
  106. +5 −5 src/test/scala/blueeyes/core/service/ServiceDocumenterSpec.scala
  107. +133 −120 src/test/scala/blueeyes/core/service/engines/HttpClientXLightWebSpec.scala
  108. +10 −5 src/test/scala/blueeyes/core/service/engines/HttpNettyChunkedRequestHandlerSpec.scala
  109. +14 −9 src/test/scala/blueeyes/core/service/engines/HttpNettyRequestHandlerSpec.scala
  110. +106 −88 src/test/scala/blueeyes/core/service/engines/HttpServerNettySpec.scala
  111. +17 −22 src/test/scala/blueeyes/core/service/test/BlueEyesServiceSpecificationSpec.scala
  112. +25 −23 src/test/scala/blueeyes/demo/BlueEyesDemoServiceSpec.scala
  113. +16 −12 src/test/scala/blueeyes/health/IntervalHealthMonitorSpec.scala
  114. +6 −12 src/test/scala/blueeyes/health/metrics/TimedAverageStatSpec.scala
  115. +4 −12 src/test/scala/blueeyes/health/metrics/TimedCountStatSpec.scala
  116. +4 −4 src/test/scala/blueeyes/health/metrics/TimedEternityAverageStatSpec.scala
  117. +6 −4 src/test/scala/blueeyes/health/metrics/TimedEternityCountStatSpec.scala
  118. +7 −7 src/test/scala/blueeyes/health/metrics/TimedSampleSpec.scala
  119. +6 −4 src/test/scala/blueeyes/health/metrics/TimedTimerStatSpec.scala
  120. +7 −6 src/test/scala/blueeyes/health/metrics/TimerTest.scala
  121. +13 −28 src/test/scala/blueeyes/persistence/cache/StageProfile.scala
  122. +27 −34 src/test/scala/blueeyes/persistence/cache/StageSpec.scala
  123. +2 −2 src/test/scala/blueeyes/persistence/cache/functional/StageSpec.scala
  124. +6 −4 src/test/scala/blueeyes/persistence/mongo/MongoPatchesSpec.scala
  125. +5 −6 src/test/scala/blueeyes/persistence/mongo/MongoSpec.scala
  126. +28 −23 src/test/scala/blueeyes/persistence/mongo/MongoStageSpec.scala
  127. +18 −23 src/test/scala/blueeyes/persistence/mongo/RealMongoBenchmarkSpec.scala
  128. +1 −1 src/test/scala/blueeyes/persistence/mongo/mock/MockDatabaseCollectionSpec.scala
  129. +5 −2 src/test/scala/blueeyes/persistence/mongo/mock/MockMongoDatabaseSpec.scala
  130. +3 −2 src/test/scala/blueeyes/util/logging/RequestLoggerSpec.scala
View
@@ -1,6 +1,6 @@
name := "blueeyes"
-version := "0.5.0-SNAPSHOT"
+version := "0.6.0-SNAPSHOT"
organization := "com.reportgrid"
@@ -15,18 +15,20 @@ libraryDependencies ++= Seq(
"net.lag" % "configgy" % "2.0.0" intransitive(),
"org.jboss.netty" % "netty" % "3.2.6.Final",
"org.mongodb" % "mongo-java-driver" % "2.7.2",
- "se.scalablesolutions.akka" % "akka-actor" % "1.2",
- "se.scalablesolutions.akka" % "akka-typed-actor" % "1.2",
+ "com.typesafe.akka" % "akka-actor" % "2.0-M1",
"org.xlightweb" % "xlightweb" % "2.13.2",
"rhino" % "js" % "1.7R2",
"javolution" % "javolution" % "5.5.1",
"org.scalaz" %% "scalaz-core" % "6.0.2",
- "org.specs2" %% "specs2" % "1.7-SNAPSHOT" % "provided",
+ "com.weiglewilczek.slf4s" %% "slf4s" % "1.0.7",
+ "org.specs2" %% "specs2" % "1.7" % "provided",
"org.mockito" % "mockito-all" % "1.8.5" % "provided",
"org.scala-tools.testing" %% "scalacheck" % "1.9" % "provided"
)
resolvers ++= Seq(
+ "ReportGrid repo" at "http://devci01.reportgrid.com:8081/content/repositories/releases",
+ "ReportGrid snapshot repo" at "http://devci01.reportgrid.com:8081/content/repositories/snapshots",
"Scala-Tools Releases" at "http://scala-tools.org/repo-releases/",
"Scala-Tools Snapshots" at "http://scala-tools.org/repo-snapshots/",
"Akka Repository" at "http://akka.io/repository/",
@@ -40,9 +42,9 @@ resolvers ++= Seq(
parallelExecution in Test := false
publishTo <<= (version) { version: String =>
- val nexus = "http://nexus.scala-tools.org/content/repositories/"
- if (version.trim.endsWith("SNAPSHOT")) Some("snapshots" at nexus+"snapshots/")
- else Some("releases" at nexus+"releases/")
+ val nexus = "http://nexus.reportgrid.com/content/repositories/"
+ if (version.trim.endsWith("SNAPSHOT")) Some("snapshots" at nexus+"public-snapshots/")
+ else Some("releases" at nexus+"public-releases/")
}
-credentials := Credentials(Path.userHome / ".ivy2" / ".credentials") :: Nil
+credentials := Credentials(Path.userHome / ".ivy2" / ".rgcredentials") :: Nil
View
@@ -1 +1 @@
-sbt.version=0.11.1
+sbt.version=0.11.2
@@ -3,7 +3,7 @@ package blueeyes
import blueeyes.core.http._
import blueeyes.core.data._
import blueeyes.core.service._
-import concurrent.{FutureImplicits}
+import blueeyes.bkka.AkkaDefaults
/** Convenience trait for building services with many common mixins.
* <pre>
@@ -57,7 +57,7 @@ import concurrent.{FutureImplicits}
* </pre>
*/
trait BlueEyesServiceBuilderBase[T] extends ServiceBuilder[T] with
- FutureImplicits with
+ AkkaDefaults with
HttpHeaderImplicits with
HttpStatusImplicits with
HttpStatusCodeImplicits with
@@ -3,7 +3,8 @@ package blueeyes.actor
import scalaz._
import scalaz.Scalaz._
-import blueeyes.concurrent.Future
+import akka.dispatch.Future
+import akka.dispatch.MessageDispatcher
trait ActorPimps {
import ActorFunctions._
@@ -158,8 +159,8 @@ trait ActorPimps {
/** Converts a synchronous actor into an aynchronous actor.
*/
- def async: ActorAsync[A, B] = ActorTFunctions.receive[Future, A, B] { a: A =>
- Future.async(value ! a) map {
+ def async(implicit dispatcher: MessageDispatcher): ActorAsync[A, B] = ActorTFunctions.receive[Future, A, B] { a: A =>
+ Future(value ! a) map {
case (b, next) =>
(b, next.async)
}
@@ -228,4 +229,4 @@ trait ActorPimps {
def variant[AA <: A, BB >: B]: (BB, Actor[AA, BB]) = value.mapElements((b: B) => (b : BB), _.variant[AA, BB])
}
}
-object ActorPimps extends ActorPimps
+object ActorPimps extends ActorPimps
@@ -1,6 +1,6 @@
package blueeyes
-import blueeyes.concurrent.Future
+import akka.dispatch.Future
package object actor {
type ActorState[A, B] = (B, Actor[A, B])
@@ -1,15 +1,18 @@
package blueeyes.benchmark
+import akka.dispatch.Future
+import akka.dispatch.Await
+import akka.util.Duration
+
+import blueeyes.bkka.AkkaDefaults
import blueeyes.core.service.engines.HttpClientXLightWeb
-import blueeyes.concurrent.Future
-import net.lag.configgy.Configgy
+import blueeyes.demo.{Contact, BlueEyesDemoFacade}
import blueeyes.health.metrics.Timer
-import java.util.concurrent.{CountDownLatch, ThreadPoolExecutor, SynchronousQueue, TimeUnit}
import blueeyes.json.JsonAST.JValue
-import blueeyes.demo.{Contact, BlueEyesDemoFacade}
-object Benchmark extends ServerStart{ self =>
+import java.util.concurrent.{CountDownLatch, ThreadPoolExecutor, SynchronousQueue, TimeUnit}
+object Benchmark extends BenchmarkServer with AkkaDefaults { self =>
val executorService = new ThreadPoolExecutor(20, 100, 10*60, TimeUnit.SECONDS, new SynchronousQueue())
def main(args: Array[String]){
@@ -26,6 +29,7 @@ object Benchmark extends ServerStart{ self =>
healthReport
clientReport(timer)
}
+
private def clientReport(timer: Timer){
println("********************************")
println("* Client Health report *")
@@ -43,6 +47,7 @@ object Benchmark extends ServerStart{ self =>
"""********************************
* Service Health report *
********************************"""
+
val serverHealthTitle =
"""********************************
* Server Health report *
@@ -55,15 +60,13 @@ object Benchmark extends ServerStart{ self =>
private def responseReport(future: Future[Option[JValue]], title: String) = {
val taskCounDown = new CountDownLatch(1)
- future.deliverTo(response =>{
- println(title)
- println(blueeyes.json.Printer.pretty(blueeyes.json.JsonDSL.render(response.get)))
- taskCounDown.countDown
- })
- future.ifCanceled{v =>
- taskCounDown.countDown
- }
- taskCounDown.await
+ Await.result(
+ future onSuccess { case Some(jvalue) =>
+ println(title)
+ println(blueeyes.json.Printer.pretty(blueeyes.json.JsonDSL.render(jvalue)))
+ },
+ Duration.Inf
+ )
}
private def benchmark[T](connectionCount: Int, f: Timer => T, duration: Int): T= f(benchmark(new ContactStream().apply().take(connectionCount).toList, duration))
@@ -77,11 +80,7 @@ object Benchmark extends ServerStart{ self =>
}
private def runBenchmarkTasks(contacts: List[Contact], timer: Timer, duration: Int){
- val benchmarkFutures = Future[Unit](contacts.map(startBenchmarkTask(_, timer, duration).future): _*)
-
- val countDownLatch = new CountDownLatch(1)
- benchmarkFutures.deliverTo(v => countDownLatch.countDown)
- countDownLatch.await
+ contacts.map(startBenchmarkTask(_, timer, duration))
}
private def startBenchmarkTask(contact: Contact, timer: Timer, duration: Int) = {
@@ -91,31 +90,20 @@ object Benchmark extends ServerStart{ self =>
}
}
-class BenchmarkTask(val clientFacade: BlueEyesDemoFacade, val contact: Contact, val timer: Timer, durationInSecs: Int) extends HttpClientXLightWeb with Runnable {
- val future = new Future[Unit]()
+class BenchmarkTask(val clientFacade: BlueEyesDemoFacade, val contact: Contact, val timer: Timer, durationInSecs: Int) extends HttpClientXLightWeb with Runnable with AkkaDefaults {
def run = {
val start = System.currentTimeMillis
while (System.currentTimeMillis - start < durationInSecs * 1000){
process({c: Contact => clientFacade.create(c)})
process({c: Contact => clientFacade.remove(c.name)})
}
-
- future.deliver(())
}
private def process[T](f: Contact => Future[T]) = {
val countDown = new CountDownLatch(1)
- timer.time{
- val future = f(contact)
- future.deliverTo(response =>{
- countDown.countDown
- })
- future.ifCanceled{v =>
- countDown.countDown
- v.foreach(_.printStackTrace)
- }
- countDown.await
+ timer.time {
+ Await.result(f(contact) onFailure { case ex => ex.printStackTrace }, Duration.Inf)
}
}
}
@@ -1,49 +1,47 @@
package blueeyes.benchmark
+import akka.util.Timeout
+
import net.lag.configgy.Configgy
import java.util.concurrent.CountDownLatch
import blueeyes.BlueEyesServer
import blueeyes.core.service.ServerHealthMonitorService
import blueeyes.persistence.mongo.RealMongo
import blueeyes.core.service.engines.HttpClientXLightWeb
import blueeyes.demo.{BlueEyesDemoFacade, BlueEyesDemoService, BlueEyesDemo}
+import com.weiglewilczek.slf4s.Logging
-object BenchmarkServerStart extends ServerStart {
+object BenchmarkServerStart extends BenchmarkServer {
def main(args: Array[String]) = startServer(if (args.size > 0) args(0).toBoolean else false)
}
object LiveBlueEyesDemo extends BlueEyesServer with BlueEyesDemoService with ServerHealthMonitorService {
override def main(args: Array[String]) = super.main(Array("--configFile", "/etc/default/blueeyes.conf"))
}
-trait ServerStart{ self =>
+trait BenchmarkServer extends Logging { self =>
private var server: Option[BlueEyesServer] = _
+
var port = 8585
+
private val configPattern = """server{
- port = %d
- sslPort = %d
-}"""
- def startServer(liveDemo: Boolean){
- var error: Option[Throwable] = None
- do{
- val doneSignal = new CountDownLatch(1)
+ port = %d
+ sslPort = %d
+ }"""
+ def startServer(liveDemo: Boolean) {
+ def start(port: Int) {
Configgy.configureFromString(configPattern.format(port, port + 1))
server = Some(if (liveDemo) LiveBlueEyesDemo else BlueEyesDemo)
- val startFuture = server.get.start
- startFuture.deliverTo { _ =>
- error = None
- doneSignal.countDown()
- }
- startFuture.ifCanceled{v =>
- error = v
- port = port + 2
- doneSignal.countDown()
+ server.get.start onFailure {
+ case v =>
+ logger.error("Server failed to start, trying port " + (port + 2))
+ start(port + 2)
}
- }while(error != None)
+ }
- error.foreach(throw _)
+ start(port)
}
def stopServer = server.foreach(_.stop)
@@ -1,8 +1,18 @@
package blueeyes.bkka
import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.ActorSystem
+import akka.actor.PoisonPill
+import akka.actor.Props
+import akka.actor.Terminated
+import akka.actor.ReceiveTimeout
import akka.dispatch.Future
+import akka.dispatch.Promise
+import akka.dispatch.MessageDispatcher
+import akka.util.Timeout
import scala.collection.immutable.Queue
+import com.weiglewilczek.slf4s.Logging
/**
* A base trait for typeclass instances that describe how to stop a given type of service.
@@ -23,18 +33,15 @@ sealed trait Stoppable {
def dependents: List[Stoppable]
}
-object Stoppable {
- def apply[A](a: A, deps: List[Stoppable] = Nil)(implicit stopa: Stop[A], timeout: Actor.Timeout): Stoppable = new Stoppable {
+object Stoppable extends Logging {
+ def apply[A](a: A, deps: List[Stoppable] = Nil)(implicit stopa: Stop[A], messageDispatcher: MessageDispatcher): Stoppable = new Stoppable {
protected def stop = {
- Future(println("About to stop " + a)) flatMap { _ =>
- stopa.stop(a).onResult {
- case v => println("Stopped " + a + " with result " + v)
+ Future(logger.info("About to stop " + a)) flatMap { _ =>
+ stopa.stop(a).onSuccess {
+ case v => logger.info("Stopped " + a + " with result " + v)
} recover { case ex =>
- println("Stop failed: ")
- ex.printStackTrace
- } onTimeout { future =>
- println("Stop timed out for " + a + " after timeout of " + (future.timeoutInNanos / 1000 / 1000) + " ms")
- }
+ logger.error("Stop failed", ex)
+ }
}
}
@@ -51,21 +58,47 @@ object Stoppable {
* in stopping will stop the stopping process, leaving the system in a potentially
* indeterminate state.
*/
- implicit def stoppableStop(implicit timeout: Actor.Timeout): Stop[Stoppable] = new Stop[Stoppable] {
+ implicit def stoppableStop(implicit messageDispatcher: MessageDispatcher) = new Stop[Stoppable] {
def stop(stoppable: Stoppable) = {
def _stop(q: Queue[List[Stoppable]]): Future[List[Any]] = {
if (q.isEmpty) Future(Nil)
else {
val (xs, remainder) = q.dequeue
- Future.sequence(xs.map(_.stop), timeout.duration.toMillis).flatMap(r => _stop(remainder ++ xs.map(_.dependents)).map(r ::: _))
+ Future.sequence(xs.map(_.stop)).flatMap(r => _stop(remainder ++ xs.map(_.dependents)).map(r ::: _))
}
}
_stop(Queue(List(stoppable)))
}
}
- def stop(stoppable: Stoppable)(implicit timeout: Actor.Timeout) = stoppableStop(timeout).stop(stoppable)
+ def stop(stoppable: Stoppable)(implicit messageDispatcher: MessageDispatcher) = stoppableStop.stop(stoppable)
+}
+
+case class ActorRefStop(actorSystem: ActorSystem, timeout: Timeout) extends Stop[ActorRef] {
+ class StopMonitor(target: ActorRef, result: Promise[Unit], timeout: Timeout) extends Actor {
+ // Terminated will be received when target has been stopped
+ context watch target
+ // ReceiveTimeout will be received if nothing else is received within the timeout
+ context.setReceiveTimeout(timeout.duration)
+
+ def receive = {
+ case Terminated(a)
+ result.complete(Right(()))
+ self ! PoisonPill
+
+ case ReceiveTimeout
+ result.complete(Left(new RuntimeException("Failed to stop [%s] within [%s]".format(target.path, context.receiveTimeout))))
+ self ! PoisonPill
+ }
+ }
+
+ override def stop(target: ActorRef): Future[Unit] = {
+ val exitFuture = Promise[Unit]()(actorSystem.dispatcher)
+ val stopMonitor = actorSystem.actorOf(Props(new StopMonitor(target, exitFuture, timeout)))
+ target ! PoisonPill
+ exitFuture
+ }
}
@@ -0,0 +1,20 @@
+package blueeyes
+
+import akka.dispatch.Future
+import akka.dispatch.MessageDispatcher
+
+package object bkka {
+ case class ~[A, B](a: A, b: B)
+
+ class RichFuture[A](future: Future[A]) {
+ def ~[B](other: Future[B])(implicit dispatcher: MessageDispatcher): Future[A ~ B] = {
+ Future.sequence(future :: other :: Nil) map {
+ case x :: y :: Nil => new ~(x.asInstanceOf[A], y.asInstanceOf[B])
+ }
+ }
+ }
+
+ implicit def f2rf[A](f: Future[A]): RichFuture[A] = new RichFuture(f)
+}
+
+// vim: set ts=4 sw=4 et:
Oops, something went wrong.

0 comments on commit d4ce640

Please sign in to comment.