From 6b76ad5c836c64fecd1515e3e370b073ec9f8767 Mon Sep 17 00:00:00 2001 From: Git_Yang Date: Wed, 10 Nov 2021 19:43:42 +0800 Subject: [PATCH] [ISSUE #3286] Replace Timer with ScheduledExecutorService (#3287) Signed-off-by: zhangyang21 --- .../schedule/ScheduleMessageService.java | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java index e0e7b9565b6..c45287ff659 100644 --- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java +++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java @@ -19,13 +19,15 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.rocketmq.common.ConfigManager; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.topic.TopicValidator; @@ -60,7 +62,8 @@ public class ScheduleMessageService extends ConfigManager { new ConcurrentHashMap(32); private final DefaultMessageStore defaultMessageStore; private final AtomicBoolean started = new AtomicBoolean(false); - private Timer timer; + private ScheduledExecutorService deliverExecutorService; + private int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors(); private MessageStore writeMessageStore; private int maxDelayLevel; @@ -113,7 +116,7 @@ public long computeDeliverTimestamp(final int delayLevel, final long storeTimest public void start() { if (started.compareAndSet(false, true)) { super.load(); - this.timer = new Timer("ScheduleMessageTimerThread", true); + this.deliverExecutorService = new ScheduledThreadPoolExecutor(deliverThreadPoolNums, new ThreadFactoryImpl("ScheduleMessageTimerThread_")); for (Map.Entry entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); @@ -123,11 +126,11 @@ public void start() { } if (timeDelay != null) { - this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); + this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME, TimeUnit.MILLISECONDS); } } - this.timer.scheduleAtFixedRate(new TimerTask() { + this.deliverExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { @@ -139,16 +142,14 @@ public void run() { log.error("scheduleAtFixedRate flush exception", e); } } - }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); + }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS); } } public void shutdown() { - if (this.started.compareAndSet(true, false)) { - if (null != this.timer) - this.timer.cancel(); + if (this.started.compareAndSet(true, false) && null != this.deliverExecutorService) { + this.deliverExecutorService.shutdownNow(); } - } public boolean isStarted() { @@ -159,10 +160,12 @@ public int getMaxDelayLevel() { return maxDelayLevel; } + @Override public String encode() { return this.encode(false); } + @Override public boolean load() { boolean result = super.load(); result = result && this.parseDelayLevel(); @@ -223,6 +226,7 @@ public void decode(String jsonString) { } } + @Override public String encode(final boolean prettyFormat) { DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper(); delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable); @@ -261,7 +265,7 @@ public boolean parseDelayLevel() { return true; } - class DeliverDelayedMessageTimerTask extends TimerTask { + class DeliverDelayedMessageTimerTask implements Runnable { private final int delayLevel; private final long offset; @@ -279,8 +283,8 @@ public void run() { } catch (Exception e) { // XXX: warn and notify me log.error("ScheduleMessageService, executeOnTimeup exception", e); - ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( - this.delayLevel, this.offset), DELAY_FOR_A_PERIOD); + ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask( + this.delayLevel, this.offset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS); } } @@ -372,9 +376,9 @@ public void executeOnTimeup() { log.error( "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); - ScheduleMessageService.this.timer.schedule( + ScheduleMessageService.this.deliverExecutorService.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, - nextOffset), DELAY_FOR_A_PERIOD); + nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; @@ -388,17 +392,17 @@ public void executeOnTimeup() { } } } else { - ScheduleMessageService.this.timer.schedule( + ScheduleMessageService.this.deliverExecutorService.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), - countdown); + countdown, TimeUnit.MILLISECONDS); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); - ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( - this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); + ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask( + this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { @@ -424,8 +428,8 @@ public void executeOnTimeup() { } } // end of if (cq != null) - ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, - failScheduleOffset), DELAY_FOR_A_WHILE); + ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, + failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS); } private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {