diff --git a/lts-core/src/main/java/com/lts/core/constant/Constants.java b/lts-core/src/main/java/com/lts/core/constant/Constants.java index d0ab395fe..051b0c94c 100644 --- a/lts-core/src/main/java/com/lts/core/constant/Constants.java +++ b/lts-core/src/main/java/com/lts/core/constant/Constants.java @@ -86,5 +86,5 @@ public interface Constants { String MONITOR_DATA_ADD_URL = "/api/monitor/monitor-data-add.do"; String JOB_PULL_FREQUENCY = "job.pull.frequency"; - int DEFAULT_JOB_PULL_FREQUENCY = 3; + int DEFAULT_JOB_PULL_FREQUENCY = 1; } diff --git a/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java b/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java index 1d992bbe4..5f4d75c34 100644 --- a/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java +++ b/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java @@ -31,8 +31,7 @@ public abstract class AbstractPreLoader implements PreLoader { // 预取阀值 private double factor = 0.8; - private ConcurrentHashMap> - JOB_MAP = new ConcurrentHashMap>(); + private ConcurrentHashMap> JOB_MAP = new ConcurrentHashMap>(); // 加载的信号 private ConcurrentHashSet LOAD_SIGNAL = new ConcurrentHashSet(); @@ -47,12 +46,13 @@ public AbstractPreLoader(final Application application) { public void run() { for (String loadTaskTrackerNodeGroup : LOAD_SIGNAL) { - if (JOB_MAP.get(loadTaskTrackerNodeGroup).size() / step < factor) { + BlockingQueue queue = JOB_MAP.get(loadTaskTrackerNodeGroup); + if (queue.size() / step < factor) { // load List loads = load(loadTaskTrackerNodeGroup, curSequence * step); // 加入到内存中 if (CollectionUtils.isNotEmpty(loads)) { - JOB_MAP.get(loadTaskTrackerNodeGroup).addAll(loads); + queue.addAll(loads); } } LOAD_SIGNAL.remove(loadTaskTrackerNodeGroup); @@ -106,14 +106,20 @@ public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity) { return null; } // update jobPo - if (lockJob(taskTrackerNodeGroup, jobPo.getJobId(), taskTrackerIdentity)) { + if (lockJob(taskTrackerNodeGroup, jobPo.getJobId(), taskTrackerIdentity, jobPo.getTriggerTime())) { return jobPo; } } } - protected abstract boolean lockJob(String taskTrackerNodeGroup, String jobId, String taskTrackerIdentity); + /** + * 锁定任务 + */ + protected abstract boolean lockJob(String taskTrackerNodeGroup, String jobId, String taskTrackerIdentity, Long triggerTime); + /** + * 加载任务 + */ protected abstract List load(String loadTaskTrackerNodeGroup, int offset); private JobPo get(String taskTrackerNodeGroup) { diff --git a/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoPreLoader.java b/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoPreLoader.java index 5fae8e441..bfa2ba4e7 100644 --- a/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoPreLoader.java +++ b/lts-queue/lts-queue-mongo/src/main/java/com/lts/queue/mongo/MongoPreLoader.java @@ -27,7 +27,7 @@ public MongoPreLoader(final Application application) { (AdvancedDatastore) DataStoreProvider.getDataStore(application.getConfig())); } - protected boolean lockJob(String taskTrackerNodeGroup, String jobId, String taskTrackerIdentity) { + protected boolean lockJob(String taskTrackerNodeGroup, String jobId, String taskTrackerIdentity, Long triggerTime) { UpdateOperations operations = template.createUpdateOperations(JobPo.class) .set("isRunning", true) @@ -38,7 +38,8 @@ protected boolean lockJob(String taskTrackerNodeGroup, String jobId, String task Query updateQuery = template.createQuery(tableName, JobPo.class); updateQuery.field("jobId").equal(jobId) - .field("isRunning").equal(false); + .field("isRunning").equal(false) + .field("triggerTime").equal(triggerTime); UpdateResults updateResult = template.update(updateQuery, operations); return updateResult.getUpdatedCount() == 1; } diff --git a/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlPreLoader.java b/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlPreLoader.java index 3043d411b..0832e6b34 100644 --- a/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlPreLoader.java +++ b/lts-queue/lts-queue-mysql/src/main/java/com/lts/queue/mysql/MysqlPreLoader.java @@ -33,12 +33,13 @@ public MysqlPreLoader(Application application) { "`is_running` = ?, " + "`task_tracker_identity` = ?, " + "`gmt_modified` = ?" + - " WHERE job_id = ? AND is_running = ?"; + " WHERE job_id = ? AND is_running = ? AND trigger_time = ? "; @Override - protected boolean lockJob(String taskTrackerNodeGroup, String jobId, String taskTrackerIdentity) { + protected boolean lockJob(String taskTrackerNodeGroup, String jobId, String taskTrackerIdentity, Long triggerTime) { try { - int affectedRow = sqlTemplate.update(getRealSql(taskUpdateSQL, taskTrackerNodeGroup), true, taskTrackerIdentity, SystemClock.now(), jobId, false); + int affectedRow = sqlTemplate.update(getRealSql(taskUpdateSQL, taskTrackerNodeGroup), true, + taskTrackerIdentity, SystemClock.now(), jobId, false, triggerTime); return affectedRow == 1; } catch (SQLException e) { return false;