Skip to content

Commit

Permalink
hide ktor deprecation
Browse files Browse the repository at this point in the history
* this API will be removed in Ktor 4.0
* possible replacement will be ready by Ktor 3.0 + kotlinx.io
* most of the public declarations will be replaced in coming changes
  • Loading branch information
whyoleg committed Nov 13, 2023
1 parent 2900661 commit 3e48909
Show file tree
Hide file tree
Showing 70 changed files with 238 additions and 187 deletions.
4 changes: 4 additions & 0 deletions rsocket-core/api/rsocket-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,14 @@ public final class io/rsocket/kotlin/core/RSocketConnector {
public final class io/rsocket/kotlin/core/RSocketConnectorBuilder {
public final fun acceptor (Lio/rsocket/kotlin/ConnectionAcceptor;)V
public final fun connectionConfig (Lkotlin/jvm/functions/Function1;)V
public final fun getBufferPool ()Lio/ktor/utils/io/pool/ObjectPool;
public final fun getLoggerFactory ()Lio/rsocket/kotlin/logging/LoggerFactory;
public final fun getMaxFragmentSize ()I
public final fun interceptors (Lkotlin/jvm/functions/Function1;)V
public final fun reconnectable (JLkotlin/jvm/functions/Function2;)V
public final fun reconnectable (Lkotlin/jvm/functions/Function3;)V
public static synthetic fun reconnectable$default (Lio/rsocket/kotlin/core/RSocketConnectorBuilder;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V
public final fun setBufferPool (Lio/ktor/utils/io/pool/ObjectPool;)V
public final fun setLoggerFactory (Lio/rsocket/kotlin/logging/LoggerFactory;)V
public final fun setMaxFragmentSize (I)V
}
Expand All @@ -242,9 +244,11 @@ public final class io/rsocket/kotlin/core/RSocketServer {
}

public final class io/rsocket/kotlin/core/RSocketServerBuilder {
public final fun getBufferPool ()Lio/ktor/utils/io/pool/ObjectPool;
public final fun getLoggerFactory ()Lio/rsocket/kotlin/logging/LoggerFactory;
public final fun getMaxFragmentSize ()I
public final fun interceptors (Lkotlin/jvm/functions/Function1;)V
public final fun setBufferPool (Lio/ktor/utils/io/pool/ObjectPool;)V
public final fun setLoggerFactory (Lio/rsocket/kotlin/logging/LoggerFactory;)V
public final fun setMaxFragmentSize (I)V
}
Expand Down
16 changes: 10 additions & 6 deletions rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/Connection.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,17 +28,21 @@ import kotlinx.coroutines.*
*/
@TransportApi
public interface Connection : CoroutineScope {
public val pool: ObjectPool<ChunkBuffer> get() = ChunkBuffer.Pool
@Suppress("DEPRECATION")
@Deprecated(DEPRECATED_IN_KTOR, level = DeprecationLevel.ERROR)
public val pool: ObjectPool<ChunkBuffer> get() = TODO("SHOULD NOT BE CALLED ANY MORE")

public suspend fun send(packet: ByteReadPacket)
public suspend fun receive(): ByteReadPacket
}

@Suppress("DEPRECATION")
@OptIn(TransportApi::class)
internal suspend inline fun <T> Connection.receiveFrame(block: (frame: Frame) -> T): T =
receive().readFrame(pool).closeOnError(block)
internal suspend inline fun <T> Connection.receiveFrame(bufferPool: ObjectPool<ChunkBuffer>, block: (frame: Frame) -> T): T =
receive().readFrame(bufferPool).closeOnError(block)

@Suppress("DEPRECATION")
@OptIn(TransportApi::class)
internal suspend fun Connection.sendFrame(frame: Frame) {
frame.toPacket(pool).closeOnError { send(it) }
internal suspend fun Connection.sendFrame(bufferPool: ObjectPool<ChunkBuffer>, frame: Frame) {
frame.toPacket(bufferPool).closeOnError { send(it) }
}
19 changes: 19 additions & 0 deletions rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/buffers.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin

internal const val DEPRECATED_IN_KTOR = "Deprecated in Ktor"
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package io.rsocket.kotlin.core

import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.frame.io.*
Expand All @@ -32,6 +34,7 @@ public class RSocketConnector internal constructor(
private val connectionConfigProvider: () -> ConnectionConfig,
private val acceptor: ConnectionAcceptor,
private val reconnectPredicate: ReconnectPredicate?,
@Suppress("DEPRECATION") private val bufferPool: ObjectPool<ChunkBuffer>,
) {

public suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) {
Expand Down Expand Up @@ -68,9 +71,10 @@ public class RSocketConnector internal constructor(
maxFragmentSize = maxFragmentSize,
interceptors = interceptors,
connectionConfig = connectionConfig,
acceptor = acceptor
acceptor = acceptor,
bufferPool = bufferPool
)
connection.sendFrame(setupFrame)
connection.sendFrame(bufferPool, setupFrame)
return requester
} catch (cause: Throwable) {
connectionConfig.setupPayload.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package io.rsocket.kotlin.core

import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.keepalive.*
Expand All @@ -35,6 +37,10 @@ public class RSocketConnectorBuilder internal constructor() {
field = value
}

@Suppress("DEPRECATION")
@Deprecated("Only for tests in rsocket", level = DeprecationLevel.ERROR)
public var bufferPool: ObjectPool<ChunkBuffer> = ChunkBuffer.Pool

private val connectionConfig: ConnectionConfigBuilder = ConnectionConfigBuilder()
private val interceptors: InterceptorsBuilder = InterceptorsBuilder()
private var acceptor: ConnectionAcceptor? = null
Expand Down Expand Up @@ -101,14 +107,16 @@ public class RSocketConnectorBuilder internal constructor() {
}
}

@Suppress("DEPRECATION_ERROR")
@OptIn(RSocketLoggingApi::class)
internal fun build(): RSocketConnector = RSocketConnector(
loggerFactory,
maxFragmentSize,
interceptors.build(),
connectionConfig.producer(),
acceptor ?: defaultAcceptor,
reconnectPredicate
reconnectPredicate,
bufferPool
)

private companion object {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package io.rsocket.kotlin.core

import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.frame.io.*
Expand All @@ -29,6 +31,7 @@ public class RSocketServer internal constructor(
private val loggerFactory: LoggerFactory,
private val maxFragmentSize: Int,
private val interceptors: Interceptors,
@Suppress("DEPRECATION") private val bufferPool: ObjectPool<ChunkBuffer>,
) {

@DelicateCoroutinesApi
Expand All @@ -47,7 +50,7 @@ public class RSocketServer internal constructor(
}
}

private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame { setupFrame ->
private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame(bufferPool) { setupFrame ->
when {
setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}"))
Expand All @@ -64,7 +67,8 @@ public class RSocketServer internal constructor(
payloadMimeType = setupFrame.payloadMimeType,
setupPayload = setupFrame.payload
),
acceptor = acceptor
acceptor = acceptor,
bufferPool = bufferPool
)
coroutineContext.job
} catch (e: Throwable) {
Expand All @@ -75,7 +79,7 @@ public class RSocketServer internal constructor(

@Suppress("SuspendFunctionOnCoroutineScope")
private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
sendFrame(ErrorFrame(0, error))
sendFrame(bufferPool, ErrorFrame(0, error))
cancel("Connection establishment failed", error)
throw error
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package io.rsocket.kotlin.core

import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.logging.*

Expand All @@ -30,14 +32,19 @@ public class RSocketServerBuilder internal constructor() {
field = value
}

@Suppress("DEPRECATION")
@Deprecated("Only for tests in rsocket", level = DeprecationLevel.ERROR)
public var bufferPool: ObjectPool<ChunkBuffer> = ChunkBuffer.Pool

private val interceptors: InterceptorsBuilder = InterceptorsBuilder()

public fun interceptors(configure: InterceptorsBuilder.() -> Unit) {
interceptors.configure()
}

@Suppress("DEPRECATION_ERROR")
@OptIn(RSocketLoggingApi::class)
internal fun build(): RSocketServer = RSocketServer(loggerFactory, maxFragmentSize, interceptors.build())
internal fun build(): RSocketServer = RSocketServer(loggerFactory, maxFragmentSize, interceptors.build(), bufferPool)
}

public fun RSocketServer(configure: RSocketServerBuilder.() -> Unit = {}): RSocketServer {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,7 +49,11 @@ internal class ExtensionFrame(
}
}

internal fun ByteReadPacket.readExtension(pool: ObjectPool<ChunkBuffer>, streamId: Int, flags: Int): ExtensionFrame {
internal fun ByteReadPacket.readExtension(
@Suppress("DEPRECATION") pool: ObjectPool<ChunkBuffer>,
streamId: Int,
flags: Int,
): ExtensionFrame {
val extendedType = readInt()
val payload = readPayload(pool, flags)
return ExtensionFrame(streamId, extendedType, payload)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,7 @@ internal sealed class Frame : Closeable {
protected abstract fun StringBuilder.appendFlags()
protected abstract fun StringBuilder.appendSelf()

internal fun toPacket(pool: ObjectPool<ChunkBuffer>): ByteReadPacket {
internal fun toPacket(@Suppress("DEPRECATION") pool: ObjectPool<ChunkBuffer>): ByteReadPacket {
check(type.canHaveMetadata || !(flags check Flags.Metadata)) { "bad value for metadata flag" }
return buildPacket(pool) {
writeInt(streamId)
Expand All @@ -54,7 +54,7 @@ internal sealed class Frame : Closeable {
}
}

internal fun ByteReadPacket.readFrame(pool: ObjectPool<ChunkBuffer>): Frame = use {
internal fun ByteReadPacket.readFrame(@Suppress("DEPRECATION") pool: ObjectPool<ChunkBuffer>): Frame = use {
val streamId = readInt()
val typeAndFlags = readShort().toInt() and 0xFFFF
val flags = typeAndFlags and FlagsMask
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -51,7 +51,7 @@ internal class KeepAliveFrame(
}
}

internal fun ByteReadPacket.readKeepAlive(pool: ObjectPool<ChunkBuffer>, flags: Int): KeepAliveFrame {
internal fun ByteReadPacket.readKeepAlive(@Suppress("DEPRECATION") pool: ObjectPool<ChunkBuffer>, flags: Int): KeepAliveFrame {
val respond = flags check RespondFlag
val lastPosition = readLong()
val data = readPacket(pool)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,7 @@ internal class LeaseFrame(
}
}

internal fun ByteReadPacket.readLease(pool: ObjectPool<ChunkBuffer>, flags: Int): LeaseFrame {
internal fun ByteReadPacket.readLease(@Suppress("DEPRECATION") pool: ObjectPool<ChunkBuffer>, flags: Int): LeaseFrame {
val ttl = readInt()
val numberOfRequests = readInt()
val metadata = if (flags check Flags.Metadata) readMetadata(pool) else null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -45,5 +45,5 @@ internal class MetadataPushFrame(
}
}

internal fun ByteReadPacket.readMetadataPush(pool: ObjectPool<ChunkBuffer>): MetadataPushFrame =
internal fun ByteReadPacket.readMetadataPush(@Suppress("DEPRECATION") pool: ObjectPool<ChunkBuffer>): MetadataPushFrame =
MetadataPushFrame(readPacket(pool))
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,11 +66,11 @@ internal class RequestFrame(
}

internal fun ByteReadPacket.readRequest(
pool: ObjectPool<ChunkBuffer>,
@Suppress("DEPRECATION") pool: ObjectPool<ChunkBuffer>,
type: FrameType,
streamId: Int,
flags: Int,
withInitial: Boolean
withInitial: Boolean,
): RequestFrame {
val fragmentFollows = flags check Flags.Follows
val complete = flags check Flags.Complete
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,7 @@ internal class ResumeFrame(
}
}

internal fun ByteReadPacket.readResume(pool: ObjectPool<ChunkBuffer>): ResumeFrame {
internal fun ByteReadPacket.readResume(@Suppress("DEPRECATION") pool: ObjectPool<ChunkBuffer>): ResumeFrame {
val version = readVersion()
val resumeToken = readResumeToken(pool)
val lastReceivedServerPosition = readLong()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -76,7 +76,7 @@ internal class SetupFrame(
}
}

internal fun ByteReadPacket.readSetup(pool: ObjectPool<ChunkBuffer>, flags: Int): SetupFrame {
internal fun ByteReadPacket.readSetup(@Suppress("DEPRECATION") pool: ObjectPool<ChunkBuffer>, flags: Int): SetupFrame {
val version = readVersion()
val keepAlive = run {
val interval = readInt()
Expand Down
Loading

0 comments on commit 3e48909

Please sign in to comment.