Skip to content

Commit

Permalink
Fix reconnection issues
Browse files Browse the repository at this point in the history
- Correctly handle exceptions during handshake
- Clearify error messages
- Correctly handle exceptions after abnormal connection closes
- Update dependencies
  • Loading branch information
DRSchlaubi committed May 15, 2023
1 parent a53fb77 commit c5a4af3
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ public abstract class AbstractLavakord internal constructor(
contentConverter = KotlinxWebsocketSerializationConverter(json)
}

expectSuccess = true

install(HttpTimeout)
commonConfig()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import io.ktor.serialization.*
import io.ktor.websocket.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
Expand All @@ -27,7 +28,7 @@ internal val LOG = KotlinLogging.logger { }

private data class SessionIdContainer(private var value: String? = null) : ReadWriteProperty<Any?, String> {
override fun getValue(thisRef: Any?, property: KProperty<*>): String = value
?: error("WebSocket connection is not ready yet, please wait for the connection to finish")
?: error("WebSocket connection is not ready yet, please wait for the handshake to finish")

override fun setValue(thisRef: Any?, property: KProperty<*>, value: String) {
this.value = value
Expand Down Expand Up @@ -58,41 +59,46 @@ internal class NodeImpl(
get() = eventPublisher.asSharedFlow()

internal suspend fun connect(resume: Boolean = false) {
session = try {
connect(resume) {
addUrl()
timeout {
requestTimeoutMillis = HttpTimeout.INFINITE_TIMEOUT_MS
}
header("Authorization", authenticationHeader)
header("User-Id", lavakord.userId)
header("Client-Name", "Lavalink.kt")
if (resume) {
header("Resume-Key", resumeKey)
try {
session = try {
connect(resume) {
addUrl()
timeout {
requestTimeoutMillis = HttpTimeout.INFINITE_TIMEOUT_MS
}
header("Authorization", authenticationHeader)
header("User-Id", lavakord.userId)
header("Client-Name", "Lavalink.kt")
if (resume) {
header("Resume-Key", resumeKey)
}
}
} catch (e: ReconnectException) {
reconnect(e.cause, resume)
return
}
} catch (e: ReconnectException) {
reconnect(e.cause, resume)
return
}

retry.reset()
available = true
retry.reset()
available = true

LOG.debug { "Successfully connected to node: $name ($host)" }
LOG.debug { "Successfully connected to node: $name ($host)" }

while (!session.incoming.isClosedForReceive) {
try {
onEvent(session.receiveDeserialized())
} catch (e: WebsocketDeserializeException) {
LOG.warn(e) { "An error occurred whilst decoding incoming websocket packet" }
while (!session.incoming.isClosedForReceive) {
try {
onEvent(session.receiveDeserialized())
} catch (e: WebsocketDeserializeException) {
LOG.warn(e) { "An error occurred whilst decoding incoming websocket packet" }
}
}
val reason = session.closeReason.await()
if (reason?.knownReason == CloseReason.Codes.NORMAL) return
available = false
LOG.warn { "Disconnected from websocket for: $reason. Music will continue playing if we can reconnect within the next $resumeTimeout seconds" }
reconnect(resume = true)
} catch (e: ClosedReceiveChannelException) {
LOG.warn(e) { "WebSocket connection was closed abnormally" }
reconnect(resume = true)
}
val reason = session.closeReason.await()
if (reason?.knownReason == CloseReason.Codes.NORMAL) return
available = false
LOG.warn { "Disconnected from websocket for: $reason. Music will continue playing if we can reconnect within the next $resumeTimeout seconds" }
reconnect(resume = true)
}

private suspend fun reconnect(e: Throwable? = null, resume: Boolean = false) {
Expand Down
1 change: 1 addition & 0 deletions example/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation(libs.kord.core)

implementation("org.apache.groovy:groovy-all:4.0.7")
implementation("org.jetbrains.kotlinx:atomicfu:0.20.2")
kapt("dev.kord.x:commands-processor:0.4.0-SNAPSHOT")

implementation("net.dv8tion:JDA:5.0.0-alpha.3") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ suspend fun main() {
val listenedGuilds = mutableListOf<Snowflake>()
lavalink = kord.lavakord()

lavalink.addNode("wss://9764-5-253-115-167.eu.ngrok.io", "youshallnotpass")
lavalink.addNode("wss://schlaubi.eu.ngrok.io", "youshallnotpass")

kord.on<MessageCreateEvent> {
val args = message.content.split(" ")
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.0.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.1-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
6 changes: 3 additions & 3 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,16 @@ dependencyResolutionManagement {
}

fun VersionCatalogBuilder.kotlinx() {
val coroutines = version("coroutines", "1.6.4")
val coroutines = version("coroutines", "1.7.1")
library("kotlinx-coroutines-core", "org.jetbrains.kotlinx", "kotlinx-coroutines-core").versionRef(coroutines)
library("kotlinx-coroutines-jdk8", "org.jetbrains.kotlinx", "kotlinx-coroutines-jdk8").versionRef(coroutines)
library("kotlinx-coroutines-test", "org.jetbrains.kotlinx", "kotlinx-coroutines-test").versionRef(coroutines)
library("kotlinx-serialization-json", "org.jetbrains.kotlinx", "kotlinx-serialization-json").version("1.5.0")
library("kotlinx-serialization-json", "org.jetbrains.kotlinx", "kotlinx-serialization-json").version("1.5.1")
library("kotlinx-datetime", "org.jetbrains.kotlinx", "kotlinx-datetime").version("0.4.0")
}

fun VersionCatalogBuilder.ktor() {
val ktor = version("ktor", "2.2.4")
val ktor = version("ktor", "2.3.0")
library("ktor-io", "io.ktor", "ktor-io").versionRef(ktor)
library("ktor-utils", "io.ktor", "ktor-utils").versionRef(ktor)
library("ktor-client-websockets", "io.ktor", "ktor-client-websockets").versionRef(ktor)
Expand Down

0 comments on commit c5a4af3

Please sign in to comment.