Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
174 changes: 124 additions & 50 deletions src/main/kotlin/com/redhat/devtools/gateway/DevSpacesConnection.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 Red Hat, Inc.
* Copyright (c) 2024-2025 Red Hat, Inc.
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
Expand All @@ -11,16 +11,21 @@
*/
package com.redhat.devtools.gateway

import com.intellij.openapi.application.ApplicationManager
import com.intellij.openapi.diagnostic.thisLogger
import com.jetbrains.gateway.thinClientLink.LinkedClientManager
import com.jetbrains.gateway.thinClientLink.ThinClientHandle
import com.jetbrains.rd.util.lifetime.Lifetime
import com.redhat.devtools.gateway.openshift.DevWorkspaces
import com.redhat.devtools.gateway.openshift.Pods
import com.redhat.devtools.gateway.server.RemoteIDEServer
import com.redhat.devtools.gateway.server.RemoteIDEServerStatus
import io.kubernetes.client.openapi.ApiException
import java.io.Closeable
import java.io.IOException
import java.net.ServerSocket
import java.net.URI
import java.util.concurrent.CancellationException

class DevSpacesConnection(private val devSpacesContext: DevSpacesContext) {
@Throws(Exception::class)
Expand All @@ -29,9 +34,11 @@
onConnected: () -> Unit,
onDisconnected: () -> Unit,
onDevWorkspaceStopped: () -> Unit,
onProgress: ((message: String) -> Unit)? = null,
checkCancelled: (() -> Unit)? = null
): ThinClientHandle {
try {
return doConnect(onConnected, onDevWorkspaceStopped, onDisconnected)
return doConnect(onConnected, onDevWorkspaceStopped, onDisconnected, onProgress, checkCancelled)
} catch (e: Exception) {
devSpacesContext.isConnected = false
throw e
Expand All @@ -43,54 +50,84 @@
private fun doConnect(
onConnected: () -> Unit,
onDevWorkspaceStopped: () -> Unit,
onDisconnected: () -> Unit
onDisconnected: () -> Unit,
onProgress: ((message: String) -> Unit)? = null,
checkCancelled: (() -> Unit)? = null
): ThinClientHandle {
startAndWaitDevWorkspace()

val remoteIdeServer = RemoteIDEServer(devSpacesContext)
val remoteIdeServerStatus = remoteIdeServer.getStatus()
val joinLink = remoteIdeServerStatus.joinLink
?: throw IOException("Could not connect, remote IDE is not ready. No join link present.")

val pods = Pods(devSpacesContext.client)
// ✅ Dynamically find a free local port
val localPort = findFreePort()
val forwarder = pods.forward(remoteIdeServer.pod, localPort, 5990)
pods.waitForForwardReady(localPort)
val effectiveJoinLink = joinLink.replace(":5990", ":$localPort")

val client = LinkedClientManager
.getInstance()
.startNewClient(
Lifetime.Eternal,
URI(effectiveJoinLink),
"",
onConnected,
false
)

client.run {
lifetime.onTermination {
try {
forwarder.close()
} catch (_: Exception) {
// Ignore cleanup errors
}
}
lifetime.onTermination {
if (remoteIdeServer.waitServerTerminated())
DevWorkspaces(devSpacesContext.client)
.stop(
devSpacesContext.devWorkspace.namespace,
devSpacesContext.devWorkspace.name
)
.also { onDevWorkspaceStopped() }
}
lifetime.onTermination { devSpacesContext.isConnected = false }
lifetime.onTermination(onDisconnected)
checkCancelled?.invoke()

onProgress?.invoke("Waiting for the Dev Workspace to get ready...")
startAndWaitDevWorkspace(checkCancelled)

checkCancelled?.invoke()

onProgress?.invoke("Waiting for the Remote IDE server to get ready...")
val (remoteIdeServer, remoteIdeServerStatus) = runCatching {
val server = RemoteIDEServer(devSpacesContext).apply { waitServerReady(checkCancelled) }
server to server.getStatus()
}.getOrElse { null to RemoteIDEServerStatus.empty() }
checkCancelled?.invoke()

requireNotNull(remoteIdeServer) { "Could not connect, remote IDE is not ready." }
require(remoteIdeServerStatus.joinLink != null && remoteIdeServerStatus.isReady) { "Remote IDE not ready or missing join link." }

onProgress?.invoke("Waiting for the Remote IDE server to get ready...")
checkCancelled?.invoke()

var forwarder: Closeable? = null

Check warning on line 77 in src/main/kotlin/com/redhat/devtools/gateway/DevSpacesConnection.kt

View workflow job for this annotation

GitHub Actions / Inspect code

Local 'var' is never modified and can be declared as 'val'

Variable is never modified, so it can be declared using 'val'
var client: ThinClientHandle? = null

val cleanup: () -> Unit = {
try { client?.close() } catch (_: Exception) {}
closeForwarder(forwarder)
stopDevWorkspace(remoteIdeServer, devSpacesContext, onDevWorkspaceStopped)
invokeOnDisconnected(onDisconnected)
devSpacesContext.isConnected = false
}

return client
onProgress?.invoke("Client: Starting up the client...")
try {
val pods = Pods(devSpacesContext.client)
// ✅ Dynamically find a free local port
val localPort = findFreePort()
val forwarder = pods.forward(remoteIdeServer.pod, localPort, 5990)

Check warning on line 93 in src/main/kotlin/com/redhat/devtools/gateway/DevSpacesConnection.kt

View workflow job for this annotation

GitHub Actions / Inspect code

Unused variable

Unused variable
pods.waitForForwardReady(localPort)
val effectiveJoinLink = remoteIdeServerStatus.joinLink.replace(":5990", ":$localPort")

val lifetimeDef = Lifetime.Eternal.createNested()
lifetimeDef.lifetime.onTermination { cleanup() }

checkCancelled?.invoke()
var finished = false
client = LinkedClientManager.getInstance().startNewClient(
lifetimeDef.lifetime,
URI(effectiveJoinLink),
"",
onConnected,
false
)

client.onClientPresenceChanged.advise(client.lifetime) { finished = true }
client.clientClosed.advise(client.lifetime) { onDisconnected.invoke(); finished = true }
client.clientFailedToOpenProject.advise(client.lifetime) { cleanup(); finished = true }

val startTime = System.currentTimeMillis()
val maxWaitMillis = 60 * 1000
while (!finished) {
checkCancelled?.invoke()
check(System.currentTimeMillis() - startTime <= maxWaitMillis) { "Could not connect, remote IDE client is not ready." }
Thread.sleep(200)
}

// Check if the thin client has opened
check(client.clientPresent) { "Could not connect, remote IDE client is not ready." }

onConnected.invoke()
return client
} catch (e: Exception) {
cleanup()
throw e
}
}

private fun findFreePort(): Int {
Expand All @@ -100,9 +137,45 @@
}
}

@Throws(IOException::class, ApiException::class)
private fun startAndWaitDevWorkspace() {
private fun closeForwarder(forwarder: Closeable?) {
if (forwarder != null) {
ApplicationManager.getApplication().executeOnPooledThread {
try {
forwarder.close()
} catch (e: Throwable) {
thisLogger().debug("Failed to close port forwarder", e)
}
}
}
}

private fun invokeOnDisconnected(onDisconnected: () -> Unit) {
try { onDisconnected() } catch (_: Exception) { }
}

private fun stopDevWorkspace(
remoteIdeServer: RemoteIDEServer?,
devSpacesContext: DevSpacesContext,
onDevWorkspaceStopped: () -> Unit
) {
try {
if (remoteIdeServer?.isServerState(false) == true) {
DevWorkspaces(devSpacesContext.client)
.stop(
devSpacesContext.devWorkspace.namespace,
devSpacesContext.devWorkspace.name
)
.also { onDevWorkspaceStopped() }
}
} catch (e: Exception) {
thisLogger().debug("Failed to stop DevWorkspace", e)
}
}

@Throws(IOException::class, ApiException::class, CancellationException::class)
private fun startAndWaitDevWorkspace(checkCancelled: (() -> Unit)? = null) {
if (!devSpacesContext.devWorkspace.started) {
checkCancelled?.invoke()
DevWorkspaces(devSpacesContext.client)
.start(
devSpacesContext.devWorkspace.namespace,
Expand All @@ -115,7 +188,8 @@
devSpacesContext.devWorkspace.namespace,
devSpacesContext.devWorkspace.name,
DevWorkspaces.RUNNING,
DevWorkspaces.RUNNING_TIMEOUT
DevWorkspaces.RUNNING_TIMEOUT,
checkCancelled
)
) throw IOException(
"DevWorkspace '${devSpacesContext.devWorkspace.name}' is not running after ${DevWorkspaces.RUNNING_TIMEOUT} seconds"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import com.redhat.devtools.gateway.view.ui.Dialogs
import io.kubernetes.client.openapi.ApiException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import java.util.concurrent.CancellationException
import javax.swing.JComponent
import javax.swing.Timer
import kotlin.coroutines.resume
Expand Down Expand Up @@ -86,7 +86,7 @@ class DevSpacesConnectionProvider : GatewayConnectionProvider {
}
} catch (e: ApiException) {
indicator.text = "Connection failed"
runDelayed(2000, { indicator.stop() })
runDelayed(2000, { if (indicator.isRunning) indicator.stop() })
if (!(handleUnauthorizedError(e) || handleNotFoundError(e))) {
Dialogs.error(
e.messageWithoutPrefix() ?: "Could not connect to workspace.",
Expand All @@ -96,11 +96,16 @@ class DevSpacesConnectionProvider : GatewayConnectionProvider {

if (cont.isActive) cont.resume(null)
} catch (e: Exception) {
runDelayed(2000) { indicator.stop() }
Dialogs.error(
e.message ?: "Could not connect to workspace.",
"Connection Error"
)
if (indicator.isCanceled) {
indicator.text2 = "Error: ${e.message}"
runDelayed(2000) { if (indicator.isRunning) indicator.stop() }
} else {
runDelayed(2000) { if (indicator.isRunning) indicator.stop() }
Dialogs.error(
e.message ?: "Could not connect to workspace.",
"Connection Error"
)
}
cont.resume(null)
}
},
Expand All @@ -121,8 +126,8 @@ class DevSpacesConnectionProvider : GatewayConnectionProvider {
indicator.text = "Remote IDE has started successfully"
indicator.text2 = "Opening project window…"
runDelayed(3000) {
indicator.stop()
ready.complete(handle)
if (indicator.isRunning) indicator.stop()
if (ready.isActive) ready.complete(handle)
}
}
}
Expand All @@ -136,8 +141,8 @@ class DevSpacesConnectionProvider : GatewayConnectionProvider {
if (!ready.isCompleted) {
indicator.text = "Failed to open remote project (code: $errorCode)"
runDelayed(2000) {
indicator.stop()
ready.complete(null)
if (indicator.isRunning) indicator.stop()
if (ready.isActive) ready.complete(null)
}
}
}
Expand All @@ -151,8 +156,8 @@ class DevSpacesConnectionProvider : GatewayConnectionProvider {
if (!ready.isCompleted) {
indicator.text = "Remote IDE closed unexpectedly."
runDelayed(2000) {
indicator.stop()
ready.complete(null)
if (indicator.isRunning) indicator.stop()
if (ready.isActive) ready.complete(null)
}
}
}
Expand Down Expand Up @@ -190,7 +195,18 @@ class DevSpacesConnectionProvider : GatewayConnectionProvider {

indicator.text2 = "Establishing remote IDE connection…"
val thinClient = DevSpacesConnection(ctx)
.connect({}, {}, {})
.connect({}, {}, {},
onProgress = { message ->
if (!message.isEmpty()) {
indicator.text2 = message
}
},
checkCancelled = {
if (indicator.isCanceled) {
throw CancellationException("User cancelled the operation")
}
}
)

indicator.text2 = "Connection established successfully."
return DevSpacesConnectionHandle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.kubernetes.client.util.PatchUtils
import io.kubernetes.client.util.Watch
import java.io.IOException
import java.util.concurrent.Executors
import java.util.concurrent.CancellationException
import java.util.concurrent.TimeUnit

class DevWorkspaces(private val client: ApiClient) {
Expand Down Expand Up @@ -141,44 +141,34 @@
doPatch(namespace, name, patch)
}

@Throws(ApiException::class, IOException::class)
@Throws(ApiException::class, IOException::class, CancellationException::class)
fun waitPhase(
namespace: String,
name: String,
desiredPhase: String,
timeout: Long
timeout: Long, // in seconds
checkCancelled: (() -> Unit)? = null,
pollInterval: Long = 1 // in seconds
): Boolean {
var phaseIsDesiredState = false
val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeout)

val watcher = createWatcher(namespace, "metadata.name=$name")
val executor = Executors.newSingleThreadScheduledExecutor()
executor.schedule(
{
try {
for (item in watcher) {
val devWorkspace = DevWorkspace.from(item.`object`)
if (desiredPhase == devWorkspace.phase) {
phaseIsDesiredState = true
break
}
}
} finally {
watcher.close()
executor.shutdown()
}
},
0,
TimeUnit.SECONDS
)
while (System.currentTimeMillis() < deadline) {
checkCancelled?.invoke() // Will throw CancellationException if cancelled

try {
executor.awaitTermination(timeout, TimeUnit.SECONDS)
} finally {
watcher.close()
executor.shutdown()
val devWorkspace = get(namespace, name)
if (devWorkspace.phase == desiredPhase) {
return true
}

try {
TimeUnit.SECONDS.sleep(pollInterval)
} catch (e: InterruptedException) {
Thread.currentThread().interrupt()
throw CancellationException("Polling interrupted")
}
}

return phaseIsDesiredState
return false
}

// Example:
Expand All @@ -204,7 +194,7 @@

// Example:
// https://github.com/kubernetes-client/java/blob/master/examples/examples-release-20/src/main/java/io/kubernetes/client/examples/WatchExample.java
private fun createWatcher(namespace: String, fieldSelector: String = "", labelSelector: String = ""): Watch<Any> {

Check warning on line 197 in src/main/kotlin/com/redhat/devtools/gateway/openshift/DevWorkspaces.kt

View workflow job for this annotation

GitHub Actions / Inspect code

Unused symbol

Function "createWatcher" is never used
return Watch.createWatch(
client,
customApi.listNamespacedCustomObject(
Expand Down
Loading
Loading