Skip to content

Commit

Permalink
* netty 4.0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
whym committed Aug 18, 2013
1 parent eef1319 commit 2a91c42
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 64 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ libraryDependencies ++= Seq(
% "4.4.0",
"org.apache.lucene" % "lucene-queryparser"
% "4.4.0",
"org.jboss.netty" % "netty" % "3.2.7.Final",
"io.netty" % "netty-all" % "4.0.7.Final",
"org.json" % "json" % "20090211",
"org.apache.commons" % "commons-lang3" % "3.1",
"ch.qos.logback" % "logback-classic" % "1.0.7",
Expand Down
119 changes: 56 additions & 63 deletions src/main/java/org/wikimedia/revdiffsearch/SearcherDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,25 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.DirectoryReader;

import org.jboss.netty.util.CharsetUtil;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.frame.Delimiters;
import io.netty.util.CharsetUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import org.json.JSONObject;
import org.json.JSONArray;
Expand Down Expand Up @@ -86,28 +85,37 @@ public SearcherDaemon(InetSocketAddress address, IndexSearcher searcher, final Q
}

@Override public void run() {
ServerBootstrap bootstrap = new ServerBootstrap
(new NioServerSocketChannelFactory
(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));

bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(RevDiffSearchUtils.getProperty("maxQueryLength", 100000), Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("encoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("handler", new SearcherHandler(searcher, parser, threads));
return pipeline;
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();

bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("framer", new DelimiterBasedFrameDecoder(RevDiffSearchUtils.getProperty("maxQueryLength", 100000), Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringEncoder(CharsetUtil.UTF_8));
pipeline.addLast("encoder", new StringDecoder(CharsetUtil.UTF_8));
pipeline.addLast("handler", new SearcherHandler(searcher, parser, threads));
}
});

bootstrap.bind(this.address);
logger.info("SearcherDaemon is launched in " + DurationFormatUtils.formatDurationHMS(System.currentTimeMillis() - this.startTimeMillis) + " at " + this.address);
ChannelFuture f = bootstrap.bind(this.address).sync();
logger.info("SearcherDaemon is launched in " + DurationFormatUtils.formatDurationHMS(System.currentTimeMillis() - this.startTimeMillis) + " at " + this.address);
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.warning("interrupted");
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static class SearcherHandler extends SimpleChannelUpstreamHandler {
public static class SearcherHandler extends SimpleChannelInboundHandler<String> {
private final IndexSearcher searcher;
private final QueryParser parser;
private final int threads;
Expand All @@ -118,14 +126,6 @@ public SearcherHandler(IndexSearcher searcher, QueryParser parser, int threads)
this.threads = threads;
}


@Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}

private JSONArray writeCollapsedHitsByTimestamp(BitSet hits, List<String> fields, Pattern pattern) throws IOException, JSONException {
Map<String, JSONArray> map = new TreeMap<String, JSONArray>();
for(int i = hits.nextSetBit(0); i >= 0; i = hits.nextSetBit(i+1) ) {
Expand Down Expand Up @@ -174,11 +174,10 @@ private JSONArray writeHits(BitSet hits, List<String> fields) throws IOException
return ret;
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
@Override
public void channelRead0(ChannelHandlerContext ctx, String qstr) {
long processingStartMillis = System.currentTimeMillis();
try {
String qstr = (String)e.getMessage();
logger.info("received query: " + qstr + " at " + ctx);
JSONObject qobj = new JSONObject(qstr);
logger.info("received query (JSON): " + qobj.toString(2) + " at " + ctx);
Expand Down Expand Up @@ -244,35 +243,29 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {

ret.put("elapsed", System.currentTimeMillis() - processingStartMillis);
String str = ret.toString();
ChannelFuture f = e.getChannel().write(str);
ChannelFuture f = ctx.write(str);
logger.info("responded in " + DurationFormatUtils.formatDurationHMS(System.currentTimeMillis() - processingStartMillis) + " (" + str.length() + " characters)");
f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
Channel ch = future.getChannel();
Channel ch = future.channel();
ch.close();
logger.info("connection closed");
}
});
} catch (IOException ex) {
e.getChannel().write("{\"exception\": \"" + StringEscapeUtils.escapeJava(ex.toString()) + "\"}\n");
ctx.write("{\"exception\": \"" + StringEscapeUtils.escapeJava(ex.toString()) + "\"}\n");
ex.printStackTrace();
} catch (JSONException ex) {
e.getChannel().write("{\"exception\": \"" + StringEscapeUtils.escapeJava(ex.toString()) + "\"}\n");
ctx.write("{\"exception\": \"" + StringEscapeUtils.escapeJava(ex.toString()) + "\"}\n");
ex.printStackTrace();
} catch (ParseException ex) {
e.getChannel().write("{\"exception\": \"" + StringEscapeUtils.escapeJava(ex.toString()) + "\"}\n");
ctx.write("{\"exception\": \"" + StringEscapeUtils.escapeJava(ex.toString()) + "\"}\n");
ex.printStackTrace();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
} finally {
ctx.flush();
}
}
}
}

public static void main(String[] args) throws IOException {
if ( args.length < 1 ) {
Expand Down

0 comments on commit 2a91c42

Please sign in to comment.