From dd16633b9ea0b9c27736d0599008dbfb285f2a5a Mon Sep 17 00:00:00 2001 From: Mike Miller Date: Thu, 12 Aug 2021 10:12:36 -0400 Subject: [PATCH] Rework sorted WAL files Upgrade code * Follow on work for #2185 * Improve upgrade code in Upgrader9to10.dropSortedMapWALFiles() --- .../manager/upgrade/Upgrader9to10.java | 56 ++++++++++--------- .../manager/upgrade/Upgrader9to10Test.java | 51 ++++++++++------- 2 files changed, 61 insertions(+), 46 deletions(-) diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java index 597f88c875d..d1b36212109 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader9to10.java @@ -71,6 +71,7 @@ import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher; import org.apache.accumulo.core.tabletserver.log.LogEntry; import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.volume.Volume; import org.apache.accumulo.fate.zookeeper.ZooReaderWriter; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; @@ -108,7 +109,7 @@ * * Sorted recovery was updated to use RFiles instead of map files. So to prevent issues during * tablet recovery, remove the old temporary map files and resort using RFiles. This is done in - * {@link #dropSortedMapWALFiles(VolumeManager)}. For more information see the following issues: + * {@link #dropSortedMapWALFiles(ServerContext)}. For more information see the following issues: * #2117 and * #2179 */ @@ -134,6 +135,8 @@ public void upgradeZookeeper(ServerContext ctx) { upgradeRootTabletMetadata(ctx); renameOldMasterPropsinZK(ctx); createExternalCompactionNodes(ctx); + // special case where old files need to be deleted + dropSortedMapWALFiles(ctx); } @Override @@ -148,8 +151,6 @@ public void upgradeMetadata(ServerContext ctx) { upgradeRelativePaths(ctx, Ample.DataLevel.USER); upgradeDirColumns(ctx, Ample.DataLevel.USER); upgradeFileDeletes(ctx, Ample.DataLevel.USER); - // special case where old files need to be deleted - dropSortedMapWALFiles(ctx.getVolumeManager()); } private void setMetaTableProps(ServerContext ctx) { @@ -738,33 +739,36 @@ static Path resolveRelativeDelete(String oldDelete, String upgradeProperty) { /** * Remove old temporary map files to prevent problems during recovery. */ - static void dropSortedMapWALFiles(VolumeManager vm) { - Path recoveryDir = new Path("/accumulo/recovery"); - try { - if (!vm.exists(recoveryDir)) { - log.info("There are no recovery files in /accumulo/recovery"); - return; - } - List directoriesToDrop = new ArrayList<>(); - for (FileStatus walDir : vm.listStatus(recoveryDir)) { - // map files will be in a directory starting with "part" - Path walDirPath = walDir.getPath(); - for (FileStatus dirOrFile : vm.listStatus(walDirPath)) { - if (dirOrFile.isDirectory()) { - directoriesToDrop.add(walDirPath); - break; + static void dropSortedMapWALFiles(ServerContext context) { + VolumeManager vm = context.getVolumeManager(); + for (Volume volume : vm.getVolumes()) { + Path recoveryDir = volume.prefixChild("/accumulo/recovery"); + try { + if (!vm.exists(recoveryDir)) { + log.info("There are no recovery files in /accumulo/recovery"); + return; + } + List directoriesToDrop = new ArrayList<>(); + for (FileStatus walDir : vm.listStatus(recoveryDir)) { + // map files will be in a directory starting with "part" + Path walDirPath = walDir.getPath(); + for (FileStatus dirOrFile : vm.listStatus(walDirPath)) { + if (dirOrFile.isDirectory()) { + directoriesToDrop.add(walDirPath); + break; + } } } - } - if (!directoriesToDrop.isEmpty()) { - log.info("Found {} old sorted map directories to delete.", directoriesToDrop.size()); - for (Path dir : directoriesToDrop) { - log.info("Deleting everything in old sorted map directory: {}", dir); - vm.deleteRecursively(dir); + if (!directoriesToDrop.isEmpty()) { + log.info("Found {} old sorted map directories to delete.", directoriesToDrop.size()); + for (Path dir : directoriesToDrop) { + log.info("Deleting everything in old sorted map directory: {}", dir); + vm.deleteRecursively(dir); + } } + } catch (IOException ioe) { + throw new UncheckedIOException(ioe); } - } catch (IOException ioe) { - throw new UncheckedIOException(ioe); } } } diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java index 25800da5b50..d386820f584 100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader9to10Test.java @@ -24,7 +24,6 @@ import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; -import static org.easymock.EasyMock.reset; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -32,10 +31,13 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.SortedMap; import java.util.TreeMap; +import java.util.stream.Collectors; import org.apache.accumulo.core.client.AccumuloClient; import org.apache.accumulo.core.client.BatchWriter; @@ -49,9 +51,14 @@ import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.volume.Volume; +import org.apache.accumulo.core.volume.VolumeImpl; +import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.gc.GcVolumeUtil; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.junit.Test; @@ -330,40 +337,44 @@ private void verifyPathsReplaced(List expected, List results @Test public void testDropSortedMapWALs() throws IOException { - Path recoveryDir = new Path("/accumulo/recovery"); - VolumeManager fs = createMock(VolumeManager.class); + FileSystem fs = new Path("file:///").getFileSystem(new Configuration(false)); + + List volumes = Arrays.asList("/vol1/", "/vol2/"); + Collection vols = + volumes.stream().map(s -> new VolumeImpl(fs, s)).collect(Collectors.toList()); + + ServerContext context = createMock(ServerContext.class); + Path recoveryDir1 = new Path("file:/vol1/accumulo/recovery"); + Path recoveryDir2 = new Path("file:/vol2/accumulo/recovery"); + VolumeManager volumeManager = createMock(VolumeManager.class); FileStatus[] dirs = new FileStatus[2]; dirs[0] = createMock(FileStatus.class); - Path dir0 = new Path("/accumulo/recovery/A123456789"); + Path dir0 = new Path("file:/vol1/accumulo/recovery/A123456789"); FileStatus[] dir0Files = new FileStatus[1]; dir0Files[0] = createMock(FileStatus.class); dirs[1] = createMock(FileStatus.class); - Path dir1 = new Path("/accumulo/recovery/B123456789"); + Path dir1 = new Path("file:/vol1/accumulo/recovery/B123456789"); FileStatus[] dir1Files = new FileStatus[1]; dir1Files[0] = createMock(FileStatus.class); - Path part1Dir = new Path("/accumulo/recovery/B123456789/part-r-0000"); + Path part1Dir = new Path("file:/vol1/accumulo/recovery/B123456789/part-r-0000"); - expect(fs.exists(recoveryDir)).andReturn(true).once(); - expect(fs.listStatus(recoveryDir)).andReturn(dirs).once(); + expect(context.getVolumeManager()).andReturn(volumeManager).once(); + expect(volumeManager.getVolumes()).andReturn(vols).once(); + expect(volumeManager.exists(recoveryDir1)).andReturn(true).once(); + expect(volumeManager.exists(recoveryDir2)).andReturn(false).once(); + expect(volumeManager.listStatus(recoveryDir1)).andReturn(dirs).once(); expect(dirs[0].getPath()).andReturn(dir0).once(); - expect(fs.listStatus(dir0)).andReturn(dir0Files).once(); + expect(volumeManager.listStatus(dir0)).andReturn(dir0Files).once(); expect(dir0Files[0].isDirectory()).andReturn(false).once(); expect(dirs[1].getPath()).andReturn(dir1).once(); - expect(fs.listStatus(dir1)).andReturn(dir1Files).once(); + expect(volumeManager.listStatus(dir1)).andReturn(dir1Files).once(); expect(dir1Files[0].isDirectory()).andReturn(true).once(); expect(dir1Files[0].getPath()).andReturn(part1Dir).once(); - expect(fs.deleteRecursively(dir1)).andReturn(true).once(); - - replay(fs, dirs[0], dirs[1], dir0Files[0], dir1Files[0]); - Upgrader9to10.dropSortedMapWALFiles(fs); - - reset(fs); + expect(volumeManager.deleteRecursively(dir1)).andReturn(true).once(); - // test case where there is no recovery - expect(fs.exists(recoveryDir)).andReturn(false).once(); - replay(fs); - Upgrader9to10.dropSortedMapWALFiles(fs); + replay(context, volumeManager, dirs[0], dirs[1], dir0Files[0], dir1Files[0]); + Upgrader9to10.dropSortedMapWALFiles(context); } }