Skip to content

Commit

Permalink
Configurable low-level network components.
Browse files Browse the repository at this point in the history
  • Loading branch information
nmihajlovski committed Dec 12, 2016
1 parent e714dce commit 099824c
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 46 deletions.
19 changes: 14 additions & 5 deletions rapidoid-buffer/src/main/java/org/rapidoid/buffer/BufGroup.java
Expand Up @@ -5,6 +5,7 @@
import org.rapidoid.annotation.Since;
import org.rapidoid.pool.Pool;
import org.rapidoid.pool.Pools;
import org.rapidoid.u.U;

import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -41,10 +42,14 @@ public class BufGroup extends RapidoidThing {

private final boolean synchronizedBuffers;

public BufGroup(int factor, boolean synchronizedBuffers) {
public BufGroup(final int capacity, boolean synchronizedBuffers) {
this.synchronizedBuffers = synchronizedBuffers;
this.factor = factor;
this.capacity = (int) Math.pow(2, factor);
this.capacity = capacity;
this.factor = calcFactor(capacity);

U.must(capacity >= 2, "The capacity must >= 2!");
U.must((capacity & (capacity - 1)) == 0, "The capacity must be a power of 2!");
U.must(capacity == Math.pow(2, factor));

pool = Pools.create("buffers", new Callable<ByteBuffer>() {
@Override
Expand All @@ -54,8 +59,12 @@ public ByteBuffer call() {
}, 1000);
}

public BufGroup(int factor) {
this(factor, true);
public BufGroup(int capacity) {
this(capacity, true);
}

static int calcFactor(int atomSizeKB) {
return 32 - Integer.numberOfLeadingZeros(atomSizeKB - 1);
}

public Buf newBuf(String name) {
Expand Down
39 changes: 29 additions & 10 deletions rapidoid-buffer/src/test/java/org/rapidoid/buffer/BufTest.java
Expand Up @@ -37,7 +37,7 @@ public class BufTest extends BufferTestCommons implements Constants {

@Test
public void shouldAppendData() {
BufGroup bufs = new BufGroup(2);
BufGroup bufs = new BufGroup(4);
Buf buf = bufs.newBuf();

eq(buf, "");
Expand Down Expand Up @@ -72,7 +72,7 @@ public void shouldAppendData() {

@Test
public void shouldShrinkOnLeft() {
BufGroup bufs = new BufGroup(2);
BufGroup bufs = new BufGroup(4);
Buf buf = bufs.newBuf();

buf.append("abcdefgh-foo-bar-123456789-the-end");
Expand Down Expand Up @@ -114,7 +114,7 @@ public void shouldShrinkOnLeft() {

@Test
public void shouldShrinkOnRight() {
BufGroup bufs = new BufGroup(2);
BufGroup bufs = new BufGroup(4);
Buf buf = bufs.newBuf();

buf.append("abcdefgh-foo-bar-123456789-the-end");
Expand Down Expand Up @@ -156,7 +156,7 @@ public void shouldShrinkOnRight() {

@Test
public void shouldParseNumbers() {
BufGroup bufs = new BufGroup(2);
BufGroup bufs = new BufGroup(4);
Buf buf = bufs.newBuf();

buf.append("5a1234567890fg-3450fg0x45g-3");
Expand All @@ -172,7 +172,7 @@ public void shouldParseNumbers() {

@Test
public void shouldFindSubsequences() {
BufGroup bufs = new BufGroup(2);
BufGroup bufs = new BufGroup(4);
Buf buf = bufs.newBuf();

/************* 0123456789012345678901234567890 */
Expand Down Expand Up @@ -200,7 +200,7 @@ public void shouldFindSubsequences() {

@Test
public void testScanUntil() {
BufGroup bufs = new BufGroup(2);
BufGroup bufs = new BufGroup(4);
Buf buf = bufs.newBuf();

buf.append("first second third\r\na b c\r\n");
Expand Down Expand Up @@ -237,7 +237,7 @@ public void testScanUntil() {

@Test
public void testScanWhile() {
BufGroup bufs = new BufGroup(2);
BufGroup bufs = new BufGroup(4);
Buf buf = bufs.newBuf();

buf.append("abc: xy:");
Expand Down Expand Up @@ -308,7 +308,7 @@ public void testScanUntilAndMatchPrefix() {
@Test
public void testScanLnLn() {
for (int factor = 1; factor <= 10; factor++) {
BufGroup bufs = new BufGroup(factor);
BufGroup bufs = new BufGroup((int) Math.pow(2, factor));
Buf buf = bufs.newBuf();

String s = "GET /hi H\naa: bb\nxyz\r\n\r\n";
Expand Down Expand Up @@ -343,7 +343,7 @@ public void testScanLnLn() {

@Test
public void testPutNumAsText() {
BufGroup bufs = new BufGroup(1);
BufGroup bufs = new BufGroup(2);

String num = "1234567890";

Expand Down Expand Up @@ -389,7 +389,7 @@ public void testPutNumAsText() {

@Test
public void testDeleteAfter() {
BufGroup bufs = new BufGroup(4);
BufGroup bufs = new BufGroup(16);
Buf buf = bufs.newBuf();

int size = 0;
Expand Down Expand Up @@ -429,4 +429,23 @@ private void checkMatch(Buf buf, int start, int limit, String match, int... posi
}
}

@Test
public void shouldCalcSizeFactor() {
eq(BufGroup.calcFactor(2), 1);

eq(BufGroup.calcFactor(3), 2);
eq(BufGroup.calcFactor(4), 2);

eq(BufGroup.calcFactor(5), 3);
eq(BufGroup.calcFactor(8), 3);

eq(BufGroup.calcFactor(9), 4);
eq(BufGroup.calcFactor(16), 4);

eq(BufGroup.calcFactor(1024), 10);
eq(BufGroup.calcFactor(65536), 16);
eq(BufGroup.calcFactor(65536 * 1024), 26);
eq(BufGroup.calcFactor(Integer.MAX_VALUE), 31);
}

}
Expand Up @@ -85,7 +85,7 @@ protected void eq(Buf buf, String expected) {
}

protected Buf buf(String content) {
BufGroup bufs = new BufGroup(2);
BufGroup bufs = new BufGroup(4);
Buf buf = bufs.newBuf();

eq(buf, "");
Expand Down
Expand Up @@ -31,7 +31,7 @@ public class StatisticalBufTest extends BufferTestCommons {

@Test
public void shouldExpandAndShrink() {
BufGroup bufs = new BufGroup(2);
BufGroup bufs = new BufGroup(4);
Buf buf = bufs.newBuf("");
String copy = "";
String s;
Expand Down
Expand Up @@ -55,6 +55,7 @@ public Config map(String name) throws Exception {
public static final Config C3P0 = section("c3p0");
public static final Config APP = section("app");
public static final Config HTTP = section("http");
public static final Config NET = section("net");
public static final Config ON = section("on");
public static final Config ADMIN = section("admin");
public static final Config TOKEN = section("token");
Expand Down
8 changes: 8 additions & 0 deletions rapidoid-commons/src/main/resources/default-config.yml
Expand Up @@ -96,6 +96,14 @@ oauth:
clientId: YOUR_LINKEDIN_CLIENT_ID_HERE
clientSecret: YOUR_LINKEDIN_CLIENT_SECRET_HERE

net:
address: 0.0.0.0
port: 8888
# workers: ${cpus}
bufSizeKB: 16
noDelay: false
syncBufs: true

http:
timeout: 30000
timeoutResolution: 5000
Expand Down
Expand Up @@ -33,7 +33,7 @@
@Since("2.0.0")
public class HttpParserPerfTest {

private static final BufGroup BUFS = new BufGroup(10);
private static final BufGroup BUFS = new BufGroup(1024);

static String REQ1 = "GET /asd/ff?a=5&bn=4 HTTP/1.1|Host:www.test.com|Set-Cookie: a=2|Connection: keep-alive|Set-Cookie: aaa=2|Set-Cookie: aaa=2|Set-Cookie: aaa=2||";
static String REQ2 = "POST /opa/dd/fggh HTTP|Host:a.b.org|My-Header: ghhh|Content-Length: 1|My-Header: ghhh|Connection: keep-alive|My-Header: ghhh|My-Header: ghhh|My-Header: ghhh||X";
Expand Down
Expand Up @@ -66,7 +66,7 @@ private static String body(String s) {
public void shouldParseRequest1() {
RapidoidHelper req = parse(REQ1);

BufGroup bufs = new BufGroup(2);
BufGroup bufs = new BufGroup(4);
Buf reqbuf = bufs.from(REQ1, "r2");

eq(REQ1, req.verb, "GET");
Expand Down Expand Up @@ -145,7 +145,7 @@ public void shouldParseRequest6() {
private RapidoidHelper parse(String reqs) {
RapidoidHelper req = new RapidoidHelper();

Buf reqbuf = new BufGroup(10).from(reqs, "test");
Buf reqbuf = new BufGroup(1024).from(reqs, "test");

Channel conn = mock(Channel.class);
returns(conn.input(), reqbuf);
Expand Down
Expand Up @@ -49,7 +49,7 @@ public static void main(String[] args) {

String req = "GET /plaintext HTTP/1.1\r\nHost:www.test.com\r\n\r\n";

BufGroup gr = new BufGroup(14);
BufGroup gr = new BufGroup(16 * 1024);
final Buf buf = gr.newBuf();
buf.append(req);

Expand Down
29 changes: 15 additions & 14 deletions rapidoid-net/src/main/java/org/rapidoid/net/ServerBuilder.java
Expand Up @@ -3,6 +3,7 @@
import org.rapidoid.RapidoidThing;
import org.rapidoid.annotation.Authors;
import org.rapidoid.annotation.Since;
import org.rapidoid.config.Conf;
import org.rapidoid.net.impl.RapidoidHelper;
import org.rapidoid.net.impl.RapidoidServerLoop;

Expand Down Expand Up @@ -30,23 +31,23 @@
@Since("5.1.0")
public class ServerBuilder extends RapidoidThing {

private volatile String address = "0.0.0.0";
private volatile String address = Conf.NET.entry("address").or("0.0.0.0");

private volatile int port = 8888;
private volatile int port = Conf.NET.entry("port").or(8888);

private volatile int workers = Runtime.getRuntime().availableProcessors();
private volatile int workers = Conf.NET.entry("workers").or(Runtime.getRuntime().availableProcessors());

private volatile org.rapidoid.net.Protocol protocol = null;
private volatile int bufSizeKB = Conf.NET.entry("bufSizeKB").or(16);

private volatile Class<? extends org.rapidoid.net.impl.DefaultExchange<?>> exchangeClass = null;
private volatile boolean noDelay = Conf.NET.entry("noDelay").or(false);

private volatile Class<? extends org.rapidoid.net.impl.RapidoidHelper> helperClass = RapidoidHelper.class;
private volatile boolean syncBufs = Conf.NET.entry("syncBufs").or(true);

private volatile int bufSizeKB = 16;
private volatile org.rapidoid.net.Protocol protocol = null;

private volatile boolean noNelay = false;
private volatile Class<? extends org.rapidoid.net.impl.DefaultExchange<?>> exchangeClass = null;

private volatile boolean syncBufs = true;
private volatile Class<? extends org.rapidoid.net.impl.RapidoidHelper> helperClass = RapidoidHelper.class;

public ServerBuilder address(String address) {
this.address = address;
Expand Down Expand Up @@ -110,12 +111,12 @@ public void bufSizeKB(int bufSizeKB) {
this.bufSizeKB = bufSizeKB;
}

public boolean noNelay() {
return noNelay;
public boolean noDelay() {
return noDelay;
}

public void noNelay(boolean noNelay) {
this.noNelay = noNelay;
public void noDelay(boolean noDelay) {
this.noDelay = noDelay;
}

public boolean syncBufs() {
Expand All @@ -128,7 +129,7 @@ public ServerBuilder syncBufs(boolean syncBufs) {
}

public Server build() {
return new RapidoidServerLoop(protocol, exchangeClass, helperClass, address, port, workers, bufSizeKB, noNelay, syncBufs);
return new RapidoidServerLoop(protocol, exchangeClass, helperClass, address, port, workers, bufSizeKB, noDelay, syncBufs);
}

}
Expand Up @@ -68,13 +68,13 @@ public class RapidoidServerLoop extends AbstractLoop<Server> implements Server,

private final int bufSizeKB;

private final boolean noNelay;
private final boolean noDelay;

private final boolean syncBufs;

public RapidoidServerLoop(Protocol protocol, Class<? extends DefaultExchange<?>> exchangeClass,
Class<? extends RapidoidHelper> helperClass, String address, int port,
int workers, int bufSizeKB, boolean noNelay, boolean syncBufs) {
int workers, int bufSizeKB, boolean noDelay, boolean syncBufs) {
super("server");

this.protocol = protocol;
Expand All @@ -83,7 +83,7 @@ public RapidoidServerLoop(Protocol protocol, Class<? extends DefaultExchange<?>>
this.port = port;
this.workers = workers;
this.bufSizeKB = bufSizeKB;
this.noNelay = noNelay;
this.noDelay = noDelay;
this.syncBufs = syncBufs;
this.helperClass = U.or(helperClass, RapidoidHelper.class);

Expand Down Expand Up @@ -154,7 +154,7 @@ private void initWorkers() {
for (int i = 0; i < ioWorkers.length; i++) {

RapidoidWorkerThread workerThread = new RapidoidWorkerThread(i, protocol, exchangeClass,
helperClass, bufSizeKB, noNelay, syncBufs);
helperClass, bufSizeKB, noDelay, syncBufs);
workerThread.start();

ioWorkers[i] = workerThread.getWorker();
Expand Down
Expand Up @@ -96,11 +96,13 @@ public class RapidoidWorker extends AbstractEventLoop<RapidoidWorker> {
}

public RapidoidWorker(String name, final Protocol protocol, final RapidoidHelper helper,
int bufSizeKB, boolean noNelay, boolean syncBufs) {
int bufSizeKB, boolean noDelay, boolean syncBufs) {

super(name);

this.bufs = new BufGroup(14, syncBufs); // 2^14B (16 KB per buffer segment)
this.bufSize = bufSizeKB * 1024;
this.noDelay = noDelay;
this.bufs = new BufGroup(bufSize, syncBufs);

this.serverProtocol = protocol;
this.helper = helper;
Expand All @@ -120,9 +122,6 @@ public RapidoidConnection call() throws Exception {
}
}, 100000);

this.bufSize = bufSizeKB * 1024;
this.noDelay = noNelay;

if (idleConnectionsCrawler != null) {
idleConnectionsCrawler.register(allConnections);
}
Expand Down
Expand Up @@ -48,15 +48,15 @@ public class RapidoidWorkerThread extends RapidoidThread {
private final boolean syncBufs;

public RapidoidWorkerThread(int workerIndex, Protocol protocol, Class<? extends DefaultExchange<?>> exchangeClass,
Class<? extends RapidoidHelper> helperClass, int bufSizeKB, boolean noNelay, boolean syncBufs) {
Class<? extends RapidoidHelper> helperClass, int bufSizeKB, boolean noDelay, boolean syncBufs) {
super("server" + (workerIndex + 1));

this.workerIndex = workerIndex;
this.protocol = protocol;
this.exchangeClass = exchangeClass;
this.helperClass = helperClass;
this.bufSizeKB = bufSizeKB;
this.noDelay = noNelay;
this.noDelay = noDelay;
this.syncBufs = syncBufs;
}

Expand Down

0 comments on commit 099824c

Please sign in to comment.