diff --git a/reactor-netty-examples/src/main/java/reactor/netty/examples/tcp/securechat/SecureChatClient.java b/reactor-netty-examples/src/main/java/reactor/netty/examples/tcp/securechat/SecureChatClient.java new file mode 100644 index 0000000000..b7498c06d6 --- /dev/null +++ b/reactor-netty-examples/src/main/java/reactor/netty/examples/tcp/securechat/SecureChatClient.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.examples.tcp.securechat; + +import java.nio.charset.StandardCharsets; +import java.util.Scanner; + +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.Delimiters; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import reactor.core.publisher.Mono; +import reactor.netty.Connection; +import reactor.netty.tcp.TcpClient; +import reactor.netty.tcp.TcpSslContextSpec; + +public class SecureChatClient { + + private static final String HOST = System.getProperty("host", "127.0.0.1"); + + private static final int PORT = Integer.parseInt(System.getProperty("port", "8992")); + + private static final boolean WIRETAP = System.getProperty("wiretap") != null; + + public static void main(String[] args) { + TcpSslContextSpec tcpSslContextSpec = TcpSslContextSpec.forClient() + .configure(builder -> builder.trustManager(InsecureTrustManagerFactory.INSTANCE)); + + TcpClient client = TcpClient.create() + .host(HOST) + .port(PORT) + .wiretap(WIRETAP) + .doOnConnected(connection -> { + connection.addHandlerLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); + }).secure(spec -> spec.sslContext(tcpSslContextSpec)); + + Connection conn = client.connectNow(); + conn.inbound() + .receive() + .asString() + .subscribe(System.out::println); + + Scanner scanner = new Scanner(System.in, StandardCharsets.UTF_8.name()); + while (scanner.hasNext()) { + String text = scanner.nextLine(); + conn.outbound().sendString(Mono.just(text + "\r\n")) + .then() + .subscribe(); + + if ("bye".equalsIgnoreCase(text)) { + break; + } + } + + conn.onDispose().block(); + + } +} diff --git a/reactor-netty-examples/src/main/java/reactor/netty/examples/tcp/securechat/SecureChatServer.java b/reactor-netty-examples/src/main/java/reactor/netty/examples/tcp/securechat/SecureChatServer.java new file mode 100644 index 0000000000..3ef757dba1 --- /dev/null +++ b/reactor-netty-examples/src/main/java/reactor/netty/examples/tcp/securechat/SecureChatServer.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package reactor.netty.examples.tcp.securechat; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.security.cert.CertificateException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import javax.net.ssl.SSLSession; + +import io.netty.channel.ChannelId; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.Delimiters; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.netty.Connection; +import reactor.netty.tcp.TcpServer; +import reactor.netty.tcp.TcpSslContextSpec; + +public class SecureChatServer { + + private static final int PORT = Integer.parseInt(System.getProperty("port", "8992")); + + private static final boolean WIRETAP = System.getProperty("wiretap") != null; + + public static void main(String[] args) throws UnknownHostException, CertificateException { + ConcurrentHashMap conns = new ConcurrentHashMap<>(); + String hostname = InetAddress.getLocalHost().getHostName(); + + TcpServer server = TcpServer.create() + .port(PORT) + .wiretap(WIRETAP) + .doOnConnection(connection -> { + // cache the new connection. it'll be needed later + // when the server broadcasts messages from other clients. + ChannelId id = connection.channel().id(); + conns.put(id, connection); + connection.addHandlerLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); + connection.onDispose(() -> conns.remove(id)); + }).handle((in, out) -> { + List welcomeTexts = new ArrayList<>(); + welcomeTexts.add("Welcome to " + hostname + " secure chat service!\n"); + in.withConnection(connection -> { + SslHandler handler = connection.channel().pipeline().get(SslHandler.class); + SSLSession session = handler.engine().getSession(); + String cipherSuite = session.getCipherSuite(); + String msg = "Your session is protected by " + cipherSuite + " cipher suite.\n"; + welcomeTexts.add(msg); + }); + + Flux welcomeFlux = Flux.fromIterable(welcomeTexts); + + Flux flux = in.receive() + .asString() + .takeUntil("bye"::equalsIgnoreCase) + .handle((text, sink) -> { + in.withConnection(current -> { + for (Connection conn : conns.values()) { + if (conn == current) { + sink.next(text); + } + else { + String msg = "[" + conn.channel().remoteAddress() + "] " + text + '\n'; + + conn.outbound().sendString(Mono.just(msg)).then().subscribe(); + } + } + }); + }) + .map(msg -> "[you] " + msg + '\n'); + + + return out.sendString(Flux.concat(welcomeFlux, flux)); + + }); + + SelfSignedCertificate ssc = new SelfSignedCertificate(); + server = server.secure(spec -> spec.sslContext(TcpSslContextSpec.forServer(ssc.certificate(), ssc.privateKey()))); + server.bindNow().onDispose().block(); + } +}