diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java index a3f98f13d79d1d..97be59245c763e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AllocatedSlot.java @@ -107,15 +107,6 @@ public ResourceID getTaskManagerId() { return getTaskManagerLocation().getResourceID(); } - /** - * Gets the number of the slot. - * - * @return The number of the slot on the TaskManager. - */ - public int getPhysicalSlotNumber() { - return physicalSlotNumber; - } - /** * Gets the resource profile of the slot. * @@ -145,6 +136,15 @@ public TaskManagerGateway getTaskManagerGateway() { return taskManagerGateway; } + /** + * Returns true if this slot is being used (e.g. a logical slot is allocated from this slot). + * + * @return true if a logical slot is allocated from this slot, otherwise false + */ + public boolean isUsed() { + return logicalSlotReference.get() != null; + } + /** * Triggers the release of the logical slot. 
*/ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index a72f57be24459f..68f5be66e3100f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -281,7 +281,11 @@ public void returnAllocatedSlot(SlotRequestID slotRequestId) { final AllocatedSlot allocatedSlot = allocatedSlots.remove(slotRequestId); if (allocatedSlot != null) { - internalReturnAllocatedSlot(allocatedSlot); + if (allocatedSlot.releaseLogicalSlot()) { + tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); + } else { + throw new RuntimeException("Could not release allocated slot " + allocatedSlot + '.'); + } } else { log.debug("There is no allocated slot with request id {}. Ignoring this request.", slotRequestId); } @@ -342,9 +346,8 @@ CompletableFuture internalAllocateSlot( try { return allocatedSlot.allocateSimpleSlot(requestId, Locality.UNKNOWN); } catch (SlotException e) { - internalReturnAllocatedSlot(allocatedSlot); - - throw new CompletionException("Could not allocate a logical simple slot.", e); + throw new CompletionException("Could not allocate a logical simple slot from allocated slot " + + allocatedSlot + '.', e); } }); } @@ -464,6 +467,7 @@ private void failPendingRequest(PendingRequest pendingRequest, Exception e) { Preconditions.checkNotNull(e); if (!pendingRequest.getAllocatedSlotFuture().isDone()) { + LOG.info("Failing pending request {}.", pendingRequest.getSlotRequestId()); pendingRequest.getAllocatedSlotFuture().completeExceptionally(e); } } @@ -497,28 +501,25 @@ private void checkTimeoutRequestWaitingForResourceManager(SlotRequestID slotRequ // ------------------------------------------------------------------------ /** - * Return the slot back to this pool without releasing it. 
It's mainly called by failed / cancelled tasks, and the - * slot can be reused by other pending requests if the resource profile matches.n + * Tries to fulfill with the given allocated slot a pending slot request or add the + * allocated slot to the set of available slots if no matching request is available. * * @param allocatedSlot which shall be returned */ - private void internalReturnAllocatedSlot(AllocatedSlot allocatedSlot) { - if (allocatedSlot.releaseLogicalSlot()) { + private void tryFulfillSlotRequestOrMakeAvailable(AllocatedSlot allocatedSlot) { + Preconditions.checkState(!allocatedSlot.isUsed(), "Provided slot is still in use."); - final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot); + final PendingRequest pendingRequest = pollMatchingPendingRequest(allocatedSlot); - if (pendingRequest != null) { - LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", - pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId()); + if (pendingRequest != null) { + LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]", + pendingRequest.getSlotRequestId(), allocatedSlot.getAllocationId()); - allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); - pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot); - } else { - LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId()); - availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); - } + allocatedSlots.add(pendingRequest.getSlotRequestId(), allocatedSlot); + pendingRequest.getAllocatedSlotFuture().complete(allocatedSlot); } else { - LOG.debug("Failed to mark the logical slot of {} as released.", allocatedSlot); + LOG.debug("Adding returned slot [{}] to available slots", allocatedSlot.getAllocationId()); + availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); } } @@ -643,7 +644,7 @@ public CompletableFuture offerSlot( // we were actually not waiting for this: // - could be that 
this request had been fulfilled // - we are receiving the slots from TaskManagers after becoming leaders - availableSlots.add(allocatedSlot, clock.relativeTimeMillis()); + tryFulfillSlotRequestOrMakeAvailable(allocatedSlot); } // we accepted the request in any case. slot will be released after it idled for diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index ec20f6b3208789..1af9cce834bba8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -50,7 +51,9 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; @@ -383,6 +386,76 @@ public void testSlotRequestCancellationUponFailingRequest() throws Exception { } } + /** + * Tests that unused offered slots are directly used to fulfill pending slot + * requests. + * + *

See FLINK-8089 + */ + @Test + public void testFulfillingSlotRequestsWithUnusedOfferedSlots() throws Exception { + final SlotPool slotPool = new SlotPool(rpcService, jobId); + + final JobMasterId jobMasterId = JobMasterId.generate(); + final String jobMasterAddress = "foobar"; + final CompletableFuture allocationIdFuture = new CompletableFuture<>(); + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + + resourceManagerGateway.setRequestSlotConsumer( + (SlotRequest slotRequest) -> allocationIdFuture.complete(slotRequest.getAllocationId())); + + final SlotRequestID slotRequestId1 = new SlotRequestID(); + final SlotRequestID slotRequestId2 = new SlotRequestID(); + + try { + slotPool.start(jobMasterId, jobMasterAddress); + + final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); + + final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class)); + + slotPoolGateway.connectToResourceManager(resourceManagerGateway); + + CompletableFuture slotFuture1 = slotPoolGateway.allocateSlot( + slotRequestId1, + scheduledUnit, + ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + // wait for the first slot request + final AllocationID allocationId = allocationIdFuture.get(); + + CompletableFuture slotFuture2 = slotPoolGateway.allocateSlot( + slotRequestId2, + scheduledUnit, + ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + slotPoolGateway.cancelSlotRequest(slotRequestId1); + + try { + // this should fail with a CancellationException + slotFuture1.get(); + fail("The first slot future should have failed because it was cancelled."); + } catch (ExecutionException ee) { + assertTrue(ExceptionUtils.stripExecutionException(ee) instanceof CancellationException); + } + + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + + slotPoolGateway.registerTaskManager(taskManagerLocation.getResourceID()).get(); + + 
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); + + // the slot offer should fulfill the second slot request + assertEquals(allocationId, slotFuture2.get().getAllocationId()); + } finally { + RpcUtils.terminateRpcEndpoint(slotPool, timeout); + } + } + private static ResourceManagerGateway createResourceManagerGatewayMock() { ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); when(resourceManagerGateway