Skip to content

Commit

Permalink
Merge pull request #509 from mesos/bug/462-ShutdownExecutors
Browse files Browse the repository at this point in the history
SIGTERM will gracefully shutdown the framework.
  • Loading branch information
Phil Winder committed Feb 23, 2016
2 parents 91b6608 + 5bb45ae commit 6078ddb
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,10 @@ public void executorLost(SchedulerDriver driver, Protos.ExecutorID executorId, P
public void error(SchedulerDriver driver, String message) {
LOGGER.error("Error: " + message);
}

public void shutdown(SchedulerDriver driver) {
clusterState.getTaskList().stream().forEach(taskInfo -> driver.killTask(taskInfo.getTaskId())); // Kill tasks.
clusterState.destroy(); // Remove tasks from zk
frameworkState.destroy(); // Remove framework state from zk.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ public void run(String[] args) {
schedulerDriver = new MesosSchedulerDriver(scheduler, frameworkBuilder.build(), configuration.getMesosZKURL());
}

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
LOGGER.info("Performing graceful shutdown");
scheduler.shutdown(schedulerDriver);
}
});

HashMap<String, Object> properties = new HashMap<>();
properties.put("server.port", String.valueOf(configuration.getWebUiPort()));
new SpringApplicationBuilder(WebApplication.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,21 @@ public boolean taskInError(Protos.TaskStatus status) {
return getStatus(status.getTaskId()).taskInError();
}

/**
* Deletes all tasks and state.
*/
public void destroy() {
try {
getTaskList().stream().forEach(taskInfo -> getStatus(taskInfo).destroy());
// Todo (pnw): Refactor. This shouldn't be mopping up the ESTaskStatus stuff
zooKeeperStateDriver.delete(getKey());
zooKeeperStateDriver.delete(frameworkState.getFrameworkID().getValue() + "/" + ESTaskStatus.STATE_KEY);
zooKeeperStateDriver.delete(frameworkState.getFrameworkID().getValue());
} catch (IOException e) {
LOGGER.error("Unable to delete state from ZooKeeper", e);
}
}

/**
* Updates a task with the given status. Status is written to zookeeper.
* If the task is in error, then the healthchecks are stopped and state is removed from ZK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* the respective TaskStatus packet.
*/
public class ESTaskStatus {
// Todo (pnw): Refactor: This is part of the cluster state, but is often accessed without cluster state.
private static final Logger LOGGER = Logger.getLogger(TaskStatus.class);
public static final String STATE_KEY = "state";
public static final String DEFAULT_STATUS_NO_MESSAGE_SET = "Default status. No message set.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ public void markRegistered(Protos.FrameworkID frameworkId, SchedulerDriver drive
registeredListeners.forEach(listener -> listener.accept(clusterState));
}

public void destroy() {
try {
statePath.rm(FRAMEWORKID_KEY);
} catch (IOException e) {
LOGGER.error("Unable to delete " + FRAMEWORKID_KEY + " from zookeeper", e);
}
}

public SchedulerDriver getDriver() {
return driver;
}
Expand Down Expand Up @@ -91,4 +99,4 @@ public void announceStatusUpdate(Protos.TaskStatus taskStatus) {
public void onStatusUpdate(Consumer<Protos.TaskStatus> listener) {
statusUpdateListeners.add(listener);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
package org.apache.mesos.elasticsearch.scheduler.state;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.security.InvalidParameterException;

/**
* Path utilities
*/
public class StatePath {
private static final Logger LOGGER = Logger.getLogger(StatePath.class);
private SerializableState zkState;
public StatePath(SerializableState zkState) {
this.zkState = zkState;
Expand Down Expand Up @@ -43,4 +40,14 @@ public Boolean exists(String key) throws IOException {
}
return exists;
}

/**
* Remove a zNode or zFolder.
*
* @param key the path to remove.
* @throws IOException If unable to remove
*/
public void rm(String key) throws IOException {
zkState.delete(key);
}
}
2 changes: 1 addition & 1 deletion scheduler/start-scheduler.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/bin/bash
java $JAVA_OPTS -Djava.library.path=/usr/lib -jar /tmp/elasticsearch-mesos-scheduler.jar "$@"
exec java $JAVA_OPTS -Djava.library.path=/usr/lib -jar /tmp/elasticsearch-mesos-scheduler.jar "$@"

0 comments on commit 6078ddb

Please sign in to comment.