Skip to content

Commit

Permalink
Core: Abort file groups should be under same lock as committerService (
Browse files Browse the repository at this point in the history
  • Loading branch information
ConeyLiu authored and nastra committed Jul 18, 2023
1 parent cc896a2 commit e135ce5
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
Expand All @@ -49,13 +50,16 @@
abstract class BaseCommitService<T> implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(BaseCommitService.class);

public static final long TIMEOUT_IN_MS_DEFAULT = TimeUnit.MINUTES.toMillis(120);

private final Table table;
private final ExecutorService committerService;
private final ConcurrentLinkedQueue<T> completedRewrites;
private final ConcurrentLinkedQueue<String> inProgressCommits;
private final List<T> committedRewrites;
private final ConcurrentLinkedQueue<T> committedRewrites;
private final int rewritesPerCommit;
private final AtomicBoolean running = new AtomicBoolean(false);
private final long timeoutInMS;

/**
* Constructs a {@link BaseCommitService}
Expand All @@ -64,17 +68,30 @@ abstract class BaseCommitService<T> implements Closeable {
* @param rewritesPerCommit number of file groups to include in a commit
*/
BaseCommitService(Table table, int rewritesPerCommit) {
this(table, rewritesPerCommit, TIMEOUT_IN_MS_DEFAULT);
}

/**
* Constructs a {@link BaseCommitService}
*
* @param table table to perform commit on
* @param rewritesPerCommit number of file groups to include in a commit
* @param timeoutInMS The timeout to wait for commits to complete after all rewrite jobs have been
* completed
*/
BaseCommitService(Table table, int rewritesPerCommit, long timeoutInMS) {
this.table = table;
LOG.info(
"Creating commit service for table {} with {} groups per commit", table, rewritesPerCommit);
this.rewritesPerCommit = rewritesPerCommit;
this.timeoutInMS = timeoutInMS;

committerService =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("Committer-Service").build());

completedRewrites = Queues.newConcurrentLinkedQueue();
committedRewrites = Lists.newArrayList();
committedRewrites = Queues.newConcurrentLinkedQueue();
inProgressCommits = Queues.newConcurrentLinkedQueue();
}

Expand Down Expand Up @@ -138,7 +155,7 @@ public List<T> results() {
Preconditions.checkState(
committerService.isShutdown(),
"Cannot get results from a service which has not been closed");
return committedRewrites;
return Lists.newArrayList(committedRewrites.iterator());
}

@Override
Expand All @@ -154,11 +171,13 @@ public void close() {
// the commit pool to finish doing its commits to Iceberg State. In the case of partial
// progress this should have been occurring simultaneously with rewrites, if not there should
// be only a single commit operation.
if (!committerService.awaitTermination(120, TimeUnit.MINUTES)) {
if (!committerService.awaitTermination(timeoutInMS, TimeUnit.MILLISECONDS)) {
LOG.warn(
"Commit operation did not complete within 120 minutes of the all files "
"Commit operation did not complete within {} minutes ({} ms) of the all files "
+ "being rewritten. This may mean that some changes were not successfully committed to the "
+ "table.");
+ "table.",
TimeUnit.MILLISECONDS.toMinutes(timeoutInMS),
timeoutInMS);
timeout = true;
}
} catch (InterruptedException e) {
Expand All @@ -169,7 +188,11 @@ public void close() {

if (!completedRewrites.isEmpty() && timeout) {
LOG.error("Attempting to cleanup uncommitted file groups");
completedRewrites.forEach(this::abortFileGroup);
synchronized (completedRewrites) {
while (!completedRewrites.isEmpty()) {
abortFileGroup(completedRewrites.poll());
}
}
}

Preconditions.checkArgument(
Expand Down Expand Up @@ -211,11 +234,17 @@ private void commitReadyCommitGroups() {
}
}

private boolean canCreateCommitGroup() {
@VisibleForTesting
boolean canCreateCommitGroup() {
// Either we have a full commit group, or we have completed writing and need to commit
// what is left over
boolean fullCommitGroup = completedRewrites.size() >= rewritesPerCommit;
boolean writingComplete = !running.get() && completedRewrites.size() > 0;
return fullCommitGroup || writingComplete;
}

@VisibleForTesting
boolean completedRewritesAllCommitted() {
return completedRewrites.isEmpty() && inProgressCommits.isEmpty();
}
}
138 changes: 138 additions & 0 deletions core/src/test/java/org/apache/iceberg/actions/TestCommitService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.Tasks;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Test;

public class TestCommitService extends TableTestBase {

public TestCommitService() {
super(1);
}

@Test
public void testCommittedResultsCorrectly() {
CustomCommitService commitService = new CustomCommitService(table, 5, 10000);
commitService.start();

ExecutorService executorService = Executors.newFixedThreadPool(10);
int numberOfFileGroups = 100;
Tasks.range(numberOfFileGroups).executeWith(executorService).run(commitService::offer);
commitService.close();

Set<Integer> expected = Sets.newHashSet(IntStream.range(0, 100).iterator());
Set<Integer> actual = Sets.newHashSet(commitService.results());
Assertions.assertThat(actual).isEqualTo(expected);
}

@Test
public void testAbortFileGroupsAfterTimeout() {
CustomCommitService commitService = new CustomCommitService(table, 5, 200);
commitService.start();

// Add file groups [0-3] for commit.
// There are less than the rewritesPerCommit, and thus will not trigger a commit action. Those
// file groups will be added to the completedRewrites queue.
// Now the queue has 4 file groups that need to commit.
for (int i = 0; i < 4; i++) {
commitService.offer(i);
}

// Add file groups [4-7] for commit
// These are gated to not be able to commit, so all those 4 file groups will be added to the
// queue as well.
// Now the queue has 8 file groups that need to commit.
CustomCommitService spyCommitService = spy(commitService);
doReturn(false).when(spyCommitService).canCreateCommitGroup();
for (int i = 4; i < 8; i++) {
spyCommitService.offer(i);
}

// close commitService.
// This allows committerService thread to start to commit the remaining file groups [0-7] in the
// completedRewrites queue. And also the main thread waits for the committerService thread to
// finish within a timeout.

// The committerService thread commits file groups [0-4]. These will wait a fixed duration to
// simulate timeout on the main thread, which then tries to abort file groups [5-7].
// This tests the race conditions, as the committerService is also trying to commit groups
// [5-7].
Assertions.assertThatThrownBy(commitService::close)
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("Timeout occurred when waiting for commits");

// Wait for the commitService to finish. Committed all file groups or aborted remaining file
// groups.
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.pollInSameThread()
.untilAsserted(() -> assertThat(commitService.completedRewritesAllCommitted()).isTrue());
if (commitService.aborted.isEmpty()) {
// All file groups are committed
Assertions.assertThat(commitService.results())
.isEqualTo(ImmutableList.of(0, 1, 2, 3, 4, 5, 6, 7));
} else {
// File groups [5-7] are aborted
Assertions.assertThat(commitService.results())
.doesNotContainAnyElementsOf(commitService.aborted);
Assertions.assertThat(commitService.results()).isEqualTo(ImmutableList.of(0, 1, 2, 3, 4));
Assertions.assertThat(commitService.aborted).isEqualTo(ImmutableSet.of(5, 6, 7));
}
}

private static class CustomCommitService extends BaseCommitService<Integer> {
private final Set<Integer> aborted = Sets.newConcurrentHashSet();

CustomCommitService(Table table, int rewritesPerCommit, int timeoutInSeconds) {
super(table, rewritesPerCommit, timeoutInSeconds);
}

@Override
protected void commitOrClean(Set<Integer> batch) {
try {
// Slightly longer than timeout
Thread.sleep(210);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
protected void abortFileGroup(Integer group) {
aborted.add(group);
}
}
}

0 comments on commit e135ce5

Please sign in to comment.