Skip to content

Commit

Permalink
Added TCP server and client stats (e.g. total messages processed info).
Browse files Browse the repository at this point in the history
  • Loading branch information
nmihajlovski committed Apr 25, 2015
1 parent 48197f6 commit 5d288ae
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 9 deletions.
2 changes: 2 additions & 0 deletions rapidoid-net/src/main/java/org/rapidoid/net/TCPClient.java
Expand Up @@ -32,4 +32,6 @@ public interface TCPClient extends Activity<TCPClient> {

void connect(String serverHost, int serverPort, Protocol clientProtocol, int connections);

TCPClientInfo info();

}
27 changes: 27 additions & 0 deletions rapidoid-net/src/main/java/org/rapidoid/net/TCPClientInfo.java
@@ -0,0 +1,27 @@
package org.rapidoid.net;

/*
* #%L
* rapidoid-net
* %%
* Copyright (C) 2014 - 2015 Nikolche Mihajlovski
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

public interface TCPClientInfo {

long messagesProcessed();

}
2 changes: 2 additions & 0 deletions rapidoid-net/src/main/java/org/rapidoid/net/TCPServer.java
Expand Up @@ -30,4 +30,6 @@ public interface TCPServer extends Activity<TCPServer> {

String process(String input);

TCPServerInfo info();

}
27 changes: 27 additions & 0 deletions rapidoid-net/src/main/java/org/rapidoid/net/TCPServerInfo.java
@@ -0,0 +1,27 @@
package org.rapidoid.net;

/*
* #%L
* rapidoid-net
* %%
* Copyright (C) 2014 - 2015 Nikolche Mihajlovski
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/

public interface TCPServerInfo {

long messagesProcessed();

}
Expand Up @@ -31,12 +31,13 @@
import org.rapidoid.config.Conf;
import org.rapidoid.net.Protocol;
import org.rapidoid.net.TCPClient;
import org.rapidoid.net.TCPClientInfo;
import org.rapidoid.util.Cls;
import org.rapidoid.util.U;

@Authors("Nikolche Mihajlovski")
@Since("2.0.0")
public class RapidoidClientLoop extends AbstractEventLoop<TCPClient> implements TCPClient {
public class RapidoidClientLoop extends AbstractEventLoop<TCPClient> implements TCPClient, TCPClientInfo {

private RapidoidWorker[] workers;

Expand Down Expand Up @@ -171,4 +172,20 @@ public synchronized TCPClient shutdown() {
return super.shutdown();
}

@Override
public TCPClientInfo info() {
return this;
}

@Override
public long messagesProcessed() {
long total = 0;

for (int i = 0; i < workers.length; i++) {
total += workers[i].getMessagesProcessed();
}

return total;
}

}
Expand Up @@ -35,13 +35,14 @@
import org.rapidoid.log.Log;
import org.rapidoid.net.Protocol;
import org.rapidoid.net.TCPServer;
import org.rapidoid.net.TCPServerInfo;
import org.rapidoid.util.Cls;
import org.rapidoid.util.Rnd;
import org.rapidoid.util.U;

@Authors("Nikolche Mihajlovski")
@Since("2.0.0")
public class RapidoidServerLoop extends AbstractEventLoop<TCPServer> implements TCPServer {
public class RapidoidServerLoop extends AbstractEventLoop<TCPServer> implements TCPServer, TCPServerInfo {

private volatile RapidoidWorker[] workers;

Expand Down Expand Up @@ -127,7 +128,8 @@ private void openSocket() throws IOException {
for (int i = 0; i < workers.length; i++) {
RapidoidHelper helper = Cls.newInstance(helperClass, exchangeClass);
String workerName = "server" + (i + 1);
BufGroup bufGroup = new BufGroup(14); // 2^14B (16 KB per buffer segment)
BufGroup bufGroup = new BufGroup(14); // 2^14B (16 KB per buffer
// segment)
workers[i] = new RapidoidWorker(workerName, bufGroup, protocol, helper, bufSizeKB, noDelay);
new Thread(workers[i], workerName).start();
}
Expand Down Expand Up @@ -186,4 +188,20 @@ public Protocol getProtocol() {
return protocol;
}

@Override
public TCPServerInfo info() {
return this;
}

@Override
public long messagesProcessed() {
long total = 0;

for (int i = 0; i < workers.length; i++) {
total += workers[i].getMessagesProcessed();
}

return total;
}

}
Expand Up @@ -71,6 +71,8 @@ public class RapidoidWorker extends AbstractEventLoop<RapidoidWorker> {

private final BufGroup bufs;

private volatile long messagesProcessed;

public RapidoidWorker(String name, final BufGroup bufs, final Protocol protocol, final RapidoidHelper helper,
int bufSizeKB, boolean noNelay) {
super(name);
Expand Down Expand Up @@ -186,13 +188,13 @@ protected void readOP(SelectionKey key) throws IOException {
}

public void process(RapidoidConnection conn) {
processMsgs(conn);
messagesProcessed += processMsgs(conn);

conn.completedInputPos = conn.input.position();
}

private int processMsgs(RapidoidConnection conn) {
int reqN = 0;
private long processMsgs(RapidoidConnection conn) {
long reqN = 0;

while (reqN < maxPipelineSize && conn.input().hasRemaining() && processNext(conn)) {
reqN++;
Expand Down Expand Up @@ -429,4 +431,8 @@ public RapidoidConnection newConnection() {
return new RapidoidConnection(RapidoidWorker.this, bufs);
}

public long getMessagesProcessed() {
return messagesProcessed;
}

}
20 changes: 17 additions & 3 deletions rapidoid-net/src/test/java/org/rapidoid/TcpClientTest.java
Expand Up @@ -62,15 +62,19 @@ public void process(Channel ctx) {
public void testTCPClientWithDefaultConnections() {
Log.setLogLevel(LogLevel.DEBUG);

TCPClient client = TCP.client().host("localhost").port(8080).connections(1).protocol(HI_CLIENT).build().start();
TCPClient client = TCP.client().host("localhost").port(8080).connections(5).protocol(HI_CLIENT).build().start();

// let the clients wait
UTILS.sleep(3000);

TCPServer server = TCP.server().port(8080).protocol(UPPERCASE_SERVER).build().start();

// let the server serve the clients
UTILS.sleep(3000);

eq(client.info().messagesProcessed(), 5);
eq(server.info().messagesProcessed(), 5);

client.shutdown();
server.shutdown();
}
Expand All @@ -80,14 +84,19 @@ public void testTCPClientWithCustomConnections() {
Log.setLogLevel(LogLevel.DEBUG);

TCPClient client = TCP.client().build().start();
client.connect("localhost", 8080, HI_CLIENT, 1);
client.connect("localhost", 8080, HI_CLIENT, 10);

// let the clients wait
UTILS.sleep(3000);

TCPServer server = TCP.server().port(8080).protocol(UPPERCASE_SERVER).build().start();

// let the server serve the clients
UTILS.sleep(3000);

eq(client.info().messagesProcessed(), 10);
eq(server.info().messagesProcessed(), 10);

client.shutdown();
server.shutdown();
}
Expand All @@ -97,16 +106,21 @@ public void testTCPClientWithDefaultAndCustomConnections() {
Log.setLogLevel(LogLevel.DEBUG);

TCPClient client = TCP.client().host("127.0.0.1").port(8080).connections(3).protocol(HI_CLIENT).build().start();
client.connect("localhost", 9090, HI_CLIENT, 1);
client.connect("localhost", 9090, HI_CLIENT, 2);

// let the clients wait
UTILS.sleep(3000);

TCPServer server1 = TCP.server().port(8080).protocol(UPPERCASE_SERVER).build().start();
TCPServer server2 = TCP.server().port(9090).protocol(UPPERCASE_SERVER).build().start();

// let the servers serve the clients
UTILS.sleep(3000);

eq(client.info().messagesProcessed(), 5);
eq(server1.info().messagesProcessed(), 3);
eq(server2.info().messagesProcessed(), 2);

client.shutdown();
server1.shutdown();
server2.shutdown();
Expand Down

0 comments on commit 5d288ae

Please sign in to comment.