Skip to content

Commit

Permalink
[FLINK-7956] [flip6] Add support for queued scheduling with slot shar…
Browse files Browse the repository at this point in the history
…ing to SlotPool

This commit adds support for queued scheduling with slot sharing to the
SlotPool. The idea of slot sharing is that multiple tasks can run in the
same slot. Moreover, queued scheduling means that a slot request must not
be completed right away but at a later point in time. This allows to
start new TaskExecutors in case that there are no more slots left.

The main component responsible for the management of shared slots is the
SlotSharingManager. The SlotSharingManager maintains internally a tree-like
structure which stores the SlotContext future of the underlying
AllocatedSlot. Whenever this future is completed potentially pending
LogicalSlot instantiations are executed and sent to the slot requester.

A shared slot is represented by a MultiTaskSlot which can harbour multiple
TaskSlots. A TaskSlot can either be a MultiTaskSlot or a SingleTaskSlot.

In order to represent co-location constraints, we first obtain a root
MultiTaskSlot and then allocate a nested MultiTaskSlot in which the
co-located tasks are allocated. The corresponding SlotRequestID is assigned
to the CoLocationConstraint in order to make the TaskSlot retrievable for
other tasks assigned to the same CoLocationConstraint.

Port SchedulerSlotSharingTest, SchedulerIsolatedTasksTest and
ScheduleWithCoLocationHintTest to run with SlotPool.

Restructure SlotPool components.

Add SlotSharingManagerTest, SlotPoolSlotSharingTest and
SlotPoolCoLocationTest.

This closes apache#5091.
  • Loading branch information
tillrohrmann committed Dec 3, 2017
1 parent fc0a311 commit 711cadc
Show file tree
Hide file tree
Showing 76 changed files with 4,731 additions and 1,799 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;

import java.io.Serializable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
Expand All @@ -51,6 +52,8 @@

import org.slf4j.Logger;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -441,9 +444,11 @@ public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
// this method only works if the execution is in the state 'CREATED'
if (transitionState(CREATED, SCHEDULED)) {

final SlotSharingGroupId slotSharingGroupId = sharingGroup != null ? sharingGroup.getSlotSharingGroupId() : null;

ScheduledUnit toSchedule = locationConstraint == null ?
new ScheduledUnit(this, sharingGroup) :
new ScheduledUnit(this, sharingGroup, locationConstraint);
new ScheduledUnit(this, slotSharingGroupId) :
new ScheduledUnit(this, slotSharingGroupId, locationConstraint);

// calculate the preferred locations
final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint);
Expand All @@ -461,7 +466,7 @@ public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
return this;
} else {
// release the slot
logicalSlot.releaseSlot();
logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));

throw new CompletionException(new FlinkException("Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
}
Expand Down Expand Up @@ -513,7 +518,7 @@ public void deploy() throws JobException {

// race double check, did we fail/cancel and do we need to release the slot?
if (this.state != DEPLOYING) {
slot.releaseSlot();
slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
return;
}

Expand Down Expand Up @@ -622,7 +627,7 @@ else if (current == CREATED || current == SCHEDULED) {
try {
vertex.getExecutionGraph().deregisterExecution(this);

releaseAssignedResource();
releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));
}
finally {
vertex.executionCanceled(this);
Expand Down Expand Up @@ -889,7 +894,7 @@ void markFinished(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics met

updateAccumulatorsAndMetrics(userAccumulators, metrics);

releaseAssignedResource();
releaseAssignedResource(null);

vertex.getExecutionGraph().deregisterExecution(this);
}
Expand Down Expand Up @@ -942,7 +947,7 @@ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {

if (transitionState(current, CANCELED)) {
try {
releaseAssignedResource();
releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled."));

vertex.getExecutionGraph().deregisterExecution(this);
}
Expand Down Expand Up @@ -1034,7 +1039,7 @@ private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumul
updateAccumulatorsAndMetrics(userAccumulators, metrics);

try {
releaseAssignedResource();
releaseAssignedResource(t);
vertex.getExecutionGraph().deregisterExecution(this);
}
finally {
Expand Down Expand Up @@ -1175,12 +1180,14 @@ private void sendUpdatePartitionInfoRpcCall(
/**
* Releases the assigned resource and completes the release future
* once the assigned resource has been successfully released
*
* @param cause for the resource release, null if none
*/
private void releaseAssignedResource() {
private void releaseAssignedResource(@Nullable Throwable cause) {
final LogicalSlot slot = assignedResource;

if (slot != null) {
slot.releaseSlot().whenComplete(
slot.releaseSlot(cause).whenComplete(
(Object ignored, Throwable throwable) -> {
if (throwable != null) {
releaseFuture.completeExceptionally(throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.LogicalSlot;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -163,8 +165,9 @@ public void markDead() {
* owning the assignment group lock wants to give itself back to the instance which requires
* the instance lock
*/
final FlinkException cause = new FlinkException("Instance " + this + " has been marked as dead.");
for (Slot slot : slots) {
slot.releaseInstanceSlot();
slot.releaseSlot(cause);
}
}

Expand Down Expand Up @@ -321,8 +324,9 @@ public void cancelAndReleaseAllSlots() {
copy = new ArrayList<Slot>(this.allocatedSlots);
}

final FlinkException cause = new FlinkException("Cancel and release all slots of instance " + this + '.');
for (Slot slot : copy) {
slot.releaseInstanceSlot();
slot.releaseSlot(cause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.flink.runtime.instance;

import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -55,8 +57,6 @@ public class SharedSlot extends Slot implements LogicalSlot {
/** The set os sub-slots allocated from this shared slot */
private final Set<Slot> subSlots;

private final CompletableFuture<?> cancellationFuture = new CompletableFuture<>();

// ------------------------------------------------------------------------
// Old Constructors (prior FLIP-6)
// ------------------------------------------------------------------------
Expand All @@ -72,9 +72,9 @@ public class SharedSlot extends Slot implements LogicalSlot {
* @param assignmentGroup The assignment group that this shared slot belongs to.
*/
public SharedSlot(
SlotOwner owner, TaskManagerLocation location, int slotNumber,
TaskManagerGateway taskManagerGateway,
SlotSharingGroupAssignment assignmentGroup) {
SlotOwner owner, TaskManagerLocation location, int slotNumber,
TaskManagerGateway taskManagerGateway,
SlotSharingGroupAssignment assignmentGroup) {

this(owner, location, slotNumber, taskManagerGateway, assignmentGroup, null, null);
}
Expand Down Expand Up @@ -174,6 +174,11 @@ public boolean hasChildren() {
return subSlots.size() > 0;
}

@Override
public Locality getLocality() {
return Locality.UNKNOWN;
}

@Override
public boolean tryAssignPayload(Payload payload) {
throw new UnsupportedOperationException("Cannot assign an execution attempt id to a shared slot.");
Expand All @@ -186,9 +191,7 @@ public Payload getPayload() {
}

@Override
public CompletableFuture<?> releaseSlot() {
cancellationFuture.completeExceptionally(new FlinkException("Shared slot " + this + " is being released."));

public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
assignmentGroup.releaseSharedSlot(this);

if (!(isReleased() && subSlots.isEmpty())) {
Expand All @@ -198,11 +201,6 @@ public CompletableFuture<?> releaseSlot() {
return CompletableFuture.completedFuture(null);
}

@Override
public void releaseInstanceSlot() {
releaseSlot();
}

@Override
public int getPhysicalSlotNumber() {
return getRootSlotNumber();
Expand All @@ -214,8 +212,14 @@ public AllocationID getAllocationId() {
}

@Override
public SlotRequestID getSlotRequestId() {
return getSlotContext().getSlotRequestId();
public SlotRequestId getSlotRequestId() {
return NO_SLOT_REQUEST_ID;
}

@Nullable
@Override
public SlotSharingGroupId getSlotSharingGroupId() {
return NO_SLOT_SHARING_GROUP_ID;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@

import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.slots.SimpleSlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotContext;
import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotContext;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
Expand Down Expand Up @@ -69,8 +70,8 @@ public class SimpleSlot extends Slot implements LogicalSlot {
* @param taskManagerGateway The gateway to communicate with the TaskManager of this slot
*/
public SimpleSlot(
SlotOwner owner, TaskManagerLocation location, int slotNumber,
TaskManagerGateway taskManagerGateway) {
SlotOwner owner, TaskManagerLocation location, int slotNumber,
TaskManagerGateway taskManagerGateway) {
this(owner, location, slotNumber, taskManagerGateway, null, null);
}

Expand All @@ -97,7 +98,6 @@ public SimpleSlot(
parent != null ?
parent.getSlotContext() :
new SimpleSlotContext(
NO_SLOT_REQUEST_ID,
NO_ALLOCATION_ID,
location,
slotNumber,
Expand Down Expand Up @@ -218,18 +218,13 @@ public void setLocality(Locality locality) {
// ------------------------------------------------------------------------

@Override
public void releaseInstanceSlot() {
releaseSlot();
}

@Override
public CompletableFuture<?> releaseSlot() {
public CompletableFuture<?> releaseSlot(@Nullable Throwable cause) {
if (!isCanceled()) {
final CompletableFuture<?> terminationFuture;

if (payload != null) {
// trigger the failure of the slot payload
payload.fail(new FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation()));
payload.fail(cause != null ? cause : new FlinkException("TaskManager was lost/killed: " + getTaskManagerLocation()));

// wait for the termination of the payload before releasing the slot
terminationFuture = payload.getTerminalStateFuture();
Expand Down Expand Up @@ -276,8 +271,14 @@ public AllocationID getAllocationId() {
}

@Override
public SlotRequestID getSlotRequestId() {
return getSlotContext().getSlotRequestId();
public SlotRequestId getSlotRequestId() {
return NO_SLOT_REQUEST_ID;
}

@Nullable
@Override
public SlotSharingGroupId getSlotSharingGroupId() {
return NO_SLOT_SHARING_GROUP_ID;
}

// ------------------------------------------------------------------------
Expand Down

0 comments on commit 711cadc

Please sign in to comment.