Skip to content

Commit

Permalink
#298 Saturn-Console support dump executor threads
Browse files Browse the repository at this point in the history
watch the dump node;  refactor code with #295
  • Loading branch information
xiaopeng-he committed Dec 11, 2017
1 parent d5f6085 commit 1c008b4
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 156 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package com.vip.saturn.job.executor;

import com.vip.saturn.job.exception.SaturnJobException;
import com.vip.saturn.job.reg.base.CoordinatorRegistryCenter;
import com.vip.saturn.job.threads.SaturnThreadFactory;
import com.vip.saturn.job.utils.SystemEnvProperties;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @author hebelala
*/
public class RestartAndDumpService {

private static final Logger LOGGER = LoggerFactory.getLogger(RestartAndDumpService.class);

private String executorName;
private CoordinatorRegistryCenter coordinatorRegistryCenter;
private CuratorFramework curatorFramework;
private File prgDir;
private NodeCache restartNC;
private ExecutorService restartES;
private NodeCache dumpNC;
private ExecutorService dumpES;

public RestartAndDumpService(String executorName, CoordinatorRegistryCenter coordinatorRegistryCenter) {
this.executorName = executorName;
this.coordinatorRegistryCenter = coordinatorRegistryCenter;
this.curatorFramework = (CuratorFramework) coordinatorRegistryCenter.getRawClient();
}

public void start() throws Exception {
if (!SystemEnvProperties.VIP_SATURN_ENABLE_EXEC_SCRIPT) {
LOGGER.info("The RestartAndDumpService is disabled");
return;
}
validateFile(SystemEnvProperties.NAME_VIP_SATURN_PRG, SystemEnvProperties.VIP_SATURN_PRG);
validateConfigured(SystemEnvProperties.NAME_VIP_SATURN_LOG_OUTFILE, SystemEnvProperties.VIP_SATURN_LOG_OUTFILE);
prgDir = new File(SystemEnvProperties.NAME_VIP_SATURN_PRG).getParentFile();

initRestart();
initDump();
}

private void validateFile(String name, String value) throws SaturnJobException {
validateConfigured(name, value);
File file = new File(value);
if (!file.exists()) {
throw new SaturnJobException(value + " is not existing");
}
if (!file.isFile()) {
throw new SaturnJobException(value + " is not file");
}
}

private void validateConfigured(String name, String value) throws SaturnJobException {
if (StringUtils.isBlank(value)) {
throw new SaturnJobException(name + " is not configured");
}
LOGGER.info("The {} is configured as {}", name, value);
}

private void initRestart() throws Exception {
restartES = Executors.newSingleThreadExecutor(new SaturnThreadFactory(executorName + "-restart-watcher-thread", false));
// Remove the restart node, before add watcher that watches the create and update event
String nodePath = SaturnExecutorsNode.EXECUTORS_ROOT + "/" + executorName + "/restart";
coordinatorRegistryCenter.remove(nodePath);
restartNC = new NodeCache(curatorFramework, nodePath);
restartNC.getListenable().addListener(new NodeCacheListener() {

@Override
public void nodeChanged() throws Exception {
// Watch create, update event
if (restartNC.getCurrentData() != null) {
LOGGER.info("The executor {} restart event is received", executorName);
restartES.execute(new Runnable() {
@Override
public void run() {
try {
// The apache's Executor maybe destroy process on some conditions,
// and don't provide the api for redirect process's streams to file.
// It's not expected, so I use the original way.
LOGGER.info("Begin to execute restart script");
String command = "chmod +x " + SystemEnvProperties.VIP_SATURN_PRG + ";" + SystemEnvProperties.VIP_SATURN_PRG + " restart";
Process process = new ProcessBuilder()
.command("/bin/bash", "-c", command)
.directory(prgDir)
.redirectOutput(ProcessBuilder.Redirect.appendTo(new File(SystemEnvProperties.VIP_SATURN_LOG_OUTFILE)))
.redirectError(ProcessBuilder.Redirect.appendTo(new File(SystemEnvProperties.VIP_SATURN_LOG_OUTFILE)))
.start();
int exit = process.waitFor();
LOGGER.info("Executed restart script, the exit value {} is returned", exit);
} catch (InterruptedException e) {
LOGGER.info("Restart thread is interrupted");
} catch (Exception e) {
LOGGER.error("Execute restart script error", e);
}
}
});
}
}

});
// Start, with not buildInitial.
// The initial data is null, so the create event will be triggered firstly.
restartNC.start(false);
}

private void initDump() throws Exception {
dumpES = Executors.newSingleThreadExecutor(new SaturnThreadFactory(executorName + "-dump-watcher-thread", false));
final String nodePath = SaturnExecutorsNode.EXECUTORS_ROOT + "/" + executorName + "/dump";
coordinatorRegistryCenter.remove(nodePath);
dumpNC = new NodeCache(curatorFramework, nodePath);
dumpNC.getListenable().addListener(new NodeCacheListener() {

@Override
public void nodeChanged() throws Exception {
// Watch create, update event
if (dumpNC.getCurrentData() != null) {
LOGGER.info("The executor {} dump event is received", executorName);
dumpES.execute(new Runnable() {
@Override
public void run() {
try {
// dump threads and gc
LOGGER.info("Begin to execute script dump");
String command = "chmod +x " + SystemEnvProperties.VIP_SATURN_PRG + ";" + SystemEnvProperties.VIP_SATURN_PRG + " dump";
Process process = new ProcessBuilder()
.command("/bin/bash", "-c", command)
.directory(prgDir)
.redirectOutput(ProcessBuilder.Redirect.appendTo(new File(SystemEnvProperties.VIP_SATURN_LOG_OUTFILE)))
.redirectError(ProcessBuilder.Redirect.appendTo(new File(SystemEnvProperties.VIP_SATURN_LOG_OUTFILE)))
.start();
int exit = process.waitFor();
LOGGER.info("Execute script dump done, the exit value {} is returned", exit);
} catch (InterruptedException e) {
LOGGER.info("Dump thread is interrupted");
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
coordinatorRegistryCenter.remove(nodePath);
}
}
});
}
}

});
dumpNC.start(false);
}


public void stop() {
try {
if (restartNC != null) {
restartNC.close();
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
if (restartES != null) {
restartES.shutdownNow();
}
try {
if (dumpNC != null) {
dumpNC.close();
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
if (dumpES != null) {
dumpES.shutdownNow();
}
}


}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
public class SaturnExecutorExtensionDefault extends SaturnExecutorExtension {

private static Logger LOGGER;
private static final String NAME_SATURN_LOG_DIR = "SATURN_LOG_DIR";
private static final String NAME_VIP_SATURN_LOG_DIR = "VIP_SATURN_LOG_DIR";

public SaturnExecutorExtensionDefault(String executorName, String namespace, ClassLoader executorClassLoader,
ClassLoader jobClassLoader) {
Expand All @@ -30,8 +30,8 @@ public void initBefore() {

@Override
public void initLogDirEnv() {
String SATURN_LOG_DIR = System.getProperty(NAME_SATURN_LOG_DIR,
getEnv(NAME_SATURN_LOG_DIR, getDefaultLogDir(executorName)));
String SATURN_LOG_DIR = System.getProperty(NAME_VIP_SATURN_LOG_DIR,
getEnv(NAME_VIP_SATURN_LOG_DIR, getDefaultLogDir(executorName)));
System.setProperty("saturn.log.dir", SATURN_LOG_DIR); // for logback.xml
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class SaturnExecutorService {
private String ipNode;
private ClassLoader jobClassLoader;
private ClassLoader executorClassLoader;
private RestartExecutorService restartExecutorService;
private RestartAndDumpService restartExecutorService;

public SaturnExecutorService(CoordinatorRegistryCenter coordinatorRegistryCenter, String executorName) {
this.coordinatorRegistryCenter = coordinatorRegistryCenter;
Expand Down Expand Up @@ -92,7 +92,7 @@ private void registerExecutor0() throws Exception {
if (restartExecutorService != null) {
restartExecutorService.stop();
}
restartExecutorService = new RestartExecutorService(executorName, coordinatorRegistryCenter);
restartExecutorService = new RestartAndDumpService(executorName, coordinatorRegistryCenter);
restartExecutorService.start();

// 持久化ip
Expand Down Expand Up @@ -213,7 +213,7 @@ private void stopRestartExecutorService() {
private void removeIpNode() {
try {
if (coordinatorRegistryCenter != null && ipNode != null && coordinatorRegistryCenter.isConnected()) {
log.info(" {} is going to delete its ip node {}", executorName, ipNode);
log.info("{} is going to delete its ip node {}", executorName, ipNode);
coordinatorRegistryCenter.remove(ipNode);
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public class SystemEnvProperties {
public static String NAME_VIP_SATURN_ZK_CLIENT_CONNECTION_TIMEOUT_IN_SECONDS = "VIP_SATURN_ZK_CLIENT_CONNECTION_TIMEOUT";
public static int VIP_SATURN_ZK_CLIENT_CONNECTION_TIMEOUT_IN_SECONDS = -1;

// For restart executor
public static boolean VIP_SATURN_ENABLE_RESTART_EXECUTOR = Boolean.getBoolean("VIP_SATURN_ENABLE_RESTART_EXECUTOR");
// For restart and dump
public static boolean VIP_SATURN_ENABLE_EXEC_SCRIPT = Boolean.getBoolean("VIP_SATURN_ENABLE_EXEC_SCRIPT");
public static String NAME_VIP_SATURN_PRG = "VIP_SATURN_PRG";
public static String VIP_SATURN_PRG = System.getProperty(NAME_VIP_SATURN_PRG);
public static String NAME_VIP_SATURN_LOG_OUTFILE = "VIP_SATURN_LOG_OUTFILE";
Expand Down
Loading

0 comments on commit 1c008b4

Please sign in to comment.