Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimised page file opening #837

Merged
merged 1 commit into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version 0.18-SNAPSHOT:
[fix] Optimised page file opening for disk-based queues. (#837)
[feature] Manage payload format indicator property, when set verify payload format. (#826)
[refactoring] Refactory of PostOffice to pass publish message in hits entirety avoiding decomposition into single parameters. (#827)
[feature] Add Netty native transport support on MacOS. Bundle all the native transport module by default (#806)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package io.moquette.broker.unsafequeues;

import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Default implementation of SegmentAllocator. It uses a series of files (named pages) and split them in segments.
Expand All @@ -15,7 +20,10 @@
* */
class PagedFilesAllocator implements SegmentAllocator {

private static final Logger LOG = LoggerFactory.getLogger(PagedFilesAllocator.class);

interface AllocationListener {

void segmentedCreated(String name, Segment segment);
}

Expand All @@ -27,6 +35,8 @@ interface AllocationListener {
private MappedByteBuffer currentPage;
private FileChannel currentPageFile;

private final Map<Integer, WeakReference<MappedByteBuffer>> pageCache = new HashMap<>();

PagedFilesAllocator(Path pagesFolder, int pageSize, int segmentSize, int lastPage, int lastSegmentAllocated) throws QueueException {
if (pageSize % segmentSize != 0) {
throw new IllegalArgumentException("The pageSize must be an exact multiple of the segmentSize");
Expand All @@ -36,11 +46,30 @@ interface AllocationListener {
this.segmentSize = segmentSize;
this.lastPage = lastPage;
this.lastSegmentAllocated = lastSegmentAllocated;
this.currentPage = openRWPageFile(this.pagesFolder, this.lastPage);
this.currentPage = openOrRetrievePageFile(this.lastPage);
}

private MappedByteBuffer openOrRetrievePageFile(int pageId) throws QueueException {
MappedByteBuffer pageBuffer = null;
WeakReference<MappedByteBuffer> pageBufferRef = pageCache.get(pageId);
if (pageBufferRef != null) {
pageBuffer = pageBufferRef.get();
}
if (pageBuffer == null) {
pageBuffer = openRWPageFile(pageId);
pageBufferRef = new WeakReference<>(pageBuffer);
WeakReference<MappedByteBuffer> old = pageCache.put(pageId, pageBufferRef);
//Sanity check, should not happen...
if (old != null && old.get() != null) {
LOG.warn("Page file {} opened even though it already is open!", pageId);
}
}
return pageBuffer;
}

private MappedByteBuffer openRWPageFile(Path pagesFolder, int pageId) throws QueueException {
private MappedByteBuffer openRWPageFile(int pageId) throws QueueException {
final Path pageFile = pagesFolder.resolve(String.format("%d.page", pageId));
LOG.debug("Opening page {} from file {}", pageId, pageFile);
boolean createNew = false;
if (!Files.exists(pageFile)) {
try {
Expand Down Expand Up @@ -71,7 +100,7 @@ private MappedByteBuffer openRWPageFile(Path pagesFolder, int pageId) throws Que
public Segment nextFreeSegment() throws QueueException {
if (currentPageIsExhausted()) {
lastPage++;
currentPage = openRWPageFile(pagesFolder, lastPage);
currentPage = openOrRetrievePageFile(lastPage);
lastSegmentAllocated = 0;
}

Expand All @@ -84,7 +113,7 @@ public Segment nextFreeSegment() throws QueueException {

@Override
public Segment reopenSegment(int pageId, int beginOffset) throws QueueException {
final MappedByteBuffer page = openRWPageFile(pagesFolder, pageId);
final MappedByteBuffer page = openOrRetrievePageFile(pageId);
final SegmentPointer begin = new SegmentPointer(pageId, beginOffset);
final SegmentPointer end = new SegmentPointer(pageId, beginOffset + segmentSize - 1);
return new Segment(page, begin, end);
Expand Down