Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

More refactoring, replaced netty http parser with modified one, not u…

…sing HttpRequest objects for input anymore. Lots of benchmarking. Latency: 88us, Throughput: 204K single threaded, 279K multithreaded.
  • Loading branch information...
commit ebf9086cfd4ce034f879227f83a447cfaf60ba0e 1 parent b47b394
@jakewins jakewins authored
Showing with 1,444 additions and 146 deletions.
  1. +9 −0 src/main/java/org/neo4j/smack/DaemonThreadFactory.java
  2. +12 −1 src/main/java/org/neo4j/smack/DatabaseWorkerThread.java
  3. +32 −0 src/main/java/org/neo4j/smack/InputPipeline.java
  4. +11 −5 src/main/java/org/neo4j/smack/PipelineBootstrap.java
  5. +10 −10 src/main/java/org/neo4j/smack/SmackServer.java
  6. +12 −0 src/main/java/org/neo4j/smack/WorkInputGate.java
  7. +0 −8 src/main/java/org/neo4j/smack/annotation/EndpointParameter.java
  8. +19 −17 src/main/java/org/neo4j/smack/event/RequestEvent.java
  9. +2 −1  src/main/java/org/neo4j/smack/handler/DatabaseWorkDivider.java
  10. +8 −0 src/main/java/org/neo4j/smack/handler/RoutingHandler.java
  11. +731 −0 src/main/java/org/neo4j/smack/http/HttpDecoder.java
  12. +53 −0 src/main/java/org/neo4j/smack/http/HttpTokens.java
  13. +1 −1  src/main/java/org/neo4j/smack/{ → http}/NettyChannelTrackingHandler.java
  14. +23 −34 src/main/java/org/neo4j/smack/{ → http}/NettyHttpHandler.java
  15. +10 −12 src/main/java/org/neo4j/smack/{ → http}/NettyHttpPipelineFactory.java
  16. +12 −0 src/main/java/org/neo4j/smack/routing/Routable.java
  17. +4 −5 src/main/java/org/neo4j/smack/routing/Router.java
  18. +137 −0 src/test/java/org/neo4j/smack/http/TestHttpDecoder.java
  19. +33 −21 src/test/java/org/neo4j/smack/routing/TestRouter.java
  20. +32 −4 src/test/java/org/neo4j/smack/test/performance/NetworkLatency.java
  21. +81 −17 src/test/java/org/neo4j/smack/test/performance/NetworkThroughput.java
  22. +212 −10 src/test/java/org/neo4j/smack/test/util/PipelinedHttpClient.java
View
9 src/main/java/org/neo4j/smack/DaemonThreadFactory.java
@@ -7,9 +7,18 @@
* @since 14.11.11
*/
public class DaemonThreadFactory implements ThreadFactory {
+
+ private String baseName;
+ private int threadNo = 0;
+
+ public DaemonThreadFactory(String threadBaseName) {
+ this.baseName = threadBaseName;
+ }
+
@Override
public Thread newThread(Runnable runnable) {
final Thread thread = new Thread(runnable);
+ thread.setName(baseName + "-" + threadNo++);
thread.setDaemon(true);
return thread;
}
View
13 src/main/java/org/neo4j/smack/DatabaseWorkerThread.java
@@ -1,5 +1,6 @@
package org.neo4j.smack;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.neo4j.smack.event.DatabaseWork;
@@ -7,7 +8,9 @@
import org.neo4j.smack.event.WorkTransactionMode;
import org.neo4j.smack.handler.DatabaseWorkPerformer;
+import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.MultiThreadedClaimStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
@@ -15,6 +18,8 @@
public class DatabaseWorkerThread {
+ private static final AtomicInteger workerId = new AtomicInteger();
+
// Each worker thread keeps track of its own transactions
private TransactionRegistry txs;
private Database database;
@@ -32,7 +37,12 @@ public DatabaseWorkerThread(Database database, TransactionRegistry txs,
this.database = database;
this.workBuffer = new RingBuffer<DatabaseWork>(DatabaseWork.FACTORY,
- BUFFER_SIZE);
+ new MultiThreadedClaimStrategy(BUFFER_SIZE),
+// new BusySpinWaitStrategy()
+// new YieldingWaitStrategy() //65189.048239895696 requests/second
+// new SleepingWaitStrategy() //104416.83199331732 requests/second
+ new BlockingWaitStrategy()
+ );
SequenceBarrier serializationBarrier = workBuffer.newBarrier();
processor = new WorkProcessor<DatabaseWork>(workBuffer,
@@ -47,6 +57,7 @@ public void start()
if (thread == null)
{
thread = new Thread(processor);
+ thread.setName("DabaseWorker-" + workerId.incrementAndGet());
thread.setDaemon(true);
thread.start();
}
View
32 src/main/java/org/neo4j/smack/InputPipeline.java
@@ -0,0 +1,32 @@
+package org.neo4j.smack;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.neo4j.smack.event.RequestEvent;
+import org.neo4j.smack.handler.DatabaseWorkDivider;
+import org.neo4j.smack.handler.RoutingHandler;
+import org.neo4j.smack.routing.InvocationVerb;
+
+import com.lmax.disruptor.ExceptionHandler;
+
+public class InputPipeline extends PipelineBootstrap<RequestEvent> implements WorkInputGate {
+
+ @SuppressWarnings("unchecked")
+ public InputPipeline(ExceptionHandler exceptionHandler, RoutingHandler routingHandler, DatabaseWorkDivider workDivider)
+ {
+ super("RequestEventHandler", RequestEvent.FACTORY, exceptionHandler, routingHandler, workDivider);
+ }
+
+ @Override
+ public void addWork(Long connectionId, InvocationVerb verb, String path,
+ ChannelBuffer content, Channel channel, boolean keepAlive)
+ {
+ long sequenceNo = ringBuffer.next();
+ RequestEvent event = ringBuffer.get(sequenceNo);
+
+ event.reset(connectionId, verb, path, content, channel, keepAlive);
+
+ ringBuffer.publish(sequenceNo);
+ }
+
+}
View
16 src/main/java/org/neo4j/smack/PipelineBootstrap.java
@@ -13,8 +13,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
+import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
+import com.lmax.disruptor.MultiThreadedClaimStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.Sequencer;
@@ -24,9 +26,9 @@
public class PipelineBootstrap<E> {
- private final ExceptionHandler exceptionHandler;
+ protected RingBuffer<E> ringBuffer;
- private RingBuffer<E> ringBuffer;
+ private final ExceptionHandler exceptionHandler;
private List<WorkProcessor<E>> processors = new ArrayList<WorkProcessor<E>>();
@@ -36,7 +38,10 @@
private final EventFactory<E> eventFactory;
- public PipelineBootstrap(final EventFactory<E> eventFactory, final ExceptionHandler exceptionHandler, WorkHandler<E>... handlers) {
+ private String nameForThreads;
+
+ public PipelineBootstrap(String nameForThreads, final EventFactory<E> eventFactory, final ExceptionHandler exceptionHandler, WorkHandler<E>... handlers) {
+ this.nameForThreads = nameForThreads;
this.handlers=asList(handlers);
this.eventFactory = eventFactory;
this.exceptionHandler = exceptionHandler;
@@ -45,12 +50,13 @@ public PipelineBootstrap(final EventFactory<E> eventFactory, final ExceptionHand
public void start() {
if (handlers.isEmpty()) throw new IllegalStateException("No Handlers configured on Pipeline");
final int numEventProcessors = handlers.size();
- workers = Executors.newFixedThreadPool(numEventProcessors, new DaemonThreadFactory());
+ workers = Executors.newFixedThreadPool(numEventProcessors, new DaemonThreadFactory(nameForThreads));
final int bufferSize = 1024 * 4;
ringBuffer = new RingBuffer<E>(
eventFactory,
- bufferSize);
+ new MultiThreadedClaimStrategy(bufferSize),
+ new BusySpinWaitStrategy());
WorkProcessor<E> processor = null;
for (WorkHandler<E> handler : handlers) {
View
20 src/main/java/org/neo4j/smack/SmackServer.java
@@ -27,14 +27,13 @@
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.neo4j.kernel.AbstractGraphDatabase;
import org.neo4j.kernel.EmbeddedGraphDatabase;
-import org.neo4j.smack.event.RequestEvent;
import org.neo4j.smack.handler.DatabaseWorkDivider;
import org.neo4j.smack.handler.DefaultExceptionHandler;
-import org.neo4j.smack.handler.DeserializationHandler;
import org.neo4j.smack.handler.RoutingHandler;
+import org.neo4j.smack.http.NettyHttpPipelineFactory;
import org.neo4j.smack.routing.Endpoint;
import org.neo4j.smack.routing.Router;
import org.neo4j.smack.routing.RoutingDefinition;
@@ -46,7 +45,9 @@
private String host;
private Router router = new Router();
private ServerBootstrap netty;
- private PipelineBootstrap<RequestEvent> inputPipeline;
+
+ private InputPipeline inputPipeline;
+
private ServerSocketChannelFactory channelFactory;
private ChannelGroup openChannels = new DefaultChannelGroup("SmackServer");
private Database database;
@@ -70,7 +71,6 @@ public SmackServer(String host, int port, Database db) {
this.database = db;
}
- @SuppressWarnings("unchecked")
public void start() {
router.compileRoutes();
@@ -80,19 +80,19 @@ public void start() {
exceptionHandler = new DefaultExceptionHandler();
executionHandler = new DatabaseWorkDivider(database, exceptionHandler);
- inputPipeline = new PipelineBootstrap<RequestEvent>(RequestEvent.FACTORY, exceptionHandler, new RoutingHandler(router), new DeserializationHandler(), executionHandler);
+ inputPipeline = new InputPipeline(exceptionHandler, new RoutingHandler(router), executionHandler);
inputPipeline.start();
// NETTY
channelFactory =
- new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(new DaemonThreadFactory()),
- Executors.newCachedThreadPool(new DaemonThreadFactory()));
+ new OioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(new DaemonThreadFactory("SocketMaster")),
+ Executors.newCachedThreadPool(new DaemonThreadFactory("SocketSlave")));
netty = new ServerBootstrap(channelFactory);
// Set up the event pipeline factory.
- netty.setPipelineFactory(new NettyHttpPipelineFactory(inputPipeline.getRingBuffer(), openChannels));
+ netty.setPipelineFactory(new NettyHttpPipelineFactory(inputPipeline, openChannels));
// Bind and start to accept incoming connections.
openChannels.add(netty.bind(new InetSocketAddress(host, port)));
View
12 src/main/java/org/neo4j/smack/WorkInputGate.java
@@ -0,0 +1,12 @@
+package org.neo4j.smack;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.neo4j.smack.routing.InvocationVerb;
+
+public interface WorkInputGate {
+
+ void addWork(Long connectionId, InvocationVerb verb, String path,
+ ChannelBuffer content, Channel channel, boolean keepAlive);
+
+}
View
8 src/main/java/org/neo4j/smack/annotation/EndpointParameter.java
@@ -1,8 +0,0 @@
-package org.neo4j.smack.annotation;
-
-public @interface EndpointParameter {
-
- String key();
- String value();
-
-}
View
36 src/main/java/org/neo4j/smack/event/RequestEvent.java
@@ -24,10 +24,11 @@
import org.neo4j.smack.routing.Endpoint;
import org.neo4j.smack.routing.InvocationVerb;
import org.neo4j.smack.routing.PathVariables;
+import org.neo4j.smack.routing.Routable;
import com.lmax.disruptor.EventFactory;
-public class RequestEvent implements Fallible {
+public class RequestEvent implements Fallible, Routable {
public static EventFactory<RequestEvent> FACTORY = new EventFactory<RequestEvent>() {
public RequestEvent newInstance() {
@@ -63,20 +64,8 @@ public InvocationVerb getVerb() {
return verb;
}
- public void setPath(String path) {
- this.path = path;
- }
-
public String getPath() {
return path;
- }
-
- public void setContent(ChannelBuffer content) {
- this.content = content;
- }
-
- public void setIsPersistentConnection(boolean isPersistentConnection) {
- this.isPersistentConnection = isPersistentConnection;
}
public void setPathVariables(PathVariables pathVariables) {
@@ -138,13 +127,26 @@ public void setChannel(Channel channel)
this.channel = channel;
}
- public void setConnectionId(Long connectionId)
+ public long getConnectionId()
{
- this.connectionId = connectionId;
+ return connectionId;
}
- public long getConnectionId()
+ public void reset(Long connectionId, InvocationVerb verb, String path,
+ ChannelBuffer content, Channel channel, boolean keepAlive)
{
- return connectionId;
+
+ this.connectionId = connectionId;
+ this.verb = verb;
+ this.path = path;
+ this.content = content;
+ this.channel = channel;
+ this.isPersistentConnection = keepAlive;
+
+ this.pathVariables = null;
+ this.endpoint = null;
+ this.deserializedContent = null;
+
+ this.failure = null;
}
}
View
3  src/main/java/org/neo4j/smack/handler/DatabaseWorkDivider.java
@@ -75,7 +75,8 @@ public void onEvent(RequestEvent event) {
}
// Pick worker
- int workerId = (int) event.getConnectionId() % NUM_DATABASE_WORK_EXECUTORS;
+ //System.out.println(event.getConnectionId());
+ int workerId = (int) (event.getConnectionId() % NUM_DATABASE_WORK_EXECUTORS);
workers[workerId].addWork(event, txId, txMode);
}
View
8 src/main/java/org/neo4j/smack/handler/RoutingHandler.java
@@ -21,12 +21,15 @@
import org.neo4j.smack.event.RequestEvent;
import org.neo4j.smack.routing.Router;
+import org.neo4j.smack.serialization.Deserializer;
+import org.neo4j.smack.serialization.SerializationFactory;
import com.lmax.disruptor.WorkHandler;
public class RoutingHandler implements WorkHandler<RequestEvent> {
private Router router;
+ SerializationFactory serializationFactory = new SerializationFactory();
public RoutingHandler(Router router) {
this.router = router;
@@ -35,6 +38,11 @@ public RoutingHandler(Router router) {
public void onEvent(final RequestEvent event)
throws Exception {
event.setEndpoint(router.route(event));
+
+ if(!event.hasFailed()) {
+ Deserializer d = serializationFactory.getDeserializer(event.getContent());
+ event.setDeserializedContent(event.getEndpoint().getDeserializationStrategy().deserialize(d));
+ }
}
}
View
731 src/main/java/org/neo4j/smack/http/HttpDecoder.java
@@ -0,0 +1,731 @@
+package org.neo4j.smack.http;
+
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
+import org.jboss.netty.handler.codec.http.DefaultHttpChunkTrailer;
+import org.jboss.netty.handler.codec.http.HttpChunk;
+import org.jboss.netty.handler.codec.http.HttpChunkTrailer;
+import org.jboss.netty.handler.codec.http.HttpHeaders;
+import org.jboss.netty.handler.codec.http.HttpHeaders.Names;
+import org.jboss.netty.handler.codec.http.HttpHeaders.Values;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
+import org.neo4j.smack.WorkInputGate;
+import org.neo4j.smack.routing.InvocationVerb;
+
+/**
+ * Modified version of Nettys HttpDecoder. This decoder does not
+ * create new HttpRequest objects, instead it builds up state for one
+ * request, and then moves that state over to the Smack input pipeline via
+ * a method call. That means less objects garbage collected, and
+ * the potential for garbage freedom.
+ *
+ * This is *very* much in dev right now, for instance chunking is epically
+ * broken. Don't use for production.
+ */
+public class HttpDecoder extends ReplayingDecoder<HttpDecoder.State> {
+
+ static enum State {
+ SKIP_CONTROL_CHARS,
+ READ_INITIAL,
+ READ_HEADER,
+ READ_VARIABLE_LENGTH_CONTENT,
+ READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS,
+ READ_FIXED_LENGTH_CONTENT,
+ READ_FIXED_LENGTH_CONTENT_AS_CHUNKS,
+ READ_CHUNK_SIZE,
+ READ_CHUNKED_CONTENT,
+ READ_CHUNKED_CONTENT_AS_CHUNKS,
+ READ_CHUNK_DELIMITER,
+ READ_CHUNK_FOOTER;
+ }
+
+ private final int maxInitialLineLength;
+ private final int maxHeaderSize;
+ private final int maxChunkSize;
+
+ // Request state
+
+ private long chunkSize;
+ private int headerSize;
+
+ private class DecodedHttpMessage {
+
+ private boolean chunked;
+ private Long contentLength;
+ private HashMap<String, String> headers = new HashMap<String, String>();
+ private HttpVersion protocolVersion;
+ private InvocationVerb verb;
+ private String path;
+
+ String getHeader(String name)
+ {
+ return headers.get(name);
+ }
+
+ List<String> getHeaders(String name)
+ {
+ throw new NotImplementedException();
+ }
+
+ HttpVersion getProtocolVersion()
+ {
+ return protocolVersion;
+ }
+
+ void addHeader(String name, Object value)
+ {
+ headers.put(name, String.valueOf(value));
+ }
+
+ void removeHeader(String name)
+ {
+ headers.remove(name);
+ }
+
+ void clearHeaders()
+ {
+ headers.clear();
+ }
+
+ long getContentLength(long defaultValue)
+ {
+ return contentLength != null ? contentLength : defaultValue;
+ }
+
+ boolean isChunked()
+ {
+ return chunked;
+ }
+
+ void setChunked(boolean chunked)
+ {
+ this.chunked = chunked;
+ }
+
+ boolean isKeepAlive()
+ {
+ String connection = getHeader(Names.CONNECTION);
+ if (Values.CLOSE.equalsIgnoreCase(connection)) {
+ return false;
+ }
+
+ if (protocolVersion.isKeepAliveDefault()) {
+ return !Values.CLOSE.equalsIgnoreCase(connection);
+ } else {
+ return Values.KEEP_ALIVE.equalsIgnoreCase(connection);
+ }
+ }
+
+ public InvocationVerb getVerb()
+ {
+ return verb;
+ }
+
+ public String getPath()
+ {
+ return path;
+ }
+
+ void reset(HttpVersion protocolVersion, InvocationVerb verb,
+ String path)
+ {
+ this.protocolVersion = protocolVersion;
+ this.verb = verb;
+ this.path = path;
+ }
+ }
+
+ private WorkInputGate workBuffer;
+
+ private ChannelBuffer content;
+
+ private DecodedHttpMessage message = new DecodedHttpMessage();
+ private boolean isDecodingRequest = true;
+
+ HttpDecoder(WorkInputGate workBuffer)
+ {
+ this(workBuffer, 4096, 8192, 8192);
+ }
+
+ /**
+ * Creates a new instance with the specified parameters.
+ */
+ protected HttpDecoder(WorkInputGate workBuffer,
+ int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
+
+ super(State.SKIP_CONTROL_CHARS, true);
+
+ if (maxInitialLineLength <= 0) {
+ throw new IllegalArgumentException(
+ "maxInitialLineLength must be a positive integer: " +
+ maxInitialLineLength);
+ }
+ if (maxHeaderSize <= 0) {
+ throw new IllegalArgumentException(
+ "maxHeaderSize must be a positive integer: " +
+ maxHeaderSize);
+ }
+ if (maxChunkSize < 0) {
+ throw new IllegalArgumentException(
+ "maxChunkSize must be a positive integer: " +
+ maxChunkSize);
+ }
+ this.maxInitialLineLength = maxInitialLineLength;
+ this.maxHeaderSize = maxHeaderSize;
+ this.maxChunkSize = maxChunkSize;
+ this.workBuffer = workBuffer;
+ }
+
+ /*
+ * Work in progress
+ */
+ @Override
+ @SuppressWarnings("fallthrough")
+ protected Object decode(ChannelHandlerContext ctx, Channel channel,
+ ChannelBuffer buffer, State state) throws Exception
+ {
+ switch (state) {
+ case SKIP_CONTROL_CHARS: {
+ try {
+ skipControlCharacters(buffer);
+ checkpoint(State.READ_INITIAL);
+ } finally {
+ checkpoint();
+ }
+ }
+ case READ_INITIAL: {
+ String[] initialLine = splitInitialLine(readLine(buffer, maxInitialLineLength));
+ if (initialLine.length < 3) {
+ // Invalid initial line - ignore.
+ checkpoint(State.SKIP_CONTROL_CHARS);
+ return null;
+ }
+
+ intializeMessage(initialLine);
+ checkpoint(State.READ_HEADER);
+ }
+ case READ_HEADER: {
+ State nextState = readHeaders(buffer);
+ checkpoint(nextState);
+ if (nextState == State.READ_CHUNK_SIZE) {
+ // Chunked encoding
+ message.setChunked(true);
+ // Generate DecodedHttpMessage first. HttpChunks will follow.
+ return message;
+ } else if (nextState == State.SKIP_CONTROL_CHARS) {
+ // No content is expected.
+ // Remove the headers which are not supposed to be present not
+ // to confuse subsequent handlers.
+ message.removeHeader(HttpHeaders.Names.TRANSFER_ENCODING);
+ return message;
+ } else {
+ long contentLength = message.getContentLength(-1);
+ if (contentLength == 0 || contentLength == -1 && isDecodingRequest ) {
+ content = ChannelBuffers.EMPTY_BUFFER;
+ return reset(ctx, channel);
+ }
+
+ switch (nextState) {
+ case READ_FIXED_LENGTH_CONTENT:
+ if (contentLength > maxChunkSize || is100ContinueExpected(message)) {
+ // Generate DecodedHttpMessage first. HttpChunks will follow.
+ checkpoint(State.READ_FIXED_LENGTH_CONTENT_AS_CHUNKS);
+ message.setChunked(true);
+ // chunkSize will be decreased as the READ_FIXED_LENGTH_CONTENT_AS_CHUNKS
+ // state reads data chunk by chunk.
+ chunkSize = message.getContentLength(-1);
+ return message;
+ }
+ break;
+ case READ_VARIABLE_LENGTH_CONTENT:
+ if (buffer.readableBytes() > maxChunkSize || is100ContinueExpected(message)) {
+ // Generate DecodedHttpMessage first. HttpChunks will follow.
+ checkpoint(State.READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS);
+ message.setChunked(true);
+ return message;
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unexpected state: " + nextState);
+ }
+ }
+ // We return null here, this forces decode to be called again where we will decode the content
+ return null;
+ }
+ case READ_VARIABLE_LENGTH_CONTENT: {
+ if (content == null) {
+ content = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory());
+ }
+ //this will cause a replay error until the channel is closed where this will read what's left in the buffer
+ content.writeBytes(buffer.readBytes(buffer.readableBytes()));
+ return reset(ctx, channel);
+ }
+ case READ_VARIABLE_LENGTH_CONTENT_AS_CHUNKS: {
+ // Keep reading data as a chunk until the end of connection is reached.
+ int chunkSize = Math.min(maxChunkSize, buffer.readableBytes());
+ HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes(chunkSize));
+
+ if (!buffer.readable()) {
+ // Reached to the end of the connection.
+ reset(ctx, channel);
+ if (!chunk.isLast()) {
+ // Append the last chunk.
+ return new Object[] { chunk, HttpChunk.LAST_CHUNK };
+ }
+ }
+ return chunk;
+ }
+ case READ_FIXED_LENGTH_CONTENT: {
+ //we have a content-length so we just read the correct number of bytes
+ readFixedLengthContent(buffer);
+ return reset(ctx, channel);
+ }
+ case READ_FIXED_LENGTH_CONTENT_AS_CHUNKS: {
+ long chunkSize = this.chunkSize;
+ HttpChunk chunk;
+ if (chunkSize > maxChunkSize) {
+ chunk = new DefaultHttpChunk(buffer.readBytes(maxChunkSize));
+ chunkSize -= maxChunkSize;
+ } else {
+ assert chunkSize <= Integer.MAX_VALUE;
+ chunk = new DefaultHttpChunk(buffer.readBytes((int) chunkSize));
+ chunkSize = 0;
+ }
+ this.chunkSize = chunkSize;
+
+ if (chunkSize == 0) {
+ // Read all content.
+ reset(ctx, channel);
+ if (!chunk.isLast()) {
+ // Append the last chunk.
+ return new Object[] { chunk, HttpChunk.LAST_CHUNK };
+ }
+ }
+ return chunk;
+ }
+ /**
+ * everything else after this point takes care of reading chunked content. basically, read chunk size,
+ * read chunk, read and ignore the CRLF and repeat until 0
+ */
+ case READ_CHUNK_SIZE: {
+ String line = readLine(buffer, maxInitialLineLength);
+ int chunkSize = getChunkSize(line);
+ this.chunkSize = chunkSize;
+ if (chunkSize == 0) {
+ checkpoint(State.READ_CHUNK_FOOTER);
+ return null;
+ } else if (chunkSize > maxChunkSize) {
+ // A chunk is too large. Split them into multiple chunks again.
+ checkpoint(State.READ_CHUNKED_CONTENT_AS_CHUNKS);
+ } else {
+ checkpoint(State.READ_CHUNKED_CONTENT);
+ }
+ }
+ case READ_CHUNKED_CONTENT: {
+ assert chunkSize <= Integer.MAX_VALUE;
+ HttpChunk chunk = new DefaultHttpChunk(buffer.readBytes((int) chunkSize));
+ checkpoint(State.READ_CHUNK_DELIMITER);
+ return chunk;
+ }
+ case READ_CHUNKED_CONTENT_AS_CHUNKS: {
+ long chunkSize = this.chunkSize;
+ HttpChunk chunk;
+ if (chunkSize > maxChunkSize) {
+ chunk = new DefaultHttpChunk(buffer.readBytes(maxChunkSize));
+ chunkSize -= maxChunkSize;
+ } else {
+ assert chunkSize <= Integer.MAX_VALUE;
+ chunk = new DefaultHttpChunk(buffer.readBytes((int) chunkSize));
+ chunkSize = 0;
+ }
+ this.chunkSize = chunkSize;
+
+ if (chunkSize == 0) {
+ // Read all content.
+ checkpoint(State.READ_CHUNK_DELIMITER);
+ }
+
+ if (!chunk.isLast()) {
+ return chunk;
+ }
+ }
+ case READ_CHUNK_DELIMITER: {
+ for (;;) {
+ byte next = buffer.readByte();
+ if (next == HttpTokens.CR) {
+ if (buffer.readByte() == HttpTokens.LF) {
+ checkpoint(State.READ_CHUNK_SIZE);
+ return null;
+ }
+ } else if (next == HttpTokens.LF) {
+ checkpoint(State.READ_CHUNK_SIZE);
+ return null;
+ }
+ }
+ }
+ case READ_CHUNK_FOOTER: {
+ HttpChunkTrailer trailer = readTrailingHeaders(buffer);
+ if (maxChunkSize == 0) {
+ // Chunked encoding disabled.
+ return reset(ctx, channel);
+ } else {
+ reset(ctx, channel);
+ // The last chunk, which is empty
+ return trailer;
+ }
+ }
+ default: {
+ throw new Error("Shouldn't reach here.");
+ }
+
+ }
+ }
+
+ private boolean is100ContinueExpected(DecodedHttpMessage message)
+ {
+ // It works only on HTTP/1.1 or later.
+ if (message.getProtocolVersion().compareTo(HttpVersion.HTTP_1_1) < 0) {
+ return false;
+ }
+
+ // In most cases, there will be one or zero 'Expect' header.
+ String value = message.getHeader(Names.EXPECT);
+ if (value == null) {
+ return false;
+ }
+ if (Values.CONTINUE.equalsIgnoreCase(value)) {
+ return true;
+ }
+
+ // Multiple 'Expect' headers. Search through them.
+ for (String v: message.getHeaders(Names.EXPECT)) {
+ if (Values.CONTINUE.equalsIgnoreCase(v)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void intializeMessage(String[] initialLine)
+ {
+ message.reset(HttpVersion.valueOf(initialLine[2]), InvocationVerb.valueOf(initialLine[0]), initialLine[1]);
+ }
+
+ protected boolean isContentAlwaysEmpty(DecodedHttpMessage msg) {
+ if (msg instanceof HttpResponse) {
+ HttpResponse res = (HttpResponse) msg;
+ int code = res.getStatus().getCode();
+ if (code < 200) {
+ return true;
+ }
+ switch (code) {
+ case 204: case 205: case 304:
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private Object reset(ChannelHandlerContext ctx, Channel channel) {
+
+ Long connectionId = (Long)ctx.getAttachment();
+
+ workBuffer.addWork(connectionId, message.getVerb(), message.getPath(), content, channel, message.isKeepAlive());
+
+ checkpoint(State.SKIP_CONTROL_CHARS);
+ return null;
+ }
+
+ private void skipControlCharacters(ChannelBuffer buffer) {
+ for (;;) {
+ char c = (char) buffer.readUnsignedByte();
+ if (!Character.isISOControl(c) &&
+ !Character.isWhitespace(c)) {
+ buffer.readerIndex(buffer.readerIndex() - 1);
+ break;
+ }
+ }
+ }
+
+ private void readFixedLengthContent(ChannelBuffer buffer) {
+ long length = message.getContentLength(-1);
+ assert length <= Integer.MAX_VALUE;
+
+ if (content == null) {
+ content = buffer.readBytes((int) length);
+ } else {
+ content.writeBytes(buffer.readBytes((int) length));
+ }
+ }
+
+ private State readHeaders(ChannelBuffer buffer) throws TooLongFrameException {
+ headerSize = 0;
+ String line = readHeader(buffer);
+ String name = null;
+ String value = null;
+ if (line.length() != 0) {
+ message.clearHeaders();
+ do {
+ char firstChar = line.charAt(0);
+ if (name != null && (firstChar == ' ' || firstChar == '\t')) {
+ value = value + ' ' + line.trim();
+ } else {
+ if (name != null) {
+ message.addHeader(name, value);
+ }
+ String[] header = splitHeader(line);
+ name = header[0];
+ value = header[1];
+ }
+
+ line = readHeader(buffer);
+ } while (line.length() != 0);
+
+ // Add the last header.
+ if (name != null) {
+ message.addHeader(name, value);
+ }
+ }
+
+ State nextState;
+
+ if (isContentAlwaysEmpty(message)) {
+ nextState = State.SKIP_CONTROL_CHARS;
+ } else if (message.isChunked()) {
+ // DecodedHttpMessage.isChunked() returns true when either:
+ // 1) DecodedHttpMessage.setChunked(true) was called or
+ // 2) 'Transfer-Encoding' is 'chunked'.
+ // Because this decoder did not call DecodedHttpMessage.setChunked(true)
+ // yet, DecodedHttpMessage.isChunked() should return true only when
+ // 'Transfer-Encoding' is 'chunked'.
+ nextState = State.READ_CHUNK_SIZE;
+ } else if (message.getContentLength(-1) >= 0) {
+ nextState = State.READ_FIXED_LENGTH_CONTENT;
+ } else {
+ nextState = State.READ_VARIABLE_LENGTH_CONTENT;
+ }
+ return nextState;
+ }
+
+ private HttpChunkTrailer readTrailingHeaders(ChannelBuffer buffer) throws TooLongFrameException {
+ headerSize = 0;
+ String line = readHeader(buffer);
+ String lastHeader = null;
+ if (line.length() != 0) {
+ HttpChunkTrailer trailer = new DefaultHttpChunkTrailer();
+ do {
+ char firstChar = line.charAt(0);
+ if (lastHeader != null && (firstChar == ' ' || firstChar == '\t')) {
+ List<String> current = trailer.getHeaders(lastHeader);
+ if (current.size() != 0) {
+ int lastPos = current.size() - 1;
+ String newString = current.get(lastPos) + line.trim();
+ current.set(lastPos, newString);
+ } else {
+ // Content-Length, Transfer-Encoding, or Trailer
+ }
+ } else {
+ String[] header = splitHeader(line);
+ String name = header[0];
+ if (!name.equalsIgnoreCase(HttpHeaders.Names.CONTENT_LENGTH) &&
+ !name.equalsIgnoreCase(HttpHeaders.Names.TRANSFER_ENCODING) &&
+ !name.equalsIgnoreCase(HttpHeaders.Names.TRAILER)) {
+ trailer.addHeader(name, header[1]);
+ }
+ lastHeader = name;
+ }
+
+ line = readHeader(buffer);
+ } while (line.length() != 0);
+
+ return trailer;
+ }
+
+ return HttpChunk.LAST_CHUNK;
+ }
+
+ private String readHeader(ChannelBuffer buffer) throws TooLongFrameException {
+ StringBuilder sb = new StringBuilder(64);
+ int headerSize = this.headerSize;
+
+ loop:
+ for (;;) {
+ char nextByte = (char) buffer.readByte();
+ headerSize ++;
+
+ switch (nextByte) {
+ case HttpTokens.CR:
+ nextByte = (char) buffer.readByte();
+ headerSize ++;
+ if (nextByte == HttpTokens.LF) {
+ break loop;
+ }
+ break;
+ case HttpTokens.LF:
+ break loop;
+ }
+
+ // Abort decoding if the header part is too large.
+ if (headerSize >= maxHeaderSize) {
+ // TODO: Respond with Bad Request and discard the traffic
+ // or close the connection.
+ // No need to notify the upstream handlers - just log.
+ // If decoding a response, just throw an exception.
+ throw new TooLongFrameException(
+ "HTTP header is larger than " +
+ maxHeaderSize + " bytes.");
+
+ }
+
+ sb.append(nextByte);
+ }
+
+ this.headerSize = headerSize;
+ return sb.toString();
+ }
+
+ private int getChunkSize(String hex) {
+ hex = hex.trim();
+ for (int i = 0; i < hex.length(); i ++) {
+ char c = hex.charAt(i);
+ if (c == ';' || Character.isWhitespace(c) || Character.isISOControl(c)) {
+ hex = hex.substring(0, i);
+ break;
+ }
+ }
+
+ return Integer.parseInt(hex, 16);
+ }
+
+ private String readLine(ChannelBuffer buffer, int maxLineLength) throws TooLongFrameException {
+ StringBuilder sb = new StringBuilder(64);
+ int lineLength = 0;
+ while (true) {
+ byte nextByte = buffer.readByte();
+ if (nextByte == HttpTokens.CR) {
+ nextByte = buffer.readByte();
+ if (nextByte == HttpTokens.LF) {
+ return sb.toString();
+ }
+ }
+ else if (nextByte == HttpTokens.LF) {
+ return sb.toString();
+ }
+ else {
+ if (lineLength >= maxLineLength) {
+ // TODO: Respond with Bad Request and discard the traffic
+ // or close the connection.
+ // No need to notify the upstream handlers - just log.
+ // If decoding a response, just throw an exception.
+ throw new TooLongFrameException(
+ "An HTTP line is larger than " + maxLineLength +
+ " bytes.");
+ }
+ lineLength ++;
+ sb.append((char) nextByte);
+ }
+ }
+ }
+
+ private String[] splitInitialLine(String sb) {
+ int aStart;
+ int aEnd;
+ int bStart;
+ int bEnd;
+ int cStart;
+ int cEnd;
+
+ aStart = findNonWhitespace(sb, 0);
+ aEnd = findWhitespace(sb, aStart);
+
+ bStart = findNonWhitespace(sb, aEnd);
+ bEnd = findWhitespace(sb, bStart);
+
+ cStart = findNonWhitespace(sb, bEnd);
+ cEnd = findEndOfString(sb);
+
+ return new String[] {
+ sb.substring(aStart, aEnd),
+ sb.substring(bStart, bEnd),
+ cStart < cEnd? sb.substring(cStart, cEnd) : "" };
+ }
+
+ private String[] splitHeader(String sb) {
+ final int length = sb.length();
+ int nameStart;
+ int nameEnd;
+ int colonEnd;
+ int valueStart;
+ int valueEnd;
+
+ nameStart = findNonWhitespace(sb, 0);
+ for (nameEnd = nameStart; nameEnd < length; nameEnd ++) {
+ char ch = sb.charAt(nameEnd);
+ if (ch == ':' || Character.isWhitespace(ch)) {
+ break;
+ }
+ }
+
+ for (colonEnd = nameEnd; colonEnd < length; colonEnd ++) {
+ if (sb.charAt(colonEnd) == ':') {
+ colonEnd ++;
+ break;
+ }
+ }
+
+ valueStart = findNonWhitespace(sb, colonEnd);
+ if (valueStart == length) {
+ return new String[] {
+ sb.substring(nameStart, nameEnd),
+ ""
+ };
+ }
+
+ valueEnd = findEndOfString(sb);
+ return new String[] {
+ sb.substring(nameStart, nameEnd),
+ sb.substring(valueStart, valueEnd)
+ };
+ }
+
+ private int findNonWhitespace(String sb, int offset) {
+ int result;
+ for (result = offset; result < sb.length(); result ++) {
+ if (!Character.isWhitespace(sb.charAt(result))) {
+ break;
+ }
+ }
+ return result;
+ }
+
+ private int findWhitespace(String sb, int offset) {
+ int result;
+ for (result = offset; result < sb.length(); result ++) {
+ if (Character.isWhitespace(sb.charAt(result))) {
+ break;
+ }
+ }
+ return result;
+ }
+
+ private int findEndOfString(String sb) {
+ int result;
+ for (result = sb.length(); result > 0; result --) {
+ if (!Character.isWhitespace(sb.charAt(result - 1))) {
+ break;
+ }
+ }
+ return result;
+ }
+
+}
View
53 src/main/java/org/neo4j/smack/http/HttpTokens.java
@@ -0,0 +1,53 @@
+package org.neo4j.smack.http;
+
+import java.nio.charset.Charset;
+
+import org.jboss.netty.util.CharsetUtil;
+
+public class HttpTokens {
+
+ static final byte SP = 32;
+
+ //tab ' '
+ static final byte HT = 9;
+
+ /**
+ * Carriage return
+ */
+ static final byte CR = 13;
+
+ /**
+ * Equals '='
+ */
+ static final byte EQUALS = 61;
+
+ /**
+ * Line feed character
+ */
+ static final byte LF = 10;
+
+ /**
+ * carriage return line feed
+ */
+ static final byte[] CRLF = new byte[] { CR, LF };
+
+ /**
+ * Colon ':'
+ */
+ static final byte COLON = 58;
+
+ /**
+ * Semicolon ';'
+ */
+ static final byte SEMICOLON = 59;
+
+ /**
+ * comma ','
+ */
+ static final byte COMMA = 44;
+
+ static final byte DOUBLE_QUOTE = '"';
+
+ static final Charset DEFAULT_CHARSET = CharsetUtil.UTF_8;
+
+}
View
2  ...4j/smack/NettyChannelTrackingHandler.java → ...ack/http/NettyChannelTrackingHandler.java
@@ -17,7 +17,7 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-package org.neo4j.smack;
+package org.neo4j.smack.http;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
View
57 ...ava/org/neo4j/smack/NettyHttpHandler.java → ...rg/neo4j/smack/http/NettyHttpHandler.java
@@ -17,9 +17,8 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-package org.neo4j.smack;
+package org.neo4j.smack.http;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
@@ -42,58 +41,47 @@
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.util.CharsetUtil;
+import org.neo4j.smack.WorkInputGate;
import org.neo4j.smack.event.RequestEvent;
-import org.neo4j.smack.routing.InvocationVerb;
-
-import com.lmax.disruptor.RingBuffer;
public class NettyHttpHandler extends SimpleChannelHandler {
- private RingBuffer<RequestEvent> workBuffer;
+ private HttpDecoder httpDecoder;
+
+ private AtomicLong connectionId;
- public NettyHttpHandler(RingBuffer<RequestEvent> workBuffer) {
- this.workBuffer = workBuffer;
+ public NettyHttpHandler(WorkInputGate workBuffer, AtomicLong connectionIdGenerator) {
+ this.httpDecoder = new HttpDecoder(workBuffer);
+ this.connectionId = connectionIdGenerator;
}
- static AtomicLong connectionId = new AtomicLong(0l);
-
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
- HttpRequest httpRequest = (HttpRequest) e.getMessage();
-
- long sequenceNo = workBuffer.next();
- RequestEvent event = workBuffer.get(sequenceNo);
-
- event.setConnectionId((Long)ctx.getAttachment());
-
- addMethod(httpRequest, event);
- addParamsAndPath(httpRequest, event);
-
- event.setIsPersistentConnection(isKeepAlive(httpRequest));
- event.setContent(httpRequest.getContent());
- event.setChannel(ctx.getChannel());
-
- workBuffer.publish(sequenceNo);
- }
-
- private void addMethod(HttpRequest httpRequest, RequestEvent event) {
- final String methodName = httpRequest.getMethod().getName();
- event.setVerb(InvocationVerb.valueOf(methodName.toUpperCase()));
+// HttpRequest httpRequest = (HttpRequest) e.getMessage();
+// Long connectionId = (Long)ctx.getAttachment();
+// InvocationVerb verb = InvocationVerb.valueOf(httpRequest.getMethod().getName().toUpperCase());
+//
+// workBuffer.addWork(connectionId, verb, httpRequest.getUri(), httpRequest.getContent(), ctx.getChannel(), isKeepAlive(httpRequest));
+ httpDecoder.messageReceived(ctx, e);
}
+ // TODO: This should go in router
private void addParamsAndPath(HttpRequest httpRequest, RequestEvent event) {
final String uri = httpRequest.getUri();
if (uri.contains("?")) {
final QueryStringDecoder decoder = new QueryStringDecoder(uri);
- event.getPathVariables().add(decoder.getParameters());
- event.setPath(decoder.getPath());
+ //event.getPathVariables().add(decoder.getParameters());
+ //event.setPath(decoder.getPath());
} else {
- event.setPath(uri);
+ //event.setPath(uri);
}
}
- // todo use output buffer for exception handling
+ // TODO: Create a failure RequestEvent from this, output
+ // should not be done from here, it needs to be done from the
+ // database worker thread assigned to this connection, because
+ // otherwise responses may be sent out of order to the client.
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
@@ -113,6 +101,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
ctx.setAttachment(connectionId.incrementAndGet());
+ System.out.println("Assigned connection: " + connectionId.get());
}
private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
View
22 ...neo4j/smack/NettyHttpPipelineFactory.java → .../smack/http/NettyHttpPipelineFactory.java
@@ -17,27 +17,27 @@
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-package org.neo4j.smack;
+package org.neo4j.smack.http;
+
+import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.neo4j.smack.event.RequestEvent;
-
-import com.lmax.disruptor.RingBuffer;
+import org.neo4j.smack.WorkInputGate;
public class NettyHttpPipelineFactory implements ChannelPipelineFactory {
- private RingBuffer<RequestEvent> workBuffer;
+ private WorkInputGate workConsumer;
private ChannelGroup openChannels;
+ private AtomicLong connectionIdGenerator;
- public NettyHttpPipelineFactory(RingBuffer<RequestEvent> workBuffer, ChannelGroup openChannels) {
- this.workBuffer = workBuffer;
+ public NettyHttpPipelineFactory(WorkInputGate workBuffer, ChannelGroup openChannels) {
+ this.workConsumer = workBuffer;
this.openChannels = openChannels;
+ this.connectionIdGenerator = new AtomicLong();
}
public ChannelPipeline getPipeline() throws Exception {
@@ -51,11 +51,9 @@ public ChannelPipeline getPipeline() throws Exception {
// pipeline.addLast("ssl", new SslHandler(engine));
pipeline.addLast("channeltracker",new NettyChannelTrackingHandler(openChannels));
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
pipeline.addLast("encoder", new HttpResponseEncoder());
//pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
- pipeline.addLast("handler", new NettyHttpHandler(workBuffer));
+ pipeline.addLast("handler", new NettyHttpHandler(workConsumer, connectionIdGenerator));
return pipeline;
}
View
12 src/main/java/org/neo4j/smack/routing/Routable.java
@@ -0,0 +1,12 @@
+package org.neo4j.smack.routing;
+
+
+public interface Routable {
+
+ String getPath();
+
+ InvocationVerb getVerb();
+
+ PathVariables getPathVariables();
+
+}
View
9 src/main/java/org/neo4j/smack/routing/Router.java
@@ -24,7 +24,6 @@
import java.util.regex.MatchResult;
import org.apache.log4j.Logger;
-import org.neo4j.smack.event.RequestEvent;
import com.sun.jersey.server.impl.uri.PathPattern;
import com.sun.jersey.server.impl.uri.PathTemplate;
@@ -35,18 +34,18 @@
private static final Logger logger=Logger.getLogger(Router.class);
private static Endpoint notFoundEndpoint = new NotFoundEndpoint();
- public Endpoint route(RequestEvent event)
+ public Endpoint route(Routable routable)
{
- String path = event.getPath();
+ String path = routable.getPath();
if (path.endsWith("/")) path = path.substring(0,path.length()-1);
for(RouteEntry route : routes) // todo parallelize routing ?? (overhead ?)
{
MatchResult matchResult = route.pattern.match(path);
if(matchResult != null)
{
- Endpoint endpoint = route.getEndpoint(event.getVerb());
+ Endpoint endpoint = route.getEndpoint(routable.getVerb());
if(endpoint != null) {
- event.getPathVariables().add(matchResult, route.pattern); // todo is this the best way ?
+ routable.getPathVariables().add(matchResult, route.pattern); // todo is this the best way ?
return endpoint;
}
return notFoundEndpoint;
View
137 src/test/java/org/neo4j/smack/http/TestHttpDecoder.java
@@ -0,0 +1,137 @@
+package org.neo4j.smack.http;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.buffer.HeapChannelBufferFactory;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelConfig;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.junit.Test;
+import org.neo4j.smack.WorkInputGate;
+import org.neo4j.smack.routing.InvocationVerb;
+
+public class TestHttpDecoder {
+
+ class Request
+ {
+ public Request(Long connectionId, InvocationVerb verb, String path, ChannelBuffer content, Channel channel, boolean keepAlive)
+ {
+ this.connectionId = connectionId;
+ this.verb = verb;
+ this.path = path;
+ this.content = content;
+ this.channel = channel;
+ this.keepAlive = keepAlive;
+ }
+
+ public Long connectionId;
+ public InvocationVerb verb;
+ public String path;
+ public ChannelBuffer content;
+ public Channel channel;
+ public boolean keepAlive;
+ }
+
+ class RequestMatcher
+ {
+ public RequestMatcher(Long connectionId, InvocationVerb verb, String path)
+ {
+ this(connectionId, verb, path, null, null, true);
+ }
+
+ public RequestMatcher(Long connectionId, InvocationVerb verb, String path, ChannelBuffer content, Channel channel, boolean keepAlive)
+ {
+ this.connectionId = connectionId;
+ this.verb = verb;
+ this.path = path;
+ this.content = content;
+ this.channel = channel;
+ this.keepAlive = keepAlive;
+ }
+
+ public boolean matches(Request work)
+ {
+ if(work.connectionId == connectionId && work.verb == verb && work.path.equals(path) && work.keepAlive == keepAlive) {
+ return true;
+ }
+ return false;
+ }
+
+ public Long connectionId;
+ public InvocationVerb verb;
+ public String path;
+ public ChannelBuffer content;
+ public Channel channel;
+ public boolean keepAlive;
+ }
+
+ class DummyInputGate implements WorkInputGate
+ {
+
+ ArrayList<Request> requests = new ArrayList<Request>();
+
+ @Override
+ public void addWork(Long connectionId, InvocationVerb verb,
+ String path, ChannelBuffer content, Channel channel,
+ boolean keepAlive)
+ {
+ Request work = new Request(connectionId,verb,path,content,channel,keepAlive);
+ requests.add(work);
+ }
+
+ }
+
+ @Test
+ public void shouldDecodeSimpleMessage() throws Exception
+ {
+ ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
+
+ buf.writeBytes("GET / HTTP/1.1".getBytes("ASCII"));
+ buf.writeByte(HttpTokens.CR);
+ buf.writeByte(HttpTokens.LF);
+
+ buf.writeBytes("Content-Length: 0".getBytes("ASCII"));
+ buf.writeByte(HttpTokens.CR);
+ buf.writeByte(HttpTokens.LF);
+
+ buf.writeByte(HttpTokens.CR);
+ buf.writeByte(HttpTokens.LF);
+
+ testDecoding(buf, new RequestMatcher(0l, InvocationVerb.GET, "/"));
+ }
+
+ private void testDecoding(ChannelBuffer buf, RequestMatcher ... requestMatchers) throws Exception
+ {
+ DummyInputGate inputGate = new DummyInputGate();
+
+ HttpDecoder decoder = new HttpDecoder(inputGate);
+
+ MessageEvent msg = mock(MessageEvent.class);
+ when(msg.getMessage()).thenReturn(buf);
+
+ ChannelConfig channelConfig = mock(ChannelConfig.class);
+ when(channelConfig.getBufferFactory()).thenReturn(new HeapChannelBufferFactory());
+
+ Channel channel = mock(Channel.class);
+ when(channel.getConfig()).thenReturn(channelConfig);
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ when(ctx.getChannel()).thenReturn(channel);
+
+ // Decode
+
+ decoder.messageReceived(ctx, msg);
+
+ // Check result
+
+ assertThat("Should yield "+requestMatchers.length+" input requests", inputGate.requests.size(), is(requestMatchers.length));
+ }
+}
View
54 src/test/java/org/neo4j/smack/routing/TestRouter.java
@@ -7,12 +7,40 @@
import org.junit.Test;
import org.neo4j.smack.event.Invocation;
import org.neo4j.smack.event.Output;
-import org.neo4j.smack.event.RequestEvent;
import org.neo4j.smack.serialization.DeserializationStrategy;
import org.neo4j.smack.serialization.SerializationStrategy;
public class TestRouter {
+ class Routling implements Routable {
+ private String path;
+ private InvocationVerb verb;
+ private PathVariables pathVariables = new PathVariables();
+
+ Routling(InvocationVerb verb, String path) {
+ this.verb = verb;
+ this.path = path;
+ }
+
+ @Override
+ public String getPath()
+ {
+ return path;
+ }
+
+ @Override
+ public InvocationVerb getVerb()
+ {
+ return verb;
+ }
+
+ @Override
+ public PathVariables getPathVariables()
+ {
+ return pathVariables;
+ }
+ }
+
@Test
public void shouldRouteVerbsCorrectly() {
Endpoint e = new Endpoint() {
@@ -43,18 +71,10 @@ public boolean isTransactional() {
r.addRoute("/db/data", e);
r.compileRoutes();
- RequestEvent req = new RequestEvent();
- req.setVerb(InvocationVerb.GET);
- req.setPath("/db/data");
-
- Endpoint found = r.route(req);
+ Endpoint found = r.route(new Routling(InvocationVerb.GET, "/db/data"));
assertNotNull(found);
- req = new RequestEvent();
- req.setVerb(InvocationVerb.POST);
- req.setPath("/db/data");
-
- Endpoint endpoint = r.route(req);
+ Endpoint endpoint = r.route(new Routling(InvocationVerb.POST, "/db/data"));
assertThat(endpoint, instanceOf(NotFoundEndpoint.class));
}
@@ -89,18 +109,10 @@ public boolean isTransactional() {
r.addRoute("/db/data", e);
r.compileRoutes();
- RequestEvent req = new RequestEvent();
- req.setVerb(InvocationVerb.GET);
- req.setPath("/db/data");
-
- Endpoint found = r.route(req);
+ Endpoint found = r.route(new Routling(InvocationVerb.GET, "/db/data"));
assertNotNull(found);
-
- req = new RequestEvent();
- req.setVerb(InvocationVerb.GET);
- req.setPath("/db/da");
- Endpoint endpoint = r.route(req);
+ Endpoint endpoint = r.route(new Routling(InvocationVerb.GET, "/db/da"));
assertThat(endpoint, instanceOf(NotFoundEndpoint.class));
}
View
36 src/test/java/org/neo4j/smack/test/performance/NetworkLatency.java
@@ -1,5 +1,6 @@
package org.neo4j.smack.test.performance;
+import java.net.URI;
import java.util.Date;
import javax.ws.rs.core.MediaType;
@@ -7,6 +8,7 @@
import org.neo4j.smack.Database;
import org.neo4j.smack.SmackServer;
import org.neo4j.smack.test.util.PerformanceRoutes;
+import org.neo4j.smack.test.util.PipelinedHttpClient;
import org.neo4j.test.ImpermanentGraphDatabase;
import com.sun.jersey.api.client.Client;
@@ -17,14 +19,19 @@
* This is meant as a source of feedback for experimenting
* with improving network latency.
*
+ * High scores:
+ * smack : 88.229 µs / req
+ * jetty+jersey : 1 971.900 µs / req
+ *
* Suggested things to try:
- * - Make sure we re use TCP connections
- * - Drop the Jersey HTTP client, use a Netty based client instead
* - Look into adjusting TCP packet ACK rate from Java
+ * - Go through full call path, ensure it is garbage free
+ * - Look into optimizing the request router
*/
public class NetworkLatency {
private SmackServer server;
+ private PipelinedHttpClient pipelineClient;
public static void main(String [] args) {
NetworkLatency latency = new NetworkLatency();
@@ -36,17 +43,29 @@ public static void main(String [] args) {
private double test() {
try {
- int numRequests = 100000;
+ int numRequests = 1000000;
startServer();
Date start = new Date();
- sendXRequests("http://localhost:7473" + PerformanceRoutes.NO_SERIALIZATION_AND_NO_DESERIALIZATION, numRequests);
+ //sendXRequests("http://localhost:7473" + PerformanceRoutes.NO_SERIALIZATION_AND_NO_DESERIALIZATION, numRequests);
Date end = new Date();
+ // Pipelined calls
+ pipelineClient = new PipelinedHttpClient("localhost", 7473);
+
+ start = new Date();
+ sendXRequests("http://localhost:7473" + PerformanceRoutes.NO_SERIALIZATION_AND_NO_DESERIALIZATION, numRequests);
+ //sendXRequestsPipelined("http://localhost:7473" + PerformanceRoutes.NO_SERIALIZATION_AND_NO_DESERIALIZATION, numRequests);
+ end = new Date();
+
long total = end.getTime() - start.getTime();
return ((double)total)/numRequests;
+ } catch (Throwable e)
+ {
+ e.printStackTrace();
+ return 0d;
} finally {
stopServer();
}
@@ -59,6 +78,15 @@ private void sendXRequests(String uri, int numRequests) {
}
}
+ private void sendXRequestsPipelined(String uri, int numRequests) throws InterruptedException {
+ URI target = URI.create(uri);
+ for(int i=0;i<numRequests;i++) {
+ //pipelineClient.handle(HttpMethod.GET, target, "");
+ pipelineClient.sendRaw(1);
+ pipelineClient.waitForXResponses(i);
+ }
+ }
+
private void startServer() {
server = new SmackServer("localhost", 7473, new Database(new ImpermanentGraphDatabase()));
server.addRoute("",new PerformanceRoutes());
View
98 src/test/java/org/neo4j/smack/test/performance/NetworkThroughput.java
@@ -3,19 +3,25 @@
import java.net.URI;
import java.util.Date;
-import javax.ws.rs.core.MediaType;
-
-import org.jboss.netty.handler.codec.http.HttpMethod;
import org.neo4j.smack.Database;
import org.neo4j.smack.SmackServer;
import org.neo4j.smack.test.util.PerformanceRoutes;
import org.neo4j.smack.test.util.PipelinedHttpClient;
import org.neo4j.test.ImpermanentGraphDatabase;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource.Builder;
-
+/**
+ * Meant to be used as a tool to maximize throughput in the
+ * entire stack, including the network layer.
+ *
+ * High scores
+ * smack, pipelined, 2 channels : 279 415.4628 req/second (2012-04-19, JH)
+ * smack, pipelined, 4 channels : 234 752.8052 req/second (2012-04-19, JH)
+ * smack, pipelined, single channel : 204 457.1662 req/second (2012-04-19, JH)
+ * jetty+jersey, pipelined, single channel : 17 470.6057 req/second (2012-04-18, JH)
+ * smack, non-pipelined, single channel : 8 443.8064 req/second (2012-04-18, JH)
+ * jetty+jersey, non-pipelined, single channel : 509.9959 req/second (2012-04-18, JH)
+ *
+ */
public class NetworkThroughput {
private SmackServer server;
@@ -52,10 +58,13 @@ public static void main(String [] args) {
private NetworkThroughputResult test() {
NetworkThroughputResult result = new NetworkThroughputResult();
try {
+ //Thread.sleep(1000 * 15);
- int numRequests = 1000000;
+ int numRequests = 10000000;
startServer();
+ pipelineClient = new PipelinedHttpClient("localhost", 7473);
+
// Simple HTTP calls
Date start = new Date();
@@ -66,14 +75,20 @@ private NetworkThroughputResult test() {
result.simpleHttpCalls = ((double)numRequests)/totalSeconds;
// Pipelined calls
+ pipelineClient.responseHandler.responseCount.set(0);
- pipelineClient = new PipelinedHttpClient("localhost", 7473);
+ System.out.println("Warming up..");
+ sendXRequestsPipelined("http://localhost:7473" + PerformanceRoutes.NO_SERIALIZATION_AND_NO_DESERIALIZATION, 10000);
+ pipelineClient.responseHandler.responseCount.set(0);
+ System.out.println("Running test..");
start = new Date();
- sendXRequestsPipelined("http://localhost:7473" + PerformanceRoutes.NO_SERIALIZATION_AND_NO_DESERIALIZATION, numRequests);
+ //sendXRequestsPipelined("http://localhost:7473" + PerformanceRoutes.NO_SERIALIZATION_AND_NO_DESERIALIZATION, numRequests);
+ totalSeconds = sendXRequestsPipelinedMultiThreaded("http://localhost:7474/dummy/justreturn/200", numRequests, 2);
end = new Date();
- totalSeconds = (end.getTime() - start.getTime()) / 1000.0d;
+ //totalSeconds = (end.getTime() - start.getTime()) / 1000.0d;
+ System.out.println("Did " + numRequests + " http calls in " + totalSeconds + " seconds.");
result.pipelinedCalls = ((double)numRequests)/totalSeconds;
return result;
@@ -85,17 +100,66 @@ private NetworkThroughputResult test() {
}
}
- private void sendXRequests(String uri, int numRequests) {
- Builder resource = Client.create().resource(uri).accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON);
- for(int i=0;i<numRequests;i++) {
- ClientResponse response = resource.get(ClientResponse.class);
+ private double sendXRequestsPipelinedMultiThreaded(String uri,
+ int numRequests, int numThreads) throws InterruptedException
+ {
+ Thread [] runnables = new Thread[numThreads];
+ final int numRequestsPerThread = (int) numRequests / numThreads;
+ for(int i=0;i<numThreads;i++) {
+ runnables[i] = new Thread(new Runnable(){
+
+ private PipelinedHttpClient client;
+
+ {
+ client = new PipelinedHttpClient("localhost", 7473);
+ }
+
+ @Override
+ public void run()
+ {
+ for(int i=0;i<numRequestsPerThread;i+=10) {
+ //pipelineClient.handle(HttpMethod.GET, target, "");
+ client.sendRaw(10);
+ }
+
+ try
+ {
+ client.waitForXResponses(numRequestsPerThread);
+ } catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ }
+
+ Date start = new Date();
+ for(int i=0;i<numThreads;i++) {
+ runnables[i].start();
+ }
+
+ for(int i=0;i<numThreads;i++) {
+ runnables[i].join();
+ }
+ Date end = new Date();
+
+ return (end.getTime() - start.getTime()) / 1000.0d;
+ }
+
+ private void sendXRequests(String uri, int numRequests) throws InterruptedException {
+ URI target = URI.create(uri);
+ for(int i=0;i<numRequests;i+=1) {
+ //pipelineClient.handle(HttpMethod.GET, target, "");
+ pipelineClient.sendRaw(1);
+ pipelineClient.waitForXResponses(i);
}
}
private void sendXRequestsPipelined(String uri, int numRequests) throws InterruptedException {
URI target = URI.create(uri);
- for(int i=0;i<numRequests;i++) {
- pipelineClient.handle(HttpMethod.GET, target, "");
+ for(int i=0;i<numRequests;i+=20) {
+ //pipelineClient.handle(HttpMethod.GET, target, "");
+ pipelineClient.sendRaw(20);
}
pipelineClient.waitForXResponses(numRequests);
View
222 src/test/java/org/neo4j/smack/test/util/PipelinedHttpClient.java
@@ -2,11 +2,15 @@
import static org.jboss.netty.channel.Channels.pipeline;
+import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.nio.charset.Charset;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
@@ -17,14 +21,14 @@
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpClientCodec;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpVersion;
+import org.jboss.netty.handler.codec.replay.ReplayingDecoder;
import org.jboss.netty.util.CharsetUtil;
/**
@@ -33,6 +37,50 @@
*/
public class PipelinedHttpClient {
+ static final byte SP = 32;
+
+ //tab ' '
+ static final byte HT = 9;
+
+ /**
+ * Carriage return
+ */
+ static final byte CR = 13;
+
+ /**
+ * Equals '='
+ */
+ static final byte EQUALS = 61;
+
+ /**
+ * Line feed character
+ */
+ static final byte LF = 10;
+
+ /**
+ * carriage return line feed
+ */
+ static final byte[] CRLF = new byte[] { CR, LF };
+
+ /**
+ * Colon ':'
+ */
+ static final byte COLON = 58;
+
+ /**
+ * Semicolon ';'
+ */
+ static final byte SEMICOLON = 59;
+
+ /**
+ * comma ','
+ */
+ static final byte COMMA = 44;
+
+ static final byte DOUBLE_QUOTE = '"';
+
+ static final Charset DEFAULT_CHARSET = CharsetUtil.UTF_8;
+
public static class HttpClientPipelineFactory implements ChannelPipelineFactory {
private HttpResponseHandler responseHandler;
@@ -45,10 +93,10 @@ public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();
- pipeline.addLast("codec", new HttpClientCodec());
+ //pipeline.addLast("codec", new HttpClientCodec());
// Uncomment the following line if you don't want to handle HttpChunks.
- pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
+ //pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("handler", responseHandler);
return pipeline;
@@ -56,17 +104,122 @@ public ChannelPipeline getPipeline() throws Exception {
}
- public static class HttpResponseHandler extends SimpleChannelUpstreamHandler {
- public int responseCount = 0;
+ public static class HttpResponseHandler extends SimpleChannelUpstreamHandler {
+
+ static class HttpDecoder extends ReplayingDecoder<HttpDecoder.State>
+ {
+ enum State {
+ SKIP_CONTROL_CHARS,
+ READ_INITIAL,
+ READ_HEADERS;
+ }
+
+ private AtomicInteger responseCount;
+
+ HttpDecoder(AtomicInteger responseCount) {
+ super(State.SKIP_CONTROL_CHARS, true);
+ this.responseCount = responseCount;
+ }
+
+ /**
+ * Capable of parsing a single type of HTTP responses, namely:
+ *
+ * HTTP/1.1 200 OK
+ * Content-Length: 0
+ *
+ * Used to count responses for performance testing.
+ */
+ @Override
+ @SuppressWarnings ("fallthrough")
+ protected Object decode(ChannelHandlerContext ctx, Channel channel,
+ ChannelBuffer buffer, State state) throws Exception
+ {
+ switch (state) {
+ case SKIP_CONTROL_CHARS: {
+ try {
+ skipControlCharacters(buffer);
+ checkpoint(State.READ_INITIAL);
+ } finally {
+ checkpoint();
+ }
+ }
+ case READ_INITIAL: {
+ readLine(buffer, 1000);
+ checkpoint(State.READ_HEADERS);
+ }
+ case READ_HEADERS: {
+ readLine(buffer, 1000);
+ responseCount.incrementAndGet();
+ return reset();
+ }
+ default: {
+ throw new Error("Shouldn't reach here.");
+ }
+
+ }
+ }
+
+ private Object reset()
+ {
+ checkpoint(State.SKIP_CONTROL_CHARS);
+ return null;
+ }
+
+ private void skipControlCharacters(ChannelBuffer buffer) {
+ for (;;) {
+ char c = (char) buffer.readUnsignedByte();
+ if (!Character.isISOControl(c) &&
+ !Character.isWhitespace(c)) {
+ buffer.readerIndex(buffer.readerIndex() - 1);
+ break;
+ }
+ }
+ }
+
+ private String readLine(ChannelBuffer buffer, int maxLineLength) throws TooLongFrameException {
+ StringBuilder sb = new StringBuilder(64);
+ int lineLength = 0;
+ while (true) {
+ byte nextByte = buffer.readByte();
+ if (nextByte == CR) {
+ nextByte = buffer.readByte();
+ if (nextByte == LF) {
+ return sb.toString();
+ }
+ }
+ else if (nextByte == LF) {
+ return sb.toString();
+ }
+ else {
+ if (lineLength >= maxLineLength) {
+ // TODO: Respond with Bad Request and discard the traffic
+ // or close the connection.
+ // No need to notify the upstream handlers - just log.
+ // If decoding a response, just throw an exception.
+ throw new TooLongFrameException(
+ "An HTTP line is larger than " + maxLineLength +
+ " bytes.");
+ }
+ lineLength ++;
+ sb.append((char) nextByte);
+ }
+ }
+ }
+ }
+
+ public AtomicInteger responseCount = new AtomicInteger();
public HttpResponse lastResponse;
public Throwable lastException = null;
+ HttpDecoder decode = new HttpDecoder(responseCount);
+
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
- lastResponse = (HttpResponse) e.getMessage();
- responseCount++;
+ //lastResponse = (HttpResponse) e.getMessage();
+ decode.messageReceived(ctx, e);
+ //responseCount++;
}
@Override
@@ -125,17 +278,66 @@ public ChannelFuture handle(HttpMethod method, URI uri, String payload) {
// Send the HTTP request.
return channel.write(request);
+ }
+
+ public void sendRaw(int reqsInMessage) {
+ ChannelBuffer buf = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory());
+ try
+ {
+ for(int i=0;i<reqsInMessage;i++)
+ addRequestTo(buf);
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ channel.write(buf);
}
+ private void addRequestTo(ChannelBuffer buf) throws UnsupportedEncodingException
+ {
+ buf.writeBytes("GET".getBytes("ASCII"));
+ buf.writeByte(SP);
+ buf.writeBytes("/noserialnodeserial".getBytes("ASCII"));
+ buf.writeByte(SP);
+ buf.writeBytes(HttpVersion.HTTP_1_1.toString().getBytes("ASCII"));
+ buf.writeByte(CR);
+ buf.writeByte(LF);
+
+ buf.writeBytes(HttpHeaders.Names.HOST.getBytes("ASCII"));
+ buf.writeByte(COLON);
+ buf.writeByte(SP);
+ buf.writeBytes("localhost".getBytes("ASCII"));
+ buf.writeByte(CR);
+ buf.writeByte(LF);
+
+ buf.writeBytes(HttpHeaders.Names.CONNECTION.getBytes("ASCII"));
+ buf.writeByte(COLON);
+ buf.writeByte(SP);
+ buf.writeBytes("keep-alive".getBytes("ASCII"));
+ buf.writeByte(CR);
+ buf.writeByte(LF);
+
+ buf.writeBytes(HttpHeaders.Names.CONTENT_LENGTH.getBytes("ASCII"));
+ buf.writeByte(COLON);
+ buf.writeByte(SP);
+ buf.writeBytes("0".getBytes("ASCII"));
+ buf.writeByte(CR);
+ buf.writeByte(LF);
+
+ buf.writeByte(CR);
+ buf.writeByte(LF);
+ }
+
// Quick hack to wait for responses
public void waitForXResponses(int count) throws InterruptedException {
- while(responseHandler.responseCount < count) {
+ while(responseHandler.responseCount.get() < count) {
if(responseHandler.lastException != null) {
responseHandler.lastException.printStackTrace();
throw new RuntimeException(responseHandler.lastException);
}
- Thread.sleep(3);
+ Thread.sleep(0, 10);
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.