Skip to content

Commit

Permalink
[broker] Timeout API calls in BrokerService (apache#6489)
Browse files Browse the repository at this point in the history
See apache#6416. This change ensures that all futures within BrokerService
have a guranteed timeout. As stated in apache#6416, we see cases where it
appears that loading or creating a topic fails to resolve the future for
unknown reasons. It appears that these futures *may* not be returning.
This seems like a sane change to make to ensure that these futures
finish, however, it still isn't understood under what conditions these
futures may not be returning, so this fix is mostly a workaround for
some underlying issues

Co-authored-by: Addison Higham <ahigham@instructure.com>(cherry picked from commit 4a4cce9)
  • Loading branch information
addisonj authored and jiazhai committed May 17, 2020
1 parent 4d984ab commit 89e2d86
Showing 1 changed file with 20 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -693,7 +694,7 @@ public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean c
}

private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline();

if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -732,6 +733,20 @@ private static <T> CompletableFuture<T> failedFuture(Throwable t) {
return future;
}

private <T> CompletableFuture<T> futureWithDeadline(Long delay, TimeUnit unit, Exception exp) {
CompletableFuture<T> future = new CompletableFuture<T>();
executor().schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(exp);
}
}, delay, unit);
return future;
}

private <T> CompletableFuture<T> futureWithDeadline() {
return futureWithDeadline(60000L, TimeUnit.MILLISECONDS, new TimeoutException("Future didn't finish within deadline"));
}

public PulsarClient getReplicationClient(String cluster) {
PulsarClient client = replicationClients.get(cluster);
if (client != null) {
Expand Down Expand Up @@ -821,8 +836,7 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster) {
protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic,
boolean createIfMissing) throws RuntimeException {
checkTopicNsOwnership(topic);

final CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
final CompletableFuture<Optional<Topic>> topicFuture = futureWithDeadline();
if (!pulsar.getConfiguration().isEnablePersistentTopics()) {
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load persistent topic {}", topic);
Expand Down Expand Up @@ -929,7 +943,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
}

public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName topicName) {
CompletableFuture<ManagedLedgerConfig> future = new CompletableFuture<>();
CompletableFuture<ManagedLedgerConfig> future = futureWithDeadline();
// Execute in background thread, since getting the policies might block if the z-node wasn't already cached
pulsar.getOrderedExecutor().executeOrdered(topicName, safeRun(() -> {
NamespaceName namespace = topicName.getNamespaceObject();
Expand Down Expand Up @@ -1243,7 +1257,7 @@ public void checkTopicNsOwnership(final String topic) throws RuntimeException {
* @return
*/
public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Integer> result = new CompletableFuture<Integer>();
CompletableFuture<Integer> result = futureWithDeadline();
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
topics.forEach((name, topicFuture) -> {
TopicName topicName = TopicName.get(name);
Expand Down Expand Up @@ -1836,7 +1850,7 @@ private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopi
checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");

PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = new CompletableFuture<>();
CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = futureWithDeadline();

try {
byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMetadata);
Expand Down

0 comments on commit 89e2d86

Please sign in to comment.