From 6ef52167b872675401036c74266ed8e64b88021f Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Sat, 14 Sep 2019 16:06:19 +0300 Subject: [PATCH] simplify and reduce allocations --- gradle.properties | 2 +- .../kotlin/internal/RSocketRequester.kt | 69 ++++---- .../kotlin/internal/RSocketResponder.kt | 157 +++++++++++------- .../io/rsocket/kotlin/internal/util/Util.kt | 8 + 4 files changed, 134 insertions(+), 102 deletions(-) create mode 100644 rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/util/Util.kt diff --git a/gradle.properties b/gradle.properties index d7818cf2f..86c40fc41 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,7 +15,7 @@ # group=io.rsocket.kotlin -version=0.9.7 +version=0.9.8 buildScanPluginVersion=2.4.1 dependencyManagementPluginVersion=1.0.8.RELEASE diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt index f0116bd8d..be839d765 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketRequester.kt @@ -23,6 +23,7 @@ import io.reactivex.processors.UnicastProcessor import io.rsocket.kotlin.* import io.rsocket.kotlin.exceptions.ApplicationException import io.rsocket.kotlin.exceptions.ChannelRequestException +import io.rsocket.kotlin.internal.util.reactiveStreamsRequestN import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription @@ -121,10 +122,11 @@ internal class RSocketRequester( val streamId = streamIds.nextStreamId(receivers) val receiver = StreamReceiver.create() receivers[streamId] = receiver - val reqN = Cond() + var isFirstRequestN = true receiver.doOnRequestIfActive { requestN -> - val frame = if (reqN.first()) { + val frame = if (isFirstRequestN) { + isFirstRequestN = false Frame.Request.from( streamId, FrameType.REQUEST_STREAM, @@ -148,35 +150,37 @@ internal class RSocketRequester( return Flowable.defer { val receiver = StreamReceiver.create() val streamId = streamIds.nextStreamId(receivers) - val reqN = Cond() + var firstReqN = true + var firstReqPayload = true receiver.doOnRequestIfActive { requestN -> - if (reqN.first()) { - val wrappedRequest = request.compose { - val sender = RequestingPublisher.wrap(it) - sender.request(1) - senders[streamId] = sender - receivers[streamId] = receiver - sender - }.publish().autoConnect(2) + if (firstReqN) { + firstReqN = false - val first = wrappedRequest.take(1) - .map { payload -> - Frame.Request.from( - streamId, - FrameType.REQUEST_CHANNEL, - payload, - requestN) + val requestFrames = request + .compose { + val sender = RequestingPublisher.wrap(it) + sender.request(1) + senders[streamId] = sender + receivers[streamId] = receiver + sender } - val rest = wrappedRequest.skip(1) .map { payload -> - Frame.PayloadFrame.from( - streamId, - FrameType.NEXT, - payload) + if (firstReqPayload) { + firstReqPayload = false + Frame.Request.from( + streamId, + FrameType.REQUEST_CHANNEL, + payload, + requestN) + } else { + Frame.PayloadFrame.from( + streamId, + FrameType.NEXT, + payload) + } } - val requestFrames = Flowable.concatArrayEager(first, rest) requestFrames.subscribe( ChannelRequestSubscriber( { payload -> frameSender.send(payload) }, @@ -248,10 +252,7 @@ internal class RSocketRequester( FrameType.NEXT -> receiver.onNext(DefaultPayload(frame)) FrameType.REQUEST_N -> { val sender = senders[streamId] - sender?.let { - val n = Frame.RequestN.requestN(frame).toLong() - it.request(n) - } + sender?.request(reactiveStreamsRequestN(Frame.RequestN.requestN(frame))) } FrameType.COMPLETE -> { receiver.onComplete() @@ -320,18 +321,6 @@ internal class RSocketRequester( } } - private class Cond { - private var first = true - - fun first(): Boolean = - if (first) { - first = false - true - } else { - false - } - } - private class ChannelRequestSubscriber(private val next: (Frame) -> Unit, private val error: (Throwable) -> Unit, private val complete: (Boolean) -> Unit) diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt index 1b4fd5155..45ee9351c 100644 --- a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/RSocketResponder.kt @@ -16,9 +16,7 @@ package io.rsocket.kotlin.internal -import io.reactivex.Completable -import io.reactivex.Flowable -import io.reactivex.Single +import io.reactivex.* import io.reactivex.disposables.Disposable import io.rsocket.kotlin.* import io.rsocket.kotlin.Frame.Request.initialRequestN @@ -27,6 +25,7 @@ import io.rsocket.kotlin.exceptions.ApplicationException import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight.FLAGS_C import io.rsocket.kotlin.internal.frame.FrameHeaderFlyweight.FLAGS_M import io.rsocket.kotlin.DefaultPayload +import io.rsocket.kotlin.internal.util.reactiveStreamsRequestN import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription @@ -57,8 +56,7 @@ internal class RSocketResponder( receiveDisposable = connection .receive() - .concatMap { frame -> handleFrame(frame) } - .subscribe({}, { completion.error(it) }) + .subscribe({ handleFrame(it) }, { completion.error(it) }) connection .onClose() @@ -111,8 +109,8 @@ internal class RSocketResponder( override fun onClose(): Completable = connection.onClose() - private fun handleFrame(frame: Frame): Flowable { - return try { + private fun handleFrame(frame: Frame) { + try { val streamId = frame.streamId when (frame.type) { FrameType.FIRE_AND_FORGET -> handleFireAndForget(streamId, fireAndForget(DefaultPayload(frame))) @@ -121,96 +119,126 @@ internal class RSocketResponder( FrameType.REQUEST_N -> handleRequestN(streamId, frame) FrameType.REQUEST_STREAM -> handleStream(streamId, requestStream(DefaultPayload(frame)), initialRequestN(frame)) FrameType.REQUEST_CHANNEL -> handleChannel(streamId, frame) - FrameType.METADATA_PUSH -> metadataPush(DefaultPayload(frame)) + FrameType.METADATA_PUSH -> handleMetadataPush(metadataPush(DefaultPayload(frame))) FrameType.NEXT -> handleNext(streamId, frame) FrameType.COMPLETE -> handleComplete(streamId) FrameType.ERROR -> handleError(streamId, frame) FrameType.NEXT_COMPLETE -> handleNextComplete(streamId, frame) else -> handleUnsupportedFrame(frame) - }.toFlowable() + } } finally { frame.release() } } - private fun handleUnsupportedFrame(frame: Frame): Completable { + private fun handleUnsupportedFrame(frame: Frame) { errorConsumer(IllegalArgumentException("Unsupported frame: $frame")) - return Completable.complete() } - private fun handleNextComplete(streamId: Int, frame: Frame): Completable { + private fun handleNextComplete(streamId: Int, frame: Frame) { val receiver = channelReceivers[streamId] receiver?.onNext(DefaultPayload(frame)) receiver?.onComplete() - return Completable.complete() } - private fun handleError(streamId: Int, frame: Frame): Completable { + private fun handleError(streamId: Int, frame: Frame) { val receiver = channelReceivers[streamId] receiver?.onError(ApplicationException(Frame.Error.message(frame))) - return Completable.complete() } - private fun handleComplete(streamId: Int): Completable { + private fun handleComplete(streamId: Int) { val receiver = channelReceivers[streamId] receiver?.onComplete() - return Completable.complete() } - private fun handleNext(streamId: Int, frame: Frame): Completable { + private fun handleNext(streamId: Int, frame: Frame) { val receiver = channelReceivers[streamId] receiver?.onNext(DefaultPayload(frame)) - return Completable.complete() } private fun handleFireAndForget(streamId: Int, - result: Completable): Completable { - return result - .doOnSubscribe { d -> sendingSubscriptions[streamId] = subscription(d) } - .doOnError(errorConsumer) - .doFinally { sendingSubscriptions -= streamId } + result: Completable) { + result.subscribe(object : CompletableObserver { + override fun onComplete() { + sendingSubscriptions -= streamId + } + + override fun onSubscribe(d: Disposable) { + sendingSubscriptions[streamId] = subscription(d) + } + + override fun onError(e: Throwable) { + sendingSubscriptions -= streamId + errorConsumer(e) + } + }) } private fun handleRequestResponse(streamId: Int, - response: Single): Completable { - return response - .doOnSubscribe { d -> sendingSubscriptions[streamId] = subscription(d) } - .map { payload -> - var flags = FLAGS_C - if (payload.hasMetadata) { - flags = Frame.setFlag(flags, FLAGS_M) - } - Frame.PayloadFrame.from( - streamId, - FrameType.NEXT_COMPLETE, - payload, - flags) + response: Single) { + response.subscribe(object : SingleObserver { + + override fun onSuccess(payload: Payload) { + sendingSubscriptions -= streamId + var flags = FLAGS_C + if (payload.hasMetadata) { + flags = Frame.setFlag(flags, FLAGS_M) } - .onErrorResumeNext { t -> Single.just(Frame.Error.from(streamId, t)) } - .doOnSuccess { frameSender.send(it) } - .doFinally { sendingSubscriptions -= streamId } - .ignoreElement() + frameSender.send(Frame.PayloadFrame.from( + streamId, + FrameType.NEXT_COMPLETE, + payload, + flags)) + } + + override fun onSubscribe(d: Disposable) { + sendingSubscriptions[streamId] = subscription(d) + } + + override fun onError(e: Throwable) { + sendingSubscriptions -= streamId + val frame = when (e) { + is NoSuchElementException -> Frame.PayloadFrame.from(streamId, FrameType.COMPLETE) + else -> Frame.Error.from(streamId, e) + } + frameSender.send(frame) + } + }) } private fun handleStream(streamId: Int, response: Flowable, - initialRequestN: Int): Completable { + initialRequestN: Int) { response - .map { payload -> Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload) } .compose { frameFlux -> val frames = RequestingPublisher.wrap(frameFlux) sendingSubscriptions[streamId] = frames - frames.request(initialRequestN.toLong()) + frames.request(reactiveStreamsRequestN(initialRequestN)) frames } - .concatWith(Flowable.just(Frame.PayloadFrame.from(streamId, FrameType.COMPLETE))) - .onErrorResumeNext { t: Throwable -> Flowable.just(Frame.Error.from(streamId, t)) } - .doFinally { sendingSubscriptions -= streamId } - .subscribe { frameSender.send(it) } - return Completable.complete() + .subscribe(object : Subscriber { + + override fun onSubscribe(s: Subscription) { + s.request(Long.MAX_VALUE) + } + + override fun onNext(payload: Payload) { + frameSender.send(Frame.PayloadFrame.from(streamId, FrameType.NEXT, payload)) + } + + override fun onComplete() { + sendingSubscriptions -= streamId + frameSender.send(Frame.PayloadFrame.from(streamId, FrameType.COMPLETE)) + } + + override fun onError(t: Throwable) { + sendingSubscriptions -= streamId + frameSender.send(Frame.Error.from(streamId, t)) + } + }) } - private fun handleChannel(streamId: Int, firstFrame: Frame): Completable { + private fun handleChannel(streamId: Int, firstFrame: Frame) { val receiver = StreamReceiver.create() channelReceivers[streamId] = receiver @@ -222,25 +250,32 @@ internal class RSocketResponder( receiver.onNext(DefaultPayload(firstFrame)) - return handleStream( + handleStream( streamId, requestChannel(request), initialRequestN(firstFrame)) } - private fun handleCancel(streamId: Int): Completable { + private fun handleMetadataPush(result: Completable) { + result.subscribe(object : CompletableObserver { + override fun onComplete() { + } + + override fun onSubscribe(d: Disposable) { + } + + override fun onError(e: Throwable) = errorConsumer(e) + }) + } + + private fun handleCancel(streamId: Int) { val subscription = sendingSubscriptions.remove(streamId) subscription?.cancel() - return Completable.complete() } - private fun handleRequestN(streamId: Int, frame: Frame): Completable { + private fun handleRequestN(streamId: Int, frame: Frame) { val subscription = sendingSubscriptions[streamId] - subscription?.let { - val n = Frame.RequestN.requestN(frame).toLong() - it.request(if (n >= Integer.MAX_VALUE) Long.MAX_VALUE else n) - } - return Completable.complete() + subscription?.request(reactiveStreamsRequestN(Frame.RequestN.requestN(frame))) } private inner class Lifecycle { @@ -261,8 +296,8 @@ internal class RSocketResponder( .close() .subscribe({}, errorConsumer) - cleanUp(sendingSubscriptions, { it.cancel() }) - cleanUp(channelReceivers, { it.onError(err) }) + cleanUp(sendingSubscriptions) { it.cancel() } + cleanUp(channelReceivers) { it.onError(err) } } } diff --git a/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/util/Util.kt b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/util/Util.kt new file mode 100644 index 000000000..00a0a7bf7 --- /dev/null +++ b/rsocket-core/src/main/kotlin/io/rsocket/kotlin/internal/util/Util.kt @@ -0,0 +1,8 @@ +package io.rsocket.kotlin.internal.util + +internal fun reactiveStreamsRequestN(initialRequestN: Int) = + if (initialRequestN == Int.MAX_VALUE) { + Long.MAX_VALUE + } else { + initialRequestN.toLong() + }