Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Merge branch 'master' into 0.8.2
  • Loading branch information
Nathan Marz committed Jan 5, 2013
2 parents 5ab86ad + b965f2f commit 6f615b4
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -31,6 +31,8 @@
* Added MockTridentTuple for testing (thanks emblem)
* Add whitelist methods to Cluster to allow only a subset of hosts to be revealed as available slots
* Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
* Number of DRPC server worker threads now customizable (thanks xiaokang)
* DRPC server now uses a bounded queue for requests to prevent being overloaded with requests (thanks xiaokang)
* Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
Expand Down
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Expand Up @@ -36,6 +36,8 @@ ui.port: 8080
ui.childopts: "-Xmx768m"

drpc.port: 3772
drpc.worker.threads: 64
drpc.queue.size: 128
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600

Expand Down
6 changes: 5 additions & 1 deletion src/clj/backtype/storm/daemon/drpc.clj
Expand Up @@ -6,7 +6,7 @@
(:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
DistributedRPCInvocations$Processor])
(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue])
(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
(:import [backtype.storm.daemon Shutdownable])
(:import [java.net InetAddress])
(:use [backtype.storm bootstrap config log])
Expand Down Expand Up @@ -100,6 +100,8 @@
(defn launch-server!
([]
(let [conf (read-storm-config)
worker-threads (int (conf DRPC-WORKER-THREADS))
queue-size (int (conf DRPC-QUEUE-SIZE))
service-handler (service-handler)
;; requests and returns need to be on separate thread pools, since calls to
;; "execute" don't unblock until other thrift methods are called. So if
Expand All @@ -108,6 +110,8 @@
handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
(THsHaServer$Args.)
(.workerThreads 64)
(.executorService (ThreadPoolExecutor. worker-threads worker-threads
60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size)))
(.protocolFactory (TBinaryProtocol$Factory.))
(.processor (DistributedRPC$Processor. service-handler))
))
Expand Down
10 changes: 10 additions & 0 deletions src/jvm/backtype/storm/Config.java
Expand Up @@ -222,6 +222,16 @@ public class Config extends HashMap<String, Object> {
*/
public static String DRPC_PORT = "drpc.port";

/**
* DRPC thrift server worker threads
*/
public static String DRPC_WORKER_THREADS = "drpc.worker.threads";

/**
* DRPC thrift server queue size
*/
public static String DRPC_QUEUE_SIZE = "drpc.queue.size";

/**
* This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
*/
Expand Down
3 changes: 2 additions & 1 deletion src/jvm/backtype/storm/spout/MultiScheme.java
@@ -1,10 +1,11 @@
package backtype.storm.spout;

import java.util.List;
import java.io.Serializable;

import backtype.storm.tuple.Fields;

public interface MultiScheme {
public interface MultiScheme extends Serializable {
public Iterable<List<Object>> deserialize(byte[] ser);
public Fields getOutputFields();
}

0 comments on commit 6f615b4

Please sign in to comment.