Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 32 additions & 37 deletions lts-core/src/main/java/com/lts/core/constant/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,80 +11,75 @@
public interface Constants {

// 可用的处理器个数
public static final int AVAILABLE_PROCESSOR = Runtime.getRuntime().availableProcessors();
int AVAILABLE_PROCESSOR = Runtime.getRuntime().availableProcessors();

public static final String USER_HOME = System.getProperty("user.home");
String USER_HOME = System.getProperty("user.home");

public static final int JOB_TRACKER_DEFAULT_LISTEN_PORT = 35001;
int JOB_TRACKER_DEFAULT_LISTEN_PORT = 35001;

// 默认集群名字
public static final String DEFAULT_CLUSTER_NAME = "defaultCluster";
String DEFAULT_CLUSTER_NAME = "defaultCluster";

// 默认JobTracker节点组
public static final String DEFAULT_NODE_JOB_TRACKER_GROUP = "jobTrackerGroup";
String DEFAULT_NODE_JOB_TRACKER_GROUP = "jobTrackerGroup";
// 默认JobClient节点组
public static final String DEFAULT_NODE_JOB_CLIENT_GROUP = "jobClientGroup";
String DEFAULT_NODE_JOB_CLIENT_GROUP = "jobClientGroup";
// 默认TaskTracker节点组
public static final String DEFAULT_NODE_TASK_TRACKER_GROUP = "taskTrackerGroup";
String DEFAULT_NODE_TASK_TRACKER_GROUP = "taskTrackerGroup";

public static final String CHARSET = "utf-8";
String CHARSET = "utf-8";

public static final int DEFAULT_TIMEOUT = 1000;
int DEFAULT_TIMEOUT = 1000;

public static final String TIMEOUT_KEY = "timeout";
String TIMEOUT_KEY = "timeout";

public static final String SESSION_TIMEOUT_KEY = "session";
String SESSION_TIMEOUT_KEY = "session";

public static final int DEFAULT_SESSION_TIMEOUT = 60 * 1000;
int DEFAULT_SESSION_TIMEOUT = 60 * 1000;

public static final String REGISTER = "register";
String REGISTER = "register";

public static final String UNREGISTER = "unregister";
String UNREGISTER = "unregister";

public static final String SUBSCRIBE = "subscribe";
String SUBSCRIBE = "subscribe";

public static final String UNSUBSCRIBE = "unsubscribe";
String UNSUBSCRIBE = "unsubscribe";
/**
* 注册中心失败事件重试事件
*/
public static final String REGISTRY_RETRY_PERIOD_KEY = "retry.period";
String REGISTRY_RETRY_PERIOD_KEY = "retry.period";

/**
* 重试周期
*/
public static final int DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;
int DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000;

public static final Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*");
Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*");

/**
* 注册中心自动重连时间
*/
public static final String REGISTRY_RECONNECT_PERIOD_KEY = "reconnect.period";
String REGISTRY_RECONNECT_PERIOD_KEY = "reconnect.period";

public static final int DEFAULT_REGISTRY_RECONNECT_PERIOD = 3 * 1000;
int DEFAULT_REGISTRY_RECONNECT_PERIOD = 3 * 1000;

public static final String ZK_CLIENT_KEY = "zk.client";
String ZK_CLIENT_KEY = "zk.client";

public static final String JOB_LOGGER_KEY = "job.logger";
String JOB_LOGGER_KEY = "job.logger";

public static final String JOB_QUEUE_KEY = "job.queue";
String JOB_QUEUE_KEY = "job.queue";
// 客户端提交并发请求size
public static final String JOB_SUBMIT_CONCURRENCY_SIZE = "job.submit.concurrency.size";
public static final int DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE = 100;
String JOB_SUBMIT_CONCURRENCY_SIZE = "job.submit.concurrency.size";
int DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE = 100;

public static final String PROCESSOR_THREAD = "job.processor.thread";
public static final int DEFAULT_PROCESSOR_THREAD = 32 + AVAILABLE_PROCESSOR * 5;
String PROCESSOR_THREAD = "job.processor.thread";
int DEFAULT_PROCESSOR_THREAD = 32 + AVAILABLE_PROCESSOR * 5;

public static final int LATCH_TIMEOUT_MILLIS = 10 * 60 * 1000; // 10分钟

// 取任务的时候的并发数控制
public static final String JOB_TAKE_PARALLEL_SIZE = "job.take.parallel.size";
public static final String JOB_TAKE_ACQUIRE_TIMEOUT = "job.take.acquire.timeout";
public static final int DEFAULT_JOB_TAKE_PARALLEL_SIZE = 20;
int LATCH_TIMEOUT_MILLIS = 10 * 60 * 1000; // 10分钟

// 任务最多重试次数
public static final String JOB_MAX_RETRY_TIMES = "job.max.retry.times";
public static final int DEFAULT_JOB_MAX_RETRY_TIMES = 10;
String JOB_MAX_RETRY_TIMES = "job.max.retry.times";
int DEFAULT_JOB_MAX_RETRY_TIMES = 10;

public static final Charset UTF_8 = Charset.forName("UTF-8");
Charset UTF_8 = Charset.forName("UTF-8");
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.lts.example.api;

import com.lts.example.support.MasterChangeListenerImpl;
import com.lts.example.support.NoopJobRunner;
import com.lts.example.support.TestJobRunner;
import com.lts.tasktracker.TaskTracker;

Expand All @@ -19,7 +20,7 @@ public static void main(String[] args) {
// taskTracker.setRegistryAddress("redis://127.0.0.1:6379");
taskTracker.setNodeGroup("test_trade_TaskTracker"); // 同一个TaskTracker集群这个名字相同
taskTracker.setClusterName("test_cluster");
taskTracker.setWorkThreads(20);
taskTracker.setWorkThreads(100);
// 反馈任务给JobTracker失败,存储本地文件路径
// taskTracker.setFailStorePath(Constants.USER_HOME);
// master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.lts.example.benchmark;

import com.lts.core.commons.utils.StringUtils;
import com.lts.core.domain.Job;
import com.lts.example.support.JobFinishedHandlerImpl;
import com.lts.example.support.MasterChangeListenerImpl;
import com.lts.jobclient.JobClient;
import com.lts.jobclient.RetryJobClient;
import com.lts.jobclient.domain.Response;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;

/**
* @author Robert HG (254963746@qq.com) on 8/13/15.
*/
public class JobClientMain {

public static void main(String[] args) {
// 推荐使用RetryJobClient
final JobClient jobClient = new RetryJobClient();
jobClient.setNodeGroup("test_jobClient");
jobClient.setClusterName("test_cluster");
jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181");
// jobClient.setRegistryAddress("redis://127.0.0.1:6379");
// 任务重试保存地址,默认用户目录下
// jobClient.setFailStorePath(Constants.USER_HOME);
// 任务完成反馈接口
jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl());
// master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件
jobClient.addMasterChangeListener(new MasterChangeListenerImpl());
// 可选址 leveldb(默认), rocksdb, bekeleydb
// taskTracker.addConfig("job.fail.store", "leveldb");
jobClient.addConfig("job.submit.concurrency.size", "100");
jobClient.start();

try {
// 休息1s 等待 连上JobTracker
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

final AtomicLong num = new AtomicLong();
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));

// 假设分了 20 个 partition

final int partition = 100;

for (int i = 0; i < 100; i++) {

new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Job job = new Job();
job.setTaskId(StringUtils.generateUUID());
job.setTaskTrackerNodeGroup("test_trade_TaskTracker_" + (num.get() % partition));
job.setParam("shopId", "111");
job.setNeedFeedback(false);

Response response = jobClient.submitJob(job);
if (!response.isSuccess()) {
System.out.println(response.getMsg());
} else {
num.incrementAndGet();
}
}
}
}).start();
}

new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(num.get());
}
}
}).start();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.lts.example.benchmark;

import com.lts.example.support.MasterChangeListenerImpl;
import com.lts.jobtracker.JobTracker;
import com.lts.jobtracker.support.policy.OldDataDeletePolicy;

/**
* @author Robert HG (254963746@qq.com) on 8/13/15.
*/
public class JobTrackerMain {

public static void main(String[] args) {

// final JobTracker jobTracker = new JobTracker();
// // 节点信息配置
// jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
//// jobTracker.setRegistryAddress("redis://127.0.0.1:6379");
// jobTracker.setListenPort(35001); // 默认 35001
// jobTracker.setClusterName("test_cluster");
//
// jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());
//
// // 设置业务日志记录 mysql
// jobTracker.addConfig("job.logger", "mysql");
// // 任务队列用mysql
// jobTracker.addConfig("job.queue", "mysql");
// // mysql 配置
// jobTracker.addConfig("jdbc.url", "jdbc:mysql://127.0.0.1:3306/lts");
// jobTracker.addConfig("jdbc.username", "root");
// jobTracker.addConfig("jdbc.password", "root");
//
// jobTracker.setOldDataHandler(new OldDataDeletePolicy());
// // 设置 zk 客户端用哪个, 可选 zkclient, curator 默认是 zkclient
//// jobTracker.addConfig("zk.client", "zkclient");
// // 启动节点
// jobTracker.start();

final JobTracker jobTracker = new JobTracker();
// 节点信息配置
jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
jobTracker.setClusterName("test_cluster"); // 三种节点都要保持一致
// jobTracker.setListenPort(50001);
// master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件
jobTracker.addMasterChangeListener(new MasterChangeListenerImpl());
// 设置业务日志记录, 可选值: mongo, mysql , console
jobTracker.addConfig("job.logger", "mongo");
// 任务队列用mongo
jobTracker.addConfig("job.queue", "mongo");
// mongo 配置
jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017"); // 多个地址用逗号分割
jobTracker.addConfig("mongo.database", "lts2");
// 这个是对于 返回给客户端 任务的 老数据删除策略
jobTracker.setOldDataHandler(new OldDataDeletePolicy());
// 设置 zk 客户端用哪个, 可选 zkclient(默认), curator
jobTracker.addConfig("zk.client", "zkclient");
// 启动节点
jobTracker.start();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
jobTracker.stop();
}
}));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.lts.example.benchmark;

import com.lts.example.support.MasterChangeListenerImpl;
import com.lts.example.support.NoopJobRunner;
import com.lts.tasktracker.TaskTracker;

/**
* @author Robert HG (254963746@qq.com) on 8/13/15.
*/
public class TaskTrackerMain {

public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
startTaskTracker(i);
}
}

private static void startTaskTracker(int index) {
final TaskTracker taskTracker = new TaskTracker();
// 任务执行类,实现JobRunner 接口
taskTracker.setJobRunnerClass(NoopJobRunner.class);
taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181");
// taskTracker.setRegistryAddress("redis://127.0.0.1:6379");
taskTracker.setNodeGroup("test_trade_TaskTracker_" + index); // 同一个TaskTracker集群这个名字相同
taskTracker.setClusterName("test_cluster");
taskTracker.setWorkThreads(10);
// 反馈任务给JobTracker失败,存储本地文件路径
// taskTracker.setFailStorePath(Constants.USER_HOME);
// master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件
taskTracker.addMasterChangeListener(new MasterChangeListenerImpl());
// 业务日志级别
// taskTracker.setBizLoggerLevel(Level.INFO);
// 可选址 leveldb(默认), rocksdb, bekeleydb
// taskTracker.addConfig("job.fail.store", "leveldb");
taskTracker.start();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.lts.example.support;

import com.lts.core.domain.Action;
import com.lts.core.domain.Job;
import com.lts.tasktracker.Result;
import com.lts.tasktracker.runner.JobRunner;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author Robert HG (254963746@qq.com) on 8/13/15.
*/
public class NoopJobRunner implements JobRunner {

static AtomicInteger num = new AtomicInteger(0);
@Override
public Result run(Job job) throws Throwable {
System.out.println(num.incrementAndGet());
return new Result(Action.EXECUTE_SUCCESS);
}
}
4 changes: 2 additions & 2 deletions lts-example/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

log4j.rootLogger=INFO,stdout
log4j.rootLogger=WARN,stdout

log4j.appender.stdout.Threshold=INFO
log4j.appender.stdout.Threshold=WARN
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%-5p] [%d{HH:mm:ss}] %c - %m%n
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class JobTracker extends AbstractServerNode<JobTrackerNode, JobTrackerApp
private JobFeedbackQueueFactory jobFeedbackQueueFactory = ExtensionLoader.getExtensionLoader(JobFeedbackQueueFactory.class).getAdaptiveExtension();
private NodeGroupStoreFactory nodeGroupStoreFactory = ExtensionLoader.getExtensionLoader(NodeGroupStoreFactory.class).getAdaptiveExtension();

private PreLoaderFactory preLoaderFactory = ExtensionLoader.getExtensionLoader(PreLoaderFactory.class).getAdaptiveExtension();

public JobTracker() {
// 添加节点变化监听器
addNodeChangeListener(new JobNodeChangeListener(application));
Expand All @@ -50,7 +52,7 @@ protected void innerStart() {
application.setCronJobQueue(cronJobQueueFactory.getQueue(config));
application.setJobFeedbackQueue(jobFeedbackQueueFactory.getQueue(config));
application.setNodeGroupStore(nodeGroupStoreFactory.getStore(config));

application.setPreLoader(preLoaderFactory.getPreLoader(config, application));
application.getChannelManager().start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ public class JobTrackerApplication extends Application {
// job id generator
private IdGenerator idGenerator;

private PreLoader preLoader;

public PreLoader getPreLoader() {
return preLoader;
}

public void setPreLoader(PreLoader preLoader) {
this.preLoader = preLoader;
}

public JobLogger getJobLogger() {
return jobLogger;
}
Expand Down
Loading