Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Bentley <tbentley@redhat.com>
  • Loading branch information
tombentley committed Nov 19, 2020
1 parent 6db173e commit a0f2a2f
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 22 deletions.
10 changes: 10 additions & 0 deletions topic-operator/src/main/java/io/strimzi/operator/topic/Kafka.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ public interface Kafka {
*/
Future<Void> deleteTopic(TopicName topicName);

/**
* Wait for the given topic to not existing Kafka ,
* completing the returned Future when the topic does not exists.
* If the operation fails the returned Future will be failed with the
* KafkaException (not an ExecutionException).
* @param topicName The name of the topic to delete.
* @return A future which is completed once the topic has been deleted.
*/
Future<Void> awaitNotExists(TopicName topicName);

/**
* Asynchronously update the topic config in Kafka,
* completing the returned Future when the topic has been updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,7 @@ public Future<Void> deleteTopic(TopicName topicName) {
KafkaFuture<Void> future = adminClient.deleteTopics(
singleton(topicName.toString())).values().get(topicName.toString());
mapFuture(future).compose(ig ->
Util.waitFor(vertx, "deleted sync " + topicName, "deleted", 1000, 120_000, () -> {
try {
return adminClient.describeTopics(singleton(topicName.toString())).all().get().get(topicName.toString()) == null;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
return true;
} else if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new RuntimeException(e);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, error -> true)
awaitNotExists(topicName)
).onComplete(ar -> {
// Complete the result future on the context thread.
vertx.runOnContext(ignored -> {
Expand All @@ -82,6 +68,24 @@ public Future<Void> deleteTopic(TopicName topicName) {
return handler.future();
}

public Future<Void> awaitNotExists(TopicName topicName) {
return Util.waitFor(vertx, "deleted sync " + topicName, "deleted", 1000, 300_000, () -> {
try {
return adminClient.describeTopics(singleton(topicName.toString())).all().get().get(topicName.toString()) == null;
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
return true;
} else if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new RuntimeException(e);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, error -> true);
}

@SuppressWarnings("deprecation")
@Override
public Future<Void> updateTopicConfig(Topic topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,13 +771,17 @@ void enqueue(Handler<Void> event) {

/** Called when a topic znode is deleted in ZK */
Future<Void> onTopicDeleted(LogContext logContext, TopicName topicName) {
return executeWithTopicLockHeld(logContext, topicName,
new Reconciliation("onTopicDeleted", true) {
@Override
public Future<Void> execute() {
return reconcileOnTopicChange(logContext, topicName, null, this);
}
});
return kafka.awaitNotExists(topicName).compose(
ignored ->
executeWithTopicLockHeld(logContext, topicName,
new Reconciliation("onTopicDeleted", true) {
@Override
public Future<Void> execute() {
return reconcileOnTopicChange(logContext, topicName, null, this);
}
}),
error ->
Future.failedFuture("Ignored spurious-seeming topic deletion"));
}

private Map<String, Long> statusUpdateGeneration = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ public Future<Void> deleteTopic(TopicName topicName) {
return event;
}

@Override
public Future<Void> awaitNotExists(TopicName topicName) {
return Future.succeededFuture();
}

public MockKafka setUpdateTopicResponse(Function<TopicName, Future<Void>> updateTopicResponse) {
this.updateTopicResponse = updateTopicResponse;
return this;
Expand Down

0 comments on commit a0f2a2f

Please sign in to comment.