diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 1371019be41dc4..e0e95482f390f3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -151,6 +151,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener> entryFilters; + private volatile TopicCacheCleanupFunction cleanupFunction; + public AbstractTopic(String topic, BrokerService brokerService) { this.topic = topic; this.brokerService = brokerService; @@ -1310,6 +1312,23 @@ public HierarchyTopicPolicies getHierarchyTopicPolicies() { return topicPolicies; } + @Override + public void registerTopicCacheCleanupFunction(TopicCacheCleanupFunction cleanupFunction) { + if (this.cleanupFunction != null) { + log.warn("Topic {} has already been cached. It should have been removed before re-adding.", topic); + } + this.cleanupFunction = cleanupFunction; + } + + @Override + public void cleanupTopicCache(CompletableFuture> topicFuture) { + TopicCacheCleanupFunction c = this.cleanupFunction; + this.cleanupFunction = null; + if (c != null) { + c.cleanup(topicFuture); + } + } + public void updateBrokerSubscriptionDispatchRate() { topicPolicies.getSubscriptionDispatchRate().updateBrokerValue( subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration())); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index c38514237856ba..9565fa24ae870c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -67,6 +67,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Predicate; import javax.ws.rs.core.Response; import lombok.AccessLevel; @@ -790,6 +791,9 @@ public CompletableFuture closeAsync() { // unloads all namespaces gracefully without disrupting mutually unloadNamespaceBundlesGracefully(); + // fail all pending topic loading requests + failPendingTopics(); + // close replication clients replicationClients.forEach((cluster, client) -> { try { @@ -1254,12 +1258,11 @@ private CompletableFuture> createNonPersistentTopic(String topic long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic); - topicFuture.complete(Optional.of(nonPersistentTopic)); + completeTopicFuture(topicFuture, nonPersistentTopic); }).exceptionally(ex -> { log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause()); nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex); + failTopicFuture(topic, topicFuture, ex); }); return null; }); @@ -1271,7 +1274,7 @@ private CompletableFuture> createNonPersistentTopic(String topic // so we should add checkTopicNsOwnership logic otherwise the topic will be created // if it dont own by this broker,we should return success // otherwise it will keep retrying getPartitionedTopicMetadata - topicFuture.complete(Optional.of(nonPersistentTopic)); + completeTopicFuture(topicFuture, nonPersistentTopic); // after get metadata return success, we should delete this topic from this broker, because this topic not // owner by this broker and it don't initialize and checkReplication pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); @@ -1509,7 +1512,7 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c protected CompletableFuture> loadOrCreatePersistentTopic(final String topic, boolean createIfMissing, Map properties) throws RuntimeException { final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( - Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), + Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), pulsar().getExecutor(), () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); if (!pulsar.getConfiguration().isEnablePersistentTopics()) { if (log.isDebugEnabled()) { @@ -1522,20 +1525,27 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S checkTopicNsOwnership(topic) .thenRun(() -> { - final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); - - if (topicLoadSemaphore.tryAcquire()) { - checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture, properties); - topicFuture.handle((persistentTopic, ex) -> { - // release permit and process pending topic - topicLoadSemaphore.release(); - createPendingLoadTopic(); - return null; - }); - } else { - pendingTopicLoadingQueue.add(new TopicLoadingContext(topic, topicFuture, properties)); - if (log.isDebugEnabled()) { - log.debug("topic-loading for {} added into pending queue", topic); + if (!topicFuture.isDone()) { + if (pulsar().isRunning()) { + final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); + + if (topicLoadSemaphore.tryAcquire()) { + checkOwnershipAndCreatePersistentTopic(topic, createIfMissing, topicFuture, properties); + topicFuture.handle((persistentTopic, ex) -> { + // release permit and process pending topic + topicLoadSemaphore.release(); + createPendingLoadTopic(); + return null; + }); + } else { + pendingTopicLoadingQueue.add(new TopicLoadingContext(topic, topicFuture, properties)); + if (log.isDebugEnabled()) { + log.debug("topic-loading for {} added into pending queue", topic); + } + } + } else { + topicFuture.completeExceptionally(new NotAllowedException( + "Broker is shutting down")); } } }).exceptionally(ex -> { @@ -1579,31 +1589,31 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) .thenAccept(isActive -> { if (isActive) { - CompletableFuture> propertiesFuture; - if (properties == null) { - //Read properties from storage when loading topic. - propertiesFuture = fetchTopicPropertiesAsync(topicName); - } else { - propertiesFuture = CompletableFuture.completedFuture(properties); + if (!topicFuture.isDone()) { + CompletableFuture> propertiesFuture; + if (properties == null) { + //Read properties from storage when loading topic. + propertiesFuture = fetchTopicPropertiesAsync(topicName); + } else { + propertiesFuture = CompletableFuture.completedFuture(properties); + } + propertiesFuture.thenAccept(finalProperties -> + //TODO add topicName in properties? + createPersistentTopic(topic, createIfMissing, topicFuture, finalProperties) + ).exceptionally(throwable -> { + log.warn("[{}] Read topic property failed", topic, throwable); + failTopicFuture(topic, topicFuture, throwable); + return null; + }); } - propertiesFuture.thenAccept(finalProperties -> - //TODO add topicName in properties? - createPersistentTopic(topic, createIfMissing, topicFuture, finalProperties) - ).exceptionally(throwable -> { - log.warn("[{}] Read topic property failed", topic, throwable); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(throwable); - return null; - }); } else { // namespace is being unloaded String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + failTopicFuture(topic, topicFuture, new ServiceUnitNotReadyException(msg)); } }).exceptionally(ex -> { - topicFuture.completeExceptionally(ex); + failTopicFuture(topic, topicFuture, ex); return null; }); } @@ -1619,6 +1629,10 @@ public void createPersistentTopic0(final String topic, boolean createIfMissing, private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture, Map properties) { + if (topicFuture.isDone()) { + return; + } + TopicName topicName = TopicName.get(topic); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); @@ -1630,8 +1644,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new NotAllowedException(msg)); + failTopicFuture(topic, topicFuture, new NotAllowedException(msg)); return; } @@ -1718,7 +1731,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { }, executor()); } else { addTopicToStatsMaps(topicName, persistentTopic); - topicFuture.complete(Optional.of(persistentTopic)); + completeTopicFuture(topicFuture, persistentTopic); } }) .exceptionally((ex) -> { @@ -1731,15 +1744,13 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { return null; }); persistentTopic.stopReplProducers().whenCompleteAsync((v, exception) -> { - topics.remove(topic, topicFuture); - topicFuture.completeExceptionally(ex); + failTopicFuture(topic, topicFuture, ex); }, executor()); return null; }); } catch (PulsarServerException e) { log.warn("Failed to create topic {}: {}", topic, e.getMessage()); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(e); + failTopicFuture(topic, topicFuture, e); } } @@ -1751,8 +1762,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { topicFuture.complete(Optional.empty()); } else { log.warn("Failed to create topic {}", topic, exception); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new PersistenceException(exception)); + failTopicFuture(topic, topicFuture, new PersistenceException(exception)); } } }, () -> isTopicNsOwnedByBroker(topicName), null); @@ -1761,12 +1771,37 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); // remove topic from topics-map in different thread to avoid possible deadlock if // createPersistentTopic-thread only tries to handle this future-result - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(exception); + failTopicFuture(topic, topicFuture, exception); return null; }); } + private void failTopicFuture(String topic, CompletableFuture> topicFuture, Throwable exception) { + if (topicFuture.completeExceptionally(exception)) { + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + } else { + if (topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) { + topicFuture.join().ifPresent(topicObject -> { + log.warn( + "Topic future for topic {} was already completed successfully. Another exception happened. " + + "Closing the topic to ensure consistency.", + topic, exception); + topicObject.close(); + }); + } + } + } + + private static void completeTopicFuture(CompletableFuture> topicFuture, Topic topic) { + if (!topicFuture.complete(Optional.of(topic))) { + log.warn( + "Failed to complete future for topic {}, it was already completed (failed={}). " + + "Closing the topic to ensure consistency.", + topic.getName(), topicFuture.isCompletedExceptionally()); + topic.close(); + } + } + public CompletableFuture getManagedLedgerConfig(TopicName topicName) { NamespaceName namespace = topicName.getNamespaceObject(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); @@ -1933,7 +1968,8 @@ private void addTopicToStatsMaps(TopicName topicName, Topic topic) { .thenAccept(namespaceBundle -> { if (namespaceBundle != null) { synchronized (multiLayerTopicsMap) { - String serviceUnit = namespaceBundle.toString(); + final String serviceUnit = namespaceBundle.toString(); + final String topicNameString = topicName.toString(); multiLayerTopicsMap // .computeIfAbsent(topicName.getNamespace(), k -> ConcurrentOpenHashMap. ConcurrentOpenHashMap.newBuilder().build()) // - .put(topicName.toString(), topic); + .put(topicNameString, topic); + topic.registerTopicCacheCleanupFunction(topicFuture -> + removeTopicFromCache(topicNameString, serviceUnit, topicFuture)); } } invalidateOfflineTopicStatCache(topicName); @@ -1961,6 +1999,7 @@ public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) { if (!isEmpty(topics)) { // add topic under new split bundles which already updated into NamespaceBundleFactory.bundleCache topics.stream().forEach(t -> { + t.cleanupTopicCache(null); addTopicToStatsMaps(TopicName.get(t.getName()), t); }); // remove old bundle from the map @@ -2193,9 +2232,24 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit if (serviceUnit.includes(topicName)) { // Topic needs to be unloaded log.info("[{}] Unloading topic", topicName); - closeFutures.add(topicFuture - .thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect) - : CompletableFuture.completedFuture(null))); + if (!topicFuture.isDone()) { + // Complete the topic future with exception to abort the loading of the topic + topicFuture.completeExceptionally(new ServiceUnitNotReadyException( + "Namespace is being unloaded, cannot add topic " + name)); + } + closeFutures.add(topicFuture.>handle((topic, throwable) -> { + if (throwable != null) { + // Ignore the exception since that usually causes the exception to be logged multiple times. + // Complete the future successfully so that "waitForAll" will wait for all topics + // to be closed before unloading completes. + return CompletableFuture.completedFuture(null); + } + if (topic.isPresent()) { + return topic.get().close(closeWithoutWaitingClientDisconnect); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(Function.identity())); } }); if (getPulsar().getConfig().isTransactionCoordinatorEnabled() @@ -2218,7 +2272,7 @@ public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) { TopicName topicName = TopicName.get(topic); if (serviceUnit.includes(topicName) && getTopicReference(topic).isPresent()) { log.info("[{}][{}] Clean unloaded topic from cache.", serviceUnit.toString(), topic); - pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit, null); + pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit.toString(), null); } } } @@ -2228,11 +2282,8 @@ public AuthorizationService getAuthorizationService() { } public CompletableFuture removeTopicFromCache(Topic topic) { - Optional>> createTopicFuture = findTopicFutureInCache(topic); - if (createTopicFuture.isEmpty()){ - return CompletableFuture.completedFuture(null); - } - return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get()); + topic.cleanupTopicCache(findTopicFutureInCache(topic).orElse(null)); + return CompletableFuture.completedFuture(null); } private Optional>> findTopicFutureInCache(Topic topic){ @@ -2262,18 +2313,8 @@ private Optional>> findTopicFutureInCache(Topi } } - private CompletableFuture removeTopicFutureFromCache(String topic, - CompletableFuture> createTopicFuture) { - TopicName topicName = TopicName.get(topic); - return pulsar.getNamespaceService().getBundleAsync(topicName) - .thenAccept(namespaceBundle -> { - removeTopicFromCache(topic, namespaceBundle, createTopicFuture); - }); - } - - private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, - CompletableFuture> createTopicFuture) { - String bundleName = namespaceBundle.toString(); + private void removeTopicFromCache(String topic, + String bundleName, CompletableFuture> createTopicFuture) { String namespaceName = TopicName.get(topic).getNamespaceObject().toString(); topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.BEFORE); @@ -2302,16 +2343,16 @@ private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, } } - if (createTopicFuture == null) { - topics.remove(topic); - } else { + if (createTopicFuture != null) { topics.remove(topic, createTopicFuture); } + // Remove topic from compactor, there is a possibility for a race condition here Compactor compactor = pulsar.getNullableCompactor(); if (compactor != null) { compactor.getStats().removeTopic(topic); } + topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.SUCCESS); } @@ -3005,7 +3046,13 @@ private ConcurrentOpenHashMap getRuntimeConfigurationMap() { * permit if it was successful to acquire it. */ private void createPendingLoadTopic() { - TopicLoadingContext pendingTopic = pendingTopicLoadingQueue.poll(); + if (!pulsar().isRunning()) { + log.warn("Pulsar is not running, skip create pending topic"); + failPendingTopics(); + return; + } + + TopicLoadingContext pendingTopic = getNextPendingTopic(); if (pendingTopic == null) { return; } @@ -3026,14 +3073,35 @@ private void createPendingLoadTopic() { }); }).exceptionally(e -> { log.error("Failed to create pending topic {}", topic, e); - pendingTopic.getTopicFuture() - .completeExceptionally((e instanceof RuntimeException && e.getCause() != null) ? e.getCause() : e); + Throwable cause = (e instanceof RuntimeException && e.getCause() != null) ? e.getCause() : e; + failTopicFuture(topic, pendingTopic.getTopicFuture(), cause); // schedule to process next pending topic inactivityMonitor.schedule(this::createPendingLoadTopic, 100, TimeUnit.MILLISECONDS); return null; }); } + private void failPendingTopics() { + for (TopicLoadingContext topicLoadingContext = getNextPendingTopic(); topicLoadingContext != null; + topicLoadingContext = getNextPendingTopic()) { + topicLoadingContext.getTopicFuture() + .completeExceptionally(new NotAllowedException("Broker is shutting down")); + } + } + + private TopicLoadingContext getNextPendingTopic() { + while (true) { + TopicLoadingContext pendingTopic = pendingTopicLoadingQueue.poll(); + if (pendingTopic == null) { + return null; + } + // check that the topic future is not completed + if (!pendingTopic.getTopicFuture().isDone()) { + return pendingTopic; + } + } + } + public CompletableFuture fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync( TopicName topicName) { if (pulsar.getNamespaceService() == null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 7657d77e1299f3..ba79cc41c61a5a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -369,4 +369,18 @@ default boolean isSystemTopic() { */ HierarchyTopicPolicies getHierarchyTopicPolicies(); + + /** + * Closes the topic and releases resources. + * @return a future that is completed when the topic is closed + */ + CompletableFuture close(); + + void registerTopicCacheCleanupFunction(TopicCacheCleanupFunction cleanupFunction); + + void cleanupTopicCache(CompletableFuture> topicFuture); + + interface TopicCacheCleanupFunction { + void cleanup(CompletableFuture> topicFuture); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 9fe0a735c90d9b..49fc1e691eb067 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -1189,4 +1189,9 @@ protected boolean isMigrated() { public boolean isPersistent() { return false; } + + @Override + public CompletableFuture close() { + return deleteForcefully(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 2a0c229daf6c22..cccef15931b813 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1423,6 +1423,7 @@ public void deleteLedgerComplete(Object ctx) { } + @Override public CompletableFuture close() { return close(false); }