Skip to content

Commit

Permalink
This is PR #267
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Aug 6, 2019
2 parents ecb56b2 + 8ff4fa8 commit a9dd5b5
Show file tree
Hide file tree
Showing 29 changed files with 474 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private static void printPages(DescribeJournal describeJournal,
Page page = pgStore.createPage(pgid);
page.open();
List<PagedMessage> msgs = page.read(sm);
page.close();
page.close(false, false);

int msgID = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private void printPagedMessagesAsXML() {
Page page = pageStore.createPage(pageId);
page.open();
List<PagedMessage> messages = page.read(storageManager);
page.close();
page.close(false, false);

int messageId = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,10 @@ public final String getFileName() {

@Override
public final void delete() throws IOException, InterruptedException, ActiveMQException {
if (isOpen()) {
close();
}

try {
if (isOpen()) {
close(false);
}
Files.deleteIfExists(file.toPath());
} catch (Throwable t) {
logger.trace("Fine error while deleting file", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,36 +107,32 @@ public synchronized void close(boolean waitSync) throws IOException, Interrupted
}

super.close();

if (waitSync) {
final String fileName = this.getFileName();
try {
int waitCount = 0;
while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) {
waitCount++;
if (waitCount == 1) {
final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
for (ThreadInfo threadInfo : threads) {
ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString());
try {
if (waitSync) {
final String fileName = this.getFileName();
try {
int waitCount = 0;
while (!pendingCallbacks.await(10, TimeUnit.SECONDS)) {
waitCount++;
if (waitCount == 1) {
final ThreadInfo[] threads = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
for (ThreadInfo threadInfo : threads) {
ActiveMQJournalLogger.LOGGER.warn(threadInfo.toString());
}
factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this);
}
factory.onIOError(new IOException("Timeout on close"), "Timeout on close", this);
ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!");
}
ActiveMQJournalLogger.LOGGER.warn("waiting pending callbacks on " + fileName + " from " + (waitCount * 10) + " seconds!");
} catch (InterruptedException e) {
ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e);
throw e;
}
} catch (InterruptedException e) {
ActiveMQJournalLogger.LOGGER.warn("interrupted while waiting pending callbacks on " + fileName, e);
throw e;
} finally {

opened = false;

timedBuffer = null;

aioFile.close();

aioFile = null;

}
} finally {
opened = false;
timedBuffer = null;
aioFile.close();
aioFile = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void fill(int size) throws IOException {

@Override
public void delete() {
close();
close(false);
if (file.exists() && !file.delete()) {
ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
}
Expand Down Expand Up @@ -361,6 +361,8 @@ public void close() {
@Override
public void close(boolean waitOnSync) {
if (this.mappedFile != null) {
if (waitOnSync && factory.isDatasync())
this.mappedFile.force();
this.mappedFile.close();
this.mappedFile = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,17 @@ public void fill(final int size) throws IOException {

@Override
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
close(true);
}

@Override
public synchronized void close(boolean waitSync) throws IOException, InterruptedException, ActiveMQException {
super.close();

try {
if (channel != null) {
if (waitSync && factory.isDatasync())
channel.force(false);
channel.close();
}
} catch (ClosedChannelException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ record = new byte[variableSize];
fileFactory.releaseDirectBuffer(wholeFileBuffer);
}
try {
file.getFile().close();
file.getFile().close(false);
} catch (Throwable ignored) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
Expand Down Expand Up @@ -136,7 +135,6 @@ public int sendMessage(MessageReference ref,

ICoreMessage coreMessage = serverMessage.toCore();

LargeServerMessageImpl largeMessage = null;
ICoreMessage newServerMessage = serverMessage.toCore();
try {
StompSubscription subscription = subscriptions.get(consumer.getID());
Expand Down Expand Up @@ -179,13 +177,7 @@ public void run() throws Exception {
ActiveMQStompProtocolLogger.LOGGER.debug(e);
}
return 0;
} finally {
if (largeMessage != null) {
largeMessage.releaseResources();
largeMessage = null;
}
}

}

@Override
Expand Down Expand Up @@ -369,7 +361,7 @@ public void sendInternalLarge(CoreMessage message, boolean direct) throws Except

largeMessage.addBytes(bytes);

largeMessage.releaseResources();
largeMessage.releaseResources(true);

largeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ public PageCache getPageCache(final long pageId) {
if (!pagingStore.checkPageFileExists((int) pageId)) {
return null;
}
Page currentPage = pagingStore.getCurrentPage();
// Live page cache might be cleared by gc, we need to retrieve it otherwise partially written page cache is being returned
if (currentPage != null && currentPage.getPageId() == pageId && (cache = currentPage.getLiveCache()) != null) {
softCache.put(cache.getPageId(), cache);
return cache;
}
inProgressReadPage = inProgressReadPages.get(pageId);
if (inProgressReadPage == null) {
final CompletableFuture<PageCache> readPage = new CompletableFuture<>();
Expand Down Expand Up @@ -225,7 +231,7 @@ private PageCache readPage(long pageId,
} finally {
try {
if (page != null) {
page.close(false);
page.close(false, false);
}
} catch (Throwable ignored) {
}
Expand Down Expand Up @@ -521,13 +527,12 @@ protected void finishCleanup(ArrayList<Page> depagedPages) {
pgdMessagesList = depagedPage.read(storageManager);
} finally {
try {
depagedPage.close(false);
depagedPage.close(false, false);
} catch (Exception e) {
}

storageManager.afterPageRead();
}
depagedPage.close(false);
pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]);
} else {
pgdMessages = cache.getMessages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,42 +382,35 @@ public PageIterator iterator(boolean browsing) {
private PagedReference internalGetNext(final PagePosition pos) {
PagePosition retPos = pos.nextMessage();

PageCache cache = cursorProvider.getPageCache(pos.getPageNr());

PageCache emptyCache = null;
if (cache != null && !cache.isLive() && retPos.getMessageNr() >= cache.getNumberOfMessages()) {
emptyCache = cache;
saveEmptyPageAsConsumedPage(emptyCache);
// The next message is beyond what's available at the current page, so we need to move to the next page
cache = null;
}

// it will scan for the next available page
while ((cache == null && retPos.getPageNr() <= pageStore.getCurrentWritingPage()) || (cache != null && retPos.getPageNr() <= pageStore.getCurrentWritingPage() && cache.getNumberOfMessages() == 0)) {
emptyCache = cache;
retPos = moveNextPage(retPos);
PageCache cache = null;

while (retPos.getPageNr() <= pageStore.getCurrentWritingPage()) {
cache = cursorProvider.getPageCache(retPos.getPageNr());

if (cache != null) {
saveEmptyPageAsConsumedPage(emptyCache);
/**
* In following cases, we should move to the next page
* case 1: cache == null means file might be deleted unexpectedly.
* case 2: cache is not live and contains no messages.
* case 3: cache is not live and next message is beyond what's available at the current page.
*/
if (cache == null || (!cache.isLive() && (retPos.getMessageNr() >= cache.getNumberOfMessages() || cache.getNumberOfMessages() == 0))) {
// Save current empty page every time we move to next page
saveEmptyPageAsConsumedPage(cache);
retPos = moveNextPage(retPos);
cache = null;
} else {
// We need to break loop to get message if cache is live or the next message number is in the range of current page
break;
}
}

if (cache == null) {
saveEmptyPageAsConsumedPage(emptyCache);

// it will be null in the case of the current writing page
return null;
} else {
if (cache != null) {
PagedMessage serverMessage = cache.getMessage(retPos.getMessageNr());

if (serverMessage != null) {
return cursorProvider.newReference(retPos, serverMessage, this);
} else {
return null;
}
}
return null;
}

private PagePosition moveNextPage(final PagePosition pos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public void setLiveCache(LivePageCache pageCache) {
this.pageCache = pageCache;
}

public LivePageCache getLiveCache() {
return pageCache;
}

public synchronized List<PagedMessage> read(StorageManager storage) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("reading page " + this.pageId + " on address = " + storeName);
Expand Down Expand Up @@ -259,8 +263,9 @@ private List<PagedMessage> readFromSequentialFile(StorageManager storage) throws
if (fileBuffer != null) {
fileFactory.releaseBuffer(fileBuffer);
}
if (file.position() != fileSize) {
file.position(fileSize);
size.lazySet(processedBytes);
if (file.position() != processedBytes) {
file.position(processedBytes);
}
}
}
Expand Down Expand Up @@ -303,15 +308,15 @@ public void open() throws Exception {
file.position(0);
}

public void close() throws Exception {
close(false);
public void close(boolean sendEvent) throws Exception {
close(sendEvent, true);
}

/**
* sendEvent means it's a close happening from a major event such moveNext.
* While reading the cache we don't need (and shouldn't inform the backup
*/
public synchronized void close(boolean sendEvent) throws Exception {
public synchronized void close(boolean sendEvent, boolean waitSync) throws Exception {
if (sendEvent && storageManager != null) {
storageManager.pageClosed(storeName, pageId);
}
Expand All @@ -320,7 +325,7 @@ public synchronized void close(boolean sendEvent) throws Exception {
// leave it to the soft cache to decide when to release it now
pageCache = null;
}
file.close();
file.close(waitSync);

Set<PageSubscriptionCounter> counters = getPendingCounters();
if (counters != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,15 @@ public void start() throws Exception {
currentPage = page;

cursorProvider.addPageCache(pageCache);

/**
* The page file might be incomplete in the cases: 1) last message incomplete 2) disk damaged.
* In case 1 we can keep writing the file. But in case 2 we'd better not bcs old data might be overwritten.
* Here we open a new page so the incomplete page would be reserved for recovery if needed.
*/
if (page.getSize() != page.getFile().size()) {
openNewPage();
}
}

// We will not mark it for paging if there's only a single empty file
Expand Down Expand Up @@ -573,7 +582,7 @@ public Page createPage(final int pageNumber) throws Exception {

file.position(0);

file.close();
file.close(false);

return page;
}
Expand Down

0 comments on commit a9dd5b5

Please sign in to comment.