Skip to content

Commit

Permalink
Fixed race conditions error in router via Future. Better abstraction …
Browse files Browse the repository at this point in the history
…for the job handle collection. Updated router documentation.
  • Loading branch information
aterga committed Apr 2, 2018
1 parent b18b234 commit 6dde360
Showing 1 changed file with 113 additions and 81 deletions.
194 changes: 113 additions & 81 deletions src/main/scala/viper/server/ViperServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,48 @@ object ViperServerRunner {
implicit val executionContext: ExecutionContextExecutor = system.dispatcher


/** Handle to one verification job.
  *
  * More than one verification task may be running at the same time. A task is
  * distinguished by the [[ActorRef]] of its controlling actor as well as by its
  * unique job ID.
  *
  * @param controller_actor actor in charge of the job; it is the recipient of the
  *                         Stop request when the job is interrupted
  * @param queue            message queue of the job; its completion is watched in
  *                         order to know when the job's entry can be discarded
  * @param publisher        publisher from which the stream of the job's messages
  *                         is built when results are requested
  */
case class JobHandle(
  controller_actor: ActorRef,
  queue: SourceQueueWithComplete[Message],
  publisher: Publisher[Message]
)

// Registry of verification jobs, keyed by job ID. Each value is the future of the
// job's handle.
//
// The registry is accessed from the HTTP route handlers and also from Future callbacks
// (discardJob is invoked from a queue-completion callback), which may run on different
// threads. A plain mutable.Map is not safe under concurrent modification, so every
// access below is guarded by the map's own monitor. Code that reads or iterates
// `_job_handles` directly should take the same lock.
private val _job_handles: mutable.Map[Int, Future[JobHandle]] = mutable.Map[Int, Future[JobHandle]]()
private var _next_job_id: Int = 0
val MAX_ACTIVE_JOBS: Int = 3

/** Tells whether another job may be booked.
  *
  * @return true iff fewer than [[MAX_ACTIVE_JOBS]] jobs are currently registered
  */
private def newJobsAllowed: Boolean = _job_handles.synchronized {
  _job_handles.size < MAX_ACTIVE_JOBS
}

/** Registers a new job under a fresh ID.
  *
  * @param job_executor function that, given the fresh job ID, starts the job and
  *                     returns the future of its handle
  * @return the ID under which the job has been registered
  */
private def bookNewJob(job_executor: Int => Future[JobHandle]): Int = _job_handles.synchronized {
  val new_jid = _next_job_id
  // Consume the ID before running the executor: if the executor throws after it has
  // already created resources named after the ID, the same ID must not be handed out again.
  _next_job_id += 1
  _job_handles(new_jid) = job_executor(new_jid)
  new_jid
}

/** Removes the job with the given ID from the registry (no-op if there is no such job).
  *
  * @param jid ID of the job to remove
  * @return the registry itself, after the removal
  */
private def discardJob(jid: Int): mutable.Map[Int, Future[JobHandle]] = _job_handles.synchronized {
  _job_handles -= jid
}

/** Looks up the job with the given ID.
  *
  * If the result is None, the job does not exist.
  * If the result is Some(_):
  *   a) the Future is not yet completed ==> verification in progress;
  *   b) the Future is already completed ==> job done.
  *
  * @param jid ID of the job to look up
  * @return the future of the job's handle, if a job with this ID is registered
  */
private def lookupJob(jid: Int): Option[Future[JobHandle]] = _job_handles.synchronized {
  _job_handles.get(jid)
}


// --- Actor: Terminator ---

private var _term_actor: ActorRef = _

object Terminator {
case object Exit
case class WatchJob(jid: Int)
case class WatchJob(jid: Int, handle: JobHandle)

def props(bindingFuture: Future[Http.ServerBinding]): Props = Props(new Terminator(bindingFuture))
}
Expand All @@ -62,43 +97,21 @@ object ViperServerRunner {
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
case Terminator.WatchJob(jid) =>
_job_handles.get(jid) match {
case Some(handle@ JobHandle(_, queue, _)) =>
val queue_completion_future: Future[Done] = queue.watchCompletion()
queue_completion_future.onSuccess({ case _ =>
_job_handles -= jid
println(s"Terminator deleted job #$jid")
})
queue_completion_future.onFailure({ case e =>
println(s"Terminator detected failure in job #$jid: $e")
throw e
})
case _ =>
val e = s"Terminator: job #$jid does not exist."
println(e)
throw new Exception(e)
}
case Terminator.WatchJob(jid, handle) =>
val queue_completion_future: Future[Done] = handle.queue.watchCompletion()
queue_completion_future.onSuccess({ case _ =>
discardJob(jid)
println(s"Terminator deleted job #$jid")
})
queue_completion_future.onFailure({ case e =>
println(s"Terminator detected failure in job #$jid: $e")
throw e
})
}
}


// --- Actor: MainActor ---

// We can potentially have more than one verification task at the same time.
// A verification task is distinguished via the corresponding ActorRef,
// as well as its unique job_id.

case class JobHandle(controller_actor: ActorRef,
queue: SourceQueueWithComplete[Message],
publisher: Publisher[Message])

private var _job_handles = mutable.Map[Int, JobHandle]()
private var _next_job_id: Int = 0
private val _max_active_jobs: Int = 3

private def new_jobs_allowed = _job_handles.size < _max_active_jobs

// (See model description in ViperServerProtocol.scala)

object MainActor {
Expand All @@ -122,6 +135,7 @@ object ViperServerRunner {
}

override def receive: PartialFunction[Any, Unit] = {

case Stop(call_back_needed) =>
val did_I_interrupt = interrupt
if (call_back_needed) {
Expand All @@ -132,22 +146,21 @@ object ViperServerRunner {
sender ! s"Job #$id has already been finalized."
}
}

case Verify(args) =>
if (_verificationTask != null && _verificationTask.isAlive) {
_args = args
_verificationTask.interrupt()
_verificationTask.join()
}
_verificationTask = null
verify(args)
sender ! verify(args)

case msg =>
throw new Exception("Main Actor: unexpected message received: " + msg)
}

private def verify(args: List[String]): Unit = {
assert(_verificationTask == null)

// TODO: reimplement with [[SourceQueue]]s and backpressure strategies.
private def verify(args: List[String]): JobHandle = {

// The maximum number of messages in the reporter's message buffer is 10000.
val (queue, publisher) = Source.queue[Message](10000, OverflowStrategy.backpressure).toMat(Sink.asPublisher(false))(Keep.both).run()
Expand All @@ -157,11 +170,9 @@ object ViperServerRunner {
_verificationTask = new Thread(new VerificationWorker(my_reporter, logger.get, args))
_verificationTask.start()

assert(_job_handles.get(id).isEmpty)
_job_handles(id) = JobHandle(self, queue, publisher)
_next_job_id = _next_job_id + 1

println(s"Starting job #$id...")

JobHandle(self, queue, publisher)
}
}

Expand Down Expand Up @@ -213,9 +224,12 @@ object ViperServerRunner {
* */
path("exit") {
get {
val interrupt_future_list: List[Future[String]] = _job_handles map { case (jid, handle@JobHandle(actor, _, _)) =>
implicit val askTimeout: Timeout = Timeout(5000 milliseconds)
(actor ? Stop(true)).mapTo[String]
val interrupt_future_list: List[Future[String]] = _job_handles map { case (jid, handle_future) =>
handle_future.flatMap {
case JobHandle(actor, _, _) =>
implicit val askTimeout: Timeout = Timeout(5000 milliseconds)
(actor ? Stop(true)).mapTo[String]
}
} toList
val overall_interrupt_future: Future[List[String]] = Future.sequence(interrupt_future_list)

Expand All @@ -240,7 +254,8 @@ object ViperServerRunner {
* 1. If the limit for allowed active verification jobs has been reached, complete with an appropriate reject message
* 2. Otherwise, create an actor with a fresh ID and pass the arguments provided by the client
* 3. Send [[ViperServerProtocol.Verify]] message to the newly created instance of [[MainActor]]
* (the actor will add itself as an entry to the [[_job_handles]] collection)
* ([[bookNewJob]] will add the actor as an entry to the [[_job_handles]] collection under a fresh ID;
* @see [[bookNewJob]])
* 4. Complete request with accepting message with the ID of the new verification job
*
* Use case:
Expand All @@ -250,15 +265,18 @@ object ViperServerRunner {
*/
post {
entity(as[VerificationRequest]) { r =>
if (new_jobs_allowed) {
val id = _next_job_id
val main_actor = system.actorOf(MainActor.props(id, logger), s"main_actor_$id")
var arg_list = getArgListFromArgString(r.arg)
main_actor ! ViperServerProtocol.Verify(arg_list)
if (newJobsAllowed) {
val id = bookNewJob((new_jid: Int) => {
implicit val askTimeout: Timeout = Timeout(5000 milliseconds)
val main_actor = system.actorOf(MainActor.props(new_jid, logger), s"main_actor_$new_jid")
var arg_list = getArgListFromArgString(r.arg)
val new_job_handle: Future[JobHandle] = (main_actor ? ViperServerProtocol.Verify(arg_list)).mapTo[JobHandle]
new_job_handle
})
complete( VerificationRequestAccept(id) )

} else {
complete( VerificationRequestReject(s"the maximum number of active verification jobs are currently running (${_max_active_jobs}).") )
complete( VerificationRequestReject(s"the maximum number of active verification jobs are currently running ($MAX_ACTIVE_JOBS).") )
}
}
}
Expand All @@ -269,27 +287,34 @@ object ViperServerRunner {
* <jid> must be an ID of an existing verification job.
*
* This will do the following:
* 1. If no job handle with ID equal to <jid> exists in [[_job_handles]], complete with an appropriate reject message
* 2. Otherwise:
* - Create a [[Source]] <src> full of [[viper.silver.reporter.Message]]s
* - Send [[Terminator.WatchJob]] message to the [[Terminator]] actor, awaiting
* [[SourceQueueWithComplete.watchCompletion]] future before removing current job handle from [[_job_handles]]
* - Complete request with <src>
* 1. If no job handle future with ID equal to <jid> exists in [[_job_handles]], complete with an appropriate reject message
* 2. Otherwise, once the job handle future is complete:
* - If the future completed with a failure, complete with an appropriate reject message
* - If the future completed successfully:
* - Create a [[Source]] <src> full of [[viper.silver.reporter.Message]]s
* - Send [[Terminator.WatchJob]] message to the [[Terminator]] actor, awaiting
* [[SourceQueueWithComplete.watchCompletion]] future before removing current job handle from [[_job_handles]]
* - Complete request with <src>
*
* Use case:
* - Ask ViperServer to begin streaming the results corresponding to the verification job with provided <jid>
*
*/
get {
_job_handles.get(jid) match {
case Some(handle) =>
// Found a job with this jid.
val src: Source[Message, NotUsed] = Source.fromPublisher(handle.publisher)
// We do not remove the current entry from [[_job_handles]] because the handle is
// needed in order to terminate the job before streaming is completed.
// The Terminator actor will delete the entry upon completion of the stream.
_term_actor ! Terminator.WatchJob(jid)
complete(src)
lookupJob(jid) match {
case Some(handle_future) =>
onComplete(handle_future) {
case Success(handle) =>
// Found a job with this jid.
val src: Source[Message, NotUsed] = Source.fromPublisher(handle.publisher)
// We do not remove the current entry from [[_job_handles]] because the handle is
// needed in order to terminate the job before streaming is completed.
// The Terminator actor will delete the entry upon completion of the stream.
_term_actor ! Terminator.WatchJob(jid, handle)
complete(src)
case Failure(error) =>
complete( VerificationRequestReject(s"The verification job #$jid resulted in a terrible error: $error") )
}
case _ =>
// Did not find a job with this jid.
complete( VerificationRequestReject(s"The verification job #$jid does not exist.") )
Expand All @@ -303,24 +328,32 @@ object ViperServerRunner {
*
* This will do the following:
* 1. If no job handle with ID equal to <jid> exists in [[_job_handles]], complete with an appropriate reject message
* 2. Otherwise:
* - Create a new future based on the response from the current instance of [[MainActor]] to a
* [[ViperServerProtocol.Stop]] message (with a 5 second timeout)
* - Send a [[PoisonPill]] message to the current instance of [[MainActor]]
* - Complete request with accepting message
* 2. Otherwise, once the job handle future is complete:
* - If the future completed with a failure, complete with an appropriate reject message
* - If the future completed successfully:
* - Create a new future based on the response from the current instance of [[MainActor]] to a
* [[ViperServerProtocol.Stop]] message (with a 5 second timeout)
* - Send a [[PoisonPill]] message to the current instance of [[MainActor]]
* - Complete request with accepting message
*
* Use case:
* - Client decided to kill a verification job it no longer cares about
* - Client decided to kill a verification job they no longer care about
*/
get {
_job_handles.get(jid) match {
case Some(handle) =>
implicit val askTimeout: Timeout = Timeout(5000 milliseconds)
val interrupt_done: Future[String] = (handle.controller_actor ? Stop(true)).mapTo[String]
onSuccess(interrupt_done) { msg =>
handle.controller_actor ! PoisonPill // the actor played its part.
complete( JobDiscardAccept(msg) )
lookupJob(jid) match {
case Some(handle_future) =>
onComplete(handle_future) {
case Success(handle) =>
implicit val askTimeout: Timeout = Timeout(5000 milliseconds)
val interrupt_done: Future[String] = (handle.controller_actor ? Stop(true)).mapTo[String]
onSuccess(interrupt_done) { msg =>
handle.controller_actor ! PoisonPill // the actor played its part.
complete(JobDiscardAccept(msg))
}
case Failure(error) =>
complete(JobDiscardReject(s"The verification job #$jid does not exist."))
}

case _ =>
// Did not find a job with this jid.
complete( JobDiscardReject(s"The verification job #$jid does not exist.") )
Expand Down Expand Up @@ -366,4 +399,3 @@ object ViperServerRunner {
}

} // object ViperServerRunner

0 comments on commit 6dde360

Please sign in to comment.