Skip to content

Commit

Permalink
Better read/write operation control and other minor improvements in X…
Browse files Browse the repository at this point in the history
…-net.
  • Loading branch information
nmihajlovski committed Jul 18, 2015
1 parent 607f91b commit b399e58
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 22 deletions.
Expand Up @@ -57,6 +57,12 @@ public interface ProtocolContext<T> {

void log(String msg);

Channel nextOp(int nextOp);

Channel nextWrite();

Channel mode(int mode);

/* PROTOCOL */

boolean isInitial();
Expand Down
Expand Up @@ -129,14 +129,15 @@ protected final void insideLoop() {
try {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
synchronized (selectedKeys) {
if (!selectedKeys.isEmpty()) {
Iterator<?> iter = selectedKeys.iterator();

Iterator<?> iter = selectedKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();

while (iter.hasNext()) {
SelectionKey key = (SelectionKey) iter.next();
iter.remove();

processKey(key);
processKey(key);
}
}
}
} catch (ClosedSelectorException e) {
Expand Down
Expand Up @@ -89,6 +89,10 @@ public class RapidoidConnection implements Resetable, Channel, Constants {

private ChannelHolderImpl holder;

public volatile int nextOp = SelectionKey.OP_READ;

public volatile int mode = 0;

public RapidoidConnection(RapidoidWorker worker, BufGroup bufs) {
this.worker = worker;
this.input = bufs.newBuf("input#" + connId());
Expand Down Expand Up @@ -388,4 +392,21 @@ public ChannelHolder createHolder() {
return new ChannelHolderImpl(this);
}

@Override
public Channel nextOp(int nextOp) {
this.nextOp = nextOp;
return this;
}

@Override
public Channel nextWrite() {
return nextOp(SelectionKey.OP_WRITE);
}

@Override
public Channel mode(int mode) {
this.mode = mode;
return this;
}

}
Expand Up @@ -58,7 +58,7 @@ public class RapidoidWorker extends AbstractEventLoop<RapidoidWorker> {

private final Queue<RapidoidChannel> connected;

private final SimpleList<RapidoidConnection> done;
private final SimpleList<RapidoidConnection> waitingToWrite;

private final Pool<RapidoidConnection> connections;

Expand All @@ -85,7 +85,7 @@ public RapidoidWorker(String name, final BufGroup bufs, final Protocol protocol,

this.serverProtocol = protocol;
this.helper = helper;
this.maxPipelineSize = Conf.option("pipeline-max", Integer.MAX_VALUE);
this.maxPipelineSize = Conf.option("pipeline-max", 1000000L);
this.selectorTimeout = Conf.option("selector-timeout", 5);

final int queueSize = Conf.micro() ? 1000 : 1000000;
Expand All @@ -94,7 +94,8 @@ public RapidoidWorker(String name, final BufGroup bufs, final Protocol protocol,
this.restarting = new ArrayBlockingQueue<RapidoidConnection>(queueSize);
this.connecting = new ArrayBlockingQueue<ConnectionTarget>(queueSize);
this.connected = new ArrayBlockingQueue<RapidoidChannel>(queueSize);
this.done = new SimpleList<RapidoidConnection>(queueSize / 10, growFactor);
this.waitingToWrite = new SimpleList<RapidoidConnection>(queueSize / 10, growFactor);


connections = Pools.create("connections", new Callable<RapidoidConnection>() {
@Override
Expand Down Expand Up @@ -221,18 +222,18 @@ public void process(RapidoidConnection conn) {
private long processMsgs(RapidoidConnection conn) {
long reqN = 0;

while (reqN < maxPipelineSize && conn.input().hasRemaining() && processNext(conn, false)) {
while (reqN < maxPipelineSize && conn.input().hasRemaining() && processNext(conn, false, false)) {
reqN++;
}

return reqN;
}

private boolean processNext(RapidoidConnection conn, boolean initial) {
private boolean processNext(RapidoidConnection conn, boolean initial, boolean write) {

conn.log(initial ? "<< INIT >>" : "<< PROCESS >>");

U.must(initial || conn.input().hasRemaining());
U.must(initial || write || conn.input().hasRemaining());

// prepare for a rollback in case the message isn't complete yet
conn.input().checkpoint(conn.input().position());
Expand Down Expand Up @@ -366,9 +367,10 @@ protected void writeOP(SelectionKey key) throws IOException {
close(conn);
} else {
if (complete) {
key.interestOps(SelectionKey.OP_READ);
key.interestOps(conn.mode != 0 ? conn.mode : conn.nextOp);
processNext(conn, false, true);
} else {
key.interestOps(SelectionKey.OP_READ + SelectionKey.OP_WRITE);
key.interestOps(conn.mode != 0 ? conn.mode : (SelectionKey.OP_READ + SelectionKey.OP_WRITE));
}
conn.wrote(complete);
}
Expand All @@ -378,6 +380,8 @@ protected void writeOP(SelectionKey key) throws IOException {
}

public void wantToWrite(RapidoidConnection conn) {
U.must(conn.mode != SelectionKey.OP_READ);

if (onSameThread()) {
conn.key.interestOps(SelectionKey.OP_WRITE);
} else {
Expand All @@ -386,8 +390,8 @@ public void wantToWrite(RapidoidConnection conn) {
}

private void wantToWriteAsync(RapidoidConnection conn) {
synchronized (done) {
done.add(conn);
synchronized (waitingToWrite) {
waitingToWrite.add(conn);
}

selector.wakeup();
Expand Down Expand Up @@ -439,7 +443,7 @@ protected void doProcessing() {
}

try {
processNext(conn, true);
processNext(conn, true, false);
} finally {
conn.setInitial(false);
}
Expand All @@ -453,17 +457,17 @@ protected void doProcessing() {
while ((restartedConn = restarting.poll()) != null) {
Log.debug("restarting", "connection", restartedConn);

processNext(restartedConn, true);
processNext(restartedConn, true, false);
}

synchronized (done) {
for (int i = 0; i < done.size(); i++) {
RapidoidConnection conn = done.get(i);
synchronized (waitingToWrite) {
for (int i = 0; i < waitingToWrite.size(); i++) {
RapidoidConnection conn = waitingToWrite.get(i);
if (conn.key != null && conn.key.isValid()) {
conn.key.interestOps(SelectionKey.OP_WRITE);
}
}
done.clear();
waitingToWrite.clear();
}
}

Expand Down

0 comments on commit b399e58

Please sign in to comment.