Skip to content

Commit

Permalink
WIP apache#936 store root tablet file list in Zookeeper
Browse files Browse the repository at this point in the history
  • Loading branch information
keith-turner committed Aug 7, 2019
1 parent e357d8a commit c83d761
Show file tree
Hide file tree
Showing 19 changed files with 552 additions and 498 deletions.
Expand Up @@ -37,6 +37,11 @@ public class RootTable {
*/
public static final String ZROOT_TABLET = ROOT_TABLET_LOCATION;

/**
* ZK path relative to the zookeeper node where the root tablet gc candidates are stored.
*/
public static final String ZROOT_TABLET_GC_CANDIDATES = ZROOT_TABLET + "/gc_candidates";

public static final KeyExtent EXTENT = new KeyExtent(ID, null, null);
public static final KeyExtent OLD_EXTENT =
new KeyExtent(MetadataTable.ID, TabletsSection.getRow(MetadataTable.ID, null), null);
Expand Down
Expand Up @@ -18,9 +18,12 @@
package org.apache.accumulo.core.metadata.schema;

import java.util.Collection;
import java.util.Iterator;

import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
Expand Down Expand Up @@ -55,6 +58,31 @@
*/
public interface Ample {

/**
* Accumulo is a distributed tree with three levels. This enum is used to communicate to Ample
* that code is interested in operating on the metadata of a data level. Sometimes table ids or
* key extents are passed to Ample in lieu of a data level; in these cases the data level is
* derived from the table id.
*/
public enum DataLevel {
  // ROOT is constructed with a null table name; metaTable() refuses to answer for it.
  ROOT(null),
  // METADATA level metadata lives in the table named by RootTable.NAME.
  METADATA(RootTable.NAME),
  // USER level metadata lives in the table named by MetadataTable.NAME.
  USER(MetadataTable.NAME);

  // Name of the table holding this level's metadata, or null when no such table was given.
  private final String table;

  DataLevel(String table) {
    this.table = table;
  }

  /**
   * @return The name of the Accumulo table in which this data level stores its metadata.
   * @throws UnsupportedOperationException
   *           if this level was declared without a metadata table (the case for {@link #ROOT})
   */
  public String metaTable() {
    if (table == null) {
      // Include the level name so the caller can tell which level was asked for.
      throw new UnsupportedOperationException(
          "Data level " + name() + " does not store its metadata in an Accumulo table");
    }
    return table;
  }
}

/**
* Read a single tablet's metadata. No checking is done for prev row, so it could differ.
*
Expand Down Expand Up @@ -90,7 +118,11 @@ default void putGcCandidates(TableId tableId, Collection<? extends Ample.FileMet
throw new UnsupportedOperationException();
}

default void deleteGcCandidates(TableId tableId, Collection<String> paths) {
/**
* Default implementation is unsupported and always throws
* {@link UnsupportedOperationException}.
*
* NOTE(review): judging by the name, overriding implementations are expected to remove the given
* garbage collection candidate paths for the given data level; confirm against the implementing
* class.
*
* @param level
*          data level the candidates are associated with (presumably; confirm)
* @param paths
*          candidate paths to delete (presumably; confirm)
* @throws UnsupportedOperationException
*           always, unless overridden
*/
default void deleteGcCandidates(DataLevel level, Collection<String> paths) {
throw new UnsupportedOperationException();
}

/**
* Default implementation is unsupported and always throws
* {@link UnsupportedOperationException}.
*
* NOTE(review): judging by the name and return type, overriding implementations are expected to
* iterate over the garbage collection candidates of the given data level; confirm against the
* implementing class.
*
* @param level
*          data level whose candidates are requested (presumably; confirm)
* @param continuePoint
*          presumably a position from which iteration should resume; exact semantics are not
*          visible here, so verify against callers
* @return never returns normally in this default implementation
* @throws UnsupportedOperationException
*           always, unless overridden
*/
default Iterator<String> getGcCandidates(DataLevel level, String continuePoint) {
throw new UnsupportedOperationException();
}

Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.TreeMap;

import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.ColumnUpdate;
Expand All @@ -34,6 +35,8 @@
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
import org.apache.hadoop.io.Text;

Expand Down Expand Up @@ -175,10 +178,14 @@ public static RootTabletMetadata fromJson(byte[] bs) {
/**
* Generate initial json for the root tablet metadata.
*/
public static byte[] getInitialJson(String dir) {
public static byte[] getInitialJson(String dir, String file) {
Mutation mutation = RootTable.EXTENT.getPrevRowUpdateMutation();
TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(mutation,
new Value(dir.getBytes(UTF_8)));
ServerColumnFamily.DIRECTORY_COLUMN.put(mutation, new Value(dir.getBytes(UTF_8)));

mutation.put(DataFileColumnFamily.STR_NAME, file, new DataFileValue(0, 0).encodeAsValue());

ServerColumnFamily.TIME_COLUMN.put(mutation,
new Value(new MetadataTime(0, TimeType.LOGICAL).encode()));

RootTabletMetadata rtm = new RootTabletMetadata();

Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
Expand Down Expand Up @@ -69,21 +70,18 @@ private static class Builder implements TableRangeOptions, TableOptions, RangeOp

private List<Text> families = new ArrayList<>();
private List<ColumnFQ> qualifiers = new ArrayList<>();
private String table = MetadataTable.NAME;
private Ample.DataLevel level;
private String table;
private Range range;
private EnumSet<ColumnType> fetchedCols = EnumSet.noneOf(ColumnType.class);
private Text endRow;
private boolean checkConsistency = false;
private boolean saveKeyValues;
private TableId tableId;

// An internal constant that represents a fictional table where the root tablet stores its
// metadata
private static String SEED_TABLE = "accumulo.seed";

@Override
public TabletsMetadata build(AccumuloClient client) {
if (table.equals(SEED_TABLE)) {
if (level == DataLevel.ROOT) {
return buildSeed(client);
} else {
return buildNonSeed(client);
Expand All @@ -100,7 +98,26 @@ private TabletsMetadata buildSeed(AccumuloClient client) {

private TabletsMetadata buildNonSeed(AccumuloClient client) {
try {
Scanner scanner = new IsolatedScanner(client.createScanner(table, Authorizations.EMPTY));

String resolvedTable = null;
if (table == null) {
switch (level) {
case METADATA:
resolvedTable = RootTable.NAME;
break;
case USER:
resolvedTable = MetadataTable.NAME;
break;
default:
throw new IllegalArgumentException();
}
} else {
Preconditions.checkState(level == null);
resolvedTable = table;
}

Scanner scanner =
new IsolatedScanner(client.createScanner(resolvedTable, Authorizations.EMPTY));
scanner.setRange(range);

if (checkConsistency && !fetchedCols.contains(ColumnType.PREV_ROW)) {
Expand Down Expand Up @@ -198,11 +215,11 @@ public Options fetch(ColumnType... colsToFetch) {
@Override
public TableRangeOptions forTable(TableId tableId) {
if (tableId.equals(RootTable.ID)) {
this.table = SEED_TABLE;
this.level = DataLevel.ROOT;
} else if (tableId.equals(MetadataTable.ID)) {
this.table = RootTable.NAME;
this.level = DataLevel.METADATA;
} else {
this.table = MetadataTable.NAME;
this.level = DataLevel.USER;
}

this.tableId = tableId;
Expand Down Expand Up @@ -239,7 +256,6 @@ public Options saveKeyValues() {

@Override
public RangeOptions scanTable(String tableName) {
Preconditions.checkArgument(!tableName.equals(SEED_TABLE));
this.table = tableName;
this.range = TabletsSection.getRange();
return this;
Expand Down
Expand Up @@ -18,7 +18,6 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand All @@ -44,7 +43,6 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,7 +53,6 @@
public class VolumeUtil {

private static final Logger log = LoggerFactory.getLogger(VolumeUtil.class);
private static final SecureRandom rand = new SecureRandom();

private static boolean isActiveVolume(ServerContext context, Path dir) {

Expand Down Expand Up @@ -203,21 +200,17 @@ public static TabletFiles updateTabletVolumes(ServerContext context, ZooLock zoo
}
}

if (extent.isRootTablet()) {
ret.datafiles = tabletFiles.datafiles;
} else {
for (Entry<FileRef,DataFileValue> entry : tabletFiles.datafiles.entrySet()) {
String metaPath = entry.getKey().meta().toString();
String switchedPath = switchVolume(metaPath, FileType.TABLE, replacements);
if (switchedPath != null) {
filesToRemove.add(entry.getKey());
FileRef switchedRef = new FileRef(switchedPath, new Path(switchedPath));
filesToAdd.put(switchedRef, entry.getValue());
ret.datafiles.put(switchedRef, entry.getValue());
log.debug("Replacing volume {} : {} -> {}", extent, metaPath, switchedPath);
} else {
ret.datafiles.put(entry.getKey(), entry.getValue());
}
for (Entry<FileRef,DataFileValue> entry : tabletFiles.datafiles.entrySet()) {
String metaPath = entry.getKey().meta().toString();
String switchedPath = switchVolume(metaPath, FileType.TABLE, replacements);
if (switchedPath != null) {
filesToRemove.add(entry.getKey());
FileRef switchedRef = new FileRef(switchedPath, new Path(switchedPath));
filesToAdd.put(switchedRef, entry.getValue());
ret.datafiles.put(switchedRef, entry.getValue());
log.debug("Replacing volume {} : {} -> {}", extent, metaPath, switchedPath);
} else {
ret.datafiles.put(entry.getKey(), entry.getValue());
}
}

Expand Down Expand Up @@ -276,52 +269,10 @@ private static String decommisionedTabletDir(ServerContext context, ZooLock zooL
+ Path.SEPARATOR + dir.getName());

log.info("Updating directory for {} from {} to {}", extent, dir, newDir);
if (extent.isRootTablet()) {
// the root tablet is special case, its files need to be copied if its dir is changed

// this code needs to be idempotent

FileSystem fs1 = vm.getVolumeByPath(dir).getFileSystem();
FileSystem fs2 = vm.getVolumeByPath(newDir).getFileSystem();

if (!same(fs1, dir, fs2, newDir)) {
if (fs2.exists(newDir)) {
Path newDirBackup = getBackupName(newDir);
// never delete anything because were dealing with the root tablet
// one reason this dir may exist is because this method failed previously
log.info("renaming {} to {}", newDir, newDirBackup);
if (!fs2.rename(newDir, newDirBackup)) {
throw new IOException("Failed to rename " + newDir + " to " + newDirBackup);
}
}

// do a lot of logging since this is the root tablet
log.info("copying {} to {}", dir, newDir);
if (!FileUtil.copy(fs1, dir, fs2, newDir, false, context.getHadoopConf())) {
throw new IOException("Failed to copy " + dir + " to " + newDir);
}

// only set the new location in zookeeper after a successful copy
log.info("setting root tablet location to {}", newDir);
context.getAmple().mutateTablet(RootTable.EXTENT).putDir(newDir.toString()).mutate();

// rename the old dir to avoid confusion when someone looks at filesystem... its ok if we
// fail here and this does not happen because the location in
// zookeeper is the authority
Path dirBackup = getBackupName(dir);
log.info("renaming {} to {}", dir, dirBackup);
fs1.rename(dir, dirBackup);
MetadataTableUtil.updateTabletDir(extent, newDir.toString(), context, zooLock);
return newDir.toString();

} else {
log.info("setting root tablet location to {}", newDir);
context.getAmple().mutateTablet(RootTable.EXTENT).putDir(newDir.toString()).mutate();
}

return newDir.toString();
} else {
MetadataTableUtil.updateTabletDir(extent, newDir.toString(), context, zooLock);
return newDir.toString();
}
}

static boolean same(FileSystem fs1, Path dir, FileSystem fs2, Path newDir)
Expand Down Expand Up @@ -372,10 +323,4 @@ private static String hash(FileSystem fs, Path dir, String name) throws IOExcept
}

}

/**
 * Derives a backup path located next to {@code path}. The backup name is the original name
 * followed by the current time in milliseconds, a positive random number and a {@code .bak}
 * extension, each part separated by an underscore.
 *
 * @param path
 *          path for which a backup name is wanted
 * @return a path with the same parent as {@code path} and the generated backup name
 */
private static Path getBackupName(Path path) {
  long timestamp = System.currentTimeMillis();
  // nextInt's upper bound is exclusive, so adding one keeps the suffix strictly positive
  int randomSuffix = rand.nextInt(Integer.MAX_VALUE) + 1;
  String backupName = path.getName() + "_" + timestamp + "_" + randomSuffix + ".bak";
  return new Path(path.getParent(), backupName);
}

}
Expand Up @@ -96,6 +96,7 @@
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.metadata.RootGcCandidates;
import org.apache.accumulo.server.replication.ReplicationUtil;
import org.apache.accumulo.server.replication.StatusCombiner;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
Expand Down Expand Up @@ -365,15 +366,18 @@ private boolean initialize(SiteConfiguration siteConfig, Configuration hadoopCon
fs.choose(chooserEnv, configuredVolumes) + Path.SEPARATOR + ServerConstants.TABLE_DIR
+ Path.SEPARATOR + RootTable.ID + RootTable.ROOT_TABLET_LOCATION).toString();

String ext = FileOperations.getNewFileExtension(DefaultConfiguration.getInstance());
String rootTabletFileName = rootTabletDir + Path.SEPARATOR + "00000_00000." + ext;

try {
initZooKeeper(opts, uuid.toString(), instanceNamePath, rootTabletDir);
initZooKeeper(opts, uuid.toString(), instanceNamePath, rootTabletDir, rootTabletFileName);
} catch (Exception e) {
log.error("FATAL: Failed to initialize zookeeper", e);
return false;
}

try {
initFileSystem(siteConfig, hadoopConf, fs, uuid, rootTabletDir);
initFileSystem(siteConfig, hadoopConf, fs, uuid, rootTabletDir, rootTabletFileName);
} catch (Exception e) {
log.error("FATAL Failed to initialize filesystem", e);

Expand Down Expand Up @@ -489,7 +493,8 @@ private static void initDirs(VolumeManager fs, UUID uuid, String[] baseDirs, boo
}

private void initFileSystem(SiteConfiguration siteConfig, Configuration hadoopConf,
VolumeManager fs, UUID uuid, String rootTabletDir) throws IOException {
VolumeManager fs, UUID uuid, String rootTabletDir, String rootTabletFileName)
throws IOException {
initDirs(fs, uuid, VolumeConfiguration.getVolumeUris(siteConfig, hadoopConf), false);

// initialize initial system tables config in zookeeper
Expand Down Expand Up @@ -523,7 +528,6 @@ private void initFileSystem(SiteConfiguration siteConfig, Configuration hadoopCo
createMetadataFile(fs, metadataFileName, siteConfig, replicationTablet);

// populate the root tablet with info about the metadata table's two initial tablets
String rootTabletFileName = rootTabletDir + Path.SEPARATOR + "00000_00000." + ext;
Text splitPoint = TabletsSection.getRange().getEndKey().getRow();
Tablet tablesTablet =
new Tablet(MetadataTable.ID, tableMetadataTabletDir, null, splitPoint, metadataFileName);
Expand Down Expand Up @@ -603,7 +607,8 @@ private static void createDirectories(VolumeManager fs, String... dirs) throws I
}

private static void initZooKeeper(Opts opts, String uuid, String instanceNamePath,
String rootTabletDir) throws KeeperException, InterruptedException {
String rootTabletDir, String rootTabletFileName)
throws KeeperException, InterruptedException {
// setup basic data in zookeeper
zoo.putPersistentData(Constants.ZROOT, new byte[0], -1, NodeExistsPolicy.SKIP,
Ids.OPEN_ACL_UNSAFE);
Expand Down Expand Up @@ -641,7 +646,10 @@ private static void initZooKeeper(Opts opts, String uuid, String instanceNamePat
zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS, EMPTY_BYTE_ARRAY,
NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET,
RootTabletMetadata.getInitialJson(rootTabletDir), NodeExistsPolicy.FAIL);
RootTabletMetadata.getInitialJson(rootTabletDir, rootTabletFileName),
NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + RootTable.ZROOT_TABLET_GC_CANDIDATES,
new RootGcCandidates().toJson().getBytes(UTF_8), NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTERS, EMPTY_BYTE_ARRAY,
NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZMASTER_LOCK, EMPTY_BYTE_ARRAY,
Expand Down

0 comments on commit c83d761

Please sign in to comment.