Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule And Launch workers on consolidated offers #146

Conversation

dsKarthick
Copy link
Collaborator

@brndnmtthws @DarinJ @erikdw @JessicaLHartog Please 👀

Better version of #111.

Adresses following problems:

  • Offers get too fragmented. So even though there are enough resources, because they are fragmented across multiple offers, they are not useful
  • Some times, offer that is available at the time of allSlotsAvailableForScheduling gets removed from the rotating map before the storm-core invokes schedule to actually schedule the offers. This leads to a race-condition where some of the n workers take a long time to launch.

Karthick Duraisamy Soundararaj added 3 commits June 6, 2016 14:16
… and use it to schedule and assign workers with

consolidated offers per node. Adresses the following problems.

Problem 1:
    Types of OfferResources was limited - CPU, MEM and PORTS. Each of them where either RESERVED or UNRESERVED.
    We treated RESERVED or UNRESERVED resources the same way. So the logic in OfferResources was simple. But
    we now have three different types of RESERVED, STATICALLY_RESERVED and DYNAMICALLY_RESERVED and we are starting
    to have special handling on each of the types of resources. One example of such code is
     https://github.com/mesos/storm/pull/111/files#diff-9e83ab25db3f6d262627383d8aa8f815.

Problem 2:
    Currently, offers related logic is spread across OfferResources and RotatingMap.
    What we ultimately need is an interface which could be used across various parts of the framework.
    This commit introduces a 'storm.mesos.resources' package which is the first step in achieving the
    aforementioned goal. This package enables us to define an interface like 'storm.mesos.resources.Aggregator'
    and have various aggregators implement 'storm.mesos.resources.Aggregator' thereby abstracting the aggegation
    logic(including locking) from the rest of the packages.
@dsKarthick dsKarthick changed the title Schedule workers on consolidated offers Schedule And Launch workers on consolidated offers Jun 6, 2016
following error while building java7 binary.

```
   testGetTasksToLaunchForOneTopologyWithOneOffer(storm.mesos.MesosNimbusTest)  Time elapsed: 0.158 sec  <<< ERROR!
   java.lang.NoSuchMethodError: java.lang.String.join(Ljava/lang/CharSequence;Ljava/lang/Iterable;)Ljava/lang/String;
```

Fixing ^^ by using org.apache.commons.lang3.StringUtils.join instead of java.lang.String.join
try {
_state = new LocalStateShim(localDir);
} catch (IOException exp) {
// TODO(ksoundararaj) : Should we exit here?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should exit here, because without the LocalState (_state) this class cannot do its job.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 exit or runtime exception.

Copy link
Collaborator Author

@dsKarthick dsKarthick Jun 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replacing lines 207 and 208 with the following

throw new RuntimeException(String.format("Encountered IOException while setting up LocalState at %s : %s", localDir, exp));

Offer offer = TestUtils.buildOffer("0-1", "h1", 0, 0);
OfferResources offerResources = new OfferResources(offer);

assertTrue(TestUtils.calculateAllAvailableScalarResources(offerResources, ResourceType.CPU) == 0);
Copy link
Collaborator

@JessicaLHartog JessicaLHartog Jun 10, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above (0) for lines 45, 46, 53, 54, 60, 61, 63

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public void testToIgnoreDynamicResources() {
  ScalarResource scalarResource = new ScalarResource(ResourceType.CPU);
  scalarResource.add(new ResourceEntries.ScalarResourceEntry(100.0), ReservationType.STATICALLY_RESERVED);
  scalarResource.toString();

  // Note that buidOffer adds
  Offer offer = TestUtils.buildOffer("0-1", "h1", 0, 0);
  OfferResources offerResources = new OfferResources(offer);

  assertEquals(0, TestUtils.calculateAllAvailableScalarResources(offerResources, ResourceType.CPU), DELTA_FOR_DOUBLE_COMPARISON);
  assertEquals(0, TestUtils.calculateAllAvailableScalarResources(offerResources, ResourceType.MEM), DELTA_FOR_DOUBLE_COMPARISON);

  assertTrue(offerResources.getHostName().equals(offer.getHostname()));
  assertTrue(offerResources.getSlaveID().equals(offer.getSlaveId()));

  offer = TestUtils.buildOfferWithReservation("offer1", "h1", 2, 1000, 6, 1000);
  offerResources = new OfferResources(offer);
  assertEquals(8, TestUtils.calculateAllAvailableScalarResources(offerResources, ResourceType.CPU), DELTA_FOR_DOUBLE_COMPARISON);
  assertEquals(2000, TestUtils.calculateAllAvailableScalarResources(offerResources, ResourceType.MEM), DELTA_FOR_DOUBLE_COMPARISON);
  assertTrue(offerResources.getHostName().equals(offer.getHostname()));
  assertTrue(offerResources.getSlaveID().equals(offer.getSlaveId()));

  offer = TestUtils.buildOfferWithPorts("offer1", "h1", 2.0, 2000, 3000, 3100);
  offerResources = new OfferResources(offer);
  assertEquals(2.0, TestUtils.calculateAllAvailableScalarResources(offerResources, ResourceType.CPU), DELTA_FOR_DOUBLE_COMPARISON);
  assertEquals(2000, TestUtils.calculateAllAvailableScalarResources(offerResources, ResourceType.MEM),DELTA_FOR_DOUBLE_COMPARISON);
  List<Long> rangeResources = TestUtils.calculateAllAvailableRangeResources(offerResources, ResourceType.PORTS);
  assertTrue(rangeResources.size() == 101);
}

@erikdw
Copy link
Collaborator

erikdw commented Jun 10, 2016

I'm not entirely sure why you chose to keep a lot of code related to reservations in the offer resource aggregator logic, given that it isn't used right now, and we need a lot more work to actually support reservations, per issue #148. So that logic feels like it is outside the scope of this change. Hence I'm a bit wary about committing a bunch of unused code for supporting reservations at the moment. However, I guess we can just accept the changes if it's gonna be a lot of work to excise the handling you added before you discovered #148.

… Jessica Hartog.

Still TODO:
  1. Refactor MesosNimbus.getTasksToLaunch as per Erik's comment
  2. Refactor SchedulerUtils.getPorts as per Jessica's comment
@dsKarthick
Copy link
Collaborator Author

dsKarthick commented Jun 13, 2016

@erikdw

I'm not entirely sure why you chose to keep a lot of code related to reservations in the offer resource aggregator logic

In general, I wanted to keep all the aggregation logic for a couple of reasons

  • If someone wanted the faulty code for some reasons, I wanted to be in a position to make quick changes. This is the primary reason.
  • Existing code provides all that we need to support reservations except for storing and restoring state across restarts. Once we arrive at a decision on how we maintain the state across restarts, existing code would make implementation relatively a lot easier.

I guess we can just accept the changes if it's gonna be a lot of work to excise the handling you added before you discovered #148.

  • Whether its a lot of work or not depends on what you want to excise. If its some unused functions, it doesn't take much time. If you want the codebase to be reservation agnostic, then its means redoing the entire pr.

Karthick Duraisamy Soundararaj added 2 commits June 13, 2016 14:34
      1. Refactor MesosNimbus.getTasksToLaunch as per Erik's comment
      2. Refactor SchedulerUtils.getPorts as per Jessica's comment
@dsKarthick
Copy link
Collaborator Author

@erikdw @JessicaLHartog Addressed all your comments. Please 👀 when you get a chance.

}
}

synchronized (_offersLock) {
computeLaunchList(topologies, slotsForTopologiesNeedingAssignments);
/**
* We need to call getConsolidatedOfferResourcesPerNode again here for the following
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add " reasons" after following

Copy link
Collaborator Author

@dsKarthick dsKarthick Jun 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

/**
       * We need to call getConsolidatedOfferResourcesPerNode again here for the following reasons
       * ....
*/

@erikdw
Copy link
Collaborator

erikdw commented Jun 15, 2016

I'm not entirely sure why you chose to keep a lot of code related to reservations in the offer resource aggregator logic
In general, I wanted to keep all the aggregation logic for a couple of reasons

  • If someone wanted the faulty code for some reasons, I wanted to be in a position to make quick changes. This is the primary reason.
  • Existing code provides all that we need to support reservations except for storing and restoring state across restarts. Once we arrive at a decision on how we maintain the state across restarts, existing code would make implementation relatively a lot easier.

Makes sense. The change is just a little bigger than I want, but it's already here and we've already reviewed it. Let's just try to be more modular in the future if possible. I realize #148 snuck up on us, but I was also just surprised at the amount of design and effort you put into handling reservations, since I thought we were going to come back to that after finishing the offer consolidation. But we're in a much better position to support reservations now. Honestly, looking at how much code you've written, I'd be surprised if other frameworks weren't adding similar logic. I really feel like there should be a "mesos utility" library that provides stuff like this offer aggregation & the pretty printing.

I guess we can just accept the changes if it's gonna be a lot of work to excise the handling you added before you discovered #148.

Whether its a lot of work or not depends on what you want to excise. If its some unused functions, it doesn't take much time. If you want the codebase to be reservation agnostic, then its means redoing the entire pr.

We definitely don't wanna redo the entire PR. I suppose there is some flag in Maven we could enable that would complain about unused code, and then we could annotate those methods appropriately to avoid the complaint? i.e., committing a bunch of unused code with no indication about it not necessarily being functional is dangerous for future maintainers that may use it and not be aware that this is just groundwork code that needs to be tested.

double executorMem = MesosCommon.executorMem(mesosStormConf);

for (WorkerSlot slot : slotList) {
double workerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please move these 2 lines (setting worker{Cpu,Mem}) up out of this for loop, to be beside the executor{Cpu,Mem} lookups. i.e., these values are static during the loop.

Copy link
Collaborator Author

@dsKarthick dsKarthick Jun 16, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved

for (String topologyId : slots.keySet()) {
      Collection<WorkerSlot> slotList = slots.get(topologyId);
      TopologyDetails topologyDetails = topologies.getById(topologyId);
      Set<String> hostsWithSupervisors = new HashSet<>();

      double workerCpu = MesosCommon.topologyWorkerCpu(mesosStormConf, topologyDetails);
      double workerMem = MesosCommon.topologyWorkerMem(mesosStormConf, topologyDetails);
      double executorCpu = MesosCommon.executorCpu(mesosStormConf);
      double executorMem = MesosCommon.executorMem(mesosStormConf);

      for (WorkerSlot slot : slotList) {
        double requiredCpu = workerCpu;
        double requiredMem = workerMem;
        ....
       }
       ....
}

@dsKarthick
Copy link
Collaborator Author

dsKarthick commented Jun 16, 2016

I really feel like there should be a "mesos utility" library that provides stuff like this offer aggregation & the pretty printing.

Thats a very good point. Something that @brndnmtthws and @tnachen could throw some light on.

I realize #148 snuck up on us, but I was also just surprised at the amount of design and effort you put into handling reservations, since I thought we were going to come back to that after finishing the offer consolidation.

I did not want to do this huge refactor until I saw how messy OfferResources.java got. Please note how null is being returned all over the place and just how complicated the entire class is. Couple of things that motivated me to the extensive refactor : 1. Its just hard to figure out bugs, if any with the old code. 2. I wanted add unit tests and do as much testing as I can before shipping this code. Doing so for the old code and porting all the changes including to the newer code in the future is lot of redundant work is what I felt.

committing a bunch of unused code with no indication about it not necessarily being functional is dangerous for future maintainers

valid concern. Will see if I can find anything. TODO(ksoundararaj)

@dsKarthick
Copy link
Collaborator Author

committing a bunch of unused code with no indication about it not necessarily being functional is dangerous for future maintainers

Unfortunately, I couldnt find a way to do this. Java itself doesnt support such a thing https://bugs.openjdk.java.net/browse/JDK-8026357. I dont find an appropriate check in checkstyle plugin http://checkstyle.sourceforge.net/checks.html

@dsKarthick
Copy link
Collaborator Author

@erikdw @JessicaLHartog Addressed second round of comments

@dsKarthick
Copy link
Collaborator Author

@erikdw @JessicaLHartog

@@ -93,15 +93,15 @@ public ReservationType getReservationType() {
public RangeResourceEntry add(ResourceEntry<Long> resourceEntry) {
Copy link
Collaborator

@erikdw erikdw Jun 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dsKarthick : can you please explain exactly what your logic / vision is for all of this RangeResource stuff? My understanding from briefly reading this code is that we are adding an Offer's port resources (which could have say ports: [1, 21-23, 10] ) to the existing aggregated port resources from this host (which could be say [15-17, 25, 40-60] ). But I'm not clear how you're then merging the new offer's list of port resources into the existing aggregated ports.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e., please try to answer these "asks" from @JessicaLHartog :

Before we continue this discussion, can you please provide some comments and/or an example for how this code is supposed to behave?

It would probably be most beneficial for my understanding if you provide this explanation without looking at the code aside from the signature provided here:
public RangeResourceEntry add(RangeResourceEntry<Long> resourceEntry)

Copy link
Collaborator Author

@dsKarthick dsKarthick Jun 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please try to answer these "asks" from @JessicaLHartog :

Tried to address the following concerns via unit tests

  • Before we continue this discussion, can you please provide some comments and/or an example for how this code is supposed to behave?
  • It would probably be most beneficial for my understanding if you provide this explanation without looking at the code aside from the signature provided here

Initially the code looked a bit confusing. The latest code code looks pretty straightforward and tests demonstrate the usage. Looks like you guys are in favor of comments. So let me go ahead and add them anyway.

Copy link
Collaborator Author

@dsKarthick dsKarthick Jun 18, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please explain exactly what your logic / vision is for all of this RangeResource stuff? My understanding from briefly reading this code is that we are adding an Offer's port resources (which could have say ports: [1, 21-23, 10] ) to the existing aggregated port resources from this host (which could be say [15-17, 25, 40-60] ). But I'm not clear how you're then merging the new offer's list of port resources into the existing aggregated ports.

ResourceEntry class provides an api called add and remove. Its upto the client that uses this api, in this case, ScalarResource and RangeResource to use however they want. Lets look at how both of them are using these add and remove methods.
* add and remove methods for ScalarResource helps reduce the number of objects we create dramatically because add and remove methods just operate on a singe long value.
* However in case of RangeResource, like you pointed in one of your earlier comments in this pr, we do not gain much by trying to maintain a perfectly merged range. So these methods are not being currently used.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question was more specific. As I noted, I get the gist of what you're doing, I just cannot figure out precisely how you are trying to handle ranges.

if (_preferReservedResources) {
Collections.sort(offerResources, new ResourceRoleComparator());
if (!aggregatedOffers.isFit(mesosStormConf, topologyDetails, workerPort, hostsWithSupervisors.contains(workerHost))) {
LOG.error(String.format("Unable to launch worker %s. Required cpu: %f, Required mem: %f. Available aggregatedOffers : %s",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the availability of this particular port will determine whether or not the assignment will fit, can we also include the workerPort in this log line?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. Missed this comment. Thanks for fixing on your inherited PR.

@erikdw
Copy link
Collaborator

erikdw commented Jul 27, 2016

As @JessicaLHartog noted, this PR has been continued in #154, so I'm closing it.

@erikdw erikdw closed this Jul 27, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants