Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reworked Bloop connection and Tree View #1677

Merged
merged 1 commit into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ final class BloopServers(
workspace: AbsolutePath,
client: MetalsBuildClient,
languageClient: LanguageClient,
tables: Tables
tables: Tables,
config: MetalsServerConfig
)(implicit ec: ExecutionContextExecutorService) {

def shutdownServer(): Boolean = {
Expand Down Expand Up @@ -61,7 +62,8 @@ final class BloopServers(
client,
languageClient,
() => connectToLauncher(bloopVersion),
tables
tables,
config
)
.map(Option(_))
}
Expand Down Expand Up @@ -96,13 +98,15 @@ final class BloopServers(
serverStarted
)

val finished = Promise[Unit]()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This promise is used to determine whether the launcher is finished.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then maybe it could be called: launched instead of finished?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finished is updated when the launcher finishes and not when it launches, so I think this is better.

val job = ec.submit(new Runnable {
override def run(): Unit = {
launcher.runLauncher(
bloopVersion,
skipBspConnection = false,
Nil
)
finished.success(())
}
})

Expand All @@ -117,7 +121,8 @@ final class BloopServers(
clientOut.close()
},
Cancelable(() => job.cancel(true))
)
),
finished
)
}
}
Expand Down
16 changes: 13 additions & 3 deletions metals/src/main/scala/scala/meta/internal/metals/BspServers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import scala.meta.internal.metals.MetalsEnrichments._
import scala.meta.internal.mtags.MD5
import scala.meta.io.AbsolutePath
import scala.util.Try
import scala.concurrent.Promise

/**
* Implements BSP server discovery, named "BSP Connection Protocol" in the spec.
Expand All @@ -26,7 +27,8 @@ final class BspServers(
client: MetalsLanguageClient,
buildClient: MetalsBuildClient,
tables: Tables,
bspGlobalInstallDirectories: List[AbsolutePath]
bspGlobalInstallDirectories: List[AbsolutePath],
config: MetalsServerConfig
)(implicit ec: ExecutionContextExecutorService) {

def newServer(): Future[Option[BuildServerConnection]] = {
Expand Down Expand Up @@ -70,14 +72,21 @@ final class BspServers(
s"${details.getName} input stream"
)

val finished = Promise[Unit]()
Future {
process.waitFor()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not 100% sure about this solution, need to test it with another BSP server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test for it in Bill Suite, seems to work pretty well.

finished.success(())
}

Future.successful {
SocketConnection(
details.getName(),
output,
input,
List(
Cancelable(() => process.destroy())
)
),
finished
)
}

Expand All @@ -88,7 +97,8 @@ final class BspServers(
buildClient,
client,
newConnection,
tables
tables,
config
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import scala.meta.io.AbsolutePath
import scala.util.Try
import com.google.gson.Gson
import MetalsEnrichments._
import java.io.IOException
import org.eclipse.lsp4j.services.LanguageClient
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Promise
import scala.concurrent.ExecutionContext
import org.eclipse.lsp4j.jsonrpc.JsonRpcException
import java.io.IOException

/**
* An actively running and initialized BSP connection.
Expand All @@ -32,11 +34,20 @@ class BuildServerConnection private (
],
initialConnection: BuildServerConnection.LauncherConnection,
languageClient: LanguageClient,
tables: Tables
tables: Tables,
config: MetalsServerConfig
)(implicit ec: ExecutionContextExecutorService)
extends Cancelable {

@volatile private var connection = Future.successful(initialConnection)
initialConnection.onConnectionFinished(reconnect)

private val isShuttingDown = new AtomicBoolean(false)
private val onReconnection =
new AtomicReference[BuildServerConnection => Future[Unit]](_ =>
Future.successful(())
)

private val _version = new AtomicReference(initialConnection.version)

private val ongoingRequests =
Expand All @@ -47,13 +58,21 @@ class BuildServerConnection private (
// the name is set before when establishing conenction
def name: String = initialConnection.socketConnection.serverName

def onReconnection(
index: BuildServerConnection => Future[Unit]
): Unit = {
onReconnection.set(index)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to reindex the workspace and this is to make sure that we will only try to reindex once we finished the indexing at least once.

}

/** Run build/shutdown procedure */
def shutdown(): Future[Unit] = connection.map { conn =>
try {
conn.server.buildShutdown().get(2, TimeUnit.SECONDS)
conn.server.onBuildExit()
// Cancel pending compilations on our side, this is not needed for Bloop.
cancel()
if (isShuttingDown.compareAndSet(false, true)) {
conn.server.buildShutdown().get(2, TimeUnit.SECONDS)
conn.server.onBuildExit()
// Cancel pending compilations on our side, this is not needed for Bloop.
cancel()
}
} catch {
case e: TimeoutException =>
scribe.error(
Expand All @@ -72,6 +91,10 @@ class BuildServerConnection private (
register(server => server.buildTargetCompile(params))
}

def clean(params: CleanCacheParams): CompletableFuture[CleanCacheResult] = {
register(server => server.buildTargetCleanCache(params))
}

def mainClasses(
params: ScalaMainClassesParams
): Future[ScalaMainClassesResult] = {
Expand Down Expand Up @@ -118,21 +141,49 @@ class BuildServerConnection private (
}

private def askUser(): Future[BuildServerConnection.LauncherConnection] = {
val notification = tables.dismissedNotifications.ReconnectBsp
if (!notification.isDismissed) {
val params = Messages.DisconnectedServer.params()
languageClient.showMessageRequest(params).asScala.flatMap {
case response if response == Messages.DisconnectedServer.reconnect =>
reestablishConnection()
case response if response == Messages.DisconnectedServer.notNow =>
notification.dismiss(5, TimeUnit.MINUTES)
connection
if (config.askToReconnect) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depends now on the user configuration.

val notification = tables.dismissedNotifications.ReconnectBsp
if (!notification.isDismissed) {
val params = Messages.DisconnectedServer.params()
languageClient.showMessageRequest(params).asScala.flatMap {
case response if response == Messages.DisconnectedServer.reconnect =>
reestablishConnection()
case response if response == Messages.DisconnectedServer.notNow =>
notification.dismiss(5, TimeUnit.MINUTES)
connection
case _ =>
connection
}
} else {
connection
}
} else {
connection
reestablishConnection()
}
}

private def reconnect(): Future[BuildServerConnection.LauncherConnection] = {
val original = connection
if (!isShuttingDown.get()) {
synchronized {
// if the future is different then the connection is already being reestablished
if (connection eq original) {
connection = askUser().map { conn =>
// version can change when reconnecting
_version.set(conn.version)
ongoingRequests.addAll(conn.cancelables)
conn.onConnectionFinished(reconnect)
conn
}
connection.foreach(_ => onReconnection.get()(this))
}
connection
}
} else {
connection
}

}
private def register[T](
action: MetalsBuildServer => CompletableFuture[T]
): CompletableFuture[T] = {
Expand All @@ -150,16 +201,7 @@ class BuildServerConnection private (
.recoverWith {
case io: JsonRpcException if io.getCause.isInstanceOf[IOException] =>
synchronized {
// if the future is different then the connection is already being reestablished
if (connection eq original) {
connection = askUser().map { conn =>
// version can change when reconnecting
_version.set(conn.version)
ongoingRequests.addAll(conn.cancelables)
conn
}
}
connection.flatMap(conn => action(conn.server).asScala)
reconnect().flatMap(conn => action(conn.server).asScala)
}
}
CancelTokens.future(_ => actionFuture)
Expand All @@ -181,14 +223,15 @@ object BuildServerConnection {
localClient: MetalsBuildClient,
languageClient: LanguageClient,
connect: () => Future[SocketConnection],
tables: Tables
tables: Tables,
config: MetalsServerConfig
)(
implicit ec: ExecutionContextExecutorService
): Future[BuildServerConnection] = {

def setupServer(): Future[LauncherConnection] = {
connect().map {
case conn @ SocketConnection(name, output, input, cancelables) =>
case conn @ SocketConnection(name, output, input, _, _) =>
val tracePrinter = GlobalTrace.setupTracePrinter("BSP")
val launcher = new Launcher.Builder[MetalsBuildServer]()
.traceMessages(tracePrinter)
Expand Down Expand Up @@ -218,7 +261,8 @@ object BuildServerConnection {
setupServer,
connection,
languageClient,
tables
tables,
config
)
}
}
Expand Down Expand Up @@ -274,14 +318,22 @@ object BuildServerConnection {
cancelServer: Cancelable,
version: String
) {

def cancelables: List[Cancelable] =
cancelServer :: socketConnection.cancelables

def onConnectionFinished(
f: () => Unit
)(implicit ec: ExecutionContext): Unit = {
socketConnection.finishedPromise.future.foreach(_ => f())
}
}
}

case class SocketConnection(
serverName: String,
output: ClosableOutputStream,
input: InputStream,
cancelables: List[Cancelable]
cancelables: List[Cancelable],
finishedPromise: Promise[Unit]
)
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ final class BuildTargets() {
def scalacOptions: Iterable[ScalacOptionsItem] =
scalacTargetInfo.values

def allBuildTargetIds: Seq[BuildTargetIdentifier] =
all.toSeq.map(_.info.getId())
def all: Iterator[ScalaTarget] =
for {
(id, target) <- buildTargetInfo.iterator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,26 @@ final class Compilations(
cascadeBatch.cancelCurrentRequest()
}

def recompileAll(): Future[Unit] = {
cancel()
val allTargetIds = buildTargets.allBuildTargetIds
val clean = for {
connection <- buildServer()
params = new b.CleanCacheParams(allTargetIds.asJava)
} yield connection.clean(params).asScala

// if we don't have a connection that will show up later
val cleaned = clean.getOrElse {
Future.successful(new b.CleanCacheResult("", false))
}

for {
cleanResult <- cleaned
if cleanResult.getCleaned() == true
compiled <- compile(allTargetIds).future
} yield ()
}

private def expand(paths: Seq[AbsolutePath]): Seq[b.BuildTargetIdentifier] = {
def isCompilable(path: AbsolutePath): Boolean =
path.isScalaOrJava && !path.isDependencySource(workspace())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ final class DefinitionProvider(
if (result.isEmpty) {
compilers().definition(params, token)
} else {
if (result.isEmpty && fromSemanticdb.isEmpty) {
if (fromSemanticdb.isEmpty) {
warnings.noSemanticdb(path)
}
Future.successful(result)
Expand Down
Loading