Skip to content

Commit

Permalink
Rework transport and internal implementation to support multiplexed t…
Browse files Browse the repository at this point in the history
…ransports

* new Transport API
* migrate local to new Transport API and introduce multiplexed kind
* fix some tests to use port 0 to auto-assign port
  • Loading branch information
whyoleg committed May 13, 2024
1 parent 167ed77 commit 87696ca
Show file tree
Hide file tree
Showing 83 changed files with 3,416 additions and 1,648 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ kotlin {

// rsocket related
optIn(OptIns.TransportApi)
optIn(OptIns.RSocketTransportApi)
optIn(OptIns.ExperimentalMetadataApi)
optIn(OptIns.ExperimentalStreamsApi)
optIn(OptIns.RSocketLoggingApi)
Expand Down
1 change: 1 addition & 0 deletions build-logic/src/main/kotlin/rsocketbuild/OptIns.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ object OptIns {
const val DelicateCoroutinesApi = "kotlinx.coroutines.DelicateCoroutinesApi"

const val TransportApi = "io.rsocket.kotlin.TransportApi"
const val RSocketTransportApi = "io.rsocket.kotlin.transport.RSocketTransportApi"
const val ExperimentalMetadataApi = "io.rsocket.kotlin.ExperimentalMetadataApi"
const val ExperimentalStreamsApi = "io.rsocket.kotlin.ExperimentalStreamsApi"
const val RSocketLoggingApi = "io.rsocket.kotlin.RSocketLoggingApi"
Expand Down
67 changes: 67 additions & 0 deletions rsocket-core/api/rsocket-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public abstract interface class io/rsocket/kotlin/core/MimeTypeWithName : io/rso

public final class io/rsocket/kotlin/core/RSocketConnector {
public final fun connect (Lio/rsocket/kotlin/transport/ClientTransport;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun connect (Lio/rsocket/kotlin/transport/RSocketClientTarget;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/rsocket/kotlin/core/RSocketConnectorBuilder {
Expand Down Expand Up @@ -228,6 +229,8 @@ public final class io/rsocket/kotlin/core/RSocketConnectorBuilderKt {
public final class io/rsocket/kotlin/core/RSocketServer {
public final fun bind (Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
public final fun bindIn (Lkotlinx/coroutines/CoroutineScope;Lio/rsocket/kotlin/transport/ServerTransport;Lio/rsocket/kotlin/ConnectionAcceptor;)Ljava/lang/Object;
public final fun createHandler (Lio/rsocket/kotlin/ConnectionAcceptor;)Lio/rsocket/kotlin/transport/RSocketConnectionHandler;
public final fun startServer (Lio/rsocket/kotlin/transport/RSocketServerTarget;Lio/rsocket/kotlin/ConnectionAcceptor;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/rsocket/kotlin/core/RSocketServerBuilder {
Expand Down Expand Up @@ -760,7 +763,71 @@ public final class io/rsocket/kotlin/transport/ClientTransportKt {
public static final fun ClientTransport (Lkotlin/coroutines/CoroutineContext;Lio/rsocket/kotlin/transport/ClientTransport;)Lio/rsocket/kotlin/transport/ClientTransport;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketClientTarget : kotlinx/coroutines/CoroutineScope {
public abstract fun connectClient (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;)Lkotlinx/coroutines/Job;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketConnection {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketConnectionHandler {
public abstract fun handleConnection (Lio/rsocket/kotlin/transport/RSocketConnection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection : io/rsocket/kotlin/transport/RSocketConnection {
public abstract fun acceptStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun createStream (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketMultiplexedConnection$Stream : java/io/Closeable {
public abstract fun close ()V
public abstract fun isClosedForSend ()Z
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun sendFrame (Lio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun setSendPriority (I)V
}

public abstract interface class io/rsocket/kotlin/transport/RSocketSequentialConnection : io/rsocket/kotlin/transport/RSocketConnection {
public abstract fun isClosedForSend ()Z
public abstract fun receiveFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun sendFrame (ILio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketServerInstance : kotlinx/coroutines/CoroutineScope {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketServerTarget : kotlinx/coroutines/CoroutineScope {
public abstract fun startServer (Lio/rsocket/kotlin/transport/RSocketConnectionHandler;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransport : kotlinx/coroutines/CoroutineScope {
}

public abstract interface annotation class io/rsocket/kotlin/transport/RSocketTransportApi : java/lang/annotation/Annotation {
}

public abstract interface class io/rsocket/kotlin/transport/RSocketTransportBuilder {
public abstract fun buildTransport (Lkotlin/coroutines/CoroutineContext;)Lio/rsocket/kotlin/transport/RSocketTransport;
}

public abstract class io/rsocket/kotlin/transport/RSocketTransportFactory {
public fun <init> (Lkotlin/jvm/functions/Function0;)V
public final fun getCreateBuilder ()Lkotlin/jvm/functions/Function0;
public final fun invoke (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)Lio/rsocket/kotlin/transport/RSocketTransport;
public static synthetic fun invoke$default (Lio/rsocket/kotlin/transport/RSocketTransportFactory;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/rsocket/kotlin/transport/RSocketTransport;
}

public abstract interface class io/rsocket/kotlin/transport/ServerTransport {
public abstract fun start (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function3;)Ljava/lang/Object;
}

public final class io/rsocket/kotlin/transport/internal/PrioritizationFrameQueue {
public fun <init> (I)V
public final fun cancel ()V
public final fun close ()V
public final fun dequeueFrame (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun enqueueFrame (ILio/ktor/utils/io/core/ByteReadPacket;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun isClosedForSend ()Z
public final fun tryDequeueFrame ()Lio/ktor/utils/io/core/ByteReadPacket;
}

12 changes: 0 additions & 12 deletions rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
package io.rsocket.kotlin

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import kotlinx.coroutines.*

/**
Expand All @@ -30,12 +27,3 @@ public interface Connection : CoroutineScope {
public suspend fun send(packet: ByteReadPacket)
public suspend fun receive(): ByteReadPacket
}

@OptIn(TransportApi::class)
internal suspend inline fun <T> Connection.receiveFrame(pool: BufferPool, block: (frame: Frame) -> T): T =
receive().readFrame(pool).closeOnError(block)

@OptIn(TransportApi::class)
internal suspend fun Connection.sendFrame(pool: BufferPool, frame: Frame) {
frame.toPacket(pool).closeOnError { send(it) }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.operation.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

// TODO: rename to just `Connection` after root `Connection` will be dropped
@RSocketTransportApi
internal abstract class Connection2(
protected val frameCodec: FrameCodec,
// requestContext
final override val coroutineContext: CoroutineContext,
) : RSocket, Closeable {

// connection establishment part

abstract suspend fun establishConnection(handler: ConnectionEstablishmentHandler): ConnectionConfig

// setup completed, start handling requests
abstract suspend fun handleConnection(inbound: ConnectionInbound)

// connection part

protected abstract suspend fun sendConnectionFrame(frame: ByteReadPacket)
private suspend fun sendConnectionFrame(frame: Frame): Unit = sendConnectionFrame(frameCodec.encodeFrame(frame))

suspend fun sendError(cause: Throwable) {
sendConnectionFrame(ErrorFrame(0, cause))
}

private suspend fun sendMetadataPush(metadata: ByteReadPacket) {
sendConnectionFrame(MetadataPushFrame(metadata))
}

suspend fun sendKeepAlive(respond: Boolean, data: ByteReadPacket, lastPosition: Long) {
sendConnectionFrame(KeepAliveFrame(respond, lastPosition, data))
}

// operations part

protected abstract fun launchRequest(requestPayload: Payload, operation: RequesterOperation): Job
private suspend fun ensureActiveOrClose(closeable: Closeable) {
currentCoroutineContext().ensureActive { closeable.close() }
coroutineContext.ensureActive { closeable.close() }
}

final override suspend fun metadataPush(metadata: ByteReadPacket) {
ensureActiveOrClose(metadata)
sendMetadataPush(metadata)
}

final override suspend fun fireAndForget(payload: Payload) {
ensureActiveOrClose(payload)

suspendCancellableCoroutine { cont ->
val requestJob = launchRequest(
requestPayload = payload,
operation = RequesterFireAndForgetOperation(cont)
)
cont.invokeOnCancellation { cause ->
requestJob.cancel("Request was cancelled", cause)
}
}
}

final override suspend fun requestResponse(payload: Payload): Payload {
ensureActiveOrClose(payload)

val responseDeferred = CompletableDeferred<Payload>()

val requestJob = launchRequest(
requestPayload = payload,
operation = RequesterRequestResponseOperation(responseDeferred)
)

try {
responseDeferred.join()
} catch (cause: Throwable) {
requestJob.cancel("Request was cancelled", cause)
throw cause
}
return responseDeferred.await()
}

@OptIn(ExperimentalStreamsApi::class)
final override fun requestStream(
payload: Payload,
): Flow<Payload> = payloadFlow { strategy, initialRequest ->
ensureActiveOrClose(payload)

val responsePayloads = PayloadChannel()

val requestJob = launchRequest(
requestPayload = payload,
operation = RequesterRequestStreamOperation(initialRequest, responsePayloads)
)

throw try {
responsePayloads.consumeInto(this, strategy)
} catch (cause: Throwable) {
requestJob.cancel("Request was cancelled", cause)
throw cause
} ?: return@payloadFlow
}

@OptIn(ExperimentalStreamsApi::class)
final override fun requestChannel(
initPayload: Payload,
payloads: Flow<Payload>,
): Flow<Payload> = payloadFlow { strategy, initialRequest ->
ensureActiveOrClose(initPayload)

val responsePayloads = PayloadChannel()

val requestJob = launchRequest(
initPayload,
RequesterRequestChannelOperation(initialRequest, payloads, responsePayloads)
)

throw try {
responsePayloads.consumeInto(this, strategy)
} catch (cause: Throwable) {
requestJob.cancel("Request was cancelled", cause)
throw cause
} ?: return@payloadFlow
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*

// send/receive setup, resume, resume ok, lease, error
@RSocketTransportApi
internal abstract class ConnectionEstablishmentContext(
private val frameCodec: FrameCodec,
) {
protected abstract suspend fun receiveFrameRaw(): ByteReadPacket?
protected abstract suspend fun sendFrame(frame: ByteReadPacket)
private suspend fun sendFrame(frame: Frame): Unit = sendFrame(frameCodec.encodeFrame(frame))

// only setup|lease|resume|resume_ok|error frames
suspend fun receiveFrame(): Frame = frameCodec.decodeFrame(
expectedStreamId = 0,
frame = receiveFrameRaw() ?: error("Expected frame during connection establishment but nothing was received")
)

suspend fun sendSetup(
version: Version,
honorLease: Boolean,
keepAlive: KeepAlive,
resumeToken: ByteReadPacket?,
payloadMimeType: PayloadMimeType,
payload: Payload,
): Unit = sendFrame(SetupFrame(version, honorLease, keepAlive, resumeToken, payloadMimeType, payload))
}
Loading

0 comments on commit 87696ca

Please sign in to comment.