Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lts-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
<artifactId>lts-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.lts</groupId>
<artifactId>lts-jobclient</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.lts</groupId>
<artifactId>lts-queue-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,25 @@
import com.lts.biz.logger.domain.JobLogPo;
import com.lts.biz.logger.domain.JobLoggerRequest;
import com.lts.core.commons.utils.Assert;
import com.lts.core.commons.utils.CollectionUtils;
import com.lts.core.commons.utils.StringUtils;
import com.lts.core.domain.Job;
import com.lts.core.support.CronExpression;
import com.lts.core.support.SystemClock;
import com.lts.jobclient.domain.Response;
import com.lts.queue.domain.JobPo;
import com.lts.web.cluster.AdminApplication;
import com.lts.web.controller.AbstractController;
import com.lts.web.request.JobQueueRequest;
import com.lts.web.response.PageResponse;
import com.lts.web.support.LtsAdminJobClient;
import com.lts.web.vo.RestfulResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.ParseException;
import java.util.Date;
import java.util.Map;

/**
* @author Robert HG (254963746@qq.com) on 6/6/15.
Expand All @@ -27,6 +31,8 @@ public class JobQueueApiController extends AbstractController {

@Autowired
AdminApplication application;
@Autowired
LtsAdminJobClient ltsAdminJobClient;

@RequestMapping("/job-queue/cron-job-get")
public RestfulResponse cronJobGet(JobQueueRequest request) {
Expand Down Expand Up @@ -149,7 +155,7 @@ public RestfulResponse cronJobDelete(JobQueueRequest request) {
return response;
}
boolean success = application.getCronJobQueue().remove(request.getJobId());
if(success){
if (success) {
try {
application.getExecutableJobQueue().remove(request.getTaskTrackerNodeGroup(), request.getJobId());
} catch (Exception e) {
Expand Down Expand Up @@ -187,7 +193,7 @@ public RestfulResponse executableJobDelete(JobQueueRequest request) {
}
}
response.setSuccess(true);
} else{
} else {
response.setSuccess(false);
response.setMsg("更新失败,该条任务可能已经删除.");
}
Expand All @@ -199,14 +205,14 @@ public RestfulResponse executableJobDelete(JobQueueRequest request) {
public RestfulResponse jobLoggerGet(JobLoggerRequest request) {
RestfulResponse response = new RestfulResponse();

try {
// try {
// Assert.hasLength(request.getTaskId(), "taskId不能为空!");
// Assert.hasLength(request.getTaskTrackerNodeGroup(), "taskTrackerNodeGroup不能为空!");
} catch (IllegalArgumentException e) {
response.setSuccess(false);
response.setMsg(e.getMessage());
return response;
}
// } catch (IllegalArgumentException e) {
// response.setSuccess(false);
// response.setMsg(e.getMessage());
// return response;
// }

PageResponse<JobLogPo> pageResponse = application.getJobLogger().search(request);
response.setResults(pageResponse.getResults());
Expand All @@ -221,7 +227,6 @@ public RestfulResponse jobAdd(JobQueueRequest request) {
RestfulResponse response = new RestfulResponse();
// 表单check

Long triggerTime = null;
try {
Assert.hasLength(request.getTaskId(), "taskId不能为空!");
Assert.hasLength(request.getTaskTrackerNodeGroup(), "taskTrackerNodeGroup不能为空!");
Expand All @@ -236,7 +241,7 @@ public RestfulResponse jobAdd(JobQueueRequest request) {
response.setMsg(StringUtils.format("该CronExpression={} 已经没有执行时间点!", request.getCronExpression()));
return response;
} else {
triggerTime = nextTime.getTime();
request.setTriggerTime(nextTime);
}
} catch (ParseException e) {
response.setSuccess(false);
Expand All @@ -251,38 +256,36 @@ public RestfulResponse jobAdd(JobQueueRequest request) {
return response;
}

addJob(request, triggerTime);

response.setSuccess(true);
Response ltsResponse = addJob(request);
if (ltsResponse.isSuccess()) {
response.setSuccess(true);
} else {
response.setSuccess(false);
response.setMsg("提交失败: " + ltsResponse.getMsg());
response.setCode(ltsResponse.getCode());
}
return response;
}

private void addJob(JobQueueRequest request, Long triggerTime) {
JobPo jobPo = new JobPo();
// 这里暂时用UUID来代替
jobPo.setJobId(StringUtils.generateUUID());
jobPo.setCronExpression(request.getCronExpression());
jobPo.setExtParams(request.getExtParams());
jobPo.setGmtCreated(SystemClock.now());
jobPo.setGmtModified(jobPo.getGmtCreated());
jobPo.setNeedFeedback(request.getNeedFeedback());
jobPo.setPriority(request.getPriority());
jobPo.setTaskId(request.getTaskId());
jobPo.setSubmitNodeGroup(request.getSubmitNodeGroup());
jobPo.setTaskTrackerNodeGroup(request.getTaskTrackerNodeGroup());
if (request.getTriggerTime() != null) {
jobPo.setTriggerTime(request.getTriggerTime().getTime());
}
private Response addJob(JobQueueRequest request) {

if (jobPo.isSchedule()) {
application.getCronJobQueue().add(jobPo);
if (triggerTime != null) {
jobPo.setTriggerTime(triggerTime);
Job job = new Job();
job.setTaskId(request.getTaskId());
if (CollectionUtils.isNotEmpty(request.getExtParams())) {
for (Map.Entry<String, String> entry : request.getExtParams().entrySet()) {
job.setParam(entry.getKey(), entry.getValue());
}
}
if (jobPo.getTriggerTime() == null) {
jobPo.setTriggerTime(SystemClock.now());
}
application.getExecutableJobQueue().add(jobPo);
// 执行节点的group名称
job.setTaskTrackerNodeGroup(request.getTaskTrackerNodeGroup());
job.setSubmitNodeGroup(request.getSubmitNodeGroup());

job.setNeedFeedback(request.getNeedFeedback());
// 这个是 cron expression 和 quartz 一样,可选
job.setCronExpression(request.getCronExpression());
job.setTriggerTime(request.getTriggerTime());
job.setPriority(request.getPriority());

return ltsAdminJobClient.submitJob(job);
}
}
36 changes: 36 additions & 0 deletions lts-admin/src/main/java/com/lts/web/support/LtsAdminJobClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.lts.web.support;

import com.lts.core.commons.utils.StringUtils;
import com.lts.core.domain.Job;
import com.lts.jobclient.JobClient;
import com.lts.jobclient.domain.Response;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

/**
* @author Robert HG (254963746@qq.com) on 10/3/15.
*/
@Component
public class LtsAdminJobClient implements InitializingBean {

private JobClient jobClient;

public Response submitJob(Job job) {
return jobClient.submitJob(job);
}

@Override
public void afterPropertiesSet() throws Exception {
jobClient = new JobClient();
jobClient.setNodeGroup("LTS-Admin");
String clusterName = AppConfigurer.getProperties("clusterName");
if (StringUtils.isEmpty(clusterName)) {
throw new IllegalArgumentException("clusterName in lts-admin.cfg can not be null.");
}
jobClient.setClusterName(clusterName);
jobClient.setRegistryAddress(AppConfigurer.getProperties("registryAddress"));

jobClient.start();
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.lts.web.support.spring;

import com.lts.biz.logger.JobLoggerFactory;
import com.lts.biz.logger.JobLoggerDelegate;
import com.lts.core.cluster.Config;
import com.lts.core.cluster.Node;
import com.lts.core.cluster.NodeType;
Expand All @@ -9,7 +9,6 @@
import com.lts.core.constant.Constants;
import com.lts.core.extension.ExtensionLoader;
import com.lts.core.registry.RegistryStatMonitor;
import com.lts.ec.EventCenter;
import com.lts.ec.EventCenterFactory;
import com.lts.queue.*;
import com.lts.web.cluster.AdminApplication;
Expand All @@ -32,8 +31,6 @@ public class AdminAppFactoryBean implements FactoryBean<AdminApplication>, Initi
ExecutingJobQueueFactory.class).getAdaptiveExtension();
NodeGroupStoreFactory nodeGroupStoreFactory = ExtensionLoader.getExtensionLoader(
NodeGroupStoreFactory.class).getAdaptiveExtension();
JobLoggerFactory jobLoggerFactory = ExtensionLoader.getExtensionLoader(
JobLoggerFactory.class).getAdaptiveExtension();
JobFeedbackQueueFactory jobFeedbackQueueFactory = ExtensionLoader.getExtensionLoader(
JobFeedbackQueueFactory.class).getAdaptiveExtension();
private EventCenterFactory eventCenterFactory = ExtensionLoader.getExtensionLoader(EventCenterFactory.class).getAdaptiveExtension();
Expand Down Expand Up @@ -90,7 +87,7 @@ public void afterPropertiesSet() throws Exception {
application.setExecutableJobQueue(executableJobQueueFactory.getQueue(config));
application.setExecutingJobQueue(executingJobQueueFactory.getQueue(config));
application.setNodeGroupStore(nodeGroupStoreFactory.getStore(config));
application.setJobLogger(jobLoggerFactory.getJobLogger(config));
application.setJobLogger(new JobLoggerDelegate(config));
application.setEventCenter(eventCenterFactory.getEventCenter(config));
application.setRegistryStatMonitor(new RegistryStatMonitor(application));
}
Expand Down
10 changes: 10 additions & 0 deletions lts-admin/src/main/webapp/views/templates/jobAdd.vm
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@
}
});

$(document).on("change", "select[name='submitNodeGroup']", function () {
var submitNodeGroup = $(this).val();
if (submitNodeGroup == 'LTS-Admin') {
$("select[name='needFeedback']").selectpicker("val", "false");
$("select[name='needFeedback']").attr("disabled", "disabled");
} else {
$("select[name='needFeedback']").attr("disabled", null);
}
});

$(document).on("click", "#addBtn", function () {
var params = {};
$.each($('#form').parent().find(".form-control"), function () {
Expand Down
1 change: 1 addition & 0 deletions lts-admin/src/main/webapp/views/templates/jobLogger.vm
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@
$(document).ready(function () {

var LOG_TYPE = {
RECEIVE: '接受任务',
SENT: '派发任务',
FINISHED: '完成任务',
FIXED_DEAD: '修复死任务',
Expand Down
4 changes: 2 additions & 2 deletions lts-admin/src/main/webapp/views/templates/nodeGroupManager.vm
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@
{{/each}}
{{if results == 0}}
<tr>
<td colspan="15">暂无数据</td>
<td colspan="4">暂无数据</td>
</tr>
{{/if}}
</tbody>
<tfoot>
<tr>
<td colspan="9">
<td colspan="4">
<span>共{{results}}条记录,每页展示{{pageSize}}条</span>
<ul class="pagination-sm pull-right"></ul>
</td>
Expand Down
3 changes: 2 additions & 1 deletion lts-admin/src/main/webapp/views/templates/nodeJvmInfo.vm
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@
showJVMInfo(identity, data);
});
} else {
if (json) {
if (json && json['msg']) {
swal(json['msg']);
}
showJVMInfo('', {});
}
}
});
Expand Down
6 changes: 3 additions & 3 deletions lts-admin/src/main/webapp/views/templates/nodeManager.vm
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@
<div class="col-sm-2">
<select name="nodeType" class="form-control">
<option value="">所有</option>
<option value="JOB_CLIENT">JobClient</option>
<option value="TASK_TRACKER">TaskTracker</option>
<option value="JOB_TRACKER">JobTracker</option>
<option value="JOB_CLIENT">JOB_CLIENT</option>
<option value="TASK_TRACKER">TASK_TRACKER</option>
<option value="JOB_TRACKER">JOB_TRACKER</option>
</select>
</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ final public void stop() {
try {
if (started.compareAndSet(true, false)) {

registry.unregister(node);
if (registry != null) {
registry.unregister(node);
}

preRemotingStop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private static InetAddress getLocalAddress0() {
} catch (Exception e) {
logger.warn("Failed to retriving ip address, " + e.getMessage(), e);
}
logger.error("Could not get local host ip address, will use 127.0.0.1 instead.");
// logger.error("Could not get local host ip address, will use 127.0.0.1 instead.");
return localAddress;
}

Expand Down
7 changes: 7 additions & 0 deletions lts-core/src/main/java/com/lts/core/constant/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,11 @@ public interface Constants {

String ADMIN_ID_PREFIX = "LTS_admin_";

// 是否延迟批量刷盘日志, 如果启用,采用队列的方式批量将日志刷盘(在应用关闭的时候,可能会造成日志丢失)
String LAZY_JOB_LOGGER = "lazy.job.logger";
// 延迟批量刷盘日志 内存中的最大日志量阀值
String LAZY_JOB_LOGGER_MEM_SIZE = "lazy.job.logger.mem.size";
// 延迟批量刷盘日志 检查频率
String LAZY_JOB_LOGGER_CHECK_PERIOD = "lazy.job.logger.check.period";

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,12 @@ public static void testMysqlQueue() {
// 任务队列用mysql
jobTracker.addConfig("job.queue", "mysql");
// mysql 配置
jobTracker.addConfig("jdbc.url", "jdbc:mysql://127.0.0.1:3306/lts");
jobTracker.addConfig("jdbc.username", "root");
jobTracker.addConfig("jdbc.password", "root");
jobTracker.addConfig("jdbc.url", "jdbc:mysql://127.0.0.1:3306/lts");
jobTracker.addConfig("jdbc.username", "root");
jobTracker.addConfig("jdbc.password", "root");

// 延迟批量刷盘业务日志开关
// jobTracker.addConfig("lazy.job.logger", "true");

jobTracker.setOldDataHandler(new OldDataDeletePolicy());
// 设置 zk 客户端用哪个, 可选 zkclient, curator 默认是 zkclient
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.lts.jobtracker;

import com.lts.biz.logger.JobLoggerDelegate;
import com.lts.biz.logger.JobLoggerFactory;
import com.lts.core.cluster.AbstractServerNode;
import com.lts.core.extension.ExtensionLoader;
Expand All @@ -21,8 +22,6 @@
*/
public class JobTracker extends AbstractServerNode<JobTrackerNode, JobTrackerApplication> {

private JobLoggerFactory jobLoggerFactory = ExtensionLoader.getExtensionLoader(JobLoggerFactory.class).getAdaptiveExtension();

private CronJobQueueFactory cronJobQueueFactory =
ExtensionLoader.getExtensionLoader(CronJobQueueFactory.class).getAdaptiveExtension();
private ExecutableJobQueueFactory executableJobQueueFactory =
Expand Down Expand Up @@ -56,7 +55,7 @@ protected void preRemotingStart() {
super.preRemotingStart();
// injectRemotingServer
application.setRemotingServer(remotingServer);
application.setJobLogger(jobLoggerFactory.getJobLogger(config));
application.setJobLogger(new JobLoggerDelegate(config));
application.setExecutableJobQueue(executableJobQueueFactory.getQueue(config));
application.setExecutingJobQueue(executingJobQueueFactory.getQueue(config));
application.setCronJobQueue(cronJobQueueFactory.getQueue(config));
Expand Down
Loading