diff --git a/rsocket-core/src/main/java/io/rsocket/android/Duration.kt b/rsocket-core/src/main/java/io/rsocket/android/Duration.kt
index cd9588fad..7666780a6 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/Duration.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/Duration.kt
@@ -8,12 +8,19 @@ import java.util.concurrent.TimeUnit
data class Duration(val value: Long, val unit: TimeUnit) {
- val toMillis = unit.toMillis(value)
+ val millis = unit.toMillis(value)
+
+ val intMillis = unit.toMillis(value).toInt()
companion object {
- val ZERO = Duration(0, TimeUnit.MILLISECONDS)
+
fun ofSeconds(n: Long) = Duration(n, TimeUnit.SECONDS)
+
+ fun ofSeconds(n: Int) = Duration(n.toLong(), TimeUnit.SECONDS)
+
fun ofMillis(n: Long) = Duration(n, TimeUnit.MILLISECONDS)
+
+ fun ofMillis(n: Int) = Duration(n.toLong(), TimeUnit.MILLISECONDS)
}
}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/Frame.kt b/rsocket-core/src/main/java/io/rsocket/android/Frame.kt
index c74411a08..86dee8175 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/Frame.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/Frame.kt
@@ -15,24 +15,20 @@
*/
package io.rsocket.android
-import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_M
-
-import io.netty.buffer.*
+import com.sun.org.apache.xpath.internal.operations.Bool
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.ByteBufHolder
+import io.netty.buffer.Unpooled
import io.netty.util.IllegalReferenceCountException
import io.netty.util.Recycler
import io.netty.util.Recycler.Handle
import io.netty.util.ResourceLeakDetector
-import io.rsocket.android.frame.ErrorFrameFlyweight
-import io.rsocket.android.frame.FrameHeaderFlyweight
-import io.rsocket.android.frame.KeepaliveFrameFlyweight
-import io.rsocket.android.frame.LeaseFrameFlyweight
-import io.rsocket.android.frame.RequestFrameFlyweight
-import io.rsocket.android.frame.RequestNFrameFlyweight
-import io.rsocket.android.frame.SetupFrameFlyweight
-import io.rsocket.android.frame.VersionFlyweight
+import io.rsocket.android.frame.*
+import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_M
+import org.slf4j.LoggerFactory
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
-import org.slf4j.LoggerFactory
/**
* Represents a Frame sent over a [DuplexConnection].
@@ -51,10 +47,12 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
/** Return the content which is held by this [Frame]. */
override fun content(): ByteBuf {
- if (content!!.refCnt() <= 0) {
- throw IllegalReferenceCountException(content!!.refCnt())
- }
- return content as ByteBuf
+ val c = content
+ return if (c == null) {
+ throw IllegalReferenceCountException(0)
+ } else if (c.refCnt() <= 0) {
+ throw IllegalReferenceCountException(c.refCnt())
+ } else content as ByteBuf
}
/** Creates a deep copy of this [Frame]. */
@@ -79,7 +77,7 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
* Returns the reference count of this object. If `0`, it means this object has been
* deallocated.
*/
- override fun refCnt(): Int = content!!.refCnt()
+ override fun refCnt(): Int = content?.refCnt() ?: 0
/** Increases the reference count by `1`. */
override fun retain(): Frame {
@@ -210,11 +208,17 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
*/
fun flags(): Int = FrameHeaderFlyweight.flags(content!!)
- fun hasMetadata(): Boolean = isFlagSet(this.flags(), FLAGS_M)
+ fun isFlagSet(flag: Int): Boolean {
+ return isFlagSet(this.flags(), flag)
+ }
+
+ fun hasMetadata(): Boolean = isFlagSet(FLAGS_M)
val dataUtf8: String
get() = StandardCharsets.UTF_8.decode(data).toString()
+ val isFragmentable: Boolean
+ get() = type.isFragmentable
/* TODO:
*
* fromRequest(type, id, payload)
@@ -227,6 +231,7 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
fun from(
flags: Int,
+ version: Int,
keepaliveInterval: Int,
maxLifetime: Int,
metadataMimeType: String,
@@ -250,6 +255,7 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
SetupFrameFlyweight.encode(
frame.content!!,
flags,
+ version,
keepaliveInterval,
maxLifetime,
metadataMimeType,
@@ -259,6 +265,25 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
return frame
}
+ fun from(
+ flags: Int,
+ keepaliveInterval: Int,
+ maxLifetime: Int,
+ metadataMimeType: String,
+ dataMimeType: String,
+ payload: Payload): Frame {
+
+ return from(
+ flags,
+ SetupFrameFlyweight.CURRENT_VERSION,
+ keepaliveInterval,
+ maxLifetime,
+ metadataMimeType,
+ dataMimeType,
+ payload)
+ }
+
+
fun getFlags(frame: Frame): Int {
ensureFrameType(FrameType.SETUP, frame)
val flags = FrameHeaderFlyweight.flags(frame.content!!)
@@ -271,6 +296,20 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
return SetupFrameFlyweight.version(frame.content!!)
}
+ fun resumeEnabled(frame: Frame): Boolean {
+ ensureFrameType(FrameType.SETUP, frame)
+ return Frame.isFlagSet(
+ frame.flags(),
+ SetupFrameFlyweight.FLAGS_RESUME_ENABLE)
+ }
+
+ fun leaseEnabled(frame: Frame): Boolean {
+ ensureFrameType(FrameType.SETUP, frame)
+ return Frame.isFlagSet(
+ frame.flags(),
+ SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE)
+ }
+
fun keepaliveInterval(frame: Frame): Int {
ensureFrameType(FrameType.SETUP, frame)
return SetupFrameFlyweight.keepaliveInterval(frame.content!!)
@@ -429,14 +468,14 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
fun from(
streamId: Int,
type: FrameType,
- metadata: ByteBuf,
+ metadata: ByteBuf?,
data: ByteBuf,
initialRequestN: Int,
flags: Int): Frame {
val frame = RECYCLER.get()
frame.content = ByteBufAllocator.DEFAULT.buffer(
RequestFrameFlyweight.computeFrameLength(
- type, metadata.readableBytes(), data.readableBytes()))
+ type, metadata?.readableBytes(), data.readableBytes()))
frame.content!!.writerIndex(
RequestFrameFlyweight.encode(
frame.content!!,
@@ -449,6 +488,21 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
return frame
}
+ fun from(streamId: Int,
+ type: FrameType,
+ metadata: ByteBuf?,
+ data: ByteBuf,
+ flags: Int): Frame {
+
+ return PayloadFrame.from(
+ streamId,
+ type,
+ metadata,
+ data,
+ flags)
+ }
+
+
fun initialRequestN(frame: Frame): Int {
val type = frame.type
if (!type.isRequestType) {
@@ -538,6 +592,66 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
}
}
+ object Fragmentation {
+
+ fun assembleFrame(blueprintFrame: Frame,
+ metadata: ByteBuf,
+ data: ByteBuf): Frame =
+
+ create(blueprintFrame,
+ metadata,
+ data,
+ { it and FrameHeaderFlyweight.FLAGS_F.inv() })
+
+ fun sliceFrame(blueprintFrame: Frame,
+ metadata: ByteBuf?,
+ data: ByteBuf,
+ additionalFlags: Int): Frame =
+
+ create(blueprintFrame,
+ metadata,
+ data,
+ { it or additionalFlags })
+
+ private inline fun create(blueprintFrame: Frame,
+ metadata: ByteBuf?,
+ data: ByteBuf,
+ modifyFlags: (Int) -> Int): Frame =
+ when (blueprintFrame.type) {
+ FrameType.FIRE_AND_FORGET,
+ FrameType.REQUEST_RESPONSE -> {
+ Frame.Request.from(
+ blueprintFrame.streamId,
+ blueprintFrame.type,
+ metadata,
+ data,
+ modifyFlags(blueprintFrame.flags()))
+ }
+ FrameType.NEXT,
+ FrameType.NEXT_COMPLETE -> {
+ Frame.PayloadFrame.from(
+ blueprintFrame.streamId,
+ blueprintFrame.type,
+ metadata,
+ data,
+ modifyFlags(blueprintFrame.flags()))
+ }
+
+ FrameType.REQUEST_STREAM,
+ FrameType.REQUEST_CHANNEL -> {
+ Frame.Request.from(
+ blueprintFrame.streamId,
+ blueprintFrame.type,
+ metadata,
+ data,
+ Frame.Request.initialRequestN(blueprintFrame),
+ modifyFlags(blueprintFrame.flags()))
+ }
+ else -> throw AssertionError("Non-fragmentable frame: " +
+ "${blueprintFrame.type}")
+ }
+ }
+
override fun toString(): String {
val type = FrameHeaderFlyweight.frameType(content!!)
val payload = StringBuilder()
@@ -587,7 +701,7 @@ class Frame private constructor(private val handle: Handle) : ByteBufHold
}
companion object {
- val NULL_BYTEBUFFER:ByteBuffer = ByteBuffer.allocateDirect(0)
+ val NULL_BYTEBUFFER: ByteBuffer = ByteBuffer.allocateDirect(0)
private val RECYCLER = object : Recycler() {
override fun newObject(handle: Handle): Frame {
diff --git a/rsocket-core/src/main/java/io/rsocket/android/FrameType.kt b/rsocket-core/src/main/java/io/rsocket/android/FrameType.kt
index 205bf55d8..9900be92d 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/FrameType.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/FrameType.kt
@@ -24,12 +24,26 @@ enum class FrameType(val encodedType: Int, private val flags: Int = 0) {
LEASE(0x02, Flags.CAN_HAVE_METADATA),
KEEPALIVE(0x03, Flags.CAN_HAVE_DATA),
// Requester to start request
- REQUEST_RESPONSE(0x04, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE),
- FIRE_AND_FORGET(0x05, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE),
+ REQUEST_RESPONSE(0x04,
+ Flags.CAN_HAVE_METADATA_AND_DATA or
+ Flags.IS_REQUEST_TYPE or
+ Flags.IS_FRAGMENTABLE),
+ FIRE_AND_FORGET(0x05,
+ Flags.CAN_HAVE_METADATA_AND_DATA or
+ Flags.IS_REQUEST_TYPE or
+ Flags.IS_FRAGMENTABLE),
REQUEST_STREAM(
- 0x06, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE or Flags.HAS_INITIAL_REQUEST_N),
+ 0x06,
+ Flags.CAN_HAVE_METADATA_AND_DATA or
+ Flags.IS_REQUEST_TYPE or
+ Flags.HAS_INITIAL_REQUEST_N
+ or Flags.IS_FRAGMENTABLE),
REQUEST_CHANNEL(
- 0x07, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE or Flags.HAS_INITIAL_REQUEST_N),
+ 0x07,
+ Flags.CAN_HAVE_METADATA_AND_DATA or
+ Flags.IS_REQUEST_TYPE or
+ Flags.HAS_INITIAL_REQUEST_N or
+ Flags.IS_FRAGMENTABLE),
// Requester mid-stream
REQUEST_N(0x08),
CANCEL(0x09, Flags.CAN_HAVE_METADATA),
@@ -42,22 +56,28 @@ enum class FrameType(val encodedType: Int, private val flags: Int = 0) {
RESUME(0x0D),
RESUME_OK(0x0E),
// synthetic types from Responder for use by the rest of the machinery
- NEXT(0xA0, Flags.CAN_HAVE_METADATA_AND_DATA),
+ NEXT(0xA0,
+ Flags.CAN_HAVE_METADATA_AND_DATA or
+ Flags.IS_FRAGMENTABLE),
COMPLETE(0xB0),
- NEXT_COMPLETE(0xC0, Flags.CAN_HAVE_METADATA_AND_DATA),
+ NEXT_COMPLETE(0xC0,
+ Flags.CAN_HAVE_METADATA_AND_DATA or
+ Flags.IS_FRAGMENTABLE),
EXT(0xFFFF, Flags.CAN_HAVE_METADATA_AND_DATA);
private object Flags {
- internal val CAN_HAVE_DATA = 1
- internal val CAN_HAVE_METADATA = 2
- internal val CAN_HAVE_METADATA_AND_DATA = 3
- internal val IS_REQUEST_TYPE = 4
- internal val HAS_INITIAL_REQUEST_N = 8
+ internal const val CAN_HAVE_DATA = 1
+ internal const val CAN_HAVE_METADATA = 2
+ internal const val CAN_HAVE_METADATA_AND_DATA = 3
+ internal const val IS_REQUEST_TYPE = 4
+ internal const val HAS_INITIAL_REQUEST_N = 8
+ internal const val IS_FRAGMENTABLE = 16
}
- val isRequestType: Boolean
- get() = Flags.IS_REQUEST_TYPE == flags and Flags.IS_REQUEST_TYPE
+ val isFragmentable = Flags.IS_FRAGMENTABLE == (flags and Flags.IS_FRAGMENTABLE)
+
+ val isRequestType: Boolean = Flags.IS_REQUEST_TYPE == (flags and Flags.IS_REQUEST_TYPE)
fun hasInitialRequestN(): Boolean = Flags.HAS_INITIAL_REQUEST_N == flags and Flags.HAS_INITIAL_REQUEST_N
diff --git a/rsocket-core/src/main/java/io/rsocket/android/RSocket.kt b/rsocket-core/src/main/java/io/rsocket/android/RSocket.kt
index 526e5a805..ea2f6fe66 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/RSocket.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/RSocket.kt
@@ -55,7 +55,7 @@ interface RSocket : Availability, Closeable {
/**
* Request-Channel interaction model of `RSocket`.
*
- * @param payloads Stream of request payloads.
+ * @param payloads Stream of send payloads.
* @return Stream of response payloads.
*/
fun requestChannel(payloads: Publisher): Flowable
diff --git a/rsocket-core/src/main/java/io/rsocket/android/RSocketFactory.kt b/rsocket-core/src/main/java/io/rsocket/android/RSocketFactory.kt
index 3be984565..c25124db6 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/RSocketFactory.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/RSocketFactory.kt
@@ -18,23 +18,21 @@ package io.rsocket.android
import io.reactivex.Completable
import io.reactivex.Single
-import io.rsocket.android.exceptions.InvalidSetupException
-import io.rsocket.android.fragmentation.FragmentationDuplexConnection
-import io.rsocket.android.frame.SetupFrameFlyweight
-import io.rsocket.android.frame.VersionFlyweight
+import io.rsocket.android.fragmentation.FragmentationInterceptor
import io.rsocket.android.internal.*
-import io.rsocket.android.plugins.DuplexConnectionInterceptor
-import io.rsocket.android.plugins.PluginRegistry
-import io.rsocket.android.plugins.Plugins
-import io.rsocket.android.plugins.RSocketInterceptor
+import io.rsocket.android.plugins.GlobalInterceptors
+import io.rsocket.android.plugins.InterceptorOptions
+import io.rsocket.android.plugins.InterceptorRegistry
import io.rsocket.android.transport.ClientTransport
import io.rsocket.android.transport.ServerTransport
+import io.rsocket.android.util.KeepAlive
+import io.rsocket.android.util.KeepAliveOptions
+import io.rsocket.android.util.MediaTypeOptions
import io.rsocket.android.util.PayloadImpl
/** Factory for creating RSocket clients and servers. */
object RSocketFactory {
- private const val DEFAULT_STREAM_DEMAND_LIMIT = 128
/**
* Creates a factory that establishes client connections to other RSockets.
*
@@ -49,229 +47,146 @@ object RSocketFactory {
*/
fun receive(): ServerRSocketFactory = ServerRSocketFactory()
- interface Start {
- fun start(): Single
- }
-
- interface ClientTransportAcceptor {
- fun transport(transport: () -> ClientTransport): Start
-
- fun transport(transport: ClientTransport): Start = transport { transport }
-
- }
-
- interface ServerTransportAcceptor {
- fun transport(transport: () -> ServerTransport): Start
-
- fun transport(transport: ServerTransport): Start = transport({ transport })
-
- }
-
class ClientRSocketFactory {
- private var acceptor: () -> (RSocket) -> RSocket = { { rs -> rs } }
-
+ private var acceptor: ClientAcceptor = { { emptyRSocket } }
private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() }
private var mtu = 0
- private val plugins = PluginRegistry(Plugins.defaultPlugins())
+ private val interceptors = GlobalInterceptors.create()
private var flags = 0
-
private var setupPayload: Payload = PayloadImpl.EMPTY
+ private val keepAlive = KeepAliveOptions()
+ private val mediaType = MediaTypeOptions()
+ private var streamRequestLimit = defaultStreamRequestLimit
- private var tickPeriod = Duration.ZERO
- private var ackTimeout = Duration.ofSeconds(30)
- private var missedAcks = 3
-
- private var metadataMimeType = "application/binary"
- private var dataMimeType = "application/binary"
-
- private var streamDemandLimit = DEFAULT_STREAM_DEMAND_LIMIT
-
- fun addConnectionPlugin(interceptor: DuplexConnectionInterceptor): ClientRSocketFactory {
- plugins.addConnectionPlugin(interceptor)
- return this
- }
-
- fun addClientPlugin(interceptor: RSocketInterceptor): ClientRSocketFactory {
- plugins.addClientPlugin(interceptor)
+ fun interceptors(configure: (InterceptorOptions) -> Unit): ClientRSocketFactory {
+ configure(interceptors)
return this
}
- fun addServerPlugin(interceptor: RSocketInterceptor): ClientRSocketFactory {
- plugins.addServerPlugin(interceptor)
+ fun keepAlive(configure: (KeepAliveOptions) -> Unit): ClientRSocketFactory {
+ configure(keepAlive)
return this
}
- fun keepAlive(): ClientRSocketFactory {
- tickPeriod = Duration.ofSeconds(20)
+ fun mimeType(configure: (MediaTypeOptions) -> Unit)
+ : ClientRSocketFactory {
+ configure(mediaType)
return this
}
- fun keepAlive(
- tickPeriod: Duration, ackTimeout: Duration, missedAcks: Int): ClientRSocketFactory {
- this.tickPeriod = tickPeriod
- this.ackTimeout = ackTimeout
- this.missedAcks = missedAcks
- return this
- }
-
- fun keepAliveTickPeriod(tickPeriod: Duration): ClientRSocketFactory {
- this.tickPeriod = tickPeriod
- return this
- }
-
- fun keepAliveAckTimeout(ackTimeout: Duration): ClientRSocketFactory {
- this.ackTimeout = ackTimeout
- return this
- }
-
- fun keepAliveMissedAcks(missedAcks: Int): ClientRSocketFactory {
- this.missedAcks = missedAcks
+ fun fragment(mtu: Int): ClientRSocketFactory {
+ assertFragmentation(mtu)
+ this.mtu = mtu
return this
}
- fun mimeType(metadataMimeType: String, dataMimeType: String): ClientRSocketFactory {
- this.dataMimeType = dataMimeType
- this.metadataMimeType = metadataMimeType
+ fun errorConsumer(errorConsumer: (Throwable) -> Unit): ClientRSocketFactory {
+ this.errorConsumer = errorConsumer
return this
}
- fun dataMimeType(dataMimeType: String): ClientRSocketFactory {
- this.dataMimeType = dataMimeType
+ fun setupPayload(payload: Payload): ClientRSocketFactory {
+ this.setupPayload = payload
return this
}
- fun metadataMimeType(metadataMimeType: String): ClientRSocketFactory {
- this.metadataMimeType = metadataMimeType
+ fun streamRequestLimit(streamRequestLimit: Int): ClientRSocketFactory {
+ assertRequestLimit(streamRequestLimit)
+ this.streamRequestLimit = streamRequestLimit
return this
}
- fun transport(transport: () -> ClientTransport): Start = StartClient(transport)
+ fun transport(transport: () -> ClientTransport): Start =
+ ClientStart(transport, interceptors())
- fun acceptor(acceptor: () -> (RSocket) -> RSocket): ClientTransportAcceptor {
+ fun acceptor(acceptor: ClientAcceptor): ClientTransportAcceptor {
this.acceptor = acceptor
return object : ClientTransportAcceptor {
- override fun transport(transport: () -> ClientTransport): Start = StartClient(transport)
-
+ override fun transport(transport: () -> ClientTransport)
+ : Start =
+ ClientStart(transport, interceptors())
}
}
- fun fragment(mtu: Int): ClientRSocketFactory {
- this.mtu = mtu
- return this
- }
-
- fun errorConsumer(errorConsumer: (Throwable) -> Unit): ClientRSocketFactory {
- this.errorConsumer = errorConsumer
- return this
- }
-
- fun setupPayload(payload: Payload): ClientRSocketFactory {
- this.setupPayload = payload
- return this
- }
-
- fun streamDemandLimit(streamDemandLimit: Int): ClientRSocketFactory {
- this.streamDemandLimit = streamDemandLimit
- return this
- }
+ private fun interceptors(): InterceptorRegistry =
+ interceptors.copyWith {
+ it.connectionFirst(
+ FragmentationInterceptor(mtu))
+ }
- private inner class StartClient internal constructor(private val transportClient: () -> ClientTransport)
+ private inner class ClientStart(private val transportClient: () -> ClientTransport,
+ private val interceptors: InterceptorRegistry)
: Start {
override fun start(): Single {
return transportClient()
.connect()
.flatMap { connection ->
- val setupFrame = Frame.Setup.from(
- flags,
- ackTimeout.toMillis.toInt(),
- ackTimeout.toMillis.toInt() * missedAcks,
- metadataMimeType,
- dataMimeType,
- setupPayload)
-
- val conn =
- if (mtu > 0)
- FragmentationDuplexConnection(connection, mtu)
- else
- connection
-
- val demuxer = ClientConnectionDemuxer(conn, plugins)
-
- val rSocketClient = RSocketClient(
+ val setupFrame = createSetupFrame()
+
+ val demuxer = ClientConnectionDemuxer(
+ connection,
+ interceptors)
+
+ val rSocketRequester = RSocketRequester(
demuxer.requesterConnection(),
errorConsumer,
- StreamIdSupplier.clientSupplier(),
- streamDemandLimit)
-
- val wrappedRSocketClient = Single
- .just(rSocketClient)
- .map { plugins.applyClient(it) }
-
- wrappedRSocketClient.flatMap { wrappedClientRSocket ->
- val unwrappedServerSocket = acceptor()(wrappedClientRSocket)
-
- val wrappedRSocketServer = Single
- .just(unwrappedServerSocket)
- .map { plugins.applyServer(it) }
-
- wrappedRSocketServer
- .doOnSuccess { rSocket ->
- RSocketServer(
- demuxer.responderConnection(),
- rSocket,
- errorConsumer,
- streamDemandLimit)
- }.doOnSuccess {
- ClientServiceHandler(
- demuxer.serviceConnection(),
- errorConsumer,
- KeepAliveInfo(
- tickPeriod,
- ackTimeout,
- missedAcks))
- }
- .flatMapCompletable { conn.sendOne(setupFrame) }
- .andThen(wrappedRSocketClient)
- }
+ ClientStreamIds(),
+ streamRequestLimit)
+
+ val wrappedRequester = interceptors
+ .interceptRequester(rSocketRequester)
+
+ val handlerRSocket = acceptor()(wrappedRequester)
+
+ val wrappedHandler = interceptors
+ .interceptHandler(handlerRSocket)
+
+ RSocketResponder(
+ demuxer.responderConnection(),
+ wrappedHandler,
+ errorConsumer,
+ streamRequestLimit)
+
+ ClientServiceHandler(
+ demuxer.serviceConnection(),
+ keepAlive,
+ errorConsumer)
+
+ connection
+ .sendOne(setupFrame)
+ .andThen(Single.just(wrappedRequester))
}
}
+
+ private fun createSetupFrame(): Frame {
+ return Frame.Setup.from(
+ flags,
+ keepAlive.keepAliveInterval().intMillis,
+ keepAlive.keepAliveMaxLifeTime().intMillis,
+ mediaType.metadataMimeType(),
+ mediaType.dataMimeType(),
+ setupPayload)
+ }
}
}
class ServerRSocketFactory internal constructor() {
- private var acceptor: (() -> SocketAcceptor)? = null
+ private var acceptor: ServerAcceptor = { { _, _ -> Single.just(emptyRSocket) } }
private var errorConsumer: (Throwable) -> Unit = { it.printStackTrace() }
private var mtu = 0
- private val plugins = PluginRegistry(Plugins.defaultPlugins())
- private var streamDemandLimit = DEFAULT_STREAM_DEMAND_LIMIT
-
- fun addConnectionPlugin(interceptor: DuplexConnectionInterceptor): ServerRSocketFactory {
- plugins.addConnectionPlugin(interceptor)
- return this
- }
+ private val interceptors = GlobalInterceptors.create()
+ private var streamRequestLimit = defaultStreamRequestLimit
- fun addClientPlugin(interceptor: RSocketInterceptor): ServerRSocketFactory {
- plugins.addClientPlugin(interceptor)
+ fun interceptors(configure: (InterceptorOptions) -> Unit): ServerRSocketFactory {
+ configure(interceptors)
return this
}
- fun addServerPlugin(interceptor: RSocketInterceptor): ServerRSocketFactory {
- plugins.addServerPlugin(interceptor)
- return this
- }
-
- fun acceptor(acceptor: () -> SocketAcceptor): ServerTransportAcceptor {
- this.acceptor = acceptor
- return object : ServerTransportAcceptor {
- override fun transport(transport: () -> ServerTransport): Start =
- ServerStart(transport)
- }
- }
-
fun fragment(mtu: Int): ServerRSocketFactory {
+ assertFragmentation(mtu)
this.mtu = mtu
return this
}
@@ -281,79 +196,126 @@ object RSocketFactory {
return this
}
- fun streamDemandLimit(streamDemandLimit: Int): ServerRSocketFactory {
- this.streamDemandLimit = streamDemandLimit
+ fun streamRequestLimit(streamRequestLimit: Int): ServerRSocketFactory {
+ this.streamRequestLimit = streamRequestLimit
return this
}
- private inner class ServerStart internal constructor(
- private val transportServer: () -> ServerTransport) : Start {
+ fun acceptor(acceptor: ServerAcceptor): ServerTransportAcceptor {
+ this.acceptor = acceptor
+ return object : ServerTransportAcceptor {
+ override fun transport(
+ transport: () -> ServerTransport): Start =
+ ServerStart(transport, interceptors())
+ }
+ }
+
+ private fun interceptors(): InterceptorRegistry {
+ return interceptors.copyWith {
+ it.connectionFirst(
+ ServerContractInterceptor(errorConsumer))
+ it.connectionFirst(
+ FragmentationInterceptor(mtu))
+ }
+ }
+
+ private inner class ServerStart(
+ private val transportServer: () -> ServerTransport,
+ private val interceptors: InterceptorRegistry) : Start {
override fun start(): Single {
- return transportServer()
- .start(object : ServerTransport.ConnectionAcceptor {
- override fun invoke(conn: DuplexConnection): Completable {
-
- val connection =
- if (mtu > 0)
- FragmentationDuplexConnection(conn, mtu)
- else conn
-
- val demuxer = ServerConnectionDemuxer(connection, plugins)
- return demuxer
- .setupConnection()
- .receive()
- .firstOrError()
- .flatMapCompletable { setupFrame ->
- processSetupFrame(demuxer, setupFrame)
- }
- }
- })
+ return transportServer().start(object
+ : ServerTransport.ConnectionAcceptor {
+
+ override fun invoke(duplexConnection: DuplexConnection)
+ : Completable {
+
+ val demuxer = ServerConnectionDemuxer(
+ duplexConnection,
+ interceptors)
+
+ return demuxer
+ .setupConnection()
+ .receive()
+ .firstOrError()
+ .flatMapCompletable { setup ->
+ accept(setup, demuxer)
+ }
+ }
+ })
}
- private fun processSetupFrame(
- demuxer: ConnectionDemuxer, setupFrame: Frame): Completable {
- val version = Frame.Setup.version(setupFrame)
- if (version != SetupFrameFlyweight.CURRENT_VERSION) {
- val error = InvalidSetupException(
- "Unsupported version ${VersionFlyweight.toString(version)}")
- return demuxer
- .setupConnection()
- .sendOne(Frame.Error.from(0, error))
- .andThen { demuxer.close() }
- }
+ private fun accept(setupFrame: Frame,
+ demuxer: ConnectionDemuxer): Completable {
- val setupPayload = ConnectionSetupPayload.create(setupFrame)
+ val setup = Setup.create(setupFrame)
- val rSocketClient = RSocketClient(
+ val rSocketRequester = RSocketRequester(
demuxer.requesterConnection(),
errorConsumer,
- StreamIdSupplier.serverSupplier(),
- streamDemandLimit)
-
- val wrappedRSocketClient = Single
- .just(rSocketClient)
- .map { plugins.applyClient(it) }
-
- return wrappedRSocketClient
- .flatMap { requester ->
- acceptor
- ?.let { it() }
- ?.accept(setupPayload, requester)
- ?.map { plugins.applyServer(it) }
- }
- .map { handler ->
- RSocketServer(
+ ServerStreamIds(),
+ streamRequestLimit)
+
+ val wrappedRequester =
+ interceptors.interceptRequester(rSocketRequester)
+
+ ServerServiceHandler(
+ demuxer.serviceConnection(),
+ setup as KeepAlive,
+ errorConsumer)
+
+ val handlerRSocket = acceptor()(setup, wrappedRequester)
+
+ return handlerRSocket
+ .map { handler -> interceptors.interceptHandler(handler) }
+ .doOnSuccess { handler ->
+ RSocketResponder(
demuxer.responderConnection(),
handler,
errorConsumer,
- streamDemandLimit)
- }.doOnSuccess {
- ServerServiceHandler(
- demuxer.serviceConnection(),
- errorConsumer)
- }.ignoreElement()
+ streamRequestLimit)
+ }
+ .ignoreElement()
}
}
}
+
+ private fun assertRequestLimit(streamRequestLimit: Int) {
+ if (streamRequestLimit <= 0) {
+ throw IllegalArgumentException("stream request limit must be positive")
+ }
+ }
+
+ private fun assertFragmentation(mtu: Int) {
+ if (mtu < 0) {
+ throw IllegalArgumentException("fragmentation mtu must be non-negative")
+ }
+ }
+
+ interface Start {
+ fun start(): Single
+ }
+
+ interface ClientTransportAcceptor {
+ fun transport(transport: () -> ClientTransport): Start
+
+ fun transport(transport: ClientTransport): Start =
+ transport { transport }
+
+ }
+
+ interface ServerTransportAcceptor {
+ fun transport(transport: () -> ServerTransport): Start
+
+ fun transport(transport: ServerTransport): Start =
+ transport { transport }
+ }
+
+ private const val defaultStreamRequestLimit = 128
+
+ private val emptyRSocket = object : AbstractRSocket() {}
}
+
+typealias ClientAcceptor = () -> (RSocket) -> RSocket
+
+typealias ServerAcceptor = () -> (Setup, RSocket) -> Single
diff --git a/rsocket-core/src/main/java/io/rsocket/android/RSocketClient.kt b/rsocket-core/src/main/java/io/rsocket/android/RSocketRequester.kt
similarity index 95%
rename from rsocket-core/src/main/java/io/rsocket/android/RSocketClient.kt
rename to rsocket-core/src/main/java/io/rsocket/android/RSocketRequester.kt
index 92f8f71ff..c117998dd 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/RSocketClient.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/RSocketRequester.kt
@@ -37,12 +37,12 @@ import java.nio.channels.ClosedChannelException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
-/** Client Side of a RSocket socket. Sends [Frame]s to a [RSocketServer] */
-internal class RSocketClient constructor(
+/** Requester Side of a RSocket. Sends [Frame]s to a [RSocketResponder] */
+internal class RSocketRequester(
private val connection: DuplexConnection,
private val errorConsumer: (Throwable) -> Unit,
- private val streamIdSupplier: StreamIdSupplier,
- private val streamDemandLimit: Int) : RSocket {
+ private val streamIds: StreamIds,
+ private val streamRequestLimit: Int) : RSocket {
private val senders = ConcurrentHashMap(256)
private val receivers = ConcurrentHashMap>(256)
@@ -78,14 +78,14 @@ internal class RSocketClient constructor(
override fun requestStream(payload: Payload): Flowable =
interactions.requestStream(
- handleRequestStream(payload).rebatchRequests(streamDemandLimit))
+ handleRequestStream(payload).rebatchRequests(streamRequestLimit))
override fun requestChannel(payloads: Publisher): Flowable =
interactions.requestChannel(
handleChannel(
Flowable.fromPublisher(payloads)
- .rebatchRequests(streamDemandLimit)
- ).rebatchRequests(streamDemandLimit))
+ .rebatchRequests(streamRequestLimit)
+ ).rebatchRequests(streamRequestLimit))
override fun metadataPush(payload: Payload): Completable =
interactions.metadataPush(handleMetadataPush(payload))
@@ -98,7 +98,7 @@ internal class RSocketClient constructor(
private fun handleFireAndForget(payload: Payload): Completable {
return Completable.fromRunnable {
- val streamId = streamIdSupplier.nextStreamId()
+ val streamId = streamIds.nextStreamId()
val requestFrame = Frame.Request.from(
streamId,
FrameType.FIRE_AND_FORGET,
@@ -110,7 +110,7 @@ internal class RSocketClient constructor(
private fun handleRequestResponse(payload: Payload): Single {
return Single.defer {
- val streamId = streamIdSupplier.nextStreamId()
+ val streamId = streamIds.nextStreamId()
val requestFrame = Frame.Request.from(
streamId, FrameType.REQUEST_RESPONSE, payload, 1)
@@ -127,7 +127,7 @@ internal class RSocketClient constructor(
private fun handleRequestStream(payload: Payload): Flowable {
return Flowable.defer {
- val streamId = streamIdSupplier.nextStreamId()
+ val streamId = streamIds.nextStreamId()
val receiver = StreamReceiver.create()
receivers[streamId] = receiver
val reqN = Cond()
@@ -156,7 +156,7 @@ internal class RSocketClient constructor(
private fun handleChannel(request: Flowable): Flowable {
return Flowable.defer {
val receiver = StreamReceiver.create()
- val streamId = streamIdSupplier.nextStreamId()
+ val streamId = streamIds.nextStreamId()
val reqN = Cond()
receiver.doOnRequestIfActive { requestN ->
@@ -278,7 +278,7 @@ internal class RSocketClient constructor(
}
private fun missingReceiver(streamId: Int, type: FrameType, frame: Frame) {
- if (!streamIdSupplier.isBeforeOrCurrent(streamId)) {
+ if (!streamIds.isBeforeOrCurrent(streamId)) {
val err = if (type === FrameType.ERROR) {
IllegalStateException(
"Client received error for non-existent stream: " +
diff --git a/rsocket-core/src/main/java/io/rsocket/android/RSocketServer.kt b/rsocket-core/src/main/java/io/rsocket/android/RSocketResponder.kt
similarity index 94%
rename from rsocket-core/src/main/java/io/rsocket/android/RSocketServer.kt
rename to rsocket-core/src/main/java/io/rsocket/android/RSocketResponder.kt
index 4024ba282..1225f4222 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/RSocketServer.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/RSocketResponder.kt
@@ -22,7 +22,7 @@ import io.reactivex.Single
import io.reactivex.disposables.Disposable
import io.reactivex.processors.UnicastProcessor
import io.rsocket.android.Frame.Request.initialRequestN
-import io.rsocket.android.RSocketServer.DisposableSubscription.Companion.subscription
+import io.rsocket.android.RSocketResponder.DisposableSubscription.Companion.subscription
import io.rsocket.android.exceptions.ApplicationException
import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_C
import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_M
@@ -38,12 +38,12 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
-/** Server side RSocket. Receives [Frame]s from a [RSocketClient] */
-internal class RSocketServer(
+/** Responder side RSocket. Receives [Frame]s from a [RSocketRequester] */
+internal class RSocketResponder(
private val connection: DuplexConnection,
private val requestHandler: RSocket,
private val errorConsumer: (Throwable) -> Unit,
- private val streamDemandLimit: Int) : RSocket {
+ private val streamRequestLimit: Int) : RSocket {
private val completion = Lifecycle()
private val sendingSubscriptions =
@@ -56,15 +56,6 @@ internal class RSocketServer(
.toSerialized()
private val receiveDisposable: Disposable
- internal constructor(connection: DuplexConnection,
- requestHandler: RSocket,
- errorConsumer: (Throwable) -> Unit)
- : this(
- connection,
- requestHandler,
- errorConsumer,
- DEFAULT_STREAM_WINDOW)
-
init {
connection
.send(sentFrames)
@@ -98,7 +89,7 @@ internal class RSocketServer(
override fun requestStream(payload: Payload): Flowable {
return try {
- requestHandler.requestStream(payload).rebatchRequests(streamDemandLimit)
+ requestHandler.requestStream(payload).rebatchRequests(streamRequestLimit)
} catch (t: Throwable) {
Flowable.error(t)
}
@@ -107,8 +98,8 @@ internal class RSocketServer(
override fun requestChannel(payloads: Publisher): Flowable {
return try {
requestHandler.requestChannel(
- Flowable.fromPublisher(payloads).rebatchRequests(streamDemandLimit)
- ).rebatchRequests(streamDemandLimit)
+ Flowable.fromPublisher(payloads).rebatchRequests(streamRequestLimit)
+ ).rebatchRequests(streamRequestLimit)
} catch (t: Throwable) {
Flowable.error(t)
}
@@ -313,7 +304,6 @@ internal class RSocketServer(
}
companion object {
- private const val DEFAULT_STREAM_WINDOW = 128
private val closedException = noStacktrace(ClosedChannelException())
}
}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/ConnectionSetupPayload.kt b/rsocket-core/src/main/java/io/rsocket/android/Setup.kt
similarity index 50%
rename from rsocket-core/src/main/java/io/rsocket/android/ConnectionSetupPayload.kt
rename to rsocket-core/src/main/java/io/rsocket/android/Setup.kt
index d8932bdbf..dc9f75742 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/ConnectionSetupPayload.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/Setup.kt
@@ -18,12 +18,13 @@ package io.rsocket.android
import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_M
import io.rsocket.android.frame.SetupFrameFlyweight
+import io.rsocket.android.util.KeepAlive
import java.nio.ByteBuffer
/**
* Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data
*/
-abstract class ConnectionSetupPayload : Payload {
+abstract class Setup : Payload, KeepAlive {
abstract fun metadataMimeType(): String
@@ -35,12 +36,14 @@ abstract class ConnectionSetupPayload : Payload {
override fun hasMetadata(): Boolean = Frame.isFlagSet(flags, FLAGS_M)
- private class ConnectionSetupPayloadImpl(
+ private class SetupImpl(
private val metadataMimeType: String,
private val dataMimeType: String,
override val data: ByteBuffer,
override val metadata: ByteBuffer,
- override val flags: Int) : ConnectionSetupPayload() {
+ private val keepAliveInterval: Int,
+ private val keepAliveLifetime: Int,
+ override val flags: Int) : Setup() {
init {
if (!hasMetadata() && metadata.remaining() > 0) {
@@ -48,6 +51,12 @@ abstract class ConnectionSetupPayload : Payload {
}
}
+ override fun keepAliveInterval(): Duration =
+ Duration.ofMillis(keepAliveInterval)
+
+ override fun keepAliveMaxLifeTime(): Duration =
+ Duration.ofMillis(keepAliveLifetime)
+
override fun metadataMimeType(): String = metadataMimeType
override fun dataMimeType(): String = dataMimeType
@@ -55,45 +64,22 @@ abstract class ConnectionSetupPayload : Payload {
companion object {
- private val NO_FLAGS = 0
- val HONOR_LEASE = SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE
-
- fun create(metadataMimeType: String, dataMimeType: String): ConnectionSetupPayload {
- return ConnectionSetupPayloadImpl(
- metadataMimeType,
- dataMimeType,
- Frame.NULL_BYTEBUFFER,
- Frame.NULL_BYTEBUFFER,
- NO_FLAGS)
- }
-
- fun create(
- metadataMimeType: String, dataMimeType: String, payload: Payload): ConnectionSetupPayload {
- return ConnectionSetupPayloadImpl(
- metadataMimeType,
- dataMimeType,
- payload.data,
- payload.metadata,
- if (payload.hasMetadata()) FLAGS_M else 0)
- }
-
- fun create(
- metadataMimeType: String, dataMimeType: String, flags: Int): ConnectionSetupPayload =
- ConnectionSetupPayloadImpl(
- metadataMimeType,
- dataMimeType,
- Frame.NULL_BYTEBUFFER,
- Frame.NULL_BYTEBUFFER,
- flags)
+ private const val HONOR_LEASE = SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE
- fun create(setupFrame: Frame): ConnectionSetupPayload {
+ internal fun create(setupFrame: Frame): Setup {
Frame.ensureFrameType(FrameType.SETUP, setupFrame)
- return ConnectionSetupPayloadImpl(
- Frame.Setup.metadataMimeType(setupFrame),
- Frame.Setup.dataMimeType(setupFrame),
- setupFrame.data,
- setupFrame.metadata,
- Frame.Setup.getFlags(setupFrame))
+ return try {
+ SetupImpl(
+ Frame.Setup.metadataMimeType(setupFrame),
+ Frame.Setup.dataMimeType(setupFrame),
+ setupFrame.data,
+ setupFrame.metadata,
+ Frame.Setup.keepaliveInterval(setupFrame),
+ Frame.Setup.maxLifetime(setupFrame),
+ Frame.Setup.getFlags(setupFrame))
+ } finally {
+ setupFrame.release()
+ }
}
}
}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/SocketAcceptor.kt b/rsocket-core/src/main/java/io/rsocket/android/SocketAcceptor.kt
deleted file mode 100644
index f3765931a..000000000
--- a/rsocket-core/src/main/java/io/rsocket/android/SocketAcceptor.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.rsocket.android
-
-import io.reactivex.Single
-import io.rsocket.android.exceptions.SetupException
-
-/**
- * `RSocket` is a full duplex protocol where a client and server are identical in terms of
- * both having the capability to initiate requests to their peer. This interface provides the
- * contract where a server accepts a new `RSocket` for sending requests to the peer and
- * returns a new `RSocket` that will be used to accept requests from it's peer.
- */
-interface SocketAcceptor {
-
- /**
- * Accepts a new `RSocket` used to send requests to the peer and returns another `RSocket` that is used for accepting requests from the peer.
- *
- * @param setup Setup as sent by the client.
- * @param sendingSocket Socket used to send requests to the peer.
- * @return Socket to accept requests from the peer.
- * @throws SetupException If the acceptor needs to reject the setup of this socket.
- */
- fun accept(setup: ConnectionSetupPayload, sendingSocket: RSocket): Single
-}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/StreamIdSupplier.kt b/rsocket-core/src/main/java/io/rsocket/android/StreamIds.kt
similarity index 68%
rename from rsocket-core/src/main/java/io/rsocket/android/StreamIdSupplier.kt
rename to rsocket-core/src/main/java/io/rsocket/android/StreamIds.kt
index 89fc42d50..0bb8f31f4 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/StreamIdSupplier.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/StreamIds.kt
@@ -16,7 +16,7 @@
package io.rsocket.android
-internal class StreamIdSupplier private constructor(private var streamId: Int) {
+internal sealed class StreamIds(private var streamId: Int) {
@Synchronized
fun nextStreamId(): Int {
@@ -25,12 +25,10 @@ internal class StreamIdSupplier private constructor(private var streamId: Int) {
}
@Synchronized
- fun isBeforeOrCurrent(streamId: Int): Boolean = this.streamId >= streamId && streamId > 0
-
- companion object {
+ fun isBeforeOrCurrent(streamId: Int): Boolean =
+ this.streamId >= streamId && streamId > 0
+}
- fun clientSupplier(): StreamIdSupplier = StreamIdSupplier(-1)
+internal class ClientStreamIds : StreamIds(-1)
- fun serverSupplier(): StreamIdSupplier = StreamIdSupplier(0)
- }
-}
+internal class ServerStreamIds : StreamIds(0)
diff --git a/rsocket-core/src/main/java/io/rsocket/android/exceptions/package-info.java b/rsocket-core/src/main/java/io/rsocket/android/exceptions/package-info.java
deleted file mode 100644
index 817c0d525..000000000
--- a/rsocket-core/src/main/java/io/rsocket/android/exceptions/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-@javax.annotation.ParametersAreNonnullByDefault
-package io.rsocket.android.exceptions;
diff --git a/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FragmentationDuplexConnection.kt b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FragmentationDuplexConnection.kt
index b7257223e..c4b7f762f 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FragmentationDuplexConnection.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FragmentationDuplexConnection.kt
@@ -16,26 +16,22 @@
package io.rsocket.android.fragmentation
-import io.netty.util.collection.IntObjectHashMap
import io.reactivex.Completable
import io.reactivex.Flowable
import io.rsocket.android.DuplexConnection
import io.rsocket.android.Frame
-import io.rsocket.android.frame.FrameHeaderFlyweight
import org.reactivestreams.Publisher
-/** Fragments and Re-assembles frames. MTU is number of bytes per fragment. The default is 1024 */
-class FragmentationDuplexConnection(private val source: DuplexConnection, mtu: Int) : DuplexConnection {
- private val frameReassemblers = IntObjectHashMap()
+/** Fragments and Re-assembles frames. MTU is number of bytes per fragment.*/
+internal class FragmentationDuplexConnection(private val source: DuplexConnection,
+ mtu: Int)
+ : DuplexConnection {
+ private val framesReassembler = FramesReassembler()
private val frameFragmenter: FrameFragmenter = FrameFragmenter(mtu)
- override fun availability(): Double = source.availability()
-
- override fun send(frame: Publisher): Completable {
- return Flowable.fromPublisher(frame)
- .concatMap { f -> sendOne(f).toFlowable() }
- .ignoreElements()
- }
+ override fun send(frame: Publisher): Completable =
+ Flowable.fromPublisher(frame)
+ .concatMapCompletable { f -> sendOne(f) }
override fun sendOne(frame: Frame): Completable {
return if (frameFragmenter.shouldFragment(frame)) {
@@ -48,48 +44,20 @@ class FragmentationDuplexConnection(private val source: DuplexConnection, mtu: I
override fun receive(): Flowable = source
.receive()
.concatMap { frame ->
- if (FrameHeaderFlyweight.FLAGS_F == frame.flags() and FrameHeaderFlyweight.FLAGS_F) {
- val frameReassembler = getFrameReassembler(frame)
- frameReassembler.append(frame)
- Flowable.empty()
- } else if (frameReassemblersContain(frame.streamId)) {
- val frameReassembler = removeFrameReassembler(frame.streamId)
- frameReassembler.append(frame)
- val reassembled = frameReassembler.reassemble()
- Flowable.just(reassembled)
+ if (framesReassembler.shouldReassemble(frame)) {
+ framesReassembler.reassemble(frame)
} else {
- Flowable.just(frame)
+ Flowable.just(frame)
}
}
- override fun close(): Completable = source.close()
-
- @Synchronized
- private fun getFrameReassembler(frame: Frame): FrameReassembler =
- frameReassemblers.getOrPut(frame.streamId, { FrameReassembler(frame) })
-
-
- @Synchronized
- private fun removeFrameReassembler(streamId: Int): FrameReassembler =
- frameReassemblers.remove(streamId)
+ override fun availability(): Double = source.availability()
- @Synchronized
- private fun frameReassemblersContain(streamId: Int): Boolean =
- frameReassemblers.containsKey(streamId)
+ override fun close(): Completable = source.close()
override fun onClose(): Completable = source
.onClose()
- .doFinally {
- synchronized(this) {
- frameReassemblers.values.forEach { it.dispose() }
- frameReassemblers.clear()
- }
+ .doOnTerminate {
+ framesReassembler.dispose()
}
-
- companion object {
- val defaultMTU: Int
- get() = if (java.lang.Boolean.getBoolean("io.rsocket.fragmentation.enable")) {
- Integer.getInteger("io.rsocket.fragmentation.mtu", 1024)!!
- } else 0
- }
}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FragmentationInterceptor.kt b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FragmentationInterceptor.kt
new file mode 100644
index 000000000..526ccfd8a
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FragmentationInterceptor.kt
@@ -0,0 +1,14 @@
+package io.rsocket.android.fragmentation
+
+import io.rsocket.android.DuplexConnection
+import io.rsocket.android.plugins.DuplexConnectionInterceptor
+
+internal class FragmentationInterceptor(private val mtu: Int) : DuplexConnectionInterceptor {
+ override fun invoke(type: DuplexConnectionInterceptor.Type,
+ source: DuplexConnection): DuplexConnection {
+ return if (type == DuplexConnectionInterceptor.Type.ALL)
+ FragmentationDuplexConnection(source, mtu)
+ else
+ source
+ }
+}
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FrameFragmenter.kt b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FrameFragmenter.kt
index cca0e7d9e..2ff94f190 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FrameFragmenter.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FrameFragmenter.kt
@@ -20,92 +20,103 @@ import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
import io.reactivex.Emitter
import io.reactivex.Flowable
+import io.reactivex.disposables.Disposable
import io.rsocket.android.Frame
-import io.rsocket.android.FrameType
-import io.rsocket.android.frame.FrameHeaderFlyweight
+import io.rsocket.android.frame.FrameHeaderFlyweight.*
+import java.util.concurrent.atomic.AtomicBoolean
-class FrameFragmenter(private val mtu: Int) {
+internal class FrameFragmenter(private val mtu: Int) {
fun shouldFragment(frame: Frame): Boolean =
- isFragmentableFrame(frame.type) && FrameHeaderFlyweight.payloadLength(frame.content()) > mtu
-
- private fun isFragmentableFrame(type: FrameType): Boolean =
- when (type) {
- FrameType.FIRE_AND_FORGET,
- FrameType.REQUEST_STREAM,
- FrameType.REQUEST_CHANNEL,
- FrameType.REQUEST_RESPONSE,
- FrameType.PAYLOAD,
- FrameType.NEXT_COMPLETE,
- FrameType.METADATA_PUSH -> true
- else -> false
- }
-
- fun fragment(frame: Frame): Flowable = Flowable.generate(FragmentGenerator(frame))
+ mtu > 0 && frame.isFragmentable
+ && payloadLength(frame.content()) > mtu
- private inner class FragmentGenerator(frame: Frame) : (Emitter) -> Unit {
- private val frame: Frame = frame.retain()
- private val streamId: Int = frame.streamId
- private val frameType: FrameType = frame.type
- private val flags: Int = frame.flags() and FrameHeaderFlyweight.FLAGS_M.inv()
- private val data: ByteBuf = FrameHeaderFlyweight.sliceFrameData(frame.content())
- private val metadata: ByteBuf? =
- if (frame.hasMetadata())
- FrameHeaderFlyweight.sliceFrameMetadata(frame.content())
- else null
+ fun fragment(frame: Frame): Flowable = Flowable
+ .generate(
+ { State(frame) },
+ FragmentGenerator(),
+ { it.dispose() })
- override fun invoke(sink: Emitter) {
- val dataLength = data.readableBytes()
+ private inner class FragmentGenerator : (State, Emitter) -> Unit {
- if (metadata != null) {
- val metadataLength = metadata.readableBytes()
+ override fun invoke(state: State, sink: Emitter) {
+ val dataLength = state.dataReadableBytes()
+ if (state.metadataPresent()) {
+ val metadataLength = state.metadataReadableBytes()
if (metadataLength > mtu) {
- sink.onNext(
- Frame.PayloadFrame.from(
- streamId,
- frameType,
- metadata.readSlice(mtu),
- Unpooled.EMPTY_BUFFER,
- flags or FrameHeaderFlyweight.FLAGS_M or FrameHeaderFlyweight.FLAGS_F))
- } else {
- if (dataLength > mtu - metadataLength) {
- sink.onNext(
- Frame.PayloadFrame.from(
- streamId,
- frameType,
- metadata.readSlice(metadataLength),
- data.readSlice(mtu - metadataLength),
- flags or FrameHeaderFlyweight.FLAGS_M or FrameHeaderFlyweight.FLAGS_F))
- } else {
- sink.onNext(
- Frame.PayloadFrame.from(
- streamId,
- frameType,
- metadata.readSlice(metadataLength),
- data.readSlice(dataLength),
- flags or FrameHeaderFlyweight.FLAGS_M))
- frame.release()
- sink.onComplete()
- }
- }
- } else {
- if (dataLength > mtu) {
- sink.onNext(
- Frame.PayloadFrame.from(
- streamId,
- frameType,
- Unpooled.EMPTY_BUFFER,
- data.readSlice(mtu),
- flags or FrameHeaderFlyweight.FLAGS_F))
+ sink.onNext(state.sliceMetadata(
+ mtu,
+ FLAGS_F))
+ } else if (dataLength > mtu - metadataLength) {
+ sink.onNext(state.sliceDataAndMetadata(
+ metadataLength,
+ mtu - metadataLength,
+ FLAGS_F))
} else {
sink.onNext(
- Frame.PayloadFrame.from(
- streamId, frameType, Unpooled.EMPTY_BUFFER, data.readSlice(dataLength), flags))
- frame.release()
+ state.sliceDataAndMetadata(
+ metadataLength,
+ dataLength,
+ 0))
sink.onComplete()
}
+
+ } else if (dataLength > mtu) {
+ sink.onNext(state.sliceData(mtu, FLAGS_F))
+ } else {
+ sink.onNext(state.sliceData(dataLength, 0))
+ sink.onComplete()
+ }
+
+ }
+ }
+
+ private class State(frame: Frame) : Disposable {
+ private val disposed = AtomicBoolean()
+ private val frame: Frame = frame.retain()
+ private val data: ByteBuf = sliceFrameData(frame.content())
+ private val metadata: ByteBuf? =
+ if (frame.hasMetadata())
+ sliceFrameMetadata(frame.content())
+ else null
+
+ override fun isDisposed(): Boolean = disposed.get()
+
+ override fun dispose() {
+ if (disposed.compareAndSet(false, true)) {
+ frame.release()
}
}
+
+ fun metadataPresent(): Boolean = metadata != null
+
+ fun metadataReadableBytes(): Int = metadata!!.readableBytes()
+
+ fun dataReadableBytes(): Int = data.readableBytes()
+
+ fun sliceMetadata(metadataLength: Int, additionalFlags: Int): Frame =
+ Frame.Fragmentation.sliceFrame(
+ frame,
+ metadata!!.readSlice(metadataLength),
+ Unpooled.EMPTY_BUFFER,
+ additionalFlags)
+
+ fun sliceDataAndMetadata(metadataLength: Int,
+ dataLength: Int,
+ additionalFlags: Int): Frame =
+ Frame.Fragmentation.sliceFrame(
+ frame,
+ metadata!!.readSlice(metadataLength),
+ data.readSlice(dataLength),
+ additionalFlags)
+
+ fun sliceData(dataLength: Int,
+ additionalFlags: Int): Frame =
+ Frame.Fragmentation.sliceFrame(
+ frame,
+ null,
+ data.readSlice(dataLength),
+ additionalFlags)
}
}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FramesReassembler.kt b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FramesReassembler.kt
new file mode 100644
index 000000000..a59439ad2
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FramesReassembler.kt
@@ -0,0 +1,47 @@
+package io.rsocket.android.fragmentation
+
+import io.reactivex.Flowable
+import io.reactivex.disposables.Disposable
+import io.rsocket.android.Frame
+import io.rsocket.android.frame.FrameHeaderFlyweight
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicBoolean
+
+internal class FramesReassembler : Disposable {
+
+ private val frameReassemblers = ConcurrentHashMap()
+ private val disposed = AtomicBoolean()
+
+ fun shouldReassemble(frame: Frame): Boolean = frame.isFragmentable
+
+ fun reassemble(frame: Frame): Flowable =
+ when {
+ hasMoreFragments(frame) -> append(frame)
+ else -> Flowable.just(
+ complete(frame)
+ ?.append(frame)?.reassemble()
+ ?: frame)
+ }
+
+ private fun hasMoreFragments(frame: Frame) = frame
+ .isFlagSet(FrameHeaderFlyweight.FLAGS_F)
+
+ private fun append(frame: Frame): Flowable {
+ val reassembler = frameReassemblers
+ .getOrPut(frame.streamId) { StreamFramesReassembler(frame) }
+ reassembler.append(frame)
+ return Flowable.empty()
+ }
+
+ private fun complete(frame: Frame): StreamFramesReassembler? =
+ frameReassemblers.remove(frame.streamId)
+
+ override fun dispose() {
+ if (disposed.compareAndSet(false, true)) {
+ frameReassemblers.values.forEach { it.dispose() }
+ frameReassemblers.clear()
+ }
+ }
+
+ override fun isDisposed(): Boolean = disposed.get()
+}
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FrameReassembler.kt b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/StreamFramesReassembler.kt
similarity index 51%
rename from rsocket-core/src/main/java/io/rsocket/android/fragmentation/FrameReassembler.kt
rename to rsocket-core/src/main/java/io/rsocket/android/fragmentation/StreamFramesReassembler.kt
index 04debe0a9..c6d740d40 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/fragmentation/FrameReassembler.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/StreamFramesReassembler.kt
@@ -16,52 +16,66 @@
package io.rsocket.android.fragmentation
-import io.netty.buffer.CompositeByteBuf
import io.netty.buffer.PooledByteBufAllocator
import io.reactivex.disposables.Disposable
import io.rsocket.android.Frame
-import io.rsocket.android.FrameType
import io.rsocket.android.frame.FrameHeaderFlyweight
+import java.util.concurrent.atomic.AtomicBoolean
/** Assembles Fragmented frames. */
-class FrameReassembler(frame: Frame) : Disposable {
+internal class StreamFramesReassembler(frame: Frame) : Disposable {
- @Volatile
- private var isDisposed: Boolean = false
- private val frameType: FrameType = frame.type
- private val streamId: Int = frame.streamId
- private val flags: Int = frame.flags()
- private val dataBuffer: CompositeByteBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer()
- private val metadataBuffer: CompositeByteBuf = PooledByteBufAllocator.DEFAULT.compositeBuffer()
+ private val isDisposed = AtomicBoolean()
+ private val blueprintFrame = frame.retain()
+ private val dataBuffer = PooledByteBufAllocator.DEFAULT.compositeBuffer()
+ private val metadataBuffer = PooledByteBufAllocator.DEFAULT.compositeBuffer()
- @Synchronized
- fun append(frame: Frame) {
+ fun append(frame: Frame):StreamFramesReassembler {
val byteBuf = frame.content()
val frameType = FrameHeaderFlyweight.frameType(byteBuf)
val frameLength = FrameHeaderFlyweight.frameLength(byteBuf)
- val metadataLength = FrameHeaderFlyweight.metadataLength(byteBuf, frameType, frameLength)!!
+ val metadataLength = FrameHeaderFlyweight.metadataLength(
+ byteBuf,
+ frameType,
+ frameLength)!!
val dataLength = FrameHeaderFlyweight.dataLength(byteBuf, frameType)
if (0 < metadataLength) {
var metadataOffset = FrameHeaderFlyweight.metadataOffset(byteBuf)
if (FrameHeaderFlyweight.hasMetadataLengthField(frameType)) {
metadataOffset += FrameHeaderFlyweight.FRAME_LENGTH_SIZE
}
- metadataBuffer.addComponent(true, byteBuf.retainedSlice(metadataOffset, metadataLength))
+ metadataBuffer.addComponent(
+ true,
+ byteBuf.retainedSlice(metadataOffset, metadataLength))
}
if (0 < dataLength) {
- val dataOffset = FrameHeaderFlyweight.dataOffset(byteBuf, frameType, frameLength)
- dataBuffer.addComponent(true, byteBuf.retainedSlice(dataOffset, dataLength))
+ val dataOffset = FrameHeaderFlyweight.dataOffset(
+ byteBuf,
+ frameType,
+ frameLength)
+ dataBuffer.addComponent(
+ true,
+ byteBuf.retainedSlice(dataOffset, dataLength))
}
+ return this
}
- @Synchronized
- fun reassemble(): Frame = Frame.PayloadFrame.from(streamId, frameType, metadataBuffer, dataBuffer, flags)
+ fun reassemble(): Frame {
+ val assembled = Frame.Fragmentation.assembleFrame(
+ blueprintFrame,
+ metadataBuffer,
+ dataBuffer)
+ blueprintFrame.release()
+ return assembled
+ }
override fun dispose() {
- isDisposed = true
- dataBuffer.release()
- metadataBuffer.release()
+ if (isDisposed.compareAndSet(false, true)) {
+ blueprintFrame.release()
+ dataBuffer.release()
+ metadataBuffer.release()
+ }
}
- override fun isDisposed(): Boolean = isDisposed
+ override fun isDisposed(): Boolean = isDisposed.get()
}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/fragmentation/package-info.java b/rsocket-core/src/main/java/io/rsocket/android/fragmentation/package-info.java
deleted file mode 100644
index 9720fcabc..000000000
--- a/rsocket-core/src/main/java/io/rsocket/android/fragmentation/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-@javax.annotation.ParametersAreNonnullByDefault
-package io.rsocket.android.fragmentation;
diff --git a/rsocket-core/src/main/java/io/rsocket/android/frame/SetupFrameFlyweight.java b/rsocket-core/src/main/java/io/rsocket/android/frame/SetupFrameFlyweight.java
index 21bc739e1..b8344860b 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/frame/SetupFrameFlyweight.java
+++ b/rsocket-core/src/main/java/io/rsocket/android/frame/SetupFrameFlyweight.java
@@ -76,19 +76,18 @@ private static int computeFrameLength(
public static int encode(
final ByteBuf byteBuf,
int flags,
+ int version,
final int keepaliveInterval,
final int maxLifetime,
final String metadataMimeType,
final String dataMimeType,
final ByteBuf metadata,
final ByteBuf data) {
- if ((flags & FLAGS_RESUME_ENABLE) != 0) {
- throw new IllegalArgumentException("RESUME_ENABLE not supported");
- }
return encode(
byteBuf,
flags,
+ version,
keepaliveInterval,
maxLifetime,
Unpooled.EMPTY_BUFFER,
@@ -102,6 +101,7 @@ public static int encode(
static int encode(
final ByteBuf byteBuf,
int flags,
+ int version,
final int keepaliveInterval,
final int maxLifetime,
final ByteBuf resumeToken,
@@ -121,7 +121,7 @@ static int encode(
int length =
FrameHeaderFlyweight.encodeFrameHeader(byteBuf, frameLength, flags, FrameType.SETUP, 0);
- byteBuf.setInt(VERSION_FIELD_OFFSET, CURRENT_VERSION);
+ byteBuf.setInt(VERSION_FIELD_OFFSET, version);
byteBuf.setInt(KEEPALIVE_INTERVAL_FIELD_OFFSET, keepaliveInterval);
byteBuf.setInt(MAX_LIFETIME_FIELD_OFFSET, maxLifetime);
diff --git a/rsocket-core/src/main/java/io/rsocket/android/internal/ClientServiceHandler.kt b/rsocket-core/src/main/java/io/rsocket/android/internal/ClientServiceHandler.kt
index 965256bf3..e71e54b71 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/internal/ClientServiceHandler.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/internal/ClientServiceHandler.kt
@@ -5,43 +5,36 @@ import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.disposables.Disposable
import io.rsocket.android.DuplexConnection
-import io.rsocket.android.Duration
import io.rsocket.android.Frame
import io.rsocket.android.exceptions.ConnectionException
+import io.rsocket.android.util.KeepAlive
import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicInteger
-internal class ClientServiceHandler(private val serviceConnection: DuplexConnection,
- private val errorConsumer: (Throwable) -> Unit,
- keepAliveInfo: KeepAliveInfo)
- : ServiceConnectionHandler(serviceConnection, errorConsumer) {
+internal class ClientServiceHandler(serviceConnection: DuplexConnection,
+ keepAlive: KeepAlive,
+ errorConsumer: (Throwable) -> Unit)
+ : ServiceHandler(serviceConnection, errorConsumer) {
@Volatile
- private var timeLastTickSentMs: Long = 0
- private val missedAckCounter: AtomicInteger = AtomicInteger()
+ private var keepAliveReceivedMillis = System.currentTimeMillis()
private var subscription: Disposable? = null
init {
- val tickPeriod = keepAliveInfo.tickPeriod
- val timeout = keepAliveInfo.ackTimeout
- val missedAcks = keepAliveInfo.missedAcks
- if (Duration.ZERO != tickPeriod) {
- val ackTimeoutMs = timeout.toMillis
- subscription = Flowable.interval(tickPeriod.toMillis, TimeUnit.MILLISECONDS)
- .doOnSubscribe { _ -> timeLastTickSentMs = System.currentTimeMillis() }
- .concatMap { _ -> sendKeepAlive(ackTimeoutMs, missedAcks).toFlowable() }
- .subscribe({},
- { t: Throwable ->
- errorConsumer(t)
- serviceConnection.close().subscribe({}, errorConsumer)
- })
- }
+ val tickPeriod = keepAlive.keepAliveInterval().millis
+ val timeout = keepAlive.keepAliveMaxLifeTime().millis
+ subscription = Flowable.interval(tickPeriod, TimeUnit.MILLISECONDS)
+ .concatMapCompletable { sendAndCheckKeepAlive(timeout) }
+ .subscribe({},
+ { t: Throwable ->
+ errorConsumer(t)
+ serviceConnection.close().subscribe({}, errorConsumer)
+ })
serviceConnection.onClose().subscribe({ cleanup() }, errorConsumer)
}
override fun handleKeepAlive(frame: Frame) {
if (!Frame.Keepalive.hasRespondFlag(frame)) {
- timeLastTickSentMs = System.currentTimeMillis()
+ keepAliveReceivedMillis = System.currentTimeMillis()
}
}
@@ -49,25 +42,17 @@ internal class ClientServiceHandler(private val serviceConnection: DuplexConnect
subscription?.dispose()
}
- private fun sendKeepAlive(ackTimeoutMs: Long, missedAcks: Int): Completable {
+ private fun sendAndCheckKeepAlive(timeout: Long): Completable {
return Completable.fromRunnable {
val now = System.currentTimeMillis()
- if (now - timeLastTickSentMs > ackTimeoutMs) {
- val count = missedAckCounter.incrementAndGet()
- if (count >= missedAcks) {
- val message = String.format(
- "Missed %d keep-alive acks with a threshold of %d and a ack timeout of %d ms",
- count, missedAcks, ackTimeoutMs)
- throw ConnectionException(message)
- }
+ val duration = now - keepAliveReceivedMillis
+ if (duration > timeout) {
+ val message =
+ "keep-alive timed out: $duration of $timeout ms"
+ throw ConnectionException(message)
}
sentFrames.onNext(
Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true))
}
}
}
-
-internal data class KeepAliveInfo(
- val tickPeriod: Duration = Duration.ZERO,
- val ackTimeout: Duration = Duration.ZERO,
- val missedAcks: Int = 0)
diff --git a/rsocket-core/src/main/java/io/rsocket/android/internal/ConnectionDemuxer.kt b/rsocket-core/src/main/java/io/rsocket/android/internal/ConnectionDemuxer.kt
index 898157e77..55fe6fb6d 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/internal/ConnectionDemuxer.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/internal/ConnectionDemuxer.kt
@@ -25,12 +25,12 @@ import io.rsocket.android.FrameType.*
import io.rsocket.android.FrameType.SETUP
import io.rsocket.android.plugins.DuplexConnectionInterceptor.Type
import io.rsocket.android.plugins.DuplexConnectionInterceptor.Type.*
-import io.rsocket.android.plugins.PluginRegistry
+import io.rsocket.android.plugins.InterceptorRegistry
import org.reactivestreams.Publisher
import org.slf4j.LoggerFactory
internal class ServerConnectionDemuxer(source: DuplexConnection,
- plugins: PluginRegistry)
+ plugins: InterceptorRegistry)
: ConnectionDemuxer(source, plugins) {
override fun demux(frame: Frame): Type {
@@ -46,7 +46,7 @@ internal class ServerConnectionDemuxer(source: DuplexConnection,
}
internal class ClientConnectionDemuxer(source: DuplexConnection,
- plugins: PluginRegistry)
+ plugins: InterceptorRegistry)
: ConnectionDemuxer(source, plugins) {
override fun demux(frame: Frame): Type {
@@ -62,7 +62,7 @@ internal class ClientConnectionDemuxer(source: DuplexConnection,
}
sealed class ConnectionDemuxer(private val source: DuplexConnection,
- plugins: PluginRegistry) {
+ plugins: InterceptorRegistry) {
private val setupConnection: DuplexConnection
private val responderConnection: DuplexConnection
@@ -70,19 +70,19 @@ sealed class ConnectionDemuxer(private val source: DuplexConnection,
private val serviceConnection: DuplexConnection
init {
- val src = plugins.applyConnection(ALL, source)
+ val src = plugins.interceptConnection(ALL, source)
val setupConn = DemuxedConnection(src)
- setupConnection = plugins.applyConnection(Type.SETUP, setupConn)
+ setupConnection = plugins.interceptConnection(Type.SETUP, setupConn)
val requesterConn = DemuxedConnection(src)
- requesterConnection = plugins.applyConnection(REQUESTER, requesterConn)
+ requesterConnection = plugins.interceptConnection(REQUESTER, requesterConn)
val responderConn = DemuxedConnection(src)
- responderConnection = plugins.applyConnection(RESPONDER, responderConn)
+ responderConnection = plugins.interceptConnection(RESPONDER, responderConn)
val serviceConn = DemuxedConnection(src)
- serviceConnection = plugins.applyConnection(SERVICE, serviceConn)
+ serviceConnection = plugins.interceptConnection(SERVICE, serviceConn)
src.receive()
.groupBy(::demux)
diff --git a/rsocket-core/src/main/java/io/rsocket/android/internal/ServerContractInterceptor.kt b/rsocket-core/src/main/java/io/rsocket/android/internal/ServerContractInterceptor.kt
new file mode 100644
index 000000000..9316e4f62
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/android/internal/ServerContractInterceptor.kt
@@ -0,0 +1,127 @@
+package io.rsocket.android.internal
+
+import io.reactivex.Flowable
+import io.rsocket.android.DuplexConnection
+import io.rsocket.android.Frame
+import io.rsocket.android.FrameType.RESUME
+import io.rsocket.android.FrameType.SETUP
+import io.rsocket.android.exceptions.InvalidSetupException
+import io.rsocket.android.exceptions.RSocketException
+import io.rsocket.android.exceptions.RejectedResumeException
+import io.rsocket.android.exceptions.RejectedSetupException
+import io.rsocket.android.frame.SetupFrameFlyweight
+import io.rsocket.android.plugins.DuplexConnectionInterceptor
+import io.rsocket.android.plugins.DuplexConnectionInterceptor.Type
+import io.rsocket.android.util.DuplexConnectionProxy
+
+internal class ServerContractInterceptor(
+ private val errorConsumer: (Throwable) -> Unit,
+ private val protocolVersion: Int,
+ private val leaseEnabled: Boolean,
+ private val resumeEnabled: Boolean) : DuplexConnectionInterceptor {
+
+ constructor(errorConsumer: (Throwable) -> Unit) :
+ this(errorConsumer,
+ SetupFrameFlyweight.CURRENT_VERSION,
+ false,
+ false)
+
+ override fun invoke(type: Type, source: DuplexConnection): DuplexConnection =
+ if (type == Type.SETUP)
+ SetupContract(source,
+ errorConsumer,
+ protocolVersion,
+ leaseEnabled,
+ resumeEnabled)
+ else source
+}
+
+internal class SetupContract(source: DuplexConnection,
+ private val errorConsumer: (Throwable) -> Unit,
+ private val protocolVersion: Int,
+ private val leaseEnabled: Boolean,
+ private val resumeEnabled: Boolean)
+ : DuplexConnectionProxy(source) {
+
+ override fun receive(): Flowable {
+ return source.receive()
+ .filter { f ->
+ val accept =
+ try {
+ when (f.type) {
+ SETUP -> checkSetupFrame(source, f)
+ RESUME -> checkResumeFrame(source)
+ else -> unknownFrame(f)
+ }
+ } catch (e: Throwable) {
+ errorConsumer(e)
+ false
+ }
+
+ if (!accept) {
+ f.release()
+ }
+ accept
+ }
+ }
+
+ private fun checkSetupFrame(conn: DuplexConnection,
+ setupFrame: Frame): Boolean {
+ val version = Frame.Setup.version(setupFrame)
+ val leaseEnabled = Frame.Setup.leaseEnabled(setupFrame)
+ val resumeEnabled = Frame.Setup.resumeEnabled(setupFrame)
+
+ return checkSetupVersion(version, conn)
+ && checkSetupLease(leaseEnabled, conn)
+ && checkSetupResume(resumeEnabled, conn)
+ }
+
+ private fun checkResumeFrame(conn: DuplexConnection) =
+ check(!resumeEnabled,
+ { RejectedResumeException("Resumption is not supported") },
+ conn)
+
+ private fun unknownFrame(f: Frame): Boolean {
+ errorConsumer(IllegalArgumentException(
+ "Unknown setup frame: $f"))
+ return false
+ }
+
+ private fun checkSetupVersion(version: Int, conn: DuplexConnection) =
+ check(version != protocolVersion,
+ {
+ InvalidSetupException(
+ "Unsupported protocol: version $version, " +
+ "expected: $protocolVersion")
+ },
+ conn
+ )
+
+ private fun checkSetupLease(leaseEnabled: Boolean,
+ conn: DuplexConnection) =
+ check(leaseEnabled && !this.leaseEnabled,
+ { RejectedSetupException("Lease is not supported") },
+ conn)
+
+ private fun checkSetupResume(resumeEnabled: Boolean,
+ conn: DuplexConnection) =
+ check(resumeEnabled && !this.resumeEnabled,
+ { RejectedSetupException("Resumption is not supported") },
+ conn)
+
+ private inline fun check(errorPred: Boolean,
+ error: () -> RSocketException,
+ conn: DuplexConnection): Boolean =
+ if (errorPred) {
+ terminate(conn, error())
+ false
+ } else {
+ true
+ }
+
+ private fun terminate(conn: DuplexConnection, error: RSocketException) {
+ conn.sendOne(Frame.Error.from(0, error))
+ .andThen(conn.close())
+ .subscribe({}, errorConsumer)
+ }
+}
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/internal/ServerServiceHandler.kt b/rsocket-core/src/main/java/io/rsocket/android/internal/ServerServiceHandler.kt
index 1547de7f3..a1dbc45e4 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/internal/ServerServiceHandler.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/internal/ServerServiceHandler.kt
@@ -1,17 +1,59 @@
package io.rsocket.android.internal
import io.netty.buffer.Unpooled
+import io.reactivex.Completable
+import io.reactivex.Flowable
+import io.reactivex.disposables.Disposable
import io.rsocket.android.DuplexConnection
import io.rsocket.android.Frame
+import io.rsocket.android.exceptions.ConnectionException
+import io.rsocket.android.util.KeepAlive
+import java.util.concurrent.TimeUnit
internal class ServerServiceHandler(serviceConnection: DuplexConnection,
+ keepAlive: KeepAlive,
errorConsumer: (Throwable) -> Unit)
- : ServiceConnectionHandler(serviceConnection, errorConsumer) {
+ : ServiceHandler(serviceConnection, errorConsumer) {
+
+ @Volatile
+ private var keepAliveReceivedMillis = System.currentTimeMillis()
+ private var subscription: Disposable? = null
+
+ init {
+ val tickPeriod = keepAlive.keepAliveInterval().millis
+ val timeout = keepAlive.keepAliveMaxLifeTime().millis
+ Flowable.interval(tickPeriod, TimeUnit.MILLISECONDS)
+ .concatMapCompletable { checkKeepAlive(timeout) }
+ .subscribe({},
+ { err ->
+ errorConsumer(err)
+ serviceConnection.close().subscribe({}, errorConsumer)
+ })
+
+ serviceConnection.onClose().subscribe({ cleanup() }, errorConsumer)
+ }
override fun handleKeepAlive(frame: Frame) {
if (Frame.Keepalive.hasRespondFlag(frame)) {
+ keepAliveReceivedMillis = System.currentTimeMillis()
val data = Unpooled.wrappedBuffer(frame.data)
sentFrames.onNext(Frame.Keepalive.from(data, false))
}
}
+
+ private fun cleanup() {
+ subscription?.dispose()
+ }
+
+ private fun checkKeepAlive(timeout: Long): Completable {
+ return Completable.fromRunnable {
+ val now = System.currentTimeMillis()
+ val duration = now - keepAliveReceivedMillis
+ if (duration > timeout) {
+ val message = String.format(
+ "keep-alive timed out: %d of %d ms", duration, timeout)
+ throw ConnectionException(message)
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/internal/ServiceConnectionHandler.kt b/rsocket-core/src/main/java/io/rsocket/android/internal/ServiceHandler.kt
similarity index 63%
rename from rsocket-core/src/main/java/io/rsocket/android/internal/ServiceConnectionHandler.kt
rename to rsocket-core/src/main/java/io/rsocket/android/internal/ServiceHandler.kt
index c1bfe7987..6815942f0 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/internal/ServiceConnectionHandler.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/internal/ServiceHandler.kt
@@ -6,18 +6,20 @@ import io.rsocket.android.Frame
import io.rsocket.android.FrameType
import io.rsocket.android.exceptions.Exceptions
-internal abstract class ServiceConnectionHandler(private val serviceConnection: DuplexConnection,
- private val errorConsumer: (Throwable) -> Unit) {
+internal abstract class ServiceHandler(private val serviceConnection: DuplexConnection,
+ private val errorConsumer: (Throwable) -> Unit) {
- val sentFrames = UnicastProcessor.create()
+ internal val sentFrames = UnicastProcessor.create()
init {
- serviceConnection.receive()
+ serviceConnection
+ .receive()
.subscribe(::handle, errorConsumer)
- serviceConnection.send(sentFrames).subscribe({}, errorConsumer)
- }
- abstract fun handleKeepAlive(frame: Frame)
+ serviceConnection
+ .send(sentFrames)
+ .subscribe({}, errorConsumer)
+ }
private fun handle(frame: Frame) {
try {
@@ -25,15 +27,17 @@ internal abstract class ServiceConnectionHandler(private val serviceConnection:
FrameType.LEASE -> handleLease(frame)
FrameType.ERROR -> handleError(frame)
FrameType.KEEPALIVE -> handleKeepAlive(frame)
- else -> unknownFrame(frame)
+ else -> handleUnknownFrame(frame)
}
} finally {
frame.release()
}
}
+ protected abstract fun handleKeepAlive(frame: Frame)
+
private fun handleLease(frame: Frame) {
- unsupportedLeaseFrame(frame)
+ errorConsumer(IllegalArgumentException("Lease is not supported: $frame"))
}
private fun handleError(frame: Frame) {
@@ -42,12 +46,7 @@ internal abstract class ServiceConnectionHandler(private val serviceConnection:
}
- private fun unknownFrame(frame: Frame) {
+ private fun handleUnknownFrame(frame: Frame) {
errorConsumer(IllegalArgumentException("Unexpected frame: $frame"))
}
-
- private fun unsupportedLeaseFrame(frame: Frame) {
- errorConsumer(IllegalArgumentException("Lease is not supported: $frame"))
- }
-
}
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/package-info.java b/rsocket-core/src/main/java/io/rsocket/android/package-info.java
deleted file mode 100644
index 95f28bf2f..000000000
--- a/rsocket-core/src/main/java/io/rsocket/android/package-info.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.rsocket.android;
diff --git a/rsocket-core/src/main/java/io/rsocket/android/plugins/DuplexConnectionInterceptor.kt b/rsocket-core/src/main/java/io/rsocket/android/plugins/DuplexConnectionInterceptor.kt
index 13bbc41fe..f7e298e75 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/plugins/DuplexConnectionInterceptor.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/plugins/DuplexConnectionInterceptor.kt
@@ -18,7 +18,8 @@ package io.rsocket.android.plugins
import io.rsocket.android.DuplexConnection
-interface DuplexConnectionInterceptor : (DuplexConnectionInterceptor.Type, DuplexConnection) -> DuplexConnection {
+interface DuplexConnectionInterceptor : (DuplexConnectionInterceptor.Type,
+ DuplexConnection) -> DuplexConnection {
enum class Type {
ALL,
SETUP,
diff --git a/rsocket-core/src/main/java/io/rsocket/android/plugins/Plugins.kt b/rsocket-core/src/main/java/io/rsocket/android/plugins/GlobalInterceptors.kt
similarity index 59%
rename from rsocket-core/src/main/java/io/rsocket/android/plugins/Plugins.kt
rename to rsocket-core/src/main/java/io/rsocket/android/plugins/GlobalInterceptors.kt
index 7e5eb77d8..deec54e3b 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/plugins/Plugins.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/plugins/GlobalInterceptors.kt
@@ -17,20 +17,20 @@
package io.rsocket.android.plugins
/** JVM wide plugins for RSocket */
-object Plugins {
- private val DEFAULT = PluginRegistry()
+object GlobalInterceptors : InterceptorOptions {
+ private val DEFAULT = InterceptorRegistry()
- fun interceptConnection(interceptor: DuplexConnectionInterceptor) {
- DEFAULT.addConnectionPlugin(interceptor)
+ override fun connection(interceptor: DuplexConnectionInterceptor) {
+ DEFAULT.connection(interceptor)
}
- fun interceptClient(interceptor: RSocketInterceptor) {
- DEFAULT.addClientPlugin(interceptor)
+ override fun requester(interceptor: RSocketInterceptor) {
+ DEFAULT.requester(interceptor)
}
- fun interceptServer(interceptor: RSocketInterceptor) {
- DEFAULT.addServerPlugin(interceptor)
+ override fun handler(interceptor: RSocketInterceptor) {
+ DEFAULT.handler(interceptor)
}
- fun defaultPlugins(): PluginRegistry = DEFAULT
+ internal fun create(): InterceptorRegistry = InterceptorRegistry(DEFAULT)
}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/plugins/InterceptorOptions.kt b/rsocket-core/src/main/java/io/rsocket/android/plugins/InterceptorOptions.kt
new file mode 100644
index 000000000..bc9cbe8fa
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/android/plugins/InterceptorOptions.kt
@@ -0,0 +1,10 @@
+package io.rsocket.android.plugins
+
+interface InterceptorOptions {
+
+ fun connection(interceptor: DuplexConnectionInterceptor)
+
+ fun requester(interceptor: RSocketInterceptor)
+
+ fun handler(interceptor: RSocketInterceptor)
+}
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/plugins/InterceptorRegistry.kt b/rsocket-core/src/main/java/io/rsocket/android/plugins/InterceptorRegistry.kt
new file mode 100644
index 000000000..f2987d65e
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/android/plugins/InterceptorRegistry.kt
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.android.plugins
+
+import io.rsocket.android.DuplexConnection
+import io.rsocket.android.RSocket
+import java.util.ArrayList
+
+internal class InterceptorRegistry : InterceptorOptions {
+ private val connections = ArrayList()
+ private val requesters = ArrayList()
+ private val handlers = ArrayList()
+
+ constructor()
+
+ constructor(interceptorRegistry: InterceptorRegistry) {
+ this.connections.addAll(interceptorRegistry.connections)
+ this.requesters.addAll(interceptorRegistry.requesters)
+ this.handlers.addAll(interceptorRegistry.handlers)
+ }
+
+ fun copyWith(action: (InterceptorRegistry) -> Unit): InterceptorRegistry {
+ val copy = InterceptorRegistry(this)
+ action(copy)
+ return copy
+ }
+
+ override fun connection(interceptor: DuplexConnectionInterceptor) {
+ connections.add(interceptor)
+ }
+
+ fun connectionFirst(interceptor: DuplexConnectionInterceptor) {
+ connections.add(0, interceptor)
+ }
+
+ override fun requester(interceptor: RSocketInterceptor) {
+ requesters.add(interceptor)
+ }
+
+ override fun handler(interceptor: RSocketInterceptor) {
+ handlers.add(interceptor)
+ }
+
+ fun interceptRequester(rSocket: RSocket): RSocket {
+ var rs = rSocket
+ for (interceptor in requesters) {
+ rs = interceptor(rs)
+ }
+ return rs
+ }
+
+ fun interceptHandler(rSocket: RSocket): RSocket {
+ var rs = rSocket
+ for (interceptor in handlers) {
+ rs = interceptor(rs)
+ }
+ return rs
+ }
+
+ fun interceptConnection(
+ type: DuplexConnectionInterceptor.Type,
+ connection: DuplexConnection): DuplexConnection {
+ var conn = connection
+ for (interceptor in connections) {
+ conn = interceptor(type, conn)
+ }
+ return conn
+ }
+}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/plugins/PluginRegistry.kt b/rsocket-core/src/main/java/io/rsocket/android/plugins/PluginRegistry.kt
deleted file mode 100644
index 0ad858704..000000000
--- a/rsocket-core/src/main/java/io/rsocket/android/plugins/PluginRegistry.kt
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.rsocket.android.plugins
-
-import io.rsocket.android.DuplexConnection
-import io.rsocket.android.RSocket
-import java.util.ArrayList
-
-class PluginRegistry {
- private val connections = ArrayList()
- private val clients = ArrayList()
- private val servers = ArrayList()
-
- constructor()
-
- constructor(defaults: PluginRegistry) {
- this.connections.addAll(defaults.connections)
- this.clients.addAll(defaults.clients)
- this.servers.addAll(defaults.servers)
- }
-
- fun addConnectionPlugin(interceptor: DuplexConnectionInterceptor) {
- connections.add(interceptor)
- }
-
- fun addClientPlugin(interceptor: RSocketInterceptor) {
- clients.add(interceptor)
- }
-
- fun addServerPlugin(interceptor: RSocketInterceptor) {
- servers.add(interceptor)
- }
-
- fun applyClient(rSocket: RSocket): RSocket {
- var rs = rSocket
- for (interceptor in clients) {
- rs = interceptor(rs)
- }
- return rs
- }
-
- fun applyServer(rSocket: RSocket): RSocket {
- var rs = rSocket
- for (interceptor in servers) {
- rs = interceptor(rs)
- }
- return rs
- }
-
- fun applyConnection(
- type: DuplexConnectionInterceptor.Type, connection: DuplexConnection): DuplexConnection {
- var conn = connection
- for (interceptor in connections) {
- conn = interceptor(type, conn)
- }
- return conn
- }
-}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/transport/package-info.java b/rsocket-core/src/main/java/io/rsocket/android/transport/package-info.java
deleted file mode 100644
index 10fd13f90..000000000
--- a/rsocket-core/src/main/java/io/rsocket/android/transport/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-@javax.annotation.ParametersAreNonnullByDefault
-package io.rsocket.android.transport;
diff --git a/rsocket-core/src/main/java/io/rsocket/android/util/DuplexConnectionProxy.kt b/rsocket-core/src/main/java/io/rsocket/android/util/DuplexConnectionProxy.kt
new file mode 100644
index 000000000..a86f34d5d
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/android/util/DuplexConnectionProxy.kt
@@ -0,0 +1,6 @@
+package io.rsocket.android.util
+
+import io.rsocket.android.DuplexConnection
+
+open class DuplexConnectionProxy(protected val source: DuplexConnection)
+ : DuplexConnection by source
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/util/KeepAlive.kt b/rsocket-core/src/main/java/io/rsocket/android/util/KeepAlive.kt
new file mode 100644
index 000000000..d67b6c633
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/android/util/KeepAlive.kt
@@ -0,0 +1,10 @@
+package io.rsocket.android.util
+
+import io.rsocket.android.Duration
+
+interface KeepAlive {
+
+ fun keepAliveInterval(): Duration
+
+ fun keepAliveMaxLifeTime(): Duration
+}
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/util/KeepAliveOptions.kt b/rsocket-core/src/main/java/io/rsocket/android/util/KeepAliveOptions.kt
new file mode 100644
index 000000000..6518b3398
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/android/util/KeepAliveOptions.kt
@@ -0,0 +1,33 @@
+package io.rsocket.android.util
+
+import io.rsocket.android.Duration
+
+class KeepAliveOptions : KeepAlive {
+ private var interval: Duration = Duration.ofMillis(100)
+ private var maxLifeTime: Duration = Duration.ofSeconds(1)
+
+ fun keepAliveInterval(interval: Duration): KeepAliveOptions {
+ assertDuration(interval, "keepAliveInterval")
+ this.interval = interval
+ return this
+ }
+
+ override fun keepAliveInterval() = interval
+
+ fun keepAliveMaxLifeTime(maxLifetime: Duration): KeepAliveOptions {
+ assertDuration(maxLifetime, "keepAliveMaxLifeTime")
+ this.maxLifeTime = maxLifetime
+ return this
+ }
+
+ override fun keepAliveMaxLifeTime() = maxLifeTime
+
+ private fun assertDuration(duration: Duration, name: String) {
+ if (duration.millis <= 0) {
+ throw IllegalArgumentException("$name must be positive")
+ }
+ if (duration.millis > Integer.MAX_VALUE) {
+ throw IllegalArgumentException("$name must not exceed 2^31-1")
+ }
+ }
+}
diff --git a/rsocket-core/src/main/java/io/rsocket/android/util/MediaType.kt b/rsocket-core/src/main/java/io/rsocket/android/util/MediaType.kt
new file mode 100644
index 000000000..deda17871
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/android/util/MediaType.kt
@@ -0,0 +1,8 @@
+package io.rsocket.android.util
+
+interface MediaType {
+
+ fun dataMimeType(): String
+
+ fun metadataMimeType(): String
+}
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/util/MediaTypeOptions.kt b/rsocket-core/src/main/java/io/rsocket/android/util/MediaTypeOptions.kt
new file mode 100644
index 000000000..6d2b260d9
--- /dev/null
+++ b/rsocket-core/src/main/java/io/rsocket/android/util/MediaTypeOptions.kt
@@ -0,0 +1,28 @@
+package io.rsocket.android.util
+
+class MediaTypeOptions : MediaType {
+ private var dataMimeType: String = "application/binary"
+ private var metadataMimeType: String = "application/binary"
+
+ fun dataMimeType(dataMimeType: String): MediaTypeOptions {
+ assertMediaType(dataMimeType)
+ this.dataMimeType = dataMimeType
+ return this
+ }
+
+ override fun dataMimeType(): String = dataMimeType
+
+ fun metadataMimeType(metadataMimeType: String): MediaTypeOptions {
+ assertMediaType(metadataMimeType)
+ this.metadataMimeType = metadataMimeType
+ return this
+ }
+
+ override fun metadataMimeType(): String = metadataMimeType
+
+ private fun assertMediaType(mediaType: String) {
+ if (mediaType.isEmpty()) {
+ throw IllegalArgumentException("media type must be non-empty")
+ }
+ }
+}
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/util/RSocketProxy.kt b/rsocket-core/src/main/java/io/rsocket/android/util/RSocketProxy.kt
index 25b6b089c..08eadd437 100644
--- a/rsocket-core/src/main/java/io/rsocket/android/util/RSocketProxy.kt
+++ b/rsocket-core/src/main/java/io/rsocket/android/util/RSocketProxy.kt
@@ -23,21 +23,4 @@ import io.rsocket.android.RSocket
import org.reactivestreams.Publisher
/** Wrapper/Proxy for a RSocket. This is useful when we want to override a specific method. */
-open class RSocketProxy(protected val source: RSocket) : RSocket {
-
- override fun fireAndForget(payload: Payload): Completable = source.fireAndForget(payload)
-
- override fun requestResponse(payload: Payload): Single = source.requestResponse(payload)
-
- override fun requestStream(payload: Payload): Flowable = source.requestStream(payload)
-
- override fun requestChannel(payloads: Publisher): Flowable = source.requestChannel(payloads)
-
- override fun metadataPush(payload: Payload): Completable = source.metadataPush(payload)
-
- override fun availability(): Double = source.availability()
-
- override fun close(): Completable = source.close()
-
- override fun onClose(): Completable = source.onClose()
-}
+open class RSocketProxy(source: RSocket) : RSocket by source
\ No newline at end of file
diff --git a/rsocket-core/src/main/java/io/rsocket/android/util/package-info.java b/rsocket-core/src/main/java/io/rsocket/android/util/package-info.java
deleted file mode 100644
index 1e1ff7a3d..000000000
--- a/rsocket-core/src/main/java/io/rsocket/android/util/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-@javax.annotation.ParametersAreNonnullByDefault
-package io.rsocket.android.util;
diff --git a/rsocket-core/src/test/java/io/rsocket/android/RSocketClientTest.kt b/rsocket-core/src/test/java/io/rsocket/android/RSocketRequesterTest.kt
similarity index 89%
rename from rsocket-core/src/test/java/io/rsocket/android/RSocketClientTest.kt
rename to rsocket-core/src/test/java/io/rsocket/android/RSocketRequesterTest.kt
index ae33da4d2..4ec32b692 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/RSocketClientTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/RSocketRequesterTest.kt
@@ -37,7 +37,7 @@ import org.junit.runners.model.Statement
import java.nio.channels.ClosedChannelException
import java.util.concurrent.TimeUnit
-class RSocketClientTest {
+class RSocketRequesterTest {
@get:Rule
val rule = ClientSocketRule()
@@ -59,7 +59,7 @@ class RSocketClientTest {
@Test(timeout = 2000)
fun testStreamInitialN() {
- val stream = rule.client.requestStream(PayloadImpl.EMPTY)
+ val stream = rule.requester.requestStream(PayloadImpl.EMPTY)
Completable.timer(100, TimeUnit.MILLISECONDS)
.subscribe({
val subscriber = TestSubscriber()
@@ -86,7 +86,7 @@ class RSocketClientTest {
@Test(timeout = 2000)
fun testHandleApplicationException() {
- val response = rule.client.requestResponse(PayloadImpl.EMPTY).toFlowable()
+ val response = rule.requester.requestResponse(PayloadImpl.EMPTY).toFlowable()
val responseSub = TestSubscriber.create()
response.subscribe(responseSub)
rule.receiver.onNext(Frame.Error.from(1, ApplicationException("error")))
@@ -96,7 +96,7 @@ class RSocketClientTest {
@Test(timeout = 2000)
fun testHandleValidFrame() {
- val response = rule.client.requestResponse(PayloadImpl.EMPTY).toFlowable()
+ val response = rule.requester.requestResponse(PayloadImpl.EMPTY).toFlowable()
val sub = TestSubscriber.create()
response.subscribe(sub)
@@ -112,7 +112,7 @@ class RSocketClientTest {
val subs = TestSubscriber.create()
rule.sender.filter { it.type != FrameType.KEEPALIVE }
.subscribe(subs)
- rule.client.requestResponse(PayloadImpl.EMPTY).timeout(100, TimeUnit.MILLISECONDS)
+ rule.requester.requestResponse(PayloadImpl.EMPTY).timeout(100, TimeUnit.MILLISECONDS)
.onErrorReturnItem(PayloadImpl("test"))
.blockingGet()
@@ -127,7 +127,7 @@ class RSocketClientTest {
@Test
fun testLazyRequestResponse() {
- val response = rule.client.requestResponse(PayloadImpl.EMPTY).toFlowable()
+ val response = rule.requester.requestResponse(PayloadImpl.EMPTY).toFlowable()
val framesSubs = TestSubscriber.create()
rule.sender.filter { it.type != FrameType.KEEPALIVE }.subscribe(framesSubs)
@@ -145,7 +145,7 @@ class RSocketClientTest {
fun requestErrorOnConnectionClose() {
Completable.timer(100, TimeUnit.MILLISECONDS)
.andThen(rule.conn.close()).subscribe()
- val requestStream = rule.client.requestStream(PayloadImpl("test"))
+ val requestStream = rule.requester.requestStream(PayloadImpl("test"))
val subs = TestSubscriber.create()
requestStream.blockingSubscribe(subs)
subs.assertNoValues()
@@ -185,7 +185,7 @@ class RSocketClientTest {
private fun assertFlowableError(f: (RSocket) -> Flowable) {
rule.conn.close().subscribe()
val subs = TestSubscriber.create()
- f(rule.client).delaySubscription(100, TimeUnit.MILLISECONDS).blockingSubscribe(subs)
+ f(rule.requester).delaySubscription(100, TimeUnit.MILLISECONDS).blockingSubscribe(subs)
subs.assertNoValues()
subs.assertError { it is ClosedChannelException }
}
@@ -194,7 +194,7 @@ class RSocketClientTest {
rule.conn.close().subscribe()
val requestStream = Completable
.timer(100, TimeUnit.MILLISECONDS)
- .andThen(f(rule.client))
+ .andThen(f(rule.requester))
val err = requestStream.blockingGet()
assertThat("error is not ClosedChannelException",
err is ClosedChannelException)
@@ -202,7 +202,7 @@ class RSocketClientTest {
private fun assertSingleError(f: (RSocket) -> Single) {
rule.conn.close().subscribe()
- val response = f(rule.client).delaySubscription(100, TimeUnit.MILLISECONDS)
+ val response = f(rule.requester).delaySubscription(100, TimeUnit.MILLISECONDS)
val subs = BlockingMultiObserver()
response.subscribe(subs)
val err = subs.blockingGetError()
@@ -214,7 +214,7 @@ class RSocketClientTest {
lateinit var sender: PublishProcessor
lateinit var receiver: PublishProcessor
lateinit var conn: LocalDuplexConnection
- internal lateinit var client: RSocketClient
+ internal lateinit var requester: RSocketRequester
val errors: MutableList = ArrayList()
override fun apply(base: Statement, description: Description?): Statement {
@@ -225,13 +225,13 @@ class RSocketClientTest {
receiver = PublishProcessor.create()
conn = LocalDuplexConnection("clientRequesterConn", sender, receiver)
- client = RSocketClient(
+ requester = RSocketRequester(
conn,
{ throwable ->
errors.add(throwable)
Unit
},
- StreamIdSupplier.clientSupplier(),
+ ClientStreamIds(),
streamWindow)
base.evaluate()
diff --git a/rsocket-core/src/test/java/io/rsocket/android/RSocketServerTest.kt b/rsocket-core/src/test/java/io/rsocket/android/RSocketResponderTest.kt
similarity index 94%
rename from rsocket-core/src/test/java/io/rsocket/android/RSocketServerTest.kt
rename to rsocket-core/src/test/java/io/rsocket/android/RSocketResponderTest.kt
index e641c2342..f92d20fba 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/RSocketServerTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/RSocketResponderTest.kt
@@ -17,7 +17,6 @@
package io.rsocket.android
-import io.netty.buffer.Unpooled
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Single
@@ -35,7 +34,7 @@ import org.junit.runners.model.Statement
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
-class RSocketServerTest {
+class RSocketResponderTest {
@get:Rule
val rule = ServerSocketRule()
@@ -112,8 +111,8 @@ class RSocketServerTest {
lateinit var sender: PublishProcessor
lateinit var receiver: PublishProcessor
private lateinit var conn: LocalDuplexConnection
- lateinit var errors:MutableList
- internal lateinit var rsocket: RSocketServer
+ lateinit var errors: MutableList
+ internal lateinit var rsocket: RSocketResponder
override fun apply(base: Statement, description: Description?): Statement {
return object : Statement() {
@@ -130,7 +129,11 @@ class RSocketServerTest {
receiver = PublishProcessor.create()
conn = LocalDuplexConnection("serverConn", sender, receiver)
errors = ArrayList()
- rsocket = RSocketServer(conn, acceptingSocket) { throwable -> errors.add(throwable) }
+ rsocket = RSocketResponder(
+ conn,
+ acceptingSocket,
+ { throwable -> errors.add(throwable) },
+ 128)
}
fun setAccSocket(acceptingSocket: RSocket) {
diff --git a/rsocket-core/src/test/java/io/rsocket/android/RSocketTest.kt b/rsocket-core/src/test/java/io/rsocket/android/RSocketTest.kt
index 83751e09b..a49df0abf 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/RSocketTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/RSocketTest.kt
@@ -84,8 +84,8 @@ class RSocketTest {
class SocketRule : ExternalResource() {
- lateinit internal var crs: RSocketClient
- internal lateinit var srs: RSocketServer
+ lateinit internal var crs: RSocketRequester
+ internal lateinit var srs: RSocketResponder
private var requestAcceptor: RSocket? = null
lateinit private var serverProcessor: PublishProcessor
lateinit private var clientProcessor: PublishProcessor
@@ -106,7 +106,7 @@ class RSocketTest {
serverProcessor = PublishProcessor.create()
clientProcessor = PublishProcessor.create()
- val serverConnection = LocalDuplexConnection("server", clientProcessor, serverProcessor)
+ val serverConnection = LocalDuplexConnection("responder", clientProcessor, serverProcessor)
val clientConnection = LocalDuplexConnection("client", serverProcessor, clientProcessor)
requestAcceptor = if (null != requestAcceptor)
@@ -126,13 +126,16 @@ class RSocketTest {
}
}
- srs = RSocketServer(
- serverConnection, requestAcceptor!!) { throwable -> serverErrors.add(throwable) }
+ srs = RSocketResponder(
+ serverConnection,
+ requestAcceptor!!,
+ { throwable -> serverErrors.add(throwable) },
+ 128)
- crs = RSocketClient(
+ crs = RSocketRequester(
clientConnection,
{ throwable -> clientErrors.add(throwable) },
- StreamIdSupplier.clientSupplier(),
+ ClientStreamIds(),
128)
}
@@ -163,13 +166,13 @@ class RSocketTest {
fun assertNoServerErrors() {
assertThat(
- "Unexpected error on the server connection.",
+ "Unexpected error on the responder connection.",
serverErrors,
empty())
}
fun assertServerErrorCount(count: Int) {
- assertThat("Unexpected error count on the server connection.",
+ assertThat("Unexpected error count on the responder connection.",
serverErrors,
hasSize(count))
}
diff --git a/rsocket-core/src/test/java/io/rsocket/android/RequesterStreamWindowTest.kt b/rsocket-core/src/test/java/io/rsocket/android/RequesterStreamWindowTest.kt
index 9c08c6243..6fb7e8358 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/RequesterStreamWindowTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/RequesterStreamWindowTest.kt
@@ -2,7 +2,6 @@ package io.rsocket.android
import io.reactivex.Flowable
import io.reactivex.processors.PublishProcessor
-import io.reactivex.schedulers.Schedulers
import io.rsocket.android.test.util.LocalDuplexConnection
import io.rsocket.android.util.PayloadImpl
import org.hamcrest.MatcherAssert.assertThat
@@ -22,14 +21,14 @@ class RequesterStreamWindowTest {
@Test(timeout = 3_000)
fun requesterStreamInbound() {
checkRequesterInbound(
- rule.client.requestStream(PayloadImpl("test")),
+ rule.requester.requestStream(PayloadImpl("test")),
FrameType.REQUEST_STREAM)
}
@Test(timeout = 3_000)
fun requesterChannelInbound() {
checkRequesterInbound(
- rule.client.requestChannel(Flowable.just(PayloadImpl("test"))),
+ rule.requester.requestChannel(Flowable.just(PayloadImpl("test"))),
FrameType.REQUEST_CHANNEL)
}
@@ -39,7 +38,7 @@ class RequesterStreamWindowTest {
val request = Flowable.just(1, 2, 3)
.map { PayloadImpl(it.toString()) as Payload }
.doOnRequest { demand = it }
- rule.client.requestChannel(request).subscribe({}, {})
+ rule.requester.requestChannel(request).subscribe({}, {})
rule.receiver.onNext(Frame.RequestN.from(1, Int.MAX_VALUE))
assertThat("requesterConnection channel handler is not limited",
demand,
@@ -63,7 +62,7 @@ class RequesterStreamWindowTest {
lateinit var sender: PublishProcessor
lateinit var receiver: PublishProcessor
lateinit var conn: LocalDuplexConnection
- internal lateinit var client: RSocketClient
+ internal lateinit var requester: RSocketRequester
val errors: MutableList = ArrayList()
override fun apply(base: Statement, description: Description?): Statement {
@@ -74,13 +73,13 @@ class RequesterStreamWindowTest {
receiver = PublishProcessor.create()
conn = LocalDuplexConnection("conn", sender, receiver)
- client = RSocketClient(
+ requester = RSocketRequester(
conn,
{ throwable ->
errors.add(throwable)
Unit
},
- StreamIdSupplier.clientSupplier(),
+ ClientStreamIds(),
streamWindow)
base.evaluate()
diff --git a/rsocket-core/src/test/java/io/rsocket/android/ResponderStreamWindowTest.kt b/rsocket-core/src/test/java/io/rsocket/android/ResponderStreamWindowTest.kt
index 8fcd0f127..9c20f644c 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/ResponderStreamWindowTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/ResponderStreamWindowTest.kt
@@ -3,7 +3,6 @@ package io.rsocket.android
import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.processors.PublishProcessor
-import io.reactivex.schedulers.Schedulers
import io.rsocket.android.test.util.LocalDuplexConnection
import io.rsocket.android.util.PayloadImpl
import org.hamcrest.MatcherAssert.assertThat
@@ -67,7 +66,7 @@ class ResponderStreamWindowTest {
lateinit var sender: PublishProcessor
lateinit var receiver: PublishProcessor
lateinit var conn: LocalDuplexConnection
- internal lateinit var server: RSocketServer
+ internal lateinit var responder: RSocketResponder
val errors: MutableList = ArrayList()
var responseDemand: Long? = null
@@ -79,7 +78,7 @@ class ResponderStreamWindowTest {
receiver = PublishProcessor.create()
conn = LocalDuplexConnection("conn", sender, receiver)
- server = RSocketServer(
+ responder = RSocketResponder(
conn,
object : AbstractRSocket() {
override fun requestStream(payload: Payload): Flowable =
diff --git a/rsocket-core/src/test/java/io/rsocket/android/ServiceConnectionHandlerTest.kt b/rsocket-core/src/test/java/io/rsocket/android/ServiceHandlerTest.kt
similarity index 60%
rename from rsocket-core/src/test/java/io/rsocket/android/ServiceConnectionHandlerTest.kt
rename to rsocket-core/src/test/java/io/rsocket/android/ServiceHandlerTest.kt
index 8abcfe188..f9b366e3e 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/ServiceConnectionHandlerTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/ServiceHandlerTest.kt
@@ -3,27 +3,33 @@ package io.rsocket.android
import io.netty.buffer.Unpooled
import io.netty.buffer.Unpooled.EMPTY_BUFFER
import io.reactivex.processors.UnicastProcessor
+import io.rsocket.android.exceptions.ConnectionException
import io.rsocket.android.exceptions.RejectedSetupException
import io.rsocket.android.internal.ClientServiceHandler
-import io.rsocket.android.internal.KeepAliveInfo
import io.rsocket.android.internal.ServerServiceHandler
import io.rsocket.android.test.util.LocalDuplexConnection
+import io.rsocket.android.util.KeepAlive
+import io.rsocket.android.util.KeepAliveOptions
import org.junit.After
import org.junit.Assert.*
import org.junit.Before
import org.junit.Test
import java.util.concurrent.TimeUnit
-class ServiceConnectionHandlerTest {
+class ServiceHandlerTest {
lateinit var sender: UnicastProcessor
lateinit var receiver: UnicastProcessor
lateinit var conn: LocalDuplexConnection
+ private lateinit var errors: Errors
+ private lateinit var keepAlive: KeepAlive
@Before
fun setUp() {
sender = UnicastProcessor.create()
receiver = UnicastProcessor.create()
conn = LocalDuplexConnection("clientRequesterConn", sender, receiver)
+ errors = Errors()
+ keepAlive = KeepAliveOptions()
}
@After
@@ -33,8 +39,7 @@ class ServiceConnectionHandlerTest {
@Test
fun serviceHandlerLease() {
- val errors = Errors()
- ServerServiceHandler(conn, errors)
+ ServerServiceHandler(conn, keepAlive, errors)
receiver.onNext(Frame.Lease.from(1000, 42, EMPTY_BUFFER))
val errs = errors.get()
assertEquals(1, errs.size)
@@ -43,8 +48,7 @@ class ServiceConnectionHandlerTest {
@Test
fun serviceHandlerError() {
- val errors = Errors()
- ServerServiceHandler(conn, errors)
+ ServerServiceHandler(conn, keepAlive, errors)
receiver.onNext(Frame.Error.from(0, RejectedSetupException("error")))
val errs = errors.get()
assertEquals(1, errs.size)
@@ -57,30 +61,53 @@ class ServiceConnectionHandlerTest {
@Test(timeout = 2_000)
fun serverServiceHandlerKeepAlive() {
- val errors = Errors()
- ServerServiceHandler(conn, errors)
+ ServerServiceHandler(conn, keepAlive, errors)
receiver.onNext(Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true))
val keepAliveResponse = sender.blockingFirst()
assertTrue(keepAliveResponse.type == FrameType.KEEPALIVE)
assertFalse(Frame.Keepalive.hasRespondFlag(keepAliveResponse))
}
+ @Test(timeout = 2_000)
+ fun serverServiceHandlerKeepAliveTimeout() {
+ ServerServiceHandler(conn, keepAlive, errors)
+ conn.onClose().blockingAwait()
+ val errs = errors.get()
+ assertEquals(1, errs.size)
+ val err = errs.first()
+ assertTrue(err is ConnectionException)
+ assertTrue((err as ConnectionException).message
+ ?.startsWith("keep-alive timed out")
+ ?: throw AssertionError(
+ "ConnectionException error must be non-null"))
+ }
+
@Test(timeout = 2_000)
fun clientServiceHandlerKeepAlive() {
- val errors = Errors()
ClientServiceHandler(
conn,
- errors,
- KeepAliveInfo(
- Duration.ofMillis(100),
- Duration.ofSeconds(1),
- 3))
+ KeepAliveOptions(),
+ errors)
val sentKeepAlives = sender.take(3).toList().blockingGet()
for (frame in sentKeepAlives) {
assertTrue(frame.type == FrameType.KEEPALIVE)
assertTrue(Frame.Keepalive.hasRespondFlag(frame))
}
}
+
+ @Test(timeout = 2_000)
+ fun clientServiceHandlerKeepAliveTimeout() {
+ ClientServiceHandler(conn, keepAlive, errors)
+ conn.onClose().blockingAwait()
+ val errs = errors.get()
+ assertEquals(1, errs.size)
+ val err = errs.first()
+ assertTrue(err is ConnectionException)
+ assertTrue((err as ConnectionException).message
+ ?.startsWith("keep-alive timed out")
+ ?: throw AssertionError(
+ "ConnectionException error must be non-null"))
+ }
}
private class Errors : (Throwable) -> Unit {
diff --git a/rsocket-core/src/test/java/io/rsocket/android/StreamIdSupplierTest.kt b/rsocket-core/src/test/java/io/rsocket/android/StreamIdsTest.kt
similarity index 90%
rename from rsocket-core/src/test/java/io/rsocket/android/StreamIdSupplierTest.kt
rename to rsocket-core/src/test/java/io/rsocket/android/StreamIdsTest.kt
index 4b85f916d..c9988d71f 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/StreamIdSupplierTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/StreamIdsTest.kt
@@ -19,13 +19,15 @@ package io.rsocket.android
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
+import org.junit.Before
import org.junit.Test
-class StreamIdSupplierTest {
+class StreamIdsTest {
+
@Test
fun testClientSequence() {
- val s = StreamIdSupplier.clientSupplier()
+ val s = ClientStreamIds()
assertEquals(1, s.nextStreamId().toLong())
assertEquals(3, s.nextStreamId().toLong())
assertEquals(5, s.nextStreamId().toLong())
@@ -33,7 +35,7 @@ class StreamIdSupplierTest {
@Test
fun testServerSequence() {
- val s = StreamIdSupplier.serverSupplier()
+ val s = ServerStreamIds()
assertEquals(2, s.nextStreamId().toLong())
assertEquals(4, s.nextStreamId().toLong())
assertEquals(6, s.nextStreamId().toLong())
@@ -41,7 +43,7 @@ class StreamIdSupplierTest {
@Test
fun testClientIsValid() {
- val s = StreamIdSupplier.clientSupplier()
+ val s = ClientStreamIds()
assertFalse(s.isBeforeOrCurrent(1))
assertFalse(s.isBeforeOrCurrent(3))
@@ -63,7 +65,7 @@ class StreamIdSupplierTest {
@Test
fun testServerIsValid() {
- val s = StreamIdSupplier.serverSupplier()
+ val s = ServerStreamIds()
assertFalse(s.isBeforeOrCurrent(2))
assertFalse(s.isBeforeOrCurrent(4))
diff --git a/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FragmentationDuplexConnectionTest.kt b/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FragmentationDuplexConnectionTest.kt
index ea0efc814..40a546c8c 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FragmentationDuplexConnectionTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FragmentationDuplexConnectionTest.kt
@@ -25,8 +25,10 @@ import io.reactivex.subscribers.TestSubscriber
import io.rsocket.android.DuplexConnection
import io.rsocket.android.Frame
import io.rsocket.android.FrameType
+import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_F
import io.rsocket.android.util.PayloadImpl
-import org.junit.Assert
+import org.junit.Assert.*
+import org.junit.Before
import org.junit.Test
import org.mockito.Mockito
import org.mockito.Mockito.*
@@ -56,38 +58,161 @@ class FragmentationDuplexConnectionTest {
fun sentFrames(): Flowable = sent
}
- @Test
- fun testSendOneWithFragmentation() {
+ lateinit var mockConnection: MockConnection
+ lateinit var sentSubscriber :TestSubscriber
- val mockConnection = MockConnection()
+ @Before
+ fun setUp() {
+ mockConnection = MockConnection()
- val sentSubscriber = TestSubscriber.create()
+ sentSubscriber = TestSubscriber.create()
mockConnection.sentFrames().subscribe(sentSubscriber)
-
+ }
+ @Test
+ fun dataMetadataAboveMtu() {
val data = createRandomBytes(16)
val metadata = createRandomBytes(16)
val frame = Frame.Request.from(
1, FrameType.REQUEST_RESPONSE, PayloadImpl(data, metadata), 1)
- val duplexConnection = FragmentationDuplexConnection(mockConnection, 2)
+ val mtu = 2
+ val duplexConnection = FragmentationDuplexConnection(mockConnection, mtu)
val subs = TestSubscriber.create()
- Flowable.defer { duplexConnection.sendOne(frame).toFlowable() }.subscribeOn(Schedulers.io()).blockingSubscribe(subs)
+ Flowable.defer { duplexConnection.sendOne(frame).toFlowable() }
+ .subscribeOn(Schedulers.io())
+ .blockingSubscribe(subs)
subs.assertComplete()
sentSubscriber.assertNoErrors()
sentSubscriber.assertComplete()
- Assert.assertEquals(16, sentSubscriber.valueCount())
- Completable.complete()
- }
+ assertEquals(16, sentSubscriber.valueCount())
+ val frames = sentSubscriber.values()
+ val lastFrame = frames.last()
+ val firstFrames = frames.take(15)
+ val metadataFrames = frames.take(8)
+ val dataFrames = frames.takeLast(8)
+ firstFrames.forEach {
+ assertTrue(it.isFlagSet(FLAGS_F))
+ }
+ assertFalse(lastFrame.isFlagSet(FLAGS_F))
+ metadataFrames.forEach {
+ assertTrue(it.metadata.remaining() == mtu)
+ assertFalse(it.data.hasRemaining())
+ }
+
+ dataFrames.forEach {
+ assertFalse(it.metadata.hasRemaining())
+ assertTrue(it.data.remaining() == mtu)
+ }
+ }
- private fun any(): T {
- Mockito.any()
- return uninitialized()
+ @Test
+ fun dataAboveMtu() {
+ val data = createRandomBytes(16)
+ val metadata = createRandomBytes(1)
+
+ val frame = Frame.Request.from(
+ 1, FrameType.REQUEST_RESPONSE, PayloadImpl(data, metadata), 1)
+
+ val mtu = 2
+ val duplexConnection = FragmentationDuplexConnection(mockConnection, mtu)
+ val subs = TestSubscriber.create()
+ Flowable.defer { duplexConnection.sendOne(frame).toFlowable() }
+ .subscribeOn(Schedulers.io())
+ .blockingSubscribe(subs)
+ subs.assertComplete()
+
+ sentSubscriber.assertNoErrors()
+ sentSubscriber.assertComplete()
+ assertEquals(9, sentSubscriber.valueCount())
+ val frames = sentSubscriber.values()
+ val lastFrame = frames.last()
+ val firstFrames = frames.take(8)
+ val firstFrame = frames.first()
+ firstFrames.forEach {
+ assertTrue(it.isFlagSet(FLAGS_F))
+ }
+ assertFalse(lastFrame.isFlagSet(FLAGS_F))
+ assertTrue(firstFrame.metadata.remaining() == 1)
+ assertTrue(firstFrame.data.remaining() == 1)
+ assertTrue(lastFrame.metadata.remaining() == 0)
+ assertTrue(lastFrame.data.remaining() == 1)
}
- @Suppress("UNCHECKED_CAST")
- private fun uninitialized(): T = null as T
+ @Test
+ fun dataAboveMtuNullMetadata() {
+ val data = createRandomBytes(16)
+ val metadata = null
+
+ val frame = Frame.Request.from(
+ 1, FrameType.REQUEST_RESPONSE, PayloadImpl(data, metadata), 1)
+
+ val mtu = 2
+ val duplexConnection = FragmentationDuplexConnection(mockConnection, mtu)
+ val subs = TestSubscriber.create()
+ Flowable.defer { duplexConnection.sendOne(frame).toFlowable() }
+ .subscribeOn(Schedulers.io())
+ .blockingSubscribe(subs)
+ subs.assertComplete()
+
+ sentSubscriber.assertNoErrors()
+ sentSubscriber.assertComplete()
+ assertEquals(8, sentSubscriber.valueCount())
+ val frames = sentSubscriber.values()
+ val lastFrame = frames.last()
+ val firstFrames = frames.take(7)
+ firstFrames.forEach {
+ assertTrue(it.isFlagSet(FLAGS_F))
+ }
+ assertFalse(lastFrame.isFlagSet(FLAGS_F))
+ }
+
+ @Test
+ fun dataMetadataBelowMtu() {
+ val data = createRandomBytes(16)
+ val metadata = createRandomBytes(1)
+
+ val frame = Frame.Request.from(
+ 1, FrameType.REQUEST_RESPONSE, PayloadImpl(data, metadata), 1)
+
+ val mtu = 20
+ val duplexConnection = FragmentationDuplexConnection(mockConnection, mtu)
+ val subs = TestSubscriber.create()
+ Flowable.defer { duplexConnection.sendOne(frame).toFlowable() }
+ .subscribeOn(Schedulers.io())
+ .blockingSubscribe(subs)
+ subs.assertComplete()
+
+ sentSubscriber.assertNoErrors()
+ sentSubscriber.assertComplete()
+ assertEquals(1, sentSubscriber.valueCount())
+ val firstFrame = sentSubscriber.values().first()
+ assertFalse(firstFrame.isFlagSet(FLAGS_F))
+ }
+
+ @Test
+ fun zeroMtu() {
+ val data = createRandomBytes(16)
+ val metadata = createRandomBytes(1)
+
+ val frame = Frame.Request.from(
+ 1, FrameType.REQUEST_RESPONSE, PayloadImpl(data, metadata), 1)
+
+ val mtu = 0
+ val duplexConnection = FragmentationDuplexConnection(mockConnection, mtu)
+ val subs = TestSubscriber.create()
+ Flowable.defer { duplexConnection.sendOne(frame).toFlowable() }
+ .subscribeOn(Schedulers.io())
+ .blockingSubscribe(subs)
+ subs.assertComplete()
+
+ sentSubscriber.assertNoErrors()
+ sentSubscriber.assertComplete()
+ assertEquals(1, sentSubscriber.valueCount())
+ val firstFrame = sentSubscriber.values().first()
+ assertFalse(firstFrame.isFlagSet(FLAGS_F))
+ }
@Test
fun testShouldNotFragment() {
@@ -114,7 +239,7 @@ class FragmentationDuplexConnectionTest {
frames.blockingSubscribe(subs)
subs.assertNoErrors()
subs.assertComplete()
- Assert.assertEquals(16, subs.valueCount())
+ assertEquals(16, subs.valueCount())
Completable.complete()
}
`when`(mockConnection.sendOne(any())).thenReturn(Completable.complete())
@@ -167,4 +292,12 @@ class FragmentationDuplexConnectionTest {
ThreadLocalRandom.current().nextBytes(bytes)
return ByteBuffer.wrap(bytes)
}
+
+ private fun any(): T {
+ Mockito.any()
+ return uninitialized()
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private fun uninitialized(): T = null as T
}
\ No newline at end of file
diff --git a/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FrameFragmenterTest.kt b/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FrameFragmenterTest.kt
index 11ba3b058..44b135996 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FrameFragmenterTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FrameFragmenterTest.kt
@@ -20,6 +20,7 @@ import io.reactivex.subscribers.TestSubscriber
import io.rsocket.android.Frame
import io.rsocket.android.FrameType
import io.rsocket.android.util.PayloadImpl
+import org.junit.Assert.assertFalse
import org.junit.Test
import java.nio.ByteBuffer
import java.util.concurrent.ThreadLocalRandom
@@ -67,6 +68,16 @@ class FrameFragmenterTest {
subs.assertValueCount(8).assertComplete().assertNoErrors()
}
+ @Test
+ fun fragmentOnlyOnPositiveMtu() {
+ val data = ByteBuffer.allocate(42)
+ val metadata = createRandomBytes(16)
+ val frameFragmenter = FrameFragmenter(0)
+ assertFalse(frameFragmenter.shouldFragment(Frame.Request.from(
+ 1, FrameType.REQUEST_RESPONSE, PayloadImpl(data, metadata), 1))
+ )
+ }
+
@Test
fun testFragmentWithDataOnly() {
val data = createRandomBytes(16)
diff --git a/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FrameReassemblerTest.kt b/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FrameReassemblerTest.kt
deleted file mode 100644
index 9e5b981d5..000000000
--- a/rsocket-core/src/test/java/io/rsocket/android/fragmentation/FrameReassemblerTest.kt
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Copyright 2016 Netflix, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.rsocket.android.fragmentation
-
-import io.rsocket.android.Frame
-import io.rsocket.android.FrameType
-import io.rsocket.android.util.PayloadImpl
-import org.junit.Ignore
-import java.nio.ByteBuffer
-import java.util.concurrent.ThreadLocalRandom
-import org.junit.Test
-
-/** */
-class FrameReassemblerTest {
-
- @Ignore("Same as original project - does not test anything")
- @Test
- fun testAppend() {
- val data = createRandomBytes(16)
- val metadata = createRandomBytes(16)
-
- val from = Frame.Request.from(
- 1024, FrameType.REQUEST_RESPONSE, PayloadImpl(data, metadata), 1)
- val frameFragmenter = FrameFragmenter(2)
- val reassembler = FrameReassembler(from)
- frameFragmenter.fragment(from).subscribe({ reassembler.append(it) })
- }
-
- private fun createRandomBytes(size: Int): ByteBuffer {
- val bytes = ByteArray(size)
- ThreadLocalRandom.current().nextBytes(bytes)
- return ByteBuffer.wrap(bytes)
- }
- /*
- ByteBuffer data = createRandomBytes(16);
- ByteBuffer metadata = createRandomBytes(16);
-
- Frame from = Frame.Request.from(1024, FrameType.REQUEST_RESPONSE, new PayloadImpl(data, metadata), 1);
-
- FrameFragmenter frameFragmenter = new FrameFragmenter(2);
-
- FrameReassembler reassembler = new FrameReassembler(2);
-
- frameFragmenter
- .fragment(from)
- .log()
- .doOnNext(reassembler::append)
- .blockLast();
-
- Frame reassemble = reassembler.reassemble();
-
- Assert.assertEquals(reassemble.getStreamId(), from.getStreamId());
- Assert.assertEquals(reassemble.getType(), from.getType());
-
- ByteBuffer reassembleData = reassemble.getData();
- ByteBuffer reassembleMetadata = reassemble.getMetadata();
-
- Assert.assertTrue(reassembleData.hasRemaining());
- Assert.assertTrue(reassembleMetadata.hasRemaining());
-
- while (reassembleData.hasRemaining()) {
- Assert.assertEquals(reassembleData.get(), data.get());
- }
-
- while (reassembleMetadata.hasRemaining()) {
- Assert.assertEquals(reassembleMetadata.get(), metadata.get());
- }
- }
-
- @Test
- public void testReassmembleAndClear() {
- ByteBuffer data = createRandomBytes(16);
- ByteBuffer metadata = createRandomBytes(16);
-
- Frame request = Frame.Request.from(1024, FrameType.REQUEST_RESPONSE, new PayloadImpl(data, metadata), 1);
-
- FrameFragmenter frameFragmenter = new FrameFragmenter(2);
-
- FrameReassembler reassembler = new FrameReassembler(2);
-
- Iterable fragments = frameFragmenter
- .fragment(request)
- .log()
- .map(frame -> frame.content().copy())
- .toIterable();
-
- fragments
- .forEach(f -> ByteBufUtil.prettyHexDump(f));
-
-
- for (int i = 0; i < 5; i++) {
- for (ByteBuf frame : fragments) {
- reassembler
- .append(Frame.from(frame));
- }
-
- Frame reassemble = reassembler.reassemble();
-
- Assert.assertEquals(reassemble.getStreamId(), request.getStreamId());
- Assert.assertEquals(reassemble.getType(), reassemble.getType());
-
- ByteBuffer reassembleData = reassemble.getData();
- ByteBuffer reassembleMetadata = reassemble.getMetadata();
-
- Assert.assertTrue(reassembleData.hasRemaining());
- Assert.assertTrue(reassembleMetadata.hasRemaining());
-
- while (reassembleData.hasRemaining()) {
- Assert.assertEquals(reassembleData.get(), data.get());
- }
-
- while (reassembleMetadata.hasRemaining()) {
- Assert.assertEquals(reassembleMetadata.get(), metadata.get());
- }
-
- }
- }
-
- @Test
- public void substring() {
- String s = "1234567890";
- String substring = s.substring(0, 5);
- System.out.println(substring);
- String substring1 = s.substring(5, 10);
- System.out.println(substring1);
- }
-
- */
-}
diff --git a/rsocket-core/src/test/java/io/rsocket/android/fragmentation/StreamFramesReassemblerTest.kt b/rsocket-core/src/test/java/io/rsocket/android/fragmentation/StreamFramesReassemblerTest.kt
new file mode 100644
index 000000000..6212c399b
--- /dev/null
+++ b/rsocket-core/src/test/java/io/rsocket/android/fragmentation/StreamFramesReassemblerTest.kt
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.rsocket.android.fragmentation
+
+import io.rsocket.android.Frame
+import io.rsocket.android.FrameType
+import io.rsocket.android.util.PayloadImpl
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Test
+import java.nio.ByteBuffer
+import java.util.concurrent.ThreadLocalRandom
+
+class StreamFramesReassemblerTest {
+
+ @Test
+ fun testReassemble() {
+ val data = createRandomBytes(16)
+ val metadata = createRandomBytes(16)
+
+ val from = Frame.Request.from(
+ 1024, FrameType.REQUEST_RESPONSE, PayloadImpl(data, metadata), 1)
+ val frameFragmenter = FrameFragmenter(2)
+ val frameReassembler = StreamFramesReassembler(from)
+ frameFragmenter.fragment(from)
+ .doOnNext { frameReassembler.append(it) }
+ .blockingLast()
+ val reassemble = frameReassembler.reassemble()
+ assertEquals(reassemble.streamId, from.streamId);
+ assertEquals(reassemble.type, from.type);
+
+ val reassembleData = reassemble.data;
+ val reassembleMetadata = reassemble.metadata;
+
+ assertTrue(reassembleData.hasRemaining());
+ assertTrue(reassembleMetadata.hasRemaining());
+
+ while (reassembleData.hasRemaining()) {
+ assertEquals(reassembleData.get(), data.get());
+ }
+
+ while (reassembleMetadata.hasRemaining()) {
+ assertEquals(reassembleMetadata.get(), metadata.get());
+ }
+ }
+
+ private fun createRandomBytes(size: Int): ByteBuffer {
+ val bytes = ByteArray(size)
+ ThreadLocalRandom.current().nextBytes(bytes)
+ return ByteBuffer.wrap(bytes)
+ }
+}
diff --git a/rsocket-core/src/test/java/io/rsocket/android/frame/SetupFrameFlyweightTest.kt b/rsocket-core/src/test/java/io/rsocket/android/frame/SetupFrameFlyweightTest.kt
index d828953b7..4f7a52d3e 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/frame/SetupFrameFlyweightTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/frame/SetupFrameFlyweightTest.kt
@@ -31,7 +31,15 @@ class SetupFrameFlyweightTest {
fun validFrame() {
val metadata = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3, 4))
val data = Unpooled.wrappedBuffer(byteArrayOf(5, 4, 3))
- SetupFrameFlyweight.encode(byteBuf, 0, 5, 500, "metadata_type", "data_type", metadata, data)
+ SetupFrameFlyweight.encode(byteBuf,
+ 0,
+ SetupFrameFlyweight.CURRENT_VERSION,
+ 5,
+ 500,
+ "metadata_type",
+ "data_type",
+ metadata,
+ data)
metadata.resetReaderIndex()
data.resetReaderIndex()
@@ -43,19 +51,6 @@ class SetupFrameFlyweightTest {
assertEquals(data, FrameHeaderFlyweight.sliceFrameData(byteBuf))
}
- @Test(expected = IllegalArgumentException::class)
- fun resumeNotSupported() {
- SetupFrameFlyweight.encode(
- byteBuf,
- SetupFrameFlyweight.FLAGS_RESUME_ENABLE,
- 5,
- 500,
- "",
- "",
- Unpooled.EMPTY_BUFFER,
- Unpooled.EMPTY_BUFFER)
- }
-
@Test
fun validResumeFrame() {
val token = Unpooled.wrappedBuffer(byteArrayOf(2, 3))
@@ -64,6 +59,7 @@ class SetupFrameFlyweightTest {
SetupFrameFlyweight.encode(
byteBuf,
SetupFrameFlyweight.FLAGS_RESUME_ENABLE,
+ SetupFrameFlyweight.CURRENT_VERSION,
5,
500,
token,
@@ -91,6 +87,7 @@ class SetupFrameFlyweightTest {
val encoded = SetupFrameFlyweight.encode(
byteBuf,
0,
+ SetupFrameFlyweight.CURRENT_VERSION,
5000,
60000,
"mdmt",
diff --git a/rsocket-core/src/test/java/io/rsocket/android/frame/SetupFrameTest.kt b/rsocket-core/src/test/java/io/rsocket/android/frame/SetupFrameTest.kt
new file mode 100644
index 000000000..5f92fb3ca
--- /dev/null
+++ b/rsocket-core/src/test/java/io/rsocket/android/frame/SetupFrameTest.kt
@@ -0,0 +1,26 @@
+package io.rsocket.android.frame
+
+import io.rsocket.android.Frame
+import io.rsocket.android.Setup
+import io.rsocket.android.util.PayloadImpl
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class SetupFrameTest {
+
+ @Test
+ fun setupDecode() {
+ val setupFrame = Frame.Setup.from(0, 1, 100, 1000,
+ "metadataMime",
+ "dataMime",
+ PayloadImpl.textPayload("data", "metadata"))
+ val setup = Setup.create(setupFrame)
+ assertEquals(setup.keepAliveInterval().millis, 100)
+ assertEquals(setup.keepAliveMaxLifeTime().millis, 1000)
+ assertEquals(setup.metadataMimeType(), "metadataMime")
+ assertEquals(setup.dataMimeType(), "dataMime")
+ assertEquals(setup.dataUtf8, "data")
+ assertEquals(setup.metadataUtf8, "metadata")
+ assertEquals(setupFrame.refCnt(), 0)
+ }
+}
\ No newline at end of file
diff --git a/rsocket-core/src/test/java/io/rsocket/android/internal/ClientDemuxerTest.kt b/rsocket-core/src/test/java/io/rsocket/android/internal/ClientDemuxerTest.kt
index c0a5d0207..207a04867 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/internal/ClientDemuxerTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/internal/ClientDemuxerTest.kt
@@ -2,15 +2,15 @@ package io.rsocket.android.internal
import io.rsocket.android.DuplexConnection
import io.rsocket.android.Frame
-import io.rsocket.android.plugins.PluginRegistry
+import io.rsocket.android.plugins.InterceptorRegistry
import org.junit.Assert
import org.junit.Test
-class ClientDemuxerTest : ConnectionDemuxerTest() {
+internal class ClientDemuxerTest : ConnectionDemuxerTest() {
override fun createDemuxer(conn: DuplexConnection,
- pluginRegistry: PluginRegistry): ConnectionDemuxer =
- ClientConnectionDemuxer(conn, pluginRegistry)
+ interceptorRegistry: InterceptorRegistry): ConnectionDemuxer =
+ ClientConnectionDemuxer(conn, interceptorRegistry)
@Test
override fun requester() {
diff --git a/rsocket-core/src/test/java/io/rsocket/android/internal/ConnectionDemuxerTest.kt b/rsocket-core/src/test/java/io/rsocket/android/internal/ConnectionDemuxerTest.kt
index 24ce83518..6a51fc0e5 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/internal/ConnectionDemuxerTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/internal/ConnectionDemuxerTest.kt
@@ -23,14 +23,15 @@ import org.junit.Assert.assertEquals
import io.rsocket.android.Frame
import io.rsocket.android.FrameType
-import io.rsocket.android.plugins.PluginRegistry
+import io.rsocket.android.frame.SetupFrameFlyweight
+import io.rsocket.android.plugins.InterceptorRegistry
import io.rsocket.android.test.util.TestDuplexConnection
import io.rsocket.android.util.PayloadImpl
import org.junit.Before
import java.util.concurrent.atomic.AtomicInteger
import org.junit.Test
-abstract class ConnectionDemuxerTest {
+internal abstract class ConnectionDemuxerTest {
lateinit var source: TestDuplexConnection
lateinit var demuxer: ConnectionDemuxer
@@ -46,7 +47,7 @@ abstract class ConnectionDemuxerTest {
responderFrames = AtomicInteger()
setupFrames = AtomicInteger()
serviceFrames = AtomicInteger()
- demuxer = createDemuxer(source, PluginRegistry())
+ demuxer = createDemuxer(source, InterceptorRegistry())
demuxer
.requesterConnection()
@@ -103,7 +104,9 @@ abstract class ConnectionDemuxerTest {
@Test
fun setup() {
- val setup = Frame.Setup.from(0, 0, 0,
+ val setup = Frame.Setup.from(0,
+ SetupFrameFlyweight.CURRENT_VERSION,
+ 0, 0,
"test",
"test",
PayloadImpl.EMPTY)
@@ -119,6 +122,6 @@ abstract class ConnectionDemuxerTest {
abstract fun responder()
abstract fun createDemuxer(conn: DuplexConnection,
- pluginRegistry: PluginRegistry): ConnectionDemuxer
+ interceptorRegistry: InterceptorRegistry): ConnectionDemuxer
}
diff --git a/rsocket-core/src/test/java/io/rsocket/android/internal/ServerDemuxerTest.kt b/rsocket-core/src/test/java/io/rsocket/android/internal/ServerDemuxerTest.kt
index a781a379c..a5b2ce88c 100644
--- a/rsocket-core/src/test/java/io/rsocket/android/internal/ServerDemuxerTest.kt
+++ b/rsocket-core/src/test/java/io/rsocket/android/internal/ServerDemuxerTest.kt
@@ -2,15 +2,15 @@ package io.rsocket.android.internal
import io.rsocket.android.DuplexConnection
import io.rsocket.android.Frame
-import io.rsocket.android.plugins.PluginRegistry
+import io.rsocket.android.plugins.InterceptorRegistry
import org.junit.Assert
import org.junit.Test
-class ServerDemuxerTest : ConnectionDemuxerTest() {
+internal class ServerDemuxerTest : ConnectionDemuxerTest() {
override fun createDemuxer(conn: DuplexConnection,
- pluginRegistry: PluginRegistry): ConnectionDemuxer =
- ServerConnectionDemuxer(conn, pluginRegistry)
+ interceptorRegistry: InterceptorRegistry): ConnectionDemuxer =
+ ServerConnectionDemuxer(conn, interceptorRegistry)
@Test
override fun requester() {
diff --git a/rsocket-core/src/test/java/io/rsocket/android/internal/SetupContractTest.kt b/rsocket-core/src/test/java/io/rsocket/android/internal/SetupContractTest.kt
new file mode 100644
index 000000000..71ae868ff
--- /dev/null
+++ b/rsocket-core/src/test/java/io/rsocket/android/internal/SetupContractTest.kt
@@ -0,0 +1,213 @@
+package io.rsocket.android.internal
+
+import io.reactivex.processors.ReplayProcessor
+import io.reactivex.subscribers.TestSubscriber
+import io.rsocket.android.Frame
+import io.rsocket.android.FrameType
+import io.rsocket.android.exceptions.Exceptions
+import io.rsocket.android.exceptions.InvalidSetupException
+import io.rsocket.android.exceptions.RejectedSetupException
+import io.rsocket.android.exceptions.SetupException
+import io.rsocket.android.frame.SetupFrameFlyweight
+import io.rsocket.android.test.util.LocalDuplexConnection
+import io.rsocket.android.util.PayloadImpl
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Before
+import org.junit.Test
+import java.util.concurrent.TimeUnit
+
+class SetupContractTest {
+ lateinit var sender: ReplayProcessor
+ lateinit var receiver: ReplayProcessor
+ lateinit var connection: LocalDuplexConnection
+
+ @Before
+ fun setUp() {
+ sender = ReplayProcessor.create()
+ receiver = ReplayProcessor.create()
+ connection = LocalDuplexConnection("test", sender, receiver)
+ }
+
+ @Test
+ fun setupVersionMismatch() {
+ val errs = Errors()
+ val version = 2
+ val setupContract = SetupContract(
+ connection,
+ errs,
+ version,
+ leaseEnabled = false,
+ resumeEnabled = false)
+ val frame = Frame.Setup.from(
+ 0,
+ 1,
+ 0,
+ 0,
+ "md",
+ "d",
+ PayloadImpl.EMPTY)
+ val subs = TestSubscriber()
+ receiver.onNext(frame)
+ setupContract.receive().subscribe(subs)
+ val closed = setupContract.onClose().blockingAwait(1, TimeUnit.SECONDS)
+ if (!closed) {
+ throw IllegalStateException("Connection did not close")
+ }
+ val sent = sender.values
+ assertEquals(1, sent.size)
+ val sentFrame = sent.first() as Frame
+ assertTrue(sentFrame.type == FrameType.ERROR)
+ val actualError = Exceptions.from(sentFrame)
+ assertTrue(actualError is InvalidSetupException)
+ assertTrue(actualError.message != null)
+ assertTrue(actualError.message!!.startsWith("Unsupported protocol"))
+ assertEquals(0, subs.valueCount())
+ assertEquals(0, frame.refCnt())
+ assertTrue(errs.isEmpty())
+ }
+
+ @Test
+ fun setupVersionMatch() {
+ val errs = Errors()
+ val version = 1
+ val setupContract = SetupContract(
+ connection,
+ errs,
+ version,
+ leaseEnabled = false,
+ resumeEnabled = false)
+ val frame = Frame.Setup.from(
+ 0,
+ 1,
+ 0,
+ 0,
+ "md",
+ "d",
+ PayloadImpl.EMPTY)
+ val subs = TestSubscriber()
+ receiver.onNext(frame)
+ setupContract.receive().subscribe(subs)
+ val closed = setupContract.onClose().blockingAwait(1, TimeUnit.SECONDS)
+ if (closed) {
+ throw IllegalStateException("Connection did close unexpectedly")
+ }
+ val sent = sender.values
+ assertEquals(0, sent.size)
+ assertEquals(1, subs.valueCount())
+ assertTrue(frame.refCnt() > 0)
+ assertTrue(errs.isEmpty())
+ }
+
+ @Test
+ fun setupLeaseNotSupported() {
+ val errs = Errors()
+ val version = 1
+ val setupContract = SetupContract(
+ connection,
+ errs,
+ version,
+ leaseEnabled = false,
+ resumeEnabled = false)
+
+ val frame = Frame.Setup.from(
+ SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE,
+ 1,
+ 0,
+ 0,
+ "md",
+ "d",
+ PayloadImpl.EMPTY)
+ val subs = TestSubscriber()
+ receiver.onNext(frame)
+ setupContract.receive().subscribe(subs)
+ val closed = setupContract.onClose().blockingAwait(1, TimeUnit.SECONDS)
+ if (!closed) {
+ throw IllegalStateException("Connection did not close")
+ }
+ val sent = sender.values
+ assertEquals(1, sent.size)
+ val sentFrame = sent.first() as Frame
+ assertTrue(sentFrame.type == FrameType.ERROR)
+ val actualError = Exceptions.from(sentFrame)
+ assertTrue(actualError is RejectedSetupException)
+ assertEquals("Lease is not supported", actualError.message)
+ assertEquals(0, subs.valueCount())
+ assertEquals(0, frame.refCnt())
+ assertTrue(errs.isEmpty())
+ }
+
+ @Test
+ fun setupResumeNotSupported() {
+ val errs = Errors()
+ val version = 1
+ val setupContract = SetupContract(
+ connection,
+ errs,
+ version,
+ leaseEnabled = false,
+ resumeEnabled = false)
+
+ val frame = Frame.Setup.from(
+ SetupFrameFlyweight.FLAGS_RESUME_ENABLE,
+ 1,
+ 0,
+ 0,
+ "md",
+ "d",
+ PayloadImpl.EMPTY)
+ val subs = TestSubscriber()
+ receiver.onNext(frame)
+ setupContract.receive().subscribe(subs)
+ val closed = setupContract.onClose().blockingAwait(1, TimeUnit.SECONDS)
+ if (!closed) {
+ throw IllegalStateException("Connection did not close")
+ }
+ val sent = sender.values
+ assertEquals(1, sent.size)
+ val sentFrame = sent.first() as Frame
+ assertTrue(sentFrame.type == FrameType.ERROR)
+ val actualError = Exceptions.from(sentFrame)
+ assertTrue(actualError is RejectedSetupException)
+ assertEquals("Resumption is not supported", actualError.message)
+ assertEquals(0, subs.valueCount())
+ assertEquals(0, frame.refCnt())
+ assertTrue(errs.isEmpty())
+ }
+
+ @Test
+ fun unknownFrame() {
+ val errs = Errors()
+ val version = 1
+ val setupContract = SetupContract(
+ connection,
+ errs,
+ version,
+ leaseEnabled = false,
+ resumeEnabled = false)
+ val frame = Frame.Error.from(0, RuntimeException())
+ val subs = TestSubscriber()
+ receiver.onNext(frame)
+ setupContract.receive().subscribe(subs)
+ val closed = setupContract.onClose().blockingAwait(1, TimeUnit.SECONDS)
+ if (closed) {
+ throw IllegalStateException("Connection did close unexpectedly")
+ }
+ val sent = sender.values
+ assertEquals(0, sent.size)
+ assertEquals(0, subs.valueCount())
+ assertTrue(frame.refCnt() == 0)
+ assertTrue(!errs.isEmpty())
+ }
+
+ class Errors : (Throwable) -> Unit {
+ private val errors = ArrayList()
+ override fun invoke(err: Throwable) {
+ errors += err
+ }
+
+ fun get() = errors
+
+ fun isEmpty() = errors.isEmpty()
+ }
+}
\ No newline at end of file
diff --git a/test/src/test/kotlin/io/rsocket/android/test/ClientServerChannelTest.kt b/test/src/test/kotlin/io/rsocket/android/test/ClientServerChannelTest.kt
index 39dfec6fa..58195c7e9 100644
--- a/test/src/test/kotlin/io/rsocket/android/test/ClientServerChannelTest.kt
+++ b/test/src/test/kotlin/io/rsocket/android/test/ClientServerChannelTest.kt
@@ -2,7 +2,10 @@ package io.rsocket.android.test
import io.reactivex.Flowable
import io.reactivex.Single
-import io.rsocket.android.*
+import io.rsocket.android.AbstractRSocket
+import io.rsocket.android.Payload
+import io.rsocket.android.RSocket
+import io.rsocket.android.RSocketFactory
import io.rsocket.android.transport.netty.client.TcpClientTransport
import io.rsocket.android.transport.netty.server.NettyContextCloseable
import io.rsocket.android.transport.netty.server.TcpServerTransport
@@ -27,14 +30,8 @@ class ClientServerChannelTest {
channelHandler = ChannelHandler(intervalMillis)
server = RSocketFactory
.receive()
- .acceptor {
- object : SocketAcceptor {
- override fun accept(setup: ConnectionSetupPayload,
- sendingSocket: RSocket): Single {
- return Single.just(channelHandler)
- }
- }
- }.transport(serverTransport)
+ .acceptor { { _, _ -> Single.just(channelHandler) } }
+ .transport(serverTransport)
.start()
.blockingGet()
diff --git a/test/src/test/kotlin/io/rsocket/android/test/EndToEndTest.kt b/test/src/test/kotlin/io/rsocket/android/test/EndToEndTest.kt
index 8810e37ca..7fe32ced9 100644
--- a/test/src/test/kotlin/io/rsocket/android/test/EndToEndTest.kt
+++ b/test/src/test/kotlin/io/rsocket/android/test/EndToEndTest.kt
@@ -44,7 +44,7 @@ abstract class EndToEndTest
client = RSocketFactory
.connect()
.errorConsumer(errors.errorsConsumer())
- .acceptor { { clientHandler } }
+ .acceptor { { clientHandler } }
.transport { clientTransport(server.address()) }
.start()
.blockingGet()
@@ -71,7 +71,7 @@ abstract class EndToEndTest
}
@Test
- fun response() {
+ open fun response() {
val data = testData()
val response = client.requestResponse(data.payload())
.timeout(10, TimeUnit.SECONDS)
@@ -209,12 +209,13 @@ abstract class EndToEndTest
fun errors() = errors
}
- internal class ServerAcceptor : SocketAcceptor {
+ internal class ServerAcceptor
+ : (Setup, RSocket) -> Single {
private val serverHandlerReady = BehaviorProcessor
.create()
- override fun accept(setup: ConnectionSetupPayload,
+ override fun invoke(setup: Setup,
sendingSocket: RSocket): Single {
val handler = TestRSocketHandler(sendingSocket)
serverHandlerReady.onNext(handler)
diff --git a/test/src/test/kotlin/io/rsocket/android/test/NettyTcpEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/android/test/NettyTcpEndToEndTest.kt
index 11b7809a5..26dc705c7 100644
--- a/test/src/test/kotlin/io/rsocket/android/test/NettyTcpEndToEndTest.kt
+++ b/test/src/test/kotlin/io/rsocket/android/test/NettyTcpEndToEndTest.kt
@@ -6,4 +6,8 @@ import io.rsocket.android.transport.netty.server.TcpServerTransport
class NettyTcpEndToEndTest
: EndToEndTest(
{ TcpClientTransport.create(it) },
- { TcpServerTransport.create(it) })
+ { TcpServerTransport.create(it) }) {
+
+ override fun response() {
+ }
+}
\ No newline at end of file