Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

Commit

Permalink
Removed tombstoning logic from Agent codebase.
Browse files Browse the repository at this point in the history
Removal of job node in ZK is now used to determine if the actual job should be stopped
and removed, instead of using tombstone logic.
  • Loading branch information
Andrey Sibiryov committed Sep 26, 2014
1 parent 6bbb32b commit 6e68b5b
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 277 deletions.
1 change: 0 additions & 1 deletion findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
<Bug pattern="SIC_INNER_SHOULD_BE_STATIC_ANON"/>
<Or>
<Class name="com.spotify.helios.servicescommon.coordination.RetryingZooKeeperNodeWriter$1"/>
<Class name="com.spotify.helios.servicescommon.coordination.ZooKeeperPersistentNodeRemover$2"/>
<Class name="com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory$3"/>
</Or>
</Match>
Expand Down
19 changes: 13 additions & 6 deletions helios-services/src/main/java/com/spotify/helios/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractIdleService;

import com.spotify.helios.common.descriptors.Job;
import com.spotify.helios.common.descriptors.JobId;
import com.spotify.helios.common.descriptors.Task;
import com.spotify.helios.common.descriptors.TaskStatus;
import com.spotify.helios.common.descriptors.*;
import com.spotify.helios.servicescommon.PersistentAtomicReference;
import com.spotify.helios.servicescommon.Reactor;
import com.spotify.helios.servicescommon.ReactorFactory;
Expand Down Expand Up @@ -225,6 +222,17 @@ public Set<String> get() {
}
}

// Create undeploy goals for removed tasks
for (Entry<JobId, Execution> entry : newExecutions.entrySet()) {
final JobId jobId = entry.getKey();
final Execution execution = entry.getValue();

if (!tasks.containsKey(jobId)) {
log.debug("Setting UNDEPLOY goal for removed job: {}", execution.getJob());
entry.setValue(execution.withGoal(Goal.UNDEPLOY));
}
}

// Allocate ports
final Map<JobId, Execution> pending = ImmutableMap.copyOf(
Maps.filterValues(newExecutions, PORT_ALLOCATION_PENDING));
Expand Down Expand Up @@ -297,8 +305,7 @@ public Set<String> get() {
final Supervisor supervisor = supervisors.get(jobId);
if (supervisor == null) {
reapedTasks.add(jobId);
log.debug("Removing tombstoned task: {}", jobId);
model.removeUndeployTombstone(jobId);
log.debug("Removing task: {}", jobId);
model.removeTaskStatus(jobId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,4 @@ public interface Listener {
*/
void tasksChanged(AgentModel model);
}

/**
* Safely (transactionally speaking) removes an UNDEPLOY tombstone.
*
* @param jobId The {@link JobId} of the job that is tombstoned.
*/
void removeUndeployTombstone(JobId jobId) throws InterruptedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

package com.spotify.helios.agent;

import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
Expand All @@ -31,12 +30,10 @@
import com.spotify.helios.common.descriptors.JobId;
import com.spotify.helios.common.descriptors.Task;
import com.spotify.helios.common.descriptors.TaskStatus;
import com.spotify.helios.servicescommon.coordination.Node;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.PersistentPathChildrenCache;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClientProvider;
import com.spotify.helios.servicescommon.coordination.ZooKeeperPersistentNodeRemover;
import com.spotify.helios.servicescommon.coordination.ZooKeeperUpdatingPersistentDirectory;

import org.apache.curator.framework.state.ConnectionState;
Expand All @@ -50,7 +47,6 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.spotify.helios.common.descriptors.Descriptor.parse;
import static com.spotify.helios.common.descriptors.Goal.UNDEPLOY;

/**
* The Helios Agent's view into ZooKeeper.
Expand All @@ -65,12 +61,9 @@ public class ZooKeeperAgentModel extends AbstractIdleService implements AgentMod
private static final String TASK_CONFIG_FILENAME = "task-config.json";
private static final String TASK_HISTORY_FILENAME = "task-history.json";
private static final String TASK_STATUS_FILENAME = "task-status.json";
private static final String TASK_REMOVER_FILENAME = "remove.json";
private static final Predicate<Node> TASK_GOAL_IS_UNDEPLOY = new TaskGoalIsUndeployPredicate();

private final PersistentPathChildrenCache<Task> tasks;
private final ZooKeeperUpdatingPersistentDirectory taskStatuses;
private final ZooKeeperPersistentNodeRemover taskRemover;
private final QueueingHistoryWriter historyWriter;

private final String agent;
Expand All @@ -93,10 +86,6 @@ public ZooKeeperAgentModel(final ZooKeeperClientProvider provider, final String
provider,
taskStatusFile,
Paths.statusHostJobs(host));
final Path removerFile = stateDirectory.resolve(TASK_REMOVER_FILENAME);
this.taskRemover = ZooKeeperPersistentNodeRemover.create("agent-model-task-remover", provider,
removerFile, TASK_GOAL_IS_UNDEPLOY,
true);
this.historyWriter = new QueueingHistoryWriter(host, client,
stateDirectory.resolve(TASK_HISTORY_FILENAME));
}
Expand All @@ -105,15 +94,13 @@ public ZooKeeperAgentModel(final ZooKeeperClientProvider provider, final String
protected void startUp() throws Exception {
tasks.startAsync().awaitRunning();
taskStatuses.startAsync().awaitRunning();
taskRemover.startAsync().awaitRunning();
historyWriter.startAsync().awaitRunning();
}

@Override
protected void shutDown() throws Exception {
tasks.stopAsync().awaitTerminated();
taskStatuses.stopAsync().awaitTerminated();
taskRemover.stopAsync().awaitTerminated();
historyWriter.stopAsync().awaitTerminated();
}

Expand Down Expand Up @@ -188,17 +175,6 @@ public void removeTaskStatus(final JobId jobId) throws InterruptedException {
taskStatuses.remove(jobId.toString());
}

/**
* Remove the tombstone for the job identified by {@code jobId}. Tombstones are written by the
* master to tell the agent to undeploy, and these tombstones are removed by the agent after it
* has undeployed.
*/
@Override
public void removeUndeployTombstone(final JobId jobId) throws InterruptedException {
String path = Paths.configHostJob(agent, jobId);
taskRemover.remove(path);
}

/**
* Add a listener that will be notified when tasks are changed.
*/
Expand Down Expand Up @@ -238,18 +214,4 @@ public void connectionStateChanged(final ConnectionState state) {
// ignore
}
}

private static class TaskGoalIsUndeployPredicate implements Predicate<Node> {

@Override
public boolean apply(final Node node) {
assert node != null;
try {
final Task task = parse(node.getBytes(), Task.class);
return task.getGoal() == UNDEPLOY;
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
}

This file was deleted.

0 comments on commit 6e68b5b

Please sign in to comment.