Skip to content

Commit 28f082f

Browse files
committed
* Possibility to reject setup by Server acceptor
* Client Keep-alive data support * Client Keep-alive handler responds to Frames with `respond` flag set * Introduce RSocketFactory ClientOptions & ServerOptions
1 parent b63fb27 commit 28f082f

32 files changed

+292
-235
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ private fun handler(setup: Setup, rSocket: RSocket): Single<RSocket> {
103103
### LICENSE
104104

105105
Copyright 2015-2018 Netflix, Inc.
106-
Maksym Ostroverkhov
107106

108107
Licensed under the Apache License, Version 2.0 (the "License");
109108
you may not use this file except in compliance with the License.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.rsocket.kotlin
2+
3+
class ClientOptions : Options<ClientOptions>() {
4+
5+
override fun copy(): ClientOptions =
6+
ClientOptions().streamRequestLimit(streamRequestLimit())
7+
}

rsocket-core/src/main/kotlin/io/rsocket/kotlin/Duration.kt

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,6 @@ package io.rsocket.kotlin
22

33
import java.util.concurrent.TimeUnit
44

5-
/**
6-
* Created by Maksym Ostroverkhov on 27.10.17.
7-
*/
8-
95
data class Duration(private val value: Long, val unit: TimeUnit) {
106

117
val millis = unit.toMillis(value)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package io.rsocket.kotlin
2+
3+
import java.nio.ByteBuffer
4+
5+
interface KeepAliveData {
6+
7+
fun producer(): () -> ByteBuffer
8+
9+
fun handler(): (ByteBuffer) -> Unit
10+
}
11+

rsocket-core/src/main/kotlin/io/rsocket/kotlin/KeepAliveOptions.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package io.rsocket.kotlin
22

3+
import io.rsocket.kotlin.internal.EmptyKeepAliveData
4+
35
class KeepAliveOptions : KeepAlive {
46
private var interval: Duration = Duration.ofMillis(100)
57
private var maxLifeTime: Duration = Duration.ofSeconds(1)
8+
private var keepAliveData: KeepAliveData = EmptyKeepAliveData()
69

710
fun keepAliveInterval(interval: Duration): KeepAliveOptions {
811
assertDuration(interval, "keepAliveInterval")
@@ -20,9 +23,17 @@ class KeepAliveOptions : KeepAlive {
2023

2124
override fun keepAliveMaxLifeTime() = maxLifeTime
2225

26+
fun keepAliveData(keepAliveData: KeepAliveData): KeepAliveOptions {
27+
this.keepAliveData = keepAliveData
28+
return this
29+
}
30+
31+
fun keepAliveData(): KeepAliveData = keepAliveData
32+
2333
fun copy(): KeepAliveOptions = KeepAliveOptions()
2434
.keepAliveInterval(interval)
2535
.keepAliveMaxLifeTime(maxLifeTime)
36+
.keepAliveData(keepAliveData)
2637

2738
private fun assertDuration(duration: Duration, name: String) {
2839
if (duration.millis <= 0) {
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package io.rsocket.kotlin
2+
3+
abstract class Options<T:Options<T>> internal constructor() {
4+
private var streamRequestLimit: Int = 128
5+
6+
@Suppress("UNCHECKED_CAST")
7+
open fun streamRequestLimit(streamRequestLimit: Int): T {
8+
assertRequestLimit(streamRequestLimit)
9+
this.streamRequestLimit = streamRequestLimit
10+
return this as T
11+
}
12+
13+
abstract fun copy(): T
14+
15+
internal fun streamRequestLimit(): Int = streamRequestLimit
16+
17+
private fun assertRequestLimit(streamRequestLimit: Int) {
18+
if (streamRequestLimit <= 0) {
19+
throw IllegalArgumentException("stream request limit must be positive")
20+
}
21+
}
22+
}

rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ object RSocketFactory {
5555
private var setupPayload: Payload = DefaultPayload.EMPTY
5656
private val keepAlive = KeepAliveOptions()
5757
private val mediaType = MediaTypeOptions()
58-
private var streamRequestLimit = defaultStreamRequestLimit
58+
private val options = ClientOptions()
5959

6060
fun interceptors(configure: (InterceptorOptions) -> Unit): ClientRSocketFactory {
6161
configure(interceptors)
@@ -79,7 +79,7 @@ object RSocketFactory {
7979
return this
8080
}
8181

82-
fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ClientRSocketFactory {
82+
fun lease(leaseRefConsumer: (LeaseRef) -> Unit): ClientRSocketFactory {
8383
this.flags = Frame.Setup.enableLease(flags)
8484
this.leaseRefConsumer = leaseRefConsumer
8585
return this
@@ -95,9 +95,8 @@ object RSocketFactory {
9595
return this
9696
}
9797

98-
fun streamRequestLimit(streamRequestLimit: Int): ClientRSocketFactory {
99-
assertRequestLimit(streamRequestLimit)
100-
this.streamRequestLimit = streamRequestLimit
98+
fun options(configure: (ClientOptions) -> Unit): ClientRSocketFactory {
99+
configure(options)
101100
return this
102101
}
103102

@@ -126,7 +125,7 @@ object RSocketFactory {
126125
setupPayload,
127126
keepAlive.copy(),
128127
mediaType.copy(),
129-
streamRequestLimit,
128+
options.copy(),
130129
transport,
131130
interceptors.copy())
132131

@@ -137,13 +136,17 @@ object RSocketFactory {
137136
private val leaseRef: ((LeaseRef) -> Unit)?,
138137
private val flags: Int,
139138
private val setupPayload: Payload,
140-
private val keepAlive: KeepAlive,
139+
keepAliveOpts: KeepAliveOptions,
141140
private val mediaType: MediaType,
142-
private val streamRequestLimit: Int,
141+
options: ClientOptions,
143142
private val transportClient: () -> ClientTransport,
144143
private val parentInterceptors: InterceptorRegistry)
145144
: Start<RSocket> {
146145

146+
private val streamRequestLimit = options.streamRequestLimit()
147+
private val keepALive = keepAliveOpts as KeepAlive
148+
private val keepAliveData = keepAliveOpts.keepAliveData()
149+
147150
override fun start(): Single<RSocket> {
148151
return transportClient()
149152
.connect()
@@ -184,7 +187,8 @@ object RSocketFactory {
184187

185188
ClientServiceHandler(
186189
demuxer.serviceConnection(),
187-
keepAlive,
190+
keepALive,
191+
keepAliveData,
188192
errorConsumer)
189193

190194
val setupFrame = createSetupFrame()
@@ -205,8 +209,8 @@ object RSocketFactory {
205209
private fun createSetupFrame(): Frame {
206210
return Frame.Setup.from(
207211
flags,
208-
keepAlive.keepAliveInterval().intMillis,
209-
keepAlive.keepAliveMaxLifeTime().intMillis,
212+
keepALive.keepAliveInterval().intMillis,
213+
keepALive.keepAliveMaxLifeTime().intMillis,
210214
mediaType.metadataMimeType(),
211215
mediaType.dataMimeType(),
212216
setupPayload)
@@ -229,7 +233,7 @@ object RSocketFactory {
229233
private var mtu = 0
230234
private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null
231235
private val interceptors = GlobalInterceptors.create()
232-
private var streamRequestLimit = defaultStreamRequestLimit
236+
private val options = ServerOptions()
233237

234238
fun interceptors(configure: (InterceptorOptions) -> Unit): ServerRSocketFactory {
235239
configure(interceptors)
@@ -242,7 +246,7 @@ object RSocketFactory {
242246
return this
243247
}
244248

245-
fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ServerRSocketFactory {
249+
fun lease(leaseRefConsumer: (LeaseRef) -> Unit): ServerRSocketFactory {
246250
this.leaseRefConsumer = leaseRefConsumer
247251
return this
248252
}
@@ -252,8 +256,8 @@ object RSocketFactory {
252256
return this
253257
}
254258

255-
fun streamRequestLimit(streamRequestLimit: Int): ServerRSocketFactory {
256-
this.streamRequestLimit = streamRequestLimit
259+
fun options(configure: (ServerOptions) -> Unit): ServerRSocketFactory {
260+
configure(options)
257261
return this
258262
}
259263

@@ -268,7 +272,7 @@ object RSocketFactory {
268272
mtu,
269273
leaseRefConsumer,
270274
interceptors.copy(),
271-
streamRequestLimit)
275+
options.copy())
272276
}
273277
}
274278

@@ -279,7 +283,9 @@ object RSocketFactory {
279283
private val mtu: Int,
280284
private val leaseRef: ((LeaseRef) -> Unit)?,
281285
private val parentInterceptors: InterceptorRegistry,
282-
private val streamRequestLimit: Int) : Start<T> {
286+
options: ServerOptions) : Start<T> {
287+
288+
private val streamRequestLimit = options.streamRequestLimit()
283289

284290
override fun start(): Single<T> {
285291
return transportServer().start(object
@@ -331,12 +337,15 @@ object RSocketFactory {
331337

332338
ServerServiceHandler(
333339
demuxer.serviceConnection(),
334-
setup as KeepAlive,
340+
setup,
335341
errorConsumer)
336342

337343
val handlerRSocket = acceptor()(setup, wrappedRequester)
338344

339-
return handlerRSocket
345+
val rejectingHandlerRSocket = RejectingRSocket(handlerRSocket)
346+
.with(demuxer.requesterConnection())
347+
348+
return rejectingHandlerRSocket
340349
.map { handler -> interceptors.interceptHandler(handler) }
341350
.doOnSuccess { handler ->
342351
RSocketResponder(
@@ -374,12 +383,6 @@ object RSocketFactory {
374383
}
375384
}
376385

377-
private fun assertRequestLimit(streamRequestLimit: Int) {
378-
if (streamRequestLimit <= 0) {
379-
throw IllegalArgumentException("stream request limit must be positive")
380-
}
381-
}
382-
383386
private fun assertFragmentation(mtu: Int) {
384387
if (mtu < 0) {
385388
throw IllegalArgumentException("fragmentation mtu must be non-negative")
@@ -405,8 +408,6 @@ object RSocketFactory {
405408
transport { transport }
406409
}
407410

408-
private const val defaultStreamRequestLimit = 128
409-
410411
private val emptyRSocket = object : AbstractRSocket() {}
411412
}
412413

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package io.rsocket.kotlin
2+
3+
class ServerOptions : Options<ServerOptions>() {
4+
5+
override fun copy(): ServerOptions =
6+
ServerOptions().streamRequestLimit(streamRequestLimit())
7+
}

rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/ClientServiceHandler.kt

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,23 @@ import io.reactivex.Flowable
66
import io.reactivex.disposables.Disposable
77
import io.rsocket.kotlin.DuplexConnection
88
import io.rsocket.kotlin.Frame
9-
import io.rsocket.kotlin.exceptions.ConnectionException
109
import io.rsocket.kotlin.KeepAlive
10+
import io.rsocket.kotlin.KeepAliveData
11+
import io.rsocket.kotlin.exceptions.ConnectionException
12+
import java.nio.ByteBuffer
1113
import java.util.concurrent.TimeUnit
1214

1315
internal class ClientServiceHandler(serviceConnection: DuplexConnection,
1416
keepAlive: KeepAlive,
17+
keepAliveData: KeepAliveData,
1518
errorConsumer: (Throwable) -> Unit)
1619
: ServiceHandler(serviceConnection, errorConsumer) {
1720

1821
@Volatile
1922
private var keepAliveReceivedMillis = System.currentTimeMillis()
2023
private var subscription: Disposable? = null
24+
private val dataProducer: () -> ByteBuffer = keepAliveData.producer()
25+
private val dataHandler: (ByteBuffer) -> Unit = keepAliveData.handler()
2126

2227
init {
2328
val tickPeriod = keepAlive.keepAliveInterval().millis
@@ -35,6 +40,9 @@ internal class ClientServiceHandler(serviceConnection: DuplexConnection,
3540
override fun handleKeepAlive(frame: Frame) {
3641
if (!Frame.Keepalive.hasRespondFlag(frame)) {
3742
keepAliveReceivedMillis = System.currentTimeMillis()
43+
dataHandler(frame.data)
44+
} else {
45+
sendKeepAliveFrame(frame.data, false)
3846
}
3947
}
4048

@@ -51,8 +59,12 @@ internal class ClientServiceHandler(serviceConnection: DuplexConnection,
5159
"keep-alive timed out: $duration of $timeout ms"
5260
throw ConnectionException(message)
5361
}
54-
sentFrames.onNext(
55-
Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true))
62+
sendKeepAliveFrame(dataProducer(), true)
5663
}
5764
}
65+
66+
private fun sendKeepAliveFrame(data: ByteBuffer, respond: Boolean) {
67+
sentFrames.onNext(
68+
Frame.Keepalive.from(Unpooled.wrappedBuffer(data), respond))
69+
}
5870
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.rsocket.kotlin.internal
2+
3+
import io.rsocket.kotlin.KeepAliveData
4+
import java.nio.ByteBuffer
5+
6+
internal class EmptyKeepAliveData : KeepAliveData {
7+
8+
override fun producer(): () -> ByteBuffer = noopProducer
9+
10+
override fun handler(): (ByteBuffer) -> Unit = noopHandler
11+
12+
companion object {
13+
private val emptyBuffer = ByteBuffer.allocateDirect(0)
14+
private val noopProducer = { emptyBuffer }
15+
private val noopHandler: (ByteBuffer) -> Unit = { }
16+
}
17+
}

0 commit comments

Comments
 (0)