Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 0 additions & 42 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseGranter.kt

This file was deleted.

46 changes: 0 additions & 46 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/LeaseOptions.kt

This file was deleted.

69 changes: 3 additions & 66 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import io.reactivex.Single
import io.rsocket.kotlin.interceptors.GlobalInterceptors
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.fragmentation.FragmentationInterceptor
import io.rsocket.kotlin.internal.lease.ClientLeaseFeature
import io.rsocket.kotlin.internal.lease.ServerLeaseFeature
import io.rsocket.kotlin.transport.ClientTransport
import io.rsocket.kotlin.transport.ServerTransport
import io.rsocket.kotlin.util.AbstractRSocket
Expand All @@ -46,7 +44,6 @@ object RSocketFactory {
private var acceptor: ClientAcceptor = { { emptyRSocket } }
private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() }
private var mtu = 0
private val leaseOptions = LeaseOptions()
private val interceptors = GlobalInterceptors.create()
private var flags = 0
private var setupPayload: Payload = DefaultPayload.EMPTY
Expand Down Expand Up @@ -101,20 +98,6 @@ object RSocketFactory {
return this
}

/**
* Configure lease support
*
* @param configure function which configures [LeaseOptions]
* @return this ClientRSocketFactory
*/
fun lease(configure: (LeaseOptions) -> Unit): ClientRSocketFactory {
configure(leaseOptions)
if (leaseOptions.rSocketLeaseConsumer() != null) {
this.flags = Frame.Setup.enableLease(flags)
}
return this
}

/**
* Sets consumer for errors which are out of scope of RSocket interaction
* requests (e.g. request-response, request-stream and so on)
Expand Down Expand Up @@ -191,7 +174,6 @@ object RSocketFactory {
mtu,
flags,
setupPayload,
leaseOptions.copy(),
keepAlive.copy(),
mediaType.copy(),
options.copy(),
Expand All @@ -204,7 +186,6 @@ object RSocketFactory {
private var mtu: Int,
private val flags: Int,
private val setupPayload: Payload,
private val leaseOptions: LeaseOptions,
keepAliveOpts: KeepAliveOptions,
private val mediaType: MediaType,
options: RSocketOptions,
Expand All @@ -220,11 +201,8 @@ object RSocketFactory {
return transportClient()
.connect()
.flatMap { connection ->

val enabledLease =
enableLease(parentInterceptors)
val enabledFragmentation =
enableFragmentation(enabledLease)
enableFragmentation(parentInterceptors)
val interceptConnection =
enabledFragmentation as InterceptConnection
val interceptRSocket =
Expand Down Expand Up @@ -284,26 +262,13 @@ object RSocketFactory {
mediaType.dataMimeType(),
setupPayload)
}

private fun enableLease(parentInterceptors: InterceptorRegistry)
: InterceptorRegistry {
val rSocketLeaseConsumer =
leaseOptions.rSocketLeaseConsumer()
return if (rSocketLeaseConsumer != null) {
parentInterceptors.copyWith(
ClientLeaseFeature.enable(rSocketLeaseConsumer)())
} else {
parentInterceptors.copy()
}
}
}
}

class ServerRSocketFactory internal constructor() {

private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() }
private var mtu = 0
private val leaseOptions = LeaseOptions()
private val interceptors = GlobalInterceptors.create()
private val options = RSocketOptions()

Expand Down Expand Up @@ -331,17 +296,6 @@ object RSocketFactory {
return this
}

/**
* Configure lease support
*
* @param configure function which configures [LeaseOptions]
* @return this ServerRSocketFactory
*/
fun lease(configure: (LeaseOptions) -> Unit): ServerRSocketFactory {
configure(leaseOptions)
return this
}

/**
* Sets consumer for errors which are out of scope of RSocket interaction
* requests (e.g. request-response, request-stream and so on)
Expand Down Expand Up @@ -382,7 +336,6 @@ object RSocketFactory {
acceptor,
errorConsumer,
mtu,
leaseOptions.copy(),
interceptors.copy(),
options.copy())
}
Expand All @@ -393,7 +346,6 @@ object RSocketFactory {
private val acceptor: ServerAcceptor,
private val errorConsumer: (Throwable) -> Unit,
private val mtu: Int,
private val leaseOptions: LeaseOptions,
private val parentInterceptors: InterceptorRegistry,
options: RSocketOptions) : Start<T> {

Expand All @@ -406,10 +358,8 @@ object RSocketFactory {
override fun invoke(duplexConnection: DuplexConnection)
: Completable {

val enabledLease =
enableLease(parentInterceptors)
val enabledServerContract =
enableServerContract(enabledLease)
enableServerContract(parentInterceptors)
val enabledFragmentation =
enableFragmentation(enabledServerContract)
val interceptConnection =
Expand Down Expand Up @@ -472,24 +422,11 @@ object RSocketFactory {
.ignoreElement()
}

private fun enableLease(parentInterceptors: InterceptorRegistry)
: InterceptorRegistry {
val rSocketLeaseConsumer =
leaseOptions.rSocketLeaseConsumer()
return if (rSocketLeaseConsumer != null) {
parentInterceptors.copyWith(
ServerLeaseFeature.enable(rSocketLeaseConsumer)())
} else {
parentInterceptors.copy()
}
}

private fun enableServerContract(parentInterceptors: InterceptorRegistry)
: InterceptorRegistry {

parentInterceptors.connectionFirst(
ServerContractInterceptor(errorConsumer,
leaseOptions.rSocketLeaseConsumer() != null))
ServerContractInterceptor(errorConsumer, false))
return parentInterceptors
}

Expand Down
30 changes: 0 additions & 30 deletions rsocket-core/src/main/kotlin/io/rsocket/kotlin/RSocketLease.kt

This file was deleted.

This file was deleted.

This file was deleted.

Loading