Skip to content

Commit

Permalink
Fixes for compaction to use correct types
Browse files: browse the repository at this point in the history
  • Loading branch information
nileema committed Nov 12, 2015
1 parent: c9b64b0; commit: 02029ad
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 73 deletions.
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.airlift.stats.DistributionStat; import io.airlift.stats.DistributionStat;
import io.airlift.units.Duration; import io.airlift.units.Duration;
import org.weakref.jmx.Managed; import org.weakref.jmx.Managed;
Expand All @@ -40,11 +41,15 @@


import static com.facebook.presto.raptor.storage.Row.extractRow; import static com.facebook.presto.raptor.storage.Row.extractRow;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;


public final class ShardCompactor public final class ShardCompactor
{ {
private static final Logger log = Logger.get(ShardCompactor.class);

private final StorageManager storageManager; private final StorageManager storageManager;


private final DistributionStat inputShardsPerCompaction = new DistributionStat(); private final DistributionStat inputShardsPerCompaction = new DistributionStat();
Expand Down Expand Up @@ -111,11 +116,15 @@ public List<ShardInfo> compactSorted(long transactionId, Set<UUID> uuids, List<C
checkArgument(sortColumnIds.size() == sortOrders.size(), "sortColumnIds and sortOrders must be of the same size"); checkArgument(sortColumnIds.size() == sortOrders.size(), "sortColumnIds and sortOrders must be of the same size");


long start = System.nanoTime(); long start = System.nanoTime();

List<Long> columnIds = columns.stream().map(ColumnInfo::getColumnId).collect(toList()); List<Long> columnIds = columns.stream().map(ColumnInfo::getColumnId).collect(toList());
List<Type> columnTypes = columns.stream().map(ColumnInfo::getType).collect(toList()); List<Type> columnTypes = columns.stream().map(ColumnInfo::getType).collect(toList());


checkArgument(columnIds.containsAll(sortColumnIds), "sortColumnIds must be a subset of columnIds"); checkArgument(columnIds.containsAll(sortColumnIds), "sortColumnIds must be a subset of columnIds");
List<Integer> sortIndexes = ImmutableList.copyOf(sortColumnIds.stream().map(sortColumnIds::indexOf).collect(toList()));
List<Integer> sortIndexes = sortColumnIds.stream()
.map(columnIds::indexOf)
.collect(toList());


Queue<SortedRowSource> rowSources = new PriorityQueue<>(); Queue<SortedRowSource> rowSources = new PriorityQueue<>();
StoragePageSink outputPageSink = storageManager.createStoragePageSink(transactionId, columnIds, columnTypes); StoragePageSink outputPageSink = storageManager.createStoragePageSink(transactionId, columnIds, columnTypes);
Expand Down Expand Up @@ -148,6 +157,8 @@ public List<ShardInfo> compactSorted(long transactionId, Set<UUID> uuids, List<C
outputShardsPerCompaction.add(shardInfos.size()); outputShardsPerCompaction.add(shardInfos.size());
sortedCompactionLatencyMillis.add(Duration.nanosSince(start).toMillis()); sortedCompactionLatencyMillis.add(Duration.nanosSince(start).toMillis());


log.info(format("Compacted shards %s into %s", uuids, shardInfos.stream().map(ShardInfo::getShardUuid).collect(toSet())));

return shardInfos; return shardInfos;
} }
catch (IOException | RuntimeException e) { catch (IOException | RuntimeException e) {
Expand Down Expand Up @@ -234,15 +245,16 @@ public int compareTo(SortedRowSource other)
} }


for (int i = 0; i < sortIndexes.size(); i++) { for (int i = 0; i < sortIndexes.size(); i++) {
int index = sortIndexes.get(i); int channel = sortIndexes.get(i);
Type type = columnTypes.get(channel);


Block leftBlock = currentPage.getBlock(index); Block leftBlock = currentPage.getBlock(channel);
int leftBlockPosition = currentPosition; int leftBlockPosition = currentPosition;


Block rightBlock = other.currentPage.getBlock(index); Block rightBlock = other.currentPage.getBlock(channel);
int rightBlockPosition = other.currentPosition; int rightBlockPosition = other.currentPosition;


int compare = sortOrders.get(i).compareBlockValue(columnTypes.get(i), leftBlock, leftBlockPosition, rightBlock, rightBlockPosition); int compare = sortOrders.get(i).compareBlockValue(type, leftBlock, leftBlockPosition, rightBlock, rightBlockPosition);
if (compare != 0) { if (compare != 0) {
return compare; return compare;
} }
Expand Down
Expand Up @@ -561,6 +561,12 @@ private OrcStorageManager createOrcStorageManager(int maxShardRows, DataSize max


public static OrcStorageManager createOrcStorageManager(IDBI dbi, File temporary) public static OrcStorageManager createOrcStorageManager(IDBI dbi, File temporary)
throws IOException throws IOException
{
return createOrcStorageManager(dbi, temporary, MAX_SHARD_ROWS);
}

public static OrcStorageManager createOrcStorageManager(IDBI dbi, File temporary, int maxShardRows)
throws IOException
{ {
File directory = new File(temporary, "data"); File directory = new File(temporary, "data");
StorageService storageService = new FileStorageService(directory); StorageService storageService = new FileStorageService(directory);
Expand All @@ -584,7 +590,7 @@ public static OrcStorageManager createOrcStorageManager(IDBI dbi, File temporary
backupStore, backupStore,
recoveryManager, recoveryManager,
new InMemoryShardRecorder(), new InMemoryShardRecorder(),
MAX_SHARD_ROWS, maxShardRows,
MAX_FILE_SIZE); MAX_FILE_SIZE);
} }


Expand Down

0 comments on commit 02029ad

Please sign in to comment.