Skip to content

Commit

Permalink
HIVE-28001: Fix the flaky test TestLeaderElection (apache#5011) (Zhih…
Browse files Browse the repository at this point in the history
…ua Deng, reviewed by Sai Hemanth Gantasala)
  • Loading branch information
dengzhhu653 authored and tarak271 committed Feb 9, 2024
1 parent 64d79c1 commit 0e03bf1
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 47 deletions.
Expand Up @@ -580,7 +580,7 @@ public enum ConfVars {
"metastore.housekeeping.leader.election",
"host", new StringSetValidator("host", "lock"),
"Set to host, HMS will choose the leader by the configured metastore.housekeeping.leader.hostname.\n" +
"Set to lock, HMS will use the hive lock to elect the leader."),
"Set to lock, HMS will use the Hive lock to elect the leader."),
METASTORE_HOUSEKEEPING_LEADER_AUDITTABLE("metastore.housekeeping.leader.auditTable",
"metastore.housekeeping.leader.auditTable", "",
"Audit the leader election event to a plain json table when configured."),
Expand All @@ -593,6 +593,9 @@ public enum ConfVars {
"metastore.housekeeping.leader.auditFiles.limit", 10,
"Limit the number of small audit files when metastore.housekeeping.leader.newAuditFile is true.\n" +
"If the number of audit files exceeds the limit, then the oldest will be deleted."),
METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE("metastore.housekeeping.leader.lock.namespace",
"metastore.housekeeping.leader.lock.namespace", "",
"The database where the Hive lock sits when metastore.housekeeping.leader.election is set to lock."),
METASTORE_HOUSEKEEPING_THREADS_ON("metastore.housekeeping.threads.on",
"hive.metastore.housekeeping.threads.on", false,
"Whether to run the tasks under metastore.task.threads.remote on this metastore instance or not.\n" +
Expand Down
Expand Up @@ -42,12 +42,12 @@ public class LeaderElectionContext {
* For those tasks which belong to the same type, they will be running in the same leader.
*/
public enum TTYPE {
HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
"metastore_housekeeping_leader"), "housekeeping"),
WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
"metastore_worker_leader"), "compactor_worker"),
ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "sys",
"metastore_always_tasks_leader"), "always_tasks");
HOUSEKEEPING(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__",
"metastore_housekeeping"), "housekeeping"),
WORKER(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__",
"metastore_compactor_worker"), "compactor_worker"),
ALWAYS_TASKS(new TableName(Warehouse.DEFAULT_CATALOG_NAME, "__METASTORE_LEADER_ELECTION__",
"metastore_always_tasks"), "always_tasks");
// Mutex of TTYPE, which can be a nonexistent table
private final TableName mutex;
// Name of TTYPE
Expand Down Expand Up @@ -127,9 +127,10 @@ public void start() throws Exception {
throw new RuntimeException("Error claiming to be leader: " + leaderElection.getName(), e);
}
});
daemon.setName("Metastore Election " + leaderElection.getName());
daemon.setDaemon(true);

if (startAsDaemon) {
daemon.setName("Leader-Election-" + leaderElection.getName());
daemon.setDaemon(true);
daemon.start();
} else {
daemon.run();
Expand All @@ -154,7 +155,13 @@ public static Object getLeaderMutex(Configuration conf, TTYPE ttype, String serv
case "host":
return servHost;
case "lock":
return ttype.getTableName();
TableName mutex = ttype.getTableName();
String namespace =
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_LOCK_NAMESPACE);
if (StringUtils.isNotEmpty(namespace)) {
return new TableName(mutex.getCat(), namespace, mutex.getTable());
}
return mutex;
default:
throw new UnsupportedOperationException(method + " not supported for leader election");
}
Expand Down
Expand Up @@ -18,6 +18,8 @@

package org.apache.hadoop.hive.metastore.leader;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;

Expand All @@ -26,7 +28,7 @@
*/
public class LeaderElectionFactory {

public static LeaderElection create(Configuration conf) {
public static LeaderElection create(Configuration conf) throws IOException {
String method =
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_HOUSEKEEPING_LEADER_ELECTION);
switch (method.toLowerCase()) {
Expand All @@ -35,7 +37,7 @@ public static LeaderElection create(Configuration conf) {
case "lock":
return new LeaseLeaderElection();
default:
throw new UnsupportedOperationException("Do not support " + method + " now");
throw new UnsupportedOperationException(method + " is not supported for electing the leader");
}
}

Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -92,9 +93,17 @@ public class LeaseLeaderElection implements LeaderElection<TableName> {
public static final String METASTORE_RENEW_LEASE = "metastore.renew.leader.lease";

private String name;
private String userName;
private String hostName;

private void doWork(LockResponse resp, Configuration conf,
public LeaseLeaderElection() throws IOException {
userName = SecurityUtils.getUser();
hostName = InetAddress.getLocalHost().getHostName();
}

private synchronized void doWork(LockResponse resp, Configuration conf,
TableName tableName) throws LeaderException {
long start = System.currentTimeMillis();
lockId = resp.getLockid();
assert resp.getState() == LockState.ACQUIRED || resp.getState() == LockState.WAITING;
shutdownWatcher();
Expand All @@ -121,6 +130,7 @@ private void doWork(LockResponse resp, Configuration conf,
default:
throw new IllegalStateException("Unexpected lock state: " + resp.getState());
}
LOG.debug("Spent {}ms to notify the listeners, isLeader: {}", System.currentTimeMillis() - start, isLeader);
}

private void notifyListener() {
Expand All @@ -142,13 +152,6 @@ private void notifyListener() {
public void tryBeLeader(Configuration conf, TableName table) throws LeaderException {
requireNonNull(conf, "conf is null");
requireNonNull(table, "table is null");
String user, hostName;
try {
user = SecurityUtils.getUser();
hostName = InetAddress.getLocalHost().getHostName();
} catch (Exception e) {
throw new LeaderException("Error while getting the username", e);
}

if (store == null) {
store = TxnUtils.getTxnStore(conf);
Expand All @@ -165,7 +168,7 @@ public void tryBeLeader(Configuration conf, TableName table) throws LeaderExcept
boolean lockable = false;
Exception recentException = null;
long start = System.currentTimeMillis();
LockRequest req = new LockRequest(components, user, hostName);
LockRequest req = new LockRequest(components, userName, hostName);
int numRetries = MetastoreConf.getIntVar(conf, MetastoreConf.ConfVars.LOCK_NUMRETRIES);
long maxSleep = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.LOCK_SLEEP_BETWEEN_RETRIES, TimeUnit.MILLISECONDS);
Expand All @@ -175,6 +178,7 @@ public void tryBeLeader(Configuration conf, TableName table) throws LeaderExcept
if (res.getState() == LockState.WAITING || res.getState() == LockState.ACQUIRED) {
lockable = true;
doWork(res, conf, table);
LOG.debug("Spent {}ms to lock the table {}, retries: {}", System.currentTimeMillis() - start, table, i);
break;
}
} catch (NoSuchTxnException | TxnAbortedException e) {
Expand Down Expand Up @@ -324,6 +328,7 @@ public void runInternal() {
} catch (NoSuchTxnException | TxnAbortedException e) {
throw new AssertionError("This should not happen, we didn't open txn", e);
} catch (NoSuchLockException e) {
LOG.info("No such lock {} for NonLeaderWatcher, try to obtain the lock again...", lockId);
reclaim();
} catch (Exception e) {
// Wait for next cycle.
Expand Down Expand Up @@ -379,6 +384,7 @@ public void runInternal() {
} catch (NoSuchTxnException | TxnAbortedException e) {
throw new AssertionError("This should not happen, we didn't open txn", e);
} catch (NoSuchLockException e) {
LOG.info("No such lock {} for Heartbeater, try to obtain the lock again...", lockId);
reclaim();
} catch (Exception e) {
// Wait for next cycle.
Expand All @@ -404,6 +410,7 @@ public ReleaseAndRequireWatcher(Configuration conf,
super(conf, tableName);
timeout = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS) + 3000;
setName("ReleaseAndRequireWatcher");
}

@Override
Expand Down
Expand Up @@ -27,6 +27,9 @@
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -52,6 +55,28 @@ public void testConfigLeaderElection() throws Exception {
assertFalse(election.isLeader());
}

static class TestLeaderListener implements LeaderElection.LeadershipStateListener {
AtomicBoolean flag;
TestLeaderListener(AtomicBoolean flag) {
this.flag = flag;
}
@Override
public void takeLeadership(LeaderElection election) throws Exception {
synchronized (flag) {
flag.set(true);
flag.notifyAll();
}
}

@Override
public void lossLeadership(LeaderElection election) throws Exception {
synchronized (flag) {
flag.set(false);
flag.notifyAll();
}
}
}

@Test
public void testLeaseLeaderElection() throws Exception {
Configuration configuration = MetastoreConf.newMetastoreConf();
Expand All @@ -68,56 +93,38 @@ public void testLeaseLeaderElection() throws Exception {
TableName mutex = new TableName("hive", "default", "leader_lease_ms");
LeaseLeaderElection instance1 = new LeaseLeaderElection();
AtomicBoolean flag1 = new AtomicBoolean(false);
instance1.addStateListener(new LeaderElection.LeadershipStateListener() {
@Override
public void takeLeadership(LeaderElection election) {
flag1.set(true);
}
@Override
public void lossLeadership(LeaderElection election) {
flag1.set(false);
}
});
instance1.addStateListener(new TestLeaderListener(flag1));
instance1.tryBeLeader(configuration, mutex);
// elect1 as a leader now
assertTrue(flag1.get() && instance1.isLeader());

configuration.setBoolean(LeaseLeaderElection.METASTORE_RENEW_LEASE, true);
LeaseLeaderElection instance2 = new LeaseLeaderElection();
AtomicBoolean flag2 = new AtomicBoolean(false);
instance2.addStateListener(new LeaderElection.LeadershipStateListener() {
@Override
public void takeLeadership(LeaderElection election) {
flag2.set(true);
}
@Override
public void lossLeadership(LeaderElection election) {
flag2.set(false);
}
});
instance2.addStateListener(new TestLeaderListener(flag2));
instance2.tryBeLeader(configuration, mutex);

// instance2 should not be leader as elect1 holds the lease
assertFalse(flag2.get() || instance2.isLeader());
Thread.sleep(15 * 1000);

ExecutorService service = Executors.newFixedThreadPool(4);
wait(service, flag1, flag2);
// now instance1 lease is timeout, the instance2 should be leader now
assertTrue(instance2.isLeader() && flag2.get());

assertFalse(flag1.get() || instance1.isLeader());
assertTrue(flag2.get() && instance2.isLeader());

// remove leader's lease (instance2)
long lockId2 = instance2.getLockId();
txnStore.unlock(new UnlockRequest(lockId2));
Thread.sleep(4 * 1000);
assertTrue(flag1.get() && instance1.isLeader());
wait(service, flag1, flag2);
assertFalse(flag2.get() || instance2.isLeader());
assertTrue(lockId2 > 0);
assertFalse(instance2.getLockId() == lockId2);

// remove leader's lease(instance1)
long lockId1 = instance1.getLockId();
txnStore.unlock(new UnlockRequest(lockId1));
Thread.sleep(4 * 1000);
wait(service, flag1, flag2);
assertFalse(lockId1 == instance1.getLockId());
assertTrue(lockId1 > 0);

Expand All @@ -128,4 +135,23 @@ public void lossLeadership(LeaderElection election) {
}
}

private void wait(ExecutorService service, Object... obj) throws Exception {
Future[] fs = new Future[obj.length];
for (int i = 0; i < obj.length; i++) {
Object monitor = obj[i];
fs[i] = service.submit(() -> {
try {
synchronized (monitor) {
monitor.wait();
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
for (Future f : fs) {
f.get();
}
}

}

0 comments on commit 0e03bf1

Please sign in to comment.