Skip to content

Commit

Permalink
Rework sorted WAL files Upgrade code
Browse files Browse the repository at this point in the history
* Follow on work for apache#2185
* Improve upgrade code in Upgrader9to10.dropSortedMapWALFiles()
  • Loading branch information
milleruntime committed Aug 13, 2021
1 parent baf7a6a commit dd16633
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
Expand Down Expand Up @@ -108,7 +109,7 @@
*
* Sorted recovery was updated to use RFiles instead of map files. So to prevent issues during
* tablet recovery, remove the old temporary map files and resort using RFiles. This is done in
* {@link #dropSortedMapWALFiles(VolumeManager)}. For more information see the following issues:
* {@link #dropSortedMapWALFiles(ServerContext)}. For more information see the following issues:
* <a href="https://github.com/apache/accumulo/issues/2117">#2117</a> and
* <a href="https://github.com/apache/accumulo/issues/2179">#2179</a>
*/
Expand All @@ -134,6 +135,8 @@ public void upgradeZookeeper(ServerContext ctx) {
upgradeRootTabletMetadata(ctx);
renameOldMasterPropsinZK(ctx);
createExternalCompactionNodes(ctx);
// special case where old files need to be deleted
dropSortedMapWALFiles(ctx);
}

@Override
Expand All @@ -148,8 +151,6 @@ public void upgradeMetadata(ServerContext ctx) {
upgradeRelativePaths(ctx, Ample.DataLevel.USER);
upgradeDirColumns(ctx, Ample.DataLevel.USER);
upgradeFileDeletes(ctx, Ample.DataLevel.USER);
// special case where old files need to be deleted
dropSortedMapWALFiles(ctx.getVolumeManager());
}

private void setMetaTableProps(ServerContext ctx) {
Expand Down Expand Up @@ -738,33 +739,36 @@ static Path resolveRelativeDelete(String oldDelete, String upgradeProperty) {
/**
* Remove old temporary map files to prevent problems during recovery.
*/
static void dropSortedMapWALFiles(VolumeManager vm) {
Path recoveryDir = new Path("/accumulo/recovery");
try {
if (!vm.exists(recoveryDir)) {
log.info("There are no recovery files in /accumulo/recovery");
return;
}
List<Path> directoriesToDrop = new ArrayList<>();
for (FileStatus walDir : vm.listStatus(recoveryDir)) {
// map files will be in a directory starting with "part"
Path walDirPath = walDir.getPath();
for (FileStatus dirOrFile : vm.listStatus(walDirPath)) {
if (dirOrFile.isDirectory()) {
directoriesToDrop.add(walDirPath);
break;
static void dropSortedMapWALFiles(ServerContext context) {
VolumeManager vm = context.getVolumeManager();
for (Volume volume : vm.getVolumes()) {
Path recoveryDir = volume.prefixChild("/accumulo/recovery");
try {
if (!vm.exists(recoveryDir)) {
log.info("There are no recovery files in /accumulo/recovery");
return;
}
List<Path> directoriesToDrop = new ArrayList<>();
for (FileStatus walDir : vm.listStatus(recoveryDir)) {
// map files will be in a directory starting with "part"
Path walDirPath = walDir.getPath();
for (FileStatus dirOrFile : vm.listStatus(walDirPath)) {
if (dirOrFile.isDirectory()) {
directoriesToDrop.add(walDirPath);
break;
}
}
}
}
if (!directoriesToDrop.isEmpty()) {
log.info("Found {} old sorted map directories to delete.", directoriesToDrop.size());
for (Path dir : directoriesToDrop) {
log.info("Deleting everything in old sorted map directory: {}", dir);
vm.deleteRecursively(dir);
if (!directoriesToDrop.isEmpty()) {
log.info("Found {} old sorted map directories to delete.", directoriesToDrop.size());
for (Path dir : directoriesToDrop) {
log.info("Deleting everything in old sorted map directory: {}", dir);
vm.deleteRecursively(dir);
}
}
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,20 @@
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
Expand All @@ -49,9 +51,14 @@
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.volume.Volume;
import org.apache.accumulo.core.volume.VolumeImpl;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.gc.GcVolumeUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Test;
Expand Down Expand Up @@ -330,40 +337,44 @@ private void verifyPathsReplaced(List<Mutation> expected, List<Mutation> results

@Test
public void testDropSortedMapWALs() throws IOException {
Path recoveryDir = new Path("/accumulo/recovery");
VolumeManager fs = createMock(VolumeManager.class);
FileSystem fs = new Path("file:///").getFileSystem(new Configuration(false));

List<String> volumes = Arrays.asList("/vol1/", "/vol2/");
Collection<Volume> vols =
volumes.stream().map(s -> new VolumeImpl(fs, s)).collect(Collectors.toList());

ServerContext context = createMock(ServerContext.class);
Path recoveryDir1 = new Path("file:/vol1/accumulo/recovery");
Path recoveryDir2 = new Path("file:/vol2/accumulo/recovery");
VolumeManager volumeManager = createMock(VolumeManager.class);
FileStatus[] dirs = new FileStatus[2];
dirs[0] = createMock(FileStatus.class);
Path dir0 = new Path("/accumulo/recovery/A123456789");
Path dir0 = new Path("file:/vol1/accumulo/recovery/A123456789");
FileStatus[] dir0Files = new FileStatus[1];
dir0Files[0] = createMock(FileStatus.class);
dirs[1] = createMock(FileStatus.class);
Path dir1 = new Path("/accumulo/recovery/B123456789");
Path dir1 = new Path("file:/vol1/accumulo/recovery/B123456789");
FileStatus[] dir1Files = new FileStatus[1];
dir1Files[0] = createMock(FileStatus.class);
Path part1Dir = new Path("/accumulo/recovery/B123456789/part-r-0000");
Path part1Dir = new Path("file:/vol1/accumulo/recovery/B123456789/part-r-0000");

expect(fs.exists(recoveryDir)).andReturn(true).once();
expect(fs.listStatus(recoveryDir)).andReturn(dirs).once();
expect(context.getVolumeManager()).andReturn(volumeManager).once();
expect(volumeManager.getVolumes()).andReturn(vols).once();
expect(volumeManager.exists(recoveryDir1)).andReturn(true).once();
expect(volumeManager.exists(recoveryDir2)).andReturn(false).once();
expect(volumeManager.listStatus(recoveryDir1)).andReturn(dirs).once();
expect(dirs[0].getPath()).andReturn(dir0).once();
expect(fs.listStatus(dir0)).andReturn(dir0Files).once();
expect(volumeManager.listStatus(dir0)).andReturn(dir0Files).once();
expect(dir0Files[0].isDirectory()).andReturn(false).once();

expect(dirs[1].getPath()).andReturn(dir1).once();
expect(fs.listStatus(dir1)).andReturn(dir1Files).once();
expect(volumeManager.listStatus(dir1)).andReturn(dir1Files).once();
expect(dir1Files[0].isDirectory()).andReturn(true).once();
expect(dir1Files[0].getPath()).andReturn(part1Dir).once();

expect(fs.deleteRecursively(dir1)).andReturn(true).once();

replay(fs, dirs[0], dirs[1], dir0Files[0], dir1Files[0]);
Upgrader9to10.dropSortedMapWALFiles(fs);

reset(fs);
expect(volumeManager.deleteRecursively(dir1)).andReturn(true).once();

// test case where there is no recovery
expect(fs.exists(recoveryDir)).andReturn(false).once();
replay(fs);
Upgrader9to10.dropSortedMapWALFiles(fs);
replay(context, volumeManager, dirs[0], dirs[1], dir0Files[0], dir1Files[0]);
Upgrader9to10.dropSortedMapWALFiles(context);
}
}

0 comments on commit dd16633

Please sign in to comment.