Skip to content

Commit

Permalink
Do not add exchange locations for finished tasks
Browse files Browse the repository at this point in the history
This prevents query hangs when using writer scaling as it can create new
tasks with exchange locations referencing tasks that are already expired
(and thus will be auto-created as new tasks that will never complete).
  • Loading branch information
electrum committed Dec 29, 2017
1 parent 886cdf9 commit 2ec80be
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
Expand Up @@ -84,7 +84,7 @@ public final class SqlStageExecution
private final Set<TaskId> finishedTasks = newConcurrentHashSet(); private final Set<TaskId> finishedTasks = newConcurrentHashSet();
private final AtomicBoolean splitsScheduled = new AtomicBoolean(); private final AtomicBoolean splitsScheduled = new AtomicBoolean();


private final Multimap<PlanNodeId, URI> exchangeLocations = HashMultimap.create(); private final Multimap<PlanNodeId, RemoteTask> sourceTasks = HashMultimap.create();
private final Set<PlanNodeId> completeSources = newConcurrentHashSet(); private final Set<PlanNodeId> completeSources = newConcurrentHashSet();
private final Set<PlanFragmentId> completeSourceFragments = newConcurrentHashSet(); private final Set<PlanFragmentId> completeSourceFragments = newConcurrentHashSet();


Expand Down Expand Up @@ -231,19 +231,20 @@ public StageInfo getStageInfo()
ImmutableList::of); ImmutableList::of);
} }


public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set<URI> exchangeLocations, boolean noMoreExchangeLocations) public synchronized void addExchangeLocations(PlanFragmentId fragmentId, Set<RemoteTask> sourceTasks, boolean noMoreExchangeLocations)
{ {
requireNonNull(fragmentId, "fragmentId is null"); requireNonNull(fragmentId, "fragmentId is null");
requireNonNull(exchangeLocations, "exchangeLocations is null"); requireNonNull(sourceTasks, "sourceTasks is null");


RemoteSourceNode remoteSource = exchangeSources.get(fragmentId); RemoteSourceNode remoteSource = exchangeSources.get(fragmentId);
checkArgument(remoteSource != null, "Unknown remote source %s. Known sources are %s", fragmentId, exchangeSources.keySet()); checkArgument(remoteSource != null, "Unknown remote source %s. Known sources are %s", fragmentId, exchangeSources.keySet());


this.exchangeLocations.putAll(remoteSource.getId(), exchangeLocations); this.sourceTasks.putAll(remoteSource.getId(), sourceTasks);


for (RemoteTask task : getAllTasks()) { for (RemoteTask task : getAllTasks()) {
ImmutableMultimap.Builder<PlanNodeId, Split> newSplits = ImmutableMultimap.builder(); ImmutableMultimap.Builder<PlanNodeId, Split> newSplits = ImmutableMultimap.builder();
for (URI exchangeLocation : exchangeLocations) { for (RemoteTask sourceTask : sourceTasks) {
URI exchangeLocation = sourceTask.getTaskStatus().getSelf();
newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), exchangeLocation)); newSplits.put(remoteSource.getId(), createRemoteSplitFor(task.getTaskId(), exchangeLocation));
} }
task.addSplits(newSplits.build()); task.addSplits(newSplits.build());
Expand Down Expand Up @@ -349,9 +350,13 @@ private synchronized RemoteTask scheduleTask(Node node, TaskId taskId, Multimap<


ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder(); ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
initialSplits.putAll(sourceSplits); initialSplits.putAll(sourceSplits);
for (Entry<PlanNodeId, URI> entry : exchangeLocations.entries()) {
initialSplits.put(entry.getKey(), createRemoteSplitFor(taskId, entry.getValue())); sourceTasks.forEach((planNodeId, task) -> {
} TaskStatus status = task.getTaskStatus();
if (status.getState() != TaskState.FINISHED) {
initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf()));
}
});


OutputBuffers outputBuffers = this.outputBuffers.get(); OutputBuffers outputBuffers = this.outputBuffers.get();
checkState(outputBuffers != null, "Initial output buffers must be set before a task can be scheduled"); checkState(outputBuffers != null, "Initial output buffers must be set before a task can be scheduled");
Expand Down
Expand Up @@ -155,7 +155,7 @@ public SqlQueryScheduler(QueryStateMachine queryStateMachine,


OutputBufferId rootBufferId = Iterables.getOnlyElement(rootOutputBuffers.getBuffers().keySet()); OutputBufferId rootBufferId = Iterables.getOnlyElement(rootOutputBuffers.getBuffers().keySet());
List<SqlStageExecution> stages = createStages( List<SqlStageExecution> stages = createStages(
(fragmentId, exchangeLocations, noMoreExchangeLocations) -> updateQueryOutputLocations(queryStateMachine, rootBufferId, exchangeLocations, noMoreExchangeLocations), (fragmentId, tasks, noMoreExchangeLocations) -> updateQueryOutputLocations(queryStateMachine, rootBufferId, tasks, noMoreExchangeLocations),
new AtomicInteger(), new AtomicInteger(),
locationFactory, locationFactory,
plan.withBucketToPartition(Optional.of(new int[1])), plan.withBucketToPartition(Optional.of(new int[1])),
Expand Down Expand Up @@ -217,9 +217,10 @@ else if (queryStateMachine.getQueryState() == QueryState.STARTING) {
} }
} }


private static void updateQueryOutputLocations(QueryStateMachine queryStateMachine, OutputBufferId rootBufferId, Set<URI> exchangeLocations, boolean noMoreExchangeLocations) private static void updateQueryOutputLocations(QueryStateMachine queryStateMachine, OutputBufferId rootBufferId, Set<RemoteTask> tasks, boolean noMoreExchangeLocations)
{ {
Set<URI> bufferLocations = exchangeLocations.stream() Set<URI> bufferLocations = tasks.stream()
.map(task -> task.getTaskStatus().getSelf())
.map(location -> uriBuilderFrom(location).appendPath("results").appendPath(rootBufferId.toString()).build()) .map(location -> uriBuilderFrom(location).appendPath("results").appendPath(rootBufferId.toString()).build())
.collect(toImmutableSet()); .collect(toImmutableSet());
queryStateMachine.updateOutputLocations(bufferLocations, noMoreExchangeLocations); queryStateMachine.updateOutputLocations(bufferLocations, noMoreExchangeLocations);
Expand Down Expand Up @@ -573,7 +574,7 @@ private static ListenableFuture<?> whenAllStages(Collection<SqlStageExecution> s


private interface ExchangeLocationsConsumer private interface ExchangeLocationsConsumer
{ {
void addExchangeLocations(PlanFragmentId fragmentId, Set<URI> exchangeLocations, boolean noMoreExchangeLocations); void addExchangeLocations(PlanFragmentId fragmentId, Set<RemoteTask> tasks, boolean noMoreExchangeLocations);
} }


private static class StageLinkage private static class StageLinkage
Expand Down Expand Up @@ -637,10 +638,7 @@ public void processScheduleResults(StageState newState, Set<RemoteTask> newTasks
} }


// Add an exchange location to the parent stage for each new task // Add an exchange location to the parent stage for each new task
Set<URI> newExchangeLocations = newTasks.stream() parent.addExchangeLocations(currentStageFragmentId, newTasks, noMoreTasks);
.map(task -> task.getTaskStatus().getSelf())
.collect(toImmutableSet());
parent.addExchangeLocations(currentStageFragmentId, newExchangeLocations, noMoreTasks);


if (!childOutputBufferManagers.isEmpty()) { if (!childOutputBufferManagers.isEmpty()) {
// Add an output buffer to the child stages for each new task // Add an output buffer to the child stages for each new task
Expand Down

0 comments on commit 2ec80be

Please sign in to comment.