Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Background jobs #5259

Merged
merged 5 commits into from Dec 2, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 17 additions & 4 deletions main/src/main/scala/sbt/BackgroundJobService.scala
Expand Up @@ -8,11 +8,14 @@
package sbt

import java.io.Closeable

import sbt.util.Logger
import Def.{ ScopedKey, Classpath }
import Def.{ Classpath, ScopedKey }
import sbt.internal.util.complete._
import java.io.File
import scala.util.Try

import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }

abstract class BackgroundJobService extends Closeable {

Expand Down Expand Up @@ -49,9 +52,19 @@ abstract class BackgroundJobService extends Closeable {
def jobs: Vector[JobHandle]
def stop(job: JobHandle): Unit

/**
* Delegate to waitFor but catches any exceptions and returns the result in an instance of `Try`.
* @param job the job to wait for
* @return the result of waiting for the job to complete.
*/
def waitForTry(job: JobHandle): Try[Unit] = {
// This implementation is provided only for backward compatibility.
Try(waitFor(job))
try Success(waitFor(job))
catch {
case NonFatal(e) =>
try stop(job)
catch { case NonFatal(_) => }
Failure(e)
}
}

def waitFor(job: JobHandle): Unit
Expand Down
5 changes: 3 additions & 2 deletions main/src/main/scala/sbt/Defaults.scala
Expand Up @@ -351,7 +351,7 @@ object Defaults extends BuildCommon {
sys.env.contains("CI") || SysProp.ci,
// watch related settings
pollInterval :== Watch.defaultPollInterval,
) ++ LintUnused.lintSettings
) ++ LintUnused.lintSettings ++ DefaultBackgroundJobService.backgroundJobServiceSettings
)

def defaultTestTasks(key: Scoped): Seq[Setting[_]] =
Expand Down Expand Up @@ -1599,7 +1599,8 @@ object Defaults extends BuildCommon {
}

def bgWaitForTask: Initialize[InputTask[Unit]] = foreachJobTask { (manager, handle) =>
manager.waitFor(handle)
manager.waitForTry(handle)
()
}

def docTaskSettings(key: TaskKey[File] = doc): Seq[Setting[_]] =
Expand Down
1 change: 1 addition & 0 deletions main/src/main/scala/sbt/Keys.scala
Expand Up @@ -251,6 +251,7 @@ object Keys {
val javaOptions = taskKey[Seq[String]]("Options passed to a new JVM when forking.").withRank(BPlusTask)
val envVars = taskKey[Map[String, String]]("Environment variables used when forking a new JVM").withRank(BTask)

val bgJobServiceDirectory = settingKey[File]("The directory for temporary files used by background jobs.")
val bgJobService = settingKey[BackgroundJobService]("Job manager used to run background jobs.")
val bgList = taskKey[Seq[JobHandle]]("List running background jobs.")
val ps = taskKey[Seq[JobHandle]]("bgList variant that displays on the log.")
Expand Down
15 changes: 7 additions & 8 deletions main/src/main/scala/sbt/Main.scala
Expand Up @@ -126,15 +126,14 @@ object StandardMain {
def runManaged(s: State): xsbti.MainResult = {
val previous = TrapExit.installManager()
try {
val hook = ShutdownHooks.add(closeRunnable)
try {
val hook = ShutdownHooks.add(closeRunnable)
try {
MainLoop.runLogged(s)
} finally {
hook.close()
()
}
} finally DefaultBackgroundJobService.backgroundJobService.shutdown()
MainLoop.runLogged(s)
} finally {
try DefaultBackgroundJobService.shutdown()
finally hook.close()
()
}
} finally TrapExit.uninstallManager(previous)
}

Expand Down
80 changes: 45 additions & 35 deletions main/src/main/scala/sbt/internal/DefaultBackgroundJobService.scala
Expand Up @@ -12,7 +12,8 @@ import java.io.{ Closeable, File, FileInputStream, IOException }
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{ FileVisitResult, Files, Path, SimpleFileVisitor }
import java.security.{ DigestInputStream, MessageDigest }
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }

import sbt.Def.{ Classpath, ScopedKey, Setting }
import sbt.Scope.GlobalScope
Expand Down Expand Up @@ -61,13 +62,16 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
private val nextId = new AtomicLong(1)
private val pool = new BackgroundThreadPool()

private var serviceTempDirOpt: Option[File] = None
private def serviceTempDir = serviceTempDirOpt match {
case Some(dir) => dir
case _ =>
val dir = IO.createTemporaryDirectory
serviceTempDirOpt = Some(dir)
dir
private[sbt] def serviceTempDirBase: File
private val serviceTempDirRef = new AtomicReference[File]
private def serviceTempDir: File = serviceTempDirRef.synchronized {
serviceTempDirRef.get match {
case null =>
val dir = IO.createUniqueDirectory(serviceTempDirBase)
serviceTempDirRef.set(dir)
dir
case s => s
}
}
// hooks for sending start/stop events
protected def onAddJob(@deprecated("unused", "") job: JobHandle): Unit = ()
Expand Down Expand Up @@ -161,12 +165,12 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
jobSet.headOption.foreach {
case handle: ThreadJobHandle @unchecked =>
handle.job.shutdown()
handle.job.awaitTermination()
handle.job.awaitTerminationTry()
case _ => //
}
}
pool.close()
serviceTempDirOpt foreach IO.delete
Option(serviceTempDirRef.get).foreach(IO.delete)
}

private def withHandle(job: JobHandle)(f: ThreadJobHandle => Unit): Unit = job match {
Expand All @@ -178,24 +182,9 @@ private[sbt] abstract class AbstractBackgroundJobService extends BackgroundJobSe
)
}

private def withHandleTry(job: JobHandle)(f: ThreadJobHandle => Try[Unit]): Try[Unit] =
job match {
case handle: ThreadJobHandle @unchecked => f(handle)
case _: DeadHandle @unchecked => Try(()) // nothing to stop or wait for
case other =>
Try(
sys.error(
s"BackgroundJobHandle does not originate with the current BackgroundJobService: $other"
)
)
}

override def stop(job: JobHandle): Unit =
withHandle(job)(_.job.shutdown())

override def waitForTry(job: JobHandle): Try[Unit] =
withHandleTry(job)(_.job.awaitTerminationTry())

override def waitFor(job: JobHandle): Unit =
withHandle(job)(_.job.awaitTermination())

Expand Down Expand Up @@ -387,7 +376,7 @@ private[sbt] class BackgroundThreadPool extends java.io.Closeable {
list
}
listeners.foreach { l =>
l.executionContext.execute(new Runnable { override def run = l.callback() })
l.executionContext.execute(() => l.callback())
}
}
}
Expand All @@ -398,11 +387,9 @@ private[sbt] class BackgroundThreadPool extends java.io.Closeable {
stopListeners += result
result
}
override def awaitTermination(): Unit = finishedLatch.await()

override def awaitTerminationTry(): Try[Unit] = {
awaitTermination()
exitTry.getOrElse(Try(()))
override def awaitTermination(): Unit = {
finishedLatch.await()
exitTry.foreach(_.fold(e => throw e, identity))
}

override def humanReadableName: String = taskName
Expand Down Expand Up @@ -482,14 +469,37 @@ private[sbt] class BackgroundThreadPool extends java.io.Closeable {
}
}

private[sbt] class DefaultBackgroundJobService extends AbstractBackgroundJobService {
private[sbt] class DefaultBackgroundJobService(private[sbt] val serviceTempDirBase: File)
extends AbstractBackgroundJobService {
@deprecated("Use the constructor that specifies the background job temporary directory", "1.4.0")
def this() = this(IO.createTemporaryDirectory)
override def makeContext(id: Long, spawningTask: ScopedKey[_], state: State): ManagedLogger = {
val extracted = Project.extract(state)
LogManager.constructBackgroundLog(extracted.structure.data, state)(spawningTask)
}
}
private[sbt] object DefaultBackgroundJobService {
lazy val backgroundJobService: DefaultBackgroundJobService = new DefaultBackgroundJobService
lazy val backgroundJobServiceSetting: Setting[_] =
((Keys.bgJobService in GlobalScope) :== backgroundJobService)

private[this] val backgroundJobServices = new ConcurrentHashMap[File, DefaultBackgroundJobService]
private[sbt] def shutdown(): Unit = {
backgroundJobServices.values.forEach(_.shutdown())
backgroundJobServices.clear()
}
private[sbt] lazy val backgroundJobServiceSetting: Setting[_] =
(Keys.bgJobService in GlobalScope) := {
val path = (sbt.Keys.bgJobServiceDirectory in GlobalScope).value
val newService = new DefaultBackgroundJobService(path)
backgroundJobServices.putIfAbsent(path, newService) match {
case null => newService
case s =>
newService.shutdown()
s
}
}
private[sbt] lazy val backgroundJobServiceSettings: Seq[Def.Setting[_]] = Def.settings(
Keys.bgJobServiceDirectory in GlobalScope := {
sbt.Keys.appConfiguration.value.baseDirectory / "target" / "bg-jobs"
},
backgroundJobServiceSetting
)
}
1 change: 0 additions & 1 deletion main/src/main/scala/sbt/internal/Load.scala
Expand Up @@ -130,7 +130,6 @@ private[sbt] object Load {
def injectGlobal(state: State): Seq[Setting[_]] =
(appConfiguration in GlobalScope :== state.configuration) +:
LogManager.settingsLogger(state) +:
DefaultBackgroundJobService.backgroundJobServiceSetting +:
EvaluateTask.injectSettings

def defaultWithGlobal(
Expand Down