Skip to content

Commit

Permalink
Use a ceiling on ExponentialBackoffRetry
Browse files Browse the repository at this point in the history
Trying to avoid pulling in a newer netflix curator

Adds lein-junit and junit dependencies with test.

Adds new configurable maximum ceiling for the retry interval, defaults
to 30s.
  • Loading branch information
Derek Dagit committed Feb 17, 2013
1 parent e34feda commit 330b8bd
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 4 deletions.
1 change: 1 addition & 0 deletions conf/defaults.yaml
Expand Up @@ -14,6 +14,7 @@ storm.zookeeper.session.timeout: 20000
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false

Expand Down
12 changes: 9 additions & 3 deletions project.clj
Expand Up @@ -29,16 +29,22 @@
]

:source-paths ["src/clj"]
:java-source-paths ["src/jvm"]
:java-source-paths ["src/jvm" "test/jvm"]
:test-paths ["test/clj"]
:resource-paths ["conf"]

:profiles {:dev {:resource-paths ["src/dev"]}
:profiles {:dev {:resource-paths ["src/dev"]
:dependencies [[junit/junit "4.11"]]
}
:release {}
:lib {}
}

:plugins [[lein-swank "1.4.4"]]
:plugins [[lein-swank "1.4.4"]
[lein-junit "1.1.2"]
]

:junit ["test/jvm"]

:repositories {"sonatype"
"http://oss.sonatype.org/content/groups/public/"}
Expand Down
5 changes: 5 additions & 0 deletions src/jvm/backtype/storm/Config.java
Expand Up @@ -106,6 +106,11 @@ public class Config extends HashMap<String, Object> {
*/
public static String STORM_ZOOKEEPER_RETRY_INTERVAL="storm.zookeeper.retry.interval";

/**
* The ceiling of the interval between retries of a Zookeeper operation.
*/
public static String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling";

/**
* The Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
*/
Expand Down
15 changes: 14 additions & 1 deletion src/jvm/backtype/storm/utils/Utils.java
Expand Up @@ -295,11 +295,24 @@ public static CuratorFramework newCurator(Map conf, List<String> servers, Object
String zkStr = StringUtils.join(serverPorts, ",") + root;
try {

final int maxRetryInterval = Utils.getInt(
conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING));

CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(zkStr)
.connectionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)))
.sessionTimeoutMs(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)))
.retryPolicy(new ExponentialBackoffRetry(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)), Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
.retryPolicy(new ExponentialBackoffRetry(
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)),
Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)))
{
@Override
protected int getSleepTimeMs(int count, long elapsedMs)
{
return Math.min(maxRetryInterval,
super.getSleepTimeMs(count, elapsedMs));
}
});
if(auth!=null && auth.scheme!=null) {
builder = builder.authorization(auth.scheme, auth.payload);
}
Expand Down
45 changes: 45 additions & 0 deletions test/jvm/backtype/storm/utils/UtilsTest.java
@@ -0,0 +1,45 @@
package backtype.storm.utils;

import org.junit.Test;

import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.netflix.curator.CuratorZookeeperClient;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.retry.ExponentialBackoffRetry;

import backtype.storm.Config;

public class UtilsTest {

@Test
public void testNewCuratorUsesBoundedExponentialBackoff() {
@SuppressWarnings("unchecked")
Map<String,Object> conf = (Map<String,Object>)Utils.readDefaultConfig();

// Ensure these two values are different.
final int ArbitraryInterval = 24;
final int ArbitraryRetries = 4;
conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, ArbitraryInterval);
conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, ArbitraryRetries);

List<String> servers = new ArrayList<String>();
servers.add("bogus_server");
Object port = new Integer(42);
CuratorFramework cf = Utils.newCurator(conf, servers, port);

assertTrue(cf.getZookeeperClient().getRetryPolicy()
instanceof ExponentialBackoffRetry);

ExponentialBackoffRetry retry =
(ExponentialBackoffRetry)cf.getZookeeperClient().getRetryPolicy();
assertEquals(retry.getBaseSleepTimeMs(), ArbitraryInterval);
assertEquals(retry.getN(), ArbitraryRetries);
}
}

0 comments on commit 330b8bd

Please sign in to comment.