From 7d0dadbce9e08af3b6524be8cc9b7824fb890fd1 Mon Sep 17 00:00:00 2001 From: hugui <254963746@qq.com> Date: Thu, 8 Oct 2015 18:47:23 +0800 Subject: [PATCH] =?UTF-8?q?LTS-Admin=20=E4=BD=BF=E7=94=A8JobClient?= =?UTF-8?q?=E6=8F=90=E4=BA=A4=E4=BB=BB=E5=8A=A1;=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E5=BB=B6=E8=BF=9F=E6=89=B9=E9=87=8F=E8=BE=93=E7=9B=98=E4=B8=9A?= =?UTF-8?q?=E5=8A=A1=E6=97=A5=E5=BF=97=E6=B5=8B=E8=AF=95;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lts-admin/pom.xml | 5 + .../controller/api/JobQueueApiController.java | 79 ++++---- .../lts/web/support/LtsAdminJobClient.java | 36 ++++ .../support/spring/AdminAppFactoryBean.java | 7 +- .../src/main/webapp/views/templates/jobAdd.vm | 10 + .../main/webapp/views/templates/jobLogger.vm | 1 + .../views/templates/nodeGroupManager.vm | 4 +- .../webapp/views/templates/nodeJvmInfo.vm | 3 +- .../webapp/views/templates/nodeManager.vm | 6 +- .../com/lts/core/cluster/AbstractJobNode.java | 4 +- .../com/lts/core/commons/utils/NetUtils.java | 2 +- .../java/com/lts/core/constant/Constants.java | 7 + .../com/lts/example/api/JobTrackerTest.java | 9 +- .../java/com/lts/jobtracker/JobTracker.java | 5 +- .../lts/jobtracker/support/JobReceiver.java | 29 ++- .../com/lts/biz/logger/JobLoggerDelegate.java | 179 ++++++++++++++++++ .../com/lts/biz/logger/domain/LogType.java | 1 + .../lts/biz/logger/mysql/MysqlJobLogger.java | 15 +- 18 files changed, 330 insertions(+), 72 deletions(-) create mode 100644 lts-admin/src/main/java/com/lts/web/support/LtsAdminJobClient.java create mode 100644 lts-logger/lts-logger-api/src/main/java/com/lts/biz/logger/JobLoggerDelegate.java diff --git a/lts-admin/pom.xml b/lts-admin/pom.xml index 32bcb6ac2..093d1c0fe 100644 --- a/lts-admin/pom.xml +++ b/lts-admin/pom.xml @@ -40,6 +40,11 @@ lts-core ${project.version} + + com.lts + lts-jobclient + ${project.version} + com.lts lts-queue-api diff --git a/lts-admin/src/main/java/com/lts/web/controller/api/JobQueueApiController.java b/lts-admin/src/main/java/com/lts/web/controller/api/JobQueueApiController.java index cfd72ad76..e8e2b8808 100644 --- a/lts-admin/src/main/java/com/lts/web/controller/api/JobQueueApiController.java +++ b/lts-admin/src/main/java/com/lts/web/controller/api/JobQueueApiController.java @@ -3,14 +3,17 @@ import com.lts.biz.logger.domain.JobLogPo; import com.lts.biz.logger.domain.JobLoggerRequest; import com.lts.core.commons.utils.Assert; +import com.lts.core.commons.utils.CollectionUtils; import com.lts.core.commons.utils.StringUtils; +import com.lts.core.domain.Job; import com.lts.core.support.CronExpression; -import com.lts.core.support.SystemClock; +import com.lts.jobclient.domain.Response; import com.lts.queue.domain.JobPo; import com.lts.web.cluster.AdminApplication; import com.lts.web.controller.AbstractController; import com.lts.web.request.JobQueueRequest; import com.lts.web.response.PageResponse; +import com.lts.web.support.LtsAdminJobClient; import com.lts.web.vo.RestfulResponse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; @@ -18,6 +21,7 @@ import java.text.ParseException; import java.util.Date; +import java.util.Map; /** * @author Robert HG (254963746@qq.com) on 6/6/15. @@ -27,6 +31,8 @@ public class JobQueueApiController extends AbstractController { @Autowired AdminApplication application; + @Autowired + LtsAdminJobClient ltsAdminJobClient; @RequestMapping("/job-queue/cron-job-get") public RestfulResponse cronJobGet(JobQueueRequest request) { @@ -149,7 +155,7 @@ public RestfulResponse cronJobDelete(JobQueueRequest request) { return response; } boolean success = application.getCronJobQueue().remove(request.getJobId()); - if(success){ + if (success) { try { application.getExecutableJobQueue().remove(request.getTaskTrackerNodeGroup(), request.getJobId()); } catch (Exception e) { @@ -187,7 +193,7 @@ public RestfulResponse executableJobDelete(JobQueueRequest request) { } } response.setSuccess(true); - } else{ + } else { response.setSuccess(false); response.setMsg("更新失败,该条任务可能已经删除."); } @@ -199,14 +205,14 @@ public RestfulResponse executableJobDelete(JobQueueRequest request) { public RestfulResponse jobLoggerGet(JobLoggerRequest request) { RestfulResponse response = new RestfulResponse(); - try { +// try { // Assert.hasLength(request.getTaskId(), "taskId不能为空!"); // Assert.hasLength(request.getTaskTrackerNodeGroup(), "taskTrackerNodeGroup不能为空!"); - } catch (IllegalArgumentException e) { - response.setSuccess(false); - response.setMsg(e.getMessage()); - return response; - } +// } catch (IllegalArgumentException e) { +// response.setSuccess(false); +// response.setMsg(e.getMessage()); +// return response; +// } PageResponse pageResponse = application.getJobLogger().search(request); response.setResults(pageResponse.getResults()); @@ -221,7 +227,6 @@ public RestfulResponse jobAdd(JobQueueRequest request) { RestfulResponse response = new RestfulResponse(); // 表单check - Long triggerTime = null; try { Assert.hasLength(request.getTaskId(), "taskId不能为空!"); Assert.hasLength(request.getTaskTrackerNodeGroup(), "taskTrackerNodeGroup不能为空!"); @@ -236,7 +241,7 @@ public RestfulResponse jobAdd(JobQueueRequest request) { response.setMsg(StringUtils.format("该CronExpression={} 已经没有执行时间点!", request.getCronExpression())); return response; } else { - triggerTime = nextTime.getTime(); + request.setTriggerTime(nextTime); } } catch (ParseException e) { response.setSuccess(false); @@ -251,38 +256,36 @@ public RestfulResponse jobAdd(JobQueueRequest request) { return response; } - addJob(request, triggerTime); - - response.setSuccess(true); + Response ltsResponse = addJob(request); + if (ltsResponse.isSuccess()) { + response.setSuccess(true); + } else { + response.setSuccess(false); + response.setMsg("提交失败: " + ltsResponse.getMsg()); + response.setCode(ltsResponse.getCode()); + } return response; } - private void addJob(JobQueueRequest request, Long triggerTime) { - JobPo jobPo = new JobPo(); - // 这里暂时用UUID来代替 - jobPo.setJobId(StringUtils.generateUUID()); - jobPo.setCronExpression(request.getCronExpression()); - jobPo.setExtParams(request.getExtParams()); - jobPo.setGmtCreated(SystemClock.now()); - jobPo.setGmtModified(jobPo.getGmtCreated()); - jobPo.setNeedFeedback(request.getNeedFeedback()); - jobPo.setPriority(request.getPriority()); - jobPo.setTaskId(request.getTaskId()); - jobPo.setSubmitNodeGroup(request.getSubmitNodeGroup()); - jobPo.setTaskTrackerNodeGroup(request.getTaskTrackerNodeGroup()); - if (request.getTriggerTime() != null) { - jobPo.setTriggerTime(request.getTriggerTime().getTime()); - } + private Response addJob(JobQueueRequest request) { - if (jobPo.isSchedule()) { - application.getCronJobQueue().add(jobPo); - if (triggerTime != null) { - jobPo.setTriggerTime(triggerTime); + Job job = new Job(); + job.setTaskId(request.getTaskId()); + if (CollectionUtils.isNotEmpty(request.getExtParams())) { + for (Map.Entry entry : request.getExtParams().entrySet()) { + job.setParam(entry.getKey(), entry.getValue()); } } - if (jobPo.getTriggerTime() == null) { - jobPo.setTriggerTime(SystemClock.now()); - } - application.getExecutableJobQueue().add(jobPo); + // 执行节点的group名称 + job.setTaskTrackerNodeGroup(request.getTaskTrackerNodeGroup()); + job.setSubmitNodeGroup(request.getSubmitNodeGroup()); + + job.setNeedFeedback(request.getNeedFeedback()); + // 这个是 cron expression 和 quartz 一样,可选 + job.setCronExpression(request.getCronExpression()); + job.setTriggerTime(request.getTriggerTime()); + job.setPriority(request.getPriority()); + + return ltsAdminJobClient.submitJob(job); } } diff --git a/lts-admin/src/main/java/com/lts/web/support/LtsAdminJobClient.java b/lts-admin/src/main/java/com/lts/web/support/LtsAdminJobClient.java new file mode 100644 index 000000000..6c020d3ca --- /dev/null +++ b/lts-admin/src/main/java/com/lts/web/support/LtsAdminJobClient.java @@ -0,0 +1,36 @@ +package com.lts.web.support; + +import com.lts.core.commons.utils.StringUtils; +import com.lts.core.domain.Job; +import com.lts.jobclient.JobClient; +import com.lts.jobclient.domain.Response; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.stereotype.Component; + +/** + * @author Robert HG (254963746@qq.com) on 10/3/15. + */ +@Component +public class LtsAdminJobClient implements InitializingBean { + + private JobClient jobClient; + + public Response submitJob(Job job) { + return jobClient.submitJob(job); + } + + @Override + public void afterPropertiesSet() throws Exception { + jobClient = new JobClient(); + jobClient.setNodeGroup("LTS-Admin"); + String clusterName = AppConfigurer.getProperties("clusterName"); + if (StringUtils.isEmpty(clusterName)) { + throw new IllegalArgumentException("clusterName in lts-admin.cfg can not be null."); + } + jobClient.setClusterName(clusterName); + jobClient.setRegistryAddress(AppConfigurer.getProperties("registryAddress")); + + jobClient.start(); + } + +} diff --git a/lts-admin/src/main/java/com/lts/web/support/spring/AdminAppFactoryBean.java b/lts-admin/src/main/java/com/lts/web/support/spring/AdminAppFactoryBean.java index 228a58757..b1dabed84 100644 --- a/lts-admin/src/main/java/com/lts/web/support/spring/AdminAppFactoryBean.java +++ b/lts-admin/src/main/java/com/lts/web/support/spring/AdminAppFactoryBean.java @@ -1,6 +1,6 @@ package com.lts.web.support.spring; -import com.lts.biz.logger.JobLoggerFactory; +import com.lts.biz.logger.JobLoggerDelegate; import com.lts.core.cluster.Config; import com.lts.core.cluster.Node; import com.lts.core.cluster.NodeType; @@ -9,7 +9,6 @@ import com.lts.core.constant.Constants; import com.lts.core.extension.ExtensionLoader; import com.lts.core.registry.RegistryStatMonitor; -import com.lts.ec.EventCenter; import com.lts.ec.EventCenterFactory; import com.lts.queue.*; import com.lts.web.cluster.AdminApplication; @@ -32,8 +31,6 @@ public class AdminAppFactoryBean implements FactoryBean, Initi ExecutingJobQueueFactory.class).getAdaptiveExtension(); NodeGroupStoreFactory nodeGroupStoreFactory = ExtensionLoader.getExtensionLoader( NodeGroupStoreFactory.class).getAdaptiveExtension(); - JobLoggerFactory jobLoggerFactory = ExtensionLoader.getExtensionLoader( - JobLoggerFactory.class).getAdaptiveExtension(); JobFeedbackQueueFactory jobFeedbackQueueFactory = ExtensionLoader.getExtensionLoader( JobFeedbackQueueFactory.class).getAdaptiveExtension(); private EventCenterFactory eventCenterFactory = ExtensionLoader.getExtensionLoader(EventCenterFactory.class).getAdaptiveExtension(); @@ -90,7 +87,7 @@ public void afterPropertiesSet() throws Exception { application.setExecutableJobQueue(executableJobQueueFactory.getQueue(config)); application.setExecutingJobQueue(executingJobQueueFactory.getQueue(config)); application.setNodeGroupStore(nodeGroupStoreFactory.getStore(config)); - application.setJobLogger(jobLoggerFactory.getJobLogger(config)); + application.setJobLogger(new JobLoggerDelegate(config)); application.setEventCenter(eventCenterFactory.getEventCenter(config)); application.setRegistryStatMonitor(new RegistryStatMonitor(application)); } diff --git a/lts-admin/src/main/webapp/views/templates/jobAdd.vm b/lts-admin/src/main/webapp/views/templates/jobAdd.vm index a42beef63..e2e1d1427 100644 --- a/lts-admin/src/main/webapp/views/templates/jobAdd.vm +++ b/lts-admin/src/main/webapp/views/templates/jobAdd.vm @@ -135,6 +135,16 @@ } }); + $(document).on("change", "select[name='submitNodeGroup']", function () { + var submitNodeGroup = $(this).val(); + if (submitNodeGroup == 'LTS-Admin') { + $("select[name='needFeedback']").selectpicker("val", "false"); + $("select[name='needFeedback']").attr("disabled", "disabled"); + } else { + $("select[name='needFeedback']").attr("disabled", null); + } + }); + $(document).on("click", "#addBtn", function () { var params = {}; $.each($('#form').parent().find(".form-control"), function () { diff --git a/lts-admin/src/main/webapp/views/templates/jobLogger.vm b/lts-admin/src/main/webapp/views/templates/jobLogger.vm index 8feb902f7..9482987e3 100644 --- a/lts-admin/src/main/webapp/views/templates/jobLogger.vm +++ b/lts-admin/src/main/webapp/views/templates/jobLogger.vm @@ -158,6 +158,7 @@ $(document).ready(function () { var LOG_TYPE = { + RECEIVE: '接受任务', SENT: '派发任务', FINISHED: '完成任务', FIXED_DEAD: '修复死任务', diff --git a/lts-admin/src/main/webapp/views/templates/nodeGroupManager.vm b/lts-admin/src/main/webapp/views/templates/nodeGroupManager.vm index 86544e286..07e80f595 100644 --- a/lts-admin/src/main/webapp/views/templates/nodeGroupManager.vm +++ b/lts-admin/src/main/webapp/views/templates/nodeGroupManager.vm @@ -82,13 +82,13 @@ {{/each}} {{if results == 0}} - 暂无数据 + 暂无数据 {{/if}} - + 共{{results}}条记录,每页展示{{pageSize}}条
    diff --git a/lts-admin/src/main/webapp/views/templates/nodeJvmInfo.vm b/lts-admin/src/main/webapp/views/templates/nodeJvmInfo.vm index 8b309465a..53aa36991 100644 --- a/lts-admin/src/main/webapp/views/templates/nodeJvmInfo.vm +++ b/lts-admin/src/main/webapp/views/templates/nodeJvmInfo.vm @@ -106,9 +106,10 @@ showJVMInfo(identity, data); }); } else { - if (json) { + if (json && json['msg']) { swal(json['msg']); } + showJVMInfo('', {}); } } }); diff --git a/lts-admin/src/main/webapp/views/templates/nodeManager.vm b/lts-admin/src/main/webapp/views/templates/nodeManager.vm index b2fe36b77..767b041ac 100644 --- a/lts-admin/src/main/webapp/views/templates/nodeManager.vm +++ b/lts-admin/src/main/webapp/views/templates/nodeManager.vm @@ -41,9 +41,9 @@
    diff --git a/lts-core/src/main/java/com/lts/core/cluster/AbstractJobNode.java b/lts-core/src/main/java/com/lts/core/cluster/AbstractJobNode.java index a4a29ea47..6a5946238 100644 --- a/lts-core/src/main/java/com/lts/core/cluster/AbstractJobNode.java +++ b/lts-core/src/main/java/com/lts/core/cluster/AbstractJobNode.java @@ -72,7 +72,9 @@ final public void stop() { try { if (started.compareAndSet(true, false)) { - registry.unregister(node); + if (registry != null) { + registry.unregister(node); + } preRemotingStop(); diff --git a/lts-core/src/main/java/com/lts/core/commons/utils/NetUtils.java b/lts-core/src/main/java/com/lts/core/commons/utils/NetUtils.java index 7ba595103..95cf9cee9 100644 --- a/lts-core/src/main/java/com/lts/core/commons/utils/NetUtils.java +++ b/lts-core/src/main/java/com/lts/core/commons/utils/NetUtils.java @@ -183,7 +183,7 @@ private static InetAddress getLocalAddress0() { } catch (Exception e) { logger.warn("Failed to retriving ip address, " + e.getMessage(), e); } - logger.error("Could not get local host ip address, will use 127.0.0.1 instead."); +// logger.error("Could not get local host ip address, will use 127.0.0.1 instead."); return localAddress; } diff --git a/lts-core/src/main/java/com/lts/core/constant/Constants.java b/lts-core/src/main/java/com/lts/core/constant/Constants.java index f1030f749..18628b73f 100644 --- a/lts-core/src/main/java/com/lts/core/constant/Constants.java +++ b/lts-core/src/main/java/com/lts/core/constant/Constants.java @@ -91,4 +91,11 @@ public interface Constants { String ADMIN_ID_PREFIX = "LTS_admin_"; + // 是否延迟批量刷盘日志, 如果启用,采用队列的方式批量将日志刷盘(在应用关闭的时候,可能会造成日志丢失) + String LAZY_JOB_LOGGER = "lazy.job.logger"; + // 延迟批量刷盘日志 内存中的最大日志量阀值 + String LAZY_JOB_LOGGER_MEM_SIZE = "lazy.job.logger.mem.size"; + // 延迟批量刷盘日志 检查频率 + String LAZY_JOB_LOGGER_CHECK_PERIOD = "lazy.job.logger.check.period"; + } diff --git a/lts-example/src/main/java/com/lts/example/api/JobTrackerTest.java b/lts-example/src/main/java/com/lts/example/api/JobTrackerTest.java index d47694b4b..f6bf83c88 100644 --- a/lts-example/src/main/java/com/lts/example/api/JobTrackerTest.java +++ b/lts-example/src/main/java/com/lts/example/api/JobTrackerTest.java @@ -72,9 +72,12 @@ public static void testMysqlQueue() { // 任务队列用mysql jobTracker.addConfig("job.queue", "mysql"); // mysql 配置 - jobTracker.addConfig("jdbc.url", "jdbc:mysql://127.0.0.1:3306/lts"); - jobTracker.addConfig("jdbc.username", "root"); - jobTracker.addConfig("jdbc.password", "root"); + jobTracker.addConfig("jdbc.url", "jdbc:mysql://127.0.0.1:3306/lts"); + jobTracker.addConfig("jdbc.username", "root"); + jobTracker.addConfig("jdbc.password", "root"); + + // 延迟批量刷盘业务日志开关 +// jobTracker.addConfig("lazy.job.logger", "true"); jobTracker.setOldDataHandler(new OldDataDeletePolicy()); // 设置 zk 客户端用哪个, 可选 zkclient, curator 默认是 zkclient diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/JobTracker.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/JobTracker.java index 449e9fe5d..dd34a51ca 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/JobTracker.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/JobTracker.java @@ -1,5 +1,6 @@ package com.lts.jobtracker; +import com.lts.biz.logger.JobLoggerDelegate; import com.lts.biz.logger.JobLoggerFactory; import com.lts.core.cluster.AbstractServerNode; import com.lts.core.extension.ExtensionLoader; @@ -21,8 +22,6 @@ */ public class JobTracker extends AbstractServerNode { - private JobLoggerFactory jobLoggerFactory = ExtensionLoader.getExtensionLoader(JobLoggerFactory.class).getAdaptiveExtension(); - private CronJobQueueFactory cronJobQueueFactory = ExtensionLoader.getExtensionLoader(CronJobQueueFactory.class).getAdaptiveExtension(); private ExecutableJobQueueFactory executableJobQueueFactory = @@ -56,7 +55,7 @@ protected void preRemotingStart() { super.preRemotingStart(); // injectRemotingServer application.setRemotingServer(remotingServer); - application.setJobLogger(jobLoggerFactory.getJobLogger(config)); + application.setJobLogger(new JobLoggerDelegate(config)); application.setExecutableJobQueue(executableJobQueueFactory.getQueue(config)); application.setExecutingJobQueue(executingJobQueueFactory.getQueue(config)); application.setCronJobQueue(cronJobQueueFactory.getQueue(config)); diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobReceiver.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobReceiver.java index d79962c2c..4a21e4f22 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobReceiver.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobReceiver.java @@ -1,6 +1,9 @@ package com.lts.jobtracker.support; +import com.lts.biz.logger.domain.JobLogPo; +import com.lts.biz.logger.domain.LogType; import com.lts.core.commons.utils.StringUtils; +import com.lts.core.constant.Level; import com.lts.core.domain.Job; import com.lts.core.exception.JobReceiveException; import com.lts.core.extension.ExtensionLoader; @@ -8,6 +11,7 @@ import com.lts.core.logger.LoggerFactory; import com.lts.core.protocol.command.JobSubmitRequest; import com.lts.core.support.LoggerName; +import com.lts.core.support.SystemClock; import com.lts.jobtracker.domain.JobTrackerApplication; import com.lts.jobtracker.id.IdGenerator; import com.lts.jobtracker.monitor.JobTrackerMonitor; @@ -68,6 +72,7 @@ private JobPo addToQueue(Job job, JobSubmitRequest request) { JobPo jobPo = null; + boolean duplicate = false; try { jobPo = JobDomainConverter.convert(job); if (jobPo == null) { @@ -82,19 +87,39 @@ private JobPo addToQueue(Job job, JobSubmitRequest request) { if (job.isSchedule()) { addCronJob(jobPo); - LOGGER.info("Receive cron job success. nodeGroup={}, CronExpression={}, {}", + LOGGER.info("Receive Cron Job success. nodeGroup={}, CronExpression={}, {}", request.getNodeGroup(), job.getCronExpression(), job); } else { application.getExecutableJobQueue().add(jobPo); - LOGGER.info("Receive job success. nodeGroup={}, {}", request.getNodeGroup(), job); + LOGGER.info("Receive Job success. nodeGroup={}, {}", request.getNodeGroup(), job); } + } catch (DuplicateJobException e) { // already exist, ignore LOGGER.info("Job already exist. nodeGroup={}, {}", request.getNodeGroup(), job); + duplicate = true; } finally { monitor.incReceiveJobNum(); } + if (jobPo != null) { + try { + // 记录日志 + JobLogPo jobLogPo = JobDomainConverter.convertJobLog(jobPo); + jobLogPo.setSuccess(true); + jobLogPo.setLogType(LogType.RECEIVE); + jobLogPo.setLogTime(SystemClock.now()); + jobLogPo.setLevel(Level.INFO); + if (duplicate) { + jobLogPo.setLevel(Level.WARN); + jobLogPo.setMsg("在任务队列中已经存在,忽略本次提交."); + } + application.getJobLogger().log(jobLogPo); + } catch (Throwable t) { // 日志记录失败不影响正常运行 + LOGGER.error("Receive Job Log error ", t); + } + } + return jobPo; } diff --git a/lts-logger/lts-logger-api/src/main/java/com/lts/biz/logger/JobLoggerDelegate.java b/lts-logger/lts-logger-api/src/main/java/com/lts/biz/logger/JobLoggerDelegate.java new file mode 100644 index 000000000..838add923 --- /dev/null +++ b/lts-logger/lts-logger-api/src/main/java/com/lts/biz/logger/JobLoggerDelegate.java @@ -0,0 +1,179 @@ +package com.lts.biz.logger; + +import com.lts.biz.logger.domain.JobLogPo; +import com.lts.biz.logger.domain.JobLoggerRequest; +import com.lts.core.cluster.Config; +import com.lts.core.commons.utils.CollectionUtils; +import com.lts.core.constant.Constants; +import com.lts.core.extension.ExtensionLoader; +import com.lts.core.factory.NamedThreadFactory; +import com.lts.core.logger.Logger; +import com.lts.core.logger.LoggerFactory; +import com.lts.web.response.PageResponse; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * 内部根据用户参数决定是否采用延迟批量刷盘的策略,来提高吞吐量 + * 批量刷盘有两种情况: + * 1. 内存的日志量超过了设置的阀值 + * 2. 每3S检查一次内存中是否有日志,如果有就那么刷盘 + * + * @author Robert HG (254963746@qq.com) on 10/2/15. + */ +public class JobLoggerDelegate implements JobLogger { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobLoggerDelegate.class); + + // 3S 检查输盘一次日志 + private int flushPeriod; + + private JobLogger jobLogger; + private boolean lazyLog = false; + private ScheduledExecutorService executor; + private ScheduledFuture scheduledFuture; + private BlockingQueue memoryQueue; + // 日志批量刷盘数量 + private int batchFlushSize = 100; + private int overflowSize = 10000; + // 内存中最大的日志量阀值 + private int maxMemoryLogSize; + private AtomicBoolean flushing = new AtomicBoolean(false); + + public JobLoggerDelegate(Config config) { + JobLoggerFactory jobLoggerFactory = ExtensionLoader + .getExtensionLoader(JobLoggerFactory.class).getAdaptiveExtension(); + jobLogger = jobLoggerFactory.getJobLogger(config); + lazyLog = config.getParameter(Constants.LAZY_JOB_LOGGER, false); + if (lazyLog) { + + // 无界Queue + memoryQueue = new LinkedBlockingQueue(); + maxMemoryLogSize = config.getParameter(Constants.LAZY_JOB_LOGGER_MEM_SIZE, 1000); + flushPeriod = config.getParameter(Constants.LAZY_JOB_LOGGER_CHECK_PERIOD, 3); + + executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("LazyJobLogger")); + scheduledFuture = executor.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + try { + if (flushing.compareAndSet(false, true)) { + checkAndFlush(); + } + } catch (Throwable t) { + LOGGER.error("CheckAndFlush log error", t); + } + } + }, flushPeriod, flushPeriod, TimeUnit.SECONDS); + + } + } + + /** + * 检查内存中是否有日志,如果有就批量刷盘 + */ + private void checkAndFlush() { + try { + int nowSize = memoryQueue.size(); + if (nowSize == 0) { + return; + } + List batch = new ArrayList(); + for (int i = 0; i < nowSize; i++) { + JobLogPo jobLogPo = memoryQueue.poll(); + batch.add(jobLogPo); + + if (batch.size() >= batchFlushSize) { + flush(batch); + } + } + if (batch.size() > 0) { + flush(batch); + } + + } finally { + flushing.compareAndSet(true, false); + } + } + + private void checkOverflowSize() { + if (memoryQueue.size() > overflowSize) { + throw new JobLogException("Memory Log size is " + memoryQueue.size() + " , please check the JobLogger is available"); + } + } + + private void flush(List batch) { + boolean flushSuccess = false; + try { + jobLogger.log(batch); + flushSuccess = true; + } finally { + if (!flushSuccess) { + memoryQueue.addAll(batch); + } + batch.clear(); + } + } + + /** + * 检查内存中的日志量是否超过阀值,如果超过需要批量刷盘日志 + */ + private void checkCapacity() { + if (memoryQueue.size() > maxMemoryLogSize) { + // 超过阀值,需要批量刷盘 + if (flushing.compareAndSet(false, true)) { + // 这里可以采用new Thread, 因为这里只会同时new一个 + new Thread(new Runnable() { + @Override + public void run() { + try { + checkAndFlush(); + } catch (Throwable t) { + LOGGER.error("Capacity full flush error", t); + } + } + }).start(); + } + } + } + + @Override + public void log(JobLogPo jobLogPo) { + if (jobLogPo == null) { + return; + } + if (lazyLog) { + checkOverflowSize(); + memoryQueue.offer(jobLogPo); + checkCapacity(); + } else { + jobLogger.log(jobLogPo); + } + } + + @Override + public void log(List jobLogPos) { + if (CollectionUtils.isEmpty(jobLogPos)) { + return; + } + if (lazyLog) { + checkOverflowSize(); + for (JobLogPo jobLogPo : jobLogPos) { + memoryQueue.offer(jobLogPo); + } + // checkCapacity + checkCapacity(); + } else { + jobLogger.log(jobLogPos); + } + } + + @Override + public PageResponse search(JobLoggerRequest request) { + return jobLogger.search(request); + } + +} diff --git a/lts-logger/lts-logger-api/src/main/java/com/lts/biz/logger/domain/LogType.java b/lts-logger/lts-logger-api/src/main/java/com/lts/biz/logger/domain/LogType.java index 6af6bbab4..46aff7763 100644 --- a/lts-logger/lts-logger-api/src/main/java/com/lts/biz/logger/domain/LogType.java +++ b/lts-logger/lts-logger-api/src/main/java/com/lts/biz/logger/domain/LogType.java @@ -5,6 +5,7 @@ */ public enum LogType { + RECEIVE, // 接受任务 SENT, // 任务发送 开始执行 FINISHED, // 任务执行完成 RESEND, // TaskTracker 重新发送的任务执行结果 diff --git a/lts-logger/lts-logger-mysql/src/main/java/com/lts/biz/logger/mysql/MysqlJobLogger.java b/lts-logger/lts-logger-mysql/src/main/java/com/lts/biz/logger/mysql/MysqlJobLogger.java index af28a5aa5..97e922837 100644 --- a/lts-logger/lts-logger-mysql/src/main/java/com/lts/biz/logger/mysql/MysqlJobLogger.java +++ b/lts-logger/lts-logger-mysql/src/main/java/com/lts/biz/logger/mysql/MysqlJobLogger.java @@ -11,9 +11,9 @@ import com.lts.core.commons.utils.CollectionUtils; import com.lts.core.commons.utils.JSONUtils; import com.lts.core.constant.Level; -import com.lts.web.response.PageResponse; import com.lts.store.jdbc.JdbcRepository; import com.lts.store.jdbc.SqlBuilder; +import com.lts.web.response.PageResponse; import org.apache.commons.dbutils.ResultSetHandler; import java.io.IOException; @@ -78,18 +78,7 @@ public void log(List jobLogPos) { if (CollectionUtils.isEmpty(jobLogPos)) { return; } - String prefixSQL = "INSERT INTO `lts_job_log_po` ( `log_time`, `gmt_created`, `log_type`, `success`, `msg`" + - ",`task_tracker_identity`, `level`, `task_id`, `job_id`" + - ", `priority`, `submit_node_group`, `task_tracker_node_group`, `ext_params`, `need_feedback`" + - ", `cron_expression`, `trigger_time`, `retry_times`) VALUES "; int size = jobLogPos.size(); - for (int i = 0; i < size; i++) { - if (i == size - 1) { - prefixSQL += "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - } else { - prefixSQL += "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?),"; - } - } Object[][] params = new Object[size][17]; int index = 0; @@ -115,7 +104,7 @@ public void log(List jobLogPos) { } try { - getSqlTemplate().batchUpdate(prefixSQL, params); + getSqlTemplate().batchUpdate(insertSQL, params); } catch (SQLException e) { throw new JobLogException(e); }