Skip to content

Commit

Permalink
[FLINK-8089] Also check for other pending slot requests in offerSlot
Browse files Browse the repository at this point in the history
Not only check for a slot request with the right allocation id but also check
whether we can fulfill other pending slot requests with an unclaimed offered
slot before adding it to the list of available slots.

This closes apache#5090.
  • Loading branch information
tillrohrmann committed Dec 14, 2017
1 parent 254aebb commit 351a88b
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 29 deletions.
Expand Up @@ -107,15 +107,6 @@ public ResourceID getTaskManagerId() {
return getTaskManagerLocation().getResourceID();
}

/**
* Gets the number of the slot.
*
* @return The number of the slot on the TaskManager.
*/
public int getPhysicalSlotNumber() {
return physicalSlotNumber;
}

/**
* Gets the resource profile of the slot.
*
Expand Down Expand Up @@ -145,6 +136,15 @@ public TaskManagerGateway getTaskManagerGateway() {
return taskManagerGateway;
}

/**
* Returns true if this slot is not being used (e.g. a logical slot is allocated from this slot).
*
* @return true if a logical slot is allocated from this slot, otherwise false
*/
public boolean isUsed() {
return logicalSlotReference.get() != null;
}

/**
* Triggers the release of the logical slot.
*/
Expand Down
Expand Up @@ -281,7 +281,11 @@ public void returnAllocatedSlot(SlotRequestID slotRequestId) {
final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId);

if (allocatedSlot != null) {
internalReturnAllocatedSlot(allocatedSlot);
if (allocatedSlot.releaseLogicalSlot()) {
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
} else {
throw new RuntimeException("Could not release allocated slot " + allocatedSlot + '.');
}
} else {
log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId);
}
Expand Down Expand Up @@ -342,9 +346,8 @@ CompletableFuture<LogicalSlot> internalAllocateSlot(
try {
return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN);
} catch (SlotException e) {
internalReturnAllocatedSlot(allocatedSlot);

throw new CompletionException("Could not allocate a logical simple slot.", e);
throw new CompletionException("Could not allocate a logical simple slot from allocate slot " +
allocatedSlot + '.', e);
}
});
}
Expand Down Expand Up @@ -464,6 +467,7 @@ private void failPendingRequest(PendingRequest pendingRequest, Exception e) {
Preconditions.checkNotNull(e);

if (!pendingRequest.getAllocatedSlotFuture().isDone()) {
LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId());
pendingRequest.getAllocatedSlotFuture().completeExceptionally(e);
}
}
Expand Down Expand Up @@ -497,28 +501,25 @@ private void checkTimeoutRequestWaitingForResourceManager(SlotRequestID slotRequ
// ------------------------------------------------------------------------

/**
* Return the slot back to this pool without releasing it. It's mainly called by failed / cancelled tasks, and the
* slot can be reused by other pending requests if the resource profile matches.n
* Tries to fulfill with the given allocated slot a pending slot request or add the
* allocated slot to the set of available slots if no matching request is available.
*
* @param allocatedSlot which shall be returned
*/
private void internalReturnAllocatedSlot(AllocatedSlot allocatedSlot) {
if (allocatedSlot.releaseLogicalSlot()) {
private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) {
Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use.");

final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);
final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot);

if (pendingRequest != null) {
LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());
if (pendingRequest != null) {
LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId());

allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
} else {
LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
}
allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot);
pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot);
} else {
LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot);
LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId());
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
}
}

Expand Down Expand Up @@ -643,7 +644,7 @@ public CompletableFuture<Boolean> offerSlot(
// we were actually not waiting for this:
// - could be that this request had been fulfilled
// - we are receiving the slots from TaskManagers after becoming leaders
availableSlots.add(allocatedSlot, clock.relativeTimeMillis());
tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
}

// we accepted the request in any case. slot will be released after it idled for
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;

Expand All @@ -50,7 +51,9 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
Expand Down Expand Up @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception {
}
}

/**
* Tests that unused offered slots are directly used to fulfill pending slot
* requests.
*
* <p>See FLINK-8089
*/
@Test
public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception {
final SlotPool slotPool = new SlotPool(rpcService, jobId);

final JobMasterId jobMasterId = JobMasterId.generate();
final String jobMasterAddress = "foobar";
final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();

resourceManagerGateway.setRequestSlotConsumer(
(SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId()));

final SlotRequestID slotRequestId1 = new SlotRequestID();
final SlotRequestID slotRequestId2 = new SlotRequestID();

try {
slotPool.start(jobMasterId, jobMasterAddress);

final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);

final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));

slotPoolGateway.connectToResourceManager(resourceManagerGateway);

CompletableFuture<LogicalSlot> slotFuture1 = slotPoolGateway.allocateSlot(
slotRequestId1,
scheduledUnit,
ResourceProfile.UNKNOWN,
Collections.emptyList(),
timeout);

// wait for the first slot request
final AllocationID allocationId = allocationIdFuture.get();

CompletableFuture<LogicalSlot> slotFuture2 = slotPoolGateway.allocateSlot(
slotRequestId2,
scheduledUnit,
ResourceProfile.UNKNOWN,
Collections.emptyList(),
timeout);

slotPoolGateway.cancelSlotRequest(slotRequestId1);

try {
// this should fail with a CancellationException
slotFuture1.get();
fail("The first slot future should have failed because it was cancelled.");
} catch (ExecutionException ee) {
assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException);
}

final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);

slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get();

assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get());

// the slot offer should fulfill the second slot request
assertEquals(allocationId, slotFuture2.get().getAllocationId());
} finally {
RpcUtils.terminateRpcEndpoint(slotPool, timeout);
}
}

private static ResourceManagerGateway createResourceManagerGatewayMock() {
ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
when(resourceManagerGateway
Expand Down

0 comments on commit 351a88b

Please sign in to comment.