Skip to content

Commit

Permalink
re apache#3724: Tablet transaction log
Browse files Browse the repository at this point in the history
* Updated to log expected as well as current
  • Loading branch information
ivakegg committed Sep 6, 2023
1 parent 7932235 commit d6d0ebd
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,10 @@ public void logTransactions() {

public void checkTransactionLog() {
Set<StoredTabletFile> files = datafileSizes.keySet();
if (!tabletLog.isExpectedFiles(files)) {
log.error("In-memory files {} do not match transaction log {}", files);
Set<StoredTabletFile> expected = tabletLog.getExpectedFiles();
if (!expected.equals(files)) {
log.error("In-memory files {} do not match transaction log {}", new TreeSet<>(files),
new TreeSet<>(expected));
logTransactions();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public boolean isEmpty() {
return this.log.isEmpty();
}

public boolean isExpectedFiles(Set<StoredTabletFile> files) {
return this.log.isExpectedFiles(files);
public Set<StoredTabletFile> getExpectedFiles() {
return this.log.getExpectedFiles();
}

public void compacted(Set<StoredTabletFile> files, Optional<StoredTabletFile> output) {
Expand Down Expand Up @@ -240,8 +240,8 @@ List<DatafileTransaction> getTransactions() {
return tabletLog.toList();
}

boolean isExpectedFiles(Set<StoredTabletFile> expected) {
return new HashSet<>(Arrays.asList(finalFiles)).equals(new HashSet<>(expected));
Set<StoredTabletFile> getExpectedFiles() {
return new HashSet<>(Arrays.asList(finalFiles));
}

boolean isEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private DatafileTransactionLog createLog(final Set<StoredTabletFile> initialFile
setMaxSize(maxSize);
DatafileTransactionLog log = new DatafileTransactionLog(FOO_EXTENT, initialFiles,
factory.getTableConfiguration(FOO.getId()));
assertTrue(log.isExpectedFiles(initialFiles));
assertEquals(log.getExpectedFiles(), initialFiles);
return log;
}

Expand All @@ -108,7 +108,7 @@ public void testFlushed() throws InterruptedException {

Thread.sleep(2);
log.flushed(Optional.empty());
assertTrue(log.isExpectedFiles(initialFiles));
assertEquals(log.getExpectedFiles(), initialFiles);
assertEquals(0, log.getTransactions().size());
assertTrue(log.getInitialDate().getTime() > logDate);
logDate = log.getInitialDate().getTime();
Expand All @@ -119,7 +119,7 @@ public void testFlushed() throws InterruptedException {
StoredTabletFile flushedFile =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Ffile1.rf");
log.flushed(Optional.of(flushedFile));
assertTrue(log.isExpectedFiles(Sets.newHashSet(initialFile, flushedFile)));
assertEquals(log.getExpectedFiles(), Sets.newHashSet(initialFile, flushedFile));
assertEquals(0, log.getTransactions().size());
assertTrue(log.getInitialDate().getTime() > logDate);
assertNotEquals(dump, log.dumpLog());
Expand All @@ -138,7 +138,7 @@ public void testBulkImport() throws InterruptedException {
StoredTabletFile importedFile =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Ifile1.rf");
log.bulkImported(importedFile);
assertTrue(log.isExpectedFiles(Sets.newHashSet(initialFile, importedFile)));
assertEquals(log.getExpectedFiles(), Sets.newHashSet(initialFile, importedFile));
assertEquals(0, log.getTransactions().size());
assertTrue(log.getInitialDate().getTime() > logDate);
assertNotEquals(dump, log.dumpLog());
Expand All @@ -161,7 +161,7 @@ public void testCompacted() throws InterruptedException {
StoredTabletFile compactedFile =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Cfile1.rf");
log.compacted(Sets.newHashSet(initialFile1, initialFile2), Optional.of(compactedFile));
assertTrue(log.isExpectedFiles(Sets.newHashSet(initialFile3, compactedFile)));
assertEquals(log.getExpectedFiles(), Sets.newHashSet(initialFile3, compactedFile));
assertEquals(0, log.getTransactions().size());
assertTrue(log.getInitialDate().getTime() > logDate);
assertNotEquals(dump, log.dumpLog());
Expand All @@ -170,7 +170,7 @@ public void testCompacted() throws InterruptedException {

Thread.sleep(2);
log.compacted(Sets.newHashSet(compactedFile), Optional.empty());
assertTrue(log.isExpectedFiles(Sets.newHashSet(initialFile3)));
assertEquals(log.getExpectedFiles(), Sets.newHashSet(initialFile3));
assertEquals(0, log.getTransactions().size());
assertTrue(log.getInitialDate().getTime() > logDate);
assertNotEquals(dump, log.dumpLog());
Expand All @@ -193,8 +193,8 @@ public void testNonEmptyLog() throws InterruptedException {
StoredTabletFile importedFile =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Ifile1.rf");
log.bulkImported(importedFile);
assertTrue(log
.isExpectedFiles(Sets.newHashSet(initialFile1, initialFile2, initialFile3, importedFile)));
assertTrue(log.getExpectedFiles()
.equals(Sets.newHashSet(initialFile1, initialFile2, initialFile3, importedFile)));
List<DatafileTransaction> logs = log.getTransactions();
assertEquals(1, logs.size());
assertEquals(logDate, log.getInitialDate().getTime());
Expand All @@ -205,7 +205,8 @@ public void testNonEmptyLog() throws InterruptedException {
StoredTabletFile compactedFile =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Cfile1.rf");
log.compacted(Sets.newHashSet(initialFile3, importedFile), Optional.of(compactedFile));
assertTrue(log.isExpectedFiles(Sets.newHashSet(initialFile1, initialFile2, compactedFile)));
assertEquals(log.getExpectedFiles(),
Sets.newHashSet(initialFile1, initialFile2, compactedFile));
assertEquals(logDate, log.getInitialDate().getTime());
logs = log.getTransactions();
assertEquals(2, logs.size());
Expand All @@ -214,7 +215,7 @@ public void testNonEmptyLog() throws InterruptedException {

Thread.sleep(2);
log.compacted(Sets.newHashSet(compactedFile), Optional.empty());
assertTrue(log.isExpectedFiles(Sets.newHashSet(initialFile1, initialFile2)));
assertEquals(log.getExpectedFiles(), Sets.newHashSet(initialFile1, initialFile2));
assertEquals(logDate, log.getInitialDate().getTime());
logs = log.getTransactions();
assertEquals(3, logs.size());
Expand All @@ -237,7 +238,7 @@ public void testNonEmptyLog() throws InterruptedException {
StoredTabletFile flushedFile =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Ffile1.rf");
log.flushed(Optional.of(flushedFile));
assertTrue(log.isExpectedFiles(Sets.newHashSet(initialFile1, initialFile2, flushedFile)));
assertEquals(log.getExpectedFiles(), Sets.newHashSet(initialFile1, initialFile2, flushedFile));
assertEquals(logs.get(0).ts, log.getInitialDate().getTime());
logs = log.getTransactions();
assertEquals(3, logs.size());
Expand All @@ -260,7 +261,7 @@ public void testNonEmptyLog() throws InterruptedException {
Thread.sleep(2);
setMaxSize(1);
log.flushed(Optional.empty());
assertTrue(log.isExpectedFiles(Sets.newHashSet(initialFile1, initialFile2, flushedFile)));
assertEquals(log.getExpectedFiles(), Sets.newHashSet(initialFile1, initialFile2, flushedFile));
// we went from 3 to 1 logs, so the first one should not be the last of the original logs
assertEquals(logs.get(2).ts, log.getInitialDate().getTime());
logs = log.getTransactions();
Expand Down

0 comments on commit d6d0ebd

Please sign in to comment.