Skip to content

Commit

Permalink
Temporal compaction for temporal column of type date
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Aug 19, 2015
1 parent e89ffc1 commit 4accbc7
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 8 deletions.
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.raptor.metadata.TableMetadata;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.type.Type;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
Expand Down Expand Up @@ -198,7 +199,8 @@ private void discoverShards()
compactionSetCreator = new FileCompactionSetCreator(maxShardSize);
}
else {
compactionSetCreator = new TemporalCompactionSetCreator(maxShardSize);
Type type = metadataDao.getTableColumn(tableId, temporalColumnId).getDataType();
compactionSetCreator = new TemporalCompactionSetCreator(maxShardSize, type);
shards = filterShardsWithTemporalMetadata(shardMetadata, tableId, temporalColumnId);
}
addToCompactionQueue(compactionSetCreator, tableId, shards);
Expand Down
Expand Up @@ -14,6 +14,9 @@
package com.facebook.presto.raptor.storage;

import com.facebook.presto.raptor.metadata.ShardMetadata;
import com.facebook.presto.spi.type.DateType;
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
Expand All @@ -26,18 +29,23 @@
import java.util.List;
import java.util.Set;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

public class TemporalCompactionSetCreator
implements CompactionSetCreator
{
private final long maxShardSizeBytes;
private final Type type;

public TemporalCompactionSetCreator(DataSize maxShardSize)
public TemporalCompactionSetCreator(DataSize maxShardSize, Type type)
{
requireNonNull(maxShardSize, "maxShardSize is null");
checkArgument(type == DateType.DATE || type == TimestampType.TIMESTAMP, "type must be timestamp or date");

this.maxShardSizeBytes = maxShardSize.toBytes();
this.type = requireNonNull(type, "type is null");
}

@Override
Expand All @@ -49,7 +57,7 @@ public Set<CompactionSet> createCompactionSets(long tableId, Set<ShardMetadata>

ImmutableSet.Builder<CompactionSet> compactionSets = ImmutableSet.builder();
// don't compact shards across days
Multimap<Long, ShardMetadata> shardsByDays = getShardsByDays(shardMetadata);
Multimap<Long, ShardMetadata> shardsByDays = getShardsByDays(shardMetadata, type);

for (Collection<ShardMetadata> shardSet : shardsByDays.asMap().values()) {
List<ShardMetadata> shards = shardSet.stream()
Expand Down Expand Up @@ -78,7 +86,7 @@ public Set<CompactionSet> createCompactionSets(long tableId, Set<ShardMetadata>
return compactionSets.build();
}

private static Multimap<Long, ShardMetadata> getShardsByDays(Set<ShardMetadata> shardMetadata)
private static Multimap<Long, ShardMetadata> getShardsByDays(Set<ShardMetadata> shardMetadata, Type type)
{
// bucket shards by the start day
ImmutableMultimap.Builder<Long, ShardMetadata> shardsByDays = ImmutableMultimap.builder();
Expand All @@ -87,14 +95,18 @@ private static Multimap<Long, ShardMetadata> getShardsByDays(Set<ShardMetadata>
shardMetadata.stream()
.filter(shard -> shard.getRangeStart().isPresent() && shard.getRangeEnd().isPresent())
.forEach(shard -> {
long day = determineDay(shard.getRangeStart().getAsLong(), shard.getRangeEnd().getAsLong());
long day = determineDay(shard.getRangeStart().getAsLong(), shard.getRangeEnd().getAsLong(), type);
shardsByDays.put(day, shard);
});
return shardsByDays.build();
}

private static long determineDay(long rangeStart, long rangeEnd)
private static long determineDay(long rangeStart, long rangeEnd, Type type)
{
if (type == DateType.DATE) {
return rangeStart;
}

long startDay = Duration.ofMillis(rangeStart).toDays();
long endDay = Duration.ofMillis(rangeEnd).toDays();
if (startDay == endDay) {
Expand Down
Expand Up @@ -26,6 +26,8 @@
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
Expand Down Expand Up @@ -76,7 +78,7 @@ public void testNonTemporalCompactionSet()
public void testTemporalCompactionNoCompactionAcrossDays()
throws Exception
{
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE));
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE), TIMESTAMP);
long tableId = 1L;
long day1 = Duration.ofDays(Duration.ofMillis(System.currentTimeMillis()).toDays()).toMillis();
long day2 = Duration.ofDays(Duration.ofMillis(day1).toDays() + 1).toMillis();
Expand All @@ -99,7 +101,7 @@ public void testTemporalCompactionNoCompactionAcrossDays()
public void testTemporalCompactionSpanningDays()
throws Exception
{
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE));
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE), TIMESTAMP);
long tableId = 1L;
long day1 = Duration.ofDays(Duration.ofMillis(System.currentTimeMillis()).toDays()).toMillis();
long day2 = Duration.ofDays(Duration.ofMillis(day1).toDays() + 1).toMillis();
Expand All @@ -124,6 +126,33 @@ public void testTemporalCompactionSpanningDays()
assertEquals(compactionSets, expected);
}

@Test
public void testTemporalCompactionDate()
throws Exception
{
CompactionSetCreator compactionSetCreator = new TemporalCompactionSetCreator(new DataSize(100, BYTE), DATE);
long tableId = 1L;
long day1 = Duration.ofMillis(System.currentTimeMillis()).toDays();
long day2 = day1 + 1;
long day3 = day1 + 2;

List<ShardMetadata> inputShards = ImmutableList.of(
shardWithRange(10, day1, day1),
shardWithRange(10, day2, day2),
shardWithRange(10, day3, day3),
shardWithRange(10, day1, day3),
shardWithRange(10, day2, day3),
shardWithRange(10, day1, day2));

Set<CompactionSet> actual = compactionSetCreator.createCompactionSets(tableId, ImmutableSet.copyOf(inputShards));
assertEquals(actual.size(), 3);
Set<CompactionSet> expected = ImmutableSet.of(
new CompactionSet(tableId, ImmutableSet.of(inputShards.get(0), inputShards.get(3), inputShards.get(5))),
new CompactionSet(tableId, ImmutableSet.of(inputShards.get(1), inputShards.get(4))),
new CompactionSet(tableId, ImmutableSet.of(inputShards.get(2))));
assertEquals(actual, expected);
}

private static ShardMetadata shardWithSize(long uncompressedSize)
{
return new ShardMetadata(
Expand Down

0 comments on commit 4accbc7

Please sign in to comment.