Skip to content

Commit

Permalink
[FLINK-9912][JM] Release TaskExecutors if they have no slots register…
Browse files Browse the repository at this point in the history
…ed at SlotPool

This commit extends the SlotPool's behaviour when failing an allocation by sending a notification
message to the TaskExecutor about the freed slot. Moreover, it checks whether the affected
TaskExecutor has more slots registered or not. In the latter case, the TaskExecutor's connection
will be eagerly closed.

This closes apache#6394.
  • Loading branch information
tillrohrmann authored and kl0u committed Sep 20, 2018
1 parent d0a0230 commit f8fa9c5
Show file tree
Hide file tree
Showing 9 changed files with 347 additions and 61 deletions.
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.types;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.Serializable;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Consumer;

/**
 * Serializable counterpart of {@link Optional}.
 *
 * <p>Holds either a single non-null {@link Serializable} value or nothing. Instances are
 * immutable. Equality is value-based, so that instances which went through serialization
 * (and therefore lost their object identity, including the shared empty instance) still
 * compare equal to their originals.
 *
 * @param <T> type of the contained value
 */
public final class SerializableOptional<T extends Serializable> implements Serializable {
	private static final long serialVersionUID = -3312769593551775940L;

	/** Shared instance representing the absence of a value. */
	private static final SerializableOptional<?> EMPTY = new SerializableOptional<>(null);

	/** The contained value; {@code null} means that no value is present. */
	@Nullable
	private final T value;

	private SerializableOptional(@Nullable T value) {
		this.value = value;
	}

	/**
	 * Returns the contained value.
	 *
	 * @return the contained, non-null value
	 * @throws NoSuchElementException if no value is present
	 */
	public T get() {
		if (value == null) {
			throw new NoSuchElementException("No value present");
		}
		return value;
	}

	/**
	 * Tells whether a value is present.
	 *
	 * @return {@code true} if a value is present, {@code false} otherwise
	 */
	public boolean isPresent() {
		return value != null;
	}

	/**
	 * Passes the contained value to the given consumer if a value is present; does nothing otherwise.
	 *
	 * @param consumer action to run with the contained value
	 */
	public void ifPresent(Consumer<? super T> consumer) {
		if (value != null) {
			consumer.accept(value);
		}
	}

	/**
	 * Creates an instance holding the given value.
	 *
	 * @param value value to wrap
	 * @param <T> type of the value
	 * @return a new instance containing {@code value}
	 */
	public static <T extends Serializable> SerializableOptional<T> of(@Nonnull T value) {
		return new SerializableOptional<>(value);
	}

	/**
	 * Returns the shared empty instance.
	 *
	 * @param <T> type of the (absent) value
	 * @return an instance without a value
	 */
	@SuppressWarnings("unchecked")
	public static <T extends Serializable> SerializableOptional<T> empty() {
		// safe: the empty instance never hands out a value of type T
		return (SerializableOptional<T>) EMPTY;
	}

	/**
	 * Two instances are equal if both are empty or if their contained values are equal.
	 */
	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof SerializableOptional)) {
			return false;
		}
		final SerializableOptional<?> that = (SerializableOptional<?>) obj;
		return value == null ? that.value == null : value.equals(that.value);
	}

	/**
	 * Hash code of the contained value, or {@code 0} if no value is present.
	 */
	@Override
	public int hashCode() {
		return value == null ? 0 : value.hashCode();
	}

	@Override
	public String toString() {
		return value == null
			? "SerializableOptional.empty"
			: "SerializableOptional[" + value + "]";
	}
}
Expand Up @@ -103,6 +103,7 @@
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
Expand Down Expand Up @@ -833,13 +834,25 @@ public void failSlot(
final Exception cause) {

if (registeredTaskManagers.containsKey(taskManagerId)) {
slotPoolGateway.failAllocation(allocationId, cause);
internalFailAllocation(allocationId, cause);
} else {
log.warn("Cannot fail slot " + allocationId + " because the TaskManager " +
taskManagerId + " is unknown.");
}
}

/**
 * Fails the given allocation at the slot pool. If the slot pool answers with a resource id,
 * that resource id is handed to {@code releaseEmptyTaskManager}; the hand-over is scheduled
 * on the main thread executor.
 *
 * @param allocationId allocation to fail
 * @param cause reason for failing the allocation
 */
private void internalFailAllocation(AllocationID allocationId, Exception cause) {
	slotPoolGateway
		.failAllocation(allocationId, cause)
		.thenAcceptAsync(
			emptiedTaskExecutor -> emptiedTaskExecutor.ifPresent(this::releaseEmptyTaskManager),
			getMainThreadExecutor());
}

/**
 * Disconnects the TaskManager identified by the given resource id, stating as the reason
 * that it has no more slots registered.
 *
 * @param resourceId resource id of the TaskManager to disconnect
 * @return the future returned by {@code disconnectTaskManager}
 */
private CompletableFuture<Acknowledge> releaseEmptyTaskManager(ResourceID resourceId) {
	// the id belongs to the TaskManager being disconnected, so the message must name it as such
	return disconnectTaskManager(
		resourceId,
		new FlinkException(String.format("TaskManager %s has no more slots registered at the JobMaster.", resourceId)));
}

@Override
public CompletableFuture<RegistrationResponse> registerTaskManager(
final String taskManagerRpcAddress,
Expand Down Expand Up @@ -982,7 +995,7 @@ public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackP

@Override
public void notifyAllocationFailure(AllocationID allocationID, Exception cause) {
slotPoolGateway.failAllocation(allocationID, cause);
internalFailAllocation(allocationID, cause);
}

//----------------------------------------------------------------------------------------------
Expand Down
Expand Up @@ -50,6 +50,7 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.clock.Clock;
import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.types.SerializableOptional;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -1001,32 +1002,50 @@ public CompletableFuture<Boolean> offerSlot(
* and decided to take it back.
*
* @param allocationID Represents the allocation which should be failed
* @param cause The cause of the failure
* @param cause The cause of the failure
* @return Optional task executor if it has no more slots registered
*/
@Override
public void failAllocation(final AllocationID allocationID, final Exception cause) {
public CompletableFuture<SerializableOptional<ResourceID>> failAllocation(final AllocationID allocationID, final Exception cause) {
final PendingRequest pendingRequest = pendingRequests.removeKeyB(allocationID);
if (pendingRequest != null) {
// request was still pending
failPendingRequest(pendingRequest, cause);
}
else if (availableSlots.tryRemove(allocationID)) {
log.debug("Failed available slot [{}].", allocationID, cause);
return CompletableFuture.completedFuture(SerializableOptional.empty());
}
else {
AllocatedSlot allocatedSlot = allocatedSlots.remove(allocationID);
if (allocatedSlot != null) {
// release the slot.
// since it is not in 'allocatedSlots' any more, it will be dropped on return
allocatedSlot.releasePayload(cause);
}
else {
log.trace("Outdated request to fail slot [{}].", allocationID, cause);
}
return tryFailingAllocatedSlot(allocationID, cause);
}

// TODO: add some unit tests when the previous two are ready; the allocation may fail at any phase
}

/**
 * Removes the slot with the given allocation id from the available slots or, if it is not
 * found there, from the allocated slots, and fails it.
 *
 * @param allocationID allocation id of the slot to fail
 * @param cause reason for failing the slot
 * @return completed future holding the id of the slot's TaskManager if that TaskManager has
 *     no slot left in either collection; an empty optional otherwise (including the case
 *     that no slot was found for the allocation id)
 */
private CompletableFuture<SerializableOptional<ResourceID>> tryFailingAllocatedSlot(AllocationID allocationID, Exception cause) {
	// look into the available slots first, the allocated slots are only the fallback
	final AllocatedSlot fromAvailable = availableSlots.tryRemove(allocationID);
	final AllocatedSlot failedSlot = fromAvailable != null
		? fromAvailable
		: allocatedSlots.remove(allocationID);

	if (failedSlot == null) {
		return CompletableFuture.completedFuture(SerializableOptional.empty());
	}

	log.debug("Failed allocated slot [{}]: {}", allocationID, cause.getMessage());

	// tell the TaskExecutor that the slot has been failed
	failedSlot.getTaskManagerGateway().freeSlot(allocationID, cause, rpcTimeout);
	// the slot has already been removed from both collections above, so only its payload
	// is left to be released
	failedSlot.releasePayload(cause);

	final ResourceID taskManagerId = failedSlot.getTaskManagerId();
	final boolean hasRemainingSlots =
		availableSlots.containsTaskManager(taskManagerId) || allocatedSlots.containResource(taskManagerId);

	final SerializableOptional<ResourceID> emptiedTaskManager;
	if (hasRemainingSlots) {
		emptiedTaskManager = SerializableOptional.empty();
	} else {
		emptiedTaskManager = SerializableOptional.of(taskManagerId);
	}

	return CompletableFuture.completedFuture(emptiedTaskManager);
}

// ------------------------------------------------------------------------
// Resource
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1107,7 +1126,7 @@ private void checkIdleSlot() {

for (AllocatedSlot expiredSlot : expiredSlots) {
final AllocationID allocationID = expiredSlot.getAllocationId();
if (availableSlots.tryRemove(allocationID)) {
if (availableSlots.tryRemove(allocationID) != null) {

log.info("Releasing idle slot [{}].", allocationID);
final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
Expand Down Expand Up @@ -1502,7 +1521,7 @@ Set<AllocatedSlot> removeAllForTaskManager(final ResourceID taskManager) {
}
}

boolean tryRemove(AllocationID slotId) {
AllocatedSlot tryRemove(AllocationID slotId) {
final SlotAndTimestamp sat = availableSlots.remove(slotId);
if (sat != null) {
final AllocatedSlot slot = sat.slot();
Expand All @@ -1522,15 +1541,15 @@ boolean tryRemove(AllocationID slotId) {
availableSlotsByHost.remove(host);
}

return true;
return slot;
}
else {
return false;
return null;
}
}

private void remove(AllocationID slotId) throws IllegalStateException {
if (!tryRemove(slotId)) {
if (tryRemove(slotId) == null) {
throw new IllegalStateException("slot not contained");
}
}
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.types.SerializableOptional;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -126,8 +127,9 @@ CompletableFuture<Collection<SlotOffer>> offerSlots(
*
* @param allocationID identifying the slot which is being failed
* @param cause of the failure
* @return An optional task executor id if this task executor has no more slots registered
*/
void failAllocation(AllocationID allocationID, Exception cause);
CompletableFuture<SerializableOptional<ResourceID>> failAllocation(AllocationID allocationID, Exception cause);

// ------------------------------------------------------------------------
// allocating and disposing slots
Expand Down
Expand Up @@ -34,9 +34,12 @@
import org.apache.flink.runtime.messages.StackTrace;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;

import javax.annotation.Nonnull;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand All @@ -54,6 +57,9 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {

private volatile BiFunction<AllocationID, Throwable, CompletableFuture<Acknowledge>> freeSlotFunction;

@Nonnull
private volatile BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer = (ignoredA, ignoredB) -> {};

public SimpleAckingTaskManagerGateway() {
optSubmitConsumer = Optional.empty();
optCancelConsumer = Optional.empty();
Expand All @@ -71,13 +77,19 @@ public void setFreeSlotFunction(BiFunction<AllocationID, Throwable, CompletableF
this.freeSlotFunction = freeSlotFunction;
}

/**
 * Replaces the callback stored in {@code disconnectFromJobManagerConsumer}.
 *
 * @param disconnectFromJobManagerConsumer callback receiving the instance id and the cause
 */
public void setDisconnectFromJobManagerConsumer(
		@Nonnull BiConsumer<InstanceID, Exception> disconnectFromJobManagerConsumer) {
	this.disconnectFromJobManagerConsumer = disconnectFromJobManagerConsumer;
}

@Override
public String getAddress() {
return address;
}

@Override
public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {}
public void disconnectFromJobManager(InstanceID instanceId, Exception cause) {
disconnectFromJobManagerConsumer.accept(instanceId, cause);
}

@Override
public void stopCluster(ApplicationStatus applicationStatus, String message) {}
Expand Down

0 comments on commit f8fa9c5

Please sign in to comment.