diff --git a/reactivesocket-transport-jsr-356/build.gradle b/reactivesocket-transport-jsr-356/build.gradle deleted file mode 100644 index d78cacae5..000000000 --- a/reactivesocket-transport-jsr-356/build.gradle +++ /dev/null @@ -1,8 +0,0 @@ -dependencies { - compile project(':reactivesocket-core') - compile 'org.glassfish.tyrus:tyrus-client:1.12' - - testCompile 'org.glassfish.tyrus:tyrus-server:1.12' - testCompile 'org.glassfish.tyrus:tyrus-container-grizzly-server:1.12' - testCompile project(':reactivesocket-test') -} \ No newline at end of file diff --git a/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/WebSocketDuplexConnection.java b/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/WebSocketDuplexConnection.java deleted file mode 100644 index 04c978912..000000000 --- a/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/WebSocketDuplexConnection.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.javax.websocket; - -import io.reactivesocket.DuplexConnection; -import io.reactivesocket.Frame; -import io.reactivesocket.rx.Completable; -import io.reactivesocket.rx.Observable; -import org.reactivestreams.Publisher; -import rx.RxReactiveStreams; -import rx.Subscription; -import rx.subscriptions.BooleanSubscription; - -import javax.websocket.Session; -import java.io.IOException; - -public class WebSocketDuplexConnection implements DuplexConnection { - private final Session session; - private final rx.Observable input; - - public WebSocketDuplexConnection(Session session, rx.Observable input) { - this.session = session; - this.input = input; - } - - @Override - public Observable getInput() { - return o -> { - Subscription subscription = input.subscribe(o::onNext, o::onError, o::onComplete); - o.onSubscribe(subscription::unsubscribe); - }; - } - - @Override - public void addOutput(Publisher o, Completable callback) { - rx.Completable sent = rx.Completable.concat(RxReactiveStreams.toObservable(o).map(frame -> - rx.Completable.create(s -> { - BooleanSubscription bs = new BooleanSubscription(); - s.onSubscribe(bs); - session.getAsyncRemote().sendBinary(frame.getByteBuffer(), result -> { - if (!bs.isUnsubscribed()) { - if (result.isOK()) { - s.onCompleted(); - } else { - s.onError(result.getException()); - } - } - }); - }) - )); - - sent.subscribe(new rx.Completable.CompletableSubscriber() { - @Override - public void onCompleted() { - callback.success(); - } - - @Override - public void onError(Throwable e) { - callback.error(e); - } - - @Override - public void onSubscribe(Subscription s) { - } - }); - } - - @Override - public double availability() { - return session.isOpen() ? 1.0 : 0.0; - } - - @Override - public void close() throws IOException { - session.close(); - } - - public String toString() { - if (session == null) { - return getClass().getName() + ":session=null"; - } - - return getClass().getName() + ":session=[" + session.toString() + "]"; - - } -} diff --git a/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/client/ReactiveSocketWebSocketClient.java b/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/client/ReactiveSocketWebSocketClient.java deleted file mode 100644 index d952b8ee5..000000000 --- a/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/client/ReactiveSocketWebSocketClient.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.javax.websocket.client; - -import io.reactivesocket.Frame; -import io.reactivesocket.javax.websocket.WebSocketDuplexConnection; -import org.glassfish.tyrus.client.ClientManager; -import org.glassfish.tyrus.client.ClientProperties; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import rx.Observable; -import rx.subjects.PublishSubject; - -import javax.websocket.*; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; - -public class ReactiveSocketWebSocketClient extends Endpoint { - private final PublishSubject input = PublishSubject.create(); - private final Subscriber subscriber; - - public ReactiveSocketWebSocketClient(Subscriber subscriber) { - this.subscriber = subscriber; - } - - public Observable getInput() { - return input; - } - - public static Publisher create(SocketAddress socketAddress, String path, ClientManager clientManager) { - if (socketAddress instanceof InetSocketAddress) { - InetSocketAddress address = (InetSocketAddress)socketAddress; - try { - return create(new URI("ws", null, address.getHostName(), address.getPort(), path, null, null), clientManager); - } catch (URISyntaxException e) { - throw new IllegalArgumentException(e.getMessage(), e); - } - } else { - throw new IllegalArgumentException("unknown socket address type => " + socketAddress.getClass()); - } - } - - public static Publisher create(URI uri, ClientManager clientManager) { - return s -> { - try { - clientManager.getProperties().put(ClientProperties.RECONNECT_HANDLER, new ClientManager.ReconnectHandler() { - @Override - public boolean onConnectFailure(Exception exception) { - s.onError(exception); - return false; - } - }); - ReactiveSocketWebSocketClient endpoint = new ReactiveSocketWebSocketClient(s); - clientManager.asyncConnectToServer(endpoint, null, uri); - } catch (DeploymentException e) { - s.onError(e); - } - }; - } - - @Override - public void onOpen(Session session, EndpointConfig config) { - subscriber.onNext(new WebSocketDuplexConnection(session, input)); - subscriber.onComplete(); - session.addMessageHandler(new MessageHandler.Whole() { - @Override - public void onMessage(ByteBuffer message) { - Frame frame = Frame.from(message); - input.onNext(frame); - } - }); - } - - @Override - public void onClose(Session session, CloseReason closeReason) { - input.onCompleted(); - } - - @Override - public void onError(Session session, Throwable thr) { - input.onError(thr); - } -} diff --git a/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/client/WebSocketReactiveSocketConnector.java b/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/client/WebSocketReactiveSocketConnector.java deleted file mode 100644 index 51ce9120d..000000000 --- a/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/client/WebSocketReactiveSocketConnector.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.javax.websocket.client; - -import io.reactivesocket.*; -import io.reactivesocket.javax.websocket.WebSocketDuplexConnection; -import io.reactivesocket.rx.Completable; -import org.glassfish.tyrus.client.ClientManager; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.RxReactiveStreams; - -import java.net.SocketAddress; -import java.util.function.Consumer; - -/** - * An implementation of {@link ReactiveSocketConnector} that creates JSR-356 WebSocket ReactiveSockets. - */ -public class WebSocketReactiveSocketConnector implements ReactiveSocketConnector { - private static final Logger logger = LoggerFactory.getLogger(WebSocketReactiveSocketConnector.class); - - private final ConnectionSetupPayload connectionSetupPayload; - private final Consumer errorStream; - private final String path; - private final ClientManager clientManager; - - public WebSocketReactiveSocketConnector(String path, ClientManager clientManager, ConnectionSetupPayload connectionSetupPayload, Consumer errorStream) { - this.connectionSetupPayload = connectionSetupPayload; - this.errorStream = errorStream; - this.path = path; - this.clientManager = clientManager; - } - - @Override - public Publisher connect(SocketAddress address) { - Publisher connection - = ReactiveSocketWebSocketClient.create(address, path, clientManager); - - Observable result = Observable.create(s -> - connection.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } - - @Override - public void onNext(WebSocketDuplexConnection connection) { - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(connection, connectionSetupPayload, errorStream); - reactiveSocket.start(new Completable() { - @Override - public void success() { - s.onNext(reactiveSocket); - s.onCompleted(); - } - - @Override - public void error(Throwable e) { - s.onError(e); - } - }); - } - - @Override - public void onError(Throwable t) { - s.onError(t); - } - - @Override - public void onComplete() { - } - }) - ); - - return RxReactiveStreams.toPublisher(result); - } -} diff --git a/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/server/ReactiveSocketWebSocketServer.java b/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/server/ReactiveSocketWebSocketServer.java deleted file mode 100644 index 085e57cd9..000000000 --- a/reactivesocket-transport-jsr-356/src/main/java/io/reactivesocket/javax/websocket/server/ReactiveSocketWebSocketServer.java +++ /dev/null @@ -1,102 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.javax.websocket.server; - -import io.reactivesocket.ConnectionSetupHandler; -import io.reactivesocket.DefaultReactiveSocket; -import io.reactivesocket.Frame; -import io.reactivesocket.LeaseGovernor; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.javax.websocket.WebSocketDuplexConnection; -import io.reactivesocket.rx.Completable; -import org.agrona.LangUtil; -import rx.subjects.PublishSubject; - -import javax.websocket.CloseReason; -import javax.websocket.Endpoint; -import javax.websocket.EndpointConfig; -import javax.websocket.MessageHandler; -import javax.websocket.Session; -import java.nio.ByteBuffer; -import java.util.concurrent.ConcurrentHashMap; - -public class ReactiveSocketWebSocketServer extends Endpoint { - private final PublishSubject input = PublishSubject.create(); - private final ConcurrentHashMap reactiveSockets = new ConcurrentHashMap<>(); - - private final ConnectionSetupHandler setupHandler; - private final LeaseGovernor leaseGovernor; - - protected ReactiveSocketWebSocketServer(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) { - this.setupHandler = setupHandler; - this.leaseGovernor = leaseGovernor; - } - - protected ReactiveSocketWebSocketServer(ConnectionSetupHandler setupHandler) { - this(setupHandler, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR); - } - - @Override - public void onOpen(Session session, EndpointConfig config) { - session.addMessageHandler(new MessageHandler.Whole() { - @Override - public void onMessage(ByteBuffer message) { - Frame frame = Frame.from(message); - input.onNext(frame); - } - }); - - WebSocketDuplexConnection connection = new WebSocketDuplexConnection(session, input); - - final ReactiveSocket reactiveSocket = reactiveSockets.computeIfAbsent(session.getId(), id -> - DefaultReactiveSocket.fromServerConnection( - connection, - setupHandler, - leaseGovernor, - t -> t.printStackTrace() - ) - ); - - reactiveSocket.start(new Completable() { - @Override - public void success() { - } - - @Override - public void error(Throwable e) { - e.printStackTrace(); - } - }); - } - - @Override - public void onClose(Session session, CloseReason closeReason) { - input.onCompleted(); - try { - ReactiveSocket reactiveSocket = reactiveSockets.remove(session.getId()); - if (reactiveSocket != null) { - reactiveSocket.close(); - } - } catch (Exception e) { - LangUtil.rethrowUnchecked(e); - } - } - - @Override - public void onError(Session session, Throwable thr) { - input.onError(thr); - } -} diff --git a/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/ClientServerEndpoint.java b/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/ClientServerEndpoint.java deleted file mode 100644 index 669e21d3f..000000000 --- a/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/ClientServerEndpoint.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.javax.websocket; - -import io.reactivesocket.Payload; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.javax.websocket.server.ReactiveSocketWebSocketServer; -import io.reactivesocket.test.TestUtil; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import rx.Observable; -import rx.RxReactiveStreams; - -public class ClientServerEndpoint extends ReactiveSocketWebSocketServer { - public ClientServerEndpoint() { - super((setupPayload, rs) -> new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return s -> { - //System.out.println("Handling request/response payload => " + s.toString()); - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - s.onNext(response); - s.onComplete(); - }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response)); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata"); - - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response) - .repeat()); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - return null; - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } - }); - } -} diff --git a/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/ClientServerTest.java b/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/ClientServerTest.java deleted file mode 100644 index bab516104..000000000 --- a/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/ClientServerTest.java +++ /dev/null @@ -1,162 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.javax.websocket; - -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.DefaultReactiveSocket; -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.javax.websocket.client.ReactiveSocketWebSocketClient; -import io.reactivesocket.test.TestUtil; -import org.glassfish.tyrus.client.ClientManager; -import org.glassfish.tyrus.server.Server; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; -import rx.Observable; -import rx.RxReactiveStreams; -import rx.observers.TestSubscriber; - -import javax.websocket.DeploymentException; -import javax.websocket.Endpoint; -import javax.websocket.server.ServerApplicationConfig; -import javax.websocket.server.ServerEndpointConfig; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URISyntaxException; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -public class ClientServerTest { - - static ReactiveSocket client; - static Server server; - - public static class ApplicationConfig implements ServerApplicationConfig { - @Override - public Set getEndpointConfigs(Set> endpointClasses) { - Set cfgs = new HashSet<>(); - cfgs.add(ServerEndpointConfig.Builder - .create(ClientServerEndpoint.class, "/rs") - .build()); - return cfgs; - } - - @Override - public Set> getAnnotatedEndpointClasses(Set> scanned) { - return Collections.emptySet(); - } - } - - @BeforeClass - public static void setup() throws URISyntaxException, DeploymentException, IOException { - server = new Server("localhost", 8025, null, null, ApplicationConfig.class); - server.start(); - - WebSocketDuplexConnection duplexConnection = RxReactiveStreams.toObservable( - ReactiveSocketWebSocketClient.create(InetSocketAddress.createUnresolved("localhost", 8025), "/rs", ClientManager.createClient()) - ).toBlocking().single(); - - client = DefaultReactiveSocket.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace()); - - client.startAndWait(); - } - - @AfterClass - public static void tearDown() { - //server.shutdown(); - server.stop(); - } - - @Test - public void testRequestResponse1() { - requestResponseN(1500, 1); - } - - @Test - public void testRequestResponse10() { - requestResponseN(1500, 10); - } - - - @Test - public void testRequestResponse100() { - requestResponseN(1500, 100); - } - - @Test - public void testRequestResponse10_000() { - requestResponseN(60_000, 10_000); - } - - @Test - public void testRequestStream() { - TestSubscriber ts = TestSubscriber.create(); - - RxReactiveStreams - .toObservable(client.requestStream(TestUtil.utf8EncodedPayload("hello", "metadata"))) - .subscribe(ts); - - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - @Test - public void testRequestSubscription() throws InterruptedException { - TestSubscriber ts = TestSubscriber.create(); - - RxReactiveStreams - .toObservable(client.requestSubscription(TestUtil.utf8EncodedPayload("hello sub", "metadata sub"))) - .take(10) - .subscribe(ts); - - ts.awaitTerminalEvent(3_000, TimeUnit.MILLISECONDS); - ts.assertValueCount(10); - ts.assertNoErrors(); - } - - - public void requestResponseN(int timeout, int count) { - - TestSubscriber ts = TestSubscriber.create(); - - Observable - .range(1, count) - .flatMap(i -> - RxReactiveStreams - .toObservable( - client.requestResponse(TestUtil.utf8EncodedPayload("hello", "metadata")) - ) - .map(payload -> - TestUtil.byteToString(payload.getData()) - ) - //.doOnNext(System.out::println) - ) - .subscribe(ts); - - ts.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); - ts.assertValueCount(count); - ts.assertNoErrors(); - ts.assertCompleted(); - } - - -} diff --git a/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/Ping.java b/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/Ping.java deleted file mode 100644 index 20193551f..000000000 --- a/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/Ping.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.javax.websocket; - -import io.reactivesocket.ConnectionSetupPayload; -import io.reactivesocket.DefaultReactiveSocket; -import io.reactivesocket.Payload; -import io.reactivesocket.ReactiveSocket; -import io.reactivesocket.javax.websocket.client.ReactiveSocketWebSocketClient; -import org.HdrHistogram.Recorder; -import org.glassfish.tyrus.client.ClientManager; -import org.reactivestreams.Publisher; -import rx.Observable; -import rx.RxReactiveStreams; -import rx.Subscriber; -import rx.schedulers.Schedulers; - -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class Ping { - public static void main(String... args) throws Exception { - Publisher publisher = ReactiveSocketWebSocketClient.create(InetSocketAddress.createUnresolved("localhost", 8025), "/rs", ClientManager.createClient()); - - WebSocketDuplexConnection duplexConnection = RxReactiveStreams.toObservable(publisher).toBlocking().single(); - ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8")); - - reactiveSocket.startAndWait(); - - byte[] data = "hello".getBytes(); - - Payload keyPayload = new Payload() { - @Override - public ByteBuffer getData() { - return ByteBuffer.wrap(data); - } - - @Override - public ByteBuffer getMetadata() { - return null; - } - }; - - int n = 1_000_000; - CountDownLatch latch = new CountDownLatch(n); - final Recorder histogram = new Recorder(3600000000000L, 3); - - Schedulers - .computation() - .createWorker() - .schedulePeriodically(() -> { - System.out.println("---- PING/ PONG HISTO ----"); - histogram.getIntervalHistogram() - .outputPercentileDistribution(System.out, 5, 1000.0, false); - System.out.println("---- PING/ PONG HISTO ----"); - }, 10, 10, TimeUnit.SECONDS); - - Observable - .range(1, Integer.MAX_VALUE) - .flatMap(i -> { - long start = System.nanoTime(); - - return RxReactiveStreams - .toObservable( - reactiveSocket - .requestResponse(keyPayload)) - .doOnNext(s -> { - long diff = System.nanoTime() - start; - histogram.recordValue(diff); - }); - }) - .subscribe(new Subscriber() { - @Override - public void onCompleted() { - - } - - @Override - public void onError(Throwable e) { - e.printStackTrace(); - } - - @Override - public void onNext(Payload payload) { - latch.countDown(); - } - }); - - latch.await(1, TimeUnit.HOURS); - System.out.println("Sent => " + n); - System.exit(0); - } -} diff --git a/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/Pong.java b/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/Pong.java deleted file mode 100644 index c55e3dc87..000000000 --- a/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/Pong.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.javax.websocket; - -import org.glassfish.tyrus.server.Server; - -import javax.websocket.DeploymentException; -import javax.websocket.Endpoint; -import javax.websocket.server.ServerApplicationConfig; -import javax.websocket.server.ServerEndpointConfig; -import java.util.Collections; -import java.util.HashSet; -import java.util.Random; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -public class Pong { - public static class ApplicationConfig implements ServerApplicationConfig { - @Override - public Set getEndpointConfigs(Set> endpointClasses) { - Set cfgs = new HashSet<>(); - cfgs.add(ServerEndpointConfig.Builder - .create(PongEndpoint.class, "/rs") - .build()); - return cfgs; - } - - @Override - public Set> getAnnotatedEndpointClasses(Set> scanned) { - return Collections.emptySet(); - } - } - - public static void main(String... args) throws DeploymentException { - byte[] response = new byte[1024]; - Random r = new Random(); - r.nextBytes(response); - - Server server = new Server("localhost", 8025, null, null, ApplicationConfig.class); - server.start(); - - // Tyrus spawns all of its threads as daemon threads so we need to prop open the JVM with a blocking call. - CountDownLatch latch = new CountDownLatch(1); - try { - latch.await(1, TimeUnit.HOURS); - } catch (InterruptedException e) { - System.out.println("Interrupted main thread"); - } finally { - server.stop(); - } - System.exit(0); - } -} diff --git a/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/PongEndpoint.java b/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/PongEndpoint.java deleted file mode 100644 index 127df6b8c..000000000 --- a/reactivesocket-transport-jsr-356/src/test/java/io/reactivesocket/javax/websocket/PongEndpoint.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Copyright 2015 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.reactivesocket.javax.websocket; - -import io.reactivesocket.Payload; -import io.reactivesocket.RequestHandler; -import io.reactivesocket.javax.websocket.server.ReactiveSocketWebSocketServer; -import io.reactivesocket.test.TestUtil; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import rx.Observable; -import rx.RxReactiveStreams; - -import java.nio.ByteBuffer; -import java.util.Random; - -public class PongEndpoint extends ReactiveSocketWebSocketServer { - static byte[] response = new byte[1024]; - static { - Random r = new Random(); - r.nextBytes(response); - } - - public PongEndpoint() { - super((setupPayload, rs) -> new RequestHandler() { - @Override - public Publisher handleRequestResponse(Payload payload) { - return new Publisher() { - @Override - public void subscribe(Subscriber s) { - Payload responsePayload = new Payload() { - ByteBuffer data = ByteBuffer.wrap(response); - ByteBuffer metadata = ByteBuffer.allocate(0); - - public ByteBuffer getData() { - return data; - } - - @Override - public ByteBuffer getMetadata() { - return metadata; - } - }; - - s.onNext(responsePayload); - s.onComplete(); - } - }; - } - - @Override - public Publisher handleRequestStream(Payload payload) { - Payload response1 = - TestUtil.utf8EncodedPayload("hello world", "metadata"); - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response1)); - } - - @Override - public Publisher handleSubscription(Payload payload) { - Payload response1 = - TestUtil.utf8EncodedPayload("hello world", "metadata"); - return RxReactiveStreams - .toPublisher(Observable - .range(1, 10) - .map(i -> response1)); - } - - @Override - public Publisher handleFireAndForget(Payload payload) { - return Subscriber::onComplete; - } - - @Override - public Publisher handleChannel(Payload initialPayload, Publisher inputs) { - Observable observable = - RxReactiveStreams - .toObservable(inputs) - .map(input -> input); - return RxReactiveStreams.toPublisher(observable); - } - - @Override - public Publisher handleMetadataPush(Payload payload) { - return null; - } - }); - } -} diff --git a/reactivesocket-transport-jsr-356/src/test/resources/simplelogger.properties b/reactivesocket-transport-jsr-356/src/test/resources/simplelogger.properties deleted file mode 100644 index 463129958..000000000 --- a/reactivesocket-transport-jsr-356/src/test/resources/simplelogger.properties +++ /dev/null @@ -1,35 +0,0 @@ -# SLF4J's SimpleLogger configuration file -# Simple implementation of Logger that sends all enabled log messages, for all defined loggers, to System.err. - -# Default logging detail level for all instances of SimpleLogger. -# Must be one of ("trace", "debug", "info", "warn", or "error"). -# If not specified, defaults to "info". -#org.slf4j.simpleLogger.defaultLogLevel=debug -org.slf4j.simpleLogger.defaultLogLevel=trace - -# Logging detail level for a SimpleLogger instance named "xxxxx". -# Must be one of ("trace", "debug", "info", "warn", or "error"). -# If not specified, the default logging detail level is used. -#org.slf4j.simpleLogger.log.xxxxx= - -# Set to true if you want the current date and time to be included in output messages. -# Default is false, and will output the number of milliseconds elapsed since startup. -org.slf4j.simpleLogger.showDateTime=true - -# The date and time format to be used in the output messages. -# The pattern describing the date and time format is the same that is used in java.text.SimpleDateFormat. -# If the format is not specified or is invalid, the default format is used. -# The default format is yyyy-MM-dd HH:mm:ss:SSS Z. -org.slf4j.simpleLogger.dateTimeFormat=HH:mm:ss - -# Set to true if you want to output the current thread name. -# Defaults to true. -org.slf4j.simpleLogger.showThreadName=true - -# Set to true if you want the Logger instance name to be included in output messages. -# Defaults to true. -org.slf4j.simpleLogger.showLogName=true - -# Set to true if you want the last component of the name to be included in output messages. -# Defaults to false. -org.slf4j.simpleLogger.showShortLogName=true \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 47d88f0fe..2fd29ba3d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,7 +1,6 @@ rootProject.name='reactivesocket' include 'reactivesocket-core' include 'reactivesocket-transport-aeron' -include 'reactivesocket-transport-jsr-356' include 'reactivesocket-transport-netty' include 'reactivesocket-transport-local' include 'reactivesocket-mime-types'