
Merge pull request apache#55 from juma/PLAT-47949
PLAT-47949 - fix the inconsistency in tombstone merge
juma authored and GitHub Enterprise committed Feb 7, 2020
2 parents ef0ce54 + 9eda6a3 commit 7eb81c7
Showing 3 changed files with 54 additions and 59 deletions.
2 changes: 1 addition & 1 deletion build.gradle
@@ -48,7 +48,7 @@ allprojects {
   group = "org.apache.iceberg"
   apply plugin: 'com.palantir.baseline-idea'
   /* TODO - this assumes that the upstream apache version is 1.0 so we need to be consistent w/ upstream changes */
-  version = "1.0-adobe-17.12"
+  version = "1.0-adobe-17.13"
   repositories {
     maven { url "http://palantir.bintray.com/releases" }
     mavenCentral()
@@ -38,9 +38,12 @@ public List<ManifestFile> apply(TableMetadata base) {
     List<ManifestFile> apply = super.apply(base);
 
     // Handle tombstones merge
+    // Note: use `base` rather than `getBase()` to read the current snapshot. SnapshotProducer.apply() performs a
+    // refresh, which can leave getBase() referring to stale table metadata; `base` is the latest version and is
+    // used to generate the new snapshot.
     Optional<String> outputFilePath = tombstoneExtension.merge(
-        getBase().snapshot(getCherryPickSnapshotId()),
-        getBase().currentSnapshot());
+        base.snapshot(getCherryPickSnapshotId()),
+        base.currentSnapshot());
 
     // Atomic guarantee - bind the tombstone avro output file location to the new snapshot summary property
     // Iceberg will do an atomic commit of the snapshot w/ both the data files and the tombstone file or neither
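For readers unfamiliar with the refresh behavior the note above references, here is a minimal self-contained sketch of the stale-metadata hazard. The type and method names (TableMetadataSketch, SnapshotUpdateSketch, latest()) are illustrative assumptions, not Iceberg's actual internals.

// Illustrative sketch only: these names are hypothetical stand-ins,
// not Iceberg's real API.
interface TableMetadataSketch { long currentSnapshotId(); }

abstract class SnapshotUpdateSketch {
  private final TableMetadataSketch captured; // captured when the update was created

  SnapshotUpdateSketch(TableMetadataSketch current) {
    this.captured = current;
  }

  TableMetadataSketch getBase() {
    return captured; // stale if another commit landed after construction
  }

  void commit() {
    // On commit, the producer refreshes to the latest table metadata and
    // re-applies the pending changes against it.
    TableMetadataSketch refreshed = latest();
    apply(refreshed); // apply() must read from `refreshed`, not getBase()
  }

  abstract TableMetadataSketch latest();

  abstract void apply(TableMetadataSketch base);
}

With getBase(), a cherrypick racing with another commit would merge tombstones against pre-refresh metadata, which appears to be the inconsistency PLAT-47949 addresses.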
@@ -29,7 +29,14 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.AppendFiles;
@@ -43,7 +50,9 @@
 import org.junit.Assert;
 import org.junit.Test;
 
-public class TestWapWorkflowOverTombstone extends WithSpark {
+import static java.util.stream.Collectors.toList;
+
+public class TestWapWorkflowOverTombstone extends WithSpark implements WithExecutorService {
 
   @Test
   public void testSerialCherrypickWithTombstone() {
@@ -109,63 +109,46 @@ private List<Snapshot> listSnapshots(ExtendedTable table) {
     return Lists.newArrayList(table.snapshots());
   }
 
+  private Callable<String> doCherrypick(ExtendedTable table, Long snapshotId) {
+    return () -> {
+      table.cherrypick().cherrypick(snapshotId).commit();
+      return String.valueOf(snapshotId);
+    };
+  }
+
   @Test
-  public void testParallelCherrypickWithTombstone() {
+  public void testParallelCherrypickWithTombstone() throws InterruptedException {
     ExtendedTable table = tables.loadWithTombstoneExtension(getTableLocation());
-    Types.NestedField batchField = table.schema().findField("batch");
-
-    AppendFiles first = table.newAppendWithTombstonesAdd(
-        batchField,
-        Lists.newArrayList(() -> "1", () -> "2", () -> "3"),
-        ImmutableMap.of("purgeByMillis", "1571226183000", "reason", "test"),
-        1579792561L);
-    first.commit();
-
-    AppendFiles second = table.newAppendWithTombstonesAdd(
-        batchField,
-        Lists.newArrayList(() -> "4", () -> "5", () -> "6"),
-        ImmutableMap.of("purgeByMillis", "1571226183000", "reason", "test"),
-        1579792561L);
-
-    second.set("wap.id", "456")
-        .stageOnly()
-        .commit();
-
-    List<Snapshot> snapshots = listSnapshots(table);
-    Snapshot staged1Snapshot = snapshots.get(snapshots.size() - 1);
-    Assert.assertEquals("Check for staged wap id 1", "456", staged1Snapshot.summary().get("wap.id"));
-
-    AppendFiles third = table.newAppendWithTombstonesAdd(
-        batchField,
-        Lists.newArrayList(() -> "7", () -> "8", () -> "9"),
-        ImmutableMap.of("purgeByMillis", "1571226183000", "reason", "test"),
-        1579792561L);
-    third.set("wap.id", "789")
-        .stageOnly()
-        .commit();
-
-    snapshots = listSnapshots(table);
-    Snapshot staged2Snapshot = snapshots.get(snapshots.size() - 1);
-    Assert.assertEquals("Check for staged wap id 2", "789", staged2Snapshot.summary().get("wap.id"));
-
-    // cherrypick both staged snapshots to simulate parallel cherry-picking
-    table.cherrypick().cherrypick(staged1Snapshot.snapshotId()).commit();
-    table.cherrypick().cherrypick(staged2Snapshot.snapshotId()).commit();
-
-    snapshots = listSnapshots(table);
-    Assert.assertEquals("Snapshot count should include both staged and published snapshots", 5, snapshots.size());
-
-    List<ExtendedEntry> currentSnapshotTombstones = table.getSnapshotTombstones(
-        batchField,
-        table.currentSnapshot());
-
-    List<String> collect = currentSnapshotTombstones.stream()
-        .map(t -> t.getEntry().getId())
-        .collect(Collectors.toList());
-
-    Assert.assertTrue(
-        "Expect all appended tombstones in second set are available in the current snapshot and no more",
-        collect.size() == 9 && collect.containsAll(Lists.newArrayList("1", "2", "3", "4", "5", "6", "7", "8", "9")));
+    Types.NestedField field = table.schema().findField("batch");
+
+    int stagedSnapshotCount = 100;
+    LongStream stagedSnapshots = IntStream.range(0, stagedSnapshotCount).mapToLong(i -> {
+      table.newAppendWithTombstonesAdd(field, Lists.newArrayList(() -> String.valueOf(i)), Collections.emptyMap(), i)
+          .set("wap.id", String.valueOf(i))
+          .stageOnly()
+          .commit();
+      List<Snapshot> snapshots = listSnapshots(table);
+      return snapshots.get(snapshots.size() - 1).snapshotId();
+    });
+
+    // This will generate 100 callable commit operations, one per staged tombstone value from 0 to 99
+    List<Callable<String>> commits = stagedSnapshots.mapToObj(snapshot -> doCherrypick(table, snapshot)).collect(toList());
+
+    // All commits will be executed on a fixed thread pool of two threads
+    ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+    try {
+      executorService.invokeAll(commits, 30, TimeUnit.SECONDS);
+    } finally {
+      shutdownAndAwaitTermination(executorService);
+    }
+
+    int tombstonesCount = table.getSnapshotTombstones(field, table.currentSnapshot()).size();
+    int snapshotCount = Iterables.size(table.snapshots());
+    int publishedSnapshotCount = snapshotCount - stagedSnapshotCount;
+    Assert.assertEquals("Expect published snapshot count to equal tombstone count", publishedSnapshotCount, tombstonesCount);
+    Assert.assertEquals("Expect 100 published snapshots", 100, publishedSnapshotCount);
+    Assert.assertEquals("Expect 100 tombstoned values", 100, tombstonesCount);
   }
 
   @Test
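The shutdownAndAwaitTermination call above comes from the WithExecutorService mixin, which is not shown in this diff. A minimal sketch of what such a default method could look like, modeled on the two-phase shutdown pattern from the ExecutorService Javadoc; the interface shape and the 30-second timeouts are assumptions, not the actual helper:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public interface WithExecutorService {

  // Assumed shape: orderly two-phase shutdown per the ExecutorService Javadoc;
  // the 30-second timeouts are illustrative, not taken from the real mixin.
  default void shutdownAndAwaitTermination(ExecutorService pool) {
    pool.shutdown(); // reject new tasks, let in-flight tasks finish
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        pool.shutdownNow(); // interrupt anything still running
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
          throw new IllegalStateException("Executor did not terminate");
        }
      }
    } catch (InterruptedException e) {
      pool.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt status
    }
  }
}

In the test above, a helper of this shape would guarantee the two-thread pool is torn down even if invokeAll times out.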
