Skip to content

Commit

Permalink
Improved TCP client to support different endpoints and protocols.
Browse files Browse the repository at this point in the history
  • Loading branch information
nmihajlovski committed Apr 25, 2015
1 parent ee0d618 commit 7dd37a0
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 31 deletions.
Expand Up @@ -25,21 +25,25 @@

import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.net.Protocol;
import org.rapidoid.util.U;

@Authors("Nikolche Mihajlovski")
@Since("2.0.0")
public class ConnectionTarget {

SocketChannel socketChannel;
volatile SocketChannel socketChannel;

InetSocketAddress addr;
final InetSocketAddress addr;

long retryAfter;
volatile long retryAfter;

public ConnectionTarget(SocketChannel socketChannel, InetSocketAddress addr) {
final Protocol protocol;

public ConnectionTarget(SocketChannel socketChannel, InetSocketAddress addr, Protocol protocol) {
this.socketChannel = socketChannel;
this.addr = addr;
this.protocol = protocol;
this.retryAfter = U.time();
}

Expand Down
Expand Up @@ -24,17 +24,20 @@

import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.net.Protocol;

@Authors("Nikolche Mihajlovski")
@Since("2.0.0")
public class RapidoidChannel {

final SocketChannel socketChannel;
final boolean isClient;
final Protocol protocol;

public RapidoidChannel(SocketChannel socketChannel, boolean isClient) {
public RapidoidChannel(SocketChannel socketChannel, boolean isClient, Protocol protocol) {
this.socketChannel = socketChannel;
this.isClient = isClient;
this.protocol = protocol;
}

}
Expand Up @@ -55,6 +55,7 @@ public class RapidoidClientLoop extends AbstractEventLoop<TCPClient> implements
@Inject(optional = true)
private boolean noDelay = false;

// initial number of connections to the default server
@Inject(optional = true)
private int connections = 1;

Expand All @@ -64,6 +65,9 @@ public class RapidoidClientLoop extends AbstractEventLoop<TCPClient> implements

private final Class<? extends DefaultExchange<?, ?>> exchangeClass;

// round-robin workers for new connections
private int currentWorkerInd = 0;

public RapidoidClientLoop(Protocol protocol, Class<? extends DefaultExchange<?, ?>> exchangeClass,
Class<? extends RapidoidHelper> helperClass) {
super("client");
Expand All @@ -90,35 +94,44 @@ private void openSockets() throws IOException {

U.notNull(host, "host");

InetSocketAddress addr = new InetSocketAddress(host, port);

workers = new RapidoidWorker[workersN];

for (int i = 0; i < workers.length; i++) {
RapidoidHelper helper = Cls.newInstance(helperClass, exchangeClass);
String workerName = "client" + (i + 1);
workers[i] = new RapidoidWorker(workerName, new BufGroup(13), protocol, helper, bufSizeKB, noDelay);
workers[i] = new RapidoidWorker(workerName, new BufGroup(13), null, helper, bufSizeKB, noDelay);
new Thread(workers[i], workerName).start();
}

int workerInd = 0;

for (int c = 0; c < connections; c++) {
connect(host, port, protocol);
}
}

SocketChannel socketChannel = SocketChannel.open();
public synchronized void connect(String serverHost, int serverPort, Protocol clientProtocol) throws IOException {

InetSocketAddress addr = new InetSocketAddress(serverHost, serverPort);
SocketChannel socketChannel = socket();

workers[currentWorkerInd++].connect(new ConnectionTarget(socketChannel, addr, clientProtocol));

if ((socketChannel.isOpen())) {
workers[workerInd++].connect(new ConnectionTarget(socketChannel, addr));
if (currentWorkerInd == workers.length) {
currentWorkerInd = 0;
}
}

if (workerInd == workers.length) {
workerInd = 0;
}
protected static SocketChannel socket() {
try {
SocketChannel socketChannel = SocketChannel.open();

} else {
if (!socketChannel.isOpen()) {
throw U.rte("Cannot open socket!");
}
}

return socketChannel;
} catch (IOException e) {
throw U.rte("Cannot open socket!", e);
}
}

@Override
Expand Down
Expand Up @@ -61,12 +61,10 @@ public class RapidoidWorker extends AbstractEventLoop<RapidoidWorker> {

private final int maxPipelineSize;

final Protocol protocol;
final Protocol serverProtocol;

final RapidoidHelper helper;

private final boolean isProtocolListener;

private final int bufSize;

private final boolean noDelay;
Expand All @@ -78,7 +76,7 @@ public RapidoidWorker(String name, final BufGroup bufs, final Protocol protocol,
super(name);
this.bufs = bufs;

this.protocol = protocol;
this.serverProtocol = protocol;
this.helper = helper;
this.maxPipelineSize = Conf.option("pipeline-max", Integer.MAX_VALUE);

Expand All @@ -90,8 +88,6 @@ public RapidoidWorker(String name, final BufGroup bufs, final Protocol protocol,
this.connected = new ArrayBlockingQueue<RapidoidChannel>(queueSize);
this.done = new SimpleList<RapidoidConnection>(queueSize / 10, growFactor);

this.isProtocolListener = protocol instanceof CtxListener;

connections = new ArrayPool<RapidoidConnection>(new Callable<RapidoidConnection>() {
@Override
public RapidoidConnection call() throws Exception {
Expand All @@ -107,7 +103,7 @@ public void accept(SocketChannel socketChannel) throws IOException {

configureSocket(socketChannel);

connected.add(new RapidoidChannel(socketChannel, false));
connected.add(new RapidoidChannel(socketChannel, false, serverProtocol));
selector.wakeup();
}

Expand Down Expand Up @@ -146,7 +142,7 @@ protected void connectOP(SelectionKey key) throws IOException {
try {
ready = socketChannel.finishConnect();
U.rteIf(!ready, "Expected an established connection!");
connected.add(new RapidoidChannel(socketChannel, true));
connected.add(new RapidoidChannel(socketChannel, true, target.protocol));
} catch (ConnectException e) {
retryConnecting(target);
}
Expand Down Expand Up @@ -178,7 +174,7 @@ protected void readOP(SelectionKey key) throws IOException {
if (conn.isClient()) {
InetSocketAddress addr = conn.getAddress();
close(key);
retryConnecting(new ConnectionTarget(null, addr));
retryConnecting(new ConnectionTarget(null, addr, conn.getProtocol()));
} else {
close(key);
}
Expand Down Expand Up @@ -366,7 +362,7 @@ protected void doProcessing() {

try {
SelectionKey newKey = socketChannel.register(selector, SelectionKey.OP_READ);
RapidoidConnection conn = attachConn(newKey);
RapidoidConnection conn = attachConn(newKey, channel.protocol);
conn.setClient(channel.isClient);

try {
Expand Down Expand Up @@ -398,7 +394,7 @@ protected void doProcessing() {
}
}

private RapidoidConnection attachConn(SelectionKey key) {
private RapidoidConnection attachConn(SelectionKey key, Protocol protocol) {
Object attachment = key.attachment();
assert attachment == null || attachment instanceof ConnectionTarget;

Expand All @@ -409,8 +405,8 @@ private RapidoidConnection attachConn(SelectionKey key) {

conn.key = key;
conn.setProtocol(protocol);
if (isProtocolListener) {

if (protocol instanceof CtxListener) {
conn.setListener((CtxListener) protocol);
}

Expand Down

0 comments on commit 7dd37a0

Please sign in to comment.