Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Paging Fixes #267

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private static void printPages(DescribeJournal describeJournal,
Page page = pgStore.createPage(pgid);
page.open();
List<PagedMessage> msgs = page.read(sm);
page.close();
page.close(false, false);

int msgID = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private void printPagedMessagesAsXML() {
Page page = pageStore.createPage(pageId);
page.open();
List<PagedMessage> messages = page.read(storageManager);
page.close();
page.close(false, false);

int messageId = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,10 @@ public final String getFileName() {

@Override
public final void delete() throws IOException, InterruptedException, ActiveMQException {
if (isOpen()) {
close();
}

try {
if (isOpen()) {
close(false);
}
Files.deleteIfExists(file.toPath());
} catch (Throwable t) {
logger.trace("Fine error while deleting file", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,36 +107,32 @@ public synchronized void close(boolean waitSync) throws IOException, Interrupted
}

super.close();

if (waitSync) {
final String fileName = this.getFileName();
try {
int waitCount = 0;
while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) {
waitCount++;
if (waitCount == 1) {
final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
for (ThreadInfo threadInfo : threads) {
ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString());
try {
if (waitSync) {
final String fileName = this.getFileName();
try {
int waitCount = 0;
while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) {
waitCount++;
if (waitCount == 1) {
final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
for (ThreadInfo threadInfo : threads) {
ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString());
}
factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this);
}
factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this);
ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!");
}
ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!");
} catch (InterruptedException e) {
ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e);
throw e;
}
} catch (InterruptedException e) {
ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e);
throw e;
} finally {

opened = false;

timedBuffer = null;

aioFile.close();

aioFile = null;

}
} finally {
opened = false;
timedBuffer = null;
aioFile.close();
aioFile = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void fill(int size) throws IOException {

@Override
public void delete() {
close();
close(false);
if (file.exists() && !file.delete()) {
ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
}
Expand Down Expand Up @@ -361,6 +361,8 @@ public void close() {
@Override
public void close(boolean waitOnSync) {
if (this.mappedFile != null) {
if (waitOnSync && factory.isDatasync())
this.mappedFile.force();
this.mappedFile.close();
this.mappedFile = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,17 @@ public void fill(final int size) throws IOException {

@Override
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
close(true);
}

@Override
public synchronized void close(boolean waitSync) throws IOException, InterruptedException, ActiveMQException {
super.close();

try {
if (channel != null) {
if (waitSync && factory.isDatasync())
channel.force(false);
channel.close();
}
} catch (ClosedChannelException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ record = new byte[variableSize];
fileFactory.releaseDirectBuffer(wholeFileBuffer);
}
try {
file.getFile().close();
file.getFile().close(false);
} catch (Throwable ignored) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
Expand Down Expand Up @@ -136,7 +135,6 @@ public int sendMessage(MessageReference ref,

ICoreMessage coreMessage = serverMessage.toCore();

LargeServerMessageImpl largeMessage = null;
ICoreMessage newServerMessage = serverMessage.toCore();
try {
StompSubscription subscription = subscriptions.get(consumer.getID());
Expand Down Expand Up @@ -179,13 +177,7 @@ public void run() throws Exception {
ActiveMQStompProtocolLogger.LOGGER.debug(e);
}
return 0;
} finally {
if (largeMessage != null) {
largeMessage.releaseResources();
largeMessage = null;
}
}

}

@Override
Expand Down Expand Up @@ -369,7 +361,7 @@ public void sendInternalLarge(CoreMessage message, boolean direct) throws Except

largeMessage.addBytes(bytes);

largeMessage.releaseResources();
largeMessage.releaseResources(true);

largeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ public PageCache getPageCache(final long pageId) {
if (!pagingStore.checkPageFileExists((int) pageId)) {
return null;
}
Page currentPage = pagingStore.getCurrentPage();
// Live page cache might be cleared by gc, we need to retrieve it otherwise partially written page cache is being returned
if (currentPage != null && currentPage.getPageId() == pageId && (cache = currentPage.getLiveCache()) != null) {
softCache.put(cache.getPageId(), cache);
return cache;
}
inProgressReadPage = inProgressReadPages.get(pageId);
if (inProgressReadPage == null) {
final CompletableFuture<PageCache> readPage = new CompletableFuture<>();
Expand Down Expand Up @@ -225,7 +231,7 @@ private PageCache readPage(long pageId,
} finally {
try {
if (page != null) {
page.close(false);
page.close(false, false);
}
} catch (Throwable ignored) {
}
Expand Down Expand Up @@ -521,13 +527,12 @@ protected void finishCleanup(ArrayList<Page> depagedPages) {
pgdMessagesList = depagedPage.read(storageManager);
} finally {
try {
depagedPage.close(false);
depagedPage.close(false, false);
} catch (Exception e) {
}

storageManager.afterPageRead();
}
depagedPage.close(false);
pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]);
} else {
pgdMessages = cache.getMessages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,42 +382,35 @@ public PageIterator iterator(boolean browsing) {
private PagedReference internalGetNext(final PagePosition pos) {
PagePosition retPos = pos.nextMessage();

PageCache cache = cursorProvider.getPageCache(pos.getPageNr());

PageCache emptyCache = null;
if (cache != null && !cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages()) {
emptyCache = cache;
saveEmptyPageAsConsumedPage(emptyCache);
// The next message is beyond what's available at the current page, so we need to move to the next page
cache = null;
}

// it will scan for the next available page
while ((cache == null && retPos.getPageNr() <= pageStore.getCurrentWritingPage()) || (cache != null && retPos.getPageNr() <= pageStore.getCurrentWritingPage() && cache.getNumberOfMessages() == 0)) {
emptyCache = cache;
retPos = moveNextPage(retPos);
PageCache cache = null;

while (retPos.getPageNr() <= pageStore.getCurrentWritingPage()) {
cache = cursorProvider.getPageCache(retPos.getPageNr());

if (cache != null) {
saveEmptyPageAsConsumedPage(emptyCache);
/**
* In following cases, we should move to the next page
* case 1: cache == null means file might be deleted unexpectedly.
* case 2: cache is not live and contains no messages.
* case 3: cache is not live and next message is beyond what's available at the current page.
*/
if (cache == null || (!cache.isLive() && (retPos.getMessageNr() >= cache.getNumberOfMessages() || cache.getNumberOfMessages() == 0))) {
// Save current empty page every time we move to next page
saveEmptyPageAsConsumedPage(cache);
retPos = moveNextPage(retPos);
cache = null;
} else {
// We need to break loop to get message if cache is live or the next message number is in the range of current page
break;
}
}

if (cache == null) {
saveEmptyPageAsConsumedPage(emptyCache);

// it will be null in the case of the current writing page
return null;
} else {
if (cache != null) {
PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());

if (serverMessage != null) {
return cursorProvider.newReference(retPos, serverMessage, this);
} else {
return null;
}
}
return null;
}

private PagePosition moveNextPage(final PagePosition pos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public void setLiveCache(LivePageCache pageCache) {
this.pageCache = pageCache;
}

public LivePageCache getLiveCache() {
return pageCache;
}

public synchronized List<PagedMessage> read(StorageManager storage) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("reading page " + this.pageId + " on address = " + storeName);
Expand Down Expand Up @@ -259,8 +263,9 @@ private List<PagedMessage> readFromSequentialFile(StorageManager storage) throws
if (fileBuffer != null) {
fileFactory.releaseBuffer(fileBuffer);
}
if (file.position() != fileSize) {
file.position(fileSize);
size.lazySet(processedBytes);
if (file.position() != processedBytes) {
file.position(processedBytes);
}
}
}
Expand Down Expand Up @@ -303,15 +308,15 @@ public void open() throws Exception {
file.position(0);
}

public void close() throws Exception {
close(false);
public void close(boolean sendEvent) throws Exception {
close(sendEvent, true);
}

/**
* sendEvent means it's a close happening from a major event such moveNext.
* While reading the cache we don't need (and shouldn't inform the backup
*/
public synchronized void close(boolean sendEvent) throws Exception {
public synchronized void close(boolean sendEvent, boolean waitSync) throws Exception {
if (sendEvent && storageManager != null) {
storageManager.pageClosed(storeName, pageId);
}
Expand All @@ -320,7 +325,7 @@ public synchronized void close(boolean sendEvent) throws Exception {
// leave it to the soft cache to decide when to release it now
pageCache = null;
}
file.close();
file.close(waitSync);

Set<PageSubscriptionCounter> counters = getPendingCounters();
if (counters != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,15 @@ public void start() throws Exception {
currentPage = page;

cursorProvider.addPageCache(pageCache);

/**
* The page file might be incomplete in the cases: 1) last message incomplete 2) disk damaged.
* In case 1 we can keep writing the file. But in case 2 we'd better not bcs old data might be overwritten.
* Here we open a new page so the incomplete page would be reserved for recovery if needed.
*/
if (page.getSize() != page.getFile().size()) {
openNewPage();
}
}

// We will not mark it for paging if there's only a single empty file
Expand Down Expand Up @@ -573,7 +582,7 @@ public Page createPage(final int pageNumber) throws Exception {

file.position(0);

file.close();
file.close(false);

return page;
}
Expand Down