Skip to content

Commit

Permalink
ISSUE apache#1527: Make ExplicitLAC persistent
Browse files Browse the repository at this point in the history
- bumped version of journal
- introduced fileInfoFormatVersionToWrite config
- testcases to validate combination of fileInfoFormatVersionToWrite and journalFormatVersionToWrite
  • Loading branch information
reddycharan committed Jul 19, 2018
1 parent 20cd0b4 commit c79778c
Show file tree
Hide file tree
Showing 16 changed files with 202 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,7 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
+ ") is too old to hold this");
}
} else if (entryId == METAENTRY_ID_LEDGER_EXPLICITLAC) {
if (journalVersion >= JournalChannel.V5) {
if (journalVersion >= JournalChannel.V6) {
int explicitLacBufLength = recBuff.getInt();
ByteBuf explicitLacBuf = Unpooled.buffer(explicitLacBufLength);
byte[] explicitLacBufArray = new byte[explicitLacBufLength];
Expand All @@ -804,6 +804,18 @@ public void process(int journalVersion, long offset, ByteBuffer recBuff) throws
throw new IOException("Invalid journal. Contains explicitLAC " + " but layout version ("
+ journalVersion + ") is too old to hold this");
}
} else if (entryId < 0) {
/*
* this is possible if bookie code binary is rolledback
* to older version but when it is trying to read
* Journal which was created previously using newer
* code/journalversion, which introduced new special
* entry. So in anycase, if we see unrecognizable
* special entry while replaying journal we should skip
* (ignore) it.
*/
LOG.warn("Read unrecognizable entryId: {} for ledger: {} while replaying Journal. Skipping it",
entryId, ledgerId);
} else {
byte[] key = masterKeyCache.get(ledgerId);
if (key == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
// this FileInfo Header Version
int headerVersion;

public FileInfo(File lf, byte[] masterKey) throws IOException {
public FileInfo(File lf, byte[] masterKey, int fileInfoVersionToWrite) throws IOException {
super(WATCHER_RECYCLER);

this.lf = lf;
this.masterKey = masterKey;
mode = "rw";
this.headerVersion = CURRENT_HEADER_VERSION;
this.headerVersion = fileInfoVersionToWrite;
}

synchronized Long getLastAddConfirmed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ class FileInfoBackingCache {
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
final ConcurrentLongHashMap<CachedFileInfo> fileInfos = new ConcurrentLongHashMap<>();
final FileLoader fileLoader;
final int fileInfoVersionToWrite;

FileInfoBackingCache(FileLoader fileLoader) {
FileInfoBackingCache(FileLoader fileLoader, int fileInfoVersionToWrite) {
this.fileLoader = fileLoader;
this.fileInfoVersionToWrite = fileInfoVersionToWrite;
}

/**
Expand Down Expand Up @@ -125,7 +127,7 @@ class CachedFileInfo extends FileInfo {
final AtomicInteger refCount;

CachedFileInfo(long ledgerId, File lf, byte[] masterKey) throws IOException {
super(lf, masterKey);
super(lf, masterKey, fileInfoVersionToWrite);
this.ledgerId = ledgerId;
this.refCount = new AtomicInteger(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public IndexPersistenceMgr(int pageSize,

// build the file info cache
int concurrencyLevel = Math.max(1, Math.max(conf.getNumAddWorkerThreads(), conf.getNumReadWorkerThreads()));
fileInfoBackingCache = new FileInfoBackingCache(this::createFileInfoBackingFile);
fileInfoBackingCache = new FileInfoBackingCache(this::createFileInfoBackingFile,
conf.getFileInfoFormatVersionToWrite());
RemovalListener<Long, CachedFileInfo> fileInfoEvictionListener = this::handleLedgerEviction;
writeFileInfoCache = buildCache(
concurrencyLevel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,8 @@ static void writePaddingBytes(JournalChannel jc, ByteBuf paddingBuffer, int jour
private final boolean flushWhenQueueEmpty;
// should we hint the filesystem to remove pages from cache after force write
private final boolean removePagesFromCache;
private final int journalFormatVersionToWrite;
private final int journalAlignmentSize;

// Should data be fsynced on disk before triggering the callback
private final boolean syncData;
Expand Down Expand Up @@ -646,6 +648,8 @@ public Journal(int journalIndex, File journalDirectory, ServerConfiguration conf
this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(conf.getJournalMaxGroupWaitMSec());
this.bufferedWritesThreshold = conf.getJournalBufferedWritesThreshold();
this.bufferedEntriesThreshold = conf.getJournalBufferedEntriesThreshold();
this.journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite();
this.journalAlignmentSize = conf.getJournalAlignmentSize();
if (conf.getNumJournalCallbackThreads() > 0) {
this.cbThreadPool = Executors.newFixedThreadPool(conf.getNumJournalCallbackThreads(),
new DefaultThreadFactory("bookie-journal-callback"));
Expand Down Expand Up @@ -926,8 +930,7 @@ public void run() {
ByteBuf lenBuff = Unpooled.buffer(4);
ByteBuf paddingBuff = Unpooled.buffer(2 * conf.getJournalAlignmentSize());
paddingBuff.writeZero(paddingBuff.capacity());
final int journalFormatVersionToWrite = conf.getJournalFormatVersionToWrite();
final int journalAlignmentSize = conf.getJournalAlignmentSize();

BufferedChannel bc = null;
JournalChannel logFile = null;
forceWriteThread.start();
Expand Down Expand Up @@ -1099,7 +1102,17 @@ public void run() {
if (qe == null) { // no more queue entry
continue;
}
if (qe.entryId != Bookie.METAENTRY_ID_FORCE_LEDGER) {
if ((qe.entryId == Bookie.METAENTRY_ID_LEDGER_EXPLICITLAC)
&& (journalFormatVersionToWrite < JournalChannel.V6)) {
/*
* this means we are using new code which supports
* persisting explicitLac, but "journalFormatVersionToWrite"
* is set to some older value (< V6). In this case we
* shouldn't write this special entry
* (METAENTRY_ID_LEDGER_EXPLICITLAC) to Journal.
*/
qe.entry.release();
} else if (qe.entryId != Bookie.METAENTRY_ID_FORCE_LEDGER) {
int entrySize = qe.entry.readableBytes();
journalWriteBytes.add(entrySize);
journalQueueSize.dec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ class JournalChannel implements Closeable {
// 1) expanding header to 512
// 2) Padding writes to align sector size
static final int V5 = 5;
// Adding explicitlac entry
public static final int V6 = 6;

static final int HEADER_SIZE = SECTOR_SIZE; // align header to sector size
static final int VERSION_HEADER_SIZE = 8; // 4byte magic word, 4 byte version
static final int MIN_COMPAT_JOURNAL_FORMAT_VERSION = V1;
static final int CURRENT_JOURNAL_FORMAT_VERSION = V5;
static final int CURRENT_JOURNAL_FORMAT_VERSION = V6;

private final long preAllocSize;
private final int journalAlignSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@
class ReadOnlyFileInfo extends FileInfo {

public ReadOnlyFileInfo(File lf, byte[] masterKey) throws IOException {
super(lf, masterKey);
/*
* For ReadOnlyFile it is okay to initialize FileInfo with
* CURRENT_HEADER_VERSION, when fileinfo.readHeader is called it would
* read actual header version.
*/
super(lf, masterKey, FileInfo.CURRENT_HEADER_VERSION);
mode = "r";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
protected static final String PAGE_SIZE = "pageSize";
protected static final String FILEINFO_CACHE_INITIAL_CAPACITY = "fileInfoCacheInitialCapacity";
protected static final String FILEINFO_MAX_IDLE_TIME = "fileInfoMaxIdleTime";
protected static final String FILEINFO_FORMAT_VERSION_TO_WRITE = "fileInfoFormatVersionToWrite";
// Journal Parameters
protected static final String MAX_JOURNAL_SIZE = "journalMaxSizeMB";
protected static final String MAX_BACKUP_JOURNALS = "journalMaxBackups";
Expand Down Expand Up @@ -546,6 +547,27 @@ public ServerConfiguration setFileInfoMaxIdleTime(long idleTime) {
return this;
}

/**
* Get fileinfo format version to write.
*
* @return fileinfo format version to write.
*/
public int getFileInfoFormatVersionToWrite() {
return this.getInt(FILEINFO_FORMAT_VERSION_TO_WRITE, 0);
}

/**
* Set fileinfo format version to write.
*
* @param version
* fileinfo format version to write.
* @return server configuration.
*/
public ServerConfiguration setFileInfoFormatVersionToWrite(int version) {
this.setProperty(FILEINFO_FORMAT_VERSION_TO_WRITE, version);
return this;
}

/**
* Max journal file size.
*
Expand Down Expand Up @@ -2432,6 +2454,10 @@ public void validate() throws ConfigurationException {
throw new ConfigurationException(
"When entryLogPerLedger is enabled , it is unnecessary to use transactional compaction");
}
if ((getJournalFormatVersionToWrite() >= 6) ^ (getFileInfoFormatVersionToWrite() >= 1)) {
throw new ConfigurationException("For persisiting explicitLac, journalFormatVersionToWrite should be >= 6"
+ "and FileInfoFormatVersionToWrite should be >= 1");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private void writeIndexFileForLedger(File indexDir, long ledgerId,
throws Exception {
File fn = new File(indexDir, IndexPersistenceMgr.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey);
FileInfo fi = new FileInfo(fn, masterKey, FileInfo.CURRENT_HEADER_VERSION);
// force creation of index file
fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
fi.close(true);
Expand All @@ -89,7 +89,7 @@ private void writePartialIndexFileForLedger(File indexDir, long ledgerId,
throws Exception {
File fn = new File(indexDir, IndexPersistenceMgr.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey);
FileInfo fi = new FileInfo(fn, masterKey, FileInfo.CURRENT_HEADER_VERSION);
// force creation of index file
fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
fi.close(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,30 @@ public void testLedgerDeleteNotification() throws Exception {
}

@Test
public void testExplicitLacWriteToJournal() throws Exception {
public void testExplicitLacWriteToJournalWithValidVersions() throws Exception {
/*
* to persist explicitLac, journalFormatVersionToWrite should be atleast
* V6 and fileInfoFormatVersionToWrite should be atleast V1
*/
testExplicitLacWriteToJournal(6, 1);
}

@Test
public void testExplicitLacWriteToJournalWithOlderVersions() throws Exception {
/*
* to persist explicitLac, journalFormatVersionToWrite should be atleast
* V6 and fileInfoFormatVersionToWrite should be atleast V1
*/
testExplicitLacWriteToJournal(5, 0);
}

public void testExplicitLacWriteToJournal(int journalFormatVersionToWrite, int fileInfoFormatVersionToWrite)
throws Exception {
ServerConfiguration bookieServerConfig = bsConfs.get(0);
bookieServerConfig.setJournalFormatVersionToWrite(journalFormatVersionToWrite);
bookieServerConfig.setFileInfoFormatVersionToWrite(fileInfoFormatVersionToWrite);

restartBookies(bookieServerConfig);

ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
confWithExplicitLAC.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
Expand Down Expand Up @@ -104,7 +126,7 @@ public void testExplicitLacWriteToJournal() throws Exception {
assertEquals("Read explicit LAC of rlh after wait for explicitlacflush", (numOfEntries - 1),
readExplicitLastConfirmed);

ServerConfiguration newBookieConf = new ServerConfiguration(bookieServerConfig);
ServerConfiguration newBookieConf = new ServerConfiguration(bsConfs.get(0));
/*
* by reusing bookieServerConfig and setting metadataServiceUri to null
* we can create/start new Bookie instance using the same data
Expand All @@ -121,17 +143,42 @@ public void testExplicitLacWriteToJournal() throws Exception {
newbookie.readJournal();
ByteBuf explicitLacBuf = newbookie.getExplicitLac(ledgerId);

DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
BookKeeper.DigestType.toProtoDigestType(digestType), confWithExplicitLAC.getUseV2WireProtocol());
long explicitLacPersistedInJournal = digestManager.verifyDigestAndReturnLac(explicitLacBuf);
assertEquals("explicitLac persisted in journal", (numOfEntries - 1), explicitLacPersistedInJournal);

if ((journalFormatVersionToWrite >= 6) && (fileInfoFormatVersionToWrite >= 1)) {
DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
BookKeeper.DigestType.toProtoDigestType(digestType), confWithExplicitLAC.getUseV2WireProtocol());
long explicitLacPersistedInJournal = digestManager.verifyDigestAndReturnLac(explicitLacBuf);
assertEquals("explicitLac persisted in journal", (numOfEntries - 1), explicitLacPersistedInJournal);
} else {
assertEquals("explicitLac is not expected to be persisted, so it should be null", null, explicitLacBuf);
}
bkcWithExplicitLAC.close();
}

@Test
public void testExplicitLacWriteToFileInfo() throws Exception {
public void testExplicitLacWriteToFileInfoWithValidVersions() throws Exception {
/*
* to persist explicitLac, journalFormatVersionToWrite should be atleast
* V6 and fileInfoFormatVersionToWrite should be atleast V1
*/
testExplicitLacWriteToFileInfo(6, 1);
}

@Test
public void testExplicitLacWriteToFileInfoWithOlderVersions() throws Exception {
/*
* to persist explicitLac, journalFormatVersionToWrite should be atleast
* V6 and fileInfoFormatVersionToWrite should be atleast V1
*/
testExplicitLacWriteToFileInfo(5, 0);
}

public void testExplicitLacWriteToFileInfo(int journalFormatVersionToWrite, int fileInfoFormatVersionToWrite)
throws Exception {
ServerConfiguration bookieServerConfig = bsConfs.get(0);
bookieServerConfig.setJournalFormatVersionToWrite(journalFormatVersionToWrite);
bookieServerConfig.setFileInfoFormatVersionToWrite(fileInfoFormatVersionToWrite);

restartBookies(bookieServerConfig);

ClientConfiguration confWithExplicitLAC = new ClientConfiguration();
confWithExplicitLAC.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
Expand Down Expand Up @@ -177,10 +224,15 @@ public void testExplicitLacWriteToFileInfo() throws Exception {
fileInfo.readHeader();
ByteBuf explicitLacBufReadFromFileInfo = fileInfo.getExplicitLac();

DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
BookKeeper.DigestType.toProtoDigestType(digestType), confWithExplicitLAC.getUseV2WireProtocol());
long explicitLacReadFromFileInfo = digestManager.verifyDigestAndReturnLac(explicitLacBufReadFromFileInfo);
assertEquals("explicitLac persisted in FileInfo", (numOfEntries - 1), explicitLacReadFromFileInfo);
if ((journalFormatVersionToWrite >= 6) && (fileInfoFormatVersionToWrite >= 1)) {
DigestManager digestManager = DigestManager.instantiate(ledgerId, passwdBytes,
BookKeeper.DigestType.toProtoDigestType(digestType), confWithExplicitLAC.getUseV2WireProtocol());
long explicitLacReadFromFileInfo = digestManager.verifyDigestAndReturnLac(explicitLacBufReadFromFileInfo);
assertEquals("explicitLac persisted in FileInfo", (numOfEntries - 1), explicitLacReadFromFileInfo);
} else {
assertEquals("explicitLac is not expected to be persisted, so it should be null", null,
explicitLacBufReadFromFileInfo);
}

bkcWithExplicitLAC.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void basicTest() throws Exception {
File f = new File(baseDir, String.valueOf(ledgerId));
f.deleteOnExit();
return f;
});
}, FileInfo.CURRENT_HEADER_VERSION);
CachedFileInfo fi = cache.loadFileInfo(1, masterKey);
Assert.assertEquals(fi.getRefCount(), 1);
CachedFileInfo fi2 = cache.loadFileInfo(2, masterKey);
Expand All @@ -116,7 +116,7 @@ public void testNoKey() throws Exception {
(ledgerId, createIfNotFound) -> {
Assert.assertFalse(createIfNotFound);
throw new Bookie.NoLedgerException(ledgerId);
});
}, FileInfo.CURRENT_HEADER_VERSION);
cache.loadFileInfo(1, null);
}

Expand All @@ -135,7 +135,7 @@ public void testForDeadlocks() throws Exception {
File f = new File(baseDir, String.valueOf(ledgerId));
f.deleteOnExit();
return f;
});
}, FileInfo.CURRENT_HEADER_VERSION);
Iterable<Future<Set<CachedFileInfo>>> futures =
IntStream.range(0, numRunners).mapToObj(
(i) -> {
Expand Down Expand Up @@ -194,7 +194,7 @@ public void testRefCountRace() throws Exception {
File f = new File(baseDir, String.valueOf(ledgerId));
f.deleteOnExit();
return f;
});
}, FileInfo.CURRENT_HEADER_VERSION);

Iterable<Future<Set<CachedFileInfo>>> futures =
IntStream.range(0, 2).mapToObj(
Expand Down Expand Up @@ -239,7 +239,7 @@ public void testRaceGuavaEvictAndReleaseBeforeRetain() throws Exception {
File f = new File(baseDir, String.valueOf(ledgerId));
f.deleteOnExit();
return f;
});
}, FileInfo.CURRENT_HEADER_VERSION);

Cache<Long, CachedFileInfo> guavaCache = CacheBuilder.newBuilder()
.maximumSize(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ static void writeLedgerDir(File dir,

File fn = new File(dir, IndexPersistenceMgr.getLedgerName(ledgerId));
fn.getParentFile().mkdirs();
FileInfo fi = new FileInfo(fn, masterKey);
FileInfo fi = new FileInfo(fn, masterKey, FileInfo.CURRENT_HEADER_VERSION);
// force creation of index file
fi.write(new ByteBuffer[]{ ByteBuffer.allocate(0) }, 0);
fi.close(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public ExplicitLacTest(Class<? extends LedgerStorage> storageClass) {
super(1);
this.digestType = DigestType.CRC32;
baseConf.setLedgerStorageClass(storageClass.getName());
/*
* to persist explicitLac, journalFormatVersionToWrite should be atleast
* V6 and fileInfoFormatVersionToWrite should be atleast V1
*/
baseConf.setJournalFormatVersionToWrite(6);
baseConf.setFileInfoFormatVersionToWrite(1);
}

@Parameters
Expand Down
Loading

0 comments on commit c79778c

Please sign in to comment.