Skip to content

Commit

Permalink
Further simplify SourcePartitionedScheduler
Browse files Browse the repository at this point in the history
After removal of grouped execution SourcePartitionedScheduler
can be further simplified.
  • Loading branch information
sopel39 committed Aug 17, 2022
1 parent f71cb66 commit f45ff16
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ public class ScheduleResult
public enum BlockedReason
{
WRITER_SCALING,
NO_ACTIVE_DRIVER_GROUP,
SPLIT_QUEUES_FULL,
WAITING_FOR_SOURCE,
MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE,
/**/;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.trino.execution.RemoteTask;
import io.trino.execution.TableExecuteContext;
import io.trino.execution.TableExecuteContextManager;
Expand All @@ -30,7 +29,6 @@
import io.trino.split.SplitSource.SplitBatch;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -49,9 +47,6 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.addSuccessCallback;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE;
import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.NO_ACTIVE_DRIVER_GROUP;
import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.SPLIT_QUEUES_FULL;
import static io.trino.execution.scheduler.ScheduleResult.BlockedReason.WAITING_FOR_SOURCE;
import static java.util.Objects.requireNonNull;
Expand All @@ -66,6 +61,11 @@ private enum State
*/
INITIALIZED,

/**
* At least one split has been added to pendingSplits set.
*/
SPLITS_ADDED,

/**
* All splits from underlying SplitSource have been discovered.
* No more splits will be added to the pendingSplits set.
Expand All @@ -89,14 +89,12 @@ private enum State
private final BooleanSupplier anySourceTaskBlocked;
private final PartitionIdAllocator partitionIdAllocator;
private final Map<InternalNode, RemoteTask> scheduledTasks;
private final Set<Split> pendingSplits = new HashSet<>();

public ListenableFuture<SplitBatch> nextSplitBatchFuture;
public ListenableFuture<Void> placementFuture = immediateVoidFuture();
public final Set<Split> pendingSplits = new HashSet<>();
private ListenableFuture<SplitBatch> nextSplitBatchFuture;
private ListenableFuture<Void> placementFuture = immediateVoidFuture();
private State state = State.INITIALIZED;

private SettableFuture<Void> whenFinished = SettableFuture.create();

private SourcePartitionedScheduler(
StageExecution stageExecution,
PlanNodeId partitionedNode,
Expand Down Expand Up @@ -226,102 +224,85 @@ public synchronized void start()
@Override
public synchronized ScheduleResult schedule()
{
dropListenersFromWhenFinished();
if (state == State.FINISHED) {
return new ScheduleResult(true, ImmutableSet.of(), 0);
}

int overallSplitAssignmentCount = 0;
Multimap<InternalNode, Split> splitAssignment = ImmutableMultimap.of();
ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();
List<ListenableFuture<?>> overallBlockedFutures = new ArrayList<>();
boolean anyBlockedOnPlacements = false;
boolean anyBlockedOnNextSplitBatch = false;
boolean anyNotBlocked = false;

if (state != State.FINISHED) {
if (state == State.SPLITS_SCHEDULED) {
verify(nextSplitBatchFuture == null);
}
else if (pendingSplits.isEmpty()) {
// try to get the next batch
if (nextSplitBatchFuture == null) {
nextSplitBatchFuture = splitSource.getNextBatch(splitBatchSize);
Optional<ListenableFuture<Void>> blockedFuture = Optional.empty();
boolean blockedOnPlacements = false;
boolean blockedOnNextSplitBatch = false;

long start = System.nanoTime();
addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start));
}
if (state == State.SPLITS_SCHEDULED) {
verify(nextSplitBatchFuture == null);
}
else if (pendingSplits.isEmpty()) {
// try to get the next batch
if (nextSplitBatchFuture == null) {
nextSplitBatchFuture = splitSource.getNextBatch(splitBatchSize);

if (nextSplitBatchFuture.isDone()) {
SplitBatch nextSplits = getFutureValue(nextSplitBatchFuture);
nextSplitBatchFuture = null;
pendingSplits.addAll(nextSplits.getSplits());
if (nextSplits.isLastBatch()) {
if (state == State.INITIALIZED && pendingSplits.isEmpty()) {
// Add an empty split in case no splits have been produced for the source.
// For source operators, they never take input, but they may produce output.
// This is well handled by the execution engine.
// However, there are certain non-source operators that may produce output without any input,
// for example, 1) an AggregationOperator, 2) a HashAggregationOperator where one of the grouping sets is ().
// Scheduling an empty split kicks off necessary driver instantiation to make this work.
pendingSplits.add(new Split(
splitSource.getCatalogHandle(),
new EmptySplit(splitSource.getCatalogHandle())));
}
state = State.SPLITS_SCHEDULED;
}
}
else {
overallBlockedFutures.add(nextSplitBatchFuture);
anyBlockedOnNextSplitBatch = true;
}
long start = System.nanoTime();
addSuccessCallback(nextSplitBatchFuture, () -> stageExecution.recordGetSplitTime(start));
}
if (!anyBlockedOnNextSplitBatch) {
Multimap<InternalNode, Split> splitAssignment = ImmutableMultimap.of();
boolean skip = false;
if (!pendingSplits.isEmpty()) {
if (!placementFuture.isDone()) {
anyBlockedOnPlacements = true;
skip = true;
}
else { // calculate placements for splits
SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
splitAssignment = splitPlacementResult.getAssignments(); // remove splits with successful placements
splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
overallSplitAssignmentCount += splitAssignment.size(); // if not completed placed, mark scheduleGroup as blocked on placement
if (!pendingSplits.isEmpty()) {
placementFuture = splitPlacementResult.getBlocked();
overallBlockedFutures.add(placementFuture);
anyBlockedOnPlacements = true;
}

if (nextSplitBatchFuture.isDone()) {
SplitBatch nextSplits = getFutureValue(nextSplitBatchFuture);
nextSplitBatchFuture = null;
pendingSplits.addAll(nextSplits.getSplits());
if (nextSplits.isLastBatch()) {
if (state == State.INITIALIZED && pendingSplits.isEmpty()) {
// Add an empty split in case no splits have been produced for the source.
// For source operators, they never take input, but they may produce output.
// This is well handled by the execution engine.
// However, there are certain non-source operators that may produce output without any input,
// for example, 1) an AggregationOperator, 2) a HashAggregationOperator where one of the grouping sets is ().
// Scheduling an empty split kicks off necessary driver instantiation to make this work.
pendingSplits.add(new Split(
splitSource.getCatalogHandle(),
new EmptySplit(splitSource.getCatalogHandle())));
}
state = State.SPLITS_SCHEDULED;
}
if (!skip) { // if no new splits will be assigned, update state and attach completion event
if (pendingSplits.isEmpty() && state == State.SPLITS_SCHEDULED) {
state = State.FINISHED;
}
}
else {
blockedFuture = Optional.of(asVoid(nextSplitBatchFuture));
blockedOnNextSplitBatch = true;
}
}

// assign the splits with successful placements
overallNewTasks.addAll(assignSplits(splitAssignment));

// Assert that "placement future is not done" implies "pendingSplits is not empty".
// The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line.
// However, there are other reasons that could lead to this.
// Note that `computeAssignments` is quite broken:
// 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked.
// 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion.
// As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here.
if (nextSplitBatchFuture == null && pendingSplits.isEmpty() && state != State.FINISHED) {
anyNotBlocked = true;
}
if (!pendingSplits.isEmpty() && state == State.INITIALIZED) {
state = State.SPLITS_ADDED;
}

if (blockedFuture.isEmpty() && !pendingSplits.isEmpty()) {
if (!placementFuture.isDone()) {
blockedFuture = Optional.of(placementFuture);
blockedOnPlacements = true;
}
else {
// calculate placements for splits
SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
splitAssignment = splitPlacementResult.getAssignments(); // remove splits with successful placements
splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
overallSplitAssignmentCount += splitAssignment.size(); // if not completed placed, mark scheduleGroup as blocked on placement
if (!pendingSplits.isEmpty()) {
placementFuture = splitPlacementResult.getBlocked();
blockedFuture = Optional.of(placementFuture);
blockedOnPlacements = true;
}
}
}

// * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked.
// If state is FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now.
// * Even if `splitSource.isFinished()` return true, it is not necessarily safe to tear down the split source.
// * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures,
// which may contain recently published splits. We must not ignore those.
if (state == State.FINISHED && splitSource.isFinished()) {
Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
// assign the splits with successful placements
overallNewTasks.addAll(assignSplits(splitAssignment));

// if no new splits will be assigned, update state and attach completion event
if (pendingSplits.isEmpty() && state == State.SPLITS_SCHEDULED) {
state = State.FINISHED;

Optional<List<Object>> tableExecuteSplitsInfo = splitSource.getTableExecuteSplitsInfo();
// Here we assume that we can get non-empty tableExecuteSplitsInfo only for queries which facilitate single split source.
tableExecuteSplitsInfo.ifPresent(info -> {
TableExecuteContext tableExecuteContext = tableExecuteContextManager.getTableExecuteContextForQuery(stageExecution.getStageId().getQueryId());
Expand All @@ -335,53 +316,35 @@ else if (pendingSplits.isEmpty()) {
overallSplitAssignmentCount);
}

if (anyNotBlocked) {
if (blockedFuture.isEmpty()) {
return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
}

boolean anySourceTaskBlocked = this.anySourceTaskBlocked.getAsBoolean();
if (anySourceTaskBlocked) {
if (anySourceTaskBlocked.getAsBoolean()) {
// Dynamic filters might not be collected due to build side source tasks being blocked on full buffer.
// In such case probe split generation that is waiting for dynamic filters should be unblocked to prevent deadlock.
dynamicFilterService.unblockStageDynamicFilters(stageExecution.getStageId().getQueryId(), stageExecution.getAttemptId(), stageExecution.getFragment());
}

if (anyBlockedOnPlacements && anySourceTaskBlocked) {
// In a broadcast join, output buffers of the tasks in build source stage have to
// hold onto all data produced before probe side task scheduling finishes,
// even if the data is acknowledged by all known consumers. This is because
// new consumers may be added until the probe side task scheduling finishes.
//
// As a result, the following line is necessary to prevent deadlock
// due to neither build nor probe can make any progress.
// The build side blocks due to a full output buffer.
// In the meantime the probe side split cannot be consumed since
// builder side hash table construction has not finished.
overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
}

ScheduleResult.BlockedReason blockedReason;
if (anyBlockedOnNextSplitBatch) {
blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE;
}
else {
blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP;
}

overallBlockedFutures.add(whenFinished);

if (state == State.FINISHED && splitSource.isFinished()) {
// Wake up blocked caller so that it will invoke schedule() right away.
// Once schedule is invoked, state will be transitioned to FINISHED.
whenFinished.set(null);
whenFinished = SettableFuture.create();
if (blockedOnPlacements) {
// In a broadcast join, output buffers of the tasks in build source stage have to
// hold onto all data produced before probe side task scheduling finishes,
// even if the data is acknowledged by all known consumers. This is because
// new consumers may be added until the probe side task scheduling finishes.
//
// As a result, the following line is necessary to prevent deadlock
// due to neither build nor probe can make any progress.
// The build side blocks due to a full output buffer.
// In the meantime the probe side split cannot be consumed since
// builder side hash table construction has not finished.
overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
}
}

return new ScheduleResult(
false,
overallNewTasks.build(),
nonCancellationPropagating(asVoid(whenAnyComplete(overallBlockedFutures))),
blockedReason,
nonCancellationPropagating(blockedFuture.get()),
blockedOnNextSplitBatch ? WAITING_FOR_SOURCE : SPLIT_QUEUES_FULL,
overallSplitAssignmentCount);
}

Expand All @@ -390,25 +353,6 @@ private static <T> ListenableFuture<Void> asVoid(ListenableFuture<T> future)
return Futures.transform(future, v -> null, directExecutor());
}

private synchronized void dropListenersFromWhenFinished()
{
// whenFinished may remain in a not-done state for an extended period of time.
// As a result, over time, it can retain a huge number of listener objects.

// Whenever schedule is called, holding onto the previous listener is not useful anymore.
// Therefore, we drop those listeners here by recreating the future.

// Note: The following implementation is thread-safe because whenFinished can only be completed
// while holding the monitor of this.

if (whenFinished.isDone()) {
return;
}

whenFinished.cancel(true);
whenFinished = SettableFuture.create();
}

@Override
public void close()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1582,9 +1582,6 @@ else if (!result.getBlocked().isDone()) {
case SPLIT_QUEUES_FULL:
schedulerStats.getSplitQueuesFull().update(1);
break;
case MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE:
case NO_ACTIVE_DRIVER_GROUP:
break;
default:
throw new UnsupportedOperationException("Unknown blocked reason: " + result.getBlockedReason().get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,29 @@ public void testScheduleNoSplits()
stage.abort();
}

@Test
public void testDoesNotScheduleEmptySplit()
{
// Regression test: once real splits have been scheduled, exhausting the split source
// must finish the scheduler WITHOUT assigning a synthetic EmptySplit
// (the EmptySplit fallback is only for sources that never produced any split).
PlanFragment plan = createFragment();
NodeTaskMap nodeTaskMap = new NodeTaskMap(finalizerService);
StageExecution stage = createStageExecution(plan, nodeTaskMap);

// Source with exactly 2 splits; batch size 1 means the scheduler consumes one split per pass.
ConnectorSplitSource splitSource = createFixedSplitSource(2, TestingSplit::createRemoteSplit);
StageScheduler scheduler = getSourcePartitionedScheduler(splitSource, stage, nodeManager, nodeTaskMap, 1, STAGE);

// First pass schedules the first split and creates one task.
assertEquals(scheduler.schedule().getNewTasks().size(), 1);

// ensure that next batch size fetched by scheduler will be empty and last
// (drains the second split directly from the source, bypassing the scheduler)
splitSource.getNextBatch(1);

// Next pass sees an empty, final batch: no new tasks may be created —
// in particular no EmptySplit must be scheduled at this point.
ScheduleResult scheduleResult = scheduler.schedule();
assertEquals(scheduleResult.getNewTasks().size(), 0);

// Scheduler should report finished (possibly after one more schedule() round-trip;
// assertEffectivelyFinished presumably handles that — see its definition).
assertEffectivelyFinished(scheduleResult, scheduler);

stage.abort();
}

@Test
public void testScheduleSplitsOneAtATime()
{
Expand Down

0 comments on commit f45ff16

Please sign in to comment.