diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a07807b1998..b9774aab9b1 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -818,7 +818,7 @@
   <property>
     <name>ozone.scm.pipeline.owner.container.count</name>
     <value>3</value>
     <tag>OZONE, SCM, PIPELINE</tag>
-    <description>Number of containers per owner in a pipeline.
+    <description>Number of containers per owner per disk in a pipeline.
     </description>
   </property>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 34177f08864..e09486e2b60 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -66,7 +66,7 @@ public class SCMContainerManager implements ContainerManager {
 
   private final ContainerStateManager containerStateManager;
 
-  private final int numContainerPerOwnerInPipeline;
+  private final int numContainerPerVolume;
 
   private final SCMContainerManagerMetrics scmContainerManagerMetrics;
 
@@ -98,7 +98,7 @@ public SCMContainerManager(
     this.lock = new ReentrantLock();
     this.pipelineManager = pipelineManager;
     this.containerStateManager = new ContainerStateManager(conf);
-    this.numContainerPerOwnerInPipeline = conf
+    this.numContainerPerVolume = conf
        .getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
            ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
 
@@ -432,7 +432,8 @@ public ContainerInfo getMatchingContainer(final long sizeRequired,
     synchronized (pipeline) {
       containerIDs = getContainersForOwner(pipeline, owner);
-      if (containerIDs.size() < numContainerPerOwnerInPipeline) {
+      if (containerIDs.size() < numContainerPerVolume * pipelineManager.
+          getNumHealthyVolumes(pipeline)) {
         containerInfo = containerStateManager.allocateContainer(
             pipelineManager, owner, pipeline);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index d06ea2a3b3f..b39440f41f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -108,6 +108,28 @@ public List<StorageReportProto> getStorageReports() {
     }
   }
 
+  /**
+   * Returns the count of healthy volumes reported by the datanode.
+   * @return count of healthy volumes
+   */
+  public int getHealthyVolumeCount() {
+    try {
+      lock.readLock().lock();
+      return storageReports.size() - getFailedVolumeCount();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the count of failed volumes reported by the datanode.
+   * @return count of failed volumes
+   */
+  private int getFailedVolumeCount() {
+    return (int) storageReports.stream().
+        filter(e -> e.hasFailed() && e.getFailed()).count();
+  }
+
   /**
    * Returns the last updated time of datanode info.
    * @return the last updated time of datanode info.
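Taken together, the SCMContainerManager and DatanodeInfo changes above make the per-owner container cap of a pipeline scale with the largest healthy-volume count among its datanodes, instead of the previous flat constant. A minimal standalone sketch of that arithmetic, using illustrative names (ContainerCapSketch, canAllocate, numHealthyVolumes) that are not part of the patch:

import java.util.Collections;
import java.util.List;

public final class ContainerCapSketch {

  // ozone.scm.pipeline.owner.container.count, default 3 in ozone-default.xml.
  private static final int NUM_CONTAINER_PER_VOLUME = 3;

  // Mirrors SCMNodeManager#getNumHealthyVolumes: the max healthy-volume
  // count across the datanodes of a pipeline.
  static int numHealthyVolumes(List<Integer> healthyVolumesPerDatanode) {
    return Collections.max(healthyVolumesPerDatanode);
  }

  // Mirrors the new check in SCMContainerManager#getMatchingContainer:
  // a new container may be allocated while the owner holds fewer than
  // numContainerPerVolume * healthy-volume-count containers.
  static boolean canAllocate(int ownedContainers,
      List<Integer> healthyVolumesPerDatanode) {
    return ownedContainers
        < NUM_CONTAINER_PER_VOLUME * numHealthyVolumes(healthyVolumesPerDatanode);
  }

  public static void main(String[] args) {
    // A three-node pipeline reporting 10, 8 and 9 healthy volumes:
    // cap = 3 * 10 = 30 containers per owner instead of a flat 3.
    System.out.println(canAllocate(29, List.of(10, 8, 9))); // true
    System.out.println(canAllocate(30, List.of(10, 8, 9))); // false
  }
}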
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index 37562fe9f29..df21b84eafd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -213,4 +213,6 @@ void processNodeReport(DatanodeDetails datanodeDetails,
    * @return cluster map
    */
   NetworkTopology getClusterNetworkTopologyMap();
+
+  int getNumHealthyVolumes(List<DatanodeDetails> dnList);
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 005881c0117..1a0cec3b217 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledFuture;
 import java.util.stream.Collectors;
@@ -509,6 +510,26 @@ public Map<String, Long> getNodeInfo() {
     return nodeInfo;
   }
 
+  /**
+   * Returns the maximum number of healthy volumes reported out of the set
+   * of datanodes constituting the pipeline.
+   */
+  @Override
+  public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+    List<Integer> volumeCountList = new ArrayList<>(dnList.size());
+    for (DatanodeDetails dn : dnList) {
+      try {
+        volumeCountList.add(nodeStateManager.getNode(dn).
+            getHealthyVolumeCount());
+      } catch (NodeNotFoundException e) {
+        LOG.warn("Cannot get healthy volume count, datanode {} not found.",
+            dn.getUuid());
+      }
+    }
+    Preconditions.checkArgument(!volumeCountList.isEmpty());
+    return Collections.max(volumeCountList);
+  }
+
   /**
    * Get set of pipelines a datanode is part of.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 48068d82fe5..857f76e88e5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -89,6 +89,8 @@ void scrubPipeline(ReplicationType type, ReplicationFactor factor)
       throws IOException;
 
   void incNumBlocksAllocatedMetric(PipelineID id);
 
+  int getNumHealthyVolumes(Pipeline pipeline);
+
   /**
    * Activates a dormant pipeline.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 6fce895185f..e7540590ae9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -661,6 +661,16 @@ public void close() throws IOException {
     pipelineFactory.shutdown();
   }
 
+  /**
+   * Returns the max count of healthy volumes from the set of
+   * datanodes constituting the pipeline.
+   * @param pipeline
+   * @return healthy volume count
+   */
+  public int getNumHealthyVolumes(Pipeline pipeline) {
+    return nodeManager.getNumHealthyVolumes(pipeline.getNodes());
+  }
+
   protected ReadWriteLock getLock() {
     return lock;
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 03ed0f7123e..f4f17598ed0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -222,20 +222,26 @@ public static StorageReportProto getRandomStorageReport(UUID nodeId,
         StorageTypeProto.DISK);
   }
 
-  /**
-   * Creates storage report with the given information.
-   *
-   * @param nodeId datanode id
-   * @param path storage dir
-   * @param capacity storage size
-   * @param used space used
-   * @param remaining space remaining
-   * @param type type of storage
-   *
-   * @return StorageReportProto
-   */
   public static StorageReportProto createStorageReport(UUID nodeId, String path,
-      long capacity, long used, long remaining, StorageTypeProto type) {
+      long capacity, long used, long remaining, StorageTypeProto type) {
+    return createStorageReport(nodeId, path, capacity, used, remaining,
+        type, false);
+  }
+  /**
+   * Creates storage report with the given information.
+   *
+   * @param nodeId datanode id
+   * @param path storage dir
+   * @param capacity storage size
+   * @param used space used
+   * @param remaining space remaining
+   * @param type type of storage
+   *
+   * @return StorageReportProto
+   */
+  public static StorageReportProto createStorageReport(UUID nodeId, String path,
+      long capacity, long used, long remaining, StorageTypeProto type,
+      boolean failed) {
     Preconditions.checkNotNull(nodeId);
     Preconditions.checkNotNull(path);
     StorageReportProto.Builder srb = StorageReportProto.newBuilder();
@@ -243,6 +249,7 @@ public static StorageReportProto createStorageReport(UUID nodeId, String path,
         .setStorageLocation(path)
         .setCapacity(capacity)
         .setScmUsed(used)
+        .setFailed(failed)
         .setRemaining(remaining);
     StorageTypeProto storageTypeProto = type == null ?
         StorageTypeProto.DISK : type;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index e0ba53c7e94..a72031c4249 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -298,6 +298,72 @@ public void testBlockDistribution() throws Exception {
     }
   }
 
+
+  @Test
+  public void testBlockDistributionWithMultipleDisks() throws Exception {
+    int threadCount = numContainerPerOwnerInPipeline *
+        numContainerPerOwnerInPipeline;
+    nodeManager.setNumHealthyVolumes(numContainerPerOwnerInPipeline);
+    List<ExecutorService> executors = new ArrayList<>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      executors.add(Executors.newSingleThreadExecutor());
+    }
+    pipelineManager.createPipeline(type, factor);
+    TestUtils.openAllRatisPipelines(pipelineManager);
+    Map<Long, List<AllocatedBlock>> allocatedBlockMap =
+        new ConcurrentHashMap<>();
+    List<CompletableFuture<AllocatedBlock>> futureList =
+        new ArrayList<>(threadCount);
+    for (int i = 0; i < threadCount; i++) {
+      final CompletableFuture<AllocatedBlock> future =
+          new CompletableFuture<>();
+      CompletableFuture.supplyAsync(() -> {
+        try {
+          List<AllocatedBlock> blockList;
+          AllocatedBlock block = blockManager
+              .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor,
+                  OzoneConsts.OZONE,
+                  new ExcludeList());
+          long containerId = block.getBlockID().getContainerID();
+          if (!allocatedBlockMap.containsKey(containerId)) {
+            blockList = new ArrayList<>();
+          } else {
+            blockList = allocatedBlockMap.get(containerId);
+          }
+          blockList.add(block);
+          allocatedBlockMap.put(containerId, blockList);
+          future.complete(block);
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+        }
+        return future;
+      }, executors.get(i));
+      futureList.add(future);
+    }
+    try {
+      CompletableFuture
+          .allOf(futureList.toArray(
+              new CompletableFuture[futureList.size()])).get();
+      Assert.assertTrue(
+          pipelineManager.getPipelines(type).size() == 1);
+      Pipeline pipeline = pipelineManager.getPipelines(type).get(0);
+      // The total number of containers created will be the number of
+      // healthy volumes * numContainerPerOwnerInPipeline, which is equal
+      // to the thread count.
+      Assert.assertTrue(threadCount == pipelineManager.
+          getNumberOfContainers(pipeline.getId()));
+      Assert.assertTrue(
+          allocatedBlockMap.size() == threadCount);
+      Assert.assertTrue(allocatedBlockMap.
+          values().size() == threadCount);
+      allocatedBlockMap.values().stream().forEach(v -> {
+        Assert.assertTrue(v.size() == 1);
+      });
+    } catch (Exception e) {
+      Assert.fail("testBlockDistributionWithMultipleDisks failed");
+    }
+  }
+
   @Test
   public void testAllocateOversizedBlock() throws Exception {
     long size = 6 * GB;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 54f6ee43334..5b635a7bee9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -92,6 +92,7 @@ public class MockNodeManager implements NodeManager {
   private final Node2ContainerMap node2ContainerMap;
   private NetworkTopology clusterMap;
   private ConcurrentMap<String, Set<String>> dnsToUuidMap;
+  private int numHealthyDisksPerDatanode;
 
   public MockNodeManager(NetworkTopologyImpl clusterMap,
                          List<DatanodeDetails> nodes,
@@ -121,6 +122,7 @@ public MockNodeManager(NetworkTopologyImpl clusterMap,
     }
     safemode = false;
     this.commandMap = new HashMap<>();
+    numHealthyDisksPerDatanode = 1;
   }
 
   public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
@@ -569,6 +571,15 @@ public void setNetworkTopology(NetworkTopology topology) {
     this.clusterMap = topology;
   }
 
+  @Override
+  public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+    return numHealthyDisksPerDatanode;
+  }
+
+  public void setNumHealthyVolumes(int value) {
+    numHealthyDisksPerDatanode = value;
+  }
+
   /**
    * A class to declare some values for the nodes so that our tests
    * won't fail.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index df5cb2de255..7a58d46ab68 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -848,11 +848,12 @@ public void testScmStatsFromNodeReport()
     final long capacity = 2000;
     final long used = 100;
     final long remaining = capacity - used;
-
+    List<DatanodeDetails> dnList = new ArrayList<>(nodeCount);
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
       EventQueue eventQueue = (EventQueue) scm.getEventQueue();
       for (int x = 0; x < nodeCount; x++) {
         DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+        dnList.add(dn);
         UUID dnId = dn.getUuid();
         long free = capacity - used;
         String storagePath = testDir.getAbsolutePath() + "/" + dnId;
@@ -871,9 +872,57 @@ public void testScmStatsFromNodeReport()
           .getScmUsed().get());
       assertEquals(remaining * nodeCount, (long) nodeManager.getStats()
           .getRemaining().get());
+      assertEquals(1, nodeManager.getNumHealthyVolumes(dnList));
+      dnList.clear();
     }
   }
 
+  /**
+   * Test multiple nodes sending initial heartbeat with their node report
+   * with multiple volumes.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws AuthenticationException
+   */
+  @Test
+  public void testVolumeInfoFromNodeReport()
+      throws IOException, InterruptedException, AuthenticationException {
+    OzoneConfiguration conf = getConf();
+    conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
+        MILLISECONDS);
+    final int volumeCount = 10;
+    final long capacity = 2000;
+    final long used = 100;
+    List<DatanodeDetails> dnList = new ArrayList<>(1);
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      EventQueue eventQueue = (EventQueue) scm.getEventQueue();
+      DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+      dnList.add(dn);
+      UUID dnId = dn.getUuid();
+      long free = capacity - used;
+      List<StorageReportProto> reports = new ArrayList<>(volumeCount);
+      boolean failed = true;
+      for (int x = 0; x < volumeCount; x++) {
+        String storagePath = testDir.getAbsolutePath() + "/" + dnId;
+        reports.add(TestUtils
+            .createStorageReport(dnId, storagePath, capacity,
+                used, free, null, failed));
+        failed = !failed;
+      }
+      nodeManager.register(dn, TestUtils.createNodeReport(reports), null);
+      nodeManager.processHeartbeat(dn);
+      //TODO: wait for EventQueue to be processed
+      eventQueue.processAll(8000L);
+
+      assertEquals(1, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(volumeCount / 2,
+          nodeManager.getNumHealthyVolumes(dnList));
+      dnList.clear();
+    }
+  }
+
   /**
    * Test single node stat update based on nodereport from different heartbeat
    * status (healthy, stale and dead).
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 9ca3f18c0c7..a9b879f86ec 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -343,4 +343,9 @@ public List<DatanodeDetails> getNodesByAddress(String address) {
   public NetworkTopology getClusterNetworkTopologyMap() {
     return null;
   }
+
+  @Override
+  public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
+    return 0;
+  }
 }
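For reference, the volume accounting exercised by testVolumeInfoFromNodeReport, condensed into a runnable standalone sketch; a plain Java record stands in for the protobuf StorageReportProto, and all names here are illustrative rather than part of the patch:

import java.util.ArrayList;
import java.util.List;

public final class HealthyVolumeSketch {

  // hasFailed/failed mirror the optional "failed" protobuf field that
  // TestUtils.createStorageReport now sets.
  record StorageReport(boolean hasFailed, boolean failed) { }

  // Mirrors DatanodeInfo#getFailedVolumeCount: count reports whose
  // failed flag is present and set.
  static int failedVolumeCount(List<StorageReport> reports) {
    return (int) reports.stream()
        .filter(r -> r.hasFailed() && r.failed())
        .count();
  }

  // Mirrors DatanodeInfo#getHealthyVolumeCount: total reports minus failed.
  static int healthyVolumeCount(List<StorageReport> reports) {
    return reports.size() - failedVolumeCount(reports);
  }

  public static void main(String[] args) {
    // Ten volumes with the failed flag alternating true/false, as in the
    // test: five volumes fail, so five remain healthy.
    List<StorageReport> reports = new ArrayList<>();
    boolean failed = true;
    for (int i = 0; i < 10; i++) {
      reports.add(new StorageReport(true, failed));
      failed = !failed;
    }
    System.out.println(healthyVolumeCount(reports)); // 5
  }
}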