Skip to content

Commit

Permalink
Fix condition for SharedBuffer completion
Browse files Browse the repository at this point in the history
Currently, the shared buffer is marked as complete when all the client
pages are acknowledged and the shared buffer receives
noMorePages. However, if the client does not see the noMorePages flag,
we can destroy the SharedBuffer when the client thinks it is still
alive. If the SharedBuffer is destroyed, and the client keeps asking for
more pages (because it doesn't know that there are no more pages), we
create a new empty shared buffer for the client and tell the client that
we don't have any moro pages. This makes the client wait indefinitely
for new data.

To fix this, SharedBuffer should be marked complete when both conditions are satisfied:
- client has acknowledged all the pages
- client knows that there will be no more pages

Once the client sees all the pages and the no more pages signal,
explicitly send a delete to the SharedBuffer to indicate completion.
  • Loading branch information
nileema committed Sep 10, 2015
1 parent 90f68df commit fcb6985
Show file tree
Hide file tree
Showing 13 changed files with 271 additions and 132 deletions.
Expand Up @@ -30,6 +30,7 @@ public final class PrestoHeaders
public static final String PRESTO_MAX_SIZE = "X-Presto-Max-Size";
public static final String PRESTO_PAGE_TOKEN = "X-Presto-Page-Sequence-Id";
public static final String PRESTO_PAGE_NEXT_TOKEN = "X-Presto-Page-End-Sequence-Id";
public static final String PRESTO_BUFFER_COMPLETE = "X-Presto-Buffer-Complete";

private PrestoHeaders() {}
}
Expand Up @@ -24,21 +24,21 @@

public class BufferResult
{
public static BufferResult emptyResults(long token, boolean bufferClosed)
public static BufferResult emptyResults(long token, boolean bufferComplete)
{
return new BufferResult(token, token, bufferClosed, ImmutableList.<Page>of());
return new BufferResult(token, token, bufferComplete, ImmutableList.<Page>of());
}

private final long token;
private final long nextToken;
private final boolean bufferClosed;
private final boolean bufferComplete;
private final List<Page> pages;

public BufferResult(long token, long nextToken, boolean bufferClosed, List<Page> pages)
public BufferResult(long token, long nextToken, boolean bufferComplete, List<Page> pages)
{
this.token = token;
this.nextToken = nextToken;
this.bufferClosed = bufferClosed;
this.bufferComplete = bufferComplete;
this.pages = ImmutableList.copyOf(checkNotNull(pages, "pages is null"));
}

Expand All @@ -52,9 +52,9 @@ public long getNextToken()
return nextToken;
}

public boolean isBufferClosed()
public boolean isBufferComplete()
{
return bufferClosed;
return bufferComplete;
}

public List<Page> getPages()
Expand Down Expand Up @@ -84,14 +84,14 @@ public boolean equals(Object o)
BufferResult that = (BufferResult) o;
return Objects.equals(token, that.token) &&
Objects.equals(nextToken, that.nextToken) &&
Objects.equals(bufferClosed, that.bufferClosed) &&
Objects.equals(bufferComplete, that.bufferComplete) &&
Objects.equals(pages, that.pages);
}

@Override
public int hashCode()
{
return Objects.hash(token, nextToken, bufferClosed, pages);
return Objects.hash(token, nextToken, bufferComplete, pages);
}

@Override
Expand All @@ -100,7 +100,7 @@ public String toString()
return toStringHelper(this)
.add("token", token)
.add("nextToken", nextToken)
.add("bufferClosed", bufferClosed)
.add("bufferComplete", bufferComplete)
.add("pages", pages)
.toString();
}
Expand Down
Expand Up @@ -81,6 +81,9 @@ private synchronized void addToMasterBuffer(Page page)
updateMemoryUsage(bytesAdded);
}

/**
* @return at least one page if we have pages in buffer, empty list otherwise
*/
public synchronized List<Page> getPages(DataSize maxSize, long sequenceId)
{
long maxBytes = maxSize.toBytes();
Expand Down
Expand Up @@ -357,7 +357,7 @@ private void checkFlushComplete()

if (state.get() == FLUSHING) {
for (NamedBuffer namedBuffer : namedBuffers.values()) {
if (!namedBuffer.checkCompletion()) {
if (!namedBuffer.isFinished()) {
return;
}
}
Expand Down Expand Up @@ -397,11 +397,6 @@ private void updateState()
// this might have freed up space in the buffers, try to dequeue pages
partitionBuffers.values().forEach(PartitionBuffer::dequeuePages);
}

// remove any completed buffers
if (!state.canAddPages()) {
namedBuffers.values().forEach(SharedBuffer.NamedBuffer::checkCompletion);
}
}
finally {
checkFlushComplete();
Expand Down Expand Up @@ -484,11 +479,17 @@ public BufferResult getPages(long startingSequenceId, DataSize maxSize)
sequenceId = startingSequenceId;
}

if (checkCompletion()) {
if (isFinished()) {
return emptyResults(startingSequenceId, true);
}

List<Page> pages = partitionBuffer.getPages(maxSize, sequenceId);

// if we can't have any more pages, indicate that the buffer is complete
if (pages.isEmpty() && !state.get().canAddPages()) {
return emptyResults(startingSequenceId, true);
}

return new BufferResult(startingSequenceId, startingSequenceId + pages.size(), false, pages);
}

Expand All @@ -497,24 +498,12 @@ public void abort()
checkHoldsLock();

finished.set(true);
checkFlushComplete();
}

public boolean checkCompletion()
public boolean isFinished()
{
checkHoldsLock();
// WARNING: finish must short circuit this call, or the call to checkFlushComplete below will cause an infinite recursion
if (finished.get()) {
return true;
}

long pagesAdded = partitionBuffer.getPageCount();
if (!state.get().canAddPages() && sequenceId.get() >= pagesAdded) {
// WARNING: finish must set before the call to checkFlushComplete of the short circuit above will not trigger and the code enter an infinite recursion
finished.set(true);

// check if master buffer is finished
checkFlushComplete();
}
return finished.get();
}

Expand Down Expand Up @@ -592,7 +581,7 @@ public boolean execute()
checkFlushComplete();

// if we got an empty result, wait for more pages
if (bufferResult.isEmpty() && !bufferResult.isBufferClosed()) {
if (bufferResult.isEmpty() && !bufferResult.isBufferComplete()) {
return false;
}

Expand Down

0 comments on commit fcb6985

Please sign in to comment.