Add shard size constraint to Raptor shards
nileema committed Mar 12, 2015
1 parent 327bb85 commit c71414c
Showing 8 changed files with 107 additions and 21 deletions.
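
In brief: shards were previously cut by row count alone, with RaptorPageSink counting rows against StorageManager.getMaxRowCount(). This commit moves that decision into the storage layer behind a new StoragePageSink.isFull() method, which trips on either the existing row limit or a new approximate uncompressed-size limit configured by storage.max-shard-size (default 256MB, validated to lie between 1MB and 1GB). Condensed from the OrcStoragePageSink hunk below, as a restatement rather than a separate API, the new cut condition is:

    // writer is the current OrcFileWriter; null means nothing has been buffered yet
    boolean full = (writer != null)
            && (writer.getRowCount() >= rowsPerShard
                    || writer.getUncompressedSize() >= maxShardSize.toBytes());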
RaptorPageSink.java
@@ -51,11 +51,8 @@ public class RaptorPageSink
     private final List<Integer> sortFields;
     private final List<SortOrder> sortOrders;

-    private final long maxRowCount;
     private final PageBuffer pageBuffer;

-    private long rowCount;
-
     public RaptorPageSink(
             PageSorter pageSorter,
             StorageManager storageManager,

@@ -80,10 +77,7 @@ public RaptorPageSink(
         this.sortTypes = ImmutableList.copyOf(sortFields.stream().map(columnTypes::get).collect(toList()));
         this.sortOrders = ImmutableList.copyOf(checkNotNull(sortOrders, "sortOrders is null"));

-        this.maxRowCount = storageManager.getMaxRowCount();
         this.pageBuffer = new PageBuffer(storageManager.getMaxBufferSize().toBytes());
-
-        this.rowCount = 0;
     }

     @Override
@@ -100,7 +94,6 @@ public void appendPage(Page page, Block sampleWeightBlock)
         }

         pageBuffer.add(page);
-        rowCount += page.getPositionCount();
     }

     @Override
@@ -153,11 +146,10 @@ private Page createPageWithSampleWeightBlock(Page page, Block sampleWeightBlock)
      */
     private void flushPageBufferIfNecessary(int rowsToAdd)
     {
-        if (rowCount >= maxRowCount) {
+        if (storagePageSink.isFull()) {
             // This StoragePageSink is full, flush it for the next batch of pages
             flushPages(pageBuffer.getPages());
             pageBuffer.reset();
-            rowCount = 0;
             storagePageSink.flush();
             return;
         }
OrcFileWriter.java
@@ -78,6 +78,7 @@ public class OrcFileWriter
     private final Object row;

     private long rowCount;
+    private long uncompressedSize;

     public OrcFileWriter(List<Long> columnIds, List<Type> columnTypes, File target)
     {

@@ -108,6 +109,7 @@ public void appendPages(List<Page> pages)
                 appendRow(page, position);
             }
         }
+        updateDataSize(pages);
     }

     public void appendPages(List<Page> inputPages, int[] pageIndexes, int[] positionIndexes)
@@ -117,6 +119,14 @@ public void appendPages(List<Page> inputPages, int[] pageIndexes, int[] positionIndexes)
             Page page = inputPages.get(pageIndexes[i]);
             appendRow(page, positionIndexes[i]);
         }
+        updateDataSize(inputPages);
     }

+    private void updateDataSize(List<Page> pages)
+    {
+        for (Page page : pages) {
+            uncompressedSize += page.getSizeInBytes();
+        }
+    }
+
     private void appendRow(Page page, int position)

@@ -151,6 +161,11 @@ public long getRowCount()
         return rowCount;
     }

+    public long getUncompressedSize()
+    {
+        return uncompressedSize;
+    }
+
     private static OrcSerde createSerializer(Configuration conf, Properties properties)
     {
         OrcSerde serde = new OrcSerde();
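
A note on the accounting: updateDataSize() sums Page.getSizeInBytes(), the in-memory size of the appended pages, so getUncompressedSize() approximates the data volume before ORC encoding and compression (the config description below calls the limit approximate for the same reason). A minimal usage sketch; columnIds, columnTypes, target, and pages are assumed to be set up as elsewhere in this diff:

    // Illustrative only: the writer now exposes both shard-capping metrics.
    OrcFileWriter writer = new OrcFileWriter(columnIds, columnTypes, target);
    writer.appendPages(pages);                       // appends rows, then calls updateDataSize(pages)
    long rows = writer.getRowCount();                // existing row metric
    long approxBytes = writer.getUncompressedSize(); // new metric: sum of Page.getSizeInBytes()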

OrcStorageManager.java
@@ -72,6 +72,7 @@ public class OrcStorageManager
     private final ShardRecoveryManager recoveryManager;
     private final Duration recoveryTimeout;
     private final long rowsPerShard;
+    private final DataSize maxShardSize;
     private final DataSize maxBufferSize;

     @Inject
@@ -87,6 +88,7 @@ public OrcStorageManager(
                 recoveryManager,
                 config.getShardRecoveryTimeout(),
                 config.getRowsPerShard(),
+                config.getMaxShardSize(),
                 config.getMaxBufferSize());
     }

@@ -97,6 +99,7 @@ public OrcStorageManager(
             ShardRecoveryManager recoveryManager,
             Duration shardRecoveryTimeout,
             long rowsPerShard,
+            DataSize maxShardSize,
             DataSize maxBufferSize)
     {
         this.nodeId = checkNotNull(nodeId, "nodeId is null");
@@ -107,6 +110,7 @@

         checkArgument(rowsPerShard > 0, "rowsPerShard must be > 0");
         this.rowsPerShard = rowsPerShard;
+        this.maxShardSize = checkNotNull(maxShardSize, "maxShardSize is null");
         this.maxBufferSize = checkNotNull(maxBufferSize, "maxBufferSize is null");
     }

@@ -152,7 +156,7 @@ public ConnectorPageSource getPageSource(UUID shardUuid, List<Long> columnIds, L
     @Override
     public StoragePageSink createStoragePageSink(List<Long> columnIds, List<Type> columnTypes)
     {
-        return new OrcStoragePageSink(columnIds, columnTypes);
+        return new OrcStoragePageSink(columnIds, columnTypes, rowsPerShard, maxShardSize);
     }

     private void writeShard(UUID shardUuid)
@@ -181,12 +185,6 @@ private void writeShard(UUID shardUuid)
         }
     }

-    @Override
-    public long getMaxRowCount()
-    {
-        return rowsPerShard;
-    }
-
     @Override
     public DataSize getMaxBufferSize()
     {

@@ -273,13 +271,17 @@ private class OrcStoragePageSink
         private final List<Type> columnTypes;

         private final List<ShardInfo> shards = new ArrayList<>();
+        private final long rowsPerShard;
+        private final DataSize maxShardSize;

         private boolean committed;
         private OrcFileWriter writer;
         private UUID shardUuid;

-        public OrcStoragePageSink(List<Long> columnIds, List<Type> columnTypes)
+        public OrcStoragePageSink(List<Long> columnIds, List<Type> columnTypes, long rowsPerShard, DataSize maxShardSize)
         {
+            this.rowsPerShard = rowsPerShard;
+            this.maxShardSize = maxShardSize;
             this.columnIds = ImmutableList.copyOf(checkNotNull(columnIds, "columnIds is null"));
             this.columnTypes = ImmutableList.copyOf(checkNotNull(columnTypes, "columnTypes is null"));
         }

@@ -298,6 +300,15 @@ public void appendPages(List<Page> inputPages, int[] pageIndexes, int[] positionIndexes)
             writer.appendPages(inputPages, pageIndexes, positionIndexes);
         }

+        @Override
+        public boolean isFull()
+        {
+            if (writer == null) {
+                return false;
+            }
+            return (writer.getRowCount() >= rowsPerShard) || (writer.getUncompressedSize() >= maxShardSize.toBytes());
+        }
+
         @Override
         public void flush()
         {
@@ -308,7 +319,7 @@ public void flush()
             List<ColumnStats> columns = computeShardStats(stagingFile, columnIds, columnTypes);
             Set<String> nodes = ImmutableSet.of(nodeId);
             long rowCount = writer.getRowCount();
-            long dataSize = stagingFile.length();
+            long dataSize = stagingFile.length(); // compressed size

             shards.add(new ShardInfo(shardUuid, nodes, columns, rowCount, dataSize));
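
Two sizes are in play here: isFull() caps a shard by the writer's approximate uncompressed byte count, while flush() records stagingFile.length(), the compressed on-disk size, in ShardInfo (hence the "compressed size" comment added above). A shard that reaches storage.max-shard-size in uncompressed bytes will therefore typically be noticeably smaller on disk.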

StorageManager.java
@@ -30,7 +30,5 @@ public interface StorageManager

     boolean isBackupAvailable();

-    long getMaxRowCount();
-
     DataSize getMaxBufferSize();
 }

StorageManagerConfig.java
@@ -17,6 +17,7 @@
 import io.airlift.configuration.ConfigDescription;
 import io.airlift.units.DataSize;
 import io.airlift.units.Duration;
+import io.airlift.units.MaxDataSize;
 import io.airlift.units.MinDataSize;

 import javax.annotation.Nullable;

@@ -37,7 +38,9 @@ public class StorageManagerConfig
     private Duration missingShardDiscoveryInterval = new Duration(5, TimeUnit.MINUTES);
     private DataSize orcMaxMergeDistance = new DataSize(1, MEGABYTE);
     private int recoveryThreads = 10;
+
     private long rowsPerShard = 1_000_000;
+    private DataSize maxShardSize = new DataSize(256, MEGABYTE);
     private DataSize maxBufferSize = new DataSize(256, MEGABYTE);

     @NotNull
@@ -136,6 +139,21 @@ public StorageManagerConfig setRowsPerShard(long rowsPerShard)
         return this;
     }

+    @MinDataSize("1MB")
+    @MaxDataSize("1GB")
+    public DataSize getMaxShardSize()
+    {
+        return maxShardSize;
+    }
+
+    @Config("storage.max-shard-size")
+    @ConfigDescription("Approximate maximum uncompressed size of a shard")
+    public StorageManagerConfig setMaxShardSize(DataSize maxShardSize)
+    {
+        this.maxShardSize = maxShardSize;
+        return this;
+    }
+
     @MinDataSize("1MB")
     public DataSize getMaxBufferSize()
     {
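
For reference, operators set this as storage.max-shard-size in the Raptor storage configuration, e.g. the "10MB" value exercised in the mapping test at the end of this diff. A small programmatic sketch using the setters added above; the values shown are just the defaults:

    // Chaining works because each setter returns this (see setMaxShardSize above).
    StorageManagerConfig config = new StorageManagerConfig()
            .setRowsPerShard(1_000_000)
            .setMaxShardSize(new DataSize(256, MEGABYTE));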

StoragePageSink.java
@@ -24,6 +24,8 @@ public interface StoragePageSink

     void appendPages(List<Page> pages, int[] pageIndexes, int[] positionIndexes);

+    boolean isFull();
+
     void flush();

     List<ShardInfo> commit();
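
This is the contract RaptorPageSink exercises in flushPageBufferIfNecessary() above: append pages, poll isFull(), and flush() to cut the current shard before the next batch. A minimal caller sketch, assuming the single-argument appendPages(List<Page>) overload used by the tests below; drainTo is a hypothetical helper name, not part of this commit:

    void drainTo(StoragePageSink sink, List<Page> batch)
    {
        sink.appendPages(batch);
        if (sink.isFull()) {
            sink.flush(); // finish the current shard; the next append starts a new one
        }
    }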

TestOrcStorageManager.java
@@ -71,6 +71,7 @@
 import static io.airlift.slice.Slices.utf8Slice;
 import static io.airlift.slice.Slices.wrappedBuffer;
 import static io.airlift.testing.FileUtils.deleteRecursively;
+import static io.airlift.units.DataSize.Unit.BYTE;
 import static io.airlift.units.DataSize.Unit.MEGABYTE;
 import static java.lang.String.format;
 import static java.util.Locale.ENGLISH;

@@ -93,6 +94,7 @@ public class TestOrcStorageManager
     private static final Duration SHARD_RECOVERY_TIMEOUT = new Duration(30, TimeUnit.SECONDS);
     private static final DataSize MAX_BUFFER_SIZE = new DataSize(256, MEGABYTE);
     private static final int ROWS_PER_SHARD = 100;
+    private static final DataSize MAX_FILE_SIZE = new DataSize(1, MEGABYTE);

     private final NodeManager nodeManager = new InMemoryNodeManager();
     private Handle dummyHandle;

@@ -382,9 +384,54 @@ public void testShardStatsDateTimestamp()
         assertColumnStats(stats, 2, minTimestamp, maxTimestamp);
     }

+    @Test
+    public void testRowsPerShard()
+            throws Exception
+    {
+        OrcStorageManager manager = createOrcStorageManager(storageService, recoveryManager, 2, new DataSize(2, MEGABYTE));
+
+        List<Long> columnIds = ImmutableList.of(3L, 7L);
+        List<Type> columnTypes = ImmutableList.<Type>of(BIGINT, VARCHAR);
+
+        StoragePageSink sink = manager.createStoragePageSink(columnIds, columnTypes);
+        List<Page> pages = rowPagesBuilder(columnTypes)
+                .row(123, "hello")
+                .row(456, "bye")
+                .build();
+        sink.appendPages(pages);
+        assertTrue(sink.isFull());
+    }
+
+    @Test
+    public void testMaxFileSize()
+            throws Exception
+    {
+        List<Long> columnIds = ImmutableList.of(3L, 7L);
+        List<Type> columnTypes = ImmutableList.<Type>of(BIGINT, VARCHAR);
+
+        List<Page> pages = rowPagesBuilder(columnTypes)
+                .row(123, "hello")
+                .row(456, "bye")
+                .build();
+        long dataSize = 0;
+        for (Page page : pages) {
+            dataSize += page.getSizeInBytes();
+        }
+
+        OrcStorageManager manager = createOrcStorageManager(storageService, recoveryManager, 20, new DataSize(dataSize, BYTE));
+        StoragePageSink sink = manager.createStoragePageSink(columnIds, columnTypes);
+        sink.appendPages(pages);
+        assertTrue(sink.isFull());
+    }
+
     public static OrcStorageManager createOrcStorageManager(StorageService storageService, ShardRecoveryManager recoveryManager)
     {
-        return new OrcStorageManager(CURRENT_NODE, storageService, ORC_MAX_MERGE_DISTANCE, recoveryManager, SHARD_RECOVERY_TIMEOUT, ROWS_PER_SHARD, MAX_BUFFER_SIZE);
+        return createOrcStorageManager(storageService, recoveryManager, ROWS_PER_SHARD, MAX_FILE_SIZE);
     }

+    public static OrcStorageManager createOrcStorageManager(StorageService storageService, ShardRecoveryManager recoveryManager, int rowsPerShard, DataSize maxFileSize)
+    {
+        return new OrcStorageManager(CURRENT_NODE, storageService, ORC_MAX_MERGE_DISTANCE, recoveryManager, SHARD_RECOVERY_TIMEOUT, rowsPerShard, maxFileSize, MAX_BUFFER_SIZE);
+    }
+
     private static void assertColumnStats(List<ColumnStats> list, long columnId, Object min, Object max)

TestStorageManagerConfig.java
@@ -45,6 +45,7 @@ public void testDefaults()
                 .setMissingShardDiscoveryInterval(new Duration(5, MINUTES))
                 .setRecoveryThreads(10)
                 .setRowsPerShard(1_000_000)
+                .setMaxShardSize(new DataSize(256, MEGABYTE))
                 .setMaxBufferSize(new DataSize(256, MEGABYTE)));

     }

@@ -60,6 +61,7 @@ public void testExplicitPropertyMappings()
                 .put("storage.missing-shard-discovery-interval", "4m")
                 .put("storage.max-recovery-threads", "12")
                 .put("storage.rows-per-shard", "10000")
+                .put("storage.max-shard-size", "10MB")
                 .put("storage.max-buffer-size", "512MB")
                 .build();

@@ -71,6 +73,7 @@ public void testExplicitPropertyMappings()
                 .setMissingShardDiscoveryInterval(new Duration(4, MINUTES))
                 .setRecoveryThreads(12)
                 .setRowsPerShard(10_000)
+                .setMaxShardSize(new DataSize(10, MEGABYTE))
                 .setMaxBufferSize(new DataSize(512, MEGABYTE));

         assertFullMapping(properties, expected);
