Skip to content

Commit

Permalink
Fix deadlock in SqlStageExecution.waitForNewExchangesOrBuffers
Browse files Browse the repository at this point in the history
The waiting code only breaks if all sources are completed, but
once a source completes we must set no more splits on the tasks.
Upon review of the waiting code, this entire method can be replaced
with a simple timed wait.
  • Loading branch information
dain committed Apr 29, 2015
1 parent a32f88b commit 04ad885
Showing 1 changed file with 13 additions and 34 deletions.
Expand Up @@ -775,7 +775,19 @@ private void updateNewExchangesAndBuffers(boolean waitUntilFinished)
return;
}

waitForNewExchangesOrBuffers();
synchronized (this) {
// wait for a state change
//
// NOTE this must be a wait with a timeout since there is no notification
// for new exchanges from the child stages
try {
TimeUnit.MILLISECONDS.timedWait(this, 100);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
}
}
}

Expand Down Expand Up @@ -811,39 +823,6 @@ private boolean addNewExchangesAndBuffers()
return finished;
}

private synchronized void waitForNewExchangesOrBuffers()
{
while (!getState().isDone()) {
// if next loop will finish, don't wait
Set<PlanNodeId> completeSources = updateCompleteSources();
boolean allSourceComplete = completeSources.containsAll(allSources);
if (allSourceComplete && getCurrentOutputBuffers().isNoMoreBufferIds()) {
return;
}
// do we have a new set of output buffers?
if (nextOutputBuffers != null) {
return;
}

// do we have new exchange locations?
if (!getNewExchangeLocations().isEmpty()) {
return;
}

// wait for a state change
//
// NOTE this must be a wait with a timeout since there is no notification
// for new exchanges from the child stages
try {
TimeUnit.MILLISECONDS.timedWait(this, 100);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
}
}

private Set<PlanNodeId> updateCompleteSources()
{
for (RemoteSourceNode remoteSourceNode : fragment.getRemoteSourceNodes()) {
Expand Down

0 comments on commit 04ad885

Please sign in to comment.