Permalink
Browse files

backport some THsHaServer changes from revision r995939

  • Loading branch information...
1 parent dc3b073 commit 64a6486f0cdf0d6f8cd35050155c067ac3886179 Ed Ceaser committed Sep 14, 2010
@@ -42,11 +42,6 @@
// for the passing of Invocations from the Selector to workers.
private ExecutorService invoker;
- protected final int MIN_WORKER_THREADS;
- protected final int MAX_WORKER_THREADS;
- protected final int STOP_TIMEOUT_VAL;
- protected final TimeUnit STOP_TIMEOUT_UNIT;
-
/**
* Create server with given processor, and server transport. Default server
* options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on
@@ -112,8 +107,10 @@ public THsHaServer( TProcessor processor,
TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory,
Options options) {
- this(processor, serverTransport, new TFramedTransport.Factory(),
- protocolFactory);
+ this(new TProcessorFactory(processor), serverTransport,
+ new TFramedTransport.Factory(),
+ protocolFactory, protocolFactory,
+ options);
}
/**
@@ -139,7 +136,7 @@ public THsHaServer( TProcessorFactory processorFactory,
TFramedTransport.Factory transportFactory,
TProtocolFactory protocolFactory) {
this(processorFactory, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory, new Options());
}
@@ -153,7 +150,7 @@ public THsHaServer( TProcessorFactory processorFactory,
TProtocolFactory protocolFactory,
Options options) {
this(processorFactory, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory,
options);
}
@@ -163,12 +160,11 @@ public THsHaServer( TProcessorFactory processorFactory,
*/
public THsHaServer( TProcessor processor,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
this(new TProcessorFactory(processor), serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
}
@@ -177,45 +173,55 @@ public THsHaServer( TProcessor processor,
*/
public THsHaServer( TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory)
{
this(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory, new Options());
}
/**
- * Create server with every option fully specified.
+ * Create server with every option fully specified, with an internally managed
+ * ExecutorService
*/
public THsHaServer( TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
Options options)
{
+ this(processorFactory, serverTransport,
+ outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory,
+ createInvokerPool(options),
+ options);
+ }
+
+ /**
+ * Create server with every option fully specified, and with an injected
+ * ExecutorService
+ */
+ public THsHaServer( TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
+ ExecutorService executor,
+ TNonblockingServer.Options options) {
super(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory,
options);
- MIN_WORKER_THREADS = options.minWorkerThreads;
- MAX_WORKER_THREADS = options.maxWorkerThreads;
- STOP_TIMEOUT_VAL = options.stopTimeoutVal;
- STOP_TIMEOUT_UNIT = options.stopTimeoutUnit;
+ invoker = executor;
}
/** @inheritDoc */
@Override
public void serve() {
- if (!startInvokerPool()) {
- return;
- }
-
// start listening, or exit
if (!startListening()) {
return;
@@ -237,13 +243,19 @@ public void serve() {
// ungracefully shut down the invoker pool?
}
- protected boolean startInvokerPool() {
- // start the invoker pool
+ /**
+ * Helper to create an invoker pool
+ */
+ protected static ExecutorService createInvokerPool(Options options) {
+ int workerThreads = options.workerThreads;
+ int stopTimeoutVal = options.stopTimeoutVal;
+ TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
+
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
- invoker = new ThreadPoolExecutor(MIN_WORKER_THREADS, MAX_WORKER_THREADS,
- STOP_TIMEOUT_VAL, STOP_TIMEOUT_UNIT, queue);
+ ExecutorService invoker = new ThreadPoolExecutor(workerThreads, workerThreads,
+ stopTimeoutVal, stopTimeoutUnit, queue);
- return true;
+ return invoker;
}
protected void gracefullyShutdownInvokerPool() {
@@ -296,8 +308,7 @@ public void run() {
}
public static class Options extends TNonblockingServer.Options {
- public int minWorkerThreads = 5;
- public int maxWorkerThreads = Integer.MAX_VALUE;
+ public int workerThreads = 5;
public int stopTimeoutVal = 60;
public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
}
@@ -20,7 +20,6 @@
package org.apache.thrift.server;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
@@ -30,9 +29,6 @@
import java.util.Iterator;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
@@ -42,10 +38,13 @@
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A nonblocking TServer implementation. This allows for fairness amongst all
@@ -100,15 +99,15 @@ public TNonblockingServer(TProcessor processor,
public TNonblockingServer(TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport) {
this(processorFactory, serverTransport,
- new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+ new TFramedTransport.Factory(),
new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
}
public TNonblockingServer(TProcessor processor,
TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory) {
this(processor, serverTransport,
- new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+ new TFramedTransport.Factory(),
protocolFactory, protocolFactory);
}
@@ -117,7 +116,7 @@ public TNonblockingServer(TProcessor processor,
TFramedTransport.Factory transportFactory,
TProtocolFactory protocolFactory) {
this(processor, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory);
}
@@ -126,42 +125,39 @@ public TNonblockingServer(TProcessorFactory processorFactory,
TFramedTransport.Factory transportFactory,
TProtocolFactory protocolFactory) {
this(processorFactory, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory);
}
public TNonblockingServer(TProcessor processor,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
this(new TProcessorFactory(processor), serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
}
public TNonblockingServer(TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
this(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory,
new Options());
}
public TNonblockingServer(TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
Options options) {
super(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ null, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
options_ = options;
options_.validate();
@@ -522,25 +518,23 @@ public boolean read() {
// if this frame will always be too large for this server, log the
// error and close the connection.
- if (frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+ if (frameSize > MAX_READ_BUFFER_BYTES) {
LOGGER.error("Read a frame size of " + frameSize
+ ", which is bigger than the maximum allowable buffer size for ALL connections.");
return false;
}
// if this frame will push us over the memory limit, then return.
// with luck, more memory will free up the next time around.
- if (readBufferBytesAllocated + frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+ if (readBufferBytesAllocated + frameSize > MAX_READ_BUFFER_BYTES) {
return true;
}
// incremement the amount of memory allocated to read buffers
- readBufferBytesAllocated += frameSize + 4;
+ readBufferBytesAllocated += frameSize;
// reallocate the readbuffer as a frame-sized buffer
- buffer_ = ByteBuffer.allocate(frameSize + 4);
- // put the frame size at the head of the buffer
- buffer_.putInt(frameSize);
+ buffer_ = ByteBuffer.allocate(frameSize);
state_ = READING_FRAME;
} else {
@@ -699,8 +693,7 @@ public void invoke() {
* the data it needs to handle an invocation.
*/
private TTransport getInputTransport() {
- return inputTransportFactory_.getTransport(new TIOStreamTransport(
- new ByteArrayInputStream(buffer_.array())));
+ return new TMemoryInputTransport(buffer_.array());
}
/**
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+public final class TMemoryInputTransport extends TTransport {
+
+ private byte[] buf_;
+ private int pos_;
+ private int endPos_;
+
+ public TMemoryInputTransport() {
+ }
+
+ public TMemoryInputTransport(byte[] buf) {
+ reset(buf);
+ }
+
+ public TMemoryInputTransport(byte[] buf, int offset, int length) {
+ reset(buf, offset, length);
+ }
+
+ public void reset(byte[] buf) {
+ reset(buf, 0, buf.length);
+ }
+
+ public void reset(byte[] buf, int offset, int length) {
+ buf_ = buf;
+ pos_ = offset;
+ endPos_ = offset + length;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isOpen() {
+ return true;
+ }
+
+ @Override
+ public void open() throws TTransportException {}
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ int bytesRemaining = getBytesRemainingInBuffer();
+ int amtToRead = (len > bytesRemaining ? bytesRemaining : len);
+ if (amtToRead > 0) {
+ System.arraycopy(buf_, pos_, buf, off, amtToRead);
+ consumeBuffer(amtToRead);
+ }
+ return amtToRead;
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ throw new UnsupportedOperationException("No writing allowed!");
+ }
+
+ public byte[] getBuffer() {
+ return buf_;
+ }
+
+ public int getBufferPosition() {
+ return pos_;
+ }
+
+ public int getBytesRemainingInBuffer() {
+ return endPos_ - pos_;
+ }
+
+ public void consumeBuffer(int len) {
+ pos_ += len;
+ }
+
+}

0 comments on commit 64a6486

Please sign in to comment.