Skip to content

Commit

Permalink
#483 更新发送初始化失败告警逻辑:只有JobInitException的异常才会发送告警
Browse files Browse the repository at this point in the history
  • Loading branch information
kfchu committed Sep 10, 2018
1 parent 21f00f0 commit 44fdb4e
Show file tree
Hide file tree
Showing 10 changed files with 84 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.vip.saturn.job.trigger.SaturnTrigger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -100,7 +99,7 @@ public ExecutorService getExecutorService() {
return jobScheduler.getExecutorService();
}

protected void init() throws SchedulerException {
protected void init() {
scheduler = getTrigger().build(this);
getExecutorService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

package com.vip.saturn.job.basic;

import com.vip.saturn.job.exception.JobException;
import com.vip.saturn.job.exception.JobInitException;
import com.vip.saturn.job.executor.LimitMaxJobsService;
import com.vip.saturn.job.executor.SaturnExecutorService;
import com.vip.saturn.job.internal.analyse.AnalyseService;
Expand All @@ -37,7 +37,6 @@
import com.vip.saturn.job.threads.TaskQueue;
import com.vip.saturn.job.trigger.SaturnScheduler;
import org.apache.curator.framework.CuratorFramework;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.spi.OperableTrigger;
import org.slf4j.Logger;
Expand Down Expand Up @@ -139,9 +138,8 @@ public JobScheduler(final CoordinatorRegistryCenter coordinatorRegistryCenter,

/**
* 初始化作业.
* @return true初始化成功,false初始化失败
*/
public boolean init() {
public void init() {
try {
String currentConfJobName = currentConf.getJobName();
log.info("[{}] msg=Elastic job: job controller init, job name is: {}.", jobName, currentConfJobName);
Expand All @@ -151,11 +149,9 @@ public boolean init() {
serverService.persistServerOnline(job);
// Notify job enabled or disabled after that all are ready, include job was initialized.
configService.notifyJobEnabledOrNot();
return true;
} catch (Throwable t) {
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, t.getMessage()), t);
} catch (Throwable e) {
shutdown(false);
return false;
throw e;
}
}

Expand All @@ -181,13 +177,15 @@ private void startAll() {
statisticsService.startProcessCountJob();
}

private void createJob() throws SchedulerException {
private void createJob() {
Class<?> jobClass = currentConf.getSaturnJobClass();
try {
job = (AbstractElasticJob) jobClass.newInstance();
} catch (Exception e) {
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, "createJobException:"), e);
throw new RuntimeException("can not create job with job type " + currentConf.getJobType());
String errMsg = String
.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, "createJobException:" + e.getMessage());
log.error(errMsg, e);
throw new JobInitException(errMsg, e);
}
job.setJobScheduler(this);
job.setConfigService(configService);
Expand Down Expand Up @@ -333,14 +331,10 @@ public void shutdown(boolean removejob) {
* @param cronExpression crom表达式
*/
public void rescheduleJob(final String cronExpression) {
try {
if (job.getScheduler().isShutdown()) {
return;
}
job.getTrigger().retrigger(job.getScheduler(), job);
} catch (final SchedulerException ex) {
throw new JobException(ex);
if (job.getScheduler().isShutdown()) {
return;
}
job.getTrigger().retrigger(job.getScheduler(), job);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,7 @@ public class SaturnConstant {
// max datalength 1MB
public static final int MAX_ZNODE_DATA_LENGTH = 1048576;

public static final String ERR_MSG_TEMPLATE_INIT_FAIL = "[%s] msg=init job business instance fail for reason %s. The job class is [%s]";

public static final String ERR_MSG_INVOKE_METHOD_FAIL = "[%s] msg=invoke method %s of class %s fail for reason %s";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.vip.saturn.job.exception;

/**
* Exception for handling job init fail.
*/
public class JobInitException extends JobException {

public JobInitException(String errorMessage, Object... args) {
super(errorMessage, args);
}

public JobInitException(Exception cause) {
super(cause);
}

public JobInitException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import com.google.common.collect.Maps;
import com.vip.saturn.job.basic.JobScheduler;
import com.vip.saturn.job.exception.JobException;
import com.vip.saturn.job.exception.JobInitException;
import com.vip.saturn.job.exception.SaturnJobException;
import com.vip.saturn.job.internal.config.ConfigurationNode;
import com.vip.saturn.job.internal.config.JobConfiguration;
import com.vip.saturn.job.internal.storage.JobNodePath;
Expand Down Expand Up @@ -127,16 +129,9 @@ public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exc
log.info("new job: {} 's jobClass created event received", jobName);

if (!jobNames.contains(jobName)) {
try {
initJobScheduler(jobName);
if (initJobScheduler(jobName)) {
jobNames.add(jobName);
log.info("the job {} initialize successfully", jobName);
} catch (JobException e) {
String alarmMessage = String.format("job [%s] initialize fail: %s", jobName, e.getMessage());
log.warn(alarmMessage);
String namespace = regCenter.getNamespace();
AlarmUtils.raiseAlarm(constructAlarmInfo(namespace, jobName, executorName, alarmMessage),
namespace);
}
} else {
log.warn("the job {} is unnecessary to initialize, because it's already existing", jobName);
Expand Down Expand Up @@ -164,29 +159,34 @@ public Map<String, Object> constructAlarmInfo(String namespace, String jobName,
return alarmInfo;
}

private void initJobScheduler(String jobName) {
private boolean initJobScheduler(String jobName) throws SaturnJobException {
try {
log.info("[{}] msg=add new job {} - {}", jobName, executorName, jobName);
JobConfiguration jobConfig = new JobConfiguration(regCenter, jobName);
if (jobConfig.getSaturnJobClass() == null) {
String errorMsg = String
.format("the saturnJobClass is null, jobType is [%s]", jobConfig.getJobType());
log.warn("[{}] msg={} - {}", jobName, executorName, errorMsg);
throw new JobException(errorMsg);
throw new JobInitException("the saturnJobClass is null, jobType is {}", jobConfig.getJobType());
}
if (jobConfig.isDeleting()) {
log.warn("[{}] msg={} - {}", jobName, executorName, ERR_MSG_JOB_IS_ON_DELETING);
String serverNodePath = JobNodePath.getServerNodePath(jobName, executorName);
regCenter.remove(serverNodePath);
throw new JobException(ERR_MSG_JOB_IS_ON_DELETING);
log.warn(ERR_MSG_JOB_IS_ON_DELETING);
return false;
}
JobScheduler scheduler = new JobScheduler(regCenter, jobConfig);
scheduler.setSaturnExecutorService(saturnExecutorService);
scheduler.init();
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new JobException(e);
return true;
} catch (JobInitException e) {
String alarmMessage = String.format("job [%s] initialize fail: %s", jobName, e.getMessage());
// no need to log exception stack as it should be logged in the original happen place
log.error(alarmMessage);
String namespace = regCenter.getNamespace();
AlarmUtils.raiseAlarm(constructAlarmInfo(namespace, jobName, executorName, alarmMessage), namespace);
} catch (Throwable e) {
log.warn(String.format("job {} initialize fail, but will not stop the init process", jobName), e);
}

return false;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import com.vip.saturn.job.SaturnSystemErrorGroup;
import com.vip.saturn.job.SaturnSystemReturnCode;
import com.vip.saturn.job.basic.*;
import com.vip.saturn.job.exception.JobInitException;
import com.vip.saturn.job.internal.config.JobConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -18,6 +18,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static com.vip.saturn.job.basic.SaturnConstant.ERR_MSG_TEMPLATE_INIT_FAIL;

public class SaturnJavaJob extends CrondJob {
private static Logger log = LoggerFactory.getLogger(SaturnJavaJob.class);

Expand All @@ -31,13 +33,13 @@ public JavaShardingItemCallable createCallable(String jobName, Integer item, Str
}

@Override
public void init() throws SchedulerException {
public void init() {
super.init();
createJobBusinessInstanceIfNecessary();
getJobVersionIfNecessary();
}

private void getJobVersionIfNecessary() throws SchedulerException {
private void getJobVersionIfNecessary() {
if (jobBusinessInstance != null) {
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(saturnExecutorService.getJobClassLoader());
Expand All @@ -46,20 +48,21 @@ private void getJobVersionIfNecessary() throws SchedulerException {
.invoke(jobBusinessInstance);
setJobVersion(version);
} catch (Throwable t) {
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName,
"error throws during get job version"), t);
throw new SchedulerException(t);
// only log the error message as getJobVersion should not block the init process
String errMsg = String
.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, "error throws during get job version");
log.error(errMsg, t);
} finally {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
}
}

private void createJobBusinessInstanceIfNecessary() throws SchedulerException {
private void createJobBusinessInstanceIfNecessary() {
JobConfiguration currentConf = configService.getJobConfiguration();
String jobClassStr = currentConf.getJobClass();
if (StringUtils.isBlank(jobClassStr)) {
throw new SchedulerException("init job business instance failed, the job class is " + jobClassStr);
throw new JobInitException(String.format(ERR_MSG_TEMPLATE_INIT_FAIL, jobName, "job class is not set", ""));
}

if (jobBusinessInstance == null) {
Expand All @@ -68,16 +71,20 @@ private void createJobBusinessInstanceIfNecessary() throws SchedulerException {
Thread.currentThread().setContextClassLoader(jobClassLoader);
try {
reflectionInvokeInitMethodsOfJobBusinessInstance(currentConf, jobClassStr, jobClassLoader);
} catch (JobInitException e) {
throw e;
} catch (Throwable t) {
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName,
"create job business instance error"), t);
throw new SchedulerException(t);
String errMsg = String
.format(SaturnConstant.ERR_MSG_TEMPLATE_INIT_FAIL, jobName, t.getMessage(), jobClassStr);
log.error(errMsg, t);
throw new JobInitException(errMsg, t);
} finally {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}
}
if (jobBusinessInstance == null) {
throw new SchedulerException("init job business instance failed, the job class is " + jobClassStr);
throw new JobInitException(
String.format(ERR_MSG_TEMPLATE_INIT_FAIL, jobName, "job instance is null", jobClassStr));
}
}

Expand All @@ -91,7 +98,7 @@ private void reflectionInvokeInitMethodsOfJobBusinessInstance(JobConfiguration c
if (getObject != null) {
reflectionInvokeGetObjectMethod(jobClassStr, getObject);
}
} catch (Exception ex) {// NOSONAR
} catch (NoSuchMethodException ex) {// NOSONAR
}

if (jobBusinessInstance == null) {
Expand All @@ -105,8 +112,10 @@ private void reflectionInvokeGetObjectMethod(String jobClassStr, Method getObjec
try {
jobBusinessInstance = getObject.invoke(null);
} catch (Throwable t) {
log.error(String.format(SaturnConstant.LOG_FORMAT_FOR_STRING, jobName, jobClassStr + " getObject error"),
t);
String errMsg = String.format(SaturnConstant.ERR_MSG_INVOKE_METHOD_FAIL, jobName, "getObject", jobClassStr,
t.getMessage());
log.error(errMsg, t);
throw new JobInitException(errMsg, t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.quartz.CronExpression;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.triggers.AbstractTrigger;
Expand Down Expand Up @@ -47,15 +46,14 @@ public Trigger createTrigger(AbstractElasticJob job) {
}

@Override
@SuppressWarnings("unchecked")
public SaturnScheduler build(AbstractElasticJob job) throws SchedulerException {
public SaturnScheduler build(AbstractElasticJob job) {
SaturnScheduler scheduler = new SaturnScheduler(job, createTrigger(job));
scheduler.start();
return scheduler;
}

@Override
public void retrigger(SaturnScheduler scheduler, AbstractElasticJob job) throws SchedulerException {
public void retrigger(SaturnScheduler scheduler, AbstractElasticJob job) {
scheduler.rescheduleJob(createTrigger(job));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.vip.saturn.job.trigger;

import org.quartz.SchedulerException;

import com.vip.saturn.job.basic.AbstractElasticJob;

public class OnetimeTrigger implements SaturnTrigger {
Expand All @@ -11,7 +9,7 @@ public void retrigger(SaturnScheduler scheduler, AbstractElasticJob job) {
}

@Override
public SaturnScheduler build(AbstractElasticJob job) throws SchedulerException {
public SaturnScheduler build(AbstractElasticJob job) {
SaturnScheduler scheduler = new SaturnScheduler(job, null);
scheduler.start();
scheduler.triggerJob();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public Thread newThread(Runnable r) {
});
}

public void start() throws SchedulerException {
public void start() {
saturnQuartzWorker = new SaturnWorker(job, trigger);
executor.submit(saturnQuartzWorker);
}
Expand All @@ -72,7 +72,7 @@ public boolean isShutdown() {
return saturnQuartzWorker.isShutDown();
}

public void rescheduleJob(Trigger createTrigger) throws SchedulerException {
public void rescheduleJob(Trigger createTrigger) {
this.trigger = createTrigger;
saturnQuartzWorker.reInitTrigger(createTrigger);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package com.vip.saturn.job.trigger;

import org.quartz.SchedulerException;

import com.vip.saturn.job.basic.AbstractElasticJob;

public interface SaturnTrigger {
SaturnScheduler build(AbstractElasticJob job) throws SchedulerException;
SaturnScheduler build(AbstractElasticJob job);

void retrigger(SaturnScheduler scheduler, AbstractElasticJob job) throws SchedulerException;
void retrigger(SaturnScheduler scheduler, AbstractElasticJob job);
}

0 comments on commit 44fdb4e

Please sign in to comment.