Skip to content

Commit

Permalink
re apache#3724: tablet transaction log
Browse files Browse the repository at this point in the history
* Lazy creation of the transactions to avoid creation when disabled
* enable/disable testing
  • Loading branch information
ivakegg committed Sep 29, 2023
1 parent 1ee31e8 commit 1837082
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Supplier;

import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
Expand Down Expand Up @@ -129,39 +130,40 @@ public Set<StoredTabletFile> getExpectedFiles() {
* @param files The files that were compacted
* @param output The destination file
*/
public void compacted(Set<StoredTabletFile> files, Optional<StoredTabletFile> output,
Set<StoredTabletFile> newFiles) {
addTransaction(new TabletTransaction.Compacted(files, output), newFiles);
public void compacted(final Set<StoredTabletFile> files, final Optional<StoredTabletFile> output,
final Set<StoredTabletFile> newFiles) {
addTransaction(() -> new TabletTransaction.Compacted(files, output), newFiles);
}

/**
* Add a flush transaction
*
* @param newDatafile The new flushed file
*/
public void flushed(Optional<StoredTabletFile> newDatafile, Set<StoredTabletFile> newFiles) {
addTransaction(new TabletTransaction.Flushed(newDatafile), newFiles);
public void flushed(final Optional<StoredTabletFile> newDatafile,
final Set<StoredTabletFile> newFiles) {
addTransaction(() -> new TabletTransaction.Flushed(newDatafile), newFiles);
}

/**
* Add a bulk import transaction
*
* @param file the new bulk import file
*/
public void bulkImported(StoredTabletFile file, Set<StoredTabletFile> newFiles) {
addTransaction(new TabletTransaction.BulkImported(file), newFiles);
public void bulkImported(final StoredTabletFile file, final Set<StoredTabletFile> newFiles) {
addTransaction(() -> new TabletTransaction.BulkImported(file), newFiles);
}

/**
* Add a transaction to the log. This will trim the size of the log if needed.
*
* @param transaction The transaction to add
* @param transactionFactory The transaction supplier
*/
private synchronized void addTransaction(TabletTransaction transaction,
private synchronized void addTransaction(Supplier<TabletTransaction> transactionFactory,
Set<StoredTabletFile> newFiles) {
if (isEnabled()) {
this.log.setCapacity(getMaxSize());
this.log.addTransaction(transaction);
this.log.addTransaction(transactionFactory.get());
checkTransactionLog(newFiles);
} else {
this.log.reset(newFiles);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,70 @@ public void testNonEmptyLog() throws InterruptedException {
assertEquals(Optional.empty(), ((TabletTransaction.Flushed) logs.get(0)).getFlushFile());
}

@Test
public void testEnableDisable() throws InterruptedException {
StoredTabletFile initialFile1 =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Afile1.rf");
StoredTabletFile initialFile2 =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Afile2.rf");
StoredTabletFile initialFile3 =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Afile3.rf");
Set<StoredTabletFile> initialFiles = Sets.newHashSet(initialFile1, initialFile2, initialFile3);
TabletTransactionLog log = createLog(initialFiles, 3);

StoredTabletFile importedFile =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Ifile1.rf");
Set<StoredTabletFile> expectedFiles =
Sets.newHashSet(initialFile1, initialFile2, initialFile3, importedFile);
log.bulkImported(importedFile, expectedFiles);
StoredTabletFile compactedFile =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Cfile1.rf");
expectedFiles = Sets.newHashSet(initialFile1, initialFile2, compactedFile);
log.compacted(Sets.newHashSet(initialFile3, importedFile), Optional.of(compactedFile),
expectedFiles);
expectedFiles = Sets.newHashSet(initialFile1, initialFile2);
log.compacted(Sets.newHashSet(compactedFile), Optional.empty(), expectedFiles);

List<TabletTransaction> logs = log.getTransactions();
assertEquals(3, logs.size());

// now disable the log
enableLog(false);

// still has transactions until a new one is added
logs = log.getTransactions();
assertEquals(3, logs.size());

// another transaction
StoredTabletFile flushedFile =
new StoredTabletFile("file://accumulo/tables/1/default_tablet/Ffile1.rf");
expectedFiles = Sets.newHashSet(initialFile1, initialFile2, flushedFile);
log.flushed(Optional.of(flushedFile), expectedFiles);

// no more transactions
logs = log.getTransactions();
assertEquals(0, logs.size());

// add a transaction
expectedFiles = Sets.newHashSet(initialFile1, initialFile2, flushedFile);
log.flushed(Optional.empty(), expectedFiles);

// still no more transactions
logs = log.getTransactions();
assertEquals(0, logs.size());

// reenable the log
enableLog(true);

// add a transaction
expectedFiles = Sets.newHashSet(initialFile1, initialFile2, flushedFile);
log.flushed(Optional.empty(), expectedFiles);

// Now we have a transaction
logs = log.getTransactions();
assertEquals(1, logs.size());
}

@Test
public void testCapacityChange() {
StoredTabletFile initialFile1 =
Expand Down

0 comments on commit 1837082

Please sign in to comment.