From 32269bc30b21b98f33bb4bf82d2166a79a71e56c Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Mon, 13 Nov 2017 15:42:07 +0100 Subject: [PATCH] [FLINK-8089] Also check for other pending slot requests in offerSlot Not only check for a slot request with the right allocation id but also check whether we can fulfill other pending slot requests with an unclaimed offered slot before adding it to the list of available slots. This closes #5090. --- .../flink/runtime/instance/AllocatedSlot.java | 18 ++--- .../flink/runtime/instance/SlotPool.java | 41 ++++++----- .../flink/runtime/instance/SlotPoolTest.java | 73 +++++++++++++++++++ 3 files changed, 103 insertions(+), 29 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java index 47e2fe4fa0dbe9..a748a632125317 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java @@ -107,15 +107,6 @@ public ResourceID getTaskManagerId() { return getTaskManagerLocation().getResourceID(); } - /** - * Gets the number of the slot. - * - * @return The number of the slot on the TaskManager. - */ - public int getPhysicalSlotNumber() { - return physicalSlotNumber; - } - /** * Gets the resource profile of the slot. * @@ -145,6 +136,15 @@ public TaskManagerGateway getTaskManagerGateway() { return taskManagerGateway; } + /** + * Returns true if this slot is being used (e.g. a logical slot is allocated from this slot). + * + * @return true if a logical slot is allocated from this slot, otherwise false + */ + public boolean isUsed() { + return logicalSlotReference.get() != null; + } + /** * Triggers the release of the logical slot.
*/ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index c6d11a5baf43b2..8bf69908845ee9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -281,7 +281,11 @@ public void returnAllocatedSlot(SlotRequestID slotRequestId) { final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId); if (allocatedSlot != null) { - internalReturnAllocatedSlot(allocatedSlot); + if (allocatedSlot.releaseLogicalSlot()) { + tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); + } else { + throw new RuntimeException("Could not release allocated slot " + allocatedSlot + '.'); + } } else { log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId); } @@ -342,9 +346,8 @@ CompletableFuture internalAllocateSlot( try { return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN); } catch (SlotException e) { - internalReturnAllocatedSlot(allocatedSlot); - - throw new CompletionException("Could not allocate a logical simple slot.", e); + throw new CompletionException("Could not allocate a logical simple slot from allocate slot " + + allocatedSlot + '.', e); } }); } @@ -464,6 +467,7 @@ private void failPendingRequest(PendingRequest pendingRequest, Exception e) { Preconditions.checkNotNull(e); if (!pendingRequest.getAllocatedSlotFuture().isDone()) { + LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId()); pendingRequest.getAllocatedSlotFuture().completeExceptionally(e); } } @@ -497,28 +501,25 @@ private void checkTimeoutRequestWaitingForResourceManager(SlotRequestID slotRequ // ------------------------------------------------------------------------ /** - * Return the slot back to this pool without releasing it. 
It's mainly called by failed / cancelled tasks, and the - * slot can be reused by other pending requests if the resource profile matches.n + * Tries to fulfill with the given allocated slot a pending slot request or add the + * allocated slot to the set of available slots if no matching request is available. * * @param allocatedSlot which shall be returned */ - private void internalReturnAllocatedSlot(AllocatedSlot allocatedSlot) { - if (allocatedSlot.releaseLogicalSlot()) { + private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) { + Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use."); - final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot); + final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot); - if (pendingRequest != null) { - LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", - pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId()); + if (pendingRequest != null) { + LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", + pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId()); - allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); - pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot); - } else { - LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId()); - availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); - } + allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); + pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot); } else { - LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot); + LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId()); + availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); } } @@ -643,7 +644,7 @@ public CompletableFuture offerSlot( // we were actually not waiting for this: // - could be that 
this request had been fulfilled // - we are receiving the slots from TaskManagers after becoming leaders - availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); + tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); } // we accepted the request in any case. slot will be released after it idled for diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index ec20f6b3208789..7f3d0c8b5401f6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -50,7 +51,9 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception { } } + /** + * Tests that unused offered slots are directly used to fulfil pending slot + * requests. + * + *

See FLINK-8089 + */ + @Test + public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception { + final SlotPool slotPool = new SlotPool(rpcService, jobId); + + final JobMasterId jobMasterId = JobMasterId.generate(); + final String jobMasterAddress = "foobar"; + final CompletableFuture allocationIdFuture = new CompletableFuture<>(); + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + + resourceManagerGateway.setRequestSlotConsumer( + (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId())); + + final SlotRequestID slotRequestId1 = new SlotRequestID(); + final SlotRequestID slotRequestId2 = new SlotRequestID(); + + try { + slotPool.start(jobMasterId, jobMasterAddress); + + final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); + + final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class)); + + slotPoolGateway.connectToResourceManager(resourceManagerGateway); + + CompletableFuture slotFuture1 = slotPoolGateway.allocateSlot( + slotRequestId1, + scheduledUnit, + ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + // wait for the first slot request + final AllocationID allocationId = allocationIdFuture.get(); + + CompletableFuture slotFuture2 = slotPoolGateway.allocateSlot( + slotRequestId2, + scheduledUnit, + ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + slotPoolGateway.cancelSlotRequest(slotRequestId1); + + try { + // this should fail with a CancellationException + slotFuture1.get(); + fail("The first slot future should have failed because it was cancelled."); + } catch (ExecutionException ee) { + assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException); + } + + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + + 
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); + + // the slot offer should fulfil the second slot request + assertEquals(allocationId, slotFuture2.get().getAllocationId()); + } finally { + RpcUtils.terminateRpcEndpoint(slotPool, timeout); + } + } + private static ResourceManagerGateway createResourceManagerGatewayMock() { ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); when(resourceManagerGateway