Skip to content

Commit

Permalink
Release fd when MmapFile shutdown. It effects the unittest 'DLedgerMa…
Browse files Browse the repository at this point in the history
…ppedFileStoreTest#testAbnormalRecovery' or other case on Windows.
  • Loading branch information
iefangzh@163.com committed Nov 20, 2021
1 parent eb1ad9e commit cc2af63
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import io.openmessaging.storage.dledger.entry.DLedgerEntryCoder;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.store.DLedgerStore;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.IOUtils;
import io.openmessaging.storage.dledger.utils.Pair;
import io.openmessaging.storage.dledger.utils.PreConditions;
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -91,6 +91,8 @@ public void shutdown() {
persistCheckPoint();
cleanSpaceService.shutdown();
flushDataService.shutdown();
this.dataFileList.shutdown(0);
this.indexFileList.shutdown(0);
}

public long getWritePos() {
Expand Down Expand Up @@ -150,7 +152,7 @@ public void recover() {
byteBuffer.getInt(); //chain crc
byteBuffer.getInt(); //body crc
int bodySize = byteBuffer.getInt();
PreConditions.check(magic != MmapFileList.BLANK_MAGIC_CODE && magic >= MAGIC_1 && MAGIC_1 <= CURRENT_MAGIC, DLedgerResponseCode.DISK_ERROR, "unknown magic=%d", magic);
PreConditions.check(magic >= MAGIC_1, DLedgerResponseCode.DISK_ERROR, "unknown magic=%d", magic);
PreConditions.check(size > DLedgerEntry.HEADER_SIZE, DLedgerResponseCode.DISK_ERROR, "Size %d should > %d", size, DLedgerEntry.HEADER_SIZE);

PreConditions.check(pos == startPos, DLedgerResponseCode.DISK_ERROR, "pos %d != %d", pos, startPos);
Expand Down Expand Up @@ -298,9 +300,7 @@ public void recover() {
return;
}
logger.info("Recover to get committed index={} from checkpoint", committedIndexStr);
updateCommittedIndex(memberState.currTerm(), Long.valueOf(committedIndexStr));

return;
updateCommittedIndex(memberState.currTerm(), Long.parseLong(committedIndexStr));
}

private void reviseLedgerBeginIndex() {
Expand Down Expand Up @@ -676,7 +676,7 @@ public CleanSpaceService(String name, Logger logger) {
boolean enableForceClean = dLedgerConfig.isEnableDiskForceClean();
if (timeUp || checkExpired) {
int count = getDataFileList().deleteExpiredFileByTime(fileReservedTimeMs, 100, 120 * 1000, forceClean && enableForceClean);
if (count > 0 || (forceClean && enableForceClean) || isDiskFull) {
if (count > 0 || forceClean && enableForceClean || isDiskFull) {
logger.info("Clean space count={} timeUp={} checkExpired={} forceClean={} enableForceClean={} diskFull={} storeBaseRatio={} dataRatio={}",
count, timeUp, checkExpired, forceClean, enableForceClean, isDiskFull, storeBaseRatio, dataRatio);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.openmessaging.storage.dledger.store.file;

import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import io.openmessaging.storage.dledger.utils.IOUtils;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
Expand Down Expand Up @@ -296,6 +297,11 @@ public SelectMmapBufferResult selectMappedBuffer(int pos) {
return null;
}

@Override public void shutdown(long intervalForcibly) {
super.shutdown(intervalForcibly);
IOUtils.close(fileChannel, true);
}

@Override
public boolean getData(int pos, int size, ByteBuffer byteBuffer) {
if (byteBuffer.remaining() < size) {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/openmessaging/storage/dledger/utils/IOUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.openmessaging.storage.dledger.utils;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
Expand Down Expand Up @@ -326,4 +327,13 @@ public static void deleteFile(File file) {
}
}

public static void close(Closeable closeable, boolean silent) {
try {
closeable.close();
} catch (IOException e) {
if (!silent) {
throw new RuntimeException(e);
}
}
}
}

0 comments on commit cc2af63

Please sign in to comment.