Skip to content

Commit

Permalink
Update Myriad's hadoop dependency from version 2.5 to 2.7.
Browse files Browse the repository at this point in the history
The following problems surfaced during this change:
1. The API that Myriad was using to update NM's capacity is removed in 2.7. It's replaced with a NODE_RESOURCE_UPDATE event (YARN-1506).
2. RM rejects app submissions if the size of the AM container requested is greater than the capacity of the largest NM (YARN-3079, YARN-2604).
   Since FGS sets the NM's capacity to zero during NM's registration with RM, the app submission fails.
  • Loading branch information
smarella committed Jul 11, 2015
1 parent b543e17 commit 2b76199
Show file tree
Hide file tree
Showing 15 changed files with 265 additions and 239 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ subprojects {

ext {
mesosVer = "0.21.1"
hadoopVer = "2.5.0"
hadoopVer = "2.7.0"

This comment has been minimized.

Copy link
@yufeldman

yufeldman Jul 23, 2015

We need to externalize those versions, as different people can have different dependencies. Not for this PR though

metricsVer = "3.1.0"
}

Expand Down
11 changes: 5 additions & 6 deletions myriad-scheduler/src/main/java/com/ebay/myriad/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.inject.Injector;
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -62,30 +63,28 @@ public class Main {

public static void initialize(Configuration hadoopConf,
AbstractYarnScheduler yarnScheduler,
RMContext rmContext,
InterceptorRegistry registry) throws Exception {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
MyriadConfiguration cfg = mapper.readValue(
Thread.currentThread().getContextClassLoader().getResource("myriad-config-default.yml"),
MyriadConfiguration.class);

MyriadModule myriadModule = new MyriadModule(cfg, hadoopConf, yarnScheduler, registry);
MyriadModule myriadModule = new MyriadModule(cfg, hadoopConf, yarnScheduler, rmContext, registry);
MesosModule mesosModule = new MesosModule();
injector = Guice.createInjector(
myriadModule, mesosModule,
new WebAppGuiceModule());

new Main().run(cfg, hadoopConf, yarnScheduler, registry);
new Main().run(cfg);
}

// TODO (Kannan Rajah) Hack to get injector in unit test.
public static Injector getInjector() {
return injector;
}

public void run(MyriadConfiguration cfg,
Configuration hadoopConf,
AbstractYarnScheduler yarnScheduler,
InterceptorRegistry registry) throws Exception {
public void run(MyriadConfiguration cfg) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Bindings: " + injector.getAllBindings());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.google.inject.Singleton;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.mesos.state.State;
import org.slf4j.Logger;
Expand All @@ -53,15 +54,18 @@ public class MyriadModule extends AbstractModule {
private MyriadConfiguration cfg;
private Configuration hadoopConf;
private AbstractYarnScheduler yarnScheduler;
private final RMContext rmContext;
private InterceptorRegistry interceptorRegistry;

public MyriadModule(MyriadConfiguration cfg,
Configuration hadoopConf,
AbstractYarnScheduler yarnScheduler,
RMContext rmContext,
InterceptorRegistry interceptorRegistry) {
this.cfg = cfg;
this.hadoopConf = hadoopConf;
this.yarnScheduler = yarnScheduler;
this.rmContext = rmContext;
this.interceptorRegistry = interceptorRegistry;
}

Expand All @@ -70,6 +74,7 @@ protected void configure() {
LOGGER.debug("Configuring guice");
bind(MyriadConfiguration.class).toInstance(cfg);
bind(Configuration.class).toInstance(hadoopConf);
bind(RMContext.class).toInstance(rmContext);
bind(AbstractYarnScheduler.class).toInstance(yarnScheduler);
bind(InterceptorRegistry.class).toInstance(interceptorRegistry);
bind(MyriadDriverManager.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ public void afterSchedulerEventHandled(SchedulerEvent event) {
case NODE_REMOVED:
onNodeRemoved((NodeRemovedSchedulerEvent) event);
break;

default:
LOGGER.warn("event type not supported");
break;
}
} catch (ClassCastException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
package com.ebay.myriad.scheduler;

import com.ebay.myriad.executor.ContainerTaskStatusRequest;
import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
import com.google.gson.Gson;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import javax.inject.Inject;

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Offer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ebay.myriad.executor.ContainerTaskStatusRequest;
import com.ebay.myriad.scheduler.yarn.interceptor.BaseInterceptor;
import com.ebay.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
import com.google.gson.Gson;

/**
* Handles node manager heartbeat.
*/
Expand Down Expand Up @@ -60,13 +60,10 @@ public NMHeartBeatHandler(
public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
switch (event.getType()) {
case STARTED: {
// TODO (Santosh) We can't zero out resources here in all cases. For e.g.
// this event might be fired when an existing node is rejoining the
// cluster (NM restart) as well. Sometimes this event can have a list of
// container statuses, which, capacity/fifo schedulers seem to handle
// (perhaps due to work preserving NM restart).
// TODO (Santosh) Don't zero out NM's capacity as it's causing trouble with app submissions.
RMNode rmNode = context.getRMNodes().get(event.getNodeId());
yarnNodeCapacityMgr.removeCapacity(rmNode);
rmNode.getTotalCapability().setMemory(0);
rmNode.getTotalCapability().setVirtualCores(0);
}
break;

Expand All @@ -80,10 +77,8 @@ public void beforeRMNodeEventHandled(RMNodeEvent event, RMContext context) {
}
break;

default: {
LOGGER.debug("Unhandled event: {}", event.getClass().getName());
}
break;
default:
break;
}
}

Expand All @@ -96,54 +91,99 @@ private void handleStatusUpdate(RMNodeEvent event, RMContext context) {

RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
RMNode rmNode = context.getRMNodes().get(event.getNodeId());
SchedulerNode schedulerNode = yarnScheduler.getSchedulerNode(rmNode.getNodeID());
String hostName = rmNode.getNodeID().getHost();

int rmSchNodeNumContainers = schedulerNode.getNumContainers();
validateContainerCount(statusEvent, rmNode);
nodeStore.getNode(hostName).snapshotRunningContainers();
sendStatusUpdatesToMesosForCompletedContainers(statusEvent);

// New capacity of the node =
// original capacity the NM registered with (profile) +
// resources under use on the node (due to previous offers) +
// new resources offered by mesos for the node
yarnNodeCapacityMgr.setNodeCapacity(rmNode,
Resources.add(rmNode.getTotalCapability(),
Resources.add(getResourcesUnderUse(statusEvent),
getNewResourcesOfferedByMesos(hostName))));
}

private void validateContainerCount(RMNodeStatusEvent statusEvent, RMNode rmNode) {
int rmSchNodeNumContainers = yarnScheduler.getSchedulerNode(rmNode.getNodeID()).getNumContainers();
List<ContainerStatus> nmContainers = statusEvent.getContainers();

if (nmContainers.size() != rmSchNodeNumContainers) {
LOGGER.warn("Node: {}, Num Containers known by RM scheduler: {}, "
+ "Num containers NM is reporting: {}",
hostName, rmSchNodeNumContainers, nmContainers.size());
rmNode.getNodeID().getHost(), rmSchNodeNumContainers, nmContainers.size());
}
}

private Resource getNewResourcesOfferedByMesos(String hostname) {
OfferFeed feed = offerLifecycleMgr.getOfferFeed(hostname);
if (feed == null) {
LOGGER.debug("No offer feed for: {}", hostname);
return Resource.newInstance(0, 0);
}
Resource offeredResources = Resource.newInstance(0, 0);
List<Offer> offers = new ArrayList<>();
Protos.Offer offer;
while ((offer = feed.poll()) != null) {
offers.add(offer);
offerLifecycleMgr.markAsConsumed(offer);
}
Resources.addTo(offeredResources, OfferUtils.getYarnResourcesFromMesosOffers(offers));
return offeredResources;
}

private Resource getResourcesUnderUse(RMNodeStatusEvent statusEvent) {
Resource usedResources = Resource.newInstance(0, 0);
for (ContainerStatus status : statusEvent.getContainers()) {
if (status.getState() == ContainerState.NEW || status.getState() == ContainerState.RUNNING) {
Resources.addTo(usedResources, yarnScheduler.getRMContainer(status.getContainerId()).getAllocatedResource());
}
}
return usedResources;
}

private void sendStatusUpdatesToMesosForCompletedContainers(RMNodeStatusEvent statusEvent) {
// Send task update to Mesos
for (ContainerStatus status : nmContainers) {
Protos.SlaveID slaveId = nodeStore.getNode(statusEvent.getNodeId().getHost()).getSlaveId();
for (ContainerStatus status : statusEvent.getContainers()) {
ContainerId containerId = status.getContainerId();
Protos.SlaveID slaveId = nodeStore.getNode(hostName).getSlaveId();
if (status.getState() == ContainerState.COMPLETE) {
LOGGER.debug("Task complete: {}", containerId);
yarnNodeCapacityMgr.removeCapacity(rmNode, containerId);
requestExecutorToSendTaskStatusUpdate(
slaveId,
containerId, Protos.TaskState.TASK_FINISHED);
requestExecutorToSendTaskStatusUpdate(slaveId, containerId, Protos.TaskState.TASK_FINISHED);
} else { // state == NEW | RUNNING
requestExecutorToSendTaskStatusUpdate(
slaveId,
containerId, Protos.TaskState.TASK_RUNNING);
requestExecutorToSendTaskStatusUpdate(slaveId, containerId, Protos.TaskState.TASK_RUNNING);
}
}
}


/**
* sends a request to executor on the given slave to send back a status update
* for the mesos task launched for this container.
*
* TODO(Santosh):
* Framework messages are unreliable. Try a NM auxiliary service that can help
* send out the status messages from NM itself. NM and MyriadExecutor would need
* to be merged into a single process.
*
* @param slaveId
* @param containerId
* @param taskState
*/
private void requestExecutorToSendTaskStatusUpdate(Protos.SlaveID slaveId,
ContainerId containerId,
Protos.TaskState taskState) {
final String mesosTaskId = ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Sending out framework message requesting the executor to send {} status for task: {}",
taskState.name(), mesosTaskId);
}
ContainerTaskStatusRequest containerTaskStatusRequest = new ContainerTaskStatusRequest();
containerTaskStatusRequest.setMesosTaskId(
ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + containerId.toString());
containerTaskStatusRequest.setMesosTaskId(mesosTaskId);
containerTaskStatusRequest.setState(taskState.name());
myriadDriver.getDriver().sendFrameworkMessage(
getExecutorId(slaveId),
slaveId,
myriadDriver.getDriver().sendFrameworkMessage(getExecutorId(slaveId), slaveId,
new Gson().toJson(containerTaskStatusRequest).getBytes(Charset.defaultCharset()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void setExecInfo(Protos.ExecutorInfo execInfo) {
this.execInfo = execInfo;
}

public void snapshotContainers() {
public void snapshotRunningContainers() {
this.containerSnapshot = new HashSet<>(node.getRunningContainers());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.ebay.myriad.scheduler;

import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Offer;

/**
* Utility class that provides useful methods that deal with Mesos offers.
*/
public class OfferUtils {

/**
* Transforms a collection of mesos offers into {@link Resource}.
*
* @param offers collection of mesos offers
* @return a single resource object equivalent to the cumulative sum of mesos offers
*/
public static Resource getYarnResourcesFromMesosOffers(Collection<Offer> offers) {
double cpus = 0.0;
double mem = 0.0;

for (Protos.Offer offer : offers) {
for (Protos.Resource resource : offer.getResourcesList()) {
if (resource.getName().equalsIgnoreCase("cpus")) {
cpus += resource.getScalar().getValue();
} else if (resource.getName().equalsIgnoreCase("mem")) {
mem += resource.getScalar().getValue();
}
}
}
return Resource.newInstance((int) mem, (int) cpus);
}

}

1 comment on commit 2b76199

@smarella
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missed to mention in the commit message:

Problem 1 mentioned in the commit message is fixed in this change. i.e. 1. Myriad's code is changed to use the new NODE_RESOURCE_UPDATE_EVENT to update NM's capacity.

Please sign in to comment.