Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions reactivesocket-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ dependencies {
compile project(':reactivesocket-transport-tcp')

compile project(':reactivesocket-test')
runtime group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.21'
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,53 @@
*/
package io.reactivesocket.examples;

import io.netty.channel.nio.NioEventLoopGroup;
import io.reactivesocket.ConnectionSetupHandler;
import io.reactivesocket.ConnectionSetupPayload;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.RequestHandler;
import io.reactivesocket.client.ClientBuilder;
import io.reactivesocket.test.TestUtil;
import io.reactivesocket.transport.tcp.client.TcpReactiveSocketConnector;
import io.reactivesocket.transport.tcp.server.TcpReactiveSocketServer;
import io.reactivesocket.util.Unsafe;
import io.reactivesocket.test.TestUtil;
import org.reactivestreams.Publisher;
import rx.Observable;
import rx.RxReactiveStreams;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public final class EchoClient {

private static Publisher<List<SocketAddress>> source(SocketAddress sa) {
return sub -> sub.onNext(Collections.singletonList(sa));
}

public static void main(String... args) throws Exception {
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 8888);

ConnectionSetupHandler setupHandler = (setupPayload, reactiveSocket) -> {
return new RequestHandler.Builder()
.withRequestResponse(
payload -> RxReactiveStreams.toPublisher(Observable.just(payload)))
.build();
};

SocketAddress serverAddress = TcpReactiveSocketServer.create()
.start(setupHandler)
.getServerAddress();

ConnectionSetupPayload setupPayload =
ConnectionSetupPayload.create("UTF-8", "UTF-8", ConnectionSetupPayload.NO_FLAGS);

TcpReactiveSocketConnector tcp =
new TcpReactiveSocketConnector(new NioEventLoopGroup(8), setupPayload, System.err::println);
TcpReactiveSocketConnector tcp = TcpReactiveSocketConnector.create(setupPayload, Throwable::printStackTrace);

ReactiveSocket client = ClientBuilder.instance()
.withSource(source(address))
.withSource(RxReactiveStreams.toPublisher(Observable.just(Collections.singletonList(serverAddress))))
.withConnector(tcp)
.build();

Unsafe.awaitAvailability(client);

Payload request = TestUtil.utf8EncodedPayload("Hello", "META");
Payload response = Unsafe.blockingSingleWait(client.requestResponse(request), 1, TimeUnit.SECONDS);

System.out.println(response);
RxReactiveStreams.toObservable(client.requestResponse(request))
.map(TestUtil::dataAsString)
.toBlocking()
.forEach(System.out::println);
}
}
20 changes: 20 additions & 0 deletions reactivesocket-examples/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Copyright 2015 Netflix, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%c %d{dd MMM yyyy HH:mm:ss,SSS} %5p [%t] (%F:%L) - %m%n
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public static Payload utf8EncodedPayload(final String data, final String metadat
return new PayloadImpl(data, metadata);
}

public static String dataAsString(Payload payload) {
ByteBuffer data = payload.getData();
byte[] dst = new byte[data.remaining()];
data.get(dst);
return new String(dst);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll add a charset in another commit ;-)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Steve - I'm looking out for the EBCDIC users of the world

}

public static String byteToString(ByteBuffer byteBuffer)
{
byteBuffer = byteBuffer.duplicate();
Expand Down
3 changes: 1 addition & 2 deletions reactivesocket-transport-tcp/build.gradle
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
dependencies {
compile project(':reactivesocket-core')
compile 'io.netty:netty-handler:4.1.0.CR7'
compile 'io.netty:netty-codec-http:4.1.0.CR7'
compile 'io.reactivex:rxnetty-tcp:0.5.2-rc.3'

testCompile project(':reactivesocket-test')
}
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.reactivesocket.transport.tcp;

import io.reactivesocket.Frame;
import io.reactivesocket.rx.Observer;
import rx.Subscriber;

public class ObserverSubscriber extends Subscriber<Frame> {

private final Observer<Frame> o;

public ObserverSubscriber(Observer<Frame> o) {
this.o = o;
}

@Override
public void onCompleted() {
o.onComplete();
}

@Override
public void onError(Throwable e) {
o.onError(e);
}

@Override
public void onNext(Frame frame) {
o.onNext(frame);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.reactivesocket.transport.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.reactivesocket.Frame;

import java.nio.ByteBuffer;

/**
* A Codec that aids reading and writing of ReactiveSocket {@link Frame}s.
*/
public class ReactiveSocketFrameCodec extends ChannelDuplexHandler {

private final MutableDirectByteBuf buffer = new MutableDirectByteBuf(Unpooled.buffer(0));
private final Frame frame = Frame.allocate(buffer);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
try {
buffer.wrap((ByteBuf) msg);
frame.wrap(buffer, 0);
ctx.fireChannelRead(frame);
} finally {
ReferenceCountUtil.release(msg);
}
} else {
super.channelRead(ctx, msg);
}
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof Frame) {
ByteBuffer src = ((Frame)msg).getByteBuffer();
ByteBuf toWrite = ctx.alloc().buffer(src.remaining()).writeBytes(src);
ctx.write(toWrite, promise);
} else {
super.write(ctx, msg, promise);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.reactivesocket.transport.tcp;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.agrona.BitUtil;

public class ReactiveSocketLengthCodec extends LengthFieldBasedFrameDecoder {

public ReactiveSocketLengthCodec() {
super(Integer.MAX_VALUE, 0, BitUtil.SIZE_OF_INT, -1 * BitUtil.SIZE_OF_INT, 0);
}
}
Loading