Skip to content

Commit

Permalink
Add upgradeFiles to Upgrade code
Browse files Browse the repository at this point in the history
* Add upgradeFiles method for upgrading files to Upgrader
* Refactor status handling in UpgradeCoordinator to allow upgradeFiles
to run concurrently and wait for metadata upgrade to complete
* Implement upgradeFiles method in Upgrader9to10
* Add dropSortedMapWALFiles for resolving sorted map files that may
still be around during upgrade. Follow up for apache#2117
* Closes apache#2179
  • Loading branch information
milleruntime committed Jun 30, 2021
1 parent 6250aa5 commit 2a48ffe
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 6 deletions.
Expand Up @@ -272,12 +272,14 @@ synchronized void setManagerState(ManagerState newState) {
+ " all logs and file a bug.");
}
upgradeMetadataFuture = upgradeCoordinator.upgradeMetadata(getContext(), nextEvent);
upgradeFilesFuture = upgradeCoordinator.upgradeFiles(getContext(), nextEvent);
}
}

private final UpgradeCoordinator upgradeCoordinator = new UpgradeCoordinator();

private Future<Void> upgradeMetadataFuture;
private Future<Void> upgradeFilesFuture;

private ManagerClientServiceHandler clientHandler;

Expand Down Expand Up @@ -1120,6 +1122,9 @@ boolean canSuspendTablets() {
if (null != upgradeMetadataFuture) {
upgradeMetadataFuture.get();
}
if (null != upgradeFilesFuture) {
upgradeFilesFuture.get();
}
} catch (ExecutionException | InterruptedException e) {
throw new IllegalStateException("Metadata upgrade failed", e);
}
Expand Down
Expand Up @@ -27,6 +27,7 @@

import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.manager.EventCoordinator;
import org.apache.accumulo.server.ServerConstants;
import org.apache.accumulo.server.ServerContext;
Expand Down Expand Up @@ -69,6 +70,16 @@ public boolean isParentLevelUpgraded(KeyExtent extent) {
return extent.isMeta();
}
},

/**
* This signifies that only zookeeper, root and metadata table have been upgraded.
*/
UPGRADED_METADATA {
@Override
public boolean isParentLevelUpgraded(KeyExtent extent) {
return !extent.isMeta();
}
},
/**
* This signifies that everything (zookeeper, root table, metadata table) is upgraded.
*/
Expand Down Expand Up @@ -98,7 +109,7 @@ public boolean isParentLevelUpgraded(KeyExtent extent) {
private static Logger log = LoggerFactory.getLogger(UpgradeCoordinator.class);

private int currentVersion;
private Map<Integer,Upgrader> upgraders = Map.of(ServerConstants.SHORTEN_RFILE_KEYS,
private final Map<Integer,Upgrader> upgraders = Map.of(ServerConstants.SHORTEN_RFILE_KEYS,
new Upgrader8to9(), ServerConstants.CRYPTO_CHANGES, new Upgrader9to10());

private volatile UpgradeStatus status;
Expand Down Expand Up @@ -166,7 +177,7 @@ public synchronized Future<Void> upgradeMetadata(ServerContext context,

if (currentVersion < ServerConstants.DATA_VERSION) {
return ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
"UpgradeMetadataThreads", new SynchronousQueue<Runnable>(), OptionalInt.empty(), false)
"UpgradeMetadataThreads", new SynchronousQueue<>(), OptionalInt.empty(), false)
.submit(() -> {
try {
for (int v = currentVersion; v < ServerConstants.DATA_VERSION; v++) {
Expand All @@ -181,10 +192,7 @@ public synchronized Future<Void> upgradeMetadata(ServerContext context,
upgraders.get(v).upgradeMetadata(context);
}

log.info("Updating persistent data version.");
ServerUtil.updateAccumuloVersion(context.getVolumeManager(), currentVersion);
log.info("Upgrade complete");
setStatus(UpgradeStatus.COMPLETE, eventCoordinator);
setStatus(UpgradeStatus.UPGRADED_METADATA, eventCoordinator);
} catch (Exception e) {
handleFailure(e);
}
Expand All @@ -195,6 +203,44 @@ public synchronized Future<Void> upgradeMetadata(ServerContext context,
}
}

public synchronized Future<Void> upgradeFiles(ServerContext context,
EventCoordinator eventCoordinator) {
if (status == UpgradeStatus.COMPLETE)
return CompletableFuture.completedFuture(null);

if (currentVersion < ServerConstants.DATA_VERSION) {
return ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
"UpgradeFilesThreads", new SynchronousQueue<>(), OptionalInt.empty(), false)
.submit(() -> {
try {
for (int v = currentVersion; v < ServerConstants.DATA_VERSION; v++) {
log.info("Upgrading files from data version {}", v);
upgraders.get(v).upgradeFiles(context);
}

log.info("Upgrade files completed");
while (status != UpgradeStatus.UPGRADED_METADATA) {
log.info("Waiting for upgrade metadata to complete");
UtilWaitThread.sleepUninterruptibly(5, TimeUnit.SECONDS);
}
completeUpgrade(context, eventCoordinator);
} catch (Exception e) {
handleFailure(e);
}
return null;
});
} else {
return CompletableFuture.completedFuture(null);
}
}

private void completeUpgrade(ServerContext context, EventCoordinator eventCoordinator) {
log.info("Updating persistent data version.");
ServerUtil.updateAccumuloVersion(context.getVolumeManager(), currentVersion);
log.info("Upgrade complete");
setStatus(UpgradeStatus.COMPLETE, eventCoordinator);
}

public UpgradeStatus getStatus() {
return status;
}
Expand Down
Expand Up @@ -35,4 +35,6 @@ public interface Upgrader {
void upgradeRoot(ServerContext ctx);

void upgradeMetadata(ServerContext ctx);

void upgradeFiles(ServerContext ctx);
}
Expand Up @@ -41,4 +41,9 @@ public void upgradeMetadata(ServerContext ctx) {
// There is no action that needs to be taken for metadata
}

@Override
public void upgradeFiles(ServerContext ctx) {
// There is no action that needs to be taken for the files
}

}
Expand Up @@ -105,6 +105,12 @@
* <a href="https://github.com/apache/accumulo/issues/1642">#1642</a>, and
* <a href="https://github.com/apache/accumulo/issues/1643">#1643</a> as well.</li>
* </ul>
*
* Sorted recovery was updated to use RFiles instead of map files. So to prevent issues during
* tablet recovery, remove the old temporary map files and resort using RFiles. This is done in
* {@link #dropSortedMapWALFiles(VolumeManager)}. For more information see the following issues:
* <a href="https://github.com/apache/accumulo/issues/2117">#2117</a> and
* <a href="https://github.com/apache/accumulo/issues/2179">#2179</a>
*/
public class Upgrader9to10 implements Upgrader {

Expand Down Expand Up @@ -144,6 +150,11 @@ public void upgradeMetadata(ServerContext ctx) {
upgradeFileDeletes(ctx, Ample.DataLevel.USER);
}

@Override
public void upgradeFiles(ServerContext ctx) {
dropSortedMapWALFiles(ctx.getVolumeManager());
}

private void setMetaTableProps(ServerContext ctx) {
try {
TablePropUtil.setTableProperty(ctx, RootTable.ID,
Expand Down Expand Up @@ -726,4 +737,37 @@ static Path resolveRelativeDelete(String oldDelete, String upgradeProperty) {
}
return new Path(upgradeProperty, VolumeManager.FileType.TABLE.getDirectory() + oldDelete);
}

/**
* Remove old temporary map files to prevent problems during recovery.
*/
static void dropSortedMapWALFiles(VolumeManager vm) {
Path recoveryDir = new Path("/accumulo/recovery");
try {
if (!vm.exists(recoveryDir)) {
log.info("There are no recover files in /accumulo/recovery");
return;
}
List<Path> directoriesToDrop = new ArrayList<>();
for (FileStatus walDir : vm.listStatus(recoveryDir)) {
// map files will be in a directory starting with "part"
Path walDirPath = walDir.getPath();
for (FileStatus dirOrFile : vm.listStatus(walDirPath)) {
if (dirOrFile.isDirectory()) {
directoriesToDrop.add(walDirPath);
break;
}
}
}
if (!directoriesToDrop.isEmpty()) {
log.info("Found {} old sorted map directories to delete.", directoriesToDrop.size());
for (Path dir : directoriesToDrop) {
log.info("Deleting everything in old sorted map directory: {}", dir);
vm.deleteRecursively(dir);
}
}
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
}
}
Expand Up @@ -24,11 +24,13 @@
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand All @@ -49,6 +51,7 @@
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.gc.GcVolumeUtil;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Test;
Expand Down Expand Up @@ -324,4 +327,43 @@ private void verifyPathsReplaced(List<Mutation> expected, List<Mutation> results

assertEquals("Replacements should have update for every delete", deleteCount, updateCount);
}

@Test
public void testDropSortedMapWALs() throws IOException {
Path recoveryDir = new Path("/accumulo/recovery");
VolumeManager fs = createMock(VolumeManager.class);
FileStatus[] dirs = new FileStatus[2];
dirs[0] = createMock(FileStatus.class);
Path dir0 = new Path("/accumulo/recovery/A123456789");
FileStatus[] dir0Files = new FileStatus[1];
dir0Files[0] = createMock(FileStatus.class);
dirs[1] = createMock(FileStatus.class);
Path dir1 = new Path("/accumulo/recovery/B123456789");
FileStatus[] dir1Files = new FileStatus[1];
dir1Files[0] = createMock(FileStatus.class);
Path part1Dir = new Path("/accumulo/recovery/B123456789/part-r-0000");

expect(fs.exists(recoveryDir)).andReturn(true).once();
expect(fs.listStatus(recoveryDir)).andReturn(dirs).once();
expect(dirs[0].getPath()).andReturn(dir0).once();
expect(fs.listStatus(dir0)).andReturn(dir0Files).once();
expect(dir0Files[0].isDirectory()).andReturn(false).once();

expect(dirs[1].getPath()).andReturn(dir1).once();
expect(fs.listStatus(dir1)).andReturn(dir1Files).once();
expect(dir1Files[0].isDirectory()).andReturn(true).once();
expect(dir1Files[0].getPath()).andReturn(part1Dir).once();

expect(fs.deleteRecursively(dir1)).andReturn(true).once();

replay(fs, dirs[0], dirs[1], dir0Files[0], dir1Files[0]);
Upgrader9to10.dropSortedMapWALFiles(fs);

reset(fs);

// test case where there is no recovery
expect(fs.exists(recoveryDir)).andReturn(false).once();
replay(fs);
Upgrader9to10.dropSortedMapWALFiles(fs);
}
}

0 comments on commit 2a48ffe

Please sign in to comment.