Skip to content

Commit

Permalink
[Broker] Fix LeaderElectionService.getCurrentLeader and add support f…
Browse files Browse the repository at this point in the history
…or empheralOwner in MockZooKeeper (apache#13066)

* Add leader election unit test that uses multiple brokers

* Support empheralOwner in MockZooKeeper

* Use unique sessions for all brokers

* Add failing test case for leader broker information not available on other brokers

* Add test for reading the current leader

* Fix issue in leader election

* Address review feedback: make methods static

* Use unique-session only in MultiBrokerBaseTests

* Move tenant and namespace creation to it's own method

* Improve cleanup

* Add alwaysRun to BeforeClass

* Fix leaks in locking in MockZooKeeper

* Reduce code duplication

* Fix NPE when CreateMode is null

(cherry picked from commit 36a45ee)
  • Loading branch information
lhotari committed Dec 2, 2021
1 parent a8383d9 commit 72690be
Show file tree
Hide file tree
Showing 6 changed files with 789 additions and 383 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.MockZooKeeperSession;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

public abstract class MultiBrokerBaseTest extends MockedPulsarServiceBaseTest {
protected List<PulsarService> additionalBrokers;
protected List<PulsarAdmin> additionalBrokerAdmins;
protected List<PulsarClient> additionalBrokerClients;

protected int numberOfAdditionalBrokers() {
return 2;
}

@BeforeClass(alwaysRun = true)
@Override
public final void setup() throws Exception {
super.internalSetup();
additionalBrokersSetup();
pulsarResourcesSetup();
}

protected void pulsarResourcesSetup() throws PulsarAdminException {
admin.tenants().createTenant("public", createDefaultTenantInfo());
admin.namespaces()
.createNamespace("public/default", getPulsar().getConfiguration().getDefaultNumberOfNamespaceBundles());
}

protected void additionalBrokersSetup() throws Exception {
int numberOfAdditionalBrokers = numberOfAdditionalBrokers();
additionalBrokers = new ArrayList<>(numberOfAdditionalBrokers);
additionalBrokerAdmins = new ArrayList<>(numberOfAdditionalBrokers);
additionalBrokerClients = new ArrayList<>(numberOfAdditionalBrokers);
for (int i = 0; i < numberOfAdditionalBrokers; i++) {
PulsarService pulsarService = createAdditionalBroker(i);
additionalBrokers.add(i, pulsarService);
PulsarAdminBuilder pulsarAdminBuilder =
PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress() != null
? pulsarService.getWebServiceAddress()
: pulsarService.getWebServiceAddressTls());
customizeNewPulsarAdminBuilder(pulsarAdminBuilder);
additionalBrokerAdmins.add(i, pulsarAdminBuilder.build());
additionalBrokerClients.add(i, newPulsarClient(pulsarService.getBrokerServiceUrl(), 0));
}
}

protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) {
return getDefaultConf();
}

protected PulsarService createAdditionalBroker(int additionalBrokerIndex) throws Exception {
return startBroker(createConfForAdditionalBroker(additionalBrokerIndex));
}

@Override
protected ZKMetadataStore createLocalMetadataStore() {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeper));
}

@Override
protected ZKMetadataStore createConfigurationMetadataStore() {
// use MockZooKeeperSession to provide a unique session id for each instance
return new ZKMetadataStore(MockZooKeeperSession.newInstance(mockZooKeeperGlobal));
}

@AfterClass(alwaysRun = true)
@Override
public final void cleanup() throws Exception {
additionalBrokersCleanup();
super.internalCleanup();
}

protected void additionalBrokersCleanup() {
if (additionalBrokerAdmins != null) {
for (PulsarAdmin additionalBrokerAdmin : additionalBrokerAdmins) {
additionalBrokerAdmin.close();
}
additionalBrokerAdmins = null;
}
if (additionalBrokerClients != null) {
for (PulsarClient additionalBrokerClient : additionalBrokerClients) {
try {
additionalBrokerClient.shutdown();
} catch (PulsarClientException e) {
// ignore
}
}
additionalBrokerClients = null;
}
if (additionalBrokers != null) {
for (PulsarService pulsarService : additionalBrokers) {
try {
pulsarService.getConfiguration().setBrokerShutdownTimeoutMs(0L);
pulsarService.close();
} catch (PulsarServerException e) {
// ignore
}
}
additionalBrokers = null;
}
}

public final List<PulsarService> getAllBrokers() {
List<PulsarService> brokers = new ArrayList<>(numberOfAdditionalBrokers() + 1);
brokers.add(getPulsar());
brokers.addAll(additionalBrokers);
return Collections.unmodifiableList(brokers);
}

public final List<PulsarAdmin> getAllAdmins() {
List<PulsarAdmin> admins = new ArrayList<>(numberOfAdditionalBrokers() + 1);
admins.add(admin);
admins.addAll(additionalBrokerAdmins);
return Collections.unmodifiableList(admins);
}

public final List<PulsarClient> getAllClients() {
List<PulsarClient> clients = new ArrayList<>(numberOfAdditionalBrokers() + 1);
clients.add(pulsarClient);
clients.addAll(additionalBrokerClients);
return Collections.unmodifiableList(clients);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
// Override default providers with mocked ones
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
doReturn(mockBookKeeperClientFactory).when(pulsar).newBookKeeperClientFactory();
doReturn(new ZKMetadataStore(mockZooKeeper)).when(pulsar).createLocalMetadataStore();
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(pulsar).createConfigurationMetadataStore();
doReturn(createLocalMetadataStore()).when(pulsar).createLocalMetadataStore();
doReturn(createConfigurationMetadataStore()).when(pulsar).createConfigurationMetadataStore();

Supplier<NamespaceService> namespaceServiceSupplier = () -> spy(new NamespaceService(pulsar));
doReturn(namespaceServiceSupplier).when(pulsar).getNamespaceServiceProvider();
Expand All @@ -321,6 +321,14 @@ protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
}
}

protected ZKMetadataStore createLocalMetadataStore() {
return new ZKMetadataStore(mockZooKeeper);
}

protected ZKMetadataStore createConfigurationMetadataStore() {
return new ZKMetadataStore(mockZooKeeperGlobal);
}

private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
ServiceConfiguration configuration = spy(conf);
Set<String> mockBrokerInterceptors = mock(Set.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.PulsarService;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class MultiBrokerLeaderElectionTest extends MultiBrokerBaseTest {

@Test
public void shouldElectOneLeader() {
int leaders = 0;
for (PulsarService broker : getAllBrokers()) {
if (broker.getLeaderElectionService().isLeader()) {
leaders++;
}
}
assertEquals(leaders, 1);
}

@Test
public void shouldAllBrokersKnowTheLeader() {
Awaitility.await().untilAsserted(() -> {
for (PulsarService broker : getAllBrokers()) {
Optional<LeaderBroker> currentLeader = broker.getLeaderElectionService().getCurrentLeader();
assertTrue(currentLeader.isPresent(), "Leader wasn't known on broker " + broker.getBrokerServiceUrl());
}
});
}

@Test
public void shouldAllBrokersBeAbleToGetTheLeader() {
Awaitility.await().untilAsserted(() -> {
LeaderBroker leader = null;
for (PulsarService broker : getAllBrokers()) {
Optional<LeaderBroker> currentLeader =
broker.getLeaderElectionService().readCurrentLeader().get(1, TimeUnit.SECONDS);
assertTrue(currentLeader.isPresent(), "Leader wasn't known on broker " + broker.getBrokerServiceUrl());
if (leader != null) {
assertEquals(currentLeader.get(), leader,
"Different leader on broker " + broker.getBrokerServiceUrl());
} else {
leader = currentLeader.get();
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ private synchronized CompletableFuture<LeaderElectionState> elect() {
} else {
return tryToBecomeLeader();
}
});
}).thenCompose(leaderElectionState ->
// make sure that the cache contains the current leader
// so that getLeaderValueIfPresent works on all brokers
cache.get(path).thenApply(__ -> leaderElectionState));
}

private synchronized CompletableFuture<LeaderElectionState> handleExistingLeaderValue(GetResult res) {
Expand Down

0 comments on commit 72690be

Please sign in to comment.