Skip to content

Commit ddd642e

Browse files
poorbarcodeJason918
authored andcommitted
[fix][broker]Consumer can't consume messages because there has two sames topics in one broker (apache#17526)
(cherry picked from commit 260f5c6)
1 parent 917f997 commit ddd642e

File tree

3 files changed

+55
-9
lines changed

3 files changed

+55
-9
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1913,7 +1913,7 @@ public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) {
19131913
TopicName topicName = TopicName.get(topic);
19141914
if (serviceUnit.includes(topicName) && getTopicReference(topic).isPresent()) {
19151915
log.info("[{}][{}] Clean unloaded topic from cache.", serviceUnit.toString(), topic);
1916-
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit);
1916+
pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit, null);
19171917
}
19181918
}
19191919
}
@@ -1922,15 +1922,56 @@ public AuthorizationService getAuthorizationService() {
19221922
return authorizationService;
19231923
}
19241924

1925-
public CompletableFuture<Void> removeTopicFromCache(String topic) {
1925+
public CompletableFuture<Void> removeTopicFromCache(String topicName) {
1926+
return removeTopicFutureFromCache(topicName, null);
1927+
}
1928+
1929+
public CompletableFuture<Void> removeTopicFromCache(Topic topic) {
1930+
Optional<CompletableFuture<Optional<Topic>>> createTopicFuture = findTopicFutureInCache(topic);
1931+
if (!createTopicFuture.isPresent()){
1932+
return CompletableFuture.completedFuture(null);
1933+
}
1934+
return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get());
1935+
}
1936+
1937+
private Optional<CompletableFuture<Optional<Topic>>> findTopicFutureInCache(Topic topic){
1938+
if (topic == null){
1939+
return Optional.empty();
1940+
}
1941+
final CompletableFuture<Optional<Topic>> createTopicFuture = topics.get(topic.getName());
1942+
// If not exists in cache, do nothing.
1943+
if (createTopicFuture == null){
1944+
return Optional.empty();
1945+
}
1946+
// If the future in cache is not yet complete, the topic instance in the cache is not the same with the topic.
1947+
if (!createTopicFuture.isDone()){
1948+
return Optional.empty();
1949+
}
1950+
// If the future in cache has exception complete,
1951+
// the topic instance in the cache is not the same with the topic.
1952+
if (createTopicFuture.isCompletedExceptionally()){
1953+
return Optional.empty();
1954+
}
1955+
Optional<Topic> optionalTopic = createTopicFuture.join();
1956+
Topic topicInCache = optionalTopic.orElse(null);
1957+
if (topicInCache == null || topicInCache != topic){
1958+
return Optional.empty();
1959+
} else {
1960+
return Optional.of(createTopicFuture);
1961+
}
1962+
}
1963+
1964+
private CompletableFuture<Void> removeTopicFutureFromCache(String topic,
1965+
CompletableFuture<Optional<Topic>> createTopicFuture) {
19261966
TopicName topicName = TopicName.get(topic);
19271967
return pulsar.getNamespaceService().getBundleAsync(topicName)
19281968
.thenAccept(namespaceBundle -> {
1929-
removeTopicFromCache(topic, namespaceBundle);
1969+
removeTopicFromCache(topic, namespaceBundle, createTopicFuture);
19301970
});
19311971
}
19321972

1933-
public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) {
1973+
private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle,
1974+
CompletableFuture<Optional<Topic>> createTopicFuture) {
19341975
String bundleName = namespaceBundle.toString();
19351976
String namespaceName = TopicName.get(topic).getNamespaceObject().toString();
19361977

@@ -1957,7 +1998,12 @@ public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle)
19571998
}
19581999
}
19592000
}
1960-
topics.remove(topic);
2001+
2002+
if (createTopicFuture == null) {
2003+
topics.remove(topic);
2004+
} else {
2005+
topics.remove(topic, createTopicFuture);
2006+
}
19612007

19622008
Compactor compactor = pulsar.getNullableCompactor();
19632009
if (compactor != null) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c
440440
// topic GC iterates over topics map and removing from the map with the same thread creates
441441
// deadlock. so, execute it in different thread
442442
brokerService.executor().execute(() -> {
443-
brokerService.removeTopicFromCache(topic);
443+
brokerService.removeTopicFromCache(NonPersistentTopic.this);
444444
unregisterTopicPolicyListener();
445445
log.info("[{}] Topic deleted", topic);
446446
deleteFuture.complete(null);
@@ -507,7 +507,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
507507
// unload topic iterates over topics map and removing from the map with the same thread creates deadlock.
508508
// so, execute it in different thread
509509
brokerService.executor().execute(() -> {
510-
brokerService.removeTopicFromCache(topic);
510+
brokerService.removeTopicFromCache(NonPersistentTopic.this);
511511
unregisterTopicPolicyListener();
512512
closeFuture.complete(null);
513513
});

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,7 +1185,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
11851185
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
11861186
@Override
11871187
public void deleteLedgerComplete(Object ctx) {
1188-
brokerService.removeTopicFromCache(topic);
1188+
brokerService.removeTopicFromCache(PersistentTopic.this);
11891189

11901190
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
11911191

@@ -1305,7 +1305,7 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
13051305
}
13061306

13071307
private void disposeTopic(CompletableFuture<?> closeFuture) {
1308-
brokerService.removeTopicFromCache(topic)
1308+
brokerService.removeTopicFromCache(PersistentTopic.this)
13091309
.thenRun(() -> {
13101310
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
13111311

0 commit comments

Comments
 (0)