Skip to content
This repository has been archived by the owner on Feb 26, 2020. It is now read-only.

Commit

Permalink
Add infinite retry policy to zookeeper client used by tests
Browse files Browse the repository at this point in the history
This change is to improve the tests running on travis ci

Author: jiazhai <zhaijia03@gmail.com>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #25 from jiazhai/zhaijia/fix_federated_namespace
  • Loading branch information
zhaijack authored and Sijie Guo committed Sep 13, 2016
1 parent c22cad2 commit f826a3a
Show file tree
Hide file tree
Showing 25 changed files with 185 additions and 102 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.util;

import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;

/**
* Utils for {@link org.apache.bookkeeper.zookeeper.RetryPolicy}
*/
public class RetryPolicyUtils {

/**
* Infinite retry policy
*/
public static final RetryPolicy DEFAULT_INFINITE_RETRY_POLICY = infiniteRetry(200, 2000);

/**
* Create an infinite retry policy with backoff time between <i>baseBackOffTimeMs</i> and
* <i>maxBackoffTimeMs</i>.
*
* @param baseBackoffTimeMs base backoff time in milliseconds
* @param maxBackoffTimeMs maximum backoff time in milliseconds
* @return an infinite retry policy
*/
public static RetryPolicy infiniteRetry(long baseBackoffTimeMs, long maxBackoffTimeMs) {
return new BoundExponentialBackoffRetryPolicy(baseBackoffTimeMs, maxBackoffTimeMs, Integer.MAX_VALUE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.PermitLimiter;
import com.twitter.distributedlog.util.RetryPolicyUtils;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.Await;
import com.twitter.util.Duration;
Expand Down Expand Up @@ -140,14 +141,9 @@ static BKLogPartitionWriteHandlerAndClients createNewBKDLM(DistributedLogConfigu
int zkPort) throws Exception {
URI uri = createDLMURI(zkPort, "/" + logName);

ZooKeeperClientBuilder zkcBuilder = ZooKeeperClientBuilder.newBuilder()
ZooKeeperClientBuilder zkcBuilder = TestZooKeeperClientBuilder.newBuilder(conf)
.name(String.format("dlzk:%s:handler_dedicated", logName))
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.uri(uri)
.statsLogger(NullStatsLogger.INSTANCE.scope("dlzk_handler_dedicated"))
.retryThreadCount(conf.getZKClientNumberRetryThreads())
.requestRateLimit(conf.getZKRequestRateLimit())
.zkAclId(conf.getZkAclId());
.uri(uri);

ZooKeeperClient zkClient = zkcBuilder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -959,10 +959,8 @@ public void testLogSegmentListener() throws Exception {
new AtomicReference<Collection<LogSegmentMetadata>>();

DistributedLogManager dlm = createNewDLM(conf, name);
ZooKeeperClient zkClient = ZooKeeperClientBuilder.newBuilder()
ZooKeeperClient zkClient = TestZooKeeperClientBuilder.newBuilder()
.uri(createDLMURI("/"))
.sessionTimeoutMs(10000)
.zkAclId(null)
.build();

BKDistributedLogManager.createLog(conf, zkClient, ((BKDistributedLogManager) dlm).uri, name);
Expand Down Expand Up @@ -1113,10 +1111,9 @@ public void testInvalidStreamFromInvalidZkPath() throws Exception {
public void testTruncationValidation() throws Exception {
String name = "distrlog-truncation-validation";
URI uri = createDLMURI("/" + name);
ZooKeeperClient zookeeperClient = ZooKeeperClientBuilder.newBuilder()
ZooKeeperClient zookeeperClient = TestZooKeeperClientBuilder.newBuilder()
.uri(uri)
.zkAclId(null)
.sessionTimeoutMs(10000).build();
.build();
OrderedScheduler scheduler = OrderedScheduler.newBuilder()
.name("test-truncation-validation")
.corePoolSize(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
@Before
public void setup() throws Exception {
zooKeeperClient =
ZooKeeperClientBuilder.newBuilder()
TestZooKeeperClientBuilder.newBuilder()
.uri(createDLMURI("/"))
.zkAclId(null)
.sessionTimeoutMs(10000).build();
.build();
}

@After
Expand Down Expand Up @@ -310,11 +309,9 @@ public void testAclPermsZkAccessConflict() throws Exception {
initDlogMeta(namespace, "test-un", "test-stream");
URI uri = createDLMURI(namespace);

ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
ZooKeeperClient zkc = TestZooKeeperClientBuilder.newBuilder()
.name("unpriv")
.uri(uri)
.sessionTimeoutMs(2000)
.zkAclId(null)
.build();

try {
Expand All @@ -340,11 +337,9 @@ public void testAclPermsZkAccessNoConflict() throws Exception {
initDlogMeta(namespace, "test-un", "test-stream");
URI uri = createDLMURI(namespace);

ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
ZooKeeperClient zkc = TestZooKeeperClientBuilder.newBuilder()
.name("unpriv")
.uri(uri)
.sessionTimeoutMs(2000)
.zkAclId(null)
.build();

zkc.get().getChildren(uri.getPath() + "/test-stream", false, new Stat());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,12 @@ public void setup() throws Exception {
lockStateExecutor = OrderedScheduler.newBuilder().corePoolSize(1).build();
// build zookeeper client
URI uri = createDLMURI("");
zkc = ZooKeeperClientBuilder.newBuilder()
zkc = TestZooKeeperClientBuilder.newBuilder(conf)
.name("test-zkc")
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.zkAclId(conf.getZkAclId())
.uri(uri)
.build();
zkc0 = ZooKeeperClientBuilder.newBuilder()
zkc0 = TestZooKeeperClientBuilder.newBuilder(conf)
.name("test-zkc0")
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.zkAclId(conf.getZkAclId())
.uri(uri)
.build();
// build bookkeeper client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ public class TestLedgerHandleCache extends TestDistributedLogBase {

@Before
public void setup() throws Exception {
zkc = ZooKeeperClientBuilder.newBuilder()
.zkServers(zkServers).sessionTimeoutMs(10000).zkAclId(null).build();
bkc = BookKeeperClientBuilder.newBuilder().name("bkc")
.zkc(zkc).ledgersPath(ledgersPath).dlConfig(conf).build();
zkc = TestZooKeeperClientBuilder.newBuilder()
.zkServers(zkServers)
.build();
bkc = BookKeeperClientBuilder.newBuilder()
.name("bkc")
.zkc(zkc)
.ledgersPath(ledgersPath)
.dlConfig(conf)
.build();
}

@After
Expand Down Expand Up @@ -79,10 +84,16 @@ public void testOpenLedgerWhenBkcClosed() throws Exception {

@Test(timeout = 60000, expected = BKException.ZKException.class)
public void testOpenLedgerWhenZkClosed() throws Exception {
ZooKeeperClient newZkc = ZooKeeperClientBuilder.newBuilder().zkAclId(null).name("zkc-openledger-when-zk-closed")
.zkServers(zkServers).sessionTimeoutMs(10000).build();
BookKeeperClient newBkc = BookKeeperClientBuilder.newBuilder().name("bkc-openledger-when-zk-closed")
.zkc(newZkc).ledgersPath(ledgersPath).dlConfig(conf).build();
ZooKeeperClient newZkc = TestZooKeeperClientBuilder.newBuilder()
.name("zkc-openledger-when-zk-closed")
.zkServers(zkServers)
.build();
BookKeeperClient newBkc = BookKeeperClientBuilder.newBuilder()
.name("bkc-openledger-when-zk-closed")
.zkc(newZkc)
.ledgersPath(ledgersPath)
.dlConfig(conf)
.build();
try {
LedgerHandle lh = newBkc.get().createLedger(BookKeeper.DigestType.CRC32, "zkcClosed".getBytes(UTF_8));
lh.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public class TestLogSegmentMetadata extends ZooKeeperClusterTestCase {

@Before
public void setup() throws Exception {
zkc = ZooKeeperClientBuilder.newBuilder().zkAclId(null).zkServers(zkServers).sessionTimeoutMs(10000).build();
zkc = TestZooKeeperClientBuilder.newBuilder()
.zkServers(zkServers)
.build();
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,8 @@ public void run() {

private long createStreamWithInconsistentMetadata(String name) throws Exception {
DistributedLogManager dlm = createNewDLM(conf, name);
ZooKeeperClient zkClient = ZooKeeperClientBuilder.newBuilder()
.zkAclId(null)
ZooKeeperClient zkClient = TestZooKeeperClientBuilder.newBuilder()
.uri(createDLMURI("/"))
.sessionTimeoutMs(10000)
.build();
long txid = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@ public void testPurgeLogs() throws Exception {
List<LogSegmentMetadata> segments = distributedLogManager.getLogSegments();
LOG.info("Segments before modifying completion time : {}", segments);

ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder().zkAclId(null).uri(uri)
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.connectionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
ZooKeeperClient zkc = TestZooKeeperClientBuilder.newBuilder(conf)
.uri(uri)
.build();

// Update completion time of first 5 segments
Expand Down Expand Up @@ -198,11 +197,8 @@ public void testOnlyPurgeSegmentsBeforeNoneFullyTruncatedSegment() throws Except
List<LogSegmentMetadata> segments = dlm.getLogSegments();
LOG.info("Segments before modifying segment status : {}", segments);

ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
.zkAclId(null)
ZooKeeperClient zkc = TestZooKeeperClientBuilder.newBuilder(conf)
.uri(uri)
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.connectionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.build();
setTruncationStatus(zkc, segments.get(0), TruncationStatus.PARTIALLY_TRUNCATED);
for (int i = 1; i < 4; i++) {
Expand Down Expand Up @@ -266,11 +262,8 @@ public void testPartiallyTruncateTruncatedSegments() throws Exception {
List<LogSegmentMetadata> segments = dlm.getLogSegments();
LOG.info("Segments before modifying segment status : {}", segments);

ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
.zkAclId(null)
ZooKeeperClient zkc = TestZooKeeperClientBuilder.newBuilder(conf)
.uri(uri)
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.connectionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.build();
for (int i = 0; i < 4; i++) {
LogSegmentMetadata segment = segments.get(i);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;

import com.twitter.distributedlog.util.RetryPolicyUtils;
import org.apache.bookkeeper.stats.NullStatsLogger;

/**
* The zookeeper client builder used for testing.
*/
public class TestZooKeeperClientBuilder {

/**
* Return a zookeeper client builder for testing.
*
* @return a zookeeper client builder
*/
public static ZooKeeperClientBuilder newBuilder() {
return ZooKeeperClientBuilder.newBuilder()
.retryPolicy(RetryPolicyUtils.DEFAULT_INFINITE_RETRY_POLICY)
.connectionTimeoutMs(10000)
.sessionTimeoutMs(60000)
.zkAclId(null)
.statsLogger(NullStatsLogger.INSTANCE);
}

/**
* Create a zookeeper client builder with provided <i>conf</i> for testing.
*
* @param conf distributedlog configuration
* @return zookeeper client builder
*/
public static ZooKeeperClientBuilder newBuilder(DistributedLogConfiguration conf) {
return ZooKeeperClientBuilder.newBuilder()
.retryPolicy(RetryPolicyUtils.DEFAULT_INFINITE_RETRY_POLICY)
.sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
.zkAclId(conf.getZkAclId())
.retryThreadCount(conf.getZKClientNumberRetryThreads())
.requestRateLimit(conf.getZKRequestRateLimit())
.statsLogger(NullStatsLogger.INSTANCE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
*/
package com.twitter.distributedlog.acl;

import com.twitter.distributedlog.TestZooKeeperClientBuilder;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.ZooKeeperClientBuilder;
import com.twitter.distributedlog.ZooKeeperClusterTestCase;
import com.twitter.distributedlog.thrift.AccessControlEntry;
import com.twitter.util.Await;
Expand All @@ -39,10 +39,9 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {

@Before
public void setup() throws Exception {
zkc = ZooKeeperClientBuilder.newBuilder()
zkc = TestZooKeeperClientBuilder.newBuilder()
.uri(createURI("/"))
.zkAclId(null)
.sessionTimeoutMs(10000).build();
.build();
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package com.twitter.distributedlog.acl;

import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.TestZooKeeperClientBuilder;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.ZooKeeperClientBuilder;
import com.twitter.distributedlog.ZooKeeperClientUtils;
import com.twitter.distributedlog.ZooKeeperClusterTestCase;
import com.twitter.distributedlog.thrift.AccessControlEntry;
Expand Down Expand Up @@ -51,10 +51,9 @@ private URI createURI(String path) {
@Before
public void setup() throws Exception {
executorService = Executors.newSingleThreadScheduledExecutor();
zkc = ZooKeeperClientBuilder.newBuilder()
zkc = TestZooKeeperClientBuilder.newBuilder()
.uri(createURI("/"))
.zkAclId(null)
.sessionTimeoutMs(10000).build();
.build();
conf = new DistributedLogConfiguration();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.LogSegmentMetadata;
import com.twitter.distributedlog.TestDistributedLogBase;
import com.twitter.distributedlog.TestZooKeeperClientBuilder;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.ZooKeeperClientBuilder;
import com.twitter.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import com.twitter.distributedlog.util.SchedulerUtils;
Expand Down Expand Up @@ -60,11 +60,10 @@ public class TestDLCK extends TestDistributedLogBase {

@Before
public void setup() throws Exception {
zkc = ZooKeeperClientBuilder
zkc = TestZooKeeperClientBuilder
.newBuilder()
.uri(createDLMURI("/"))
.zkAclId(null)
.sessionTimeoutMs(10000).build();
.build();
}

@After
Expand Down

0 comments on commit f826a3a

Please sign in to comment.