From f0fa762e5b7bfaa5a39f97c44e841b78de877cea Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Tue, 30 Apr 2024 10:23:19 +0200 Subject: [PATCH] Optimised page file opening Instead of mapping the entire page file every time a segment is re-used, we map the page file once, and re-use the mapped buffer for each segment. The cache uses WeakReferences to ensure that a page file that is no longer used does not stay mapped. --- ChangeLog.txt | 1 + .../unsafequeues/PagedFilesAllocator.java | 37 +++++++++++++++++-- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index d9151c7d9..5764e651a 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,4 +1,5 @@ Version 0.18-SNAPSHOT: + [fix] Optimised page file opening for disk-based queues. (#837) [feature] Manage payload format indicator property, when set verify payload format. (#826) [refactoring] Refactory of PostOffice to pass publish message in hits entirety avoiding decomposition into single parameters. (#827) [feature] Add Netty native transport support on MacOS. Bundle all the native transport module by default (#806) diff --git a/broker/src/main/java/io/moquette/broker/unsafequeues/PagedFilesAllocator.java b/broker/src/main/java/io/moquette/broker/unsafequeues/PagedFilesAllocator.java index f005fe7bd..ceb371054 100644 --- a/broker/src/main/java/io/moquette/broker/unsafequeues/PagedFilesAllocator.java +++ b/broker/src/main/java/io/moquette/broker/unsafequeues/PagedFilesAllocator.java @@ -1,12 +1,17 @@ package io.moquette.broker.unsafequeues; import java.io.IOException; +import java.lang.ref.WeakReference; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Default implementation of SegmentAllocator. It uses a series of files (named pages) and split them in segments. @@ -15,7 +20,10 @@ * */ class PagedFilesAllocator implements SegmentAllocator { + private static final Logger LOG = LoggerFactory.getLogger(PagedFilesAllocator.class); + interface AllocationListener { + void segmentedCreated(String name, Segment segment); } @@ -27,6 +35,8 @@ interface AllocationListener { private MappedByteBuffer currentPage; private FileChannel currentPageFile; + private final Map> pageCache = new HashMap<>(); + PagedFilesAllocator(Path pagesFolder, int pageSize, int segmentSize, int lastPage, int lastSegmentAllocated) throws QueueException { if (pageSize % segmentSize != 0) { throw new IllegalArgumentException("The pageSize must be an exact multiple of the segmentSize"); @@ -36,11 +46,30 @@ interface AllocationListener { this.segmentSize = segmentSize; this.lastPage = lastPage; this.lastSegmentAllocated = lastSegmentAllocated; - this.currentPage = openRWPageFile(this.pagesFolder, this.lastPage); + this.currentPage = openOrRetrievePageFile(this.lastPage); + } + + private MappedByteBuffer openOrRetrievePageFile(int pageId) throws QueueException { + MappedByteBuffer pageBuffer = null; + WeakReference pageBufferRef = pageCache.get(pageId); + if (pageBufferRef != null) { + pageBuffer = pageBufferRef.get(); + } + if (pageBuffer == null) { + pageBuffer = openRWPageFile(pageId); + pageBufferRef = new WeakReference<>(pageBuffer); + WeakReference old = pageCache.put(pageId, pageBufferRef); + //Sanity check, should not happen... + if (old != null && old.get() != null) { + LOG.warn("Page file {} opened even though it already is open!", pageId); + } + } + return pageBuffer; } - private MappedByteBuffer openRWPageFile(Path pagesFolder, int pageId) throws QueueException { + private MappedByteBuffer openRWPageFile(int pageId) throws QueueException { final Path pageFile = pagesFolder.resolve(String.format("%d.page", pageId)); + LOG.debug("Opening page {} from file {}", pageId, pageFile); boolean createNew = false; if (!Files.exists(pageFile)) { try { @@ -71,7 +100,7 @@ private MappedByteBuffer openRWPageFile(Path pagesFolder, int pageId) throws Que public Segment nextFreeSegment() throws QueueException { if (currentPageIsExhausted()) { lastPage++; - currentPage = openRWPageFile(pagesFolder, lastPage); + currentPage = openOrRetrievePageFile(lastPage); lastSegmentAllocated = 0; } @@ -84,7 +113,7 @@ public Segment nextFreeSegment() throws QueueException { @Override public Segment reopenSegment(int pageId, int beginOffset) throws QueueException { - final MappedByteBuffer page = openRWPageFile(pagesFolder, pageId); + final MappedByteBuffer page = openOrRetrievePageFile(pageId); final SegmentPointer begin = new SegmentPointer(pageId, beginOffset); final SegmentPointer end = new SegmentPointer(pageId, beginOffset + segmentSize - 1); return new Segment(page, begin, end);