diff --git a/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java b/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java index 31901a1b1..1d992bbe4 100644 --- a/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java +++ b/lts-queue/lts-queue-api/src/main/java/com/lts/queue/AbstractPreLoader.java @@ -31,8 +31,8 @@ public abstract class AbstractPreLoader implements PreLoader { // 预取阀值 private double factor = 0.8; - private ConcurrentHashMap> - JOB_MAP = new ConcurrentHashMap>(); + private ConcurrentHashMap> + JOB_MAP = new ConcurrentHashMap>(); // 加载的信号 private ConcurrentHashSet LOAD_SIGNAL = new ConcurrentHashSet(); @@ -58,7 +58,7 @@ public void run() { LOAD_SIGNAL.remove(loadTaskTrackerNodeGroup); } } - }, 3, 1, TimeUnit.SECONDS); + }, 500, 500, TimeUnit.MILLISECONDS); } application.getEventCenter().subscribe(new EventSubscriber(application.getConfig().getIdentity() + "_preLoader", new Observer() { @@ -117,10 +117,23 @@ public JobPo take(String taskTrackerNodeGroup, String taskTrackerIdentity) { protected abstract List load(String loadTaskTrackerNodeGroup, int offset); private JobPo get(String taskTrackerNodeGroup) { - List jobPos = JOB_MAP.get(taskTrackerNodeGroup); + BlockingQueue jobPos = JOB_MAP.get(taskTrackerNodeGroup); if (jobPos == null) { - jobPos = new CopyOnWriteArrayList(); - List oldJobPos = JOB_MAP.putIfAbsent(taskTrackerNodeGroup, jobPos); + jobPos = new PriorityBlockingQueue(step, new Comparator() { + @Override + public int compare(JobPo left, JobPo right) { + int compare = left.getTriggerTime().compareTo(right.getTriggerTime()); + if (compare != 0) { + return compare; + } + compare = left.getPriority().compareTo(left.getPriority()); + if (compare != 0) { + return compare; + } + return left.getGmtCreated().compareTo(right.getGmtCreated()); + } + }); + BlockingQueue oldJobPos = JOB_MAP.putIfAbsent(taskTrackerNodeGroup, jobPos); if (oldJobPos != null) { jobPos = oldJobPos; } @@ -132,13 +145,7 @@ private JobPo get(String taskTrackerNodeGroup) { LOAD_SIGNAL.add(taskTrackerNodeGroup); } } - if (jobPos.size() > 0) { - try { - return jobPos.remove(0); - } catch (ArrayIndexOutOfBoundsException e) { - return null; - } - } - return null; + return jobPos.poll(); } + }