diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d30144360ec36..d87707abddfad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1087,7 +1087,8 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { return closeFuture; } - private CompletableFuture checkReplicationAndRetryOnFailure() { + @VisibleForTesting + CompletableFuture checkReplicationAndRetryOnFailure() { CompletableFuture result = new CompletableFuture(); checkReplication().thenAccept(res -> { log.info("[{}] Policies updated successfully", topic); @@ -2059,6 +2060,10 @@ public CompletableFuture onPoliciesUpdate(Policies data) { if (log.isDebugEnabled()) { log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required); } + if (data.deleted) { + log.debug("Ignore the update because it has been deleted : {}", data); + return CompletableFuture.completedFuture(null); + } isEncryptionRequired = data.encryption_required; setSchemaCompatibilityStrategy(data); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java new file mode 100644 index 0000000000000..f4d11b81f3012 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Sets; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.common.policies.data.Policies; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class PersistentTopicTest extends BrokerTestBase { + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testDeleteNamespaceInfiniteRetry() throws Exception { + //init namespace + final String myNamespace = "prop/ns" + UUID.randomUUID(); + admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test")); + final String topic = "persistent://" + myNamespace + "/testDeleteNamespaceInfiniteRetry"; + //init topic and policies + pulsarClient.newProducer().topic(topic).create().close(); + admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0); + Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() + -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0); + + PersistentTopic persistentTopic = + spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get()); + + Policies policies = new Policies(); + policies.deleted = true; + persistentTopic.onPoliciesUpdate(policies); + verify(persistentTopic, times(0)).checkReplicationAndRetryOnFailure(); + + policies.deleted = false; + persistentTopic.onPoliciesUpdate(policies); + verify(persistentTopic, times(1)).checkReplicationAndRetryOnFailure(); + } +} \ No newline at end of file