HDDS-3809. Make number of open containers on a datanode a function of no of volumes reported by it. (apache#1081)
bshashikant authored and rakeshadr committed Sep 3, 2020
1 parent 6ec384e commit cf4e73f
Showing 12 changed files with 214 additions and 18 deletions.
2 changes: 1 addition & 1 deletion hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -818,7 +818,7 @@
<name>ozone.scm.pipeline.owner.container.count</name>
<value>3</value>
<tag>OZONE, SCM, PIPELINE</tag>
<description>Number of containers per owner in a pipeline.
<description>Number of containers per owner per disk in a pipeline.
</description>
</property>
<property>
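In effect, the per-owner limit is no longer a flat count per pipeline: it is this count multiplied by the number of healthy volumes reported by the pipeline's datanodes. A minimal sketch of the new calculation, assuming a conf, pipelineManager and pipeline already in scope (names follow the diff below; the surrounding wiring is simplified):

    // Sketch only, not part of the commit.
    int numContainerPerVolume = conf.getInt(
        ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
        ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT); // 3 by default
    int healthyVolumes = pipelineManager.getNumHealthyVolumes(pipeline); // added in this commit
    int openContainerLimit = numContainerPerVolume * healthyVolumes;
    // e.g. with the default of 3 and a datanode reporting 10 healthy volumes,
    // up to 3 * 10 = 30 containers per owner may be open on that pipeline.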
@@ -66,7 +66,7 @@ public class SCMContainerManager implements ContainerManager {

private final ContainerStateManager containerStateManager;

private final int numContainerPerOwnerInPipeline;
private final int numContainerPerVolume;

private final SCMContainerManagerMetrics scmContainerManagerMetrics;

@@ -98,7 +98,7 @@ public SCMContainerManager(
this.lock = new ReentrantLock();
this.pipelineManager = pipelineManager;
this.containerStateManager = new ContainerStateManager(conf);
this.numContainerPerOwnerInPipeline = conf
this.numContainerPerVolume = conf
.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);

@@ -432,7 +432,8 @@ public ContainerInfo getMatchingContainer(final long sizeRequired,
synchronized (pipeline) {
containerIDs = getContainersForOwner(pipeline, owner);

if (containerIDs.size() < numContainerPerOwnerInPipeline) {
if (containerIDs.size() < numContainerPerVolume * pipelineManager.
getNumHealthyVolumes(pipeline)) {
containerInfo =
containerStateManager.allocateContainer(
pipelineManager, owner, pipeline);
@@ -108,6 +108,28 @@ public List<StorageReportProto> getStorageReports() {
}
}

/**
* Returns count of healthy volumes reported from datanode.
* @return count of healthy volumes
*/
public int getHealthyVolumeCount() {
try {
lock.readLock().lock();
return storageReports.size() - getFailedVolumeCount();
} finally {
lock.readLock().unlock();
}
}

/**
* Returns count of failed volumes reported from datanode.
* @return count of failed volumes
*/
private int getFailedVolumeCount() {
return (int) storageReports.stream().
filter(e -> e.hasFailed() ? e.getFailed() : false).count();
}

/**
* Returns the last updated time of datanode info.
* @return the last updated time of datanode info.
@@ -213,4 +213,6 @@ void processNodeReport(DatanodeDetails datanodeDetails,
* @return cluster map
*/
NetworkTopology getClusterNetworkTopologyMap();

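/**
* Returns the maximum number of healthy volumes reported by the datanodes
* in the given list.
*
* @param dnList list of datanodes
* @return max healthy volume count
*/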
int getNumHealthyVolumes(List<DatanodeDetails> dnList);
}
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
@@ -509,6 +510,26 @@ public Map<String, Long> getNodeInfo() {
return nodeInfo;
}

/**
* Returns the max number of healthy volumes reported out of the set
* of datanodes constituting the pipeline.
*/
@Override
public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
List<Integer> volumeCountList = new ArrayList<>(dnList.size());
for (DatanodeDetails dn : dnList) {
try {
volumeCountList.add(nodeStateManager.getNode(dn).
getHealthyVolumeCount());
} catch (NodeNotFoundException e) {
LOG.warn("Cannot generate NodeStat, datanode {} not found.",
dn.getUuid());
}
}
Preconditions.checkArgument(!volumeCountList.isEmpty());
return Collections.max(volumeCountList);
}

/**
* Get set of pipelines a datanode is part of.
*
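A small worked example of the max semantics above (volume counts are made up): if the three datanodes of a pipeline report 10, 8 and 9 healthy volumes, getNumHealthyVolumes returns 10, so the open-container limit follows the best-provisioned datanode in the pipeline.

    // Illustration only, not part of the commit.
    List<Integer> reportedHealthyVolumes = Arrays.asList(10, 8, 9);
    int pipelineHealthyVolumes = Collections.max(reportedHealthyVolumes); // 10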
@@ -89,6 +89,8 @@ void scrubPipeline(ReplicationType type, ReplicationFactor factor)

void incNumBlocksAllocatedMetric(PipelineID id);

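/**
* Returns the maximum number of healthy volumes reported by the datanodes
* constituting the given pipeline.
*
* @param pipeline the pipeline to inspect
* @return max healthy volume count
*/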
int getNumHealthyVolumes(Pipeline pipeline);

/**
* Activates a dormant pipeline.
*
@@ -661,6 +661,16 @@ public void close() throws IOException {
pipelineFactory.shutdown();
}

/**
* Returns the max count of healthy volumes from the set of
* datanodes constituting the pipeline.
* @param pipeline the pipeline to inspect
* @return healthy volume count
*/
public int getNumHealthyVolumes(Pipeline pipeline) {
return nodeManager.getNumHealthyVolumes(pipeline.getNodes());
}

protected ReadWriteLock getLock() {
return lock;
}
@@ -222,27 +222,34 @@ public static StorageReportProto getRandomStorageReport(UUID nodeId,
StorageTypeProto.DISK);
}

/**
* Creates storage report with the given information.
*
* @param nodeId datanode id
* @param path storage dir
* @param capacity storage size
* @param used space used
* @param remaining space remaining
* @param type type of storage
*
* @return StorageReportProto
*/
public static StorageReportProto createStorageReport(UUID nodeId, String path,
long capacity, long used, long remaining, StorageTypeProto type) {
long capacity, long used, long remaining, StorageTypeProto type) {
return createStorageReport(nodeId, path, capacity, used, remaining,
type, false);
}
/**
* Creates storage report with the given information.
*
* @param nodeId datanode id
* @param path storage dir
* @param capacity storage size
* @param used space used
* @param remaining space remaining
* @param type type of storage
*
* @return StorageReportProto
*/
public static StorageReportProto createStorageReport(UUID nodeId, String path,
long capacity, long used, long remaining, StorageTypeProto type,
boolean failed) {
Preconditions.checkNotNull(nodeId);
Preconditions.checkNotNull(path);
StorageReportProto.Builder srb = StorageReportProto.newBuilder();
srb.setStorageUuid(nodeId.toString())
.setStorageLocation(path)
.setCapacity(capacity)
.setScmUsed(used)
.setFailed(failed)
.setRemaining(remaining);
StorageTypeProto storageTypeProto =
type == null ? StorageTypeProto.DISK : type;
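A brief usage sketch of the new createStorageReport overload (paths and sizes are made up; the signature and the failed flag are from this diff):

    // One failed and one healthy volume report for the same datanode,
    // mirroring what the new tests below do.
    UUID dnId = UUID.randomUUID();
    StorageReportProto failedDisk = TestUtils.createStorageReport(
        dnId, "/data/disk0", 2000, 100, 1900, StorageTypeProto.DISK, true);  // failed = true
    StorageReportProto healthyDisk = TestUtils.createStorageReport(
        dnId, "/data/disk1", 2000, 100, 1900, StorageTypeProto.DISK, false); // failed = false
    // Only healthyDisk contributes to the datanode's healthy volume count.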
@@ -298,6 +298,72 @@ public void testBlockDistribution() throws Exception {
}
}


@Test
public void testBlockDistributionWithMultipleDisks() throws Exception {
int threadCount = numContainerPerOwnerInPipeline *
numContainerPerOwnerInPipeline;
nodeManager.setNumHealthyVolumes(numContainerPerOwnerInPipeline);
List<ExecutorService> executors = new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
executors.add(Executors.newSingleThreadExecutor());
}
pipelineManager.createPipeline(type, factor);
TestUtils.openAllRatisPipelines(pipelineManager);
Map<Long, List<AllocatedBlock>> allocatedBlockMap =
new ConcurrentHashMap<>();
List<CompletableFuture<AllocatedBlock>> futureList =
new ArrayList<>(threadCount);
for (int i = 0; i < threadCount; i++) {
final CompletableFuture<AllocatedBlock> future =
new CompletableFuture<>();
CompletableFuture.supplyAsync(() -> {
try {
List<AllocatedBlock> blockList;
AllocatedBlock block = blockManager
.allocateBlock(DEFAULT_BLOCK_SIZE, type, factor,
OzoneConsts.OZONE,
new ExcludeList());
long containerId = block.getBlockID().getContainerID();
if (!allocatedBlockMap.containsKey(containerId)) {
blockList = new ArrayList<>();
} else {
blockList = allocatedBlockMap.get(containerId);
}
blockList.add(block);
allocatedBlockMap.put(containerId, blockList);
future.complete(block);
} catch (IOException e) {
future.completeExceptionally(e);
}
return future;
}, executors.get(i));
futureList.add(future);
}
try {
CompletableFuture
.allOf(futureList.toArray(
new CompletableFuture[futureList.size()])).get();
Assert.assertTrue(
pipelineManager.getPipelines(type).size() == 1);
Pipeline pipeline = pipelineManager.getPipelines(type).get(0);
// total number of containers to be created will be the number of healthy
// volumes * numContainerPerOwnerInPipeline, which is equal to the
// thread count
Assert.assertTrue(threadCount == pipelineManager.
getNumberOfContainers(pipeline.getId()));
Assert.assertTrue(
allocatedBlockMap.size() == threadCount);
Assert.assertTrue(allocatedBlockMap.
values().size() == threadCount);
allocatedBlockMap.values().stream().forEach(v -> {
Assert.assertTrue(v.size() == 1);
});
} catch (Exception e) {
Assert.fail("testBlockDistributionWithMultipleDisks failed");
}
}

@Test
public void testAllocateOversizedBlock() throws Exception {
long size = 6 * GB;
@@ -92,6 +92,7 @@ public class MockNodeManager implements NodeManager {
private final Node2ContainerMap node2ContainerMap;
private NetworkTopology clusterMap;
private ConcurrentMap<String, Set<String>> dnsToUuidMap;
private int numHealthyDisksPerDatanode;

public MockNodeManager(NetworkTopologyImpl clusterMap,
List<DatanodeDetails> nodes,
@@ -121,6 +122,7 @@ public MockNodeManager(NetworkTopologyImpl clusterMap,
}
safemode = false;
this.commandMap = new HashMap<>();
numHealthyDisksPerDatanode = 1;
}

public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
@@ -569,6 +571,15 @@ public void setNetworkTopology(NetworkTopology topology) {
this.clusterMap = topology;
}

@Override
public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
return numHealthyDisksPerDatanode;
}

public void setNumHealthyVolumes(int value) {
numHealthyDisksPerDatanode = value;
}

/**
* A class to declare some values for the nodes so that our tests
* won't fail.
@@ -848,11 +848,12 @@ public void testScmStatsFromNodeReport()
final long capacity = 2000;
final long used = 100;
final long remaining = capacity - used;

List<DatanodeDetails> dnList = new ArrayList<>(nodeCount);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
EventQueue eventQueue = (EventQueue) scm.getEventQueue();
for (int x = 0; x < nodeCount; x++) {
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
dnList.add(dn);
UUID dnId = dn.getUuid();
long free = capacity - used;
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
@@ -871,9 +872,57 @@ public void testScmStatsFromNodeReport()
.getScmUsed().get());
assertEquals(remaining * nodeCount, (long) nodeManager.getStats()
.getRemaining().get());
assertEquals(1, nodeManager.getNumHealthyVolumes(dnList));
dnList.clear();
}
}

/**
* Test a datanode sending its initial heartbeat with a node report
* containing multiple volumes, some of which have failed.
*
* @throws IOException
* @throws InterruptedException
* @throws AuthenticationException
*/
@Test
public void testVolumeInfoFromNodeReport()
throws IOException, InterruptedException, AuthenticationException {
OzoneConfiguration conf = getConf();
conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 1000,
MILLISECONDS);
final int volumeCount = 10;
final long capacity = 2000;
final long used = 100;
List<DatanodeDetails> dnList = new ArrayList<>(1);
try (SCMNodeManager nodeManager = createNodeManager(conf)) {
EventQueue eventQueue = (EventQueue) scm.getEventQueue();
DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
dnList.add(dn);
UUID dnId = dn.getUuid();
long free = capacity - used;
List<StorageReportProto> reports = new ArrayList<>(volumeCount);
boolean failed = true;
for (int x = 0; x < volumeCount; x++) {
String storagePath = testDir.getAbsolutePath() + "/" + dnId;
reports.add(TestUtils
.createStorageReport(dnId, storagePath, capacity,
used, free, null, failed));
failed = !failed;
}
nodeManager.register(dn, TestUtils.createNodeReport(reports), null);
nodeManager.processHeartbeat(dn);
//TODO: wait for EventQueue to be processed
eventQueue.processAll(8000L);

assertEquals(1, nodeManager.getNodeCount(HEALTHY));
assertEquals(volumeCount / 2,
nodeManager.getNumHealthyVolumes(dnList));
dnList.clear();
}
}


/**
* Test single node stat update based on nodereport from different heartbeat
* status (healthy, stale and dead).
@@ -343,4 +343,9 @@ public List<DatanodeDetails> getNodesByAddress(String address) {
public NetworkTopology getClusterNetworkTopologyMap() {
return null;
}

@Override
public int getNumHealthyVolumes(List<DatanodeDetails> dnList) {
return 0;
}
}
