Skip to content

Commit

Permalink
Use BookieServer#getLocalAddress as the identifier in the test
Browse files Browse the repository at this point in the history
*Problem*

Issue apache#1097 introduced using `loopback` nic as the bookie server identifier. However a few test cases construct
the bookie socket address using `InetAddress.getLocalHost()` and the bookie port. This bookie socket address can
be different from the actual socket address that a bookie is advertising to zookeeper. It will cause test cases
fail with some network settings.

*Solution*

This change use `BookieServer#getLocalAddress()` to retrieve the actual bookie socket address. so it would respect
to the server configuration and use the right socket address in the test.
  • Loading branch information
sijie committed Feb 19, 2018
1 parent df71247 commit 5f4ce9d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 72 deletions.
Expand Up @@ -28,7 +28,6 @@
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
Expand Down Expand Up @@ -307,20 +306,18 @@ public void testAsyncBookieRecoveryToSpecificBookie() throws Exception {

// Shutdown the first bookie server
LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
int initialPort = bsConfs.get(0).getBookiePort();
BookieSocketAddress bookieSrc = bs.get(0).getLocalAddress();
bs.get(0).shutdown();
bs.remove(0);

// Startup a new bookie server
int newBookiePort = startNewBookie();
startNewBookie();

// Write some more entries for the ledgers so a new ensemble will be
// created for them.
writeEntriestoLedgers(numMsgs, 10, lhs);

// Call the async recover bookie method.
BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
initialPort);
// Initiate the sync object
sync.value = false;
bkAdmin.asyncRecoverBookieData(bookieSrc, bookieRecoverCb, sync);
Expand Down Expand Up @@ -359,7 +356,7 @@ public void testAsyncBookieRecoveryToRandomBookies() throws Exception {

// Shutdown the first bookie server
LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
int initialPort = bsConfs.get(0).getBookiePort();
BookieSocketAddress bookieSrc = bs.get(0).getLocalAddress();
bs.get(0).shutdown();
bs.remove(0);

Expand All @@ -373,8 +370,6 @@ public void testAsyncBookieRecoveryToRandomBookies() throws Exception {
writeEntriestoLedgers(numMsgs, 10, lhs);

// Call the async recover bookie method.
BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
initialPort);
LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+ ") and replicate it to a random available one");
// Initiate the sync object
Expand Down Expand Up @@ -414,7 +409,7 @@ public void testSyncBookieRecoveryToSpecificBookie() throws Exception {

// Shutdown the first bookie server
LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
int initialPort = bsConfs.get(0).getBookiePort();
BookieSocketAddress bookieSrc = bs.get(0).getLocalAddress();
bs.get(0).shutdown();
bs.remove(0);

Expand All @@ -426,12 +421,7 @@ public void testSyncBookieRecoveryToSpecificBookie() throws Exception {
writeEntriestoLedgers(numMsgs, 10, lhs);

// Call the sync recover bookie method.
BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
initialPort);
BookieSocketAddress bookieDest = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
newBookiePort);
LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to the new one ("
+ bookieDest + ")");
LOG.info("Now recover the data on the killed bookie (" + bookieSrc + ") and replicate it to other bookies");
bkAdmin.recoverBookieData(bookieSrc);

// Verify the recovered ledger entries are okay.
Expand Down Expand Up @@ -460,7 +450,7 @@ public void testSyncBookieRecoveryToRandomBookies() throws Exception {

// Shutdown the first bookie server
LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
int initialPort = bsConfs.get(0).getBookiePort();
BookieSocketAddress bookieSrc = bs.get(0).getLocalAddress();
bs.get(0).shutdown();
bs.remove(0);

Expand All @@ -474,8 +464,6 @@ public void testSyncBookieRecoveryToRandomBookies() throws Exception {
writeEntriestoLedgers(numMsgs, 10, lhs);

// Call the sync recover bookie method.
BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
initialPort);
LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+ ") and replicate it to a random available one");
bkAdmin.recoverBookieData(bookieSrc);
Expand Down Expand Up @@ -757,13 +745,11 @@ public void testAsyncBookieRecoveryToRandomBookiesNotEnoughBookies() throws Exce

// Shutdown the first bookie server
LOG.info("Finished writing all ledger entries so shutdown one of the bookies.");
int initialPort = bsConfs.get(0).getBookiePort();
BookieSocketAddress bookieSrc = bs.get(0).getLocalAddress();
bs.get(0).shutdown();
bs.remove(0);

// Call the async recover bookie method.
BookieSocketAddress bookieSrc = new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(),
initialPort);
LOG.info("Now recover the data on the killed bookie (" + bookieSrc
+ ") and replicate it to a random available one");
// Initiate the sync object
Expand Down
Expand Up @@ -25,7 +25,6 @@

import com.google.common.collect.Sets;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Enumeration;
Expand Down Expand Up @@ -95,14 +94,13 @@ public void testReplicateLFShouldCopyFailedBookieFragmentsToTargetBookie()
LOG.info("Killing Bookie", replicaToKill);
killBookie(replicaToKill);

int startNewBookie = startNewBookie();
BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
LOG.info("New Bookie addr : {}", newBkAddr);

for (int i = 0; i < 10; i++) {
lh.addEntry(data);
}

BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
Set<LedgerFragment> result = getFragmentsToReplicate(lh);

BookKeeperAdmin admin = new BookKeeperAdmin(baseClientConf);
Expand Down Expand Up @@ -161,13 +159,12 @@ public void testReplicateLFFailsOnlyOnLastUnClosedFragments()
BookieSocketAddress replicaToKill2 = lh.getLedgerMetadata()
.getEnsembles().get(0L).get(1);

int startNewBookie2 = startNewBookie();
BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
LOG.info("New Bookie addr : {}", newBkAddr);

LOG.info("Killing Bookie", replicaToKill2);
killBookie(replicaToKill2);

BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie2);
LOG.info("New Bookie addr :" + newBkAddr);
Set<LedgerFragment> result = getFragmentsToReplicate(lh);

BookKeeperAdmin admin = new BookKeeperAdmin(baseClientConf);
Expand Down
Expand Up @@ -23,7 +23,6 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Map.Entry;
Expand Down Expand Up @@ -71,9 +70,7 @@ public void testAutoRecoveryAlongWithBookieServers() throws Exception {

killBookie(replicaToKill);

int startNewBookie = startNewBookie();
BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();

while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh.getId(),
basePath)) {
Expand Down
Expand Up @@ -23,7 +23,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Map.Entry;
Expand Down Expand Up @@ -125,15 +124,13 @@ public void testRWShouldReplicateFragmentsToTargetBookie() throws Exception {
LOG.info("Killing Bookie", replicaToKill);
killBookie(replicaToKill);

int startNewBookie = startNewBookie();
BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
LOG.info("New Bookie addr : {}", newBkAddr);

for (int i = 0; i < 10; i++) {
lh.addEntry(data);
}

BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);

ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);

rw.start();
Expand Down Expand Up @@ -175,9 +172,7 @@ public void testRWShouldRetryUntilThereAreEnoughBksAvailableForReplication()
LOG.info("Killing Bookie", replicaToKill);
ServerConfiguration killedBookieConfig = killBookie(replicaToKill);

int startNewBookie = startNewBookie();
BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
LOG.info("New Bookie addr :" + newBkAddr);

killAllBookies(lh, newBkAddr);
Expand Down Expand Up @@ -229,17 +224,13 @@ public void test2RWsShouldCompeteForReplicationOf2FragmentsAndCompleteReplicatio

killAllBookies(lh, null);
// Starte RW1
int startNewBookie1 = startNewBookie();
BookieSocketAddress newBkAddr1 = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie1);
LOG.info("New Bookie addr :" + newBkAddr1);
BookieSocketAddress newBkAddr1 = startNewBookieAndReturnAddress();
LOG.info("New Bookie addr : {}", newBkAddr1);
ReplicationWorker rw1 = new ReplicationWorker(zkc, baseConf);

// Starte RW2
int startNewBookie2 = startNewBookie();
BookieSocketAddress newBkAddr2 = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie2);
LOG.info("New Bookie addr :" + newBkAddr2);
BookieSocketAddress newBkAddr2 = startNewBookieAndReturnAddress();
LOG.info("New Bookie addr : {}", newBkAddr2);
ZooKeeper zkc1 = ZooKeeperClient.newBuilder()
.connectString(zkUtil.getZooKeeperConnectString())
.sessionTimeoutMs(10000)
Expand Down Expand Up @@ -293,10 +284,8 @@ public void testRWShouldCleanTheLedgerFromUnderReplicationIfLedgerAlreadyDeleted
LOG.info("Killing Bookie", replicaToKill);
killBookie(replicaToKill);

int startNewBookie = startNewBookie();
BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
LOG.info("New Bookie addr : {}", newBkAddr);
ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
rw.start();

Expand Down Expand Up @@ -349,11 +338,8 @@ public void testMultipleLedgerReplicationWithReplicationWorker()
killBookie(replicaToKillFromFirstLedger);
lh2.close();

int startNewBookie = startNewBookie();

BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
LOG.info("New Bookie addr : {}", newBkAddr);

ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);

Expand Down Expand Up @@ -406,11 +392,8 @@ public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsUR()
LOG.info("Killing Bookie", replicaToKill);
killBookie(replicaToKill);

int startNewBookie = startNewBookie();

BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);
BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
LOG.info("New Bookie addr : {}", newBkAddr);

// set to 3s instead of default 30s
baseConf.setOpenLedgerRereplicationGracePeriod("3000");
Expand Down Expand Up @@ -468,18 +451,15 @@ public void testRWShouldReplicateTheLedgersAfterTimeoutIfLastFragmentIsNotUR()
LOG.info("Killing Bookie", replicaToKill);
killBookie(replicaToKill);

int startNewBookie = startNewBookie();
BookieSocketAddress newBkAddr = startNewBookieAndReturnAddress();
LOG.info("New Bookie addr : {}", newBkAddr);

// Reform ensemble...Making sure that last fragment is not in
// under-replication
for (int i = 0; i < 10; i++) {
lh.addEntry(data);
}

BookieSocketAddress newBkAddr = new BookieSocketAddress(InetAddress
.getLocalHost().getHostAddress(), startNewBookie);
LOG.info("New Bookie addr :" + newBkAddr);

ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);

baseClientConf.setZkServers(zkUtil.getZooKeeperConnectString());
Expand Down
Expand Up @@ -584,12 +584,18 @@ public void restartBookies(ServerConfiguration newConf)
*/
public int startNewBookie()
throws Exception {
return startNewBookieAndReturnAddress().getPort();
}

public BookieSocketAddress startNewBookieAndReturnAddress()
throws Exception {
ServerConfiguration conf = newServerConfiguration();
bsConfs.add(conf);
LOG.info("Starting new bookie on port: {}", conf.getBookiePort());
bs.add(startBookie(conf));
BookieServer server = startBookie(conf);
bs.add(server);

return conf.getBookiePort();
return server.getLocalAddress();
}

/**
Expand Down

0 comments on commit 5f4ce9d

Please sign in to comment.