Permalink
Browse files

Merge remote-tracking branch 'd2r/derekd-exponential-backoff'

  • Loading branch information...
2 parents 9fbf64a + c8905d6 commit 5185746994ac8d8ae0fc78574564bdd212c89e88 @nathanmarz nathanmarz committed Mar 3, 2013
View
@@ -14,6 +14,7 @@ storm.zookeeper.session.timeout: 20000
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
+storm.zookeeper.retry.intervalceiling: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
@@ -107,6 +107,11 @@
public static String STORM_ZOOKEEPER_RETRY_INTERVAL="storm.zookeeper.retry.interval";
/**
+ * The ceiling of the interval between retries of a Zookeeper operation.
+ */
+ public static String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling";
+
+ /**
* The Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
*/
public static String STORM_ZOOKEEPER_AUTH_SCHEME="storm.zookeeper.auth.scheme";
@@ -8,7 +8,7 @@
import clojure.lang.RT;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
-import com.netflix.curator.retry.RetryNTimes;
+import com.netflix.curator.retry.ExponentialBackoffRetry;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
@@ -287,23 +287,48 @@ public static long secureRandomLong() {
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) {
return newCurator(conf, servers, port, root, null);
}
-
+
+ public static class BoundedExponentialBackoffRetry extends ExponentialBackoffRetry {
+
+ protected final int maxRetryInterval;
+
+ public BoundedExponentialBackoffRetry(int baseSleepTimeMs,
+ int maxRetries, int maxSleepTimeMs) {
+ super(baseSleepTimeMs, maxRetries);
+ this.maxRetryInterval = maxSleepTimeMs;
+ }
+
+ public int getMaxRetryInterval() {
+ return this.maxRetryInterval;
+ }
+
+ @Override
+ public int getSleepTimeMs(int count, long elapsedMs)
+ {
+ return Math.min(maxRetryInterval,
+ super.getSleepTimeMs(count, elapsedMs));
+ }
+
+ }
+
public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
List<String> serverPorts = new ArrayList<String>();
for(String zkServer: (List<String>) servers) {
serverPorts.add(zkServer + ":" + Utils.getInt(port));
}
- String zkStr = StringUtils.join(serverPorts, ",") + root;
+ String zkStr = StringUtils.join(serverPorts, ",") + root;
try {
-
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkStr)
.connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
.sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
- .retryPolicy(new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+ .retryPolicy(new BoundedExponentialBackoffRetry(
+ Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
+ Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+ Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING))));
if(auth!=null && auth.scheme!=null) {
builder = builder.authorization(auth.scheme, auth.payload);
- }
+ }
return builder.build();
} catch (IOException e) {
throw new RuntimeException(e);
@@ -0,0 +1,28 @@
+(ns backtype.storm.utils-test
+ (:import [backtype.storm Config])
+ (:import [backtype.storm.utils Utils])
+ (:import [com.netflix.curator.retry ExponentialBackoffRetry])
+ (:use [backtype.storm util])
+ (:use [clojure test])
+)
+
+(deftest test-new-curator-uses-exponential-backoff
+ (let [expected_interval 2400
+ expected_retries 10
+ expected_ceiling (/ expected_interval 2)
+ conf (merge (clojurify-structure (Utils/readDefaultConfig))
+ {Config/STORM_ZOOKEEPER_RETRY_INTERVAL expected_interval
+ Config/STORM_ZOOKEEPER_RETRY_TIMES expected_retries
+ Config/STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING expected_ceiling})
+ servers ["bogus_server"]
+ arbitrary_port 42
+ curator (Utils/newCurator conf servers arbitrary_port)
+ retry (-> curator .getZookeeperClient .getRetryPolicy)
+ ]
+ (is (.isAssignableFrom ExponentialBackoffRetry (.getClass retry)))
+ (is (= (.getBaseSleepTimeMs retry) expected_interval))
+ (is (= (.getN retry) expected_retries))
+ (is (= (.getMaxRetryInterval retry) expected_ceiling))
+ (is (= (.getSleepTimeMs retry 10 0) expected_ceiling))
+ )
+)

0 comments on commit 5185746

Please sign in to comment.