Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ private fun handler(setup: Setup, rSocket: RSocket): Single<RSocket> {
### LICENSE

Copyright 2015-2018 Netflix, Inc.
Maksym Ostroverkhov

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.rsocket.kotlin

class ClientOptions : Options<ClientOptions>() {

override fun copy(): ClientOptions =
ClientOptions().streamRequestLimit(streamRequestLimit())
}
4 changes: 0 additions & 4 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/Duration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package io.rsocket.kotlin

import java.util.concurrent.TimeUnit

/**
* Created by Maksym Ostroverkhov on 27.10.17.
*/

data class Duration(private val value: Long, val unit: TimeUnit) {

val millis = unit.toMillis(value)
Expand Down
11 changes: 11 additions & 0 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveData.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.rsocket.kotlin

import java.nio.ByteBuffer

interface KeepAliveData {

fun producer(): () -> ByteBuffer

fun handler(): (ByteBuffer) -> Unit
}

11 changes: 11 additions & 0 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package io.rsocket.kotlin

import io.rsocket.kotlin.internal.EmptyKeepAliveData

class KeepAliveOptions : KeepAlive {
private var interval: Duration = Duration.ofMillis(100)
private var maxLifeTime: Duration = Duration.ofSeconds(1)
private var keepAliveData: KeepAliveData = EmptyKeepAliveData()

fun keepAliveInterval(interval: Duration): KeepAliveOptions {
assertDuration(interval, "keepAliveInterval")
Expand All @@ -20,9 +23,17 @@ class KeepAliveOptions : KeepAlive {

override fun keepAliveMaxLifeTime() = maxLifeTime

fun keepAliveData(keepAliveData: KeepAliveData): KeepAliveOptions {
this.keepAliveData = keepAliveData
return this
}

fun keepAliveData(): KeepAliveData = keepAliveData

fun copy(): KeepAliveOptions = KeepAliveOptions()
.keepAliveInterval(interval)
.keepAliveMaxLifeTime(maxLifeTime)
.keepAliveData(keepAliveData)

private fun assertDuration(duration: Duration, name: String) {
if (duration.millis <= 0) {
Expand Down
22 changes: 22 additions & 0 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/Options.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.rsocket.kotlin

abstract class Options<T:Options<T>> internal constructor() {
private var streamRequestLimit: Int = 128

@Suppress("UNCHECKED_CAST")
open fun streamRequestLimit(streamRequestLimit: Int): T {
assertRequestLimit(streamRequestLimit)
this.streamRequestLimit = streamRequestLimit
return this as T
}

abstract fun copy(): T

internal fun streamRequestLimit(): Int = streamRequestLimit

private fun assertRequestLimit(streamRequestLimit: Int) {
if (streamRequestLimit <= 0) {
throw IllegalArgumentException("stream request limit must be positive")
}
}
}
55 changes: 28 additions & 27 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object RSocketFactory {
private var setupPayload: Payload = DefaultPayload.EMPTY
private val keepAlive = KeepAliveOptions()
private val mediaType = MediaTypeOptions()
private var streamRequestLimit = defaultStreamRequestLimit
private val options = ClientOptions()

fun interceptors(configure: (InterceptorOptions) -> Unit): ClientRSocketFactory {
configure(interceptors)
Expand All @@ -79,7 +79,7 @@ object RSocketFactory {
return this
}

fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ClientRSocketFactory {
fun lease(leaseRefConsumer: (LeaseRef) -> Unit): ClientRSocketFactory {
this.flags = Frame.Setup.enableLease(flags)
this.leaseRefConsumer = leaseRefConsumer
return this
Expand All @@ -95,9 +95,8 @@ object RSocketFactory {
return this
}

fun streamRequestLimit(streamRequestLimit: Int): ClientRSocketFactory {
assertRequestLimit(streamRequestLimit)
this.streamRequestLimit = streamRequestLimit
fun options(configure: (ClientOptions) -> Unit): ClientRSocketFactory {
configure(options)
return this
}

Expand Down Expand Up @@ -126,7 +125,7 @@ object RSocketFactory {
setupPayload,
keepAlive.copy(),
mediaType.copy(),
streamRequestLimit,
options.copy(),
transport,
interceptors.copy())

Expand All @@ -137,13 +136,17 @@ object RSocketFactory {
private val leaseRef: ((LeaseRef) -> Unit)?,
private val flags: Int,
private val setupPayload: Payload,
private val keepAlive: KeepAlive,
keepAliveOpts: KeepAliveOptions,
private val mediaType: MediaType,
private val streamRequestLimit: Int,
options: ClientOptions,
private val transportClient: () -> ClientTransport,
private val parentInterceptors: InterceptorRegistry)
: Start<RSocket> {

private val streamRequestLimit = options.streamRequestLimit()
private val keepALive = keepAliveOpts as KeepAlive
private val keepAliveData = keepAliveOpts.keepAliveData()

override fun start(): Single<RSocket> {
return transportClient()
.connect()
Expand Down Expand Up @@ -184,7 +187,8 @@ object RSocketFactory {

ClientServiceHandler(
demuxer.serviceConnection(),
keepAlive,
keepALive,
keepAliveData,
errorConsumer)

val setupFrame = createSetupFrame()
Expand All @@ -205,8 +209,8 @@ object RSocketFactory {
private fun createSetupFrame(): Frame {
return Frame.Setup.from(
flags,
keepAlive.keepAliveInterval().intMillis,
keepAlive.keepAliveMaxLifeTime().intMillis,
keepALive.keepAliveInterval().intMillis,
keepALive.keepAliveMaxLifeTime().intMillis,
mediaType.metadataMimeType(),
mediaType.dataMimeType(),
setupPayload)
Expand All @@ -229,7 +233,7 @@ object RSocketFactory {
private var mtu = 0
private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null
private val interceptors = GlobalInterceptors.create()
private var streamRequestLimit = defaultStreamRequestLimit
private val options = ServerOptions()

fun interceptors(configure: (InterceptorOptions) -> Unit): ServerRSocketFactory {
configure(interceptors)
Expand All @@ -242,7 +246,7 @@ object RSocketFactory {
return this
}

fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ServerRSocketFactory {
fun lease(leaseRefConsumer: (LeaseRef) -> Unit): ServerRSocketFactory {
this.leaseRefConsumer = leaseRefConsumer
return this
}
Expand All @@ -252,8 +256,8 @@ object RSocketFactory {
return this
}

fun streamRequestLimit(streamRequestLimit: Int): ServerRSocketFactory {
this.streamRequestLimit = streamRequestLimit
fun options(configure: (ServerOptions) -> Unit): ServerRSocketFactory {
configure(options)
return this
}

Expand All @@ -268,7 +272,7 @@ object RSocketFactory {
mtu,
leaseRefConsumer,
interceptors.copy(),
streamRequestLimit)
options.copy())
}
}

Expand All @@ -279,7 +283,9 @@ object RSocketFactory {
private val mtu: Int,
private val leaseRef: ((LeaseRef) -> Unit)?,
private val parentInterceptors: InterceptorRegistry,
private val streamRequestLimit: Int) : Start<T> {
options: ServerOptions) : Start<T> {

private val streamRequestLimit = options.streamRequestLimit()

override fun start(): Single<T> {
return transportServer().start(object
Expand Down Expand Up @@ -331,12 +337,15 @@ object RSocketFactory {

ServerServiceHandler(
demuxer.serviceConnection(),
setup as KeepAlive,
setup,
errorConsumer)

val handlerRSocket = acceptor()(setup, wrappedRequester)

return handlerRSocket
val rejectingHandlerRSocket = RejectingRSocket(handlerRSocket)
.with(demuxer.requesterConnection())

return rejectingHandlerRSocket
.map { handler -> interceptors.interceptHandler(handler) }
.doOnSuccess { handler ->
RSocketResponder(
Expand Down Expand Up @@ -374,12 +383,6 @@ object RSocketFactory {
}
}

private fun assertRequestLimit(streamRequestLimit: Int) {
if (streamRequestLimit <= 0) {
throw IllegalArgumentException("stream request limit must be positive")
}
}

private fun assertFragmentation(mtu: Int) {
if (mtu < 0) {
throw IllegalArgumentException("fragmentation mtu must be non-negative")
Expand All @@ -405,8 +408,6 @@ object RSocketFactory {
transport { transport }
}

private const val defaultStreamRequestLimit = 128

private val emptyRSocket = object : AbstractRSocket() {}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.rsocket.kotlin

class ServerOptions : Options<ServerOptions>() {

override fun copy(): ServerOptions =
ServerOptions().streamRequestLimit(streamRequestLimit())
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@ import io.reactivex.Flowable
import io.reactivex.disposables.Disposable
import io.rsocket.kotlin.DuplexConnection
import io.rsocket.kotlin.Frame
import io.rsocket.kotlin.exceptions.ConnectionException
import io.rsocket.kotlin.KeepAlive
import io.rsocket.kotlin.KeepAliveData
import io.rsocket.kotlin.exceptions.ConnectionException
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit

internal class ClientServiceHandler(serviceConnection: DuplexConnection,
keepAlive: KeepAlive,
keepAliveData: KeepAliveData,
errorConsumer: (Throwable) -> Unit)
: ServiceHandler(serviceConnection, errorConsumer) {

@Volatile
private var keepAliveReceivedMillis = System.currentTimeMillis()
private var subscription: Disposable? = null
private val dataProducer: () -> ByteBuffer = keepAliveData.producer()
private val dataHandler: (ByteBuffer) -> Unit = keepAliveData.handler()

init {
val tickPeriod = keepAlive.keepAliveInterval().millis
Expand All @@ -35,6 +40,9 @@ internal class ClientServiceHandler(serviceConnection: DuplexConnection,
override fun handleKeepAlive(frame: Frame) {
if (!Frame.Keepalive.hasRespondFlag(frame)) {
keepAliveReceivedMillis = System.currentTimeMillis()
dataHandler(frame.data)
} else {
sendKeepAliveFrame(frame.data, false)
}
}

Expand All @@ -51,8 +59,12 @@ internal class ClientServiceHandler(serviceConnection: DuplexConnection,
"keep-alive timed out: $duration of $timeout ms"
throw ConnectionException(message)
}
sentFrames.onNext(
Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true))
sendKeepAliveFrame(dataProducer(), true)
}
}

private fun sendKeepAliveFrame(data: ByteBuffer, respond: Boolean) {
sentFrames.onNext(
Frame.Keepalive.from(Unpooled.wrappedBuffer(data), respond))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.rsocket.kotlin.internal

import io.rsocket.kotlin.KeepAliveData
import java.nio.ByteBuffer

internal class EmptyKeepAliveData : KeepAliveData {

override fun producer(): () -> ByteBuffer = noopProducer

override fun handler(): (ByteBuffer) -> Unit = noopHandler

companion object {
private val emptyBuffer = ByteBuffer.allocateDirect(0)
private val noopProducer = { emptyBuffer }
private val noopHandler: (ByteBuffer) -> Unit = { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ import io.reactivex.Flowable
import io.reactivex.FlowableSubscriber
import io.reactivex.Single
import io.reactivex.processors.FlowableProcessor
import io.reactivex.processors.PublishProcessor
import io.reactivex.processors.UnicastProcessor
import io.rsocket.kotlin.*
import io.rsocket.kotlin.exceptions.ApplicationException
import io.rsocket.kotlin.exceptions.ChannelRequestException
import io.rsocket.kotlin.exceptions.Exceptions
import io.rsocket.kotlin.internal.ExceptionUtil.noStacktrace
import io.rsocket.kotlin.DefaultPayload
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
Expand Down Expand Up @@ -113,7 +111,7 @@ internal class RSocketRequester(
val requestFrame = Frame.Request.from(
streamId, FrameType.REQUEST_RESPONSE, payload, 1)

val receiver = PublishProcessor.create<Payload>()
val receiver = UnicastProcessor.create<Payload>()
receivers[streamId] = receiver
sentFrames.onNext(requestFrame)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.rsocket.kotlin.internal

import io.reactivex.Single
import io.rsocket.kotlin.DuplexConnection
import io.rsocket.kotlin.Frame
import io.rsocket.kotlin.RSocket
import io.rsocket.kotlin.exceptions.RejectedSetupException

internal class RejectingRSocket(private val rSocket: Single<RSocket>) {

fun with(connection: DuplexConnection): Single<RSocket> = rSocket
.onErrorResumeNext { err ->
connection
.sendOne(Frame.Error.from(0,
RejectedSetupException(err.message ?: "")))
.andThen(Single.error(err))
}
}
Loading