Skip to content

Commit

Permalink
Create a new storage page sink if we hit the max row count
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Mar 12, 2015
1 parent 3c37213 commit 094b7ac
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 17 deletions.
Expand Up @@ -136,6 +136,15 @@ private Page createPageWithSampleWeightBlock(Page page, Block sampleWeightBlock)
return new Page(blocks);
}

private void flushPageBufferIfNecessary(int rowsToAdd)
{
if (shouldFlush(rowsToAdd)) {
flushPages(pageBuffer.getPages());
pageBuffer.reset();
storagePageSink.flush();
}
}

/**
* Flushes pages in the PageBuffer to StoragePageSink if ANY of the following is true:
* <ul>
Expand All @@ -144,21 +153,9 @@ private Page createPageWithSampleWeightBlock(Page page, Block sampleWeightBlock)
* <li>pageBuffer has more than Integer.MAX_VALUE rows (PagesSorter.sort can sort Integer.MAX_VALUE rows at a time)</li>
* </ul>
*/
private void flushPageBufferIfNecessary(int rowsToAdd)
private boolean shouldFlush(int rowsToAdd)
{
if (storagePageSink.isFull()) {
// This StoragePageSink is full, flush it for the next batch of pages
flushPages(pageBuffer.getPages());
pageBuffer.reset();
storagePageSink.flush();
return;
}

int maxRemainingRows = Integer.MAX_VALUE - Ints.checkedCast(pageBuffer.getRowCount());
if (pageBuffer.isFull() || (!sortFields.isEmpty() && (rowsToAdd > maxRemainingRows))) {
flushPages(pageBuffer.getPages());
pageBuffer.reset();
}
return storagePageSink.isFull() || !pageBuffer.canAddRows(rowsToAdd);
}

private void flushPages(List<Page> pages)
Expand Down
Expand Up @@ -189,7 +189,7 @@ private void writeShard(UUID shardUuid)
@Override
public PageBuffer createPageBuffer()
{
return new PageBuffer(maxBufferSize.toBytes());
return new PageBuffer(maxBufferSize.toBytes(), Integer.MAX_VALUE);
}

@Override
Expand Down
Expand Up @@ -26,13 +26,16 @@ public class PageBuffer
{
private final long maxMemoryBytes;
private final List<Page> pages = new ArrayList<>();
private final long maxRows;

private long usedMemoryBytes;
private long rowCount;

public PageBuffer(long maxMemoryBytes)
public PageBuffer(long maxMemoryBytes, long maxRows)
{
checkArgument(maxMemoryBytes > 0, "maxMemoryBytes must be positive");
checkArgument(maxRows > 0, "maxRows must be positive");
this.maxRows = maxRows;
this.maxMemoryBytes = maxMemoryBytes;
}

Expand All @@ -51,9 +54,14 @@ public void reset()
usedMemoryBytes = 0;
}

public boolean canAddRows(int rowsToAdd)
{
return !isFull() && rowCount + rowsToAdd < maxRows;
}

public boolean isFull()
{
return usedMemoryBytes >= maxMemoryBytes;
return rowCount >= maxRows || usedMemoryBytes >= maxMemoryBytes;
}

public List<Page> getPages()
Expand Down

0 comments on commit 094b7ac

Please sign in to comment.