Permalink
Browse files

* added ServerFactory

* allow creation of server with netty or mina as network framework
* implemented server thread with netty
  • Loading branch information...
1 parent b6f8025 commit 4f99f88a535dbd33146150981a648c9cec49a5ab r1j0 committed May 6, 2012
View
@@ -89,5 +89,10 @@
<version>1.2</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.4.3.Final</version>
+ </dependency>
</dependencies>
</project>
@@ -1,9 +1,12 @@
server.debug = false
server.listen_port=39390
-# flush interval in milliseconds
+# Flush interval in milliseconds
server.flush_interval=5000
+# Network framework to use (netty or mina)
+server.framework=netty
+
backend.use = mysql, graphite
backend.mysql.host = 192.168.1.10
@@ -24,9 +24,9 @@ public static void main(String[] args) throws IOException, InterruptedException
while (true) {
String message = "system.loadavg_1min 0.60 " + System.currentTimeMillis() / 1000L + "\nsystem.loadavg_5min 0.80 " + System.currentTimeMillis() / 1000L + "\nsystem.loadavg_15min 0.50 " + System.currentTimeMillis() / 1000L + "\n";
- for (int i = 0; i < 1; i++) {
+ for (int i = 0; i < 10; i++) {
statsdClient.sendUdp(message);
-// new Automatic().start();
+ new Automatic().start();
}
Thread.sleep(2000);
@@ -27,6 +27,7 @@
private static final String SERVER_LISTEN_PORT = "server.listen_port";
private static final String SERVER_FLUSH_INTERVAL = "server.flush_interval";
private static final String BACKEND_USE = "backend.use";
+ private static final String NETWORK_FRAMEWORK = "server.framework";
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -76,6 +77,11 @@ public int getFlushInterval() {
}
+ public String getNetworkFramework() {
+ return getValue(NETWORK_FRAMEWORK);
+ }
+
+
private void parseCommandLineOptions(final CommandLine line) {
if (line.hasOption("h")) {
printHelp(options);
@@ -0,0 +1,20 @@
+package com.github.r1j0.statsd.server;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.github.r1j0.statsd.configuration.StatsdConfiguration;
+
+public class ServerFactory {
+
+ public static ServerThread getInstance(String networkFramework, StatsdConfiguration configuration, LinkedBlockingQueue<String> linkedBlockingQueue) {
+ final String framework = networkFramework.toLowerCase();
+
+ if (framework.equals("netty")) {
+ return new ServerThreadNetty(configuration, linkedBlockingQueue);
+ } else if (framework.equals("mina")) {
+ return new ServerThreadMina(configuration, linkedBlockingQueue);
+ }
+
+ throw new IllegalArgumentException("Network framework: " + networkFramework + " could not be initialized.");
+ }
+}
@@ -9,13 +9,13 @@
import org.slf4j.LoggerFactory;
-public class StatsdServerHandler extends IoHandlerAdapter {
+public class ServerHandlerMina extends IoHandlerAdapter {
private final Logger log = LoggerFactory.getLogger(getClass());
private final LinkedBlockingQueue<String> linkedBlockingQueue;
- public StatsdServerHandler(LinkedBlockingQueue<String> linkedBlockingQueue) {
+ public ServerHandlerMina(LinkedBlockingQueue<String> linkedBlockingQueue) {
this.linkedBlockingQueue = linkedBlockingQueue;
}
@@ -0,0 +1,37 @@
+package com.github.r1j0.statsd.server;
+
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ServerHandlerNetty extends SimpleChannelUpstreamHandler {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final LinkedBlockingQueue<String> linkedBlockingQueue;
+
+
+ public ServerHandlerNetty(LinkedBlockingQueue<String> linkedBlockingQueue) {
+ this.linkedBlockingQueue = linkedBlockingQueue;
+ }
+
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ final String message = (String) e.getMessage();
+
+ log.info("Message received in the server..");
+ linkedBlockingQueue.put(message);
+ }
+
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ e.getCause().printStackTrace();
+ // We don't close the channel because we can keep serving requests.
+ }
+}
@@ -1,58 +1,7 @@
package com.github.r1j0.statsd.server;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.charset.Charset;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.mina.core.session.IdleStatus;
-import org.apache.mina.filter.codec.ProtocolCodecFilter;
-import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
-import org.apache.mina.filter.logging.LoggingFilter;
-import org.apache.mina.transport.socket.DatagramSessionConfig;
-import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+public interface ServerThread {
-import com.github.r1j0.statsd.configuration.StatsdConfiguration;
-
-public class ServerThread extends Thread {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final int port;
- private final LinkedBlockingQueue<String> linkedBlockingQueue;
-
-
- public ServerThread(StatsdConfiguration configuration, LinkedBlockingQueue<String> linkedBlockingQueue) {
- this.linkedBlockingQueue = linkedBlockingQueue;
-
- port = configuration.getListeningPort();
- }
-
-
- @Override
- public void run() {
- log.info("ServerThread started.");
-
- NioDatagramAcceptor acceptor = new NioDatagramAcceptor();
- acceptor.setHandler(new StatsdServerHandler(linkedBlockingQueue));
-
- acceptor.getFilterChain().addLast("logger", new LoggingFilter());
- acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
-
- DatagramSessionConfig sessionConfig = acceptor.getSessionConfig();
- sessionConfig.setReuseAddress(true);
- acceptor.getSessionConfig().setReadBufferSize(2048);
- acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 5);
-
- log.info("Starting server...");
-
- try {
- acceptor.bind(new InetSocketAddress(port));
- log.info("Listening on port: " + port);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
+ void start();
}
@@ -0,0 +1,57 @@
+package com.github.r1j0.statsd.server;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.mina.core.session.IdleStatus;
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
+import org.apache.mina.filter.logging.LoggingFilter;
+import org.apache.mina.transport.socket.DatagramSessionConfig;
+import org.apache.mina.transport.socket.nio.NioDatagramAcceptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.r1j0.statsd.configuration.StatsdConfiguration;
+
+public class ServerThreadMina extends Thread implements ServerThread {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final LinkedBlockingQueue<String> linkedBlockingQueue;
+ private final int port;
+
+
+ public ServerThreadMina(StatsdConfiguration configuration, LinkedBlockingQueue<String> linkedBlockingQueue) {
+ this.linkedBlockingQueue = linkedBlockingQueue;
+ this.port = configuration.getListeningPort();
+ }
+
+
+ @Override
+ public void run() {
+ log.info("ServerThreadMina started.");
+
+ NioDatagramAcceptor acceptor = new NioDatagramAcceptor();
+ acceptor.setHandler(new ServerHandlerMina(linkedBlockingQueue));
+
+ acceptor.getFilterChain().addLast("logger", new LoggingFilter());
+ acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
+
+ DatagramSessionConfig sessionConfig = acceptor.getSessionConfig();
+ sessionConfig.setReuseAddress(true);
+ acceptor.getSessionConfig().setReadBufferSize(2048);
+ acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 5);
+
+ log.info("Starting server...");
+
+ try {
+ acceptor.bind(new InetSocketAddress(port));
+ log.info("Listening on port: " + port);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
@@ -0,0 +1,57 @@
+package com.github.r1j0.statsd.server;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.socket.DatagramChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.handler.codec.string.StringDecoder;
+import org.jboss.netty.handler.codec.string.StringEncoder;
+import org.jboss.netty.util.CharsetUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.r1j0.statsd.configuration.StatsdConfiguration;
+
+public class ServerThreadNetty extends Thread implements ServerThread {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final LinkedBlockingQueue<String> linkedBlockingQueue;
+ private final int port;
+
+
+ public ServerThreadNetty(StatsdConfiguration configuration, LinkedBlockingQueue<String> linkedBlockingQueue) {
+ this.linkedBlockingQueue = linkedBlockingQueue;
+ this.port = configuration.getListeningPort();
+ }
+
+
+ @Override
+ public void run() {
+ log.info("ServerThreadNetty started.");
+
+ DatagramChannelFactory f = new NioDatagramChannelFactory(Executors.newCachedThreadPool());
+ ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);
+
+ b.setPipelineFactory(new ChannelPipelineFactory() {
+
+ public ChannelPipeline getPipeline() throws Exception {
+ return Channels.pipeline(new StringEncoder(CharsetUtil.UTF_8), new StringDecoder(CharsetUtil.UTF_8), new ServerHandlerNetty(linkedBlockingQueue));
+ }
+ });
+
+ b.setOption("broadcast", "false");
+ b.setOption("receiveBufferSizePredictorFactory", new FixedReceiveBufferSizePredictorFactory(1024));
+
+ log.info("Starting server...");
+ b.bind(new InetSocketAddress(port));
+ log.info("Listening on port: " + port);
+ }
+}
@@ -9,10 +9,12 @@
private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<String>();;
+
public static void main(String[] args) throws IOException {
final StatsdConfiguration configuration = new StatsdConfiguration(args);
- new ServerThread(configuration, linkedBlockingQueue).start();
+ ServerThread serverThread = ServerFactory.getInstance(configuration.getNetworkFramework(), configuration, linkedBlockingQueue);
+ serverThread.start();
new FlushThread(configuration, linkedBlockingQueue).start();
}
}

0 comments on commit 4f99f88

Please sign in to comment.