Skip to content

Commit

Permalink
Add PageWriter abstraction to RaptorPageSink
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Feb 3, 2016
1 parent 2796829 commit 62100c7
Showing 1 changed file with 46 additions and 7 deletions.
Expand Up @@ -28,6 +28,7 @@
import io.airlift.slice.Slices; import io.airlift.slice.Slices;
import io.airlift.units.DataSize; import io.airlift.units.DataSize;


import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
Expand All @@ -51,7 +52,7 @@ public class RaptorPageSink
private final List<SortOrder> sortOrders; private final List<SortOrder> sortOrders;
private final long maxBufferBytes; private final long maxBufferBytes;


private final PageBuffer pageBuffer; private final PageWriter pageWriter;


public RaptorPageSink( public RaptorPageSink(
PageSorter pageSorter, PageSorter pageSorter,
Expand Down Expand Up @@ -79,7 +80,7 @@ public RaptorPageSink(
this.sortFields = ImmutableList.copyOf(sortColumnIds.stream().map(columnIds::indexOf).collect(toList())); this.sortFields = ImmutableList.copyOf(sortColumnIds.stream().map(columnIds::indexOf).collect(toList()));
this.sortOrders = ImmutableList.copyOf(requireNonNull(sortOrders, "sortOrders is null")); this.sortOrders = ImmutableList.copyOf(requireNonNull(sortOrders, "sortOrders is null"));


this.pageBuffer = createPageBuffer(OptionalInt.empty()); this.pageWriter = new SimplePageWriter();
} }


@Override @Override
Expand All @@ -93,15 +94,17 @@ public void appendPage(Page page, Block sampleWeightBlock)
page = createPageWithSampleWeightBlock(page, sampleWeightBlock); page = createPageWithSampleWeightBlock(page, sampleWeightBlock);
} }


pageBuffer.add(page); pageWriter.appendPage(page);
} }


@Override @Override
public Collection<Slice> finish() public Collection<Slice> finish()
{ {
pageBuffer.flush(); List<ShardInfo> shards = new ArrayList<>();

for (PageBuffer pageBuffer : pageWriter.getPageBuffers()) {
List<ShardInfo> shards = pageBuffer.getStoragePageSink().commit(); pageBuffer.flush();
shards.addAll(pageBuffer.getStoragePageSink().commit());
}


ImmutableList.Builder<Slice> fragments = ImmutableList.builder(); ImmutableList.Builder<Slice> fragments = ImmutableList.builder();
for (ShardInfo shard : shards) { for (ShardInfo shard : shards) {
Expand All @@ -113,7 +116,18 @@ public Collection<Slice> finish()
@Override @Override
public void abort() public void abort()
{ {
pageBuffer.getStoragePageSink().rollback(); RuntimeException error = new RuntimeException("Exception during rollback");
for (PageBuffer pageBuffer : pageWriter.getPageBuffers()) {
try {
pageBuffer.getStoragePageSink().rollback();
}
catch (Throwable t) {
error.addSuppressed(t);
}
}
if (error.getSuppressed().length > 0) {
throw error;
}
} }


private PageBuffer createPageBuffer(OptionalInt bucketNumber) private PageBuffer createPageBuffer(OptionalInt bucketNumber)
Expand Down Expand Up @@ -147,4 +161,29 @@ private Page createPageWithSampleWeightBlock(Page page, Block sampleWeightBlock)
} }
return new Page(blocks); return new Page(blocks);
} }

private interface PageWriter
{
void appendPage(Page page);

List<PageBuffer> getPageBuffers();
}

private class SimplePageWriter
implements PageWriter
{
private final PageBuffer pageBuffer = createPageBuffer(OptionalInt.empty());

@Override
public void appendPage(Page page)
{
pageBuffer.add(page);
}

@Override
public List<PageBuffer> getPageBuffers()
{
return ImmutableList.of(pageBuffer);
}
}
} }

0 comments on commit 62100c7

Please sign in to comment.