Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions rsocket-core/src/main/java/io/rsocket/android/Duration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@ import java.util.concurrent.TimeUnit

data class Duration(val value: Long, val unit: TimeUnit) {

val toMillis = unit.toMillis(value)
val millis = unit.toMillis(value)

val intMillis = unit.toMillis(value).toInt()

companion object {
val ZERO = Duration(0, TimeUnit.MILLISECONDS)

fun ofSeconds(n: Long) = Duration(n, TimeUnit.SECONDS)

fun ofSeconds(n: Int) = Duration(n.toLong(), TimeUnit.SECONDS)

fun ofMillis(n: Long) = Duration(n, TimeUnit.MILLISECONDS)

fun ofMillis(n: Int) = Duration(n.toLong(), TimeUnit.MILLISECONDS)
}
}

156 changes: 135 additions & 21 deletions rsocket-core/src/main/java/io/rsocket/android/Frame.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,20 @@
*/
package io.rsocket.android

import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_M

import io.netty.buffer.*
import com.sun.org.apache.xpath.internal.operations.Bool
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.ByteBufHolder
import io.netty.buffer.Unpooled
import io.netty.util.IllegalReferenceCountException
import io.netty.util.Recycler
import io.netty.util.Recycler.Handle
import io.netty.util.ResourceLeakDetector
import io.rsocket.android.frame.ErrorFrameFlyweight
import io.rsocket.android.frame.FrameHeaderFlyweight
import io.rsocket.android.frame.KeepaliveFrameFlyweight
import io.rsocket.android.frame.LeaseFrameFlyweight
import io.rsocket.android.frame.RequestFrameFlyweight
import io.rsocket.android.frame.RequestNFrameFlyweight
import io.rsocket.android.frame.SetupFrameFlyweight
import io.rsocket.android.frame.VersionFlyweight
import io.rsocket.android.frame.*
import io.rsocket.android.frame.FrameHeaderFlyweight.FLAGS_M
import org.slf4j.LoggerFactory
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import org.slf4j.LoggerFactory

/**
* Represents a Frame sent over a [DuplexConnection].
Expand All @@ -51,10 +47,12 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold

/** Return the content which is held by this [Frame]. */
override fun content(): ByteBuf {
if (content!!.refCnt() <= 0) {
throw IllegalReferenceCountException(content!!.refCnt())
}
return content as ByteBuf
val c = content
return if (c == null) {
throw IllegalReferenceCountException(0)
} else if (c.refCnt() <= 0) {
throw IllegalReferenceCountException(c.refCnt())
} else content as ByteBuf
}

/** Creates a deep copy of this [Frame]. */
Expand All @@ -79,7 +77,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
* Returns the reference count of this object. If `0`, it means this object has been
* deallocated.
*/
override fun refCnt(): Int = content!!.refCnt()
override fun refCnt(): Int = content?.refCnt() ?: 0

/** Increases the reference count by `1`. */
override fun retain(): Frame {
Expand Down Expand Up @@ -210,11 +208,17 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
*/
fun flags(): Int = FrameHeaderFlyweight.flags(content!!)

fun hasMetadata(): Boolean = isFlagSet(this.flags(), FLAGS_M)
fun isFlagSet(flag: Int): Boolean {
return isFlagSet(this.flags(), flag)
}

fun hasMetadata(): Boolean = isFlagSet(FLAGS_M)

val dataUtf8: String
get() = StandardCharsets.UTF_8.decode(data).toString()

val isFragmentable: Boolean
get() = type.isFragmentable
/* TODO:
*
* fromRequest(type, id, payload)
Expand All @@ -227,6 +231,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold

fun from(
flags: Int,
version: Int,
keepaliveInterval: Int,
maxLifetime: Int,
metadataMimeType: String,
Expand All @@ -250,6 +255,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
SetupFrameFlyweight.encode(
frame.content!!,
flags,
version,
keepaliveInterval,
maxLifetime,
metadataMimeType,
Expand All @@ -259,6 +265,25 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
return frame
}

fun from(
flags: Int,
keepaliveInterval: Int,
maxLifetime: Int,
metadataMimeType: String,
dataMimeType: String,
payload: Payload): Frame {

return from(
flags,
SetupFrameFlyweight.CURRENT_VERSION,
keepaliveInterval,
maxLifetime,
metadataMimeType,
dataMimeType,
payload)
}


fun getFlags(frame: Frame): Int {
ensureFrameType(FrameType.SETUP, frame)
val flags = FrameHeaderFlyweight.flags(frame.content!!)
Expand All @@ -271,6 +296,20 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
return SetupFrameFlyweight.version(frame.content!!)
}

fun resumeEnabled(frame: Frame): Boolean {
ensureFrameType(FrameType.SETUP, frame)
return Frame.isFlagSet(
frame.flags(),
SetupFrameFlyweight.FLAGS_RESUME_ENABLE)
}

fun leaseEnabled(frame: Frame): Boolean {
ensureFrameType(FrameType.SETUP, frame)
return Frame.isFlagSet(
frame.flags(),
SetupFrameFlyweight.FLAGS_WILL_HONOR_LEASE)
}

fun keepaliveInterval(frame: Frame): Int {
ensureFrameType(FrameType.SETUP, frame)
return SetupFrameFlyweight.keepaliveInterval(frame.content!!)
Expand Down Expand Up @@ -429,14 +468,14 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
fun from(
streamId: Int,
type: FrameType,
metadata: ByteBuf,
metadata: ByteBuf?,
data: ByteBuf,
initialRequestN: Int,
flags: Int): Frame {
val frame = RECYCLER.get()
frame.content = ByteBufAllocator.DEFAULT.buffer(
RequestFrameFlyweight.computeFrameLength(
type, metadata.readableBytes(), data.readableBytes()))
type, metadata?.readableBytes(), data.readableBytes()))
frame.content!!.writerIndex(
RequestFrameFlyweight.encode(
frame.content!!,
Expand All @@ -449,6 +488,21 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
return frame
}

fun from(streamId: Int,
type: FrameType,
metadata: ByteBuf?,
data: ByteBuf,
flags: Int): Frame {

return PayloadFrame.from(
streamId,
type,
metadata,
data,
flags)
}


fun initialRequestN(frame: Frame): Int {
val type = frame.type
if (!type.isRequestType) {
Expand Down Expand Up @@ -538,6 +592,66 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
}
}

object Fragmentation {

fun assembleFrame(blueprintFrame: Frame,
metadata: ByteBuf,
data: ByteBuf): Frame =

create(blueprintFrame,
metadata,
data,
{ it and FrameHeaderFlyweight.FLAGS_F.inv() })

fun sliceFrame(blueprintFrame: Frame,
metadata: ByteBuf?,
data: ByteBuf,
additionalFlags: Int): Frame =

create(blueprintFrame,
metadata,
data,
{ it or additionalFlags })

private inline fun create(blueprintFrame: Frame,
metadata: ByteBuf?,
data: ByteBuf,
modifyFlags: (Int) -> Int): Frame =
when (blueprintFrame.type) {
FrameType.FIRE_AND_FORGET,
FrameType.REQUEST_RESPONSE -> {
Frame.Request.from(
blueprintFrame.streamId,
blueprintFrame.type,
metadata,
data,
modifyFlags(blueprintFrame.flags()))
}
FrameType.NEXT,
FrameType.NEXT_COMPLETE -> {
Frame.PayloadFrame.from(
blueprintFrame.streamId,
blueprintFrame.type,
metadata,
data,
modifyFlags(blueprintFrame.flags()))
}

FrameType.REQUEST_STREAM,
FrameType.REQUEST_CHANNEL -> {
Frame.Request.from(
blueprintFrame.streamId,
blueprintFrame.type,
metadata,
data,
Frame.Request.initialRequestN(blueprintFrame),
modifyFlags(blueprintFrame.flags()))
}
else -> throw AssertionError("Non-fragmentable frame: " +
"${blueprintFrame.type}")
}
}

override fun toString(): String {
val type = FrameHeaderFlyweight.frameType(content!!)
val payload = StringBuilder()
Expand Down Expand Up @@ -587,7 +701,7 @@ class Frame private constructor(private val handle: Handle<Frame>) : ByteBufHold
}

companion object {
val NULL_BYTEBUFFER:ByteBuffer = ByteBuffer.allocateDirect(0)
val NULL_BYTEBUFFER: ByteBuffer = ByteBuffer.allocateDirect(0)

private val RECYCLER = object : Recycler<Frame>() {
override fun newObject(handle: Handle<Frame>): Frame {
Expand Down
46 changes: 33 additions & 13 deletions rsocket-core/src/main/java/io/rsocket/android/FrameType.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,26 @@ enum class FrameType(val encodedType: Int, private val flags: Int = 0) {
LEASE(0x02, Flags.CAN_HAVE_METADATA),
KEEPALIVE(0x03, Flags.CAN_HAVE_DATA),
// Requester to start request
REQUEST_RESPONSE(0x04, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE),
FIRE_AND_FORGET(0x05, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE),
REQUEST_RESPONSE(0x04,
Flags.CAN_HAVE_METADATA_AND_DATA or
Flags.IS_REQUEST_TYPE or
Flags.IS_FRAGMENTABLE),
FIRE_AND_FORGET(0x05,
Flags.CAN_HAVE_METADATA_AND_DATA or
Flags.IS_REQUEST_TYPE or
Flags.IS_FRAGMENTABLE),
REQUEST_STREAM(
0x06, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE or Flags.HAS_INITIAL_REQUEST_N),
0x06,
Flags.CAN_HAVE_METADATA_AND_DATA or
Flags.IS_REQUEST_TYPE or
Flags.HAS_INITIAL_REQUEST_N
or Flags.IS_FRAGMENTABLE),
REQUEST_CHANNEL(
0x07, Flags.CAN_HAVE_METADATA_AND_DATA or Flags.IS_REQUEST_TYPE or Flags.HAS_INITIAL_REQUEST_N),
0x07,
Flags.CAN_HAVE_METADATA_AND_DATA or
Flags.IS_REQUEST_TYPE or
Flags.HAS_INITIAL_REQUEST_N or
Flags.IS_FRAGMENTABLE),
// Requester mid-stream
REQUEST_N(0x08),
CANCEL(0x09, Flags.CAN_HAVE_METADATA),
Expand All @@ -42,22 +56,28 @@ enum class FrameType(val encodedType: Int, private val flags: Int = 0) {
RESUME(0x0D),
RESUME_OK(0x0E),
// synthetic types from Responder for use by the rest of the machinery
NEXT(0xA0, Flags.CAN_HAVE_METADATA_AND_DATA),
NEXT(0xA0,
Flags.CAN_HAVE_METADATA_AND_DATA or
Flags.IS_FRAGMENTABLE),
COMPLETE(0xB0),
NEXT_COMPLETE(0xC0, Flags.CAN_HAVE_METADATA_AND_DATA),
NEXT_COMPLETE(0xC0,
Flags.CAN_HAVE_METADATA_AND_DATA or
Flags.IS_FRAGMENTABLE),
EXT(0xFFFF, Flags.CAN_HAVE_METADATA_AND_DATA);

private object Flags {

internal val CAN_HAVE_DATA = 1
internal val CAN_HAVE_METADATA = 2
internal val CAN_HAVE_METADATA_AND_DATA = 3
internal val IS_REQUEST_TYPE = 4
internal val HAS_INITIAL_REQUEST_N = 8
internal const val CAN_HAVE_DATA = 1
internal const val CAN_HAVE_METADATA = 2
internal const val CAN_HAVE_METADATA_AND_DATA = 3
internal const val IS_REQUEST_TYPE = 4
internal const val HAS_INITIAL_REQUEST_N = 8
internal const val IS_FRAGMENTABLE = 16
}

val isRequestType: Boolean
get() = Flags.IS_REQUEST_TYPE == flags and Flags.IS_REQUEST_TYPE
val isFragmentable = Flags.IS_FRAGMENTABLE == (flags and Flags.IS_FRAGMENTABLE)

val isRequestType: Boolean = Flags.IS_REQUEST_TYPE == (flags and Flags.IS_REQUEST_TYPE)

fun hasInitialRequestN(): Boolean = Flags.HAS_INITIAL_REQUEST_N == flags and Flags.HAS_INITIAL_REQUEST_N

Expand Down
2 changes: 1 addition & 1 deletion rsocket-core/src/main/java/io/rsocket/android/RSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ interface RSocket : Availability, Closeable {
/**
* Request-Channel interaction model of `RSocket`.
*
* @param payloads Stream of request payloads.
* @param payloads Stream of send payloads.
* @return Stream of response payloads.
*/
fun requestChannel(payloads: Publisher<Payload>): Flowable<Payload>
Expand Down
Loading