Skip to content

Commit

Permalink
Merge branch 'master' into 0.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Jan 5, 2013
2 parents 5ddc087 + b965f2f commit cc5d7d2
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -39,6 +39,8 @@
* Added MockTridentTuple for testing (thanks emblem) * Added MockTridentTuple for testing (thanks emblem)
* Add whitelist methods to Cluster to allow only a subset of hosts to be revealed as available slots * Add whitelist methods to Cluster to allow only a subset of hosts to be revealed as available slots
* Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem) * Updated Trident Debug filter to take in an identifier to use when logging (thanks emblem)
* Number of DRPC server worker threads now customizable (thanks xiaokang)
* DRPC server now uses a bounded queue for requests to prevent being overloaded with requests (thanks xiaokang)
* Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned * Bug fix: Fix for bug that could cause topology to hang when ZMQ blocks sending to a worker that got reassigned
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails. * Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology * Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
Expand Down
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Expand Up @@ -36,6 +36,8 @@ ui.port: 8080
ui.childopts: "-Xmx768m" ui.childopts: "-Xmx768m"


drpc.port: 3772 drpc.port: 3772
drpc.worker.threads: 64
drpc.queue.size: 128
drpc.invocations.port: 3773 drpc.invocations.port: 3773
drpc.request.timeout.secs: 600 drpc.request.timeout.secs: 600


Expand Down
6 changes: 5 additions & 1 deletion src/clj/backtype/storm/daemon/drpc.clj
Expand Up @@ -6,7 +6,7 @@
(:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor (:import [backtype.storm.generated DistributedRPC DistributedRPC$Iface DistributedRPC$Processor
DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface DRPCRequest DRPCExecutionException DistributedRPCInvocations DistributedRPCInvocations$Iface
DistributedRPCInvocations$Processor]) DistributedRPCInvocations$Processor])
(:import [java.util.concurrent Semaphore ConcurrentLinkedQueue]) (:import [java.util.concurrent Semaphore ConcurrentLinkedQueue ThreadPoolExecutor ArrayBlockingQueue TimeUnit])
(:import [backtype.storm.daemon Shutdownable]) (:import [backtype.storm.daemon Shutdownable])
(:import [java.net InetAddress]) (:import [java.net InetAddress])
(:use [backtype.storm bootstrap config log]) (:use [backtype.storm bootstrap config log])
Expand Down Expand Up @@ -100,6 +100,8 @@
(defn launch-server! (defn launch-server!
([] ([]
(let [conf (read-storm-config) (let [conf (read-storm-config)
worker-threads (int (conf DRPC-WORKER-THREADS))
queue-size (int (conf DRPC-QUEUE-SIZE))
service-handler (service-handler) service-handler (service-handler)
;; requests and returns need to be on separate thread pools, since calls to ;; requests and returns need to be on separate thread pools, since calls to
;; "execute" don't unblock until other thrift methods are called. So if ;; "execute" don't unblock until other thrift methods are called. So if
Expand All @@ -108,6 +110,8 @@
handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT))) handler-server (THsHaServer. (-> (TNonblockingServerSocket. (int (conf DRPC-PORT)))
(THsHaServer$Args.) (THsHaServer$Args.)
(.workerThreads 64) (.workerThreads 64)
(.executorService (ThreadPoolExecutor. worker-threads worker-threads
60 TimeUnit/SECONDS (ArrayBlockingQueue. queue-size)))
(.protocolFactory (TBinaryProtocol$Factory.)) (.protocolFactory (TBinaryProtocol$Factory.))
(.processor (DistributedRPC$Processor. service-handler)) (.processor (DistributedRPC$Processor. service-handler))
)) ))
Expand Down
10 changes: 10 additions & 0 deletions src/jvm/backtype/storm/Config.java
Expand Up @@ -222,6 +222,16 @@ public class Config extends HashMap<String, Object> {
*/ */
public static String DRPC_PORT = "drpc.port"; public static String DRPC_PORT = "drpc.port";


/**
* DRPC thrift server worker threads
*/
public static String DRPC_WORKER_THREADS = "drpc.worker.threads";

/**
* DRPC thrift server queue size
*/
public static String DRPC_QUEUE_SIZE = "drpc.queue.size";

/** /**
* This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back. * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
*/ */
Expand Down
3 changes: 2 additions & 1 deletion src/jvm/backtype/storm/spout/MultiScheme.java
@@ -1,10 +1,11 @@
package backtype.storm.spout; package backtype.storm.spout;


import java.util.List; import java.util.List;
import java.io.Serializable;


import backtype.storm.tuple.Fields; import backtype.storm.tuple.Fields;


public interface MultiScheme { public interface MultiScheme extends Serializable {
public Iterable<List<Object>> deserialize(byte[] ser); public Iterable<List<Object>> deserialize(byte[] ser);
public Fields getOutputFields(); public Fields getOutputFields();
} }

0 comments on commit cc5d7d2

Please sign in to comment.