Skip to content

Commit

Permalink
Perform fine grained scaling only on NMs registered with zero capacity
Browse files Browse the repository at this point in the history
Summary of major changes made:
- Introduced a "zero" profile in the .yml file. An NM launched with this profile advertises (0G,0CPU) to RM.
- When an offer is received:
   - Myriad uses the offer to launch a new NM, if there are pending "flexup" requests
     and an NM is not already running on the slave the resources are offered on.
   - (if above is not true,) if the slave node is running a NM originally launched with (0G,0CPU) capacity,
     then Myriad uses the offer to dynamically increase the capacity of the NM.
   - (if above is not true,) Myriad rejects the offer.
- Moved the FGS classes to a separate package (.fgs).
- Built a "filter" mechanism through which FGS receives interceptor callbacks only for NMs launched
  with (0G,0CPU) capacity.
  • Loading branch information
smarella committed Jul 11, 2015
1 parent 2b76199 commit 8e355b5
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import com.ebay.myriad.policy.NodeScaleDownPolicy;
import com.ebay.myriad.scheduler.MyriadDriverManager;
import com.ebay.myriad.scheduler.MyriadScheduler;
import com.ebay.myriad.scheduler.NMHeartBeatHandler;
import com.ebay.myriad.scheduler.fgs.NMHeartBeatHandler;
import com.ebay.myriad.scheduler.NMProfileManager;
import com.ebay.myriad.scheduler.NodeStore;
import com.ebay.myriad.scheduler.OfferLifecycleManager;
import com.ebay.myriad.scheduler.fgs.NodeStore;
import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager;
import com.ebay.myriad.scheduler.ReconcileService;
import com.ebay.myriad.scheduler.TaskFactory;
import com.ebay.myriad.scheduler.TaskFactory.NMTaskFactoryImpl;
import com.ebay.myriad.scheduler.YarnNodeCapacityManager;
import com.ebay.myriad.scheduler.fgs.YarnNodeCapacityManager;
import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
import com.ebay.myriad.state.MyriadState;
import com.ebay.myriad.state.SchedulerState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.ebay.myriad.scheduler;

import com.ebay.myriad.state.NodeTask;
import com.ebay.myriad.state.SchedulerState;
import com.google.common.base.Preconditions;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
Expand Down Expand Up @@ -71,4 +72,23 @@ public static boolean isUniqueHostname(Protos.OfferOrBuilder offer,
LOGGER.debug("Offer's hostname {} is unique: {}", offerHostname, uniqueHostname);
return uniqueHostname;
}

/**
 * Determines if a given host has a Node Manager running with the zero profile. Node Managers
 * launched with the zero profile (zero cpu &amp; memory) are eligible for fine grained scaling.
 * Node Managers launched with a non-zero profile size are not eligible for fine grained scaling.
 *
 * @param hostName hostname of the node to check; a {@code null} value never matches
 * @param state scheduler state whose active tasks are inspected
 * @return {@code true} if some active task on {@code hostName} has a profile with zero CPUs
 *         and zero memory, {@code false} otherwise
 */
public static boolean isEligibleForFineGrainedScaling(String hostName, SchedulerState state) {
  if (hostName == null) {
    return false;
  }
  for (NodeTask activeNMTask : state.getActiveTasks()) {
    // Compare from the non-null side so that a task without a recorded hostname is
    // skipped instead of triggering a NullPointerException.
    if (hostName.equals(activeNMTask.getHostname())
        && activeNMTask.getProfile().getCpus() == 0
        && activeNMTask.getProfile().getMemory() == 0) {
      return true;
    }
  }
  return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.ebay.myriad.scheduler.*;
import com.ebay.myriad.scheduler.event.ResourceOffersEvent;
import com.ebay.myriad.scheduler.fgs.OfferLifecycleManager;
import com.ebay.myriad.state.NodeTask;
import com.ebay.myriad.state.SchedulerState;
import com.lmax.disruptor.EventHandler;
Expand Down Expand Up @@ -59,10 +60,7 @@ public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEv
@Inject
private TaskUtils taskUtils;

@Inject
private YarnNodeCapacityManager yarnNodeCapacityManager;

@Inject
@Inject
private OfferLifecycleManager offerLifecycleMgr;

@Override
Expand Down Expand Up @@ -116,8 +114,21 @@ public void onEvent(ResourceOffersEvent event, long sequence,
driver.declineOffer(offer.getId());
}
}
} else {
offerLifecycleMgr.addOffers(offers);
}

for (Offer offer : offers) {
if (SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), schedulerState)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Picking an offer from slave with hostname {} for fine grained scaling.",
offer.getHostname());
}
offerLifecycleMgr.addOffers(offer);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Declining offer {} from slave {}.", offer, offer.getHostname());
}
driver.declineOffer(offer.getId());
}
}
} finally {
driverOperationLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ebay.myriad.scheduler;
package com.ebay.myriad.scheduler.fgs;

import java.util.ArrayList;
import java.util.Collection;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package com.ebay.myriad.scheduler;
package com.ebay.myriad.scheduler.fgs;

import com.ebay.myriad.executor.ContainerTaskStatusRequest;
import com.ebay.myriad.scheduler.MyriadDriver;
import com.ebay.myriad.scheduler.SchedulerUtils;
import com.ebay.myriad.scheduler.TaskFactory;
import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
import com.ebay.myriad.state.SchedulerState;
import com.google.gson.Gson;
import java.nio.charset.Charset;
import java.util.ArrayList;
Expand All @@ -11,6 +15,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
Expand All @@ -35,6 +40,7 @@ public class NMHeartBeatHandler extends BaseInterceptor {
private final YarnNodeCapacityManager yarnNodeCapacityMgr;
private final OfferLifecycleManager offerLifecycleMgr;
private final NodeStore nodeStore;
private final SchedulerState state;

@Inject
public NMHeartBeatHandler(
Expand All @@ -43,7 +49,8 @@ public NMHeartBeatHandler(
MyriadDriver myriadDriver,
YarnNodeCapacityManager yarnNodeCapacityMgr,
OfferLifecycleManager offerLifecycleMgr,
NodeStore nodeStore) {
NodeStore nodeStore,
SchedulerState state) {

if (registry != null) {
registry.register(this);
Expand All @@ -54,16 +61,34 @@ public NMHeartBeatHandler(
this.yarnNodeCapacityMgr = yarnNodeCapacityMgr;
this.offerLifecycleMgr = offerLifecycleMgr;
this.nodeStore = nodeStore;
this.state = state;
}

/**
 * Supplies the filter deciding which nodes this handler receives callbacks for: a node is
 * accepted only when {@code SchedulerUtils.isEligibleForFineGrainedScaling} returns true
 * for its host.
 */
@Override
public CallBackFilter getCallBackFilter() {
  final CallBackFilter fgsNodesOnly = new CallBackFilter() {
    @Override
    public boolean allowCallBacksForNode(NodeId nodeManager) {
      final String nmHost = nodeManager.getHost();
      return SchedulerUtils.isEligibleForFineGrainedScaling(nmHost, state);
    }
  };
  return fgsNodesOnly;
}

@Override
public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
switch (event.getType()) {
case STARTED: {
// TODO (Santosh) Don't zero out NM's capacity as it's causing trouble with app submissions.
RMNode rmNode = context.getRMNodes().get(event.getNodeId());
rmNode.getTotalCapability().setMemory(0);
rmNode.getTotalCapability().setVirtualCores(0);
Resource totalCapability = rmNode.getTotalCapability();
if (totalCapability.getMemory() != 0 ||
totalCapability.getVirtualCores() != 0) {
LOGGER.warn("FineGrainedScaling feature got invoked for a " +
"NM with non-zero capacity. Host: {}, Mem: {}, CPU: {}. Setting the NM's capacity to (0G,0CPU)",
rmNode.getHostName(),
totalCapability.getMemory(), totalCapability.getVirtualCores());
totalCapability.setMemory(0);
totalCapability.setVirtualCores(0);
}
}
break;

Expand Down Expand Up @@ -93,29 +118,15 @@ private void handleStatusUpdate(RMNodeEvent event, RMContext context) {
RMNode rmNode = context.getRMNodes().get(event.getNodeId());
String hostName = rmNode.getNodeID().getHost();

validateContainerCount(statusEvent, rmNode);
nodeStore.getNode(hostName).snapshotRunningContainers();
sendStatusUpdatesToMesosForCompletedContainers(statusEvent);

// New capacity of the node =
// original capacity the NM registered with (profile) +
// resources under use on the node (due to previous offers) +
// new resources offered by mesos for the node
yarnNodeCapacityMgr.setNodeCapacity(rmNode,
Resources.add(rmNode.getTotalCapability(),
Resources.add(getResourcesUnderUse(statusEvent),
getNewResourcesOfferedByMesos(hostName))));
}

private void validateContainerCount(RMNodeStatusEvent statusEvent, RMNode rmNode) {
int rmSchNodeNumContainers = yarnScheduler.getSchedulerNode(rmNode.getNodeID()).getNumContainers();
List<ContainerStatus> nmContainers = statusEvent.getContainers();

if (nmContainers.size() != rmSchNodeNumContainers) {
LOGGER.warn("Node: {}, Num Containers known by RM scheduler: {}, "
+ "Num containers NM is reporting: {}",
rmNode.getNodeID().getHost(), rmSchNodeNumContainers, nmContainers.size());
}
getNewResourcesOfferedByMesos(hostName)));
}

private Resource getNewResourcesOfferedByMesos(String hostname) {
Expand All @@ -124,15 +135,20 @@ private Resource getNewResourcesOfferedByMesos(String hostname) {
LOGGER.debug("No offer feed for: {}", hostname);
return Resource.newInstance(0, 0);
}
Resource offeredResources = Resource.newInstance(0, 0);
List<Offer> offers = new ArrayList<>();
Protos.Offer offer;
while ((offer = feed.poll()) != null) {
offers.add(offer);
offerLifecycleMgr.markAsConsumed(offer);
}
Resources.addTo(offeredResources, OfferUtils.getYarnResourcesFromMesosOffers(offers));
return offeredResources;
Resource fromMesosOffers = OfferUtils.getYarnResourcesFromMesosOffers(offers);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("NM on host {} got {} CPUs and {} memory from mesos",
hostname, fromMesosOffers.getVirtualCores(), fromMesosOffers.getMemory());
}

return fromMesosOffers;
}

private Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ebay.myriad.scheduler;
package com.ebay.myriad.scheduler.fgs;

import java.util.HashSet;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ebay.myriad.scheduler;
package com.ebay.myriad.scheduler.fgs;

import java.util.concurrent.ConcurrentHashMap;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ebay.myriad.scheduler;
package com.ebay.myriad.scheduler.fgs;

import java.util.concurrent.ConcurrentLinkedQueue;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.ebay.myriad.scheduler;
package com.ebay.myriad.scheduler.fgs;

import com.ebay.myriad.scheduler.MyriadDriver;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -47,7 +48,7 @@ public void declineOffer(Protos.Offer offer) {
LOGGER.debug("Declined offer {}", offer.getId());
}

public void addOffers(List<Protos.Offer> offers) {
public void addOffers(Protos.Offer... offers) {
for (Protos.Offer offer : offers) {
String hostname = offer.getHostname();
Node node = nodeStore.getNode(hostname);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ebay.myriad.scheduler;
package com.ebay.myriad.scheduler.fgs;

import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package com.ebay.myriad.scheduler;
package com.ebay.myriad.scheduler.fgs;

import com.ebay.myriad.executor.ContainerTaskStatusRequest;
import com.ebay.myriad.scheduler.MyriadDriver;
import com.ebay.myriad.scheduler.SchedulerUtils;
import com.ebay.myriad.scheduler.TaskFactory;
import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
import com.ebay.myriad.state.SchedulerState;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.HashSet;
Expand Down Expand Up @@ -48,6 +52,7 @@ public class YarnNodeCapacityManager extends BaseInterceptor {
private final OfferLifecycleManager offerLifecycleMgr;
private final NodeStore nodeStore;
private final TaskFactory taskFactory;
private final SchedulerState state;

@Inject
public YarnNodeCapacityManager(InterceptorRegistry registry,
Expand All @@ -56,7 +61,8 @@ public YarnNodeCapacityManager(InterceptorRegistry registry,
MyriadDriver myriadDriver,
TaskFactory taskFactory,
OfferLifecycleManager offerLifecycleMgr,
NodeStore nodeStore) {
NodeStore nodeStore,
SchedulerState state) {
if (registry != null) {
registry.register(this);
}
Expand All @@ -66,9 +72,20 @@ public YarnNodeCapacityManager(InterceptorRegistry registry,
this.taskFactory = taskFactory;
this.offerLifecycleMgr = offerLifecycleMgr;
this.nodeStore = nodeStore;
this.state = state;
}

/**
 * Supplies the filter deciding which nodes this capacity manager receives callbacks for:
 * a node is accepted only when {@code SchedulerUtils.isEligibleForFineGrainedScaling}
 * returns true for its host.
 */
@Override
public CallBackFilter getCallBackFilter() {
  final CallBackFilter fgsNodesOnly = new CallBackFilter() {
    @Override
    public boolean allowCallBacksForNode(NodeId nodeManager) {
      final String nmHost = nodeManager.getHost();
      return SchedulerUtils.isEligibleForFineGrainedScaling(nmHost, state);
    }
  };
  return fgsNodesOnly;
}

@Override
public void afterSchedulerEventHandled(SchedulerEvent event) {
switch (event.getType()) {
case NODE_ADDED: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ebay.myriad.scheduler.yarn.interceptor;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
Expand All @@ -18,7 +19,17 @@ public class BaseInterceptor implements YarnSchedulerInterceptor {
/** Protected no-argument constructor; performs no initialization. */
protected BaseInterceptor() {
}

@Override
@Override
public CallBackFilter getCallBackFilter() {
  // Base behaviour: the returned filter accepts every node.
  final CallBackFilter acceptEveryNode = new CallBackFilter() {
    @Override
    public boolean allowCallBacksForNode(NodeId nodeManager) {
      return true;
    }
  };
  return acceptEveryNode;
}

/**
 * No-op in this base class: the body is empty and all three arguments are ignored.
 *
 * @param conf configuration passed to the interceptor (unused here)
 * @param yarnScheduler scheduler passed to the interceptor (unused here)
 * @param rmContext resource manager context passed to the interceptor (unused here)
 * @throws IOException declared by the signature; this implementation never throws it
 */
@Override
public void init(Configuration conf, AbstractYarnScheduler yarnScheduler, RMContext rmContext) throws IOException {
}

Expand Down

0 comments on commit 8e355b5

Please sign in to comment.