diff --git a/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/monitor/BufferPoolMonitorInfo.java b/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/monitor/BufferPoolMonitorInfo.java index 81412aeb2..830c30832 100644 --- a/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/monitor/BufferPoolMonitorInfo.java +++ b/joyqueue-common/joyqueue-model/src/main/java/org/joyqueue/monitor/BufferPoolMonitorInfo.java @@ -25,7 +25,13 @@ public class BufferPoolMonitorInfo { private String used; private String maxMemorySize; private String mmpUsed; + private String mmpFd; private String directUsed; + private String directFd; + private int directDestroy; + private int directAllocate; + private int mmapDestroy; + private int mmapAllocate; private List plMonitorInfos; @@ -61,6 +67,14 @@ public void setMmpUsed(String mmpUsed) { this.mmpUsed = mmpUsed; } + public String getMmpFd() { + return mmpFd; + } + + public void setMmpFd(String mmpFd) { + this.mmpFd = mmpFd; + } + public String getDirectUsed() { return directUsed; } @@ -69,6 +83,14 @@ public void setDirectUsed(String directUsed) { this.directUsed = directUsed; } + public String getDirectFd() { + return directFd; + } + + public void setDirectFd(String directFd) { + this.directFd = directFd; + } + public List getPlMonitorInfos() { return plMonitorInfos; } @@ -77,6 +99,38 @@ public void setPlMonitorInfos(List plMonitorInfos) { this.plMonitorInfos = plMonitorInfos; } + public int getDirectDestroy() { + return directDestroy; + } + + public void setDirectDestroy(int directDestroy) { + this.directDestroy = directDestroy; + } + + public int getDirectAllocate() { + return directAllocate; + } + + public void setDirectAllocate(int directAllocate) { + this.directAllocate = directAllocate; + } + + public int getMmapDestroy() { + return mmapDestroy; + } + + public void setMmapDestroy(int mmapDestroy) { + this.mmapDestroy = mmapDestroy; + } + + public int getMmapAllocate() { + return mmapAllocate; + } + + public void 
setMmapAllocate(int mmapAllocate) { + this.mmapAllocate = mmapAllocate; + } + public static class PLMonitorInfo { private String cached; private String usedPreLoad; diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/config/BrokerStoreConfig.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/config/BrokerStoreConfig.java index 221349e93..86ba32844 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/config/BrokerStoreConfig.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/config/BrokerStoreConfig.java @@ -17,6 +17,7 @@ import org.joyqueue.toolkit.config.PropertyDef; import org.joyqueue.toolkit.config.PropertySupplier; + import static org.joyqueue.toolkit.config.Property.APPLICATION_DATA_PATH; /** @@ -26,8 +27,11 @@ public class BrokerStoreConfig { public static final String DEFAULT_CLEAN_STRATEGY_CLASS = "GlobalStorageLimitCleaningStrategy"; public static final long DEFAULT_MAX_STORE_SIZE = 10L * 1024 * 1024 * 1024; // 10gb public static final long DEFAULT_MAX_STORE_TIME = 1000 * 60 * 60 * 24 * 7; // 7days - public static final long DEFAULT_STORE_CLEAN_SCHEDULE_BEGIN = 5 * 60 * 1000; - public static final long DEFAULT_STORE_CLEAN_SCHEDULE_END = 10 * 60 * 1000; + public static final long DEFAULT_STORE_CLEAN_SCHEDULE_BEGIN = 1 * 60 * 1000; + public static final long DEFAULT_STORE_CLEAN_SCHEDULE_END = 5 * 60 * 1000; + public static final long DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_BEGIN = 1 * 60 * 1000; + public static final long DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_END = 5 * 60 * 1000; + public static final long DEFAULT_STORE_PHYSICAL_CLEAN_INTERNAL = 100; public static final boolean DEFAULT_KEEP_UNCONSUMED = true; public static final int DEFAULT_STORE_DISK_USAGE_MAX= 80; public static final int DEFAULT_STORE_DISK_USAGE_SAFE=75; @@ -49,7 +53,12 @@ public enum BrokerStoreConfigKey implements PropertyDef { 
CLEAN_SCHEDULE_END("store.clean.schedule.end", DEFAULT_STORE_CLEAN_SCHEDULE_END, Type.LONG), FORCE_RESTORE("store.force.restore", true, Type.BOOLEAN), STORE_DISK_USAGE_MAX("store.disk.usage.max",DEFAULT_STORE_DISK_USAGE_MAX,Type.INT), - STORE_DISK_USAGE_SAFE("store.disk.usage.safe",DEFAULT_STORE_DISK_USAGE_SAFE,Type.INT); + STORE_DISK_USAGE_SAFE("store.disk.usage.safe",DEFAULT_STORE_DISK_USAGE_SAFE,Type.INT), + STORE_PHYSICAL_CLEAN_SCHEDULE_BEGIN("store.physical.clean.schedule.begin",DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_BEGIN,Type.INT), + STORE_PHYSICAL_CLEAN_SCHEDULE_END("store.physical.clean.schedule.end",DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_END,Type.INT), + STORE_PHYSICAL_CLEAN_INTERVAL("store.physical.clean.interval",DEFAULT_STORE_PHYSICAL_CLEAN_INTERNAL,Type.INT), + + ; private String name; private Object value; private Type type; @@ -158,5 +167,17 @@ public int getStoreDiskUsageSafe(){ public String getApplicationDataPath(){ return propertySupplier.getOrCreateProperty(APPLICATION_DATA_PATH).getString(); } + + public int getStorePhysicalCleanScheduleBegin() { + return PropertySupplier.getValue(propertySupplier, BrokerStoreConfigKey.STORE_PHYSICAL_CLEAN_SCHEDULE_BEGIN, DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_BEGIN); + } + + public int getStorePhysicalCleanScheduleEnd() { + return PropertySupplier.getValue(propertySupplier, BrokerStoreConfigKey.STORE_PHYSICAL_CLEAN_SCHEDULE_END, DEFAULT_STORE_PHYSICAL_CLEAN_SCHEDULE_END); + } + + public int getStorePhysicalCleanInterval() { + return PropertySupplier.getValue(propertySupplier, BrokerStoreConfigKey.STORE_PHYSICAL_CLEAN_INTERVAL, DEFAULT_STORE_PHYSICAL_CLEAN_INTERNAL); + } } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/ClusterStoreService.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/ClusterStoreService.java index 7f7319499..8cd328c20 100644 --- 
a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/ClusterStoreService.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/ClusterStoreService.java @@ -13,6 +13,7 @@ import org.joyqueue.domain.TopicName; import org.joyqueue.monitor.BufferPoolMonitorInfo; import org.joyqueue.store.PartitionGroupStore; +import org.joyqueue.store.RemovedPartitionGroupStore; import org.joyqueue.store.StoreManagementService; import org.joyqueue.store.StoreNode; import org.joyqueue.store.StoreNodes; @@ -165,6 +166,16 @@ public void removePartitionGroup(String topic, int partitionGroup) { storeService.removePartitionGroup(topic, partitionGroup); } + @Override + public void physicalDeleteRemovedPartitionGroup(String topic, int partitionGroup) { + storeService.physicalDeleteRemovedPartitionGroup(topic, partitionGroup); + } + + @Override + public List getRemovedPartitionGroups() { + return storeService.getRemovedPartitionGroups(); + } + @Override public void restorePartitionGroup(String topic, int partitionGroup) throws Exception { storeService.restorePartitionGroup(topic, partitionGroup); diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/StoreCleanManager.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/StoreCleanManager.java index c22b500d6..3f10abb26 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/StoreCleanManager.java +++ b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/StoreCleanManager.java @@ -17,18 +17,19 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.jd.laf.extension.ExtensionManager; +import org.apache.commons.collections.CollectionUtils; import org.joyqueue.broker.cluster.ClusterManager; import org.joyqueue.broker.config.BrokerStoreConfig; import org.joyqueue.broker.consumer.position.PositionManager; import 
org.joyqueue.domain.PartitionGroup; import org.joyqueue.domain.TopicConfig; +import org.joyqueue.store.RemovedPartitionGroupStore; import org.joyqueue.store.StoreService; import org.joyqueue.toolkit.concurrent.NamedThreadFactory; import org.joyqueue.toolkit.config.PropertySupplier; import org.joyqueue.toolkit.service.Service; import org.joyqueue.toolkit.time.SystemClock; -import com.jd.laf.extension.ExtensionManager; -import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,6 +58,7 @@ public class StoreCleanManager extends Service { private PositionManager positionManager; private Map cleaningStrategyMap; private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledExecutorService physicalDeleteScheduledExecutorService; private ScheduledFuture cleanFuture; public StoreCleanManager(final PropertySupplier propertySupplier, final StoreService storeService, final ClusterManager clusterManager, final PositionManager positionManager) { @@ -66,6 +68,7 @@ public StoreCleanManager(final PropertySupplier propertySupplier, final StoreSer this.clusterManager = clusterManager; this.positionManager = positionManager; this.scheduledExecutorService = Executors.newScheduledThreadPool(SCHEDULE_EXECUTOR_THREADS, new NamedThreadFactory("StoreCleaning-Scheduled-Executor")); + this.physicalDeleteScheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StoreCleaning-Physical-Scheduled-Executor")); } @Override @@ -95,6 +98,11 @@ public void start() throws Exception { ThreadLocalRandom.current().nextLong(brokerStoreConfig.getStoreCleanScheduleBegin(), brokerStoreConfig.getStoreCleanScheduleEnd()), ThreadLocalRandom.current().nextLong(brokerStoreConfig.getStoreCleanScheduleBegin(), brokerStoreConfig.getStoreCleanScheduleEnd()), TimeUnit.MILLISECONDS); + + physicalDeleteScheduledExecutorService.scheduleWithFixedDelay(this::physicalClean, + 
ThreadLocalRandom.current().nextLong(brokerStoreConfig.getStorePhysicalCleanScheduleBegin(), brokerStoreConfig.getStorePhysicalCleanScheduleEnd()), + ThreadLocalRandom.current().nextLong(brokerStoreConfig.getStorePhysicalCleanScheduleBegin(), brokerStoreConfig.getStorePhysicalCleanScheduleEnd()), + TimeUnit.MILLISECONDS); } @Override @@ -119,6 +127,26 @@ public void stop() { } catch (Throwable t) { LOG.error(t.getMessage(), t); } + + physicalDeleteScheduledExecutorService.shutdownNow(); + } + + private void physicalClean() { + for (RemovedPartitionGroupStore removedPartitionGroup : storeService.getRemovedPartitionGroups()) { + boolean interrupted = false; + while (removedPartitionGroup.physicalDeleteLeftFile()) { + try { + Thread.currentThread().sleep(brokerStoreConfig.getStorePhysicalCleanInterval()); + } catch (InterruptedException e) { + interrupted = true; + break; + } + } + if (!interrupted) { + LOG.info("Store file deleted, topic: {}, partitionGroup: {}", removedPartitionGroup.getTopic(), removedPartitionGroup.getPartitionGroup()); + storeService.physicalDeleteRemovedPartitionGroup(removedPartitionGroup.getTopic(), removedPartitionGroup.getPartitionGroup()); + } + } } private void clean() { @@ -161,7 +189,7 @@ private void clean() { } } } catch (Throwable t) { - LOG.error("Error to clean store for topic <{}>, partition group <{}>, exception: {}", topicConfig, partitionGroup.getGroup(), t); + LOG.error("Error to clean store for topic <{}>, partition group <{}>, exception: ", topicConfig, partitionGroup.getGroup(), t); } } } diff --git a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/StoreInitializer.java b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/StoreInitializer.java index b5c914148..537376f6c 100644 --- a/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/StoreInitializer.java +++ 
b/joyqueue-server/joyqueue-broker-core/src/main/java/org/joyqueue/broker/store/StoreInitializer.java @@ -18,6 +18,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.primitives.Shorts; +import org.apache.commons.collections.CollectionUtils; import org.joyqueue.broker.cluster.ClusterManager; import org.joyqueue.broker.cluster.event.CompensateEvent; import org.joyqueue.broker.config.BrokerStoreConfig; @@ -37,12 +38,12 @@ import org.joyqueue.nsr.event.RemovePartitionGroupEvent; import org.joyqueue.nsr.event.RemoveTopicEvent; import org.joyqueue.nsr.event.UpdatePartitionGroupEvent; +import org.joyqueue.store.NoSuchPartitionGroupException; import org.joyqueue.store.PartitionGroupStore; import org.joyqueue.store.StoreService; import org.joyqueue.toolkit.concurrent.EventListener; import org.joyqueue.toolkit.concurrent.NamedThreadFactory; import org.joyqueue.toolkit.service.Service; -import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -306,12 +307,15 @@ protected void onUpdatePartitionGroup(TopicName topicName, PartitionGroup oldPar electionService.onNodeAdd(topicName, newPartitionGroup.getGroup(), newPartitionGroup.getElectType(), brokers, newPartitionGroup.getLearners(), nameService.getBroker(newReplica), currentBrokerId, newPartitionGroup.getLeader()); - storeService.rePartition(topicName.getFullName(), newPartitionGroup.getGroup(), newPartitionGroup.getPartitions().toArray(new Short[newPartitionGroup.getPartitions().size()])); } } if (oldPartitionGroup.getPartitions().size() != newPartitionGroup.getPartitions().size()) { - storeService.rePartition(topicName.getFullName(), newPartitionGroup.getGroup(), newPartitionGroup.getPartitions().toArray(new Short[newPartitionGroup.getPartitions().size()])); + try { + storeService.rePartition(topicName.getFullName(), newPartitionGroup.getGroup(), newPartitionGroup.getPartitions().toArray(new 
Short[newPartitionGroup.getPartitions().size()])); + } catch (NoSuchPartitionGroupException e) { + logger.error("rePartition exception, topic: {}, group: {}", newPartitionGroup.getTopic(), newPartitionGroup.getGroup(), e); + } } for (Integer oldReplica : oldPartitionGroup.getReplicas()) { @@ -325,7 +329,6 @@ protected void onUpdatePartitionGroup(TopicName topicName, PartitionGroup oldPar } else { logger.info("topic[{}] update partitionGroup[{}] add node[{}] ", topicName, newPartitionGroup.getGroup(), oldReplica); electionService.onNodeRemove(topicName, newPartitionGroup.getGroup(), oldReplica, currentBrokerId); - storeService.rePartition(topicName.getFullName(), newPartitionGroup.getGroup(), newPartitionGroup.getPartitions().toArray(new Short[newPartitionGroup.getPartitions().size()])); } } } diff --git a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/cluster/StoreServiceStub.java b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/cluster/StoreServiceStub.java index 5010fdf96..defcae3e8 100644 --- a/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/cluster/StoreServiceStub.java +++ b/joyqueue-server/joyqueue-broker-core/src/test/java/org/joyqueue/broker/cluster/StoreServiceStub.java @@ -7,6 +7,7 @@ import org.joyqueue.monitor.BufferPoolMonitorInfo; import org.joyqueue.nsr.NameService; import org.joyqueue.store.PartitionGroupStore; +import org.joyqueue.store.RemovedPartitionGroupStore; import org.joyqueue.store.StoreManagementService; import org.joyqueue.store.StoreNode; import org.joyqueue.store.StoreNodes; @@ -57,6 +58,16 @@ public void removePartitionGroup(String topic, int partitionGroup) { } + @Override + public void physicalDeleteRemovedPartitionGroup(String topic, int partitionGroup) { + + } + + @Override + public List getRemovedPartitionGroups() { + return null; + } + @Override public void restorePartitionGroup(String topic, int partitionGroup) { diff --git 
/**
 * Handle on the on-disk remains of a partition group that has been removed from the store
 * but whose files have not been physically deleted yet.
 *
 * author: gaohaoxiang
 * date: 2020/10/28
 */
public interface RemovedPartitionGroupStore {

    /**
     * Returns the topic of the removed partition group.
     *
     * @return the topic
     */
    String getTopic();

    /**
     * Returns the index number of the removed partition group.
     *
     * @return the partition group number
     */
    int getPartitionGroup();

    /**
     * Physically deletes the left-most (oldest) file(s) of the removed partition group.
     *
     * @return {@code true} if this call did deletion work and should be invoked again,
     *         {@code false} once nothing is left to delete file by file
     */
    boolean physicalDeleteLeftFile();

    /**
     * Physically deletes all remaining files of the removed partition group, including its directory.
     *
     * @return {@code true} if the deletion succeeded
     */
    boolean physicalDelete();
}
b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/PartitionGroupStoreManager.java index c0248346a..69073f81d 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/PartitionGroupStoreManager.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/PartitionGroupStoreManager.java @@ -17,6 +17,7 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.serializer.SerializerFeature; +import com.google.common.collect.Maps; import org.joyqueue.domain.QosLevel; import org.joyqueue.exception.JoyQueueCode; import org.joyqueue.store.file.Checkpoint; @@ -58,6 +59,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -973,14 +975,27 @@ void asyncWrite(QosLevel qosLevel, EventListener eventListener, Wri // 构建写入请求对象 WriteCommand writeCommand = new WriteCommand(qosLevel, eventListener, messages); - // 放入队列中,如果队列满,阻塞等待 + try { - this.writeCommandCache.put(writeCommand); + if (!this.writeCommandCache.offer(writeCommand, config.enqueueTimeout, TimeUnit.MILLISECONDS)) { + logger.warn("offer command queue failed, topic: {}, group: {}, queue size: {}", + topic, partitionGroup, writeCommandCache.size()); + + if (eventListener != null) { + eventListener.onEvent(new WriteResult(JoyQueueCode.SE_WRITE_FAILED, null)); + } + return; + } } catch (InterruptedException e) { - logger.warn("Exception: ", e); - if (eventListener != null) + logger.warn("offer command queue interrupted, topic: {}, group: {}, queue size: {}", + topic, partitionGroup, writeCommandCache.size()); + + if (eventListener != null) { eventListener.onEvent(new WriteResult(JoyQueueCode.SE_WRITE_FAILED, null)); + } + return; } + 
// 如果QosLevel.RECEIVE,这里就可以给客户端返回写入成功的响应了。 if (qosLevel == QosLevel.RECEIVE && null != eventListener) { eventListener.onEvent(new WriteResult(JoyQueueCode.SUCCESS, null)); @@ -1027,15 +1042,15 @@ long clean(long time, Map partitionAckMap, boolean keepUnconsumed) // 依次删除每个分区p索引中最左侧的文件 满足当前分区p的最小消费位置之前的文件块 if (indexStore.fileCount() > 1 && indexStore.meetMinStoreFile(minPartitionIndex) > 1) { deletedSize += indexStore.physicalDeleteLeftFile(); - if (logger.isDebugEnabled()){ + if (logger.isDebugEnabled()) { logger.info("Delete PositioningStore physical index file by size, partition: <{}>, offset position: <{}>", p, minPartitionIndex); } } } else { // 依次删除每个分区p索引中最左侧的文件 满足当前分区p的最小消费位置之前的以及最长时间戳的文件块 - if (indexStore.fileCount() > 1 && indexStore.meetMinStoreFile(minPartitionIndex) > 1 && hasEarly(indexStore,time)) { + if (indexStore.fileCount() > 1 && indexStore.meetMinStoreFile(minPartitionIndex) > 1 && hasEarly(indexStore, time, minPartitionIndex)) { deletedSize += indexStore.physicalDeleteLeftFile(); - if (logger.isDebugEnabled()){ + if (logger.isDebugEnabled()) { logger.info("Delete PositioningStore physical index file by time, partition: <{}>, offset position: <{}>", p, minPartitionIndex); } } @@ -1062,20 +1077,27 @@ long clean(long time, Map partitionAckMap, boolean keepUnconsumed) } /** + * 最早消息是否早于制定时间 * * @param indexStore partition index store * @param time 查询时间 * @return true if partition 的最早消息时间小于指定时间 * **/ - private boolean hasEarly(PositioningStore indexStore,long time) throws IOException{ - long left=indexStore.left(); - IndexItem item=indexStore.read(left); - ByteBuffer message=store.read(item.getOffset()); - // message send time - long clientTimestamp=MessageParser.getLong(message,MessageParser.CLIENT_TIMESTAMP); - long offset=MessageParser.getInt(message,MessageParser.STORAGE_TIMESTAMP); - return clientTimestamp + offset < time; + private boolean hasEarly(PositioningStore indexStore, long time, long minPartitionIndex) throws IOException { + long left 
= indexStore.left(); + + try { + IndexItem item = indexStore.read(left); + ByteBuffer message = store.read(item.getOffset()); + // message send time + long clientTimestamp = MessageParser.getLong(message, MessageParser.CLIENT_TIMESTAMP); + long offset = MessageParser.getInt(message, MessageParser.STORAGE_TIMESTAMP); + return clientTimestamp + offset < time; + } catch (Exception e) { + logger.error("hasEarly exception, base: {}, index: {}", indexStore.base(), left); + return indexStore.isEarly(time, minPartitionIndex); + } } @@ -1645,6 +1667,18 @@ private long binarySearchByTimestamp(long timestamp, } } + public List getStoreFiles() { + return store.getFiles(); + } + + public Map> getIndexStoreFiles() { + Map> result = Maps.newHashMap(); + for (Map.Entry entry : partitionMap.entrySet()) { + result.put(entry.getKey(), entry.getValue().store.getFiles()); + } + return result; + } + QosStore getQosStore(QosLevel level) { return qosStores[level.value()]; } @@ -1704,12 +1738,13 @@ private WriteCommand(QosLevel qosLevel, EventListener eventListener public static class Config { public static final int DEFAULT_MAX_MESSAGE_LENGTH = 4 * 1024 * 1024; - public static final int DEFAULT_WRITE_REQUEST_CACHE_SIZE = 128; + public static final int DEFAULT_WRITE_REQUEST_CACHE_SIZE = 10240; public static final long DEFAULT_FLUSH_INTERVAL_MS = 50L; - public static final boolean DEFAULT_FLUSH_FORCE = true; + public static final boolean DEFAULT_FLUSH_FORCE = false; public static final long DEFAULT_WRITE_TIMEOUT_MS = 3000L; public static final long DEFAULT_MAX_DIRTY_SIZE = 10L * 1024 * 1024; public static final long DEFAULT_PRINT_METRIC_INTERVAL_MS = 0L; + public static final int DEFAULT_ENQUEUE_TIMEOUT = 100; /** * 允许脏数据的最大长度,超过这个长度就阻塞写入。 @@ -1741,20 +1776,24 @@ public static class Config { */ private final long printMetricIntervalMs; + /** + * 入队超时 + */ + private final int enqueueTimeout; + private final PositioningStore.Config storeConfig; private final PositioningStore.Config 
indexStoreConfig; public Config() { this(DEFAULT_MAX_MESSAGE_LENGTH, DEFAULT_WRITE_REQUEST_CACHE_SIZE, DEFAULT_FLUSH_INTERVAL_MS, - DEFAULT_WRITE_TIMEOUT_MS, DEFAULT_MAX_DIRTY_SIZE, DEFAULT_PRINT_METRIC_INTERVAL_MS, + DEFAULT_WRITE_TIMEOUT_MS, DEFAULT_MAX_DIRTY_SIZE, DEFAULT_PRINT_METRIC_INTERVAL_MS, DEFAULT_ENQUEUE_TIMEOUT, new PositioningStore.Config(PositioningStore.Config.DEFAULT_FILE_DATA_SIZE), - // 索引在读取的时候默认加载到内存中 - new PositioningStore.Config(PositioningStore.Config.DEFAULT_FILE_DATA_SIZE, true, DEFAULT_FLUSH_FORCE)); + new PositioningStore.Config(PositioningStore.Config.DEFAULT_FILE_DATA_SIZE, false, DEFAULT_FLUSH_FORCE)); } public Config(int maxMessageLength, int writeRequestCacheSize, long flushIntervalMs, - long writeTimeoutMs, long maxDirtySize, long printMetricIntervalMs, + long writeTimeoutMs, long maxDirtySize, long printMetricIntervalMs, int enqueueTimeout, PositioningStore.Config storeConfig, PositioningStore.Config indexStoreConfig) { this.maxMessageLength = maxMessageLength; this.writeRequestCacheSize = writeRequestCacheSize; @@ -1762,6 +1801,7 @@ public Config(int maxMessageLength, int writeRequestCacheSize, long flushInterva this.writeTimeoutMs = writeTimeoutMs; this.maxDirtySize = maxDirtySize; this.printMetricIntervalMs = printMetricIntervalMs; + this.enqueueTimeout = enqueueTimeout; this.storeConfig = storeConfig; this.indexStoreConfig = indexStoreConfig; } diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/RemovedPartitionGroupStoreManager.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/RemovedPartitionGroupStoreManager.java new file mode 100644 index 000000000..9a091d27b --- /dev/null +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/RemovedPartitionGroupStoreManager.java @@ -0,0 +1,89 @@ +package org.joyqueue.store; + +import org.joyqueue.store.utils.FileUtils; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.List; +import java.util.Map; + +/** + * RemovedPartitionGroupStoreManager + * author: gaohaoxiang + * date: 2020/10/28 + */ +public class RemovedPartitionGroupStoreManager implements RemovedPartitionGroupStore { + + protected static final Logger logger = LoggerFactory.getLogger(RemovedPartitionGroupStoreManager.class); + + private String topic; + private int partitionGroup; + private File base; + private List storeFiles; + private Map> indexStoreFiles; + + public RemovedPartitionGroupStoreManager(String topic, int partitionGroup, File base, List storeFiles, + Map> indexStoreFiles) { + this.topic = topic; + this.partitionGroup = partitionGroup; + this.base = base; + this.storeFiles = storeFiles; + this.indexStoreFiles = indexStoreFiles; + } + + @Override + public String getTopic() { + return topic; + } + + @Override + public int getPartitionGroup() { + return partitionGroup; + } + + @Override + public boolean physicalDeleteLeftFile() { + boolean deleteStoreFile = false; + boolean deleteIndexStoreFile = false; + + File storeFile = (storeFiles.isEmpty() ? null : storeFiles.remove(0)); + if (storeFile != null) { + deleteStoreFile = physicalDeleteFile(base, storeFile.getName()); + } + + for (Map.Entry> entry : indexStoreFiles.entrySet()) { + List indexStoreFiles = entry.getValue(); + File indexStoreFile = (indexStoreFiles.isEmpty() ? 
null : indexStoreFiles.remove(0)); + if (indexStoreFile != null) { + physicalDeleteFile(new File(base, "index" + File.separator + entry.getKey()), indexStoreFile.getName()); + deleteIndexStoreFile = true; + } + } + + return deleteStoreFile || deleteIndexStoreFile; + } + + @Override + public boolean physicalDelete() { + return physicalDeleteFile(base); + } + + protected boolean physicalDeleteFile(File base, String file) { + return physicalDeleteFile(new File(base, file)); + } + + protected boolean physicalDeleteFile(File file) { + if (logger.isDebugEnabled()) { + logger.debug("Store file deleted, file: {}", base, file); + } + if (!file.exists()) { + return false; + } + if (file.isDirectory()) { + return FileUtils.deleteFolder(file); + } else { + return file.delete(); + } + } +} \ No newline at end of file diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/Store.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/Store.java index cf709267c..2b676adcb 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/Store.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/Store.java @@ -15,6 +15,8 @@ */ package org.joyqueue.store; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.ArrayUtils; import org.joyqueue.domain.QosLevel; import org.joyqueue.monitor.BufferPoolMonitorInfo; import org.joyqueue.store.event.StoreEvent; @@ -23,6 +25,7 @@ import org.joyqueue.store.replication.ReplicableStore; import org.joyqueue.store.transaction.TransactionStore; import org.joyqueue.store.transaction.TransactionStoreManager; +import org.joyqueue.store.utils.FileUtils; import org.joyqueue.store.utils.PreloadBufferPool; import org.joyqueue.toolkit.concurrent.EventListener; import org.joyqueue.toolkit.config.PropertySupplier; @@ -36,6 +39,7 @@ import java.io.File; import java.io.IOException; import 
java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,6 +66,7 @@ public class Store extends Service implements StoreService, Closeable, PropertyS private static final String DEL_PREFIX = ".d."; private final Map storeMap = new HashMap<>(); + private final Map removedStoreMap = new HashMap<>(); private final Map txStoreMap = new HashMap<>(); private StoreConfig config; private PreloadBufferPool bufferPool; @@ -88,6 +93,7 @@ protected void validate() throws Exception { base = new File(config.getPath()); } checkOrCreateBase(); + physicalDeleteRemoved(); if (storeLock == null) { storeLock = new StoreLock(new File(base, "lock")); storeLock.lock(); @@ -153,6 +159,18 @@ public boolean physicalDelete() { } } + private void physicalDeleteRemoved() { + File[] removedGroups = new File(base, TOPICS_DIR).listFiles((dir, name) -> dir.isDirectory() && name.startsWith(DEL_PREFIX)); + + if (ArrayUtils.isEmpty(removedGroups)) { + return; + } + + for (File removed : removedGroups) { + deleteFolder(removed); + } + } + @Override public boolean partitionGroupExists(String topic, int partitionGroup) { return new File(base, getPartitionGroupRelPath(topic, partitionGroup)).isDirectory(); @@ -218,7 +236,7 @@ public synchronized void removePartitionGroup(String topic, int partitionGroup) } File groupBase = new File(base, getPartitionGroupRelPath(topic, partitionGroup)); - if (groupBase.exists()) delete(groupBase); + if (groupBase.exists()) deletePartitionGroup(groupBase, topic, partitionGroup, partitionGroupStoreManger); File topicBase = new File(base, getTopicRelPath(topic)); @@ -233,11 +251,24 @@ public synchronized void removePartitionGroup(String topic, int partitionGroup) } } } - delete(topicBase); + deleteFolder(topicBase); } } + @Override + public void physicalDeleteRemovedPartitionGroup(String topic, int partitionGroup) { + RemovedPartitionGroupStoreManager removedPartitionGroupStoreManager = 
removedStoreMap.remove(topic + "/" + partitionGroup); + removedPartitionGroupStoreManager.physicalDelete(); + } + + @Override + public List getRemovedPartitionGroups() { + if (removedStoreMap.isEmpty()) { + return Collections.emptyList(); + } + return Lists.newArrayList(removedStoreMap.values()); + } @Override public synchronized void restorePartitionGroup(String topic, int partitionGroup) throws Exception { @@ -261,7 +292,7 @@ public synchronized void restorePartitionGroup(String topic, int partitionGroup) public synchronized void createPartitionGroup(String topic, int partitionGroup, short[] partitions) throws Exception { if (!storeMap.containsKey(topic + "/" + partitionGroup)) { File groupBase = new File(base, getPartitionGroupRelPath(topic, partitionGroup)); - if (groupBase.exists()) delete(groupBase); + if (groupBase.exists()) deletePartitionGroup(groupBase, topic, partitionGroup, null); PartitionGroupStoreSupport.init(groupBase, partitions); restorePartitionGroup(topic, partitionGroup); @@ -276,7 +307,7 @@ private PartitionGroupStoreManager.Config getPartitionGroupConfig(StoreConfig co return new PartitionGroupStoreManager.Config( config.getMaxMessageLength(), config.getWriteRequestCacheSize(), config.getFlushIntervalMs(), config.getWriteTimeoutMs(), config.getMaxDirtySize(), - config.getPrintMetricIntervalMs(), messageConfig, indexConfig); + config.getPrintMetricIntervalMs(), config.getEnqueueTimeout(), messageConfig, indexConfig); } private PositioningStore.Config getIndexStoreConfig(StoreConfig config) { @@ -292,9 +323,21 @@ private PositioningStore.Config getMessageStoreConfig(StoreConfig config) { /** * 并不真正删除,只是重命名 */ - private boolean delete(File file) { - File renamed = new File(file.getParent(), DEL_PREFIX + SystemClock.now() + "." 
+ file.getName()); - return file.renameTo(renamed); + private boolean deletePartitionGroup(File file, String topic, int partitionGroup, PartitionGroupStoreManager partitionGroupStoreManager) { + File renamed = new File(base, TOPICS_DIR + File.separator + DEL_PREFIX + SystemClock.now() + + "." + topic.replace('/', '@') + "." + partitionGroup); + boolean renameResult = file.renameTo(renamed); + + if (partitionGroupStoreManager != null) { + RemovedPartitionGroupStoreManager removedPartitionGroupStoreManager = new RemovedPartitionGroupStoreManager( + partitionGroupStoreManager.getTopic(), partitionGroupStoreManager.getPartitionGroup(), + renamed, partitionGroupStoreManager.getStoreFiles(), partitionGroupStoreManager.getIndexStoreFiles()); + + removedStoreMap.put(partitionGroupStoreManager.getTopic() + "/" + partitionGroupStoreManager.getPartitionGroup(), + removedPartitionGroupStoreManager); + } + + return renameResult; } @Override @@ -386,22 +429,8 @@ public void close() throws IOException { } private void deleteFolder(File folder) { - File[] files = folder.listFiles(); - if (files != null) { - for (File f : files) { - if (f.isDirectory()) { - deleteFolder(f); - } else { - if (!f.delete()) { - logger.warn("Delete failed: {}", f.getAbsolutePath()); - } - } - } - } - if (!folder.delete()) { - logger.warn("Delete failed: {}", folder.getAbsolutePath()); - - } + logger.warn("Delete folder: {}", folder); + FileUtils.deleteFolder(folder); } diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/StoreConfig.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/StoreConfig.java index 2eccf4110..771631619 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/StoreConfig.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/StoreConfig.java @@ -34,8 +34,9 @@ public class StoreConfig { public static final int 
DEFAULT_PRE_LOAD_BUFFER_MAX_COUNT = 10; public static final long DEFAULT_PRINT_METRIC_INTERVAL_MS = 0; public static final boolean DEFAULT_MESSAGE_FILE_LOAD_ON_READ = false; - public static final boolean DEFAULT_INDEX_FILE_LOAD_ON_READ = true; - public static final boolean DEFAULT_FLUSH_FORCE = true; + public static final boolean DEFAULT_INDEX_FILE_LOAD_ON_READ = false; + public static final boolean DEFAULT_FLUSH_FORCE = false; + public static final int DEFAULT_ENQUEUE_TIMEOUT = 100; public static final String STORE_PATH = "/store"; /** @@ -62,6 +63,8 @@ public class StoreConfig { private long printMetricIntervalMs = DEFAULT_PRINT_METRIC_INTERVAL_MS; + private int enqueueTimeout = DEFAULT_ENQUEUE_TIMEOUT; + /** * 最大消息长度 */ @@ -209,6 +212,10 @@ public long getPrintMetricIntervalMs() { return PropertySupplier.getValue(propertySupplier, StoreConfigKey.PRINT_METRIC_INTERVAL_MS, printMetricIntervalMs); } + public int getEnqueueTimeout() { + return PropertySupplier.getValue(propertySupplier, StoreConfigKey.ENQUEUE_TIMEOUT, enqueueTimeout); + } + public void setPrintMetricIntervalMs(long printMetricIntervalMs) { this.printMetricIntervalMs = printMetricIntervalMs; } diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/StoreConfigKey.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/StoreConfigKey.java index cbc0585d3..88e7b4f61 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/StoreConfigKey.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/StoreConfigKey.java @@ -88,6 +88,8 @@ public enum StoreConfigKey implements PropertyDef { DISK_FULL_RATIO("store.disk.full.ratio", PositioningStore.Config.DEFAULT_DISK_FULL_RATIO, Type.INT), + ENQUEUE_TIMEOUT("store.enqueue.timeout", StoreConfig.DEFAULT_ENQUEUE_TIMEOUT, Type.INT), + PRINT_METRIC_INTERVAL_MS("print.metric.interval", 
StoreConfig.DEFAULT_PRINT_METRIC_INTERVAL_MS, Type.LONG); diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/Checkpoint.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/Checkpoint.java index 81ce315c8..41ff63445 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/Checkpoint.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/Checkpoint.java @@ -73,4 +73,14 @@ public long getReplicationPosition() { public void setReplicationPosition(long replicationPosition) { this.replicationPosition = replicationPosition; } + + @Override + public String toString() { + return "Checkpoint{" + + "version=" + version + + ", indexPosition=" + indexPosition + + ", partitions=" + partitions + + ", replicationPosition=" + replicationPosition + + '}'; + } } diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/PositioningStore.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/PositioningStore.java index 632d4b2fe..1e0318029 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/PositioningStore.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/PositioningStore.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.NavigableMap; @@ -37,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; /** * 带缓存的、高性能、多文件、基于位置的、Append Only的日志存储存储。 @@ -303,14 +305,24 @@ private void recoverFileMap() throws IOException { // 检查文件是否连续完整 if 
(!storeFileMap.isEmpty()) { + long notContinueStartPosition = -1; long position = storeFileMap.firstKey(); + for (Map.Entry> fileEntry : storeFileMap.entrySet()) { if (position != fileEntry.getKey()) { - throw new CorruptedLogException(String.format("Files are not continuous! expect: %d, actual file name: %d, store: %s.", position, fileEntry.getKey(), base.getAbsolutePath())); - // TODO: 考虑自动删除store尾部不连续的文件,以解决掉电后需要手动恢复存储的问题。 + notContinueStartPosition = fileEntry.getKey(); + break; } position += fileEntry.getValue().file().length() - fileHeaderSize; } + + if (notContinueStartPosition != -1) { + for (Map.Entry> entry : storeFileMap.tailMap(notContinueStartPosition).entrySet()) { + logger.warn("delete not continue file: {}", entry.getValue().file()); + forceDeleteStoreFile(entry.getValue()); + storeFileMap.remove(entry.getKey()); + } + } } } @@ -631,6 +643,23 @@ public long physicalDeleteLeftFile() throws IOException { return physicalDeleteTo(storeFile.position() + (storeFile.hasPage() ? 
storeFile.writePosition() : storeFile.fileDataSize())); } + public List getFiles() { + if (storeFileMap.isEmpty()) { + return Collections.emptyList(); + } + return storeFileMap.values().stream().map(StoreFile::file).collect(Collectors.toList()); + } + + public boolean isEarly(long timestamp, long minIndexedPhysicalPosition) { + for (StoreFile storeFile : storeFileMap.headMap(minIndexedPhysicalPosition).values()) { + if (storeFile.timestamp() > 0) + if (storeFile.timestamp() < timestamp) { + return true; + } + } + return false; + } + /** * 删除文件,丢弃未刷盘的数据,用于rollback */ @@ -729,7 +758,7 @@ public static class Config { public static final int DEFAULT_DISK_FULL_RATIO = 90; public static final int DEFAULT_MAX_MESSAGE_LENGTH = 4 * 1024 * 1024; public static final boolean DEFAULT_LOAD_ON_READ = false; - public static final boolean DEFAULT_FLUSH_FORCE = true; + public static final boolean DEFAULT_FLUSH_FORCE = false; /** * 文件头长度 diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/StoreFileImpl.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/StoreFileImpl.java index 9a0831fb8..dc7d16587 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/StoreFileImpl.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/file/StoreFileImpl.java @@ -262,6 +262,7 @@ public R read(int position, int length, BufferReader bufferReader) throws ByteBuffer byteBuffer = pageBuffer.asReadOnlyBuffer(); byteBuffer.position(position); byteBuffer.limit(writePosition); + return bufferReader.read(byteBuffer, length); } finally { bufferLock.unlock(stamp); diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/FileUtils.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/FileUtils.java new file mode 100644 index 000000000..87cbb01fa 
--- /dev/null +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/FileUtils.java @@ -0,0 +1,26 @@ +package org.joyqueue.store.utils; + +import java.io.File; + +/** + * FileUtils + * author: gaohaoxiang + * date: 2020/10/28 + */ +public class FileUtils { + + public static boolean deleteFolder(File folder) { + File[] files = folder.listFiles(); + if (files != null) { + for (File f : files) { + if (f.isDirectory()) { + deleteFolder(f); + } else { + if (!f.delete()) { + } + } + } + } + return folder.delete(); + } +} \ No newline at end of file diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/PreloadBufferPool.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/PreloadBufferPool.java index 8955c5862..6f8426512 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/PreloadBufferPool.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/main/java/org/joyqueue/store/utils/PreloadBufferPool.java @@ -29,7 +29,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -37,6 +36,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -47,7 +47,7 @@ * Date: 2018-12-20 */ public class PreloadBufferPool { - private static final long INTERVAL_MS = 50L; + private static final long INTERVAL_MS = 1000L; private static final Logger logger = LoggerFactory.getLogger(PreloadBufferPool.class); // 缓存比率:如果非堆内存使用率超过这个比率,就不再申请内存,抛出OOM。 // 由于jvm在读写文件的时候会用到少量DirectBuffer作为缓存,必须预留一部分。 @@ -66,7 +66,10 @@ public class 
PreloadBufferPool { public static final String MAX_MEMORY_KEY = "PreloadBufferPool.MaxMemory"; private static final String WRITE_PAGE_EXTRA_WEIGHT_MS_KEY = "PreloadBufferPool.WritePageExtraWeightMs"; private static final String MAX_PAGE_AGE = "PreloadBufferPool.MaxPageAge"; - private static final int DEFAULT_MAX_PAGE_AGE = 1000 * 60 * 5; + private static final int DEFAULT_MAX_PAGE_AGE = 1000 * 60 * 10; + private static final String MAX_FD = "PreloadBufferPool.MaxFD"; + private static final int DEFAULT_MAX_FD = 3000; + private final LoopThread preloadThread; private final LoopThread metricThread; private final LoopThread evictThread; @@ -74,6 +77,7 @@ public class PreloadBufferPool { private final long coreMemorySize; private final long evictMemorySize; private final int maxPageAge; + private final int maxFd; // 正在写入的页在置换时有额外的权重,这个权重用时间Ms体现。 // 默认是60秒。 @@ -87,6 +91,11 @@ public class PreloadBufferPool { private final Map bufferCache = new ConcurrentHashMap<>(); private static PreloadBufferPool instance = null; + private final AtomicInteger directDestroyCounter = new AtomicInteger(); + private final AtomicInteger directAllocateCounter = new AtomicInteger(); + private final AtomicInteger mmapDestroyCounter = new AtomicInteger(); + private final AtomicInteger mmapAllocateCounter = new AtomicInteger(); + public static PreloadBufferPool getInstance() { if(null == instance) { instance = new PreloadBufferPool(); @@ -102,6 +111,7 @@ private PreloadBufferPool() { coreMemorySize = Math.round(maxMemorySize * CORE_RATIO); writePageExtraWeightMs = Long.parseLong(System.getProperty(WRITE_PAGE_EXTRA_WEIGHT_MS_KEY, String.valueOf(DEFAULT_WRITE_PAGE_EXTRA_WEIGHT_MS))); maxPageAge = Integer.parseInt(System.getProperty(MAX_PAGE_AGE, String.valueOf(DEFAULT_MAX_PAGE_AGE))); + maxFd = Integer.parseInt(System.getProperty(MAX_FD, String.valueOf(DEFAULT_MAX_FD))); preloadThread = buildPreloadThread(); preloadThread.start(); @@ -201,19 +211,29 @@ private LoopThread buildEvictThread() { 
return LoopThread.builder() .name("EvictThread") .sleepTime(INTERVAL_MS, INTERVAL_MS) - .condition(this::needEviction) .doWork(this::evict) .onException(e -> logger.warn("EvictThread exception:", e)) .daemon(true) .build(); } + protected boolean needEvictMmap() { + return mMapBufferHolders.size() >= maxFd; + } + /** - * 清除文件缓存页。LRU。 + * 清除文件缓存页 */ private void evict() { - // 清理超过maxCount的缓存页 +// maybeEvictMaxCount(); + maybeEvictExpired(); + maybeEvictDirect(); + maybeEvictMmap(); + } + + // 清理超过maxCount的缓存页 + protected void maybeEvictMaxCount() { for (PreLoadCache preLoadCache : bufferCache.values()) { if (!needEviction()) { break; @@ -224,33 +244,67 @@ private void evict() { } catch (NoSuchElementException ignored) {} } } + } - List> sortedPage = Stream.concat(directBufferHolders.stream(), mMapBufferHolders.stream()) + /** + * 清理超时页 + */ + protected void maybeEvictExpired() { + List> sortedAllPage = Stream.concat(directBufferHolders.stream(), mMapBufferHolders.stream()) .filter(BufferHolder::isFree) - .map(bufferHolder -> new LruWrapper<>(bufferHolder, bufferHolder.lastAccessTime(), bufferHolder.writable() ? 
writePageExtraWeightMs : 0L)) - .sorted(Comparator.comparing(LruWrapper::getWeight)) + .map(bufferHolder -> new LruWrapper<>(bufferHolder, bufferHolder.lastAccessTime(), 0L)) + .sorted(Comparator.comparing(LruWrapper::getLastAccessTime)) .collect(Collectors.toList()); - Iterator> sortedPageIterator = sortedPage.iterator(); - while (sortedPageIterator.hasNext()) { - LruWrapper lruWrapper = sortedPageIterator.next(); - if (SystemClock.now() - lruWrapper.getLastAccessTime() >= maxPageAge) { - lruWrapper.get().evict(); - sortedPageIterator.remove(); + while (!sortedAllPage.isEmpty()) { + LruWrapper bufferHolder = sortedAllPage.get(0); + if (SystemClock.now() - bufferHolder.get().lastAccessTime() >= maxPageAge) { + sortedAllPage.remove(0); + bufferHolder.get().evict(); } else { break; } } + } - // 清理使用中最旧的页面,直到内存占用率达标 - if (needEviction()) { - while (needEviction() && !sortedPage.isEmpty()) { - LruWrapper wrapper = sortedPage.remove(0); - BufferHolder holder = wrapper.get(); - if (holder.lastAccessTime() == wrapper.getLastAccessTime()) { - holder.evict(); - } - } + /** + * 清理mmap + */ + protected void maybeEvictMmap() { + if (!needEvictMmap()) { + return; + } + + List> sortedMmapPage = mMapBufferHolders.stream() + .map(bufferHolder -> new LruWrapper<>(bufferHolder, bufferHolder.lastAccessTime(), 0L)) + .sorted(Comparator.comparing(LruWrapper::getLastAccessTime)) + .collect(Collectors.toList()); + + int needEvictCount = mMapBufferHolders.size() - maxFd; + for (int i = 0; i < needEvictCount && !sortedMmapPage.isEmpty(); i++) { + LruWrapper bufferHolder = sortedMmapPage.remove(0); + bufferHolder.get().evict(); + } + } + + /** + * 清理DirectBuffer + */ + protected void maybeEvictDirect() { + if (!needEviction()) { + return; + } + + // 如果内存不足,清理使用中最旧的页面,直到内存占用率达标 + List> sortedDirectPage = directBufferHolders.stream() + .filter(BufferHolder::isFree) + .map(bufferHolder -> new LruWrapper<>(bufferHolder, bufferHolder.lastAccessTime(), 0L)) + 
.sorted(Comparator.comparing(LruWrapper::getLastAccessTime)) + .collect(Collectors.toList()); + + while (needEviction() && !sortedDirectPage.isEmpty()) { + LruWrapper bufferHolder = sortedDirectPage.remove(0); + bufferHolder.get().evict(); } } @@ -287,6 +341,7 @@ private void close() { private void destroyOne(ByteBuffer byteBuffer) { usedSize.getAndAdd(-1 * byteBuffer.capacity()); + directDestroyCounter.incrementAndGet(); releaseIfDirect(byteBuffer); } @@ -323,6 +378,7 @@ private void preLoadBuffer() { private ByteBuffer createOne(int size) { reserveMemory(size); + directAllocateCounter.incrementAndGet(); return ByteBuffer.allocateDirect(size); } @@ -375,8 +431,9 @@ private void releaseIfDirect(ByteBuffer byteBuffer) { } public void allocateMMap(BufferHolder bufferHolder) { - reserveMemory(bufferHolder.capacity()); +// reserveMemory(bufferHolder.capacity()); mMapBufferHolders.add(bufferHolder); + mmapAllocateCounter.incrementAndGet(); } public ByteBuffer allocateDirect(BufferHolder bufferHolder) { @@ -389,16 +446,12 @@ private ByteBuffer allocateDirect(int bufferSize) { try { PreLoadCache preLoadCache = bufferCache.get(bufferSize); if (null != preLoadCache) { - try { - ByteBuffer byteBuffer = preLoadCache.cache.remove(); - preLoadCache.onFlyCounter.getAndIncrement(); - return byteBuffer; - } catch (NoSuchElementException e) { - logger.debug("Pool is empty, create ByteBuffer: {}", bufferSize); - ByteBuffer byteBuffer = createOne(bufferSize); - preLoadCache.onFlyCounter.getAndIncrement(); - return byteBuffer; + ByteBuffer byteBuffer = preLoadCache.cache.poll(); + if (byteBuffer == null) { + byteBuffer = createOne(bufferSize); } + preLoadCache.onFlyCounter.getAndIncrement(); + return byteBuffer; } else { logger.warn("No cached buffer in pool, create ByteBuffer: {}", bufferSize); return createOne(bufferSize); @@ -428,8 +481,9 @@ public void releaseDirect(ByteBuffer byteBuffer, BufferHolder bufferHolder) { } public void releaseMMap(BufferHolder bufferHolder) { +// 
usedSize.getAndAdd(-1 * bufferHolder.capacity()); mMapBufferHolders.remove(bufferHolder); - usedSize.getAndAdd(-1 * bufferHolder.capacity()); + mmapDestroyCounter.incrementAndGet(); } @@ -466,7 +520,13 @@ public BufferPoolMonitorInfo monitorInfo() { bufferPoolMonitorInfo.setUsed(Format.formatSize(totalUsed)); bufferPoolMonitorInfo.setMaxMemorySize(Format.formatSize(maxMemorySize)); bufferPoolMonitorInfo.setMmpUsed(Format.formatSize(mmpUsed)); + bufferPoolMonitorInfo.setMmpFd(String.valueOf(mMapBufferHolders.size())); + bufferPoolMonitorInfo.setMmapAllocate(mmapAllocateCounter.get()); + bufferPoolMonitorInfo.setMmapDestroy(mmapDestroyCounter.get()); bufferPoolMonitorInfo.setDirectUsed(Format.formatSize(directUsed)); + bufferPoolMonitorInfo.setDirectFd(String.valueOf(directBufferHolders.size())); + bufferPoolMonitorInfo.setDirectAllocate(directAllocateCounter.get()); + bufferPoolMonitorInfo.setDirectDestroy(directDestroyCounter.get()); return bufferPoolMonitorInfo; } static class PreLoadCache { diff --git a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/test/java/org/joyqueue/store/PartitionGroupStoreManagerTest.java b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/test/java/org/joyqueue/store/PartitionGroupStoreManagerTest.java index 1f4727644..fb1507182 100644 --- a/joyqueue-server/joyqueue-store/joyqueue-store-core/src/test/java/org/joyqueue/store/PartitionGroupStoreManagerTest.java +++ b/joyqueue-server/joyqueue-store/joyqueue-store-core/src/test/java/org/joyqueue/store/PartitionGroupStoreManagerTest.java @@ -64,6 +64,7 @@ import static org.joyqueue.store.PartitionGroupStoreManager.Config.DEFAULT_MAX_MESSAGE_LENGTH; import static org.joyqueue.store.PartitionGroupStoreManager.Config.DEFAULT_WRITE_REQUEST_CACHE_SIZE; import static org.joyqueue.store.PartitionGroupStoreManager.Config.DEFAULT_WRITE_TIMEOUT_MS; +import static org.joyqueue.store.PartitionGroupStoreManager.Config.DEFAULT_ENQUEUE_TIMEOUT; /** * @author liyue25 @@ -175,7 +176,7 @@ public 
void batchWriteReadTest() throws Exception { @Test public void indexLengthTest() throws Exception { - int count = 1024 * 1024; + int count = 1024 * 10; long timeout = 500000L; long length = 0L; @@ -430,7 +431,7 @@ public void rePartitionTest() throws Exception { bufferPool.addPreLoad(512 * 1024, 2, 4); } PartitionGroupStoreManager.Config config = new PartitionGroupStoreManager.Config(DEFAULT_MAX_MESSAGE_LENGTH, DEFAULT_WRITE_REQUEST_CACHE_SIZE, DEFAULT_FLUSH_INTERVAL_MS, - DEFAULT_WRITE_TIMEOUT_MS, DEFAULT_MAX_DIRTY_SIZE, 6000, + DEFAULT_WRITE_TIMEOUT_MS, DEFAULT_MAX_DIRTY_SIZE, 6000, DEFAULT_ENQUEUE_TIMEOUT, new PositioningStore.Config(128 * 1024 * 1024), new PositioningStore.Config(512 * 1024)); @@ -479,7 +480,7 @@ public void changeFileSizeTest() throws Exception { bufferPool.addPreLoad(indexFileSize, 2, 4); } PartitionGroupStoreManager.Config config = new PartitionGroupStoreManager.Config(DEFAULT_MAX_MESSAGE_LENGTH, DEFAULT_WRITE_REQUEST_CACHE_SIZE, DEFAULT_FLUSH_INTERVAL_MS, - DEFAULT_WRITE_TIMEOUT_MS, DEFAULT_MAX_DIRTY_SIZE, 6000, + DEFAULT_WRITE_TIMEOUT_MS, DEFAULT_MAX_DIRTY_SIZE, 6000,DEFAULT_ENQUEUE_TIMEOUT, new PositioningStore.Config(storeFileSize, storeLoadOnRead, flushForce), new PositioningStore.Config(indexFileSize, indexLoadOnRead, flushForce)); @@ -517,7 +518,7 @@ public void changeFileSizeTest() throws Exception { } config = new PartitionGroupStoreManager.Config(DEFAULT_MAX_MESSAGE_LENGTH, DEFAULT_WRITE_REQUEST_CACHE_SIZE, DEFAULT_FLUSH_INTERVAL_MS, - DEFAULT_WRITE_TIMEOUT_MS, DEFAULT_MAX_DIRTY_SIZE, 6000, + DEFAULT_WRITE_TIMEOUT_MS, DEFAULT_MAX_DIRTY_SIZE, 6000, DEFAULT_ENQUEUE_TIMEOUT, new PositioningStore.Config(storeFileSize, storeLoadOnRead, flushForce), new PositioningStore.Config(indexFileSize, indexLoadOnRead, flushForce)); @@ -1009,7 +1010,7 @@ private void recoverStore() throws Exception { DEFAULT_MAX_MESSAGE_LENGTH, DEFAULT_WRITE_REQUEST_CACHE_SIZE, 1L, - DEFAULT_WRITE_TIMEOUT_MS, DEFAULT_MAX_DIRTY_SIZE, 6000, + 
DEFAULT_WRITE_TIMEOUT_MS, DEFAULT_MAX_DIRTY_SIZE, 6000, DEFAULT_ENQUEUE_TIMEOUT, new PositioningStore.Config(32 * 1024 * 1024), new PositioningStore.Config(128 * 1024,true, false));