Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Callbacks Easier to Use #3360

Merged
merged 4 commits into from
Jul 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@

import com.palantir.common.base.Throwables;

/**
* A Callback is a potentially retryable operation on a resource R. The intended use is to specify a task to be run on
* an uninitialized resource R once it becomes ready by calling {@link #runWithRetry(R)}.
*
* The desired task is specified by implementing {@link #init(R)}. The default behaviour on failure is to not retry and
* perform no cleanup. If cleanup is necessary, the user should override the {@link #cleanup(R, Throwable)} method. If
* the call to cleanup does not throw, {@link #init(R)} will be immediately retried.
*
* In case {@link #runWithRetry(R)} needs to be interrupted in a safe way, the user should call
* {@link #blockUntilSafeToShutdown()}, which will block until any in-progress call to init() terminates, and potential
* cleanup is finished, then prevent further retrying.
*/
public abstract class Callback<R> {
private volatile boolean shutdownSignal = false;
private Lock lock = new ReentrantLock();
Expand All @@ -40,7 +52,7 @@ public abstract class Callback<R> {
* @param initException Throwable thrown by init()
*/
public void cleanup(R resource, Throwable initException) {
Throwables.rewrapAndThrowUncheckedException(initException);
throw Throwables.rewrapAndThrowUncheckedException(initException);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2018 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.async.initializer;

/**
* An interface simplifying the creation of initialization Callbacks. If a class C requires a resource R to initialize
* some of its components, but R may not be ready yet at the time of C's creation, C should implement this interface
* and pass the desired Callback to R. When R is ready, it can then call {@link Callback#runWithRetry(R)} to initialize
* C.
*/
public interface CallbackInitializable<R> {
void initialize(R resource);

/**
* If {@link #initialize(R)} failing can result in a state where cleanup is necessary, override this method.
*
* @param resource the resource used in initialization.
* @param initFailure the Throwable causing the failure in initialization.
*/
default void onInitializationFailureCleanup(R resource, Throwable initFailure) {
}

/**
* Returns a Callback that runs initialize only once. On initialization failure, executes the specified cleanup and
* then wraps and throws the cause.
*/
default Callback<R> singleAttemptCallback() {
return LambdaCallback.singleAttempt(this::initialize, this::onInitializationFailureCleanup);
}

/**
* Returns a Callback that will retry initialization on failure, unless the cleanup task also throws, in which case
* the exception thrown by the cleanup task is propagated.
*/
default Callback<R> retryUnlessCleanupThrowsCallback() {
return LambdaCallback.retrying(this::initialize, this::onInitializationFailureCleanup);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,67 @@

package com.palantir.async.initializer;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

import com.palantir.common.base.Throwables;

/**
* Convenience class for creating callbacks using lambda expressions.
*/
public final class LambdaCallback<R> extends Callback<R> {
private final Consumer<R> consumer;
private static final BiConsumer<Object, Throwable> FAIL = (ignore, throwable) -> {
throw Throwables.rewrapAndThrowUncheckedException(throwable);
};

private final Consumer<R> initialize;
private final BiConsumer<R, Throwable> onInitializationFailure;

private LambdaCallback(Consumer<R> initialize, BiConsumer<R, Throwable> onInitializationFailure) {
this.initialize = initialize;
this.onInitializationFailure = onInitializationFailure;
}

private LambdaCallback(Consumer<R> consumer) {
this.consumer = consumer;
/**
* Creates a callback that will attempt to initialize once. If initialize throws, onInitFailureCleanup is called
* before wrapping and throwing a RuntimeException.
* @param initialize initialization method.
* @param onInitFailureCleanup cleanup to be done in case initialization throws. If this method also throws, it will
* propagate up to the caller.
* @return the desired Callback object.
*/
public static <R> Callback<R> singleAttempt(Consumer<R> initialize, BiConsumer<R, Throwable> onInitFailureCleanup) {
return new LambdaCallback<>(initialize, onInitFailureCleanup.andThen(FAIL));
}

public static <R> Callback<R> of(Consumer<R> consumer) {
return new LambdaCallback<>(consumer);
/**
* Creates a callback that will retry initialization until cleanup also throws. If initialize throws,
* onInitFailureCleanup is called before calling initialize again.
* @param initialize initialization method.
* @param onInitFailureCleanup cleanup to be done in case initialization throws. If this method also throws, no more
* retries will be attempted and it will propagate up to the caller.
* @return the desired Callback object.
*/
public static <R> Callback<R> retrying(Consumer<R> initialize, BiConsumer<R, Throwable> onInitFailureCleanup) {
return new LambdaCallback<>(initialize, onInitFailureCleanup);
}

/**
* Simple version of {@link #singleAttempt(Consumer, BiConsumer)} when no cleanup is necessary on failure.
* @param initialize initialization method.
* @return the desired Callback object.
*/
public static <R> Callback<R> of(Consumer<R> initialize) {
return singleAttempt(initialize, (no, cleanup) -> { });
}

@Override
public void init(R resource) {
consumer.accept(resource);
initialize.accept(resource);
}

@Override
public void cleanup(R resource, Throwable initFailure) {
onInitializationFailure.accept(resource, initFailure);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2018 Palantir Technologies, Inc. All rights reserved.
*
* Licensed under the BSD-3 License (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.palantir.async.initializer;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.immutables.value.Value;
import org.junit.Before;
import org.junit.Test;

public class LambdaCallbackTest {
private static final RuntimeException INIT_FAIL = new RuntimeException("Failed during initialization.");
private static final RuntimeException CLEANUP_FAIL = new RuntimeException("Failed during cleanup.");
private ModifiableInitsAndCleanups initsAndCleanups;
private Callback<ModifiableInitsAndCleanups> callback;

@Before
public void setup() {
initsAndCleanups = InitsAndCleanups.createInitialized();
}

@Test
public void singleAttemptCallbackCallsInitializeOnceAndSkipsCleanupOnSuccess() {
callback = LambdaCallback.singleAttempt(this::markInit, this::markCleanup);

callback.runWithRetry(initsAndCleanups);
assertThat(initsAndCleanups.inits()).isEqualTo(1);
assertThat(initsAndCleanups.cleanups()).isEqualTo(0);
}

@Test
public void singleAttemptCallbackCallsInitializeAndCleanupOnceThenRethrowsOnInitFailure() {
callback = LambdaCallback.singleAttempt(this::markInitAndFail, this::markCleanup);

assertThatThrownBy(() -> callback.runWithRetry(initsAndCleanups)).isEqualToComparingFieldByField(INIT_FAIL);
assertThat(initsAndCleanups.inits()).isEqualTo(1);
assertThat(initsAndCleanups.cleanups()).isEqualTo(1);
}

@Test
public void singleAttemptCallbackThrowsCleanupExceptionOnCleanupFailure() {
callback = LambdaCallback.singleAttempt(this::markInitAndFail, this::markCleanupAndFail);

assertThatThrownBy(() -> callback.runWithRetry(initsAndCleanups)).isEqualToComparingFieldByField(CLEANUP_FAIL);
}

@Test
public void retryingCallbackCallsInitializeOnceAndSkipsCleanupOnSuccess() {
callback = LambdaCallback.retrying(this::markInit, this::markCleanup);

callback.runWithRetry(initsAndCleanups);
assertThat(initsAndCleanups.inits()).isEqualTo(1);
assertThat(initsAndCleanups.cleanups()).isEqualTo(0);
}

@Test
public void retryingCallbackCallsInitializeAndCleanupUntilSuccess() {
callback = LambdaCallback.retrying(this::markInitThenFailIfFewerThatTenInits, this::markCleanup);

callback.runWithRetry(initsAndCleanups);
assertThat(initsAndCleanups.inits()).isEqualTo(10);
assertThat(initsAndCleanups.cleanups()).isEqualTo(9);
}

@Test
public void retryingCallbackStopsRetryingWhenCleanupThrows() {
callback = LambdaCallback.retrying(this::markInitAndFail, this::markCleanupThenFailIfMoreThatTenInits);

assertThatThrownBy(() -> callback.runWithRetry(initsAndCleanups)).isEqualToComparingFieldByField(CLEANUP_FAIL);
assertThat(initsAndCleanups.inits()).isEqualTo(11);
assertThat(initsAndCleanups.cleanups()).isEqualTo(11);
}

private void markInit(ModifiableInitsAndCleanups initsAndCleanups) {
initsAndCleanups.setInits(initsAndCleanups.inits() + 1);
}

private void markInitAndFail(ModifiableInitsAndCleanups initsAndCleanups) {
markInit(initsAndCleanups);
throw INIT_FAIL;
}

private void markInitThenFailIfFewerThatTenInits(ModifiableInitsAndCleanups initsAndCleanups) {
markInit(initsAndCleanups);
if (initsAndCleanups.inits() < 10) {
throw INIT_FAIL;
}
}

private void markCleanup(ModifiableInitsAndCleanups initsAndCleanups, Throwable ignore) {
initsAndCleanups.setCleanups(initsAndCleanups.cleanups() + 1);
}

private void markCleanupAndFail(ModifiableInitsAndCleanups initsAndCleanups, Throwable ignore) {
markCleanup(initsAndCleanups, ignore);
throw CLEANUP_FAIL;
}

private void markCleanupThenFailIfMoreThatTenInits(ModifiableInitsAndCleanups initsAndCleanups, Throwable ignore) {
markCleanup(initsAndCleanups, ignore);
if (initsAndCleanups.inits() > 10) {
throw CLEANUP_FAIL;
}
}

@Value.Modifiable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! Wasn't aware of this feature

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah really useful, I saw it in one of the recent PR's

interface InitsAndCleanups {
int inits();
int cleanups();

static ModifiableInitsAndCleanups createInitialized() {
return ModifiableInitsAndCleanups.create()
.setInits(0)
.setCleanups(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.google.common.util.concurrent.Uninterruptibles;
import com.palantir.async.initializer.AsyncInitializer;
import com.palantir.async.initializer.Callback;
import com.palantir.async.initializer.LambdaCallback;
import com.palantir.atlasdb.AtlasDbConstants;
import com.palantir.atlasdb.cleaner.CleanupFollower;
import com.palantir.atlasdb.cleaner.DefaultCleanerBuilder;
Expand Down Expand Up @@ -347,9 +346,9 @@ runtimeConfigSupplier, registrar(), () -> LockServiceImpl.create(lockServerOptio
JavaSuppliers.compose(AtlasDbRuntimeConfig::targetedSweep, runtimeConfigSupplier)),
closeables);

Callback<TransactionManager> callbacks = new Callback.CallChain(ImmutableList.of(
Callback<TransactionManager> callbacks = new Callback.CallChain<>(ImmutableList.of(
timelockConsistencyCheckCallback(config, runtimeConfigSupplier.get(), lockAndTimestampServices),
LambdaCallback.of(targetedSweep::callbackInit),
targetedSweep.singleAttemptCallback(),
asyncInitializationCallback()));

TransactionManager transactionManager = initializeCloseable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import java.util.Map;
import java.util.stream.Collectors;

import com.palantir.async.initializer.CallbackInitializable;
import com.palantir.atlasdb.keyvalue.api.Cell;
import com.palantir.atlasdb.keyvalue.api.TableReference;
import com.palantir.atlasdb.transaction.api.TransactionManager;

/**
* Adds {@link WriteInfo}s to a global queue to be swept.
*/
public interface MultiTableSweepQueueWriter extends AutoCloseable {
public interface MultiTableSweepQueueWriter extends AutoCloseable, CallbackInitializable<TransactionManager> {
MultiTableSweepQueueWriter NO_OP = ignored -> { };

default void enqueue(Map<TableReference, ? extends Map<Cell, byte[]>> writes, long timestamp) {
Expand All @@ -48,7 +49,8 @@ default void enqueue(Map<TableReference, ? extends Map<Cell, byte[]>> writes, lo
*
* @param txManager the transaction manager performing the callback
*/
default void callbackInit(TransactionManager txManager) {
@Override
default void initialize(TransactionManager txManager) {
// noop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void initialize(SpecialTimestampsSupplier timestamps, TimelockService tim
}

@Override
public void callbackInit(TransactionManager txManager) {
public void initialize(TransactionManager txManager) {
initialize(SpecialTimestampsSupplier.create(txManager),
txManager.getTimelockService(),
txManager.getKeyValueService(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static TransactionManager setupTxManager(KeyValueService kvs,
() -> AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE,
writer);
setupTables(kvs);
writer.callbackInit(txManager);
writer.initialize(txManager);
return txManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected TransactionManager getManager() {
AbstractTransactionTest.DEFAULT_GET_RANGES_CONCURRENCY,
() -> AtlasDbConstants.DEFAULT_TIMESTAMP_CACHE_SIZE,
sweepQueue);
sweepQueue.callbackInit(txManager);
sweepQueue.initialize(txManager);
return txManager;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void setUp() throws Exception {
sweepQueue,
MoreExecutors.newDirectExecutorService());

sweepQueue.callbackInit(serializableTxManager);
sweepQueue.initialize(serializableTxManager);
txManager = new CachingTestTransactionManager(serializableTxManager);
}

Expand Down