Skip to content

Commit

Permalink
ARTEMIS-2414 Sync before closing file in case data loss
Browse files Browse the repository at this point in the history
(cherry picked from commit edace88)

downstream: ENTMQBR-2730
  • Loading branch information
wy96f authored and clebertsuconic committed Aug 5, 2019
1 parent 73c4ca8 commit fcff849
Show file tree
Hide file tree
Showing 26 changed files with 62 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ private static void printPages(DescribeJournal describeJournal,
Page page = pgStore.createPage(pgid);
page.open();
List<PagedMessage> msgs = page.read(sm);
page.close();
page.close(false, false);

int msgID = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private void printPagedMessagesAsXML() {
Page page = pageStore.createPage(pageId);
page.open();
List<PagedMessage> messages = page.read(storageManager);
page.close();
page.close(false, false);

int messageId = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,10 @@ public final String getFileName() {

@Override
public final void delete() throws IOException, InterruptedException, ActiveMQException {
if (isOpen()) {
close();
}

try {
if (isOpen()) {
close(false);
}
Files.deleteIfExists(file.toPath());
} catch (Throwable t) {
logger.trace("Fine error while deleting file", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void fill(int size) throws IOException {

@Override
public void delete() {
close();
close(false);
if (file.exists() && !file.delete()) {
ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
}
Expand Down Expand Up @@ -361,7 +361,7 @@ public void close() {
@Override
public void close(boolean waitOnSync) {
if (this.mappedFile != null) {
if (factory.isDatasync())
if (waitOnSync && factory.isDatasync())
this.mappedFile.force();
this.mappedFile.close();
this.mappedFile = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,16 @@ public void fill(final int size) throws IOException {

@Override
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
close(true);
}

@Override
public synchronized void close(boolean waitSync) throws IOException, InterruptedException, ActiveMQException {
super.close();

try {
if (channel != null) {
if (factory.isDatasync())
if (waitSync && factory.isDatasync())
channel.force(false);
channel.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ record = new byte[variableSize];
fileFactory.releaseDirectBuffer(wholeFileBuffer);
}
try {
file.getFile().close();
file.getFile().close(false);
} catch (Throwable ignored) {
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference;
Expand Down Expand Up @@ -136,7 +135,6 @@ public int sendMessage(MessageReference ref,

ICoreMessage coreMessage = serverMessage.toCore();

LargeServerMessageImpl largeMessage = null;
ICoreMessage newServerMessage = serverMessage.toCore();
try {
StompSubscription subscription = subscriptions.get(consumer.getID());
Expand Down Expand Up @@ -179,13 +177,7 @@ public void run() throws Exception {
ActiveMQStompProtocolLogger.LOGGER.debug(e);
}
return 0;
} finally {
if (largeMessage != null) {
largeMessage.releaseResources();
largeMessage = null;
}
}

}

@Override
Expand Down Expand Up @@ -369,7 +361,7 @@ public void sendInternalLarge(CoreMessage message, boolean direct) throws Except

largeMessage.addBytes(bytes);

largeMessage.releaseResources();
largeMessage.releaseResources(true);

largeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, bytes.length);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ private PageCache readPage(long pageId,
} finally {
try {
if (page != null) {
page.close(false);
page.close(false, false);
}
} catch (Throwable ignored) {
}
Expand Down Expand Up @@ -527,13 +527,12 @@ protected void finishCleanup(ArrayList<Page> depagedPages) {
pgdMessagesList = depagedPage.read(storageManager);
} finally {
try {
depagedPage.close(false);
depagedPage.close(false, false);
} catch (Exception e) {
}

storageManager.afterPageRead();
}
depagedPage.close(false);
pgdMessages = pgdMessagesList.toArray(new PagedMessage[pgdMessagesList.size()]);
} else {
pgdMessages = cache.getMessages();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,15 @@ public void open() throws Exception {
file.position(0);
}

public void close() throws Exception {
close(false);
public void close(boolean sendEvent) throws Exception {
close(sendEvent, true);
}

/**
* sendEvent means it's a close happening from a major event such moveNext.
* While reading the cache we don't need (and shouldn't inform the backup
*/
public synchronized void close(boolean sendEvent) throws Exception {
public synchronized void close(boolean sendEvent, boolean waitSync) throws Exception {
if (sendEvent && storageManager != null) {
storageManager.pageClosed(storeName, pageId);
}
Expand All @@ -325,7 +325,7 @@ public synchronized void close(boolean sendEvent) throws Exception {
// leave it to the soft cache to decide when to release it now
pageCache = null;
}
file.close();
file.close(waitSync);

Set<PageSubscriptionCounter> counters = getPendingCounters();
if (counters != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ public Page createPage(final int pageNumber) throws Exception {

file.position(0);

file.close();
file.close(false);

return page;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ private static Message asLargeMessage(Message message, StorageManager storageMan
ActiveMQBuffer buffer = coreMessage.getReadOnlyBodyBuffer();
final int readableBytes = buffer.readableBytes();
lsm.addBytes(buffer);
lsm.releaseResources();
lsm.releaseResources(true);
lsm.putLongProperty(Message.HDR_LARGE_BODY_SIZE, readableBytes);
return lsm;
}
Expand Down Expand Up @@ -274,7 +274,7 @@ public ActiveMQBuffer getReadOnlyBodyBuffer() {
throw new RuntimeException(e);
} finally {
try {
file.close();
file.close(false);
} catch (Exception ignored) {
}
}
Expand All @@ -298,7 +298,7 @@ public int getBodyBufferSize() {
} finally {
if (closeFile) {
try {
file.close();
file.close(false);
} catch (Exception ignored) {
}
}
Expand All @@ -313,7 +313,7 @@ public boolean isLargeMessage() {
@Override
public synchronized void deleteFile() throws Exception {
validateFile();
releaseResources();
releaseResources(false);
storageManager.deleteLargeMessageFile(this);
}

Expand All @@ -328,11 +328,13 @@ public synchronized int getMemoryEstimate() {
}

@Override
public synchronized void releaseResources() {
public synchronized void releaseResources(boolean sync) {
if (file != null && file.isOpen()) {
try {
file.sync();
file.close();
if (sync) {
file.sync();
}
file.close(false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.largeMessageErrorReleasingResources(e);
}
Expand Down Expand Up @@ -408,7 +410,7 @@ public Message copy(final long newID) {
file.position(oldPosition);

if (!originallyOpen) {
file.close();
file.close(false);
newMessage.getFile().close();
}

Expand All @@ -435,7 +437,7 @@ private long getBodySize() throws ActiveMQException {
} else {
SequentialFile tmpFile = createFile();
bodySize = tmpFile.size();
tmpFile.close();
tmpFile.close(false);
}
}
return bodySize;
Expand Down Expand Up @@ -517,7 +519,7 @@ class DecodingContext implements LargeBodyEncoder {
public void open() throws ActiveMQException {
try {
if (cFile != null && cFile.isOpen()) {
cFile.close();
cFile.close(false);
}
cFile = file.cloneFile();
cFile.open();
Expand All @@ -530,7 +532,7 @@ public void open() throws ActiveMQException {
public void close() throws ActiveMQException {
try {
if (cFile != null) {
cFile.close();
cFile.close(false);
}
} catch (Exception e) {
throw new ActiveMQInternalErrorException(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ public synchronized Message setMessageID(long id) {
}

@Override
public synchronized void releaseResources() {
public synchronized void releaseResources(boolean sync) {
if (logger.isTraceEnabled()) {
logger.trace("release resources called on " + mainLM, new Exception("trace"));
}
mainLM.releaseResources();
mainLM.releaseResources(sync);
if (appendFile != null && appendFile.isOpen()) {
try {
appendFile.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class NullStorageLargeServerMessage extends CoreMessage implements LargeServerMe
}

@Override
public void releaseResources() {
public void releaseResources(boolean sync) {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ private void sendContinuations(final int packetSize,
currentLargeMessage.addBytes(body);

if (!continues) {
currentLargeMessage.releaseResources();
currentLargeMessage.releaseResources(true);

if (messageBodySize >= 0) {
currentLargeMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, messageBodySize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ public interface ReplicatedLargeMessage {
Message setMessageID(long id);

/**
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources()
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#releaseResources(boolean)
*/
void releaseResources();
void releaseResources(boolean sync);

/**
* @see org.apache.activemq.artemis.core.server.LargeServerMessage#deleteFile()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ public synchronized void stop() throws Exception {
}

for (ReplicatedLargeMessage largeMessage : largeMessages.values()) {
largeMessage.releaseResources();
largeMessage.releaseResources(true);
}
largeMessages.clear();

Expand All @@ -343,7 +343,6 @@ public synchronized void stop() throws Exception {
for (ConcurrentMap<Integer, Page> map : pageIndex.values()) {
for (Page page : map.values()) {
try {
page.sync();
page.close(false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorClosingPageOnReplication(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public interface LargeServerMessage extends ReplicatedLargeMessage, ICoreMessage
* Close the files if opened
*/
@Override
void releaseResources();
void releaseResources(boolean sync);

@Override
void deleteFile() throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,7 @@ public void finish() throws Exception {
context = null;
}

largeMessage.releaseResources();
largeMessage.releaseResources(false);

largeMessage.decrementDelayDeletionCount();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void testSkipLivePageCache() {
public void testSkipNullPageCache() throws Exception {
skipNullPageCache = true;
// Simulate scenario #2 depicted in https://issues.apache.org/jira/browse/ARTEMIS-2418
queue.getPageSubscription().getPagingStore().getCurrentPage().close();
queue.getPageSubscription().getPagingStore().getCurrentPage().close(false);

PagedReference ref = queue.getPageSubscription().iterator().next();
assertTrue("first msg should not be " + (ref == null ? "null" : ref.getPagedMessage().getMessage().getMessageID()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2305,7 +2305,7 @@ public void testLargeMessageBodySize() throws Exception {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);

fileMessage.releaseResources();
fileMessage.releaseResources(false);

Assert.assertEquals(largeMessageSize, fileMessage.getBodyBufferSize());
}
Expand All @@ -2332,7 +2332,7 @@ public void testSendServerMessage() throws Exception {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, largeMessageSize);

fileMessage.releaseResources();
fileMessage.releaseResources(false);

session.createQueue(ADDRESS, ADDRESS, true);

Expand Down Expand Up @@ -2497,7 +2497,7 @@ public void testGlobalSizeBytesAndAddressSize(boolean isPage) throws Exception {
fileMessage.addBytes(new byte[]{ActiveMQTestBase.getSamplebyte(i)});
}

fileMessage.releaseResources();
fileMessage.releaseResources(false);

session.createQueue(ADDRESS, ADDRESS, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void testSendServerMessage() throws Exception {
// The server would be doing this
fileMessage.putLongProperty(Message.HDR_LARGE_BODY_SIZE, 2 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);

fileMessage.releaseResources();
fileMessage.releaseResources(false);

session.createQueue("A", RoutingType.ANYCAST, "A");

Expand Down Expand Up @@ -331,7 +331,7 @@ public File getJavaFile() {
largeServerMessage.setMessageID(1234);
largeServerMessage.addBytes(new byte[0]);
assertTrue(open.get());
largeServerMessage.releaseResources();
largeServerMessage.releaseResources(true);
assertTrue(sync.get());
}

Expand Down

0 comments on commit fcff849

Please sign in to comment.