Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document use of TLS with RSocket #719

Closed
codependent opened this issue Nov 19, 2019 · 12 comments
Closed

Document use of TLS with RSocket #719

codependent opened this issue Nov 19, 2019 · 12 comments

Comments

@codependent
Copy link

codependent commented Nov 19, 2019

Sample project available on Github: https://github.com/codependent/rsocket-tls/

The full context of this problem can be found on StackOverflow.

The problem is that without TLS on the server/client, the communication works perfectly, but after securing them it fails.

Here's the full server and client code:

import io.netty.handler.ssl.SslContextBuilder
import io.rsocket.AbstractRSocket
import io.rsocket.Payload
import io.rsocket.RSocketFactory
import io.rsocket.frame.decoder.PayloadDecoder
import io.rsocket.transport.netty.server.TcpServerTransport
import io.rsocket.util.DefaultPayload
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import reactor.netty.tcp.TcpServer
import java.io.File
import java.util.concurrent.CountDownLatch
import kotlin.random.Random
import kotlin.random.nextUInt

class RequestStreamRSocketServer

@ExperimentalUnsignedTypes
fun main() {

    val latch = CountDownLatch(1)

    RSocketFactory.receive()
        .frameDecoder(PayloadDecoder.DEFAULT)
        .acceptor { setup, sendingSocket ->
            Mono.just(
                object : AbstractRSocket() {
                    override fun requestStream(payload: Payload): Flux<Payload> {
                        val randomNumberGenerator = Random(1234)
                        val numbers = payload.dataUtf8.toInt()
                        println("Generating $numbers random numbers")
                        return IntRange(1, numbers)
                            .map { DefaultPayload.create(randomNumberGenerator.nextUInt().toString().toByteArray()) }
                            .toList().toFlux()
                    }
                })
        }
        .transport(
            TcpServerTransport.create(TcpServer.create().port(7878).secure {
                it.sslContext(
                    SslContextBuilder.forServer(
                        File(RequestStreamRSocketServer::class.java.getResource("certificate.pem").toURI()),
                        File(RequestStreamRSocketServer::class.java.getResource("key.pem").toURI())
                    )
                )
            })
        )
        .start()
        .block()
        ?.onClose()

    latch.await()
}
import io.netty.handler.ssl.SslContextBuilder
import io.rsocket.RSocketFactory
import io.rsocket.frame.decoder.PayloadDecoder
import io.rsocket.transport.netty.client.TcpClientTransport
import io.rsocket.util.DefaultPayload
import reactor.netty.tcp.TcpClient
import java.util.concurrent.CountDownLatch

class RequestStreamRSocketClient

@ExperimentalUnsignedTypes
fun main() {

    val latch = CountDownLatch(1)

    val path = RequestStreamRSocketClient::class.java.getResource("truststore.jks").path
    System.setProperty("javax.net.ssl.trustStore", path)
    System.setProperty("javax.net.ssl.trustStorePassword", "123456")


    val client = RSocketFactory.connect()
        .frameDecoder(PayloadDecoder.DEFAULT)
        .transport(TcpClientTransport.create(TcpClient.create().port(7878).secure {
            it.sslContext(SslContextBuilder.forClient())
        }))
        .start()
        .block()

    client.requestStream(DefaultPayload.create("10"))
        .map { it.dataUtf8 }
        .doOnNext(System.out::println)
        .doOnComplete { latch.countDown() }
        .doOnError { it.printStackTrace() }
        .subscribe()

    latch.await()
}

Server log:

/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=52226:/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/tools.jar:/Users/jose/git/codependent/github/rsocket-simple-client/build/classes/java/main:/Users/jose/git/codependent/github/rsocket-simple-client/out/production/resources:/Users/jose/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk8/1.3.50/bf65725d4ae2cf00010d84e945fcbc201f590e11/kotlin-stdlib-jdk8-1.3.50.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.rsocket/rsocket-transport-netty/1.0.0-RC5/7d0093068e332fcbfa3e9f5de971174a795a9122/rsocket-transport-netty-1.0.0-RC5.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.rsocket/rsocket-core/1.0.0-RC5/fbe165e1e57c5748a40af66832206c4616aa6290/rsocket-core-1.0.0-RC5.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk7/1.3.50/50ad05ea1c2595fb31b800e76db464d08d599af3/kotlin-stdlib-jdk7-1.3.50.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib/1.3.50/b529d1738c7e98bbfa36a4134039528f2ce78ebf/kotlin-stdlib-1.3.50.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.projectreactor.netty/reactor-netty/0.9.0.RELEASE/f0a0ae4e38ad8b36596ffe4bf82519cf8fc4adfb/reactor-netty-0.9.0.RELEASE.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec-http2/4.1.39.Final/6e4660fb8b1054e34e09aa95a10115edf0d74f37/netty-codec-http2-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-handler-proxy/4.1.39.Final/8a5c8a0b4ceb75531d04a14e0e65839ee07f2378/netty-handler-proxy-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec-http/4.1.39.Final/732d06961162e27fa3ae5989541c4460853745d3/netty-codec-http-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-handler/4.1.39.Final/4a63b56de071c1b10a56b5d90095e4201ea4098f/netty-handler-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-transport-native-epoll/4.1.39.Final/ab86de9bb5fccbfb60a9c0036a3516ad9b8befbb/netty-transport-native-epoll-4.1.39.Final-linux-x86_64.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec-socks/4.1.39.Final/adc3df7362874b53c11e56f79c53ebea97d29aa7/netty-codec-socks-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec/4.1.39.Final/38b9d79e31f6b00bd680f88c0289a2522d30d05b/netty-codec-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-transport-native-unix-common/4.1.39.Final/e5d94d2f6847919afbbfdb08a7a9e1f9ae19b101/netty-transport-native-unix-common-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-transport/4.1.39.Final/25374210da8a561689c4280e9d5661ff5dee30b7/netty-transport-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-buffer/4.1.39.Final/3518c7c7d0097460eeeaba32fb0c241b9cbe628a/netty-buffer-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.projectreactor/reactor-core/3.3.0.RELEASE/4824f980e5696e95289d5fb0de62e3d34508b358/reactor-core-3.3.0.RELEASE.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.25/da76ca59f6a57ee3102f8f9bd9cee742973efa8a/slf4j-api-1.7.25.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-common/1.3.50/3d9cd3e1bc7b92e95f43d45be3bfbcf38e36ab87/kotlin-stdlib-common-1.3.50.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.jetbrains/annotations/13.0/919f0dfe192fb4e063e7dacadee7f8bb9a2672a9/annotations-13.0.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-resolver/4.1.39.Final/2ca0a547341ba72dacf60121302357e7ea110b96/netty-resolver-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-common/4.1.39.Final/9c8c6d0dd43ee26ec8052a42d3ee1113dc6c08ed/netty-common-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.reactivestreams/reactive-streams/1.0.3/d9fb7a7926ffa635b3dcaa5049fb2bfa25b3e7d0/reactive-streams-1.0.3.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.projectreactor.addons/reactor-pool/0.1.0.RELEASE/3aa0e33a1647a85e94bea47d7efb57c46977c71a/reactor-pool-0.1.0.RELEASE.jar RequestStreamRSocketServerKt
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Generating 10 random numbers
java.lang.IllegalArgumentException: promise already done: DefaultChannelPromise@74ed1d6a(failure: java.lang.UnsupportedOperationException)
	at io.netty.channel.AbstractChannelHandlerContext.isNotValidPromise(AbstractChannelHandlerContext.java:891)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:773)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:701)
	at io.netty.handler.ssl.SslHandler.finishWrap(SslHandler.java:899)
	at io.netty.handler.ssl.SslHandler.wrap(SslHandler.java:885)
	at io.netty.handler.ssl.SslHandler.wrapAndFlush(SslHandler.java:797)
	at io.netty.handler.ssl.SslHandler.flush(SslHandler.java:778)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727)
	at reactor.netty.channel.MonoSendMany$SendManyInner$AsyncFlush.run(MonoSendMany.java:621)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:515)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

Client log:

/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/bin/java "-javaagent:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=52243:/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath /Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_201.jdk/Contents/Home/lib/tools.jar:/Users/jose/git/codependent/github/rsocket-simple-client/build/classes/java/main:/Users/jose/git/codependent/github/rsocket-simple-client/out/production/resources:/Users/jose/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk8/1.3.50/bf65725d4ae2cf00010d84e945fcbc201f590e11/kotlin-stdlib-jdk8-1.3.50.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.rsocket/rsocket-transport-netty/1.0.0-RC5/7d0093068e332fcbfa3e9f5de971174a795a9122/rsocket-transport-netty-1.0.0-RC5.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.rsocket/rsocket-core/1.0.0-RC5/fbe165e1e57c5748a40af66832206c4616aa6290/rsocket-core-1.0.0-RC5.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk7/1.3.50/50ad05ea1c2595fb31b800e76db464d08d599af3/kotlin-stdlib-jdk7-1.3.50.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib/1.3.50/b529d1738c7e98bbfa36a4134039528f2ce78ebf/kotlin-stdlib-1.3.50.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.projectreactor.netty/reactor-netty/0.9.0.RELEASE/f0a0ae4e38ad8b36596ffe4bf82519cf8fc4adfb/reactor-netty-0.9.0.RELEASE.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec-http2/4.1.39.Final/6e4660fb8b1054e34e09aa95a10115edf0d74f37/netty-codec-http2-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-handler-proxy/4.1.39.Final/8a5c8a0b4ceb75531d04a14e0e65839ee07f2378/netty-handler-proxy-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec-http/4.1.39.Final/732d06961162e27fa3ae5989541c4460853745d3/netty-codec-http-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-handler/4.1.39.Final/4a63b56de071c1b10a56b5d90095e4201ea4098f/netty-handler-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-transport-native-epoll/4.1.39.Final/ab86de9bb5fccbfb60a9c0036a3516ad9b8befbb/netty-transport-native-epoll-4.1.39.Final-linux-x86_64.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec-socks/4.1.39.Final/adc3df7362874b53c11e56f79c53ebea97d29aa7/netty-codec-socks-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec/4.1.39.Final/38b9d79e31f6b00bd680f88c0289a2522d30d05b/netty-codec-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-transport-native-unix-common/4.1.39.Final/e5d94d2f6847919afbbfdb08a7a9e1f9ae19b101/netty-transport-native-unix-common-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-transport/4.1.39.Final/25374210da8a561689c4280e9d5661ff5dee30b7/netty-transport-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-buffer/4.1.39.Final/3518c7c7d0097460eeeaba32fb0c241b9cbe628a/netty-buffer-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.projectreactor/reactor-core/3.3.0.RELEASE/4824f980e5696e95289d5fb0de62e3d34508b358/reactor-core-3.3.0.RELEASE.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.25/da76ca59f6a57ee3102f8f9bd9cee742973efa8a/slf4j-api-1.7.25.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-common/1.3.50/3d9cd3e1bc7b92e95f43d45be3bfbcf38e36ab87/kotlin-stdlib-common-1.3.50.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.jetbrains/annotations/13.0/919f0dfe192fb4e063e7dacadee7f8bb9a2672a9/annotations-13.0.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-resolver/4.1.39.Final/2ca0a547341ba72dacf60121302357e7ea110b96/netty-resolver-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.netty/netty-common/4.1.39.Final/9c8c6d0dd43ee26ec8052a42d3ee1113dc6c08ed/netty-common-4.1.39.Final.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/org.reactivestreams/reactive-streams/1.0.3/d9fb7a7926ffa635b3dcaa5049fb2bfa25b3e7d0/reactive-streams-1.0.3.jar:/Users/jose/.gradle/caches/modules-2/files-2.1/io.projectreactor.addons/reactor-pool/0.1.0.RELEASE/3aa0e33a1647a85e94bea47d7efb57c46977c71a/reactor-pool-0.1.0.RELEASE.jar RequestStreamRSocketClientKt
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
java.nio.channels.ClosedChannelException
	at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:476)
	at io.rsocket.RSocketRequester.lambda$new$0(RSocketRequester.java:94)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:139)
	at reactor.core.publisher.MonoProcessor$NextInner.onComplete(MonoProcessor.java:518)
	at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:308)
	at reactor.core.publisher.MonoProcessor.onComplete(MonoProcessor.java:265)
	at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:23)
	at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:61)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:500)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:493)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:472)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:413)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:538)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:527)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:98)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1156)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:758)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:734)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:605)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.close(DefaultChannelPipeline.java:1363)
	at io.netty.channel.AbstractChannelHandlerContext.invokeClose(AbstractChannelHandlerContext.java:621)
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:605)
	at io.netty.channel.AbstractChannelHandlerContext.close(AbstractChannelHandlerContext.java:467)
	at io.netty.handler.ssl.SslHandler.exceptionCaught(SslHandler.java:1092)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276)
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:268)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.exceptionCaught(DefaultChannelPipeline.java:1388)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:297)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:276)
	at io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:918)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:125)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:174)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

I've searched everywhere for a TLS RSocket example but found nothing. Also the docs don't show anything in this regard so I'm not sure whether I'm doing something wrong or there's an actual problem when using TLS.

@linux-china
Copy link
Contributor

linux-china commented Nov 20, 2019

  • Generate rsocket.p12 keystore file. you can use mkcert to generate rsocket.p12
mkcert -pkcs12 rsocket
  • Put rsocket.p12 under the resources directory

  • Add following dependency:

 <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-tcnative-boringssl-static</artifactId>
            <version>2.0.27.Final</version>
        </dependency>
  • TCP SSL Uri handler: TcpSslUriHandler.java as following:
package com.foobar.rsocket.TcpSslUriHandler;

import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.uri.UriHandler;
import reactor.netty.tcp.TcpClient;
import reactor.netty.tcp.TcpServer;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLDecoder;
import java.security.KeyStore;
import java.security.PrivateKey;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.*;

/**
 * A SSL implementation of {@link UriHandler} that creates {@link TcpClientTransport}s and {@link TcpServerTransport}s.
 *
 * @author linux_china
 */
public final class TcpSslUriHandler implements UriHandler {
    private static final String SCHEME = "tcps";
    private static final String DEFAULT_PASSWORD = "changeit";
    public static List<String> fingerPrintsSha256 = new ArrayList<>();
    public static String[] protocols = new String[]{"TLSv1.3", "TLSv.1.2"};

    @Override
    public Optional<ClientTransport> buildClient(URI uri) {
        Objects.requireNonNull(uri, "uri must not be null");

        if (!SCHEME.equals(uri.getScheme())) {
            return Optional.empty();
        }
        try {
            SslContext context = SslContextBuilder
                    .forClient()
                    .protocols(protocols)
                    .sslProvider(getSslProvider())
                    //.trustManager(new FingerPrintTrustManagerFactory(fingerPrintsSha256))
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
            TcpClient tcpClient = TcpClient.create()
                    .host(uri.getHost())
                    .port(uri.getPort())
                    .secure(ssl -> ssl.sslContext(context));
            return Optional.of(TcpClientTransport.create(tcpClient));
        } catch (Exception e) {
            return Optional.empty();
        }
    }

    @Override
    public Optional<ServerTransport> buildServer(URI uri) {
        Objects.requireNonNull(uri, "uri must not be null");
        if (!SCHEME.equals(uri.getScheme())) {
            return Optional.empty();
        }
        try {
            KeyStore store = KeyStore.getInstance("PKCS12");
            Map<String, String> params = splitQuery(uri);
            char[] password = params.getOrDefault("password", DEFAULT_PASSWORD).toCharArray();
            String keyStore = params.getOrDefault("store", "/rsocket.p12");
            store.load(this.getClass().getResourceAsStream(keyStore), password);
            String alias = store.aliases().nextElement();
            Certificate certificate = store.getCertificate(alias);
            KeyStore.Entry entry = store.getEntry(alias, new KeyStore.PasswordProtection(password));
            PrivateKey privateKey = ((KeyStore.PrivateKeyEntry) entry).getPrivateKey();
            TcpServer tcpServer = TcpServer.create()
                    .host(uri.getHost())
                    .port(uri.getPort())
                    .secure(ssl -> ssl.sslContext(
                            SslContextBuilder.forServer(privateKey, (X509Certificate) certificate)
                                    .protocols(protocols)
                                    .sslProvider(getSslProvider())
                    ));
            return Optional.of(TcpServerTransport.create(tcpServer));
        } catch (Exception e) {
            return Optional.empty();
        }
    }

    private SslProvider getSslProvider() {
        if (OpenSsl.isAvailable()) {
            return SslProvider.OPENSSL_REFCNT;
        } else {
            return SslProvider.JDK;
        }
    }

    private Map<String, String> splitQuery(URI url) throws UnsupportedEncodingException {
        Map<String, String> query_pairs = new LinkedHashMap<>();
        String query = url.getQuery();
        if (query != null && !query.isEmpty()) {
            String[] pairs = query.split("&");
            for (String pair : pairs) {
                int idx = pair.indexOf("=");
                query_pairs.put(URLDecoder.decode(pair.substring(0, idx), "UTF-8"), URLDecoder.decode(pair.substring(idx + 1), "UTF-8"));
            }
        }
        return query_pairs;
    }
}
  • create UriHandler SPI file "META-INF/services/io.rsocket.uri.UriHandler" with following content
com.foobar.rsocket.TcpSslUriHandler
  • write your code as normal
RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> RSocketHandlerFactory.create(setup, sendingSocket))
                .transport(UriTransportRegistry.serverForUri("tcps://0.0.0.0:42252"))
                .start()
                .block();

@pckeyan
Copy link

pckeyan commented Nov 20, 2019

Try sending the Client Cert as below for 2 way SSL; it works for me:

.transport(() -> {
                            TcpClient tcpClient = TcpClient.create().host("localhost").port(7878);
                            return TcpClientTransport.create(keyCertChainFile != null ?
                                    tcpClient.secure(sslContextSpec -> sslContextSpec
                                            .sslContext(SslContextBuilder.forClient().keyManager("client-cert.pem", "client-key.pem")
                                                    .trustManager(
                                                            new File("cacert.pem")))) :
                                    tcpClient);
                        }

@codependent
Copy link
Author

Hi @pckeyan In my case I’d rather have only server authentication. The point is in my sample the secure TLS channel seems to be correctly stablished, the client request reaches the acceptor but for some reason it ends up throwing that exception...

@OlegDokuka
Copy link
Member

@codependent working on the example for you

@codependent
Copy link
Author

@OlegDokuka Thank you!!

@pckeyan
Copy link

pckeyan commented Nov 25, 2019

@codependent I tried without client certs but at least it expects trustmanager to be configured or OS to present the default trustmanger. I am not sure how we set the DefaultSSLFactory settings to netty via RSocket. @OlegDokuka might have an answer. Below code works without client certs:

.transport(() -> {
                            TcpClient tcpClient = TcpClient.create().host("localhost").port(9001);
                            return TcpClientTransport.create(
                                    tcpClient.secure(sslContextSpec -> sslContextSpec
                                            .sslContext(SslContextBuilder.forClient().trustManager(
                                                    new File("cacert.pem")))));
                        }

Server should not enable ClientAuth.

@codependent
Copy link
Author

codependent commented Nov 26, 2019

I've updated the sample with @pckeyan's latest suggestion. The use of the trustmanager is equivalent to setting this on the client, but I think the tm is a better approach:

    val path = RequestStreamRSocketClient::class.java.getResource("truststore.jks").path
    System.setProperty("javax.net.ssl.trustStore", path)
    System.setProperty("javax.net.ssl.trustStorePassword", "123456")

In any case, I keep getting the same errors :S

@codependent
Copy link
Author

Hi @OlegDokuka, could you have a look at this?

@rstoyanchev rstoyanchev added this to the 1.x Backlog milestone Apr 17, 2020
@rstoyanchev rstoyanchev changed the title TLS RSockets not working Document use of TLS with RSocket Apr 17, 2020
@rstoyanchev
Copy link
Contributor

For now please take a look at the Reactor Netty reference and #719 (comment)

@codependent
Copy link
Author

After upgrading to RC7 and replacing the deprecated classes the sample started working:

Server:

    val latch = CountDownLatch(1)
    
    RSocketServer.create()
        .payloadDecoder(PayloadDecoder.DEFAULT)
        .acceptor { setup, sendingSocket ->
            Mono.just(
                object : AbstractRSocket() {
                    override fun requestStream(payload: Payload): Flux<Payload> {
                        val randomNumberGenerator = Random(1234)
                        val numbers = payload.dataUtf8.toInt()
                        println("Generating $numbers random numbers")
                        return IntRange(1, numbers)
                            .map { DefaultPayload.create(randomNumberGenerator.nextUInt().toString().toByteArray()) }
                            .toList().toFlux()
                    }
                })
        }.bind(
            TcpServerTransport.create(TcpServer.create().port(7878)
                .secure {
                    it.sslContext(
                        SslContextBuilder.forServer(
                            File(RequestStreamRSocketServer::class.java.getResource("certificate.pem").toURI()),
                            File(RequestStreamRSocketServer::class.java.getResource("key.pem").toURI())
                        )
                    )
                })
        )
        .block()

    latch.await()

Client:

    val latch = CountDownLatch(1)

    val client = RSocketConnector.connectWith(
        TcpClientTransport.create(TcpClient.create().port(7878)
            .secure {
                it.sslContext(
                    SslContextBuilder.forClient().trustManager(
                        File(
                            RequestStreamRSocketClient::class.java.getResource(
                                "certificate.pem"
                            ).path
                        )
                    )
                )
            })
    ).block()!!

    client.requestStream(DefaultPayload.create("10"))
        .map { it.dataUtf8 }
        .doOnNext(System.out::println)
        .doOnComplete { latch.countDown() }
        .doOnError { it.printStackTrace() }
        .subscribe()

    latch.await()

@pckeyan
Copy link

pckeyan commented May 11, 2020

@codependent Sending TrustManager earlier as well worked for me as stated above. Did you find the root cause of this issue or @OlegDokuka can you please update on the root cause for our knowledge? Just curious to learn.

@yschimke
Copy link
Member

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Transoport-Netty
  
Awaiting triage
Development

No branches or pull requests

6 participants