Skip to content

Commit

Permalink
[refactor][broker] PIP-301 Part-1: Add BundleDataResources (apache#21119
Browse files Browse the repository at this point in the history
)

### Motivation

See pip: apache#21129

### Modifications

Add  `BundleDataResources`
  • Loading branch information
AnonHxy authored and vinayakmalik12 committed Oct 12, 2023
1 parent fa5b4c8 commit 4ad992f
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 93 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resources;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.policies.data.loadbalancer.BundleData;

@Getter
public class LoadBalanceResources {
public static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";

private final BundleDataResources bundleDataResources;

public LoadBalanceResources(MetadataStore store, int operationTimeoutSec) {
bundleDataResources = new BundleDataResources(store, operationTimeoutSec);
}

public static class BundleDataResources extends BaseResources<BundleData> {
public BundleDataResources(MetadataStore store, int operationTimeoutSec) {
super(store, BundleData.class, operationTimeoutSec);
}

public CompletableFuture<Optional<BundleData>> getBundleData(String bundle) {
return getAsync(getBundleDataPath(bundle));
}

public CompletableFuture<Void> updateBundleData(String bundle, BundleData data) {
return setWithCreateAsync(getBundleDataPath(bundle), __ -> data);
}

public CompletableFuture<Void> deleteBundleData(String bundle) {
return deleteAsync(getBundleDataPath(bundle));
}

// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
return getStore().deleteRecursive(namespaceBundlePath);
}

// clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
return getStore().deleteRecursive(tenantBundlePath);
}

// Get the metadata store path for the given bundle full name.
private String getBundleDataPath(final String bundle) {
return BUNDLE_DATA_BASE_PATH + "/" + bundle;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,15 @@ public class NamespaceResources extends BaseResources<Policies> {
private final IsolationPolicyResources isolationPolicies;
private final PartitionedTopicResources partitionedTopicResources;
private final MetadataStore configurationStore;
private final MetadataStore localStore;

public static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
private static final String NAMESPACE_BASE_PATH = "/namespace";
private static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";

public NamespaceResources(MetadataStore localStore, MetadataStore configurationStore, int operationTimeoutSec) {
public NamespaceResources(MetadataStore configurationStore, int operationTimeoutSec) {
super(configurationStore, Policies.class, operationTimeoutSec);
this.configurationStore = configurationStore;
isolationPolicies = new IsolationPolicyResources(configurationStore, operationTimeoutSec);
partitionedTopicResources = new PartitionedTopicResources(configurationStore, operationTimeoutSec);
this.localStore = localStore;
}

public CompletableFuture<List<String>> listNamespacesAsync(String tenant) {
Expand Down Expand Up @@ -379,17 +376,4 @@ public CompletableFuture<Void> runWithMarkDeleteAsync(TopicName topic,
return future;
}
}

// clear resource of `/loadbalance/bundle-data/{tenant}/{namespace}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataAsync(NamespaceName ns) {
final String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
return this.localStore.deleteRecursive(namespaceBundlePath);
}

// clear resource of `/loadbalance/bundle-data/{tenant}/` in metadata-store
public CompletableFuture<Void> deleteBundleDataTenantAsync(String tenant) {
final String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
return this.localStore.deleteRecursive(tenantBundlePath);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class PulsarResources {
@Getter
private final TopicResources topicResources;
@Getter
private final LoadBalanceResources loadBalanceResources;
@Getter
private final Optional<MetadataStore> localMetadataStore;
@Getter
private final Optional<MetadataStore> configurationMetadataStore;
Expand All @@ -60,8 +62,7 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
if (configurationMetadataStore != null) {
tenantResources = new TenantResources(configurationMetadataStore, operationTimeoutSec);
clusterResources = new ClusterResources(configurationMetadataStore, operationTimeoutSec);
namespaceResources = new NamespaceResources(localMetadataStore, configurationMetadataStore
, operationTimeoutSec);
namespaceResources = new NamespaceResources(configurationMetadataStore, operationTimeoutSec);
resourcegroupResources = new ResourceGroupResources(configurationMetadataStore, operationTimeoutSec);
} else {
tenantResources = null;
Expand All @@ -76,12 +77,14 @@ public PulsarResources(MetadataStore localMetadataStore, MetadataStore configura
loadReportResources = new LoadManagerReportResources(localMetadataStore, operationTimeoutSec);
bookieResources = new BookieResources(localMetadataStore, operationTimeoutSec);
topicResources = new TopicResources(localMetadataStore);
loadBalanceResources = new LoadBalanceResources(localMetadataStore, operationTimeoutSec);
} else {
dynamicConfigResources = null;
localPolicies = null;
loadReportResources = null;
bookieResources = null;
topicResources = null;
loadBalanceResources = null;
}

this.localMetadataStore = Optional.ofNullable(localMetadataStore);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.resources;

import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertThrows;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class LoadBalanceResourcesTest {
private MetadataStore configurationStore;
private MetadataStore localStore;
private LoadBalanceResources loadBalanceResources;

@BeforeMethod
public void setup() {
localStore = mock(MetadataStore.class);
configurationStore = mock(MetadataStore.class);
loadBalanceResources = new LoadBalanceResources(localStore, 30);
}

/**
* Test that the bundle-data node is deleted from the local stores.
*/
@Test
public void testDeleteBundleDataAsync() {
NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
loadBalanceResources.getBundleDataResources().deleteBundleDataAsync(ns);

String tenant="my-tenant";
String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
loadBalanceResources.getBundleDataResources().deleteBundleDataTenantAsync(tenant);

verify(localStore).deleteRecursive(namespaceBundlePath);
verify(localStore).deleteRecursive(tenantBundlePath);

assertThrows(()-> verify(configurationStore).deleteRecursive(namespaceBundlePath));
assertThrows(()-> verify(configurationStore).deleteRecursive(tenantBundlePath));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,12 @@
*/
package org.apache.pulsar.broker.resources;

import static org.apache.pulsar.broker.resources.BaseResources.joinPath;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;

import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


public class NamespaceResourcesTest {

private MetadataStore localStore;
private MetadataStore configurationStore;
private NamespaceResources namespaceResources;

private static final String BUNDLE_DATA_BASE_PATH = "/loadbalance/bundle-data";

@BeforeMethod
public void setup() {
localStore = mock(MetadataStore.class);
configurationStore = mock(MetadataStore.class);
namespaceResources = new NamespaceResources(localStore, configurationStore, 30);
}

@Test
public void test_pathIsFromNamespace() {
assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
Expand All @@ -54,25 +32,5 @@ public void test_pathIsFromNamespace() {
assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
}

/**
* Test that the bundle-data node is deleted from the local stores.
*/
@Test
public void testDeleteBundleDataAsync() {
NamespaceName ns = NamespaceName.get("my-tenant/my-ns");
String namespaceBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, ns.toString());
namespaceResources.deleteBundleDataAsync(ns);

String tenant="my-tenant";
String tenantBundlePath = joinPath(BUNDLE_DATA_BASE_PATH, tenant);
namespaceResources.deleteBundleDataTenantAsync(tenant);

verify(localStore).deleteRecursive(namespaceBundlePath);
verify(localStore).deleteRecursive(tenantBundlePath);

assertThrows(()-> verify(configurationStore).deleteRecursive(namespaceBundlePath));
assertThrows(()-> verify(configurationStore).deleteRecursive(tenantBundlePath));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ protected CompletableFuture<Void> internalClearZkSources() {
// clear z-node of local policies
.thenCompose(ignore -> getLocalPolicies().deleteLocalPoliciesAsync(namespaceName))
// clear /loadbalance/bundle-data
.thenCompose(ignore -> namespaceResources().deleteBundleDataAsync(namespaceName));
.thenCompose(ignore ->
loadBalanceResources().getBundleDataResources().deleteBundleDataAsync(namespaceName));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ protected CompletableFuture<Void> internalDeleteTenantAsync(String tenant) {
.getPartitionedTopicResources().clearPartitionedTopicTenantAsync(tenant))
.thenCompose(__ -> pulsar().getPulsarResources().getLocalPolicies()
.deleteLocalPoliciesTenantAsync(tenant))
.thenCompose(__ -> pulsar().getPulsarResources().getNamespaceResources()
.thenCompose(__ -> pulsar().getPulsarResources().getLoadBalanceResources().getBundleDataResources()
.deleteBundleDataTenantAsync(tenant));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.broker.loadbalance.ModularLoadManager;
import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.util.ExecutorProvider;
Expand Down Expand Up @@ -91,9 +92,6 @@
public class ModularLoadManagerImpl implements ModularLoadManager {
private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class);

// Path to ZNode whose children contain BundleData jsons for each bundle (new API version of ResourceQuota).
public static final String BUNDLE_DATA_PATH = "/loadbalance/bundle-data";

// Default message rate to assume for unseen bundles.
public static final double DEFAULT_MESSAGE_RATE = 50;

Expand All @@ -120,7 +118,6 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
private LockManager<LocalBrokerData> brokersData;
private ResourceLock<LocalBrokerData> brokerDataLock;

private MetadataCache<BundleData> bundlesCache;
private MetadataCache<ResourceQuota> resourceQuotaCache;
private MetadataCache<TimeAverageBrokerData> timeAverageBrokerDataCache;

Expand Down Expand Up @@ -172,6 +169,8 @@ public class ModularLoadManagerImpl implements ModularLoadManager {
// Pulsar service used to initialize this.
private PulsarService pulsar;

private PulsarResources pulsarResources;

// Executor service used to update broker data.
private final ExecutorService executors;

Expand Down Expand Up @@ -243,8 +242,8 @@ public boolean isEnableNonPersistentTopics(String brokerUrl) {
@Override
public void initialize(final PulsarService pulsar) {
this.pulsar = pulsar;
this.pulsarResources = pulsar.getPulsarResources();
brokersData = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
bundlesCache = pulsar.getLocalMetadataStore().getMetadataCache(BundleData.class);
resourceQuotaCache = pulsar.getLocalMetadataStore().getMetadataCache(ResourceQuota.class);
timeAverageBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(TimeAverageBrokerData.class);
pulsar.getLocalMetadataStore().registerListener(this::handleDataNotification);
Expand Down Expand Up @@ -273,7 +272,7 @@ public void initialize(final PulsarService pulsar) {

LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap);
// register listeners for domain changes
pulsar.getPulsarResources().getClusterResources().getFailureDomainResources()
pulsarResources.getClusterResources().getFailureDomainResources()
.registerListener(__ -> {
executors.execute(
() -> LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap));
Expand Down Expand Up @@ -381,7 +380,8 @@ public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
public BundleData getBundleDataOrDefault(final String bundle) {
BundleData bundleData = null;
try {
Optional<BundleData> optBundleData = bundlesCache.get(getBundleDataPath(bundle)).join();
Optional<BundleData> optBundleData =
pulsarResources.getLoadBalanceResources().getBundleDataResources().getBundleData(bundle).join();
if (optBundleData.isPresent()) {
return optBundleData.get();
}
Expand Down Expand Up @@ -418,11 +418,6 @@ public BundleData getBundleDataOrDefault(final String bundle) {
return bundleData;
}

// Get the metadata store path for the given bundle full name.
public static String getBundleDataPath(final String bundle) {
return BUNDLE_DATA_PATH + "/" + bundle;
}

// Use the Pulsar client to acquire the namespace bundle stats.
private Map<String, NamespaceBundleStats> getBundleStats() {
return pulsar.getBrokerService().getBundleStats();
Expand Down Expand Up @@ -1151,8 +1146,8 @@ public void writeBundleDataOnZooKeeper() {
for (Map.Entry<String, BundleData> entry : loadData.getBundleData().entrySet()) {
final String bundle = entry.getKey();
final BundleData data = entry.getValue();
futures.add(bundlesCache.readModifyUpdateOrCreate(getBundleDataPath(bundle), __ -> data)
.thenApply(__ -> null));
futures.add(
pulsarResources.getLoadBalanceResources().getBundleDataResources().updateBundleData(bundle, data));
}

// Write the time average broker data to metadata store.
Expand All @@ -1173,7 +1168,7 @@ public void writeBundleDataOnZooKeeper() {

private void deleteBundleDataFromMetadataStore(String bundle) {
try {
bundlesCache.delete(getBundleDataPath(bundle)).join();
pulsarResources.getLoadBalanceResources().getBundleDataResources().deleteBundleData(bundle).join();
} catch (Exception e) {
if (!(e.getCause() instanceof NotFoundException)) {
log.warn("Failed to delete bundle-data {} from metadata store", bundle, e);
Expand Down

0 comments on commit 4ad992f

Please sign in to comment.