diff --git a/build.gradle b/build.gradle index 18447774b..e35c38b26 100644 --- a/build.gradle +++ b/build.gradle @@ -36,8 +36,8 @@ subprojects { ext['mockito.version'] = '2.28.2' ext['hamcrest-library.version'] = '1.3' ext['slf4j-log4j.version'] = '1.7.25' - ext['reactor-netty.version'] = '0.7.15.RELEASE' ext['okhttp.version'] = '3.14.2' + ext['rsocket.java.version'] = '1.0.0-RC1' dependencyManagement { @@ -49,13 +49,13 @@ subprojects { dependency "io.reactivex.rxjava2:rxjava:${ext['rxjava.version']}" dependency "org.slf4j:slf4j-api:${ext['slf4j-api.version']}" /*transports*/ - dependency "io.projectreactor.ipc:reactor-netty:${ext['reactor-netty.version']}" dependency "com.squareup.okhttp3:okhttp:${ext['okhttp.version']}" /*test*/ dependency "junit:junit:${ext['junit.version']}" dependency "org.mockito:mockito-core:${ext['mockito.version']}" dependency "org.hamcrest:hamcrest-library:${ext['hamcrest-library.version']}" dependency "org.slf4j:slf4j-log4j12:${ext['slf4j-log4j.version']}" + dependency "io.rsocket:rsocket-transport-netty:${ext['rsocket.java.version']}" } } diff --git a/rsocket-transport-netty/build.gradle b/rsocket-transport-netty/build.gradle deleted file mode 100644 index 38515541a..000000000 --- a/rsocket-transport-netty/build.gradle +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -apply plugin: 'com.jfrog.bintray' -apply plugin: 'com.jfrog.artifactory' -apply plugin: 'maven-publish' - -dependencies { - api project(':rsocket-core') - implementation ('io.projectreactor.ipc:reactor-netty') -} -compileKotlin { - kotlinOptions.jvmTarget = "1.8" -} -compileTestKotlin { - kotlinOptions.jvmTarget = "1.8" -} - -sourceCompatibility = 1.8 - -description = "Websockets and TCP transport based on reactor-netty" \ No newline at end of file diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/Ext.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/Ext.kt deleted file mode 100644 index 5e49d8a7d..000000000 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/Ext.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.netty - -import io.reactivex.Completable -import io.reactivex.Flowable -import io.reactivex.Single -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono - -internal fun Flux.toFlowable(): Flowable = Flowable.fromPublisher(this) - -internal fun Mono.toCompletable(): Completable = Completable.fromPublisher(this) - -internal fun Mono.toSingle(): Single = Single.fromPublisher(this) - -internal fun Completable.toMono(): Mono = Mono.from(toFlowable()) - -internal const val frameLengthSize = 3 - -internal const val frameLengthMask = 0xFFFFFF \ No newline at end of file diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/NettyDuplexConnection.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/NettyDuplexConnection.kt deleted file mode 100644 index 5a2b49821..000000000 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/NettyDuplexConnection.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.netty - -import io.reactivex.Completable -import io.reactivex.Flowable -import io.rsocket.kotlin.DuplexConnection -import io.rsocket.kotlin.Frame -import org.reactivestreams.Publisher -import reactor.ipc.netty.NettyContext -import reactor.ipc.netty.NettyInbound -import reactor.ipc.netty.NettyOutbound - -class NettyDuplexConnection(private val inbound: NettyInbound, - private val outbound: NettyOutbound, - private val context: NettyContext) : DuplexConnection { - - override fun send(frame: Publisher): Completable = - Flowable.fromPublisher(frame) - .concatMap { sendOne(it).toFlowable() } - .ignoreElements() - - override fun sendOne(frame: Frame): Completable = - outbound.sendObject(frame.content()).then().toCompletable() - - override fun receive(): Flowable = - inbound.receive() - .map { buf -> Frame.from(buf.retain()) } - .toFlowable() - - override fun availability(): Double = if (context.isDisposed) 0.0 else 1.0 - - override fun close(): Completable = - Completable.fromRunnable { - if (!context.isDisposed) { - context.channel().close() - } - } - - override fun onClose(): Completable = - context.onClose().toCompletable() -} diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/RSocketLengthCodec.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/RSocketLengthCodec.kt deleted file mode 100644 index e2d92d5b3..000000000 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/RSocketLengthCodec.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.netty - -import io.netty.buffer.ByteBuf -import io.netty.handler.codec.LengthFieldBasedFrameDecoder - -class RSocketLengthCodec : LengthFieldBasedFrameDecoder( - frameLengthMask, - 0, - frameLengthSize, - 0, - 0) { - - fun decode(bytebuf: ByteBuf): Any = decode(null, bytebuf) -} diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/WebsocketDuplexConnection.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/WebsocketDuplexConnection.kt deleted file mode 100644 index cd504b5e2..000000000 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/WebsocketDuplexConnection.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.rsocket.kotlin.transport.netty - -import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled.wrappedBuffer -import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame -import io.reactivex.Completable -import io.reactivex.Flowable -import io.rsocket.kotlin.DuplexConnection -import io.rsocket.kotlin.Frame -import org.reactivestreams.Publisher -import reactor.ipc.netty.NettyContext -import reactor.ipc.netty.NettyInbound -import reactor.ipc.netty.NettyOutbound - -/** - * Implementation of a DuplexConnection for Websocket. - * - * - * RSocket impl strongly assumes that each Frame is encoded with the length. This is not true for - * message oriented transports so this must be specifically dropped from Frames sent and stitched - * back on for frames received. - */ -class WebsocketDuplexConnection(private val inbound: NettyInbound, - private val outbound: NettyOutbound, - private val context: NettyContext) : DuplexConnection { - - override fun send(frame: Publisher): Completable { - return Flowable.fromPublisher(frame) - .concatMap { sendOne(it).toFlowable() } - .ignoreElements() - } - - override fun sendOne(frame: Frame): Completable { - return outbound.sendObject( - BinaryWebSocketFrame( - frame.content().skipBytes(frameLengthSize))) - .then() - .toCompletable() - } - - override fun receive(): Flowable { - return inbound.receive() - .map { buf -> - val composite = context.channel().alloc().compositeBuffer() - val length = wrappedBuffer(ByteArray(frameLengthSize)) - encodeLength(length, 0, buf.readableBytes()) - composite.addComponents(true, length, buf.retain()) - Frame.from(composite) - }.toFlowable() - } - - override fun availability(): Double = if (context.isDisposed) 0.0 else 1.0 - - override fun close(): Completable = - Completable.fromRunnable { - if (!context.isDisposed) { - context.channel().close() - } - } - - override fun onClose(): Completable = context.onClose().toCompletable() - - private fun encodeLength(byteBuf: ByteBuf, offset: Int, length: Int) { - if (length and frameLengthMask.inv() != 0) { - throw IllegalArgumentException("Length is larger than 24 bits") - } - byteBuf.setByte(offset, length shr 16) - byteBuf.setByte(offset + 1, length shr 8) - byteBuf.setByte(offset + 2, length) - } -} diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/client/TcpClientTransport.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/client/TcpClientTransport.kt deleted file mode 100644 index 9b72db2da..000000000 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/client/TcpClientTransport.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.netty.client - -import io.reactivex.Single -import io.rsocket.kotlin.DuplexConnection -import io.rsocket.kotlin.transport.ClientTransport -import io.rsocket.kotlin.transport.netty.NettyDuplexConnection -import io.rsocket.kotlin.transport.netty.RSocketLengthCodec -import io.rsocket.kotlin.transport.netty.toMono -import reactor.ipc.netty.tcp.TcpClient -import java.net.InetSocketAddress - -class TcpClientTransport private constructor(private val client: TcpClient) - : ClientTransport { - - override fun connect(): Single = - Single.create { sink -> - client.newHandler { inbound, outbound -> - inbound.context().addHandler( - "client-length-codec", - RSocketLengthCodec()) - - val connection = NettyDuplexConnection( - inbound, - outbound, - inbound.context()) - sink.onSuccess(connection) - connection.onClose().toMono() - }.doOnError { sink.onError(it) }.subscribe() - } - - companion object { - - fun create(port: Int): TcpClientTransport { - val tcpClient = TcpClient.create(port) - return create(tcpClient) - } - - fun create(bindAddress: String, port: Int): TcpClientTransport { - val tcpClient = TcpClient.create(bindAddress, port) - return create(tcpClient) - } - - fun create(address: InetSocketAddress): TcpClientTransport { - val tcpClient = TcpClient.create(address.hostString, address.port) - return create(tcpClient) - } - - fun create(client: TcpClient): TcpClientTransport { - return TcpClientTransport(client) - } - } -} diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/client/WebsocketClientTransport.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/client/WebsocketClientTransport.kt deleted file mode 100644 index 4874cffbd..000000000 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/client/WebsocketClientTransport.kt +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.netty.client - -import io.reactivex.Single -import io.rsocket.kotlin.DuplexConnection -import io.rsocket.kotlin.transport.ClientTransport -import io.rsocket.kotlin.transport.TransportHeaderAware -import io.rsocket.kotlin.transport.netty.WebsocketDuplexConnection -import io.rsocket.kotlin.transport.netty.toMono -import reactor.ipc.netty.http.client.HttpClient -import java.net.InetSocketAddress -import java.net.URI - -class WebsocketClientTransport private constructor(private val client: HttpClient, - private val path: String) - : ClientTransport, TransportHeaderAware { - private var transportHeaders: () -> Map = { emptyMap() } - - override fun connect(): Single { - return Single.create { sink -> - client.ws(path) { hb -> - transportHeaders().forEach { name, value -> hb.set(name, value) } - }.flatMap { response -> - response.receiveWebsocket { inbound, outbound -> - val connection = WebsocketDuplexConnection( - inbound, - outbound, - inbound.context()) - sink.onSuccess(connection) - connection.onClose().toMono() - } - }.doOnError { sink.onError(it) }.subscribe() - } - } - - override fun setTransportHeaders(transportHeaders: () -> Map) { - this.transportHeaders = transportHeaders - } - - companion object { - - fun create(port: Int): WebsocketClientTransport { - val httpClient = HttpClient.create(port) - return create(httpClient, "/") - } - - fun create(bindAddress: String, port: Int): WebsocketClientTransport { - val httpClient = HttpClient.create(bindAddress, port) - return create(httpClient, "/") - } - - fun create(address: InetSocketAddress): WebsocketClientTransport { - return create(address.hostName, address.port) - } - - fun create(uri: URI): WebsocketClientTransport { - val httpClient = createClient(uri) - return create(httpClient, uri.toString()) - } - - private fun createClient(uri: URI): HttpClient { - return if (isSecureWebsocket(uri)) { - HttpClient.create { options -> - options.sslSupport() - .connectAddress { - InetSocketAddress.createUnresolved( - uri.host, - getPort(uri, 443)) - } - } - } else { - HttpClient.create(uri.host, getPort(uri, 80)) - } - } - - fun getPort(uri: URI, defaultPort: Int): Int { - return if (uri.port == -1) defaultPort else uri.port - } - - fun isSecureWebsocket(uri: URI): Boolean { - return uri.scheme == "wss" || uri.scheme == "https" - } - - fun isPlaintextWebsocket(uri: URI): Boolean { - return uri.scheme == "ws" || uri.scheme == "http" - } - - fun create(client: HttpClient, path: String): WebsocketClientTransport { - return WebsocketClientTransport(client, path) - } - } -} diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/NettyContextCloseable.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/NettyContextCloseable.kt deleted file mode 100644 index 1417f89e9..000000000 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/NettyContextCloseable.kt +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.netty.server - -import io.reactivex.Completable -import io.rsocket.kotlin.Closeable -import io.rsocket.kotlin.transport.netty.toCompletable -import reactor.ipc.netty.NettyContext -import java.net.InetSocketAddress - -/** - * A [Closeable] wrapping a [NettyContext], allowing for close and aware of its address. - */ -class NettyContextCloseable internal constructor(private val context: NettyContext) - : Closeable { - override fun close(): Completable = - Completable.fromRunnable { - if (!context.isDisposed) { - context.channel().close() - } - } - - override fun onClose(): Completable = context.onClose().toCompletable() - - /** - * @see NettyContext.address - * @return socket address. - */ - fun address(): InetSocketAddress = context.address() -} diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/TcpServerTransport.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/TcpServerTransport.kt deleted file mode 100644 index aa5e8780d..000000000 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/TcpServerTransport.kt +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.netty.server - -import io.reactivex.Single -import io.rsocket.kotlin.transport.ServerTransport -import io.rsocket.kotlin.transport.netty.NettyDuplexConnection -import io.rsocket.kotlin.transport.netty.RSocketLengthCodec -import io.rsocket.kotlin.transport.netty.toSingle -import reactor.ipc.netty.tcp.TcpServer -import java.net.InetSocketAddress - -class TcpServerTransport private constructor(private var server: TcpServer) - : ServerTransport { - - override fun start(acceptor: ServerTransport.ConnectionAcceptor) - : Single = - server.newHandler { inbound, outbound -> - inbound.context() - .addHandler( - "server-length-codec", - RSocketLengthCodec()) - val connection = NettyDuplexConnection( - inbound, - outbound, - inbound.context()) - acceptor(connection).andThen(outbound.neverComplete()) - }.toSingle().map { NettyContextCloseable(it) } - - companion object { - - fun create(address: InetSocketAddress): TcpServerTransport { - val server = TcpServer.create(address.hostName, address.port) - return create(server) - } - - fun create(bindAddress: String, port: Int): TcpServerTransport { - val server = TcpServer.create(bindAddress, port) - return create(server) - } - - fun create(port: Int): TcpServerTransport { - val server = TcpServer.create(port) - return create(server) - } - - fun create(server: TcpServer): TcpServerTransport { - return TcpServerTransport(server) - } - } -} diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketRouteTransport.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketRouteTransport.kt deleted file mode 100644 index 102169f9e..000000000 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketRouteTransport.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.netty.server - -import io.reactivex.Single -import io.rsocket.kotlin.Closeable -import io.rsocket.kotlin.transport.ServerTransport -import io.rsocket.kotlin.transport.ServerTransport.ConnectionAcceptor -import io.rsocket.kotlin.transport.netty.WebsocketDuplexConnection -import io.rsocket.kotlin.transport.netty.toSingle -import reactor.ipc.netty.http.server.HttpServer -import reactor.ipc.netty.http.server.HttpServerRoutes - -class WebsocketRouteTransport(private val server: HttpServer, - private val routesBuilder: (HttpServerRoutes) -> Unit, - private val path: String) : ServerTransport { - - override fun start(acceptor: ConnectionAcceptor): Single { - return server - .newRouter { routes -> - routesBuilder(routes) - routes.ws(path) { inbound, outbound -> - val connection = WebsocketDuplexConnection( - inbound, - outbound, - inbound.context()) - acceptor(connection).andThen(outbound.neverComplete()) - } - }.toSingle().map { NettyContextCloseable(it) } - } -} diff --git a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketServerTransport.kt b/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketServerTransport.kt deleted file mode 100644 index 21fcc3357..000000000 --- a/rsocket-transport-netty/src/main/kotlin/io/rsocket/kotlin/transport/netty/server/WebsocketServerTransport.kt +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.transport.netty.server - -import io.reactivex.Single -import io.rsocket.kotlin.transport.ServerTransport -import io.rsocket.kotlin.transport.TransportHeaderAware -import io.rsocket.kotlin.transport.netty.WebsocketDuplexConnection -import io.rsocket.kotlin.transport.netty.toSingle -import reactor.ipc.netty.http.server.HttpServer - -class WebsocketServerTransport private constructor(internal var server: HttpServer) - : ServerTransport, TransportHeaderAware { - private var transportHeaders: () -> Map = { emptyMap() } - - override fun start(acceptor: ServerTransport.ConnectionAcceptor) - : Single { - return server - .newHandler { _, response -> - transportHeaders() - .forEach { name, value -> response.addHeader(name, value) } - response.sendWebsocket { inbound, outbound -> - val connection = - WebsocketDuplexConnection( - inbound, - outbound, - inbound.context()) - acceptor(connection).andThen(outbound.neverComplete()) - } - }.toSingle().map { NettyContextCloseable(it) } - } - - override fun setTransportHeaders(transportHeaders: () -> Map) { - this.transportHeaders = transportHeaders - } - - companion object { - - fun create(bindAddress: String, port: Int): WebsocketServerTransport { - val httpServer = HttpServer.create(bindAddress, port) - return create(httpServer) - } - - fun create(port: Int): WebsocketServerTransport { - val httpServer = HttpServer.create(port) - return create(httpServer) - } - - fun create(server: HttpServer): WebsocketServerTransport { - return WebsocketServerTransport(server) - } - } -} diff --git a/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkHttpWebSocketConnection.kt b/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkHttpWebSocketConnection.kt index 25f7cbce1..02b485277 100644 --- a/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkHttpWebSocketConnection.kt +++ b/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkHttpWebSocketConnection.kt @@ -24,7 +24,7 @@ import org.reactivestreams.Publisher class OkHttpWebSocketConnection internal constructor(private val ws: OkWebsocket) : DuplexConnection { - override fun availability(): Double = if (ws.isOpen) 1.0 else 0.0 + override fun availability(): Double = if (ws.isOpen()) 1.0 else 0.0 override fun close() = ws.close() diff --git a/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkWebsocket.kt b/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkWebsocket.kt index a41a1be1d..7a1a7976f 100644 --- a/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkWebsocket.kt +++ b/rsocket-transport-okhttp/src/main/kotlin/io/rsocket/transport/okhttp/OkWebsocket.kt @@ -28,11 +28,11 @@ import okhttp3.* import okio.ByteString import org.reactivestreams.Publisher import java.nio.channels.ClosedChannelException +import java.util.concurrent.atomic.AtomicBoolean internal class OkWebsocket(client: OkHttpClient, request: Request) { - @Volatile - internal var isOpen = false + private val isOpen = AtomicBoolean() @Volatile private var failErr: ClosedChannelException? = null private val defFailErr by lazy { @@ -43,7 +43,7 @@ internal class OkWebsocket(client: OkHttpClient, private val listener = object : WebSocketListener() { override fun onOpen(webSocket: WebSocket?, response: Response?) { - isOpen = true + isOpen.set(true) connection.onNext(OkHttpWebSocketConnection(this@OkWebsocket)) } @@ -70,15 +70,16 @@ internal class OkWebsocket(client: OkHttpClient, override fun onClosed(webSocket: WebSocket?, code: Int, reason: String?) { - isOpen = false - connection.onComplete() + if (isOpen.compareAndSet(true, false)) { + connection.onComplete() + } } override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { - connection.onError(t) - if (isOpen) { + if (isOpen.compareAndSet(true, false)) { + connection.onError(t) val closedChannelException = ClosedChannelException() closedChannelException.initCause(t) failErr = closedChannelException @@ -115,6 +116,8 @@ internal class OkWebsocket(client: OkHttpClient, .onErrorResumeNext(Flowable.empty()) .ignoreElements() + internal fun isOpen(): Boolean = isOpen.get() + private fun WebSocket.sendAsync(bytes: ByteString): Completable = Completable.create { e -> if (send(bytes)) diff --git a/settings.gradle b/settings.gradle index 675ae4eca..df5eafee3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -18,6 +18,5 @@ include ':rsocket-transport-okhttp' rootProject.name='rsocket-kotlin' include 'rsocket-core' include 'rsocket-transport-okhttp' -include 'rsocket-transport-netty' include 'test' diff --git a/test/build.gradle b/test/build.gradle index a7893ab89..8179d1d77 100644 --- a/test/build.gradle +++ b/test/build.gradle @@ -16,7 +16,7 @@ dependencies { testImplementation project(":rsocket-transport-okhttp") - testImplementation project(":rsocket-transport-netty") + testImplementation 'io.rsocket:rsocket-transport-netty' testImplementation 'org.assertj:assertj-core:3.9.1' } diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/InteractionsStressTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/InteractionsTest.kt similarity index 68% rename from test/src/test/kotlin/io/rsocket/kotlin/test/InteractionsStressTest.kt rename to test/src/test/kotlin/io/rsocket/kotlin/test/InteractionsTest.kt index 9f13fe869..763dd761f 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/InteractionsStressTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/InteractionsTest.kt @@ -17,42 +17,50 @@ package io.rsocket.kotlin.test import io.reactivex.Flowable -import io.reactivex.Single import io.reactivex.disposables.CompositeDisposable import io.reactivex.disposables.Disposable import io.reactivex.processors.UnicastProcessor import io.rsocket.kotlin.* -import io.rsocket.kotlin.transport.netty.client.TcpClientTransport -import io.rsocket.kotlin.transport.netty.server.NettyContextCloseable -import io.rsocket.kotlin.transport.netty.server.TcpServerTransport -import io.rsocket.kotlin.util.AbstractRSocket +import io.rsocket.transport.netty.server.CloseableChannel +import io.rsocket.transport.netty.server.WebsocketServerTransport +import io.rsocket.transport.okhttp.client.OkhttpWebsocketClientTransport +import okhttp3.HttpUrl import org.junit.After import org.junit.Before import org.junit.Test import org.reactivestreams.Publisher -import java.net.InetSocketAddress +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.util.concurrent.TimeUnit import kotlin.math.max +typealias RSocketFactoryJava = io.rsocket.RSocketFactory +typealias AbstractRSocketJava = io.rsocket.AbstractRSocket +typealias PayloadJava = io.rsocket.Payload +typealias DefaultPayloadJava = io.rsocket.util.DefaultPayload + class InteractionsStressTest { - private lateinit var server: NettyContextCloseable + private lateinit var server: CloseableChannel private lateinit var client: RSocket private lateinit var testHandler: TestHandler @Before fun setUp() { - val address = InetSocketAddress - .createUnresolved("localhost", 0) - val serverTransport = TcpServerTransport.create(address) + val serverTransport = WebsocketServerTransport.create("localhost", 0) testHandler = TestHandler() - server = RSocketFactory + server = RSocketFactoryJava .receive() - .acceptor { { _, _ -> Single.just(testHandler) } } + .acceptor { _, _ -> Mono.just(testHandler) } .transport(serverTransport) .start() - .blockingGet() + .block(java.time.Duration.ofSeconds(5))!! - val clientTransport = TcpClientTransport - .create(server.address()) + val address = server.address() + val clientTransport = OkhttpWebsocketClientTransport + .create(HttpUrl.Builder() + .scheme("http") + .host(address.hostName) + .port(address.port) + .build()) client = RSocketFactory .connect() @@ -67,7 +75,8 @@ class InteractionsStressTest { @After fun tearDown() { - server.close().andThen(server.onClose()).blockingAwait() + server.dispose() + server.onClose().block(java.time.Duration.ofSeconds(5)) } @Test @@ -117,7 +126,7 @@ class InteractionsStressTest { val errors = UnicastProcessor.create() val disposable = CompositeDisposable() repeat(interactionCount) { - disposable += interaction(source()) + disposable += interaction(source()).timeout(1, TimeUnit.SECONDS) .subscribe({ res -> if (!pred(res)) { errors.onError( @@ -143,21 +152,24 @@ class InteractionsStressTest { } internal class TestHandler - : AbstractRSocket() { + : AbstractRSocketJava() { - override fun requestResponse(payload: Payload): Single = - Single.just(payload) + override fun requestResponse(payload: PayloadJava): Mono = + Mono.just(payload) - override fun requestStream(payload: Payload): Flowable = - Flowable.just( - DefaultPayload.text(payload.dataUtf8), - DefaultPayload.text(payload.dataUtf8)) + override fun requestStream(payload: PayloadJava): Flux { + val data = payload.dataUtf8 + return Flux.just( + DefaultPayloadJava.create(data), + DefaultPayloadJava.create(data)) + } - override fun requestChannel(payloads: Publisher): Flowable { - return Flowable.fromPublisher(payloads).flatMap { payload -> - Flowable.just( - DefaultPayload.text(payload.dataUtf8), - DefaultPayload.text(payload.dataUtf8)) + override fun requestChannel(payloads: Publisher): Flux { + return Flux.from(payloads).flatMap { payload -> + val data = payload.dataUtf8 + Flux.just( + DefaultPayloadJava.create(data), + DefaultPayloadJava.create(data)) } } } @@ -168,11 +180,9 @@ class InteractionsStressTest { companion object { private fun source() = - Flowable.interval(intervalMillis, TimeUnit.MILLISECONDS) + Flowable.interval(100, TimeUnit.MICROSECONDS) .onBackpressureDrop() - private const val intervalMillis: Long = 1 - private const val testDuration = 20L private val interactionCount = max(1, Runtime.getRuntime().availableProcessors() / 2) diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt deleted file mode 100644 index 0949e4bfe..000000000 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/LeaseServerTest.kt +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test.lease - -import io.reactivex.Flowable -import io.reactivex.Single -import io.reactivex.processors.BehaviorProcessor -import io.reactivex.subscribers.TestSubscriber -import io.rsocket.kotlin.* -import io.rsocket.kotlin.exceptions.MissingLeaseException -import io.rsocket.kotlin.transport.netty.client.TcpClientTransport -import io.rsocket.kotlin.transport.netty.server.NettyContextCloseable -import io.rsocket.kotlin.transport.netty.server.TcpServerTransport -import io.rsocket.kotlin.util.AbstractRSocket -import org.junit.After -import org.junit.Assert.assertEquals -import org.junit.Assert.assertTrue -import org.junit.Before -import org.junit.Test -import java.util.concurrent.TimeUnit - -class LeaseServerTest { - private lateinit var nettyContextCloseable: NettyContextCloseable - private lateinit var serverLease: LeaseSupp - private lateinit var clientSocket: RSocket - private lateinit var leaseGranter: LeaseGranter - @Before - fun setUp() { - serverLease = LeaseSupp() - nettyContextCloseable = RSocketFactory.receive() - .lease { opts -> opts.enableLease(serverLease) } - .acceptor { - { _, _ -> - Single.just( - object : AbstractRSocket() { - override fun requestStream(payload: Payload) - : Flowable = - Flowable.just(payload) - }) - } - } - .transport(TcpServerTransport.create("localhost", 0)) - .start() - .blockingGet() - - val address = nettyContextCloseable.address() - clientSocket = RSocketFactory - .connect() - .lease { opts -> opts.enableLease(LeaseSupp()) } - .keepAlive { opts -> - opts.keepAliveInterval(Duration.ofMinutes(1)) - .keepAliveMaxLifeTime(Duration.ofMinutes(20)) - } - .transport(TcpClientTransport.create(address)) - .start() - .blockingGet() - - leaseGranter = serverLease.leaseGranter().blockingGet() - } - - @After - fun tearDown() { - clientSocket.close().subscribe() - nettyContextCloseable.close().subscribe() - nettyContextCloseable.onClose().blockingAwait() - } - - @Test - fun grantLeaseNumberOfRequests() { - assertEquals(clientSocket.availability(), 0.0, 1e-5) - leaseGranter.grant(2, 10_000) - .delay(100, TimeUnit.MILLISECONDS) - .blockingAwait() - assertEquals(clientSocket.availability(), 1.0, 1e-5) - clientSocket.requestStream(payload()) - .blockingSubscribe() - assertEquals(clientSocket.availability(), 0.5, 1e-5) - clientSocket.requestStream(payload()) - .blockingSubscribe() - assertEquals(clientSocket.availability(), 0.0, 1e-5) - - val subscriber = TestSubscriber() - clientSocket.requestStream(payload()) - .blockingSubscribe(subscriber) - assertEquals(1, subscriber.errorCount()) - assertTrue(subscriber.errors().first() is MissingLeaseException) - leaseGranter.grant(1, 10_000) - .delay(100, TimeUnit.MILLISECONDS) - .blockingAwait() - assertEquals(1.0, clientSocket.availability(), 1e-5) - } - - @Test - fun grantLeaseTtl() { - leaseGranter.grant(2, 200) - .delay(250, TimeUnit.MILLISECONDS) - .blockingAwait() - - assertEquals(clientSocket.availability(), 0.0, 1e-5) - val subscriber = TestSubscriber() - clientSocket.requestStream(payload()) - .blockingSubscribe(subscriber) - - assertEquals(1, subscriber.errorCount()) - assertTrue(subscriber.errors().first() is MissingLeaseException) - } - - private fun payload() = DefaultPayload("data") - - private class LeaseSupp : (RSocketLease) -> Unit { - private val leaseRefs = BehaviorProcessor.create() - - override fun invoke(RSocketLease: RSocketLease) { - leaseRefs.onNext(RSocketLease.granter()) - } - - fun leaseGranter(): Single = leaseRefs.firstOrError() - } - -} \ No newline at end of file diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt deleted file mode 100644 index 8afee9d8b..000000000 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/lease/example/LeaseClientServerExample.kt +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test.lease.example - -import io.reactivex.Flowable -import io.reactivex.Single -import io.reactivex.processors.BehaviorProcessor -import io.reactivex.schedulers.Schedulers -import io.rsocket.kotlin.* -import io.rsocket.kotlin.transport.netty.client.TcpClientTransport -import io.rsocket.kotlin.transport.netty.server.TcpServerTransport -import io.rsocket.kotlin.util.AbstractRSocket -import org.slf4j.LoggerFactory -import java.util.* -import java.util.concurrent.TimeUnit - -object LeaseClientServerExample { - private val logger = LoggerFactory.getLogger(LeaseClientServerExample::class.java) - - @JvmStatic - fun main(args: Array) { - - val serverLeaseConsumer = RSocketLeaseConsumer() - val nettyContextCloseable = RSocketFactory.receive() - .lease { opts -> opts.enableLease(serverLeaseConsumer) } - .acceptor { - { _, _ -> - Single.just( - object : AbstractRSocket() { - override fun requestResponse(payload: Payload) - : Single = - Single.just(DefaultPayload("Server Response ${Date()}")) - }) - } - } - .transport(TcpServerTransport.create("localhost", 0)) - .start() - .blockingGet() - val clientLeaseConsumer = RSocketLeaseConsumer() - val address = nettyContextCloseable.address() - val clientSocket = RSocketFactory.connect() - .lease { opts -> opts.enableLease(clientLeaseConsumer) } - .keepAlive { opts -> - opts.keepAliveInterval(Duration.ofMinutes(1)) - .keepAliveMaxLifeTime(Duration.ofMinutes(20)) - } - .transport(TcpClientTransport.create(address)) - .start() - .blockingGet() - - Flowable.interval(1, TimeUnit.SECONDS) - .observeOn(Schedulers.io()) - .flatMap { - logger.info("Availability: ${clientSocket.availability()}") - clientSocket - .requestResponse(DefaultPayload("Client request ${Date()}")) - .toFlowable() - .doOnError { logger.info("Error: $it") } - .onErrorResumeNext { _: Throwable -> Flowable.empty() } - } - .subscribe { resp -> logger.info("Client response: ${resp.dataUtf8}") } - - serverLeaseConsumer - .rSocketLease() - .flatMapCompletable { rSocketLease -> - Flowable.interval(1, 10, TimeUnit.SECONDS) - .flatMapCompletable { - rSocketLease.granter().grant( - numberOfRequests = 7, - ttlSeconds = 5_000) - } - }.subscribe({}, { logger.error("Granter error: $it") }) - - clientSocket.onClose().blockingAwait() - } - - private class RSocketLeaseConsumer : (RSocketLease) -> Unit { - private val leaseSupport = BehaviorProcessor.create() - - override fun invoke(rSocketLease: RSocketLease) { - this.leaseSupport.onNext(rSocketLease) - } - - fun rSocketLease(): Single = this.leaseSupport.firstOrError() - } -} diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt deleted file mode 100644 index d35c165e8..000000000 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/ClientServerChannelTest.kt +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test.transport - -import io.reactivex.Flowable -import io.reactivex.Single -import io.rsocket.kotlin.DefaultPayload -import io.rsocket.kotlin.Payload -import io.rsocket.kotlin.RSocket -import io.rsocket.kotlin.RSocketFactory -import io.rsocket.kotlin.transport.netty.client.TcpClientTransport -import io.rsocket.kotlin.transport.netty.server.NettyContextCloseable -import io.rsocket.kotlin.transport.netty.server.TcpServerTransport -import io.rsocket.kotlin.util.AbstractRSocket -import org.junit.Before -import org.junit.Test -import org.reactivestreams.Publisher -import java.net.InetSocketAddress -import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger - -class ClientServerChannelTest { - private lateinit var server: NettyContextCloseable - private lateinit var client: RSocket - private lateinit var channelHandler: ChannelHandler - @Before - fun setUp() { - val address = InetSocketAddress - .createUnresolved("localhost", 0) - val serverTransport = TcpServerTransport.create(address) - channelHandler = ChannelHandler(intervalMillis) - server = RSocketFactory - .receive() - .acceptor { { _, _ -> Single.just(channelHandler) } } - .transport(serverTransport) - .start() - .blockingGet() - - val clientTransport = TcpClientTransport - .create(server.address()) - - client = RSocketFactory - .connect() - .transport { clientTransport } - .start() - .blockingGet() - } - - @Test - fun channel() { - var requestsCount = 0 - client.requestChannel(textStream(intervalMillis)) - .subscribe({ }, { throw it }) - - val delay = Flowable - .timer(5, TimeUnit.SECONDS) - .share() - - Flowable.interval(250, TimeUnit.MILLISECONDS) - .takeUntil(delay) - .subscribe { - val cur = channelHandler.counter() - if (requestsCount == cur) { - throw RuntimeException("Channel stream does not advance: $cur") - } - requestsCount = cur - } - delay.ignoreElements().blockingAwait() - } - - internal class ChannelHandler(private val intervalMillis: Long) - : AbstractRSocket() { - private val counter = AtomicInteger() - - fun counter() = counter.get() - - override fun requestChannel(payloads: Publisher): - Flowable { - Flowable.fromPublisher(payloads) - .subscribe( - { counter.incrementAndGet() }, - { println("Server channel error: $it") }) - return textStream(intervalMillis) - } - - } - - companion object { - internal fun textStream(intervalMillis: Long) = - Flowable.interval(intervalMillis, TimeUnit.MICROSECONDS) - .onBackpressureDrop() - .map { DefaultPayload.text(it.toString()) } - - internal const val intervalMillis: Long = 100 - } -} \ No newline at end of file diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/EndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/EndToEndTest.kt index 1ff594636..0bb960626 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/EndToEndTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/EndToEndTest.kt @@ -19,29 +19,39 @@ package io.rsocket.kotlin.test.transport import io.reactivex.Completable import io.reactivex.Flowable import io.reactivex.Single -import io.reactivex.processors.BehaviorProcessor +import io.rsocket.ConnectionSetupPayload +import io.rsocket.SocketAcceptor import io.rsocket.kotlin.* import io.rsocket.kotlin.transport.ClientTransport -import io.rsocket.kotlin.transport.ServerTransport -import io.rsocket.kotlin.transport.netty.server.NettyContextCloseable import io.rsocket.kotlin.util.AbstractRSocket import io.rsocket.kotlin.DefaultPayload +import io.rsocket.transport.ServerTransport +import io.rsocket.transport.netty.server.CloseableChannel import org.assertj.core.api.Assertions.assertThat import org.assertj.core.data.Offset import org.junit.After import org.junit.Before import org.junit.Test import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.publisher.MonoProcessor import java.net.InetSocketAddress import java.util.concurrent.TimeUnit +typealias RSocketFactoryJava = io.rsocket.RSocketFactory +typealias RSocketJava = io.rsocket.RSocket +typealias AbstractRSocketJava = io.rsocket.AbstractRSocket +typealias PayloadJava = io.rsocket.Payload +typealias DefaultPayloadJava = io.rsocket.util.DefaultPayload + abstract class EndToEndTest (private val clientTransport: (InetSocketAddress) -> ClientTransport, - private val serverTransport: (InetSocketAddress) -> ServerTransport) { - private lateinit var server: NettyContextCloseable + private val serverTransport: (InetSocketAddress) -> ServerTransport) { + private lateinit var server: CloseableChannel private lateinit var client: RSocket - private lateinit var clientHandler: TestRSocketHandler - private lateinit var serverHandler: TestRSocketHandler + private lateinit var clientHandler: TestClientHandler + private lateinit var serverHandler: TestServerHandler private val errors = Errors() @Before @@ -49,14 +59,15 @@ abstract class EndToEndTest val address = InetSocketAddress .createUnresolved("localhost", 0) val serverAcceptor = ServerAcceptor() - clientHandler = TestRSocketHandler() - server = RSocketFactory + clientHandler = TestClientHandler() + + server = RSocketFactoryJava .receive() .errorConsumer(errors.errorsConsumer()) - .acceptor { serverAcceptor } + .acceptor (serverAcceptor) .transport(serverTransport(address)) .start() - .blockingGet() + .block(java.time.Duration.ofSeconds(5))!! client = RSocketFactory .connect() @@ -70,14 +81,13 @@ abstract class EndToEndTest .start() .blockingGet() - serverHandler = serverAcceptor.handler().blockingGet() + serverHandler = serverAcceptor.handler().block(java.time.Duration.ofSeconds(5))!! } @After fun tearDown() { - server.close() - .andThen(server.onClose()) - .blockingAwait(10, TimeUnit.SECONDS) + server.dispose() + server.onClose().block(java.time.Duration.ofSeconds(5)) } @Test @@ -141,11 +151,10 @@ abstract class EndToEndTest @Test fun serverMetadataPush() { - val payload = DefaultPayload("", "md") + val payload = DefaultPayloadJava.create("", "md") serverHandler.sendMetadataPush(payload) - .andThen(Completable.timer(1, TimeUnit.SECONDS)) - .timeout(10, TimeUnit.SECONDS) - .blockingAwait() + .then(Mono.delay(java.time.Duration.ofSeconds(1))) + .block(java.time.Duration.ofSeconds(5)) assertThat(errors.errors()).isEmpty() assertThat(clientHandler.metadataPushData()) @@ -173,8 +182,7 @@ abstract class EndToEndTest open fun closedAvailability() { client.close() .andThen(client.onClose()) - .timeout(10, TimeUnit.SECONDS) - .blockingAwait() + .blockingAwait(10,TimeUnit.SECONDS) assertThat(client.availability()) .isCloseTo(0.0, Offset.offset(1e-5)) @@ -183,7 +191,7 @@ abstract class EndToEndTest private fun testData() = Data("d", "md") - internal class TestRSocketHandler(private val requester: RSocket? = null) : AbstractRSocket() { + internal class TestClientHandler(private val requester: RSocket? = null) : AbstractRSocket() { private val fnf = ArrayList() private val metadata = ArrayList() @@ -209,14 +217,42 @@ abstract class EndToEndTest return Flowable.fromPublisher(payloads) } - fun sendMetadataPush(payload: Payload): Completable = requester + fun metadataPushData() = metadata + } + + internal class TestServerHandler(private val requester: RSocketJava? = null) : AbstractRSocketJava() { + private val fnf = ArrayList() + private val metadata = ArrayList() + + override fun fireAndForget(payload: PayloadJava): Mono { + fnf += Data(payload) + return Mono.empty() + } + + override fun metadataPush(payload: PayloadJava): Mono { + metadata += payload.metadataUtf8 + return Mono.empty() + } + + override fun requestResponse(payload: PayloadJava): Mono { + return Mono.just(payload) + } + + override fun requestStream(payload: PayloadJava): Flux { + return Flux.just(payload) + } + + override fun requestChannel(payloads: Publisher): Flux { + return Flux.from(payloads) + } + + fun sendMetadataPush(payload: PayloadJava): Mono = requester ?.metadataPush(payload) - ?: Completable.complete() + ?: Mono.empty() fun fireAndForgetData() = fnf fun metadataPushData() = metadata - } internal class Errors { @@ -231,25 +267,26 @@ abstract class EndToEndTest } internal class ServerAcceptor - : (Setup, RSocket) -> Single { + : SocketAcceptor { - private val serverHandlerReady = BehaviorProcessor - .create() + private val serverHandlerReady = MonoProcessor + .create() - override fun invoke(setup: Setup, - sendingSocket: RSocket): Single { - val handler = TestRSocketHandler(sendingSocket) + override fun accept(setup: ConnectionSetupPayload, + sendingSocket: RSocketJava): Mono { + val handler = TestServerHandler(sendingSocket) serverHandlerReady.onNext(handler) - return Single.just(handler) + return Mono.just(handler) } - fun handler(): Single { - return serverHandlerReady.firstOrError() + fun handler(): Mono { + return serverHandlerReady } } internal data class Data(val data: String, val metadata: String) { constructor(payload: Payload) : this(payload.dataUtf8, payload.metadataUtf8) + constructor(payload: PayloadJava) : this(payload.dataUtf8, payload.metadataUtf8) fun payload(): Payload = DefaultPayload(data, metadata) } diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt deleted file mode 100644 index 7e316a5c0..000000000 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyTcpEndToEndTest.kt +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test.transport - -import io.rsocket.kotlin.transport.netty.client.TcpClientTransport -import io.rsocket.kotlin.transport.netty.server.TcpServerTransport - -class NettyTcpEndToEndTest - : EndToEndTest( - { TcpClientTransport.create(it) }, - { TcpServerTransport.create(it) }) diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt deleted file mode 100644 index 449a4e24f..000000000 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/NettyWebsocketEndToEndTest.kt +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2015-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.test.transport - -import io.rsocket.kotlin.transport.netty.client.WebsocketClientTransport -import io.rsocket.kotlin.transport.netty.server.WebsocketServerTransport - -class NettyWebsocketEndToEndTest : EndToEndTest( - { WebsocketClientTransport.create(it) }, - { WebsocketServerTransport.create(it.port) }) diff --git a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/OkHttpNettyEndToEndTest.kt b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/OkHttpNettyEndToEndTest.kt index 38f89a265..fdc173653 100644 --- a/test/src/test/kotlin/io/rsocket/kotlin/test/transport/OkHttpNettyEndToEndTest.kt +++ b/test/src/test/kotlin/io/rsocket/kotlin/test/transport/OkHttpNettyEndToEndTest.kt @@ -16,7 +16,7 @@ package io.rsocket.kotlin.test.transport -import io.rsocket.kotlin.transport.netty.server.WebsocketServerTransport +import io.rsocket.transport.netty.server.WebsocketServerTransport import io.rsocket.transport.okhttp.client.OkhttpWebsocketClientTransport import okhttp3.HttpUrl