Skip to content

Commit

Permalink
Consider rowCount for creating a compaction set
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Sep 2, 2015
1 parent 9e35cba commit cf70280
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 13 deletions.
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Verify.verify;
import static java.util.Comparator.comparing;
Expand All @@ -30,10 +31,13 @@ public final class FileCompactionSetCreator
implements CompactionSetCreator
{
private final DataSize maxShardSize;
private final long maxShardRows;

public FileCompactionSetCreator(DataSize maxShardSize)
public FileCompactionSetCreator(DataSize maxShardSize, long maxShardRows)
{
this.maxShardSize = checkNotNull(maxShardSize, "maxShardSize is null");
checkArgument(maxShardRows > 0, "maxShardRows must be > 0");
this.maxShardRows = maxShardRows;
}

@Override
Expand All @@ -60,12 +64,15 @@ private Set<ShardMetadata> getCompactionSet(List<ShardMetadata> shardMetadata)
ImmutableSet.Builder<ShardMetadata> shards = ImmutableSet.builder();
long maxShardSizeBytes = maxShardSize.toBytes();
long consumedBytes = 0;
long consumedRows = 0;

for (ShardMetadata shard : shardMetadata) {
long uncompressedSize = shard.getUncompressedSize();
if (consumedBytes + uncompressedSize > maxShardSizeBytes) {
if ((consumedBytes + shard.getUncompressedSize() > maxShardSizeBytes) ||
(consumedRows + shard.getRowCount() > maxShardRows)) {
break;
}
consumedBytes += uncompressedSize;
consumedBytes += shard.getUncompressedSize();
consumedRows += shard.getRowCount();
shards.add(shard);
}
return shards.build();
Expand Down
Expand Up @@ -196,11 +196,11 @@ private void discoverShards()
Long temporalColumnId = metadataDao.getTemporalColumnId(tableId);
CompactionSetCreator compactionSetCreator;
if (temporalColumnId == null) {
compactionSetCreator = new FileCompactionSetCreator(maxShardSize);
compactionSetCreator = new FileCompactionSetCreator(maxShardSize, maxShardRows);
}
else {
Type type = metadataDao.getTableColumn(tableId, temporalColumnId).getDataType();
compactionSetCreator = new TemporalCompactionSetCreator(maxShardSize, type);
compactionSetCreator = new TemporalCompactionSetCreator(maxShardSize, maxShardRows, type);
shards = filterShardsWithTemporalMetadata(shardMetadata, tableId, temporalColumnId);
}
addToCompactionQueue(compactionSetCreator, tableId, shards);
Expand Down
Expand Up @@ -38,13 +38,17 @@ public class TemporalCompactionSetCreator
{
private final long maxShardSizeBytes;
private final Type type;
private final long maxShardRows;

public TemporalCompactionSetCreator(DataSize maxShardSize, Type type)
public TemporalCompactionSetCreator(DataSize maxShardSize, long maxShardRows, Type type)
{
requireNonNull(maxShardSize, "maxShardSize is null");
checkArgument(type.equals(DATE) || type.equals(TIMESTAMP), "type must be timestamp or date");

this.maxShardSizeBytes = maxShardSize.toBytes();

checkArgument(maxShardRows > 0, "maxShardRows must be > 0");
this.maxShardRows = maxShardRows;
this.type = requireNonNull(type, "type is null");
}

Expand All @@ -66,17 +70,21 @@ public Set<CompactionSet> createCompactionSets(long tableId, Set<ShardMetadata>
.collect(toList());

long consumedBytes = 0;
long consumedRows = 0;
ImmutableSet.Builder<ShardMetadata> shardsToCompact = ImmutableSet.builder();

for (ShardMetadata shard : shards) {
if ((consumedBytes + shard.getUncompressedSize()) > maxShardSizeBytes) {
if (((consumedBytes + shard.getUncompressedSize()) > maxShardSizeBytes) ||
(consumedRows + shard.getRowCount() > maxShardRows)) {
// Finalize this compaction set, and start a new one for the rest of the shards
compactionSets.add(new CompactionSet(tableId, shardsToCompact.build()));
shardsToCompact = ImmutableSet.builder();
consumedBytes = 0;
consumedRows = 0;
}
shardsToCompact.add(shard);
consumedBytes += shard.getUncompressedSize();
consumedRows += shard.getRowCount();
}
if (!shardsToCompact.build().isEmpty()) {
// create compaction set for the remaining shards of this day
Expand Down
Expand Up @@ -35,11 +35,13 @@

public class TestCompactionSetCreator
{
private static final long MAX_SHARD_ROWS = 10_000;

@Test
public void testNonTemporalCompactionSetSimple()
throws Exception
{
CompactionSetCreator compactionSetCreator = new FileCompactionSetCreator(new DataSize(1, KILOBYTE));
CompactionSetCreator compactionSetCreator = new FileCompactionSetCreator(new DataSize(1, KILOBYTE), MAX_SHARD_ROWS);

// compact into one shard
Set<ShardMetadata> inputShards = ImmutableSet.of(
Expand All @@ -56,7 +58,7 @@ public void testNonTemporalCompactionSetSimple()
public void testNonTemporalCompactionSet()
throws Exception
{
CompactionSetCreator compactionSetCreator = new FileCompactionSetCreator(new DataSize(100, BYTE));
CompactionSetCreator compactionSetCreator = new FileCompactionSetCreator(new DataSize(100, BYTE), MAX_SHARD_ROWS);
long tableId = 1L;

// compact into two shards
Expand All @@ -78,7 +80,7 @@ public void testNonTemporalCompactionSet()
public void testTemporalCompactionNoCompactionAcrossDays()
throws Exception
{
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE), TIMESTAMP);
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE), MAX_SHARD_ROWS, TIMESTAMP);
long tableId = 1L;
long day1 = Duration.ofDays(Duration.ofMillis(System.currentTimeMillis()).toDays()).toMillis();
long day2 = Duration.ofDays(Duration.ofMillis(day1).toDays() + 1).toMillis();
Expand All @@ -101,7 +103,7 @@ public void testTemporalCompactionNoCompactionAcrossDays()
public void testTemporalCompactionSpanningDays()
throws Exception
{
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE), TIMESTAMP);
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE), MAX_SHARD_ROWS, TIMESTAMP);
long tableId = 1L;
long day1 = Duration.ofDays(Duration.ofMillis(System.currentTimeMillis()).toDays()).toMillis();
long day2 = Duration.ofDays(Duration.ofMillis(day1).toDays() + 1).toMillis();
Expand Down Expand Up @@ -130,7 +132,7 @@ public void testTemporalCompactionSpanningDays()
public void testTemporalCompactionDate()
throws Exception
{
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE), DATE);
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE), MAX_SHARD_ROWS, DATE);
long tableId = 1L;
long day1 = Duration.ofMillis(System.currentTimeMillis()).toDays();
long day2 = day1 + 1;
Expand Down

0 comments on commit cf70280

Please sign in to comment.