Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ RSocket interface contains 5 methods:
`suspend fun metadataPush(metadata: ByteReadPacket)`

## Using in your projects
The `master` branch is now dedicated to development of multiplatform rsocket-kotlin.
For now only snapshots are available via [oss.jfrog.org](oss.jfrog.org) (OJO).
The `master` branch is now dedicated to development of multiplatform rsocket-kotlin. For now only snapshots are available
via [oss.jfrog.org](https://oss.jfrog.org/artifactory/oss-snapshot-local/io/rsocket/kotlin/) (OJO).

Make sure, that you use Kotlin 1.4.
Make sure, that you use Kotlin 1.4.X.

### Gradle:

Expand Down Expand Up @@ -225,7 +225,7 @@ val bufferedStream: Flow<Payload> = stream.buffer(10) //here buffer is 10, if `b
bufferedStream.collect { payload: Payload ->
println(payload.data.readText())
}
```
```

## Bugs and Feedback

Expand Down
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ subprojects {

//common configuration
extensions.configure<KotlinMultiplatformExtension> {
// explicitApiWarning() //TODO change to strict before release
sourceSets.all {
languageSettings.apply {
progressiveMode = true
Expand All @@ -214,11 +213,13 @@ subprojects {
useExperimentalAnnotation("kotlinx.coroutines.FlowPreview")
useExperimentalAnnotation("io.ktor.util.KtorExperimentalAPI")
useExperimentalAnnotation("io.ktor.util.InternalAPI")
useExperimentalAnnotation("io.ktor.utils.io.core.internal.DangerousInternalIoApi")
}
}
}

if (project.name != "rsocket-test") {
explicitApiWarning() //TODO change to strict before release
sourceSets["commonTest"].dependencies {
implementation(project(":rsocket-test"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.ktor.utils.io.core.*
import io.rsocket.kotlin.payload.*

@Suppress("FunctionName")
fun Payload(route: String, packet: ByteReadPacket): Payload = Payload {
data(packet)
metadata(route)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,38 @@
package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.frame.*
import kotlinx.coroutines.*

/**
* That interface isn't stable for inheritance.
*/
interface Connection : Cancelable {

@DangerousInternalIoApi
val pool: ObjectPool<ChunkBuffer>
get() = ChunkBuffer.Pool

suspend fun send(packet: ByteReadPacket)
suspend fun receive(): ByteReadPacket
}

suspend fun Connection.connectClient(
configuration: RSocketConnectorConfiguration = RSocketConnectorConfiguration()
configuration: RSocketConnectorConfiguration = RSocketConnectorConfiguration(),
): RSocket = RSocketConnector(ConnectionProvider(this), configuration).connect()

suspend fun Connection.startServer(
configuration: RSocketServerConfiguration = RSocketServerConfiguration(),
acceptor: RSocketAcceptor
acceptor: RSocketAcceptor,
): Job = RSocketServer(ConnectionProvider(this), configuration).start(acceptor)


@OptIn(DangerousInternalIoApi::class)
internal suspend fun Connection.receiveFrame(): Frame = receive().readFrame(pool)

@OptIn(DangerousInternalIoApi::class)
internal suspend fun Connection.sendFrame(frame: Frame): Unit = send(frame.toPacket(pool))
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@
package io.rsocket.kotlin.connection

import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.logging.*
import kotlinx.coroutines.*

internal fun Connection.logging(logger: Logger): Connection =
if (logger.isLoggable(LoggingLevel.DEBUG)) LoggingConnection(this, logger) else this

@OptIn(DangerousInternalIoApi::class)
private class LoggingConnection(
private val delegate: Connection,
private val logger: Logger,
) : Connection {
override val job: Job get() = delegate.job
) : Connection by delegate {

private fun ByteReadPacket.dumpFrameToString(): String {
val length = remaining
return copy().use { it.readFrame(pool).use { it.dump(length) } }
}

override suspend fun send(packet: ByteReadPacket) {
logger.debug { "Send: ${packet.dumpFrameToString()}" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class RSocketConnector(
connection = connection,
plugin = configuration.plugin,
setupFrame = setupFrame,
ignoredFrameConsumer = configuration.ignoredFrameConsumer,
acceptor = configuration.acceptor
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.rsocket.kotlin.core

import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.payload.*
Expand All @@ -29,6 +28,5 @@ data class RSocketConnectorConfiguration(
val keepAlive: KeepAlive = KeepAlive(),
val payloadMimeType: PayloadMimeType = PayloadMimeType(),
val setupPayload: Payload = Payload.Empty,
val ignoredFrameConsumer: (Frame) -> Unit = {},
val acceptor: RSocketAcceptor = { RSocketRequestHandler { } },
)
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RSocketServer(
.let(configuration.plugin::wrapConnection)
.logging(configuration.loggerFactory.logger("io.rsocket.kotlin.frame.Frame"))

val setupFrame = connection.receive().toFrame()
val setupFrame = connection.receiveFrame()
if (setupFrame !is SetupFrame)
connection.failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
if (setupFrame.version != Version.Current)
Expand All @@ -46,7 +46,6 @@ class RSocketServer(
connection = connection,
plugin = configuration.plugin,
setupFrame = setupFrame,
ignoredFrameConsumer = configuration.ignoredFrameConsumer,
acceptor = acceptor
)
} catch (e: Throwable) {
Expand All @@ -55,7 +54,7 @@ class RSocketServer(
}

private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
send(ErrorFrame(0, error).toPacket())
sendFrame(ErrorFrame(0, error))
cancel("Setup failed", error)
throw error
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

package io.rsocket.kotlin.core

import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.plugin.*

data class RSocketServerConfiguration(
val plugin: Plugin = Plugin(),
val loggerFactory: LoggerFactory = DefaultLoggerFactory,
val ignoredFrameConsumer: (Frame) -> Unit = {},
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package io.rsocket.kotlin.frame

import io.ktor.utils.io.core.*

class CancelFrame(
internal class CancelFrame(
override val streamId: Int,
) : Frame(FrameType.Cancel) {
override val flags: Int get() = 0

override fun release(): Unit = Unit

override fun BytePacketBuilder.writeSelf(): Unit = Unit

override fun StringBuilder.appendFlags(): Unit = Unit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ import io.ktor.utils.io.core.*
import io.rsocket.kotlin.error.*
import io.rsocket.kotlin.frame.io.*

class ErrorFrame(
internal class ErrorFrame(
override val streamId: Int,
val throwable: Throwable,
val data: ByteReadPacket? = null,
) : Frame(FrameType.Error) {
override val flags: Int get() = 0
val errorCode get() = (throwable as? RSocketError)?.errorCode ?: ErrorCode.ApplicationError

override fun release() {
data?.release()
}

override fun BytePacketBuilder.writeSelf() {
writeInt(errorCode)
when (data) {
Expand All @@ -44,7 +48,7 @@ class ErrorFrame(
}
}

fun ByteReadPacket.readError(streamId: Int): ErrorFrame {
internal fun ByteReadPacket.readError(streamId: Int): ErrorFrame {
val errorCode = readInt()
val data = copy()
val message = readText()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ import io.ktor.utils.io.core.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.payload.*

class ExtensionFrame(
internal class ExtensionFrame(
override val streamId: Int,
val extendedType: Int,
val payload: Payload,
) : Frame(FrameType.Extension) {
override val flags: Int get() = if (payload.metadata != null) Flags.Metadata else 0

override fun release() {
payload.release()
}

override fun BytePacketBuilder.writeSelf() {
writeInt(extendedType)
writePayload(payload)
Expand All @@ -41,8 +46,8 @@ class ExtensionFrame(
}
}

fun ByteReadPacket.readExtension(streamId: Int, flags: Int): ExtensionFrame {
internal fun ByteReadPacket.readExtension(pool: BufferPool, streamId: Int, flags: Int): ExtensionFrame {
val extendedType = readInt()
val payload = readPayload(flags)
val payload = readPayload(pool, flags)
return ExtensionFrame(streamId, extendedType, payload)
}
53 changes: 29 additions & 24 deletions rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/frame/Frame.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,33 @@
package io.rsocket.kotlin.frame

import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.rsocket.kotlin.frame.io.*

private const val FlagsMask: Int = 1023
private const val FrameTypeShift: Int = 10

abstract class Frame(open val type: FrameType) {
abstract class Frame internal constructor(open val type: FrameType) : Closeable {
abstract val streamId: Int
abstract val flags: Int

abstract fun release()

protected abstract fun BytePacketBuilder.writeSelf()
protected abstract fun StringBuilder.appendFlags()
protected abstract fun StringBuilder.appendSelf()

fun toPacket(): ByteReadPacket {
@DangerousInternalIoApi
fun toPacket(pool: BufferPool): ByteReadPacket {
check(type.canHaveMetadata || !(flags check Flags.Metadata)) { "bad value for metadata flag" }
return buildPacket {
return buildPacket(pool) {
writeInt(streamId)
writeShort((type.encodedType shl FrameTypeShift or flags).toShort())
writeSelf()
}
}

fun dump(length: Long): String = buildString {
internal fun dump(length: Long): String = buildString {
append("\n").append(type).append(" frame -> Stream Id: ").append(streamId).append(" Length: ").append(length)
append("\nFlags: 0b").append(flags.toBinaryString()).append(" (").apply { appendFlags() }.append(")")
appendSelf()
Expand All @@ -49,36 +53,37 @@ abstract class Frame(open val type: FrameType) {
append(flag)
if (value) append(1) else append(0)
}

override fun close() {
release()
}
}

fun ByteReadPacket.toFrame(): Frame = use {
@DangerousInternalIoApi
fun ByteReadPacket.readFrame(pool: BufferPool): Frame = use {
val streamId = readInt()
val typeAndFlags = readShort().toInt() and 0xFFFF
val flags = typeAndFlags and FlagsMask
when (val type = FrameType(typeAndFlags shr FrameTypeShift)) {
//stream id = 0
FrameType.Setup -> readSetup(flags)
FrameType.Resume -> readResume()
FrameType.ResumeOk -> readResumeOk()
FrameType.MetadataPush -> readMetadataPush()
FrameType.Lease -> readLease(flags)
FrameType.KeepAlive -> readKeepAlive(flags)
FrameType.Setup -> readSetup(pool, flags)
FrameType.Resume -> readResume(pool)
FrameType.ResumeOk -> readResumeOk()
FrameType.MetadataPush -> readMetadataPush(pool)
FrameType.Lease -> readLease(pool, flags)
FrameType.KeepAlive -> readKeepAlive(pool, flags)
//stream id != 0
FrameType.Cancel -> CancelFrame(streamId)
FrameType.Error -> readError(streamId)
FrameType.RequestN -> readRequestN(streamId)
FrameType.Extension -> readExtension(streamId, flags)
FrameType.Cancel -> CancelFrame(streamId)
FrameType.Error -> readError(streamId)
FrameType.RequestN -> readRequestN(streamId)
FrameType.Extension -> readExtension(pool, streamId, flags)
FrameType.Payload,
FrameType.RequestFnF,
FrameType.RequestResponse -> readRequest(type, streamId, flags, withInitial = false)
FrameType.RequestResponse,
-> readRequest(pool, type, streamId, flags, withInitial = false)
FrameType.RequestStream,
FrameType.RequestChannel -> readRequest(type, streamId, flags, withInitial = true)
FrameType.Reserved -> error("Reserved")
FrameType.RequestChannel,
-> readRequest(pool, type, streamId, flags, withInitial = true)
FrameType.Reserved -> error("Reserved")
}
}

fun ByteReadPacket.dumpFrameToString(): String {
val length = remaining
val frame = copy().toFrame()
return frame.dump(length)
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package io.rsocket.kotlin.frame

import io.rsocket.kotlin.frame.io.*

enum class FrameType(val encodedType: Int, flags: Int = Flags.Empty) {
internal enum class FrameType(val encodedType: Int, flags: Int = Flags.Empty) {
Reserved(0x00),

//CONNECTION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ import io.rsocket.kotlin.frame.io.*

private const val RespondFlag = 128

class KeepAliveFrame(
internal class KeepAliveFrame(
val respond: Boolean,
val lastPosition: Long,
val data: ByteReadPacket,
) : Frame(FrameType.KeepAlive) {
override val streamId: Int get() = 0
override val flags: Int get() = if (respond) RespondFlag else 0

override fun release() {
data.release()
}

override fun BytePacketBuilder.writeSelf() {
writeLong(lastPosition.coerceAtLeast(0))
writePacket(data)
Expand All @@ -44,9 +48,9 @@ class KeepAliveFrame(
}
}

fun ByteReadPacket.readKeepAlive(flags: Int): KeepAliveFrame {
internal fun ByteReadPacket.readKeepAlive(pool: BufferPool, flags: Int): KeepAliveFrame {
val respond = flags check RespondFlag
val lastPosition = readLong()
val data = readPacket()
val data = readPacket(pool)
return KeepAliveFrame(respond, lastPosition, data)
}
Loading