Skip to content

Commit

Permalink
looks like progress was ignored on instrumentation failure and timeou…
Browse files Browse the repository at this point in the history
…ts leaving tasks in queue indefinitely and causing excessive reloads
  • Loading branch information
OlegYch committed Nov 9, 2018
1 parent 7e07dfe commit eb74fa0
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 68 deletions.
Expand Up @@ -18,7 +18,7 @@ object InstrumentedInputs {
def apply(
inputs0: Inputs
): Either[InstrumentationFailureReport, InstrumentedInputs] = {
if (inputs0.isWorksheetMode && inputs0.target.hasWorksheetMode) {
if (inputs0.isWorksheetMode) {
val instrumented =
Instrument(inputs0.code, inputs0.target).map(
instrumentedCode => inputs0.copy(code = instrumentedCode)
Expand Down
@@ -1,33 +1,20 @@
package com.olegych.scastie.sbt

import SbtProcess._

import com.olegych.scastie.api._
import com.olegych.scastie.instrumentation.Instrument

import com.olegych.scastie.sbt.SbtProcess._
import org.slf4j.LoggerFactory
import play.api.libs.json._

import scala.util.control.NonFatal

import org.slf4j.LoggerFactory

class OutputExtractor(getScalaJsContent: () => Option[String],
getScalaJsSourceMapContent: () => Option[String],
isProduction: Boolean,
promptUniqueId: String) {

private val log = LoggerFactory.getLogger(getClass)

def apply(output: ProcessOutput, sbtRun: SbtRun, isReloading: Boolean): SnippetProgress = {

val progress = extractProgress(output, sbtRun, isReloading)

sbtRun.progressActor ! progress.copy(scalaJsContent = None, scalaJsSourceMapContent = None)
sbtRun.snippetActor ! progress

progress
}

def extractProgress(output: ProcessOutput, sbtRun: SbtRun, isReloading: Boolean): SnippetProgress = {
import sbtRun._

Expand Down
96 changes: 44 additions & 52 deletions sbt-runner/src/main/scala/com.olegych.scastie.sbt/SbtProcess.scala
@@ -1,20 +1,17 @@
package com.olegych.scastie.sbt

import com.olegych.scastie.api._

import com.olegych.scastie.util._
import com.olegych.scastie.util.ScastieFileUtil.{slurp, write}
import com.olegych.scastie.instrumentation.InstrumentedInputs
import java.nio.file._

import akka.actor.{ActorRef, Cancellable, FSM, Stash}
import com.olegych.scastie.api._
import com.olegych.scastie.buildinfo.BuildInfo.sbtVersion

import akka.actor.{ActorRef, Stash, FSM, Cancellable}
import com.olegych.scastie.instrumentation.InstrumentedInputs
import com.olegych.scastie.util.ScastieFileUtil.{slurp, write}
import com.olegych.scastie.util._

import scala.concurrent.duration._
import scala.util.Random

import java.nio.file._

object SbtProcess {
sealed trait SbtState
case object Initializing extends SbtState
Expand All @@ -31,7 +28,12 @@ object SbtProcess {
progressActor: ActorRef,
snippetActor: ActorRef,
timeoutEvent: Option[Cancellable]
) extends Data
) extends Data {
def sendProgress(p: SnippetProgress): Unit = {
progressActor ! p.copy(scalaJsContent = None, scalaJsSourceMapContent = None)
snippetActor ! p
}
}

case class SbtStateTimeout(duration: FiniteDuration, state: SbtState) {
def message: String = {
Expand Down Expand Up @@ -70,12 +72,10 @@ class SbtProcess(runTimeout: FiniteDuration,
extends FSM[SbtProcess.SbtState, SbtProcess.Data]
with Stash {

import ProcessActor._
import SbtProcess._

import context.dispatcher

import ProcessActor._

// private val log = LoggerFactory.getLogger(getClass)

private val sbtDir: Path =
Expand All @@ -102,11 +102,10 @@ class SbtProcess(runTimeout: FiniteDuration,
startWith(
Initializing, {
InstrumentedInputs(Inputs.default) match {
case Right(instrumented) => {
case Right(instrumented) =>
val inputs = instrumented.inputs
setInputs(inputs)
SbtData(inputs)
}
case e => sys.error("failed to instrument default input: " + e)
}
}
Expand All @@ -132,61 +131,55 @@ class SbtProcess(runTimeout: FiniteDuration,
}

whenUnhandled {
case Event(_: SbtTask, _) => {
case Event(_: SbtTask, _) =>
stash()
stay
}
case Event(timeout: SbtStateTimeout, run: SbtRun) => {
case Event(timeout: SbtStateTimeout, run: SbtRun) =>
println("*** timeout ***")

val progress = timeout.toProgress(run.snippetId)
run.progressActor ! progress
run.sendProgress(progress)
throw new Exception(timeout.message)
}
}

onTransition {
case _ -> Ready => {
case _ -> Ready =>
println("-- Ready --")
unstashAll()
}
case _ -> Initializing => {
case _ -> Initializing =>
println("-- Initializing --")
}
case _ -> Reloading => {
case _ -> Reloading =>
println("-- Reloading --")
}
case _ -> Running => {
case _ -> Running =>
println("-- Running --")
}
}

when(Initializing) {
case Event(ProcessOutput(line, _), _) => {
case Event(ProcessOutput(line, _), _) =>
println(line)
if (isPrompt(line)) {
goto(Ready)
} else {
stay
}
}
}

when(Ready) {
case Event(task @ SbtTask(snippetId, taskInputs, ip, login, progressActor), SbtData(stateInputs)) => {
case Event(task @ SbtTask(snippetId, taskInputs, ip, login, progressActor), SbtData(stateInputs)) =>
println(s"Running: (login: $login, ip: $ip) \n $taskInputs")

InstrumentedInputs(taskInputs) match {
case Right(instrumented) => {
val sbtRun = SbtRun(
snippetId = snippetId,
inputs = instrumented.inputs,
isForcedProgramMode = instrumented.isForcedProgramMode,
progressActor = progressActor,
snippetActor = sender,
timeoutEvent = None
)
val _sbtRun = SbtRun(
snippetId = snippetId,
inputs = taskInputs,
isForcedProgramMode = false,
progressActor = progressActor,
snippetActor = sender,
timeoutEvent = None
)

InstrumentedInputs(taskInputs) match {
case Right(instrumented) =>
val sbtRun = _sbtRun.copy(inputs = instrumented.inputs, isForcedProgramMode = instrumented.isForcedProgramMode)
val isReloading = stateInputs.needsReload(sbtRun.inputs)
setInputs(sbtRun.inputs)

Expand All @@ -196,14 +189,13 @@ class SbtProcess(runTimeout: FiniteDuration,
} else {
gotoRunning(sbtRun)
}
}

case Left(report) => {
progressActor ! report.toProgress(snippetId)
case Left(report) =>
val sbtRun = _sbtRun
setInputs(sbtRun.inputs)
sbtRun.sendProgress(report.toProgress(snippetId))
goto(Ready)
}
}
}
}

val extractor = new OutputExtractor(
Expand All @@ -214,8 +206,9 @@ class SbtProcess(runTimeout: FiniteDuration,
)

when(Reloading) {
case Event(output: ProcessOutput, sbtRun: SbtRun) => {
val progress = extractor(output, sbtRun, isReloading = true)
case Event(output: ProcessOutput, sbtRun: SbtRun) =>
val progress = extractor.extractProgress(output, sbtRun, isReloading = true)
sbtRun.sendProgress(progress)

if (progress.isSbtError) {
throw new Exception("sbt error: " + output.line)
Expand All @@ -226,20 +219,19 @@ class SbtProcess(runTimeout: FiniteDuration,
} else {
stay
}
}
}

when(Running) {
case Event(output: ProcessOutput, sbtRun: SbtRun) => {
extractor(output, sbtRun, isReloading = false)
case Event(output: ProcessOutput, sbtRun: SbtRun) =>
val progress = extractor.extractProgress(output, sbtRun, isReloading = false)
sbtRun.sendProgress(progress)

if (isPrompt(output.line)) {
sbtRun.timeoutEvent.foreach(_.cancel())
goto(Ready).using(SbtData(sbtRun.inputs))
} else {
stay
}
}
}

private def gotoWithTimeout(sbtRun: SbtRun, nextState: SbtState, duration: FiniteDuration): this.State = {
Expand Down

0 comments on commit eb74fa0

Please sign in to comment.