Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: missing vertx #1003

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 7 additions & 13 deletions core/src/main/kotlin/spp/jetbrains/status/SourceStatusService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
*/
package spp.jetbrains.status

import com.intellij.openapi.diagnostic.logger
import com.intellij.openapi.project.Project
import com.intellij.openapi.util.Key
import com.intellij.openapi.util.Pair
import spp.jetbrains.UserData
import io.vertx.core.Vertx
import spp.protocol.platform.general.Service

/**
Expand All @@ -30,7 +29,6 @@ import spp.protocol.platform.general.Service
*/
interface SourceStatusService {
companion object {
private val log = logger<SourceStatusService>()
val KEY = Key.create<SourceStatusService>("SPP_SOURCE_STATUS_SERVICE")

@JvmStatic
Expand All @@ -44,6 +42,7 @@ interface SourceStatusService {
}
}

val vertx: Vertx
val project: Project

fun isReady(): Boolean
Expand All @@ -56,17 +55,12 @@ interface SourceStatusService {
fun getCurrentService(): Service?
fun setCurrentService(service: Service)
fun setActiveServices(services: List<Service>)
fun onStatusChange(triggerInitial: Boolean = true, listener: (SourceStatus) -> Unit)
fun onServiceChange(triggerInitial: Boolean = true, listener: () -> Unit)
fun onReadyChange(triggerInitial: Boolean = true, listener: (SourceStatus) -> Unit)
suspend fun start(initialService: String?)
fun onStatusChange(triggerInitial: Boolean = true, listener: suspend (SourceStatus) -> Unit)
fun onServiceChange(triggerInitial: Boolean = true, listener: suspend () -> Unit)
fun onReadyChange(triggerInitial: Boolean = true, listener: suspend (SourceStatus) -> Unit)
suspend fun start(vertx: Vertx, initialService: String?)

fun publishStatus(status: SourceStatus) {
if (!UserData.hasVertx(project)) {
log.warn("Vert.x unavailable. Ignoring status: $status")
return
}

UserData.vertx(project).eventBus().publish("spp.status", status)
vertx.eventBus().publish("spp.status", status)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ import io.vertx.servicediscovery.ServiceDiscoveryOptions
import io.vertx.servicediscovery.impl.DiscoveryImpl
import io.vertx.serviceproxy.ServiceProxyBuilder
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.Job
import liveplugin.implementation.LivePluginProjectLoader
import org.apache.commons.text.CaseUtils
import spp.jetbrains.PluginBundle.message
Expand All @@ -67,7 +66,6 @@ import spp.jetbrains.marker.plugin.LivePluginService
import spp.jetbrains.marker.plugin.LiveStatusBarManager
import spp.jetbrains.marker.plugin.SourceInlayHintProvider
import spp.jetbrains.marker.plugin.SourceMarkerStartupActivity
import spp.jetbrains.safeLaunch
import spp.jetbrains.sourcemarker.command.status.LiveStatusBarManagerImpl
import spp.jetbrains.sourcemarker.config.SourceMarkerConfig
import spp.jetbrains.sourcemarker.config.getServicePortNormalized
Expand Down Expand Up @@ -114,19 +112,32 @@ class SourceMarkerPlugin : SourceMarkerStartupActivity() {
if (project.getUserData(KEY) == null) {
val plugin = SourceMarkerPlugin()
plugin.project = project
plugin.setupVertx()

project.putUserData(SourceStatusService.KEY, SourceStatusServiceImpl(project))
project.putUserData(KEY, plugin)
}
return project.getUserData(KEY)!!
}
}

private lateinit var vertx: Vertx
private lateinit var project: Project
private var loadLivePluginsLock = ReentrantLock()
private var connectionJob: Job? = null
private var discovery: ServiceDiscovery? = null
private var addedConfigListener = false

private fun setupVertx() {
val options = if (System.getProperty("spp.debug.unblocked_threads", "false")!!.toBoolean()) {
log.info("Removed blocked thread checker")
VertxOptions().setBlockedThreadCheckInterval(Int.MAX_VALUE.toLong())
} else {
VertxOptions()
}
vertx = UserData.vertx(project, Vertx.vertx(options))
vertx.eventBus().registerDefaultCodec(SourceStatus::class.java, LocalMessageCodec())
}

override fun runActivity(project: Project) {
if (ApplicationManager.getApplication().isUnitTestMode) {
return //tests manually set up necessary components
Expand All @@ -145,7 +156,6 @@ class SourceMarkerPlugin : SourceMarkerStartupActivity() {

suspend fun init(configInput: SourceMarkerConfig? = null) {
log.info("Initializing SourceMarkerPlugin on project: $project")
disposePlugin()
Disposer.register(project) {
safeRunBlocking {
try {
Expand All @@ -156,73 +166,59 @@ class SourceMarkerPlugin : SourceMarkerStartupActivity() {
}
}

val options = if (System.getProperty("spp.debug.unblocked_threads", "false")!!.toBoolean()) {
log.info("Removed blocked thread checker")
VertxOptions().setBlockedThreadCheckInterval(Int.MAX_VALUE.toLong())
} else {
VertxOptions()
}
val vertx = UserData.vertx(project, Vertx.vertx(options))
vertx.eventBus().registerDefaultCodec(SourceStatus::class.java, LocalMessageCodec())

LivePluginProjectLoader.projectOpened(project)

val config = configInput ?: getConfig()
addSppPluginConfigChangeListener()

connectionJob?.cancel()
connectionJob = null

connectionJob = vertx.safeLaunch {
try {
initServices(vertx, config)
SourceStatusService.getInstance(project).start(config.serviceName)

if (!config.notifiedConnection) {
val pluginName = message("plugin_name")
Notifications.Bus.notify(
Notification(
message("plugin_name"), "Connection established",
"You have successfully connected. $pluginName is now fully activated.",
NotificationType.INFORMATION
),
project
)
config.notifiedConnection = true
try {
initServices(config)
SourceStatusService.getInstance(project).start(vertx, config.serviceName)

if (!config.notifiedConnection) {
val pluginName = message("plugin_name")
Notifications.Bus.notify(
Notification(
message("plugin_name"), "Connection established",
"You have successfully connected. $pluginName is now fully activated.",
NotificationType.INFORMATION
),
project
)
config.notifiedConnection = true

val projectSettings = PropertiesComponent.getInstance(project)
projectSettings.setValue("sourcemarker_plugin_config", Json.encode(config))
}
} catch (ignored: CancellationException) {
} catch (throwable: Throwable) {
SourceStatusService.getInstance(project).update(ConnectionError, throwable.message)
log.warn("Connection failed", throwable)
return@safeLaunch
val projectSettings = PropertiesComponent.getInstance(project)
projectSettings.setValue("sourcemarker_plugin_config", Json.encode(config))
}
} catch (ignored: CancellationException) {
} catch (throwable: Throwable) {
SourceStatusService.getInstance(project).update(ConnectionError, throwable.message)
log.warn("Connection failed", throwable)
return
}

val pluginsPromise = Promise.promise<Nothing>()
ProgressManager.getInstance()
.run(object : Task.Backgroundable(project, "Loading Source++ plugins", false, ALWAYS_BACKGROUND) {
override fun run(indicator: ProgressIndicator) {
if (loadLivePluginsLock.tryLock()) {
SourceStatusService.getInstance(project).onStatusChange {
if (it == PluginsLoaded) {
initMarker(vertx)
}
val pluginsPromise = Promise.promise<Nothing>()
ProgressManager.getInstance()
.run(object : Task.Backgroundable(project, "Loading Source++ plugins", false, ALWAYS_BACKGROUND) {
override fun run(indicator: ProgressIndicator) {
if (loadLivePluginsLock.tryLock()) {
SourceStatusService.getInstance(project).onStatusChange {
if (it == PluginsLoaded) {
initMarker()
}

log.info("Loading live plugins for project: $project")
project.getUserData(LivePluginService.LIVE_PLUGIN_LOADER)!!.invoke()
log.info("Loaded live plugins for project: $project")
pluginsPromise.complete()
loadLivePluginsLock.unlock()
} else {
log.warn("Ignoring extraneous live plugins load request for project: $project")
}

log.info("Loading live plugins for project: $project")
project.getUserData(LivePluginService.LIVE_PLUGIN_LOADER)!!.invoke()
log.info("Loaded live plugins for project: $project")
pluginsPromise.complete()
loadLivePluginsLock.unlock()
} else {
log.warn("Ignoring extraneous live plugins load request for project: $project")
}
})
pluginsPromise.future().await()
}
}
})
pluginsPromise.future().await()
}

private fun addSppPluginConfigChangeListener() {
Expand Down Expand Up @@ -331,9 +327,9 @@ class SourceMarkerPlugin : SourceMarkerStartupActivity() {
return null
}

private suspend fun discoverAvailableServices(vertx: Vertx, config: SourceMarkerConfig) {
private suspend fun discoverAvailableServices(config: SourceMarkerConfig) {
SourceStatusService.getInstance(project).update(Pending, "Discovering available services")
setupServiceDiscoveryBackend(vertx, config)
setupServiceDiscoveryBackend(config)

val liveStatusManager = LiveStatusBarManagerImpl(project, vertx)
project.putUserData(LiveStatusBarManager.KEY, liveStatusManager)
Expand Down Expand Up @@ -407,7 +403,7 @@ class SourceMarkerPlugin : SourceMarkerStartupActivity() {
}
}

private fun setupServiceDiscoveryBackend(vertx: Vertx, config: SourceMarkerConfig) {
private fun setupServiceDiscoveryBackend(config: SourceMarkerConfig) {
val originalClassLoader = Thread.currentThread().contextClassLoader
try {
Thread.currentThread().contextClassLoader = javaClass.classLoader
Expand Down Expand Up @@ -442,24 +438,24 @@ class SourceMarkerPlugin : SourceMarkerStartupActivity() {
UserData.clear(project)
}

private suspend fun initServices(vertx: Vertx, config: SourceMarkerConfig) {
private suspend fun initServices(config: SourceMarkerConfig) {
SourceStatusService.getInstance(project).update(Pending, "Logging in")

if (!config.serviceHost.isNullOrBlank()) {
connectToConfiguredService(vertx, config)
connectToConfiguredService(config)
} else {
//try default local access
try {
tryDefaultAccess(vertx, true, config)
tryDefaultAccess(true, config)
} catch (ignore: SSLHandshakeException) {
tryDefaultAccess(vertx, false, config)
tryDefaultAccess(false, config)
} catch (e: Exception) {
log.warn("Unable to find local live platform", e)
}
}
}

private suspend fun connectToConfiguredService(vertx: Vertx, config: SourceMarkerConfig) {
private suspend fun connectToConfiguredService(config: SourceMarkerConfig) {
val certificatePins = mutableListOf<String>()
certificatePins.addAll(config.certificatePins)
val httpClientOptions = if (certificatePins.isNotEmpty()) {
Expand Down Expand Up @@ -513,13 +509,13 @@ class SourceMarkerPlugin : SourceMarkerStartupActivity() {
config.accessToken = body
}

discoverAvailableServices(vertx, config)
discoverAvailableServices(config)
} else {
error("Error getting service token: ${resp.statusCode()} ${resp.statusMessage()}")
}
}

private suspend fun tryDefaultAccess(vertx: Vertx, ssl: Boolean, config: SourceMarkerConfig) {
private suspend fun tryDefaultAccess(ssl: Boolean, config: SourceMarkerConfig) {
val defaultAuthorizationCode = "change-me"
val tokenUri = "/api/new-token?authorization_code=$defaultAuthorizationCode"
val req = vertx.createHttpClient(HttpClientOptions().setSsl(ssl).setVerifyHost(false).setTrustAll(true))
Expand Down Expand Up @@ -549,7 +545,7 @@ class SourceMarkerPlugin : SourceMarkerStartupActivity() {
val projectSettings = PropertiesComponent.getInstance(project)
projectSettings.setValue("sourcemarker_plugin_config", Json.encode(config))

discoverAvailableServices(vertx, config)
discoverAvailableServices(config)

//auto-established notification
Notifications.Bus.notify(
Expand All @@ -570,7 +566,7 @@ class SourceMarkerPlugin : SourceMarkerStartupActivity() {
}
}

private fun initMarker(vertx: Vertx) {
private fun initMarker() {
log.info("Initializing marker")
val originalClassLoader = Thread.currentThread().contextClassLoader
try {
Expand Down