Skip to content

Commit

Permalink
Introduce new transport API and migrate internals to use it under the…
Browse files Browse the repository at this point in the history
… hood
  • Loading branch information
whyoleg committed Nov 16, 2023
1 parent 3e48909 commit b771697
Show file tree
Hide file tree
Showing 13 changed files with 381 additions and 73 deletions.
1 change: 1 addition & 0 deletions build-logic/src/main/kotlin/rsocketbuild/TestOptIn.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ fun LanguageSettingsBuilder.optInForTest() {
optIn("kotlinx.coroutines.DelicateCoroutinesApi")

optIn("io.rsocket.kotlin.TransportApi")
optIn("io.rsocket.kotlin.transport.RSocketTransportApi")
optIn("io.rsocket.kotlin.ExperimentalMetadataApi")
optIn("io.rsocket.kotlin.ExperimentalStreamsApi")
optIn("io.rsocket.kotlin.RSocketLoggingApi")
Expand Down
56 changes: 56 additions & 0 deletions rsocket-core/api/rsocket-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public abstract interface class io/rsocket/kotlin/core/MimeTypeWithName : io/rso

public final class io/rsocket/kotlin/core/RSocketConnector {
public final fun connect (Lio/rsocket/kotlin/transport/ClientTransport;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun connect (Lio/rsocket/kotlin/transport/RSocketClientTransport;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/rsocket/kotlin/core/RSocketConnectorBuilder {
Expand Down Expand Up @@ -241,6 +242,8 @@ public final class io/rsocket/kotlin/core/RSocketConnectorBuilderKt {
public final class io/rsocket/kotlin/core/RSocketServer {
public final fun bind (Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
public final fun bindIn (Lkotlinx/coroutines/CoroutineScope;Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
public final fun createAcceptor (Lio/rsocket/kotlin/ConnectionAcceptor;)Lio/rsocket/kotlin/transport/RSocketServerAcceptor;
public final fun start (Lio/rsocket/kotlin/transport/RSocketServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/rsocket/kotlin/core/RSocketServerBuilder {
Expand Down Expand Up @@ -820,6 +823,59 @@ public final class io/rsocket/kotlin/transport/ClientTransportKt {
public static final fun ClientTransport (Lkotlin/coroutines/CoroutineContext;Lio/rsocket/kotlin/transport/ClientTransport;)Lio/rsocket/kotlin/transport/ClientTransport;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketClientTransport : io/rsocket/kotlin/transport/RSocketTransport {
public abstract fun createSession (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketServerAcceptor {
public abstract fun acceptSession (Lio/rsocket/kotlin/transport/RSocketTransportSession;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketServerInstance : kotlinx/coroutines/CoroutineScope {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketServerTransport : io/rsocket/kotlin/transport/RSocketTransport {
public abstract fun startServer (Lio/rsocket/kotlin/transport/RSocketServerAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransport : kotlinx/coroutines/CoroutineScope {
}

public abstract interface annotation class io/rsocket/kotlin/transport/RSocketTransportApi : java/lang/annotation/Annotation {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportBuilder {
public abstract fun buildTransport (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransport;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportEngine : kotlinx/coroutines/CoroutineScope {
public abstract fun createTransport (Ljava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransport;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportEngineBuilder {
public abstract fun buildEngine (Lkotlin/coroutines/CoroutineContext;)Lio/rsocket/kotlin/transport/RSocketTransportEngine;
}

public abstract class io/rsocket/kotlin/transport/RSocketTransportEngineFactory {
public fun <init> (Lkotlin/jvm/functions/Function0;)V
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketTransportEngine;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/RSocketTransportEngineFactory;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransportEngine;
}

public abstract class io/rsocket/kotlin/transport/RSocketTransportFactory {
public fun <init> (Lkotlin/jvm/functions/Function0;)V
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketTransport;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/RSocketTransportFactory;Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransport;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportSession : kotlinx/coroutines/CoroutineScope {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportSession$Sequential : io/rsocket/kotlin/transport/RSocketTransportSession {
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun sendFrame (Lio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/ServerTransport {
public abstract fun start (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function3;)Ljava/lang/Object;
}
Expand Down
13 changes: 0 additions & 13 deletions rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package io.rsocket.kotlin
import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import kotlinx.coroutines.*

/**
Expand All @@ -35,14 +33,3 @@ public interface Connection : CoroutineScope {
public suspend fun send(packet: ByteReadPacket)
public suspend fun receive(): ByteReadPacket
}

@Suppress("DEPRECATION")
@OptIn(TransportApi::class)
internal suspend inline fun <T> Connection.receiveFrame(bufferPool: ObjectPool<ChunkBuffer>, block: (frame: Frame) -> T): T =
receive().readFrame(bufferPool).closeOnError(block)

@Suppress("DEPRECATION")
@OptIn(TransportApi::class)
internal suspend fun Connection.sendFrame(bufferPool: ObjectPool<ChunkBuffer>, frame: Frame) {
frame.toPacket(bufferPool).closeOnError { send(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

@OptIn(TransportApi::class, RSocketLoggingApi::class)
@OptIn(TransportApi::class, RSocketTransportApi::class, RSocketLoggingApi::class)
public class RSocketConnector internal constructor(
private val loggerFactory: LoggerFactory,
private val maxFragmentSize: Int,
Expand All @@ -37,19 +38,33 @@ public class RSocketConnector internal constructor(
@Suppress("DEPRECATION") private val bufferPool: ObjectPool<ChunkBuffer>,
) {

public suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) {
//TODO current coroutineContext job is overriden by transport coroutineContext jov
null -> withContext(transport.coroutineContext) { connectOnce(transport) }
else -> connectWithReconnect(
transport.coroutineContext,
loggerFactory.logger("io.rsocket.kotlin.connection"),
{ connectOnce(transport) },
reconnectPredicate,
)
private val connectionLogger = loggerFactory.logger("io.rsocket.kotlin.connection")
private val frameLogger = loggerFactory.logger("io.rsocket.kotlin.frame")

public suspend fun connect(transport: ClientTransport): RSocket = connect(object : RSocketClientTransport {
override val coroutineContext: CoroutineContext get() = transport.coroutineContext
override suspend fun createSession(): RSocketTransportSession {
return interceptors.wrapConnection(transport.connect()).convert()
}
})

public suspend fun connect(transport: RSocketClientTransport): RSocket {
return when (reconnectPredicate) {
//TODO current coroutineContext job is overriden by transport coroutineContext jov
null -> withContext(transport.coroutineContext) { connectOnce(transport) }
else -> connectWithReconnect(
transport.coroutineContext,
connectionLogger,
{ connectOnce(transport) },
reconnectPredicate,
)
}
}

private suspend fun connectOnce(transport: ClientTransport): RSocket {
val connection = transport.connect().wrapConnection()
private suspend fun connectOnce(transport: RSocketClientTransport): RSocket {
val connection = when (val connection = transport.createSession()) {
is RSocketTransportSession.Sequential -> connection.logging(frameLogger)
}
val connectionConfig = try {
connectionConfigProvider()
} catch (cause: Throwable) {
Expand Down Expand Up @@ -83,8 +98,4 @@ public class RSocketConnector internal constructor(
throw cause
}
}

private fun Connection.wrapConnection(): Connection =
interceptors.wrapConnection(this)
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*

@OptIn(TransportApi::class, RSocketLoggingApi::class)
@OptIn(TransportApi::class, RSocketTransportApi::class, RSocketLoggingApi::class)
public class RSocketServer internal constructor(
private val loggerFactory: LoggerFactory,
private val maxFragmentSize: Int,
private val interceptors: Interceptors,
@Suppress("DEPRECATION") private val bufferPool: ObjectPool<ChunkBuffer>,
) {

private val frameLogger = loggerFactory.logger("io.rsocket.kotlin.frame")

@DelicateCoroutinesApi
public fun <T> bind(
transport: ServerTransport<T>,
Expand All @@ -46,46 +48,59 @@ public class RSocketServer internal constructor(
acceptor: ConnectionAcceptor,
): T = with(transport) {
scope.start {
it.wrapConnection().bind(acceptor).join()
interceptors.wrapConnection(it).convert().bind(acceptor).join()
}
}

public suspend fun <T : RSocketServerInstance> start(
transport: RSocketServerTransport<T>,
acceptor: ConnectionAcceptor,
): T = transport.startServer(createAcceptor(acceptor))

@RSocketTransportApi
public fun createAcceptor(acceptor: ConnectionAcceptor): RSocketServerAcceptor = object : RSocketServerAcceptor {
override suspend fun acceptSession(session: RSocketTransportSession) {
when (session) {
is RSocketTransportSession.Sequential -> session.bind(acceptor)
}
}
}

private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame(bufferPool) { setupFrame ->
when {
setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}"))
setupFrame.honorLease -> failSetup(RSocketError.Setup.Unsupported("Lease is not supported"))
setupFrame.resumeToken != null -> failSetup(RSocketError.Setup.Unsupported("Resume is not supported"))
else -> try {
connect(
connection = this,
isServer = true,
maxFragmentSize = maxFragmentSize,
interceptors = interceptors,
connectionConfig = ConnectionConfig(
keepAlive = setupFrame.keepAlive,
payloadMimeType = setupFrame.payloadMimeType,
setupPayload = setupFrame.payload
),
acceptor = acceptor,
bufferPool = bufferPool
)
coroutineContext.job
} catch (e: Throwable) {
failSetup(RSocketError.Setup.Rejected(e.message ?: "Rejected by server acceptor"))
private suspend fun RSocketTransportSession.Sequential.bind(
acceptor: ConnectionAcceptor,
): Job = with(logging(frameLogger)) {
receiveFrame(bufferPool) { setupFrame ->
when {
setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}"))
setupFrame.honorLease -> failSetup(RSocketError.Setup.Unsupported("Lease is not supported"))
setupFrame.resumeToken != null -> failSetup(RSocketError.Setup.Unsupported("Resume is not supported"))
else -> try {
connect(
connection = this,
isServer = true,
maxFragmentSize = maxFragmentSize,
interceptors = interceptors,
connectionConfig = ConnectionConfig(
keepAlive = setupFrame.keepAlive,
payloadMimeType = setupFrame.payloadMimeType,
setupPayload = setupFrame.payload
),
acceptor = acceptor,
bufferPool = bufferPool
)
coroutineContext.job
} catch (e: Throwable) {
failSetup(RSocketError.Setup.Rejected(e.message ?: "Rejected by server acceptor"))
}
}
}
}

@Suppress("SuspendFunctionOnCoroutineScope")
private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
private suspend fun RSocketTransportSession.Sequential.failSetup(error: RSocketError.Setup): Nothing {
sendFrame(bufferPool, ErrorFrame(0, error))
cancel("Connection establishment failed", error)
throw error
}

private fun Connection.wrapConnection(): Connection =
interceptors.wrapConnection(this)
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.core

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.transport.*
import kotlin.coroutines.*

@TransportApi
@RSocketTransportApi
internal fun Connection.convert(): RSocketTransportSession.Sequential = object : RSocketTransportSession.Sequential {
override val coroutineContext: CoroutineContext
get() = this@convert.coroutineContext

override suspend fun sendFrame(frame: ByteReadPacket) {
send(frame)
}

override suspend fun receiveFrame(): ByteReadPacket {
return receive()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*

@OptIn(TransportApi::class)
@RSocketTransportApi
internal suspend inline fun connect(
connection: Connection,
connection: RSocketTransportSession.Sequential,
isServer: Boolean,
maxFragmentSize: Int,
interceptors: Interceptors,
Expand Down Expand Up @@ -99,10 +100,26 @@ internal suspend inline fun connect(
is LeaseFrame -> frame.close().also { error("lease isn't implemented") }
else -> frame.close()
}

else -> streamsStorage.handleFrame(frame, responder)
}
}
}

return requester
}

@RSocketTransportApi
internal suspend inline fun <T> RSocketTransportSession.Sequential.receiveFrame(
@Suppress("DEPRECATION") bufferPool: ObjectPool<ChunkBuffer>,
block: (frame: Frame) -> T,
): T =
receiveFrame().readFrame(bufferPool).closeOnError(block)

@RSocketTransportApi
internal suspend fun RSocketTransportSession.Sequential.sendFrame(
@Suppress("DEPRECATION") bufferPool: ObjectPool<ChunkBuffer>,
frame: Frame,
) {
frame.toPacket(bufferPool).closeOnError { sendFrame(it) }
}

0 comments on commit b771697

Please sign in to comment.