From 796fd8783094e9ab9a31d56235686dd19a17def7 Mon Sep 17 00:00:00 2001 From: hugui <254963746@qq.com> Date: Fri, 14 Aug 2015 09:32:13 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=20=E9=98=9F=E5=88=97?= =?UTF-8?q?=E7=AD=96=E7=95=A5=EF=BC=8C=E5=A4=A7=E5=A4=A7=E6=8F=90=E5=8D=87?= =?UTF-8?q?=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/lts/core/constant/Constants.java | 69 +++++++------- .../com/lts/example/api/TaskTrackerTest.java | 3 +- .../lts/example/benchmark/JobClientMain.java | 90 +++++++++++++++++++ .../lts/example/benchmark/JobTrackerMain.java | 65 ++++++++++++++ .../example/benchmark/TaskTrackerMain.java | 38 ++++++++ .../lts/example/support/NoopJobRunner.java | 21 +++++ .../src/main/resources/log4j.properties | 4 +- .../java/com/lts/jobtracker/JobTracker.java | 4 +- .../domain/JobTrackerApplication.java | 10 +++ .../processor/JobFinishedProcessor.java | 2 +- .../com/lts/jobtracker/support/JobPusher.java | 9 +- .../com/lts/queue/ExecutableJobQueue.java | 5 -- .../queue/mongo/MongoExecutableJobQueue.java | 62 ------------- .../queue/mysql/MysqlExecutableJobQueue.java | 80 ----------------- 14 files changed, 265 insertions(+), 197 deletions(-) create mode 100644 lts-example/src/main/java/com/lts/example/benchmark/JobClientMain.java create mode 100644 lts-example/src/main/java/com/lts/example/benchmark/JobTrackerMain.java create mode 100644 lts-example/src/main/java/com/lts/example/benchmark/TaskTrackerMain.java create mode 100644 lts-example/src/main/java/com/lts/example/support/NoopJobRunner.java diff --git a/lts-core/src/main/java/com/lts/core/constant/Constants.java b/lts-core/src/main/java/com/lts/core/constant/Constants.java index 575b3724f..daa986dc7 100644 --- a/lts-core/src/main/java/com/lts/core/constant/Constants.java +++ b/lts-core/src/main/java/com/lts/core/constant/Constants.java @@ -11,80 +11,75 @@ public interface Constants { // 可用的处理器个数 - public static final int AVAILABLE_PROCESSOR = Runtime.getRuntime().availableProcessors(); + int AVAILABLE_PROCESSOR = Runtime.getRuntime().availableProcessors(); - public static final String USER_HOME = System.getProperty("user.home"); + String USER_HOME = System.getProperty("user.home"); - public static final int JOB_TRACKER_DEFAULT_LISTEN_PORT = 35001; + int JOB_TRACKER_DEFAULT_LISTEN_PORT = 35001; // 默认集群名字 - public static final String DEFAULT_CLUSTER_NAME = "defaultCluster"; + String DEFAULT_CLUSTER_NAME = "defaultCluster"; // 默认JobTracker节点组 - public static final String DEFAULT_NODE_JOB_TRACKER_GROUP = "jobTrackerGroup"; + String DEFAULT_NODE_JOB_TRACKER_GROUP = "jobTrackerGroup"; // 默认JobClient节点组 - public static final String DEFAULT_NODE_JOB_CLIENT_GROUP = "jobClientGroup"; + String DEFAULT_NODE_JOB_CLIENT_GROUP = "jobClientGroup"; // 默认TaskTracker节点组 - public static final String DEFAULT_NODE_TASK_TRACKER_GROUP = "taskTrackerGroup"; + String DEFAULT_NODE_TASK_TRACKER_GROUP = "taskTrackerGroup"; - public static final String CHARSET = "utf-8"; + String CHARSET = "utf-8"; - public static final int DEFAULT_TIMEOUT = 1000; + int DEFAULT_TIMEOUT = 1000; - public static final String TIMEOUT_KEY = "timeout"; + String TIMEOUT_KEY = "timeout"; - public static final String SESSION_TIMEOUT_KEY = "session"; + String SESSION_TIMEOUT_KEY = "session"; - public static final int DEFAULT_SESSION_TIMEOUT = 60 * 1000; + int DEFAULT_SESSION_TIMEOUT = 60 * 1000; - public static final String REGISTER = "register"; + String REGISTER = "register"; - public static final String UNREGISTER = "unregister"; + String UNREGISTER = "unregister"; - public static final String SUBSCRIBE = "subscribe"; + String SUBSCRIBE = "subscribe"; - public static final String UNSUBSCRIBE = "unsubscribe"; + String UNSUBSCRIBE = "unsubscribe"; /** * 注册中心失败事件重试事件 */ - public static final String REGISTRY_RETRY_PERIOD_KEY = "retry.period"; + String REGISTRY_RETRY_PERIOD_KEY = "retry.period"; /** * 重试周期 */ - public static final int DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000; + int DEFAULT_REGISTRY_RETRY_PERIOD = 5 * 1000; - public static final Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*"); + Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*"); /** * 注册中心自动重连时间 */ - public static final String REGISTRY_RECONNECT_PERIOD_KEY = "reconnect.period"; + String REGISTRY_RECONNECT_PERIOD_KEY = "reconnect.period"; - public static final int DEFAULT_REGISTRY_RECONNECT_PERIOD = 3 * 1000; + int DEFAULT_REGISTRY_RECONNECT_PERIOD = 3 * 1000; - public static final String ZK_CLIENT_KEY = "zk.client"; + String ZK_CLIENT_KEY = "zk.client"; - public static final String JOB_LOGGER_KEY = "job.logger"; + String JOB_LOGGER_KEY = "job.logger"; - public static final String JOB_QUEUE_KEY = "job.queue"; + String JOB_QUEUE_KEY = "job.queue"; // 客户端提交并发请求size - public static final String JOB_SUBMIT_CONCURRENCY_SIZE = "job.submit.concurrency.size"; - public static final int DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE = 100; + String JOB_SUBMIT_CONCURRENCY_SIZE = "job.submit.concurrency.size"; + int DEFAULT_JOB_SUBMIT_CONCURRENCY_SIZE = 100; - public static final String PROCESSOR_THREAD = "job.processor.thread"; - public static final int DEFAULT_PROCESSOR_THREAD = 32 + AVAILABLE_PROCESSOR * 5; + String PROCESSOR_THREAD = "job.processor.thread"; + int DEFAULT_PROCESSOR_THREAD = 32 + AVAILABLE_PROCESSOR * 5; - public static final int LATCH_TIMEOUT_MILLIS = 10 * 60 * 1000; // 10分钟 - - // 取任务的时候的并发数控制 - public static final String JOB_TAKE_PARALLEL_SIZE = "job.take.parallel.size"; - public static final String JOB_TAKE_ACQUIRE_TIMEOUT = "job.take.acquire.timeout"; - public static final int DEFAULT_JOB_TAKE_PARALLEL_SIZE = 20; + int LATCH_TIMEOUT_MILLIS = 10 * 60 * 1000; // 10分钟 // 任务最多重试次数 - public static final String JOB_MAX_RETRY_TIMES = "job.max.retry.times"; - public static final int DEFAULT_JOB_MAX_RETRY_TIMES = 10; + String JOB_MAX_RETRY_TIMES = "job.max.retry.times"; + int DEFAULT_JOB_MAX_RETRY_TIMES = 10; - public static final Charset UTF_8 = Charset.forName("UTF-8"); + Charset UTF_8 = Charset.forName("UTF-8"); } diff --git a/lts-example/src/main/java/com/lts/example/api/TaskTrackerTest.java b/lts-example/src/main/java/com/lts/example/api/TaskTrackerTest.java index 391320944..744975e74 100644 --- a/lts-example/src/main/java/com/lts/example/api/TaskTrackerTest.java +++ b/lts-example/src/main/java/com/lts/example/api/TaskTrackerTest.java @@ -1,6 +1,7 @@ package com.lts.example.api; import com.lts.example.support.MasterChangeListenerImpl; +import com.lts.example.support.NoopJobRunner; import com.lts.example.support.TestJobRunner; import com.lts.tasktracker.TaskTracker; @@ -19,7 +20,7 @@ public static void main(String[] args) { // taskTracker.setRegistryAddress("redis://127.0.0.1:6379"); taskTracker.setNodeGroup("test_trade_TaskTracker"); // 同一个TaskTracker集群这个名字相同 taskTracker.setClusterName("test_cluster"); - taskTracker.setWorkThreads(20); + taskTracker.setWorkThreads(100); // 反馈任务给JobTracker失败,存储本地文件路径 // taskTracker.setFailStorePath(Constants.USER_HOME); // master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件 diff --git a/lts-example/src/main/java/com/lts/example/benchmark/JobClientMain.java b/lts-example/src/main/java/com/lts/example/benchmark/JobClientMain.java new file mode 100644 index 000000000..9810b110d --- /dev/null +++ b/lts-example/src/main/java/com/lts/example/benchmark/JobClientMain.java @@ -0,0 +1,90 @@ +package com.lts.example.benchmark; + +import com.lts.core.commons.utils.StringUtils; +import com.lts.core.domain.Job; +import com.lts.example.support.JobFinishedHandlerImpl; +import com.lts.example.support.MasterChangeListenerImpl; +import com.lts.jobclient.JobClient; +import com.lts.jobclient.RetryJobClient; +import com.lts.jobclient.domain.Response; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author Robert HG (254963746@qq.com) on 8/13/15. + */ +public class JobClientMain { + + public static void main(String[] args) { + // 推荐使用RetryJobClient + final JobClient jobClient = new RetryJobClient(); + jobClient.setNodeGroup("test_jobClient"); + jobClient.setClusterName("test_cluster"); + jobClient.setRegistryAddress("zookeeper://127.0.0.1:2181"); + // jobClient.setRegistryAddress("redis://127.0.0.1:6379"); + // 任务重试保存地址,默认用户目录下 + // jobClient.setFailStorePath(Constants.USER_HOME); + // 任务完成反馈接口 + jobClient.setJobFinishedHandler(new JobFinishedHandlerImpl()); + // master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件 + jobClient.addMasterChangeListener(new MasterChangeListenerImpl()); + // 可选址 leveldb(默认), rocksdb, bekeleydb + // taskTracker.addConfig("job.fail.store", "leveldb"); + jobClient.addConfig("job.submit.concurrency.size", "100"); + jobClient.start(); + + try { + // 休息1s 等待 连上JobTracker + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + final AtomicLong num = new AtomicLong(); + System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); + + // 假设分了 20 个 partition + + final int partition = 100; + + for (int i = 0; i < 100; i++) { + + new Thread(new Runnable() { + @Override + public void run() { + while (true) { + Job job = new Job(); + job.setTaskId(StringUtils.generateUUID()); + job.setTaskTrackerNodeGroup("test_trade_TaskTracker_" + (num.get() % partition)); + job.setParam("shopId", "111"); + job.setNeedFeedback(false); + + Response response = jobClient.submitJob(job); + if (!response.isSuccess()) { + System.out.println(response.getMsg()); + } else { + num.incrementAndGet(); + } + } + } + }).start(); + } + + new Thread(new Runnable() { + @Override + public void run() { + while(true){ + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(num.get()); + } + } + }).start(); + } + +} diff --git a/lts-example/src/main/java/com/lts/example/benchmark/JobTrackerMain.java b/lts-example/src/main/java/com/lts/example/benchmark/JobTrackerMain.java new file mode 100644 index 000000000..76b11f194 --- /dev/null +++ b/lts-example/src/main/java/com/lts/example/benchmark/JobTrackerMain.java @@ -0,0 +1,65 @@ +package com.lts.example.benchmark; + +import com.lts.example.support.MasterChangeListenerImpl; +import com.lts.jobtracker.JobTracker; +import com.lts.jobtracker.support.policy.OldDataDeletePolicy; + +/** + * @author Robert HG (254963746@qq.com) on 8/13/15. + */ +public class JobTrackerMain { + + public static void main(String[] args) { + +// final JobTracker jobTracker = new JobTracker(); +// // 节点信息配置 +// jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); +//// jobTracker.setRegistryAddress("redis://127.0.0.1:6379"); +// jobTracker.setListenPort(35001); // 默认 35001 +// jobTracker.setClusterName("test_cluster"); +// +// jobTracker.addMasterChangeListener(new MasterChangeListenerImpl()); +// +// // 设置业务日志记录 mysql +// jobTracker.addConfig("job.logger", "mysql"); +// // 任务队列用mysql +// jobTracker.addConfig("job.queue", "mysql"); +// // mysql 配置 +// jobTracker.addConfig("jdbc.url", "jdbc:mysql://127.0.0.1:3306/lts"); +// jobTracker.addConfig("jdbc.username", "root"); +// jobTracker.addConfig("jdbc.password", "root"); +// +// jobTracker.setOldDataHandler(new OldDataDeletePolicy()); +// // 设置 zk 客户端用哪个, 可选 zkclient, curator 默认是 zkclient +//// jobTracker.addConfig("zk.client", "zkclient"); +// // 启动节点 +// jobTracker.start(); + + final JobTracker jobTracker = new JobTracker(); + // 节点信息配置 + jobTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); + jobTracker.setClusterName("test_cluster"); // 三种节点都要保持一致 +// jobTracker.setListenPort(50001); + // master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件 + jobTracker.addMasterChangeListener(new MasterChangeListenerImpl()); + // 设置业务日志记录, 可选值: mongo, mysql , console + jobTracker.addConfig("job.logger", "mongo"); + // 任务队列用mongo + jobTracker.addConfig("job.queue", "mongo"); + // mongo 配置 + jobTracker.addConfig("mongo.addresses", "127.0.0.1:27017"); // 多个地址用逗号分割 + jobTracker.addConfig("mongo.database", "lts2"); + // 这个是对于 返回给客户端 任务的 老数据删除策略 + jobTracker.setOldDataHandler(new OldDataDeletePolicy()); + // 设置 zk 客户端用哪个, 可选 zkclient(默认), curator + jobTracker.addConfig("zk.client", "zkclient"); + // 启动节点 + jobTracker.start(); + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + jobTracker.stop(); + } + })); + } +} diff --git a/lts-example/src/main/java/com/lts/example/benchmark/TaskTrackerMain.java b/lts-example/src/main/java/com/lts/example/benchmark/TaskTrackerMain.java new file mode 100644 index 000000000..0b07d8e55 --- /dev/null +++ b/lts-example/src/main/java/com/lts/example/benchmark/TaskTrackerMain.java @@ -0,0 +1,38 @@ +package com.lts.example.benchmark; + +import com.lts.example.support.MasterChangeListenerImpl; +import com.lts.example.support.NoopJobRunner; +import com.lts.tasktracker.TaskTracker; + +/** + * @author Robert HG (254963746@qq.com) on 8/13/15. + */ +public class TaskTrackerMain { + + public static void main(String[] args) { + for (int i = 0; i < 10; i++) { + startTaskTracker(i); + } + } + + private static void startTaskTracker(int index) { + final TaskTracker taskTracker = new TaskTracker(); + // 任务执行类,实现JobRunner 接口 + taskTracker.setJobRunnerClass(NoopJobRunner.class); + taskTracker.setRegistryAddress("zookeeper://127.0.0.1:2181"); + // taskTracker.setRegistryAddress("redis://127.0.0.1:6379"); + taskTracker.setNodeGroup("test_trade_TaskTracker_" + index); // 同一个TaskTracker集群这个名字相同 + taskTracker.setClusterName("test_cluster"); + taskTracker.setWorkThreads(10); + // 反馈任务给JobTracker失败,存储本地文件路径 + // taskTracker.setFailStorePath(Constants.USER_HOME); + // master 节点变化监听器,当有集群中只需要一个节点执行某个事情的时候,可以监听这个事件 + taskTracker.addMasterChangeListener(new MasterChangeListenerImpl()); + // 业务日志级别 + // taskTracker.setBizLoggerLevel(Level.INFO); + // 可选址 leveldb(默认), rocksdb, bekeleydb + // taskTracker.addConfig("job.fail.store", "leveldb"); + taskTracker.start(); + } + +} diff --git a/lts-example/src/main/java/com/lts/example/support/NoopJobRunner.java b/lts-example/src/main/java/com/lts/example/support/NoopJobRunner.java new file mode 100644 index 000000000..7f3da3ea9 --- /dev/null +++ b/lts-example/src/main/java/com/lts/example/support/NoopJobRunner.java @@ -0,0 +1,21 @@ +package com.lts.example.support; + +import com.lts.core.domain.Action; +import com.lts.core.domain.Job; +import com.lts.tasktracker.Result; +import com.lts.tasktracker.runner.JobRunner; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author Robert HG (254963746@qq.com) on 8/13/15. + */ +public class NoopJobRunner implements JobRunner { + + static AtomicInteger num = new AtomicInteger(0); + @Override + public Result run(Job job) throws Throwable { + System.out.println(num.incrementAndGet()); + return new Result(Action.EXECUTE_SUCCESS); + } +} diff --git a/lts-example/src/main/resources/log4j.properties b/lts-example/src/main/resources/log4j.properties index 096897efc..5a639ac45 100644 --- a/lts-example/src/main/resources/log4j.properties +++ b/lts-example/src/main/resources/log4j.properties @@ -1,7 +1,7 @@ -log4j.rootLogger=INFO,stdout +log4j.rootLogger=WARN,stdout -log4j.appender.stdout.Threshold=INFO +log4j.appender.stdout.Threshold=WARN log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=[%-5p] [%d{HH:mm:ss}] %c - %m%n diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/JobTracker.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/JobTracker.java index 7a84ad802..3932f5127 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/JobTracker.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/JobTracker.java @@ -28,6 +28,8 @@ public class JobTracker extends AbstractServerNode results) { */ private JobPushRequest getNewJob(String taskTrackerNodeGroup, String taskTrackerIdentity) { - JobPo jobPo = application.getExecutableJobQueue().take(taskTrackerNodeGroup, taskTrackerIdentity); + JobPo jobPo = application.getPreLoader().take(taskTrackerNodeGroup, taskTrackerIdentity); if (jobPo == null) { return null; } diff --git a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java index f92e911d4..2d5d3e350 100644 --- a/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java +++ b/lts-jobtracker/src/main/java/com/lts/jobtracker/support/JobPusher.java @@ -7,7 +7,6 @@ import com.lts.core.constant.Level; import com.lts.core.exception.RemotingSendException; import com.lts.core.exception.RequestTimeoutException; -import com.lts.core.extension.ExtensionLoader; import com.lts.core.factory.NamedThreadFactory; import com.lts.core.logger.Logger; import com.lts.core.logger.LoggerFactory; @@ -18,8 +17,6 @@ import com.lts.core.support.SystemClock; import com.lts.jobtracker.domain.JobTrackerApplication; import com.lts.jobtracker.domain.TaskTrackerNode; -import com.lts.queue.PreLoader; -import com.lts.queue.PreLoaderFactory; import com.lts.queue.domain.JobPo; import com.lts.queue.exception.DuplicateJobException; import com.lts.remoting.InvokeCallback; @@ -40,15 +37,11 @@ public class JobPusher { private final Logger LOGGER = LoggerFactory.getLogger(JobPusher.class); private JobTrackerApplication application; private final ExecutorService executorService; - private PreLoader preLoader; - public JobPusher(JobTrackerApplication application) { this.application = application; this.executorService = Executors.newFixedThreadPool(Constants.AVAILABLE_PROCESSOR * 5, new NamedThreadFactory(JobPusher.class.getSimpleName())); - - preLoader = ExtensionLoader.getExtensionLoader(PreLoaderFactory.class).getAdaptiveExtension().getPreLoader(application.getConfig(), application); } public void push(final RemotingServerDelegate remotingServer, final JobPullRequest request) { @@ -115,7 +108,7 @@ private PushResult sendJob(RemotingServerDelegate remotingServer, TaskTrackerNod final String identity = taskTrackerNode.getIdentity(); // 从mongo 中取一个可运行的job - final JobPo jobPo = preLoader.take(nodeGroup, identity); + final JobPo jobPo = application.getPreLoader().take(nodeGroup, identity); if (jobPo == null) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Job push failed: no job! nodeGroup=" + nodeGroup + ", identity=" + identity); diff --git a/lts-queue/lts-queue-api/src/main/java/com/lts/queue/ExecutableJobQueue.java b/lts-queue/lts-queue-api/src/main/java/com/lts/queue/ExecutableJobQueue.java index 15ac8e723..da0d14fb2 100644 --- a/lts-queue/lts-queue-api/src/main/java/com/lts/queue/ExecutableJobQueue.java +++ b/lts-queue/lts-queue-api/src/main/java/com/lts/queue/ExecutableJobQueue.java @@ -21,11 +21,6 @@ public interface ExecutableJobQueue extends JobQueue{ */ boolean add(JobPo jobPo); - /** - * 从队列中取一个元素,并锁住这个元素 - */ - JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity); - /** * 出队列 */ diff --git a/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoExecutableJobQueue.java b/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoExecutableJobQueue.java index fd83507e8..cabfe0be7 100644 --- a/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoExecutableJobQueue.java +++ b/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoExecutableJobQueue.java @@ -4,7 +4,6 @@ import com.lts.core.commons.collect.ConcurrentHashSet; import com.lts.core.commons.utils.CollectionUtils; import com.lts.core.commons.utils.StringUtils; -import com.lts.core.constant.Constants; import com.lts.core.logger.Logger; import com.lts.core.logger.LoggerFactory; import com.lts.core.support.JobQueueUtils; @@ -19,11 +18,8 @@ import com.mongodb.WriteResult; import org.mongodb.morphia.query.Query; import org.mongodb.morphia.query.UpdateOperations; -import org.mongodb.morphia.query.UpdateResults; import java.util.List; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; /** * @author Robert HG (254963746@qq.com) on 5/28/15. @@ -32,18 +28,8 @@ public class MongoExecutableJobQueue extends AbstractMongoJobQueue implements Ex private static final Logger LOGGER = LoggerFactory.getLogger(MongoExecutableJobQueue.class); - // 这里做一下流控 - private Semaphore semaphore; - private long acquireTimeout = 2000; // 2s - public MongoExecutableJobQueue(Config config) { super(config); - int permits = config.getParameter(Constants.JOB_TAKE_PARALLEL_SIZE, 10000); - if (permits <= 10) { - permits = Constants.DEFAULT_JOB_TAKE_PARALLEL_SIZE; - } - this.acquireTimeout = config.getParameter(Constants.JOB_TAKE_ACQUIRE_TIMEOUT, acquireTimeout); - this.semaphore = new Semaphore(permits); } @Override @@ -92,54 +78,6 @@ public boolean add(JobPo jobPo) { return true; } - @Override - public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity) { - boolean acquire = false; - try { - acquire = semaphore.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS); - if (!acquire) { - // 直接返回null - return null; - } - } catch (InterruptedException e) { - LOGGER.warn("Try to take job failed.", e); - } - try { - while (true) { - String tableName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup); - Query query = template.createQuery(tableName, JobPo.class); - query.field("isRunning").equal(false) - .filter("triggerTime < ", SystemClock.now()) - .order(" triggerTime, priority , gmtCreated"); - - JobPo jobPo = query.get(); - if (jobPo == null) { - return null; - } - UpdateOperations operations = - template.createUpdateOperations(JobPo.class) - .set("isRunning", true) - .set("taskTrackerIdentity", taskTrackerIdentity) - .set("gmtModified", SystemClock.now()); - Query updateQuery = template.createQuery(tableName, JobPo.class); - updateQuery.field("jobId").equal(jobPo.getJobId()) - .field("isRunning").equal(false); - UpdateResults updateResult = template.update(updateQuery, operations); - if (updateResult.getUpdatedCount() == 1) { - return jobPo; - } - try { - Thread.sleep(10); - } catch (InterruptedException ignored) { - } - } - } finally { - if (acquire) { - semaphore.release(); - } - } - } - @Override public boolean remove(String taskTrackerNodeGroup, String jobId) { String tableName = JobQueueUtils.getExecutableQueueName(taskTrackerNodeGroup); diff --git a/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlExecutableJobQueue.java b/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlExecutableJobQueue.java index 149ffc6a5..417f639cf 100644 --- a/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlExecutableJobQueue.java +++ b/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlExecutableJobQueue.java @@ -27,22 +27,11 @@ */ public class MysqlExecutableJobQueue extends AbstractMysqlJobQueue implements ExecutableJobQueue { - private static final Logger LOGGER = LoggerFactory.getLogger(MysqlExecutableJobQueue.class); - - // 这里做一下流控, 尽量减小 mysql dead lock 的概率 - private Semaphore semaphore; - private long acquireTimeout = 2000; // 2s // 用来缓存SQL,不用每次去生成,可以重用 private final ConcurrentHashMap SQL_CACHE_MAP = new ConcurrentHashMap(); public MysqlExecutableJobQueue(Config config) { super(config); - int permits = config.getParameter(Constants.JOB_TAKE_PARALLEL_SIZE, Constants.DEFAULT_JOB_TAKE_PARALLEL_SIZE); - if (permits <= 10) { - permits = Constants.DEFAULT_JOB_TAKE_PARALLEL_SIZE; - } - this.acquireTimeout = config.getParameter(Constants.JOB_TAKE_ACQUIRE_TIMEOUT, acquireTimeout); - this.semaphore = new Semaphore(permits); } @Override @@ -96,75 +85,6 @@ public boolean add(JobPo jobPo) { return true; } - private String takeSelectSQL = "SELECT *" + - " FROM `{tableName}` " + - " WHERE is_running = ? " + - " AND `trigger_time` < ? " + - " ORDER BY `trigger_time` ASC, `priority` ASC, `gmt_created` ASC " + - " LIMIT 0, 1"; - - private String taskUpdateSQL = "UPDATE `{tableName}` SET " + - "`is_running` = ?, " + - "`task_tracker_identity` = ?, " + - "`gmt_modified` = ?" + - " WHERE job_id = ? AND is_running = ?"; - - @Override - public JobPo take(final String taskTrackerNodeGroup, final String taskTrackerIdentity) { - boolean acquire = false; - try { - acquire = semaphore.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS); - if (!acquire) { - // 直接返回null - return null; - } - } catch (InterruptedException e) { - LOGGER.warn("Try to take job failed.", e); - } - try { - /** - * 这里从SELECT FOR UPDATE 优化为 CAS 乐观锁 - */ - Long now = SystemClock.now(); - Object[] selectParams = new Object[]{false, now}; - - JobPo jobPo = getSqlTemplate().query(getRealSql(takeSelectSQL, taskTrackerNodeGroup), - ResultSetHandlerHolder.JOB_PO_RESULT_SET_HANDLER, selectParams); - if (jobPo == null) { - return null; - } - - Object[] params = new Object[]{ - true, taskTrackerIdentity, now, jobPo.getJobId(), false - }; - // 返回影响的行数 - int affectedRow = 0; - try { - affectedRow = getSqlTemplate().update(getRealSql(taskUpdateSQL, taskTrackerNodeGroup), params); - } catch (SQLException e) { - // dead lock ignore - if (e.getMessage().contains("Deadlock found when trying to get lock")) { - return null; - } - throw e; - } - if (affectedRow == 0) { - return take(taskTrackerNodeGroup, taskTrackerIdentity); - } else { - jobPo.setIsRunning(true); - jobPo.setTaskTrackerIdentity(taskTrackerIdentity); - jobPo.setGmtModified(now); - return jobPo; - } - } catch (SQLException e) { - throw new JobQueueException(e); - } finally { - if(acquire){ - semaphore.release(); - } - } - } - private String removeSQL = "DELETE FROM `{tableName}` WHERE job_id = ?"; @Override