Skip to content
This repository has been archived by the owner on Sep 29, 2021. It is now read-only.

Commit

Permalink
DeadAgentReaper: Address some review comments + fixes
Browse files Browse the repository at this point in the history
 * Log the DOWN duration when reaping an agent
 * Get rid of the static method and @VisibleForTesting annotation
 * Allow reaping to be disabled
 * Bugfix: actually start the DeadAgentReaper on startup :)
  • Loading branch information
Staffan Gimåker committed Jan 22, 2016
1 parent 2010c3b commit a8d0867
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 49 deletions.
Expand Up @@ -17,15 +17,13 @@

package com.spotify.helios.master;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;

import com.spotify.helios.agent.InterruptingScheduledService;
import com.spotify.helios.common.Clock;
import com.spotify.helios.common.SystemClock;
import com.spotify.helios.common.descriptors.AgentInfo;
import com.spotify.helios.common.descriptors.HostStatus;

import org.apache.commons.lang.time.DurationFormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,6 +32,8 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;

/**
* De-registers dead agents, where an agent that has been DOWN for more than X time is considered
* dead.
Expand All @@ -48,45 +48,37 @@ public class DeadAgentReaper extends InterruptingScheduledService {

private final MasterModel masterModel;
private final long timeoutMillis;
private final Clock clock;

public DeadAgentReaper(final MasterModel masterModel,
final long timeout,
final TimeUnit timeUnit) {
this(masterModel, timeout, timeUnit, SYSTEM_CLOCK);
}

/**
* We check for dead agents every 30 minutes, which means the actual timeout the specified timeout
* plus up to 30 minutes. I.e. very low timeouts will not work in practice.
*/
public DeadAgentReaper(final MasterModel masterModel,
final long timeout,
final TimeUnit timeUnit) {
final TimeUnit timeUnit,
final Clock clock) {
this.masterModel = masterModel;
checkArgument(timeout > 0);
this.timeoutMillis = timeUnit.toMillis(timeout);
this.clock = clock;
}

@Override
protected void runOneIteration() {
log.debug("Reaping agents");
final List<String> deadAgents = getDeadAgents(masterModel, timeoutMillis, SYSTEM_CLOCK);
log.debug("Reaping {} dead agents: {}", deadAgents.size(), deadAgents);

for (final String agent : deadAgents) {
log.info("Reaping dead agent {}", agent);
try {
masterModel.deregisterHost(agent);
} catch (Exception e) {
log.error("Failed to reap dead agent {}", agent, e);
}
}
}

@VisibleForTesting
static List<String> getDeadAgents(final MasterModel masterModel, final long timeoutMillis,
final Clock clock) {
final List<String> deadAgents = Lists.newArrayList();
final List<String> agents = masterModel.listHosts();

for (final String agent : agents) {
try {
final HostStatus hostStatus = masterModel.getHostStatus(agent);
if (hostStatus == null || hostStatus.getStatus() != HostStatus.Status.DOWN) {
// Hot not found or host not DOWN -- nothing to do, move on to the next host
// Host not found or host not DOWN -- nothing to do, move on to the next host
continue;
}

Expand All @@ -99,14 +91,18 @@ static List<String> getDeadAgents(final MasterModel masterModel, final long time
final long downDurationMillis = clock.now().getMillis() - downSince;

if (downDurationMillis >= timeoutMillis) {
deadAgents.add(agent);
try {
log.info("Reaping dead agent '{}' (DOWN for {} hours)",
DurationFormatUtils.formatDurationHMS(downDurationMillis));
masterModel.deregisterHost(agent);
} catch (Exception e) {
log.warn("Failed to reap agent '{}'", agent, e);
}
}
} catch (Exception e) {
log.warn("Failed to determine if agent '{}' should be reaped", agent, e);
}
}

return deadAgents;
}

@Override
Expand Down
Expand Up @@ -105,7 +105,7 @@ protected void addArgs(final ArgumentParser parser) {
.type(Integer.class)
.setDefault(TimeUnit.DAYS.toHours(14))
.help("In hours. Agents will be automatically de-registered if they are DOWN for more " +
"than the specified timeout.");
"than the specified timeout. To disable reaping, set to 0.");
}

public MasterConfig getMasterConfig() {
Expand Down
Expand Up @@ -203,7 +203,13 @@ public MasterService(final MasterConfig config,
this.rollingUpdateService = new RollingUpdateService(model, reactorFactory);

// Set up agent reaper (de-registering hosts that have been DOWN for more than X time)
this.agentReaper = new DeadAgentReaper(model, config.getAgentReapingTimeout(), TimeUnit.HOURS);
if (config.getAgentReapingTimeout() > 0) {
this.agentReaper = new DeadAgentReaper(
model, config.getAgentReapingTimeout(), TimeUnit.HOURS);
} else {
log.info("Reaping of dead agents disabled");
this.agentReaper = null;
}

// Set up http server
environment.servlets()
Expand Down Expand Up @@ -282,6 +288,11 @@ protected void startUp() throws Exception {
}
expiredJobReaper.startAsync().awaitRunning();
rollingUpdateService.startAsync().awaitRunning();

if (agentReaper != null) {
agentReaper.startAsync().awaitRunning();
}

try {
server.start();
} catch (Exception e) {
Expand All @@ -301,6 +312,11 @@ protected void shutDown() throws Exception {
server.stop();
server.join();
registrar.close();

if (agentReaper != null) {
agentReaper.stopAsync().awaitTerminated();
}

rollingUpdateService.stopAsync().awaitTerminated();
expiredJobReaper.stopAsync().awaitTerminated();
zkRegistrar.stopAsync().awaitTerminated();
Expand Down
Expand Up @@ -18,8 +18,6 @@
package com.spotify.helios.master;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

Expand All @@ -35,10 +33,11 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class DeadAgentReaperTest {
Expand Down Expand Up @@ -75,7 +74,10 @@ public void testDeadAgentReaper() throws Exception {
new Datapoint("host2", 0, TIMEOUT_MILLIS + 1, HostStatus.Status.DOWN, false),
new Datapoint("host3", 1000, 1000, HostStatus.Status.UP, false),
new Datapoint("host4", 500, 300, HostStatus.Status.DOWN, true),
// Agents started in the future should not be reaped, even if they are reported as down
new Datapoint("host5", 5000, 0, HostStatus.Status.DOWN, false),
// Agents that are UP should not be reaped even if the start and uptime indicate that
// they should
new Datapoint("host6", 0, 0, HostStatus.Status.UP, false)
);

Expand All @@ -100,23 +102,16 @@ public String apply(final Datapoint input) {
.build());
}

final List<String> expected = FluentIterable.from(datapoints)
.filter(new Predicate<Datapoint>() {
@Override
public boolean apply(final Datapoint input) {
return input.expectReap;
}
})
.transform(new Function<Datapoint, String>() {
@Override
public String apply(final Datapoint input) {
return input.host;
}
})
.toList();
final DeadAgentReaper reaper = new DeadAgentReaper(
masterModel, TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, clock);
reaper.startAsync().awaitRunning();

verify(masterModel, timeout(5000)).listHosts();

assertThat(
DeadAgentReaper.getDeadAgents(masterModel, TIMEOUT_MILLIS, clock),
containsInAnyOrder(expected.toArray()));
for (final Datapoint datapoint : datapoints) {
if (datapoint.expectReap) {
verify(masterModel).deregisterHost(datapoint.host);
}
}
}
}

0 comments on commit a8d0867

Please sign in to comment.