Skip to content

Commit

Permalink
HBASE-22819 Automatically migrate the rs group config for table after…
Browse files Browse the repository at this point in the history
… HBASE-22695 (apache#498)

Signed-off-by: Guanghao Zhang <zghao@apache.org>
  • Loading branch information
Apache9 authored and thangTang committed Apr 16, 2020
1 parent 729f635 commit b38a793
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.hadoop.hbase.rsgroup;

import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.net.Address;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -104,7 +102,7 @@ public boolean containsServer(Address hostPort) {
/**
* Get list of servers.
*/
public Set<Address> getServers() {
public SortedSet<Address> getServers() {
return servers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class RSGroupAdminServer implements RSGroupAdmin {
"one server in 'default' RSGroup.";

private MasterServices master;
private final RSGroupInfoManager rsGroupInfoManager;
final RSGroupInfoManager rsGroupInfoManager;

/** Define the config key of retries threshold when movements failed */
//made package private for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void getRSGroupInfoOfTable(RpcController controller, GetRSGroupInfoOfTabl
}
checkPermission("getRSGroupInfoOfTable");
Optional<RSGroupInfo> optGroup =
RSGroupUtil.getRSGroupInfo(master, groupAdminServer, tableName);
RSGroupUtil.getRSGroupInfo(master, groupAdminServer.rsGroupInfoManager, tableName);
if (optGroup.isPresent()) {
builder.setRSGroupInfo(ProtobufUtil.toProtoGroupInfo(fillTables(optGroup.get())));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.net.Address;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -67,11 +68,6 @@ Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
*/
List<RSGroupInfo> listRSGroups() throws IOException;

/**
* Refresh/reload the group information from the persistent store
*/
void refresh() throws IOException;

/**
* Whether the manager is able to fully return group metadata
* @return whether the manager is in online mode
Expand All @@ -83,4 +79,12 @@ Set<Address> moveServers(Set<Address> servers, String srcGroup, String dstGroup)
* @param servers set of servers to remove
*/
void removeServers(Set<Address> servers) throws IOException;

/**
* Get {@code RSGroupInfo} for the given table.
* @deprecated Since 3.0.0, will be removed in 4.0.0. Only for compatibility, where we upgrade
* from a version that stores table names for a rs group in the {@code RSGroupInfo}.
*/
@Deprecated
RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -33,6 +34,7 @@
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
Expand Down Expand Up @@ -76,6 +78,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
Expand Down Expand Up @@ -104,9 +107,6 @@
final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private static final Logger LOG = LoggerFactory.getLogger(RSGroupInfoManagerImpl.class);

private static final String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
private static final long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;

// Assigned before user tables
@VisibleForTesting
static final TableName RSGROUP_TABLE_NAME =
Expand All @@ -120,6 +120,9 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {
@VisibleForTesting
static final byte[] META_QUALIFIER_BYTES = Bytes.toBytes("i");

@VisibleForTesting
static final String MIGRATE_THREAD_NAME = "Migrate-RSGroup-Tables";

private static final byte[] ROW_KEY = { 0 };

/** Table descriptor for <code>hbase:rsgroup</code> catalog table */
Expand All @@ -140,7 +143,30 @@ final class RSGroupInfoManagerImpl implements RSGroupInfoManager {

// There two Maps are immutable and wholesale replaced on each modification
// so are safe to access concurrently. See class comment.
private volatile Map<String, RSGroupInfo> rsGroupMap = Collections.emptyMap();
private static final class RSGroupInfoHolder {
final ImmutableMap<String, RSGroupInfo> groupName2Group;
final ImmutableMap<TableName, RSGroupInfo> tableName2Group;

RSGroupInfoHolder() {
this(Collections.emptyMap());
}

RSGroupInfoHolder(Map<String, RSGroupInfo> rsGroupMap) {
ImmutableMap.Builder<String, RSGroupInfo> group2Name2GroupBuilder = ImmutableMap.builder();
ImmutableMap.Builder<TableName, RSGroupInfo> tableName2GroupBuilder = ImmutableMap.builder();
rsGroupMap.forEach((groupName, rsGroupInfo) -> {
group2Name2GroupBuilder.put(groupName, rsGroupInfo);
if (!groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
rsGroupInfo.getTables()
.forEach(tableName -> tableName2GroupBuilder.put(tableName, rsGroupInfo));
}
});
this.groupName2Group = group2Name2GroupBuilder.build();
this.tableName2Group = tableName2GroupBuilder.build();
}
}

private volatile RSGroupInfoHolder holder = new RSGroupInfoHolder();

private final MasterServices masterServices;
private final AsyncClusterConnection conn;
Expand All @@ -160,9 +186,10 @@ private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException


private synchronized void init() throws IOException {
refresh();
refresh(false);
serverEventsListenerThread.start();
masterServices.getServerManager().registerListener(serverEventsListenerThread);
migrate();
}

static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
Expand All @@ -179,6 +206,7 @@ public void start() {
@Override
public synchronized void addRSGroup(RSGroupInfo rsGroupInfo) throws IOException {
checkGroupName(rsGroupInfo.getName());
Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
if (rsGroupMap.get(rsGroupInfo.getName()) != null ||
rsGroupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
throw new DoNotRetryIOException("Group already exists: " + rsGroupInfo.getName());
Expand Down Expand Up @@ -235,7 +263,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro
}
dst.addServer(el);
}
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
newGroupMap.put(src.getName(), src);
newGroupMap.put(dst.getName(), dst);
flushConfig(newGroupMap);
Expand All @@ -244,7 +272,7 @@ public synchronized Set<Address> moveServers(Set<Address> servers, String srcGro

@Override
public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException {
for (RSGroupInfo info : rsGroupMap.values()) {
for (RSGroupInfo info : holder.groupName2Group.values()) {
if (info.containsServer(serverHostPort)) {
return info;
}
Expand All @@ -254,11 +282,12 @@ public RSGroupInfo getRSGroupOfServer(Address serverHostPort) throws IOException

@Override
public RSGroupInfo getRSGroup(String groupName) {
return rsGroupMap.get(groupName);
return holder.groupName2Group.get(groupName);
}

@Override
public synchronized void removeRSGroup(String groupName) throws IOException {
Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
if (!rsGroupMap.containsKey(groupName) || groupName.equals(RSGroupInfo.DEFAULT_GROUP)) {
throw new DoNotRetryIOException(
"Group " + groupName + " does not exist or is a reserved " + "group");
Expand All @@ -270,7 +299,7 @@ public synchronized void removeRSGroup(String groupName) throws IOException {

@Override
public List<RSGroupInfo> listRSGroups() {
return Lists.newArrayList(rsGroupMap.values());
return Lists.newArrayList(holder.groupName2Group.values());
}

@Override
Expand Down Expand Up @@ -298,7 +327,7 @@ public synchronized void removeServers(Set<Address> servers) throws IOException
}

if (rsGroupInfos.size() > 0) {
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
Map<String, RSGroupInfo> newGroupMap = Maps.newHashMap(holder.groupName2Group);
newGroupMap.putAll(rsGroupInfos);
flushConfig(newGroupMap);
}
Expand Down Expand Up @@ -349,9 +378,90 @@ private List<RSGroupInfo> retrieveGroupListFromZookeeper() throws IOException {
return RSGroupInfoList;
}

@Override
public void refresh() throws IOException {
refresh(false);
private void migrate(Collection<RSGroupInfo> groupList) {
TableDescriptors tds = masterServices.getTableDescriptors();
for (RSGroupInfo groupInfo : groupList) {
if (groupInfo.getName().equals(RSGroupInfo.DEFAULT_GROUP)) {
continue;
}
SortedSet<TableName> failedTables = new TreeSet<>();
for (TableName tableName : groupInfo.getTables()) {
LOG.debug("Migrating {} in group {}", tableName, groupInfo.getName());
TableDescriptor oldTd;
try {
oldTd = tds.get(tableName);
} catch (IOException e) {
LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
failedTables.add(tableName);
continue;
}
if (oldTd == null) {
continue;
}
if (oldTd.getRegionServerGroup().isPresent()) {
// either we have already migrated it or that user has set the rs group using the new
// code which will set the group directly on table descriptor, skip.
LOG.debug("Skip migrating {} since it is already in group {}", tableName,
oldTd.getRegionServerGroup().get());
continue;
}
TableDescriptor newTd = TableDescriptorBuilder.newBuilder(oldTd)
.setRegionServerGroup(groupInfo.getName()).build();
// This is a bit tricky. Since we know that the region server group config in
// TableDescriptor will only be used at master side, it is fine to just update the table
// descriptor on file system and also the cache, without reopening all the regions. This
// will be much faster than the normal modifyTable. And when upgrading, we will update
// master first and then region server, so after all the region servers has been reopened,
// the new TableDescriptor will be loaded.
try {
tds.add(newTd);
} catch (IOException e) {
LOG.warn("Failed to migrate {} in group {}", tableName, groupInfo.getName(), e);
failedTables.add(tableName);
continue;
}
}
LOG.debug("Done migrating {}, failed tables {}", groupInfo.getName(), failedTables);
synchronized (RSGroupInfoManagerImpl.this) {
Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
RSGroupInfo currentInfo = rsGroupMap.get(groupInfo.getName());
if (currentInfo != null) {
RSGroupInfo newInfo =
new RSGroupInfo(currentInfo.getName(), currentInfo.getServers(), failedTables);
Map<String, RSGroupInfo> newGroupMap = new HashMap<>(rsGroupMap);
newGroupMap.put(groupInfo.getName(), newInfo);
try {
flushConfig(newGroupMap);
} catch (IOException e) {
LOG.warn("Failed to persist rs group {}", newInfo.getName(), e);
}
}
}
}
}

// Migrate the table rs group info from RSGroupInfo into the table descriptor
// Notice that we do not want to block the initialize so this will be done in background, and
// during the migrating, the rs group info maybe incomplete and cause region to be misplaced.
private void migrate() {
Thread migrateThread = new Thread(MIGRATE_THREAD_NAME) {

@Override
public void run() {
LOG.info("Start migrating table rs group config");
while (!masterServices.isStopped()) {
Collection<RSGroupInfo> groups = holder.groupName2Group.values();
boolean hasTables = groups.stream().anyMatch(r -> !r.getTables().isEmpty());
if (!hasTables) {
break;
}
migrate(groups);
}
LOG.info("Done migrating table rs group info");
}
};
migrateThread.setDaemon(true);
migrateThread.start();
}

/**
Expand Down Expand Up @@ -381,7 +491,7 @@ private synchronized void refresh(boolean forceOnline) throws IOException {
newGroupMap.put(group.getName(), group);
}
resetRSGroupMap(newGroupMap);
updateCacheOfRSGroups(rsGroupMap.keySet());
updateCacheOfRSGroups(newGroupMap.keySet());
}

private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOException {
Expand Down Expand Up @@ -411,20 +521,20 @@ private void flushConfigTable(Map<String, RSGroupInfo> groupMap) throws IOExcept
}

private synchronized void flushConfig() throws IOException {
flushConfig(this.rsGroupMap);
flushConfig(holder.groupName2Group);
}

private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) throws IOException {
// For offline mode persistence is still unavailable
// We're refreshing in-memory state but only for servers in default group
if (!isOnline()) {
if (newGroupMap == this.rsGroupMap) {
if (newGroupMap == holder.groupName2Group) {
// When newGroupMap is this.rsGroupMap itself,
// do not need to check default group and other groups as followed
return;
}

Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(rsGroupMap);
Map<String, RSGroupInfo> oldGroupMap = Maps.newHashMap(holder.groupName2Group);
RSGroupInfo oldDefaultGroup = oldGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
RSGroupInfo newDefaultGroup = newGroupMap.remove(RSGroupInfo.DEFAULT_GROUP);
if (!oldGroupMap.equals(newGroupMap) /* compare both tables and servers in other groups */ ||
Expand All @@ -438,7 +548,7 @@ private synchronized void flushConfig(Map<String, RSGroupInfo> newGroupMap) thro

// Refresh rsGroupMap
// according to the inputted newGroupMap (an updated copy of rsGroupMap)
rsGroupMap = newGroupMap;
this.holder = new RSGroupInfoHolder(newGroupMap);

// Do not need to update tableMap
// because only the update on servers in default group is allowed above,
Expand Down Expand Up @@ -495,8 +605,7 @@ private void saveRSGroupMapToZK(Map<String, RSGroupInfo> newGroupMap) throws IOE
* Make changes visible. Caller must be synchronized on 'this'.
*/
private void resetRSGroupMap(Map<String, RSGroupInfo> newRSGroupMap) {
// Make maps Immutable.
this.rsGroupMap = Collections.unmodifiableMap(newRSGroupMap);
this.holder = new RSGroupInfoHolder(newRSGroupMap);
}

/**
Expand Down Expand Up @@ -549,6 +658,7 @@ private SortedSet<Address> getDefaultServers() throws IOException {
// Called by ServerEventsListenerThread. Synchronize on this because redoing
// the rsGroupMap then writing it out.
private synchronized void updateDefaultServers(SortedSet<Address> servers) {
Map<String, RSGroupInfo> rsGroupMap = holder.groupName2Group;
RSGroupInfo info = rsGroupMap.get(RSGroupInfo.DEFAULT_GROUP);
RSGroupInfo newInfo = new RSGroupInfo(info.getName(), servers);
HashMap<String, RSGroupInfo> newGroupMap = Maps.newHashMap(rsGroupMap);
Expand Down Expand Up @@ -647,6 +757,8 @@ private boolean waitForGroupTableOnline() {
online = true;
// flush any inconsistencies between ZK and HTable
RSGroupInfoManagerImpl.this.flushConfig();
// migrate after we are online.
migrate();
return true;
} catch (Exception e) {
LOG.warn("Failed to perform check", e);
Expand Down Expand Up @@ -725,4 +837,10 @@ private void checkGroupName(String groupName) throws ConstraintException {
throw new ConstraintException("RSGroup name should only contain alphanumeric characters");
}
}


@Override
public RSGroupInfo getRSGroupForTable(TableName tableName) throws IOException {
return holder.tableName2Group.get(tableName);
}
}
Loading

0 comments on commit b38a793

Please sign in to comment.