Skip to content

Commit

Permalink
[fix][broker] Allow Access to System Topic Metadata for Reader Creati…
Browse files Browse the repository at this point in the history
…on Post-Namespace Deletion (apache#20304)

## Motivation
After initiating the snapshot segment function, deletion of topics necessitates the activation of readers. Furthermore, these readers should be opened and deleted as they are used, which implies that we should not pre-store readers. However, after initiating the deletion of namespaces currently, it is not allowed to obtain the metadata of partition topics or lookup, making it impossible to create readers. This results in the inability to delete namespaces.

## Modification
Allow the acquisition of system topic metadata after initiating namespace deletion, thus creating readers to clean up topic data.
  • Loading branch information
liangyepianzhou committed May 12, 2023
1 parent 94c7bf3 commit 5d5ec94
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static org.apache.pulsar.common.naming.SystemTopicNames.isSystemTopic;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionCoordinatorAssign;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
Expand Down Expand Up @@ -4408,8 +4409,13 @@ public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
// It is necessary for system topic operations because system topics are used to store metadata
// and other vital information. Even after namespace starting deletion,,
// we need to access the metadata of system topics to create readers and clean up topic data.
// If we don't do this, it can prevent namespace deletion due to inaccessible readers.
authorizationFuture.thenCompose(__ ->
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()))
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(),
SystemTopicNames.isSystemTopic(topicName)))
.thenCompose(res ->
pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
Expand All @@ -4436,7 +4442,11 @@ public static CompletableFuture<PartitionedTopicMetadata> unsafeGetPartitionedTo
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
// It is necessary for system topic operations because system topics are used to store metadata
// and other vital information. Even after namespace starting deletion,,
// we need to access the metadata of system topics to create readers and clean up topic data.
// If we don't do this, it can prevent namespace deletion due to inaccessible readers.
checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject(), isSystemTopic(topicName))
.thenCompose(res -> pulsar.getBrokerService()
.fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
.thenAccept(metadata -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
Expand Down Expand Up @@ -221,26 +222,31 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
// (2) authorize client
checkAuthorizationAsync(pulsarService, topicName, clientAppId, authenticationData).thenRun(() -> {
// (3) validate global namespace
// It is necessary for system topic operations because system topics are used to store metadata
// and other vital information. Even after namespace starting deletion,
// we need to access the metadata of system topics to create readers and clean up topic data.
// If we don't do this, it can prevent namespace deletion due to inaccessible readers.
checkLocalOrGetPeerReplicationCluster(pulsarService,
topicName.getNamespaceObject()).thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
validationFuture.complete(null);
return;
}
// if peer-cluster-data is present it means namespace is owned by that peer-cluster and
// request should be redirect to the peer-cluster
if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
&& StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
"Redirected cluster's brokerService url is not configured",
requestId));
return;
}
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
peerClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect,
requestId,
false));
topicName.getNamespaceObject(), SystemTopicNames.isSystemTopic(topicName))
.thenAccept(peerClusterData -> {
if (peerClusterData == null) {
// (4) all validation passed: initiate lookup
validationFuture.complete(null);
return;
}
// if peer-cluster-data is present it means namespace is owned by that peer-cluster
// and request should be redirect to the peer-cluster
if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
&& StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
validationFuture.complete(newLookupErrorResponse(ServerError.MetadataError,
"Redirected cluster's brokerService url is not configured",
requestId));
return;
}
validationFuture.complete(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
peerClusterData.getBrokerServiceUrlTls(), true,
LookupType.Redirect, requestId,
false));
}).exceptionally(ex -> {
Throwable throwable = FutureUtil.unwrapCompletionException(ex);
if (throwable instanceof RestException restException){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,27 @@ public void testCreateTransactionSystemTopic() throws Exception {
}
}

@Test
public void testCanDeleteNamespaceWhenEnableTxnSegmentedSnapshot() throws Exception {
// Enable the segmented snapshot feature
pulsarServiceList.get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
pulsarServiceList.get(0).getConfig().setForceDeleteNamespaceAllowed(true);

// Create a new namespace
String namespaceName = TENANT + "/testSegmentedSnapshotWithoutCreatingOldSnapshotTopic";
admin.namespaces().createNamespace(namespaceName);

// Create a new topic in the namespace
String topicName = "persistent://" + namespaceName + "/newTopic";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
producer.close();

// Destroy the namespace after the test
admin.namespaces().deleteNamespace(namespaceName, true);
pulsarServiceList.get(0).getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
}

@Test
public void brokerNotInitTxnManagedLedgerTopic() throws Exception {
String subName = "test";
Expand Down

0 comments on commit 5d5ec94

Please sign in to comment.