Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#

group=io.rsocket.kotlin
version=0.9.7
version=0.9.8

buildScanPluginVersion=2.4.1
dependencyManagementPluginVersion=1.0.8.RELEASE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import io.reactivex.processors.UnicastProcessor
import io.rsocket.kotlin.*
import io.rsocket.kotlin.exceptions.ApplicationException
import io.rsocket.kotlin.exceptions.ChannelRequestException
import io.rsocket.kotlin.internal.util.reactiveStreamsRequestN
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
Expand Down Expand Up @@ -121,10 +122,11 @@ internal class RSocketRequester(
val streamId = streamIds.nextStreamId(receivers)
val receiver = StreamReceiver.create()
receivers[streamId] = receiver
val reqN = Cond()
var isFirstRequestN = true

receiver.doOnRequestIfActive { requestN ->
val frame = if (reqN.first()) {
val frame = if (isFirstRequestN) {
isFirstRequestN = false
Frame.Request.from(
streamId,
FrameType.REQUEST_STREAM,
Expand All @@ -148,35 +150,37 @@ internal class RSocketRequester(
return Flowable.defer {
val receiver = StreamReceiver.create()
val streamId = streamIds.nextStreamId(receivers)
val reqN = Cond()
var firstReqN = true
var firstReqPayload = true

receiver.doOnRequestIfActive { requestN ->

if (reqN.first()) {
val wrappedRequest = request.compose {
val sender = RequestingPublisher.wrap(it)
sender.request(1)
senders[streamId] = sender
receivers[streamId] = receiver
sender
}.publish().autoConnect(2)
if (firstReqN) {
firstReqN = false

val first = wrappedRequest.take(1)
.map { payload ->
Frame.Request.from(
streamId,
FrameType.REQUEST_CHANNEL,
payload,
requestN)
val requestFrames = request
.compose {
val sender = RequestingPublisher.wrap(it)
sender.request(1)
senders[streamId] = sender
receivers[streamId] = receiver
sender
}
val rest = wrappedRequest.skip(1)
.map { payload ->
Frame.PayloadFrame.from(
streamId,
FrameType.NEXT,
payload)
if (firstReqPayload) {
firstReqPayload = false
Frame.Request.from(
streamId,
FrameType.REQUEST_CHANNEL,
payload,
requestN)
} else {
Frame.PayloadFrame.from(
streamId,
FrameType.NEXT,
payload)
}
}
val requestFrames = Flowable.concatArrayEager(first, rest)
requestFrames.subscribe(
ChannelRequestSubscriber(
{ payload -> frameSender.send(payload) },
Expand Down Expand Up @@ -248,10 +252,7 @@ internal class RSocketRequester(
FrameType.NEXT -> receiver.onNext(DefaultPayload(frame))
FrameType.REQUEST_N -> {
val sender = senders[streamId]
sender?.let {
val n = Frame.RequestN.requestN(frame).toLong()
it.request(n)
}
sender?.request(reactiveStreamsRequestN(Frame.RequestN.requestN(frame)))
}
FrameType.COMPLETE -> {
receiver.onComplete()
Expand Down Expand Up @@ -320,18 +321,6 @@ internal class RSocketRequester(
}
}

private class Cond {
private var first = true

fun first(): Boolean =
if (first) {
first = false
true
} else {
false
}
}

private class ChannelRequestSubscriber(private val next: (Frame) -> Unit,
private val error: (Throwable) -> Unit,
private val complete: (Boolean) -> Unit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

package io.rsocket.kotlin.internal

import io.reactivex.Completable
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.*
import io.reactivex.disposables.Disposable
import io.rsocket.kotlin.*
import io.rsocket.kotlin.Frame.Request.initialRequestN
Expand All @@ -27,6 +25,7 @@ import io.rsocket.kotlin.exceptions.ApplicationException
import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight.FLAGS_C
import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight.FLAGS_M
import io.rsocket.kotlin.DefaultPayload
import io.rsocket.kotlin.internal.util.reactiveStreamsRequestN
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
Expand Down Expand Up @@ -57,8 +56,7 @@ internal class RSocketResponder(

receiveDisposable = connection
.receive()
.concatMap { frame -> handleFrame(frame) }
.subscribe({}, { completion.error(it) })
.subscribe({ handleFrame(it) }, { completion.error(it) })

connection
.onClose()
Expand Down Expand Up @@ -111,8 +109,8 @@ internal class RSocketResponder(

override fun onClose(): Completable = connection.onClose()

private fun handleFrame(frame: Frame): Flowable<Void> {
return try {
private fun handleFrame(frame: Frame) {
try {
val streamId = frame.streamId
when (frame.type) {
FrameType.FIRE_AND_FORGET -> handleFireAndForget(streamId, fireAndForget(DefaultPayload(frame)))
Expand All @@ -121,96 +119,126 @@ internal class RSocketResponder(
FrameType.REQUEST_N -> handleRequestN(streamId, frame)
FrameType.REQUEST_STREAM -> handleStream(streamId, requestStream(DefaultPayload(frame)), initialRequestN(frame))
FrameType.REQUEST_CHANNEL -> handleChannel(streamId, frame)
FrameType.METADATA_PUSH -> metadataPush(DefaultPayload(frame))
FrameType.METADATA_PUSH -> handleMetadataPush(metadataPush(DefaultPayload(frame)))
FrameType.NEXT -> handleNext(streamId, frame)
FrameType.COMPLETE -> handleComplete(streamId)
FrameType.ERROR -> handleError(streamId, frame)
FrameType.NEXT_COMPLETE -> handleNextComplete(streamId, frame)
else -> handleUnsupportedFrame(frame)
}.toFlowable()
}
} finally {
frame.release()
}
}

private fun handleUnsupportedFrame(frame: Frame): Completable {
private fun handleUnsupportedFrame(frame: Frame) {
errorConsumer(IllegalArgumentException("Unsupported frame: $frame"))
return Completable.complete()
}

private fun handleNextComplete(streamId: Int, frame: Frame): Completable {
private fun handleNextComplete(streamId: Int, frame: Frame) {
val receiver = channelReceivers[streamId]
receiver?.onNext(DefaultPayload(frame))
receiver?.onComplete()
return Completable.complete()
}

private fun handleError(streamId: Int, frame: Frame): Completable {
private fun handleError(streamId: Int, frame: Frame) {
val receiver = channelReceivers[streamId]
receiver?.onError(ApplicationException(Frame.Error.message(frame)))
return Completable.complete()
}

private fun handleComplete(streamId: Int): Completable {
private fun handleComplete(streamId: Int) {
val receiver = channelReceivers[streamId]
receiver?.onComplete()
return Completable.complete()
}

private fun handleNext(streamId: Int, frame: Frame): Completable {
private fun handleNext(streamId: Int, frame: Frame) {
val receiver = channelReceivers[streamId]
receiver?.onNext(DefaultPayload(frame))
return Completable.complete()
}

private fun handleFireAndForget(streamId: Int,
result: Completable): Completable {
return result
.doOnSubscribe { d -> sendingSubscriptions[streamId] = subscription(d) }
.doOnError(errorConsumer)
.doFinally { sendingSubscriptions -= streamId }
result: Completable) {
result.subscribe(object : CompletableObserver {
override fun onComplete() {
sendingSubscriptions -= streamId
}

override fun onSubscribe(d: Disposable) {
sendingSubscriptions[streamId] = subscription(d)
}

override fun onError(e: Throwable) {
sendingSubscriptions -= streamId
errorConsumer(e)
}
})
}

private fun handleRequestResponse(streamId: Int,
response: Single<Payload>): Completable {
return response
.doOnSubscribe { d -> sendingSubscriptions[streamId] = subscription(d) }
.map { payload ->
var flags = FLAGS_C
if (payload.hasMetadata) {
flags = Frame.setFlag(flags, FLAGS_M)
}
Frame.PayloadFrame.from(
streamId,
FrameType.NEXT_COMPLETE,
payload,
flags)
response: Single<Payload>) {
response.subscribe(object : SingleObserver<Payload> {

override fun onSuccess(payload: Payload) {
sendingSubscriptions -= streamId
var flags = FLAGS_C
if (payload.hasMetadata) {
flags = Frame.setFlag(flags, FLAGS_M)
}
.onErrorResumeNext { t -> Single.just(Frame.Error.from(streamId, t)) }
.doOnSuccess { frameSender.send(it) }
.doFinally { sendingSubscriptions -= streamId }
.ignoreElement()
frameSender.send(Frame.PayloadFrame.from(
streamId,
FrameType.NEXT_COMPLETE,
payload,
flags))
}

override fun onSubscribe(d: Disposable) {
sendingSubscriptions[streamId] = subscription(d)
}

override fun onError(e: Throwable) {
sendingSubscriptions -= streamId
val frame = when (e) {
is NoSuchElementException -> Frame.PayloadFrame.from(streamId, FrameType.COMPLETE)
else -> Frame.Error.from(streamId, e)
}
frameSender.send(frame)
}
})
}

private fun handleStream(streamId: Int,
response: Flowable<Payload>,
initialRequestN: Int): Completable {
initialRequestN: Int) {
response
.map { payload -> Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload) }
.compose { frameFlux ->
val frames = RequestingPublisher.wrap(frameFlux)
sendingSubscriptions[streamId] = frames
frames.request(initialRequestN.toLong())
frames.request(reactiveStreamsRequestN(initialRequestN))
frames
}
.concatWith(Flowable.just(Frame.PayloadFrame.from(streamId, FrameType.COMPLETE)))
.onErrorResumeNext { t: Throwable -> Flowable.just(Frame.Error.from(streamId, t)) }
.doFinally { sendingSubscriptions -= streamId }
.subscribe { frameSender.send(it) }
return Completable.complete()
.subscribe(object : Subscriber<Payload> {

override fun onSubscribe(s: Subscription) {
s.request(Long.MAX_VALUE)
}

override fun onNext(payload: Payload) {
frameSender.send(Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload))
}

override fun onComplete() {
sendingSubscriptions -= streamId
frameSender.send(Frame.PayloadFrame.from(streamId, FrameType.COMPLETE))
}

override fun onError(t: Throwable) {
sendingSubscriptions -= streamId
frameSender.send(Frame.Error.from(streamId, t))
}
})
}

private fun handleChannel(streamId: Int, firstFrame: Frame): Completable {
private fun handleChannel(streamId: Int, firstFrame: Frame) {
val receiver = StreamReceiver.create()
channelReceivers[streamId] = receiver

Expand All @@ -222,25 +250,32 @@ internal class RSocketResponder(

receiver.onNext(DefaultPayload(firstFrame))

return handleStream(
handleStream(
streamId,
requestChannel(request),
initialRequestN(firstFrame))
}

private fun handleCancel(streamId: Int): Completable {
private fun handleMetadataPush(result: Completable) {
result.subscribe(object : CompletableObserver {
override fun onComplete() {
}

override fun onSubscribe(d: Disposable) {
}

override fun onError(e: Throwable) = errorConsumer(e)
})
}

private fun handleCancel(streamId: Int) {
val subscription = sendingSubscriptions.remove(streamId)
subscription?.cancel()
return Completable.complete()
}

private fun handleRequestN(streamId: Int, frame: Frame): Completable {
private fun handleRequestN(streamId: Int, frame: Frame) {
val subscription = sendingSubscriptions[streamId]
subscription?.let {
val n = Frame.RequestN.requestN(frame).toLong()
it.request(if (n >= Integer.MAX_VALUE) Long.MAX_VALUE else n)
}
return Completable.complete()
subscription?.request(reactiveStreamsRequestN(Frame.RequestN.requestN(frame)))
}

private inner class Lifecycle {
Expand All @@ -261,8 +296,8 @@ internal class RSocketResponder(
.close()
.subscribe({}, errorConsumer)

cleanUp(sendingSubscriptions, { it.cancel() })
cleanUp(channelReceivers, { it.onError(err) })
cleanUp(sendingSubscriptions) { it.cancel() }
cleanUp(channelReceivers) { it.onError(err) }
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.rsocket.kotlin.internal.util

internal fun reactiveStreamsRequestN(initialRequestN: Int) =
if (initialRequestN == Int.MAX_VALUE) {
Long.MAX_VALUE
} else {
initialRequestN.toLong()
}