Skip to content

Commit

Permalink
Workaround for Silver issue #232 (plus minor stuff, see below)
Browse files Browse the repository at this point in the history
1. Add scala.language.postfixOps to imports to fix the warnings.
2. Do not print success/failure the STDOUT from ViperFrontend for ViperServer.
3. Add onFailure handler to the Terminator's code for handling job queue completion event.
4. Add type annotations for implicit declarations.
5. Add job ID as parameter of ReporperActor.
6. Minor code style improvements.
  • Loading branch information
aterga committed Jan 21, 2018
1 parent c40c35b commit f2979f6
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 35 deletions.
7 changes: 4 additions & 3 deletions src/main/scala/viper/server/VerificationWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import viper.silver.reporter._
import viper.silver.verifier.errors._
import viper.silver.verifier.{AbstractVerificationError, _}

import scala.collection.mutable
import scala.language.postfixOps



Expand All @@ -33,6 +33,7 @@ class ActorReporter(private val actor_ref: ActorRef, val tag: String)
val name = s"ViperServer_$tag"

def report(msg: reporter.Message) = {
//println(s"ActorReporter reporting >>> ${msg}")
actor_ref ! ReporterActor.ServerRequest(msg)
}
}
Expand Down Expand Up @@ -219,11 +220,11 @@ trait ViperFrontend extends SilFrontend {

result match {
case Success => {
printSuccess();
//printSuccess();
reporter.report(OverallSuccessMessage(getVerifierName, System.currentTimeMillis() - _startTime))
}
case f@Failure(errors) => {
printErrors(errors: _*);
//printErrors(errors: _*);
reporter.report(OverallFailureMessage(getVerifierName, System.currentTimeMillis() - _startTime, f))
}

Expand Down
7 changes: 6 additions & 1 deletion src/main/scala/viper/server/ViperIDEProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@ object ViperIDEProtocol extends akka.http.scaladsl.marshallers.sprayjson.SprayJs

implicit val position_writer = lift(new RootJsonWriter[Position] {
override def write(obj: Position): JsValue = JsObject(
"file" -> obj.file.toJson,
"file" -> (if (obj.file != null) {
//FIXME this hack is needed due to the following bug in Silver: https://bitbucket.org/viperproject/silver/issues/232
obj.file.toJson
} else {
JsString("<undefined>")
}),
"start" -> JsString(s"${obj.line}:${obj.column}"),
"end" -> (obj.end match {
case Some(end_pos) =>
Expand Down
68 changes: 37 additions & 31 deletions src/main/scala/viper/server/ViperServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ package viper.server
import org.reactivestreams.Publisher

import scala.language.postfixOps
import scala.concurrent.Future
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.concurrent.duration._
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Success}
import akka.{Done, NotUsed}
import akka.pattern.ask
import akka.util.Timeout
Expand All @@ -37,8 +37,8 @@ object ViperServerRunner {
private var _config: ViperConfig = _
final def config: ViperConfig = _config

implicit val system = ActorSystem("Main")
implicit val materializer = ActorMaterializer()
implicit val system: ActorSystem = ActorSystem("Main")
implicit val materializer: ActorMaterializer = ActorMaterializer()


// --- Actor: Terminator ---
Expand All @@ -53,23 +53,29 @@ object ViperServerRunner {
}

class Terminator(bindingFuture: Future[Http.ServerBinding]) extends Actor {
implicit val executionContext = system.dispatcher
implicit val executionContext: ExecutionContextExecutor = system.dispatcher

override def receive = {
override def receive: PartialFunction[Any, Unit] = {
case Terminator.Exit =>
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
case Terminator.WatchJob(jid) =>
_job_handles.get(jid) match {
case Some(handle@JobHandle(controller, queue, _)) =>
case Some(handle@ JobHandle(_, queue, _)) =>
val queue_completion_future: Future[Done] = queue.watchCompletion()
queue_completion_future.onSuccess({ case _ =>
_job_handles -= jid
println(s"Terminator deleted job #${jid}")
println(s"Terminator deleted job #$jid")
})
queue_completion_future.onFailure({ case e =>
println(s"Terminator detected failure in job #$jid: $e")
throw e
})
case _ =>
println(s"Terminator: job #${jid} does not exist.")
val e = s"Terminator: job #$jid does not exist."
println(e)
throw new Exception(e)
}
}
}
Expand All @@ -89,7 +95,7 @@ object ViperServerRunner {
private var _next_job_id: Int = 0
private val _max_active_jobs: Int = 3

def new_jobs_allowed = _job_handles.size < _max_active_jobs
private def new_jobs_allowed = _job_handles.size < _max_active_jobs

// (See model description in ViperServerProtocol.scala)

Expand All @@ -99,31 +105,31 @@ object ViperServerRunner {

class MainActor(private val id: Int) extends Actor {

implicit val executionContext = system.dispatcher
implicit val executionContext: ExecutionContextExecutor = system.dispatcher

private var _verificationTask: Thread = null
private var _args: List[String] = null
private var _verificationTask: Thread = _
private var _args: List[String] = _

// blocking
private def interrupt: Boolean = {
if (_verificationTask != null && _verificationTask.isAlive) {
_verificationTask.interrupt()
_verificationTask.join()
println(s"Job #${id} has been successfully interrupted.")
println(s"Job #$id has been successfully interrupted.")
return true
}
else return false
false
}

def receive = {
override def receive: PartialFunction[Any, Unit] = {
case Stop(call_back_needed) =>
val did_I_interrupt = interrupt
if (call_back_needed) {
// If a callback is expected, then the caller must decide when to kill the actor.
if (did_I_interrupt) {
sender ! s"Job #${id} has been successfully interrupted."
sender ! s"Job #$id has been successfully interrupted."
} else {
sender ! s"Job #${id} has already been finalized."
sender ! s"Job #$id has already been finalized."
}
}
case Verify(args) =>
Expand All @@ -146,16 +152,16 @@ object ViperServerRunner {
// The maximum number of messages in the reporter's message buffer is 1000.
val (queue, publisher) = Source.queue[Message](1000, OverflowStrategy.backpressure).toMat(Sink.asPublisher(false))(Keep.both).run()

val my_reporter = system.actorOf(ReporterActor.props(queue), s"reporter_$id")
val my_reporter = system.actorOf(ReporterActor.props(id, queue), s"reporter_$id")

_verificationTask = new Thread(new VerificationWorker(my_reporter, args))
_verificationTask.start()

//println(s"Client #$id disconnected")

assert(_job_handles.get(id).isEmpty)
_job_handles(id) = JobHandle(self, queue, publisher)
_next_job_id = _next_job_id + 1

println(s"Starting job #$id...")
}
}

Expand All @@ -167,26 +173,26 @@ object ViperServerRunner {
case class ServerRequest(msg: reporter.Message)
case object FinalServerRequest

def props(queue: SourceQueueWithComplete[Message]): Props = Props(new ReporterActor(queue))
def props(jid: Int, queue: SourceQueueWithComplete[Message]): Props = Props(new ReporterActor(jid, queue))
}

class ReporterActor(queue: SourceQueueWithComplete[Message]) extends Actor {
class ReporterActor(jid: Int, queue: SourceQueueWithComplete[Message]) extends Actor {

def receive = {
override def receive: PartialFunction[Any, Unit] = {
case ReporterActor.ClientRequest =>
case ReporterActor.ServerRequest(msg) =>
queue.offer(msg)
case ReporterActor.FinalServerRequest =>
queue.complete()
println(s"Job has been successfully completed.")
println(s"Job #$jid has been successfully completed.")
self ! PoisonPill
case _ =>
}
}

def main(args: Array[String]): Unit = {

implicit val executionContext = system.dispatcher
implicit val executionContext: ExecutionContextExecutor = system.dispatcher

try {
parseCommandLine(args)
Expand Down Expand Up @@ -218,7 +224,7 @@ object ViperServerRunner {
path("exit") {
get {
val interrupt_future_list: List[Future[String]] = _job_handles map { case (jid, handle@JobHandle(actor, _, _)) =>
implicit val askTimeout = Timeout(5000 milliseconds)
implicit val askTimeout: Timeout = Timeout(5000 milliseconds)
(actor ? Stop(true)).mapTo[String]
} toList
val overall_interrupt_future: Future[List[String]] = Future.sequence(interrupt_future_list)
Expand All @@ -228,8 +234,8 @@ object ViperServerRunner {
case Success(_) =>
_term_actor ! Terminator.Exit
complete( ServerStopConfirmed("shutting down...") )
case Failure(err) =>
println(s"Interrupting one of the verification threads timed out: ${err}")
case Failure(err_msg) =>
println(s"Interrupting one of the verification threads timed out: $err_msg")
_term_actor ! Terminator.Exit
complete( ServerStopConfirmed("forcibly shutting down...") )
}
Expand Down Expand Up @@ -287,7 +293,7 @@ object ViperServerRunner {
get {
_job_handles.get(jid) match {
case Some(handle) =>
//Found a job with this jid.
// Found a job with this jid.
val src: Source[Message, NotUsed] = Source.fromPublisher(handle.publisher)
// We do not remove the current entry from [[_job_handles]] because the handle is
// needed in order to terminate the job before streaming is completed.
Expand Down Expand Up @@ -319,7 +325,7 @@ object ViperServerRunner {
get {
_job_handles.get(jid) match {
case Some(handle) =>
implicit val askTimeout = Timeout(5000 milliseconds)
implicit val askTimeout: Timeout = Timeout(5000 milliseconds)
val interrupt_done: Future[String] = (handle.controller_actor ? Stop(true)).mapTo[String]
onSuccess(interrupt_done) { msg =>
handle.controller_actor ! PoisonPill // the actor played its part.
Expand Down

0 comments on commit f2979f6

Please sign in to comment.