Skip to content

Commit

Permalink
HBASE-27732 NPE in TestBasicWALEntryStreamFSHLog.testEOFExceptionInOl…
Browse files Browse the repository at this point in the history
…dWALsDirectory (apache#5119)

Add a 'closed' flag in WALProps in AbstractFSWAL to indicate that whether a WAL
file has been closed, if not, we will not try to archive it. Will mark it as
closed after we fully close it in the background close task, and try to archive
again.

Also modified some tests since now the archiving of a rolled WAL file is also
asynchronous, we need to wait instead of asserting directly.

Signed-off-by: Viraj Jasani <vjasani@apache.org>
(cherry picked from commit 230fdc0)
(cherry picked from commit e95b47e)
Change-Id: I29133cfe4b4695bf817b7abb1cca775f4a5eb88a
  • Loading branch information
Apache9 committed Mar 24, 2023
1 parent 158422a commit 5c8e36a
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 286 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,22 +288,29 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
final Comparator<Path> LOG_NAME_COMPARATOR =
(o1, o2) -> Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2));

private static final class WalProps {
private static final class WALProps {

/**
* Map the encoded region name to the highest sequence id.
* <p/>
* Contains all the regions it has an entry for.
*/
public final Map<byte[], Long> encodedName2HighestSequenceId;
private final Map<byte[], Long> encodedName2HighestSequenceId;

/**
* The log file size. Notice that the size may not be accurate if we do asynchronous close in
* sub classes.
*/
public final long logSize;
private final long logSize;

public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
/**
* If we do asynchronous close in sub classes, it is possible that when adding WALProps to the
* rolled map, the file is not closed yet, so in cleanOldLogs we should not archive this file,
* for safety.
*/
private volatile boolean closed = false;

WALProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
this.logSize = logSize;
}
Expand All @@ -313,7 +320,7 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
* Map of WAL log file to properties. The map is sorted by the log file creation timestamp
* (contained in the log file name).
*/
protected ConcurrentNavigableMap<Path, WalProps> walFile2Props =
protected final ConcurrentNavigableMap<Path, WALProps> walFile2Props =
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

/**
Expand All @@ -332,6 +339,9 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {

protected final AtomicBoolean rollRequested = new AtomicBoolean(false);

protected final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

// Run in caller if we get reject execution exception, to avoid aborting region server when we get
// reject execution exception. Usually this should not happen but let's make it more robust.
private final ExecutorService logArchiveExecutor =
Expand Down Expand Up @@ -655,7 +665,7 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
Map<byte[], List<byte[]>> regions = null;
int logCount = getNumRolledLogFiles();
if (logCount > this.maxLogs && logCount > 0) {
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
Map.Entry<Path, WALProps> firstWALEntry = this.walFile2Props.firstEntry();
regions =
this.sequenceIdAccounting.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
}
Expand All @@ -678,14 +688,35 @@ Map<byte[], List<byte[]>> findRegionsToForceFlush() throws IOException {
return regions;
}

/**
* Mark this WAL file as closed and call cleanOldLogs to see if we can archive this file.
*/
protected final void markClosedAndClean(Path path) {
WALProps props = walFile2Props.get(path);
// typically this should not be null, but if there is no big issue if it is already null, so
// let's make the code more robust
if (props != null) {
props.closed = true;
cleanOldLogs();
}
}

/**
* Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
* <p/>
* Use synchronized because we may call this method in different threads, normally when replacing
* writer, and since now close writer may be asynchronous, we will also call this method in the
* closeExecutor, right after we actually close a WAL writer.
*/
private void cleanOldLogs() throws IOException {
private synchronized void cleanOldLogs() {
List<Pair<Path, Long>> logsToArchive = null;
// For each log file, look at its Map of regions to highest sequence id; if all sequence ids
// are older than what is currently in memory, the WAL can be GC'd.
for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
for (Map.Entry<Path, WALProps> e : this.walFile2Props.entrySet()) {
if (!e.getValue().closed) {
LOG.debug("{} is not closed yet, will try archiving it next time", e.getKey());
continue;
}
Path log = e.getKey();
Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
Expand Down Expand Up @@ -767,7 +798,7 @@ protected final void logRollAndSetupWalProps(Path oldPath, Path newPath, long ol
String newPathString = newPath != null ? CommonFSUtils.getPath(newPath) : null;
if (oldPath != null) {
this.walFile2Props.put(oldPath,
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
new WALProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
this.totalLogSize.addAndGet(oldFileLen);
LOG.info("Rolled WAL {} with entries={}, filesize={}; new WAL {}",
CommonFSUtils.getPath(oldPath), oldNumEntries, StringUtils.byteDesc(oldFileLen),
Expand Down Expand Up @@ -961,6 +992,20 @@ public Void call() throws Exception {
// and abort the region server
logArchiveExecutor.shutdown();
}
// we also need to wait logArchive to finish if we want to a graceful shutdown as we may still
// have some pending archiving tasks not finished yet, and in close we may archive all the
// remaining WAL files, there could be race if we do not wait for the background archive task
// finish
try {
if (!logArchiveExecutor.awaitTermination(walShutdownTimeout, TimeUnit.MILLISECONDS)) {
throw new TimeoutIOException("We have waited " + walShutdownTimeout + "ms, but"
+ " the shutdown of WAL doesn't complete! Please check the status of underlying "
+ "filesystem or increase the wait time by the config \"" + WAL_SHUTDOWN_WAIT_TIMEOUT_MS
+ "\"");
}
} catch (InterruptedException e) {
throw new InterruptedIOException("Interrupted when waiting for shutdown WAL");
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -181,9 +180,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {

private final long batchSize;

private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

private volatile AsyncFSOutput fsOut;

private final Deque<FSWALEntry> toWriteAppends = new ArrayDeque<>();
Expand Down Expand Up @@ -750,32 +746,40 @@ private void waitForSafePoint() {
}
}

protected final long closeWriter(AsyncWriter writer, Path path) {
if (writer != null) {
inflightWALClosures.put(path.getName(), writer);
long fileLength = writer.getLength();
closeExecutor.execute(() -> {
try {
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
} finally {
inflightWALClosures.remove(path.getName());
}
});
return fileLength;
} else {
return 0L;
}
private void closeWriter(AsyncWriter writer, Path path) {
inflightWALClosures.put(path.getName(), writer);
closeExecutor.execute(() -> {
try {
writer.close();
} catch (IOException e) {
LOG.warn("close old writer failed", e);
} finally {
// call this even if the above close fails, as there is no other chance we can set closed to
// true, it will not cause big problems.
markClosedAndClean(path);
inflightWALClosures.remove(path.getName());
}
});
}

@Override
protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
throws IOException {
Preconditions.checkNotNull(nextWriter);
waitForSafePoint();
long oldFileLen = closeWriter(this.writer, oldPath);
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
// we will call rollWriter in init method, where we want to create the first writer and
// obviously the previous writer is null, so here we need this null check. And why we must call
// logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean after
// closing the writer asynchronously, we need to make sure the WALProps is put into
// walFile2Props before we call markClosedAndClean
if (writer != null) {
long oldFileLen = writer.getLength();
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
closeWriter(writer, oldPath);
} else {
logRollAndSetupWalProps(oldPath, newPath, 0);
}

this.writer = nextWriter;
if (nextWriter instanceof AsyncProtobufLogWriter) {
this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -171,8 +169,6 @@ public class FSHLog extends AbstractFSWAL<Writer> {
private final AtomicInteger closeErrorCount = new AtomicInteger();

private final int waitOnShutdownInSeconds;
private final ExecutorService closeExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build());

/**
* Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs
Expand Down Expand Up @@ -379,28 +375,44 @@ protected void doReplaceWriter(Path oldPath, Path newPath, Writer nextWriter) th
LOG.warn(
"Failed sync-before-close but no outstanding appends; closing WAL" + e.getMessage());
}
long oldFileLen = 0L;
// It is at the safe point. Swap out writer from under the blocked writer thread.
// we will call rollWriter in init method, where we want to create the first writer and
// obviously the previous writer is null, so here we need this null check. And why we must
// call logRollAndSetupWalProps before closeWriter is that, we will call markClosedAndClean
// after closing the writer asynchronously, we need to make sure the WALProps is put into
// walFile2Props before we call markClosedAndClean
if (this.writer != null) {
oldFileLen = this.writer.getLength();
long oldFileLen = this.writer.getLength();
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);
// In case of having unflushed entries or we already reached the
// closeErrorsTolerated count, call the closeWriter inline rather than in async
// way so that in case of an IOE we will throw it back and abort RS.
inflightWALClosures.put(oldPath.getName(), writer);
if (isUnflushedEntries() || closeErrorCount.get() >= this.closeErrorsTolerated) {
closeWriter(this.writer, oldPath, true);
try {
closeWriter(this.writer, oldPath, true);
} finally {
inflightWALClosures.remove(oldPath.getName());
}
} else {
Writer localWriter = this.writer;
closeExecutor.execute(() -> {
try {
closeWriter(localWriter, oldPath, false);
} catch (IOException e) {
// We will never reach here.
LOG.warn("close old writer failed", e);
} finally {
// call this even if the above close fails, as there is no other chance we can set
// closed to true, it will not cause big problems.
markClosedAndClean(oldPath);
inflightWALClosures.remove(oldPath.getName());
}
});
}
} else {
logRollAndSetupWalProps(oldPath, newPath, 0);
}
logRollAndSetupWalProps(oldPath, newPath, oldFileLen);

this.writer = nextWriter;
if (nextWriter != null && nextWriter instanceof ProtobufLogWriter) {
this.hdfs_out = ((ProtobufLogWriter) nextWriter).getStream();
Expand Down Expand Up @@ -455,8 +467,6 @@ private void closeWriter(Writer writer, Path path, boolean syncCloseCall) throws
}
LOG.warn("Riding over failed WAL close of " + path
+ "; THIS FILE WAS NOT CLOSED BUT ALL EDITS SYNCED SO SHOULD BE OK", ioe);
} finally {
inflightWALClosures.remove(path.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,10 +397,11 @@ public static ServerName getServerNameFromWALDirectoryName(Path logFile) {
serverName = ServerName.parseServerName(logDirName);
} catch (IllegalArgumentException | IllegalStateException ex) {
serverName = null;
LOG.warn("Cannot parse a server name from path=" + logFile + "; " + ex.getMessage());
LOG.warn("Cannot parse a server name from path={}", logFile, ex);
}
if (serverName != null && serverName.getStartcode() < 0) {
LOG.warn("Invalid log file path=" + logFile);
LOG.warn("Invalid log file path={}, start code {} is less than 0", logFile,
serverName.getStartcode());
serverName = null;
}
return serverName;
Expand Down Expand Up @@ -465,6 +466,11 @@ public static Path findArchivedLog(Path path, Configuration conf) throws IOExcep
}

ServerName serverName = getServerNameFromWALDirectoryName(path);
if (serverName == null) {
LOG.warn("Can not extract server name from path {}, "
+ "give up searching the separated old log dir", path);
return null;
}
// Try finding the log in separate old log dir
oldLogDir = new Path(walRootDir, new StringBuilder(HConstants.HREGION_OLDLOGDIR_NAME)
.append(Path.SEPARATOR).append(serverName.getServerName()).toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,11 @@ public void testWALRollWriting() throws Exception {
r.flush(true);
}
ADMIN.rollWALWriter(regionServer.getServerName());
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
assertTrue(("actual count: " + count), count <= 2);
TEST_UTIL.waitFor(5000, () -> {
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
return count <= 2;
});
}

private void setUpforLogRolling() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,11 @@ public void testRollWALWALWriter() throws Exception {
r.flush(true);
}
admin.rollWALWriter(regionServer.getServerName()).join();
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
assertTrue(("actual count: " + count), count <= 2);
TEST_UTIL.waitFor(5000, () -> {
int count = AbstractFSWALProvider.getNumRolledLogFiles(regionServer.getWAL(null));
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
return count <= 2;
});
}

private void setUpforLogRolling() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,8 @@ public void testFlushingWhenLogRolling() throws Exception {
// Roll the WAL. The log file count is less than maxLogs so no flush is triggered.
int currentNumRolledLogFiles = getNumRolledLogFiles(desiredRegion);
assertNull(getWAL(desiredRegion).rollWriter());
while (getNumRolledLogFiles(desiredRegion) <= currentNumRolledLogFiles) {
Thread.sleep(100);
}
TEST_UTIL.waitFor(60000,
() -> getNumRolledLogFiles(desiredRegion) > currentNumRolledLogFiles);
}
assertEquals(maxLogs, getNumRolledLogFiles(desiredRegion));
assertTrue(
Expand Down Expand Up @@ -529,7 +528,7 @@ public String explainFailure() throws Exception {
desiredRegion.getStore(FAMILY3).getMemStoreSize().getHeapSize());
// let WAL cleanOldLogs
assertNull(getWAL(desiredRegion).rollWriter(true));
assertTrue(getNumRolledLogFiles(desiredRegion) < maxLogs);
TEST_UTIL.waitFor(60000, () -> getNumRolledLogFiles(desiredRegion) < maxLogs);
} finally {
TEST_UTIL.shutdownMiniCluster();
}
Expand Down
Loading

0 comments on commit 5c8e36a

Please sign in to comment.