Skip to content
This repository has been archived by the owner on Sep 29, 2021. It is now read-only.

Commit

Permalink
Fix flaky DeregisterTest
Browse files Browse the repository at this point in the history
* Fix NPE in AgentZooKeeperRegistrar.tryToRegister() that caused this
thread to die silently. This caused a race which sometimes failed the
tests.

* Wait for first host in
DeregisterTest.testRegistrationResolutionTtlNotExpired() and
DeregisterTest.testRegistrationResolution() to come up AND report
HostInfo to ZK in order for second host to detect when the
first host reported its HostInfo back to ZK so the second can properly
deregister the first.
  • Loading branch information
davidxia committed Oct 9, 2015
1 parent 53d8396 commit d495179
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 16 deletions.
Expand Up @@ -102,19 +102,26 @@ public void tryToRegister(ZooKeeperClient client)
final byte[] bytes = client.getData(idPath);
final String existingId = bytes == null ? "" : new String(bytes, UTF_8);
if (!id.equals(existingId)) {
final long mtime = client.stat(hostInfoPath).getMtime();
if ((clock.now().getMillis() - mtime) < zooKeeperRegistrationTtlMillis) {
final String message = format("Another agent already registered as '%s' " +
"(local=%s remote=%s).", name, id, existingId);
log.error(message);
agentService.stopAsync();
return;
final Stat hostInfoStat = client.stat(hostInfoPath);
if (hostInfoStat != null) {
final long mtime = hostInfoStat.getMtime();
if ((clock.now().getMillis() - mtime) < zooKeeperRegistrationTtlMillis) {
final String message = format("Another agent already registered as '%s' " +
"(local=%s remote=%s).", name, id, existingId);
log.error(message);
agentService.stopAsync();
return;
}

log.info("Another agent has already registered as '{}', but its ID node was last " +
"updated more than {} milliseconds ago. I\'m deregistering the agent with the "
+ "old ID of {} and replacing it with this new agent with ID '{}'.",
name, zooKeeperRegistrationTtlMillis, existingId, id);
} else {
log.info("Another agent has registered as '{}', but it never updated '{}' in ZooKeeper. "
+ "I'll assume it's dead and deregister it.", name, hostInfoPath);
}

log.info("Another agent has already registered as '{}', but its ID node was last " +
"updated more than {} milliseconds ago. I\'m deregistering the agent with the "
+ "old ID of {} and replacing it with this new agent with ID '{}'.",
name, zooKeeperRegistrationTtlMillis, existingId, id);
ZooKeeperRegistrarUtil.deregisterHost(client, name);
ZooKeeperRegistrarUtil.registerHost(client, idPath, name, id);
} else {
Expand Down
Expand Up @@ -40,6 +40,8 @@
import org.junit.Rule;
import org.junit.Test;

import java.util.concurrent.TimeoutException;

import static com.spotify.helios.common.descriptors.Goal.START;
import static com.spotify.helios.common.descriptors.HostStatus.Status.DOWN;
import static com.spotify.helios.common.descriptors.HostStatus.Status.UP;
Expand Down Expand Up @@ -120,6 +122,8 @@ public void testRegistrationResolution() throws Exception {
final HostStatus hostStatus1 =
awaitHostStatusWithLabels(client, host, UP, LONG_WAIT_SECONDS, SECONDS);
assertThat(hostStatus1.getLabels(), Matchers.hasEntry("num", "1"));
// Wait for agent to be UP and report HostInfo
awaitHostStatusWithHostInfo(client, host, UP, LONG_WAIT_SECONDS, SECONDS);

// Kill off agent
agent.stopAsync().awaitTerminated();
Expand All @@ -137,17 +141,18 @@ public void testRegistrationResolution() throws Exception {
assertThat(hostStatus2.getLabels(), Matchers.hasEntry("num", "2"));
}

@Test(expected = IllegalStateException.class)
@Test(expected = TimeoutException.class)
public void testRegistrationResolutionTtlNotExpired() throws Exception {
startDefaultMaster();
final String host = testHost() + "2";
final String host = testHost();
AgentMain agent = startDefaultAgent(host);

final HeliosClient client = defaultClient();

// Wait for agent to come up
awaitHostRegistered(client, host, LONG_WAIT_SECONDS, SECONDS);
awaitHostStatus(client, host, UP, LONG_WAIT_SECONDS, SECONDS);
// Wait for agent to be UP and report HostInfo
awaitHostStatusWithHostInfo(client, host, UP, LONG_WAIT_SECONDS, SECONDS);

// Kill off agent
agent.stopAsync().awaitTerminated();
Expand All @@ -157,7 +162,13 @@ public void testRegistrationResolutionTtlNotExpired() throws Exception {
resetAgentStateDir();

// Set TTL to a large number so new agent will not deregister previous one.
// This should throw IllegalStateException as this agent will fail to start as it can't register
startDefaultAgent(host, "--zk-registration-ttl", "9999");
// This might throw IllegalStateException as this agent will fail to start since it can't
// register. This exception sometimes occurs and sometimes doesn't. We ignore that and
// instead check for the TimeoutException while polling for it being UP.
try {
startDefaultAgent(host, "--zk-registration-ttl", "9999");
} catch (IllegalStateException ignored) {
}
awaitHostStatus(client, host, UP, 10, SECONDS);
}
}
Expand Up @@ -959,6 +959,22 @@ public HostStatus call() throws Exception {
});
}

protected HostStatus awaitHostStatusWithHostInfo(final HeliosClient client, final String host,
final HostStatus.Status status,
final int timeout,
final TimeUnit timeUnit) throws Exception {
return Polling.await(timeout, timeUnit, new Callable<HostStatus>() {
@Override
public HostStatus call() throws Exception {
final HostStatus hostStatus = getOrNull(client.hostStatus(host));
if (hostStatus == null || hostStatus.getHostInfo() == null) {
return null;
}
return (hostStatus.getStatus() == status) ? hostStatus : null;
}
});
}

protected TaskStatus awaitTaskState(final JobId jobId, final String host,
final TaskStatus.State state) throws Exception {
return Polling.await(LONG_WAIT_SECONDS, SECONDS, new Callable<TaskStatus>() {
Expand Down

0 comments on commit d495179

Please sign in to comment.