Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,6 @@ if (project.hasProperty('bintrayUser') && project.hasProperty('bintrayKey')) {
gpg {
sign = true
}

mavenCentralSync {
user = project.property('sonatypeUsername')
password = project.property('sonatypePassword')
}
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/Frame.kt
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,11 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold

fun leaseEnabled(frame: Frame): Boolean {
ensureFrameType(FrameType.SETUP, frame)
return Frame.isFlagSet(
frame.flags(),
SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE)
return SetupFrameFlyweight.supportsLease(frame.flags())
}

fun enableLease(flags: Int): Int {
return flags or SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE
}

fun keepaliveInterval(frame: Frame): Int {
Expand Down Expand Up @@ -594,7 +596,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
object Fragmentation {

fun assembleFrame(blueprintFrame: Frame,
metadata: ByteBuf,
metadata: ByteBuf?,
data: ByteBuf): Frame =

create(blueprintFrame,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class KeepAliveOptions : KeepAlive {

override fun keepAliveMaxLifeTime() = maxLifeTime

fun copy(): KeepAliveOptions = KeepAliveOptions()
.keepAliveInterval(interval)
.keepAliveMaxLifeTime(maxLifeTime)

private fun assertDuration(duration: Duration, name: String) {
if (duration.millis <= 0) {
throw IllegalArgumentException("$name must be positive")
Expand Down
18 changes: 18 additions & 0 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseRef.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.rsocket.kotlin

import io.reactivex.Completable
import java.nio.ByteBuffer

/** Provides means to grant lease to peer */
interface LeaseRef {

fun grantLease(
numberOfRequests: Int,
ttlMillis: Long,
metadata: ByteBuffer): Completable

fun grantLease(numberOfRequests: Int,
timeToLiveMillis: Long): Completable

fun onClose(): Completable
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ class MediaTypeOptions : MediaType {

override fun metadataMimeType(): String = metadataMimeType

fun copy(): MediaTypeOptions = MediaTypeOptions()
.dataMimeType(dataMimeType)
.metadataMimeType(metadataMimeType)

private fun assertMediaType(mediaType: String) {
if (mediaType.isEmpty()) {
throw IllegalArgumentException("media type must be non-empty")
Expand Down
161 changes: 130 additions & 31 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import io.reactivex.Single
import io.rsocket.kotlin.interceptors.GlobalInterceptors
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.fragmentation.FragmentationInterceptor
import io.rsocket.kotlin.internal.lease.ClientLeaseSupport
import io.rsocket.kotlin.internal.lease.ServerLeaseSupport
import io.rsocket.kotlin.transport.ClientTransport
import io.rsocket.kotlin.transport.ServerTransport
import io.rsocket.kotlin.util.AbstractRSocket
Expand All @@ -47,6 +49,7 @@ object RSocketFactory {
private var acceptor: ClientAcceptor = { { emptyRSocket } }
private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() }
private var mtu = 0
private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null
private val interceptors = GlobalInterceptors.create()
private var flags = 0
private var setupPayload: Payload = DefaultPayload.EMPTY
Expand Down Expand Up @@ -76,6 +79,12 @@ object RSocketFactory {
return this
}

fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ClientRSocketFactory {
this.flags = Frame.Setup.enableLease(flags)
this.leaseRefConsumer = leaseRefConsumer
return this
}

fun errorConsumer(errorConsumer: (Throwable) -> Unit): ClientRSocketFactory {
this.errorConsumer = errorConsumer
return this
Expand All @@ -93,49 +102,78 @@ object RSocketFactory {
}

fun transport(transport: () -> ClientTransport): Start<RSocket> =
ClientStart(transport, interceptors())
clientStart(acceptor, transport)

fun transport(transport: ClientTransport): Start<RSocket> =
transport { transport }

fun acceptor(acceptor: ClientAcceptor): ClientTransportAcceptor {
this.acceptor = acceptor
return object : ClientTransportAcceptor {
override fun transport(transport: () -> ClientTransport)
: Start<RSocket> =
ClientStart(transport, interceptors())
: Start<RSocket> = clientStart(acceptor, transport)
}
}

private fun interceptors(): InterceptorRegistry =
interceptors.copyWith {
it.connectionFirst(
FragmentationInterceptor(mtu))
}
private fun clientStart(acceptor: ClientAcceptor,
transport: () -> ClientTransport): ClientStart =

private inner class ClientStart(private val transportClient: () -> ClientTransport,
private val interceptors: InterceptorRegistry)
ClientStart(acceptor,
errorConsumer,
mtu,
leaseRefConsumer,
flags,
setupPayload,
keepAlive.copy(),
mediaType.copy(),
streamRequestLimit,
transport,
interceptors.copy())

private class ClientStart(
private val acceptor: ClientAcceptor,
private val errorConsumer: (Throwable) -> Unit,
private var mtu: Int,
private val leaseRef: ((LeaseRef) -> Unit)?,
private val flags: Int,
private val setupPayload: Payload,
private val keepAlive: KeepAlive,
private val mediaType: MediaType,
private val streamRequestLimit: Int,
private val transportClient: () -> ClientTransport,
private val parentInterceptors: InterceptorRegistry)
: Start<RSocket> {

override fun start(): Single<RSocket> {
return transportClient()
.connect()
.flatMap { connection ->
val setupFrame = createSetupFrame()

val withLease =
enableLease(parentInterceptors)

val interceptors =
enableFragmentation(withLease)

val interceptConnection = interceptors as InterceptConnection
val interceptRSocket = interceptors as InterceptRSocket

val demuxer = ClientConnectionDemuxer(
connection,
interceptors)
interceptConnection)

val rSocketRequester = RSocketRequester(
demuxer.requesterConnection(),
errorConsumer,
ClientStreamIds(),
streamRequestLimit)

val wrappedRequester = interceptors
val wrappedRequester = interceptRSocket
.interceptRequester(rSocketRequester)

val handlerRSocket = acceptor()(wrappedRequester)

val wrappedHandler = interceptors
val wrappedHandler = interceptRSocket
.interceptHandler(handlerRSocket)

RSocketResponder(
Expand All @@ -149,12 +187,21 @@ object RSocketFactory {
keepAlive,
errorConsumer)

val setupFrame = createSetupFrame()

connection
.sendOne(setupFrame)
.andThen(Single.just(wrappedRequester))
}
}

private fun enableFragmentation(parentInterceptors: InterceptorRegistry)
: InterceptorRegistry {
parentInterceptors.connectionFirst(
FragmentationInterceptor(mtu))
return parentInterceptors
}

private fun createSetupFrame(): Frame {
return Frame.Setup.from(
flags,
Expand All @@ -164,14 +211,23 @@ object RSocketFactory {
mediaType.dataMimeType(),
setupPayload)
}

private fun enableLease(parentInterceptors: InterceptorRegistry)
: InterceptorRegistry =
if (leaseRef != null) {
parentInterceptors.copyWith(
ClientLeaseSupport.enable(leaseRef)())
} else {
parentInterceptors.copy()
}
}
}

class ServerRSocketFactory internal constructor() {

private var acceptor: ServerAcceptor = { { _, _ -> Single.just(emptyRSocket) } }
private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() }
private var mtu = 0
private var leaseRefConsumer: ((LeaseRef) -> Unit)? = null
private val interceptors = GlobalInterceptors.create()
private var streamRequestLimit = defaultStreamRequestLimit

Expand All @@ -186,6 +242,11 @@ object RSocketFactory {
return this
}

fun enableLease(leaseRefConsumer: (LeaseRef) -> Unit): ServerRSocketFactory {
this.leaseRefConsumer = leaseRefConsumer
return this
}

fun errorConsumer(errorConsumer: (Throwable) -> Unit): ServerRSocketFactory {
this.errorConsumer = errorConsumer
return this
Expand All @@ -197,26 +258,28 @@ object RSocketFactory {
}

fun acceptor(acceptor: ServerAcceptor): ServerTransportAcceptor {
this.acceptor = acceptor
return object : ServerTransportAcceptor {

override fun <T : Closeable> transport(
transport: () -> ServerTransport<T>): Start<T> =
ServerStart(transport, interceptors())
}
}

private fun interceptors(): InterceptorRegistry {
return interceptors.copyWith {
it.connectionFirst(
ServerContractInterceptor(errorConsumer))
it.connectionFirst(
FragmentationInterceptor(mtu))
ServerStart(transport,
acceptor,
errorConsumer,
mtu,
leaseRefConsumer,
interceptors.copy(),
streamRequestLimit)
}
}

private inner class ServerStart<T : Closeable>(
private class ServerStart<T : Closeable>(
private val transportServer: () -> ServerTransport<T>,
private val interceptors: InterceptorRegistry) : Start<T> {
private val acceptor: ServerAcceptor,
private val errorConsumer: (Throwable) -> Unit,
private val mtu: Int,
private val leaseRef: ((LeaseRef) -> Unit)?,
private val parentInterceptors: InterceptorRegistry,
private val streamRequestLimit: Int) : Start<T> {

override fun start(): Single<T> {
return transportServer().start(object
Expand All @@ -225,25 +288,37 @@ object RSocketFactory {
override fun invoke(duplexConnection: DuplexConnection)
: Completable {

val withLease =
enableLease(parentInterceptors)

val withServerContract =
enableServerContract(withLease)

val interceptors =
enableFragmentation(withServerContract)

val demuxer = ServerConnectionDemuxer(
duplexConnection,
interceptors)
interceptors as InterceptConnection)

return demuxer
.setupConnection()
.receive()
.firstOrError()
.flatMapCompletable { setup ->
accept(setup, demuxer)
accept(setup,
interceptors as InterceptRSocket,
demuxer)
}
}
})
}

private fun accept(setupFrame: Frame,
interceptors: InterceptRSocket,
demuxer: ConnectionDemuxer): Completable {

val setup = Setup.create(setupFrame)
val setup = SetupContents.create(setupFrame)

val rSocketRequester = RSocketRequester(
demuxer.requesterConnection(),
Expand Down Expand Up @@ -272,6 +347,30 @@ object RSocketFactory {
}
.ignoreElement()
}

private fun enableLease(parentInterceptors: InterceptorRegistry)
: InterceptorRegistry =
if (leaseRef != null) {
parentInterceptors.copyWith(
ServerLeaseSupport.enable(leaseRef)())
} else {
parentInterceptors.copy()
}

private fun enableServerContract(parentInterceptors: InterceptorRegistry)
: InterceptorRegistry {

parentInterceptors.connectionFirst(
ServerContractInterceptor(errorConsumer, leaseRef != null))
return parentInterceptors
}

private fun enableFragmentation(parentInterceptors: InterceptorRegistry)
: InterceptorRegistry {
parentInterceptors.connectionFirst(
FragmentationInterceptor(mtu))
return parentInterceptors
}
}
}

Expand Down
Loading