Skip to content

Commit

Permalink
Refactor some code in ShardCompactionManager
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Mar 21, 2016
1 parent cd379a9 commit 99abec6
Showing 1 changed file with 43 additions and 20 deletions.
Expand Up @@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
Expand All @@ -69,6 +70,7 @@
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.Iterables.partition;
import static com.google.common.collect.Maps.uniqueIndex;
Expand Down Expand Up @@ -227,33 +229,54 @@ private void discoverShards()
continue;
}

Set<ShardMetadata> shards = entry.getValue().stream()
.filter(this::needsCompaction)
.filter(shard -> !shardsInProgress.contains(shard.getShardUuid()))
.collect(toSet());

if (shards.size() <= 1) {
Optional<Long> temporalColumnId = Optional.ofNullable(metadataDao.getTemporalColumnId(tableId));
if (temporalColumnId.isPresent() && !isValidTemporalColumn(tableId, temporalColumnId.get())) {
continue;
}

Long temporalColumnId = metadataDao.getTemporalColumnId(tableId);
CompactionSetCreator compactionSetCreator;
if (temporalColumnId == null) {
compactionSetCreator = new FileCompactionSetCreator(maxShardSize, maxShardRows);
}
else {
Type type = metadataDao.getTableColumn(tableId, temporalColumnId).getDataType();
if (!type.equals(DATE) && !type.equals(TIMESTAMP)) {
log.warn("Temporal column type of table ID %s set incorrectly to %s", tableId, type);
continue;
}
compactionSetCreator = new TemporalCompactionSetCreator(maxShardSize, maxShardRows, type);
shards = filterShardsWithTemporalMetadata(shards, tableId, temporalColumnId);
}
CompactionSetCreator compactionSetCreator = getCompactionSetCreator(tableId, temporalColumnId);
Set<ShardMetadata> shards = getFilteredShards(entry.getValue(), tableId, temporalColumnId);

addToCompactionQueue(compactionSetCreator, tableId, shards);
}
}

private Set<ShardMetadata> getFilteredShards(List<ShardMetadata> shardMetadatas, long tableId, Optional<Long> temporalColumnId)
{
Set<ShardMetadata> shards = shardMetadatas.stream()
.filter(this::needsCompaction)
.filter(shard -> !shardsInProgress.contains(shard.getShardUuid()))
.collect(toSet());

if (temporalColumnId.isPresent()) {
shards = filterShardsWithTemporalMetadata(shards, tableId, temporalColumnId.get());
}

return shards;
}

private CompactionSetCreator getCompactionSetCreator(long tableId, Optional<Long> temporalColumnId)
{
if (!temporalColumnId.isPresent()) {
return new FileCompactionSetCreator(maxShardSize, maxShardRows);
}

checkState(isValidTemporalColumn(tableId, temporalColumnId.get()), "invalid temporal column type");
Type type = metadataDao.getTableColumn(tableId, temporalColumnId.get()).getDataType();

return new TemporalCompactionSetCreator(maxShardSize, maxShardRows, type);
}

private boolean isValidTemporalColumn(long tableId, long temporalColumnId)
{
Type type = metadataDao.getTableColumn(tableId, temporalColumnId).getDataType();
if (!type.equals(DATE) && !type.equals(TIMESTAMP)) {
log.warn("Temporal column type of table ID %s set incorrectly to %s", tableId, type);
return false;
}
return true;
}

/**
* @return shards that have temporal information
*/
Expand Down

0 comments on commit 99abec6

Please sign in to comment.