Skip to content

Commit

Permalink
Add stats to compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Sep 2, 2015
1 parent b26724a commit ee56589
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 14 deletions.
Expand Up @@ -31,9 +31,12 @@
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.log.Logger;
import io.airlift.stats.CounterStat;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import org.skife.jdbi.v2.IDBI;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
Expand Down Expand Up @@ -67,6 +70,7 @@
import static java.lang.String.format;
import static java.util.Collections.nCopies;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand All @@ -88,7 +92,7 @@ public class ShardCompactionManager
private final AtomicBoolean shutdown = new AtomicBoolean();

// Tracks shards that are scheduled for compaction so that we do not schedule them more than once
private final Set<Long> shardsBeingCompacted = newConcurrentHashSet();
private final Set<Long> shardsInProgress = newConcurrentHashSet();
private final BlockingQueue<CompactionSet> compactionQueue = new LinkedBlockingQueue<>();

private final MetadataDao metadataDao;
Expand All @@ -101,6 +105,9 @@ public class ShardCompactionManager
private final long maxShardRows;
private final IDBI dbi;

private final CounterStat compactionSuccessCount = new CounterStat();
private final CounterStat compactionFailureCount = new CounterStat();

@Inject
public ShardCompactionManager(@ForMetadata IDBI dbi, NodeManager nodeManager, ShardManager shardManager, ShardCompactor compactor, StorageManagerConfig config)
{
Expand Down Expand Up @@ -138,8 +145,14 @@ public ShardCompactionManager(
checkArgument(maxShardRows > 0, "maxShardRows must be > 0");
this.maxShardRows = maxShardRows;

checkArgument(compactionThreads > 0, "compactionThreads must be > 0");
this.compactionService = newFixedThreadPool(compactionThreads, daemonThreadsNamed("shard-compactor-%s"));
this.compactionEnabled = compactionEnabled;
if (compactionEnabled) {
checkArgument(compactionThreads > 0, "compactionThreads must be > 0");
this.compactionService = newFixedThreadPool(compactionThreads, daemonThreadsNamed("shard-compactor-%s"));
}
else {
this.compactionService = null;
}
}

@PostConstruct
Expand Down Expand Up @@ -187,7 +200,7 @@ private void discoverShards()
Set<ShardMetadata> shardMetadata = shardManager.getNodeTableShards(currentNodeIdentifier, tableId);
Set<ShardMetadata> shards = shardMetadata.stream()
.filter(this::needsCompaction)
.filter(shard -> !shardsBeingCompacted.contains(shard.getShardId()))
.filter(shard -> !shardsInProgress.contains(shard.getShardId()))
.collect(toSet());
if (shards.size() <= 1) {
continue;
Expand Down Expand Up @@ -273,7 +286,7 @@ private void addToCompactionQueue(CompactionSetCreator compactionSetCreator, lon

compactionSet.getShardsToCompact().stream()
.map(ShardMetadata::getShardId)
.forEach(shardsBeingCompacted::add);
.forEach(shardsInProgress::add);

compactionQueue.add(compactionSet);
}
Expand All @@ -300,7 +313,16 @@ public void run()
while (!Thread.currentThread().isInterrupted() && !shutdown.get()) {
try {
CompactionSet compactionSet = compactionQueue.take();
compactionService.submit(new CompactionJob(compactionSet));
runAsync(new CompactionJob(compactionSet), compactionService)
.whenComplete((none, throwable) -> {
if (throwable == null) {
compactionSuccessCount.update(1);
}
else {
log.warn(throwable, "Error in compaction");
compactionFailureCount.update(1);
}
});
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -329,13 +351,13 @@ public void run()
TableMetadata tableMetadata = getTableMetadata(compactionSet.getTableId());
List<ShardInfo> newShards = performCompaction(shardUuids, tableMetadata);
shardManager.replaceShardIds(tableMetadata.getTableId(), tableMetadata.getColumns(), shardIds, newShards);
shardsBeingCompacted.removeAll(shardIds);
shardsInProgress.removeAll(shardIds);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
finally {
shardsBeingCompacted.removeAll(shardIds);
shardsInProgress.removeAll(shardIds);
}
}

Expand Down Expand Up @@ -363,4 +385,24 @@ private TableMetadata getTableMetadata(long tableId)
return new TableMetadata(tableId, columns, sortColumnIds);

}

@Managed
public int getShardsInProgress()
{
return shardsInProgress.size();
}

@Managed
@Nested
public CounterStat getCompactionSuccessCount()
{
return compactionSuccessCount;
}

@Managed
@Nested
public CounterStat getCompactionFailureCount()
{
return compactionFailureCount;
}
}
Expand Up @@ -21,9 +21,12 @@
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.SortOrder;
import com.facebook.presto.spi.type.Type;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import io.airlift.stats.DistributionStat;
import io.airlift.units.Duration;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -44,6 +47,11 @@ public final class ShardCompactor
{
private final StorageManager storageManager;

private final DistributionStat inputShardsPerCompaction = new DistributionStat();
private final DistributionStat outputShardsPerCompaction = new DistributionStat();
private final DistributionStat compactionLatencyMillis = new DistributionStat();
private final DistributionStat sortedCompactionLatencyMillis = new DistributionStat();

@Inject
public ShardCompactor(StorageManager storageManager)
{
Expand All @@ -53,6 +61,7 @@ public ShardCompactor(StorageManager storageManager)
public List<ShardInfo> compact(Set<UUID> uuids, List<ColumnInfo> columns)
throws IOException
{
long start = System.nanoTime();
List<Long> columnIds = columns.stream().map(ColumnInfo::getColumnId).collect(toList());
List<Type> columnTypes = columns.stream().map(ColumnInfo::getType).collect(toList());

Expand All @@ -71,12 +80,21 @@ public List<ShardInfo> compact(Set<UUID> uuids, List<ColumnInfo> columns)
}
}
}
return storagePageSink.commit();
List<ShardInfo> shardInfos = storagePageSink.commit();

inputShardsPerCompaction.add(uuids.size());
outputShardsPerCompaction.add(shardInfos.size());
compactionLatencyMillis.add(Duration.nanosSince(start).toMillis());

return shardInfos;
}

public List<ShardInfo> compactSorted(Set<UUID> uuids, List<ColumnInfo> columns, List<Long> sortColumnIds, List<SortOrder> sortOrders)
throws IOException
{
checkArgument(sortColumnIds.size() == sortOrders.size(), "sortColumnIds and sortOrders must be of the same size");

long start = System.nanoTime();
List<Long> columnIds = columns.stream().map(ColumnInfo::getColumnId).collect(toList());
List<Type> columnTypes = columns.stream().map(ColumnInfo::getType).collect(toList());

Expand Down Expand Up @@ -108,10 +126,13 @@ public List<ShardInfo> compactSorted(Set<UUID> uuids, List<ColumnInfo> columns,
rowSources.add(rowSource);
}
outputPageSink.flush();
return outputPageSink.commit();
}
catch (IOException exception) {
throw Throwables.propagate(exception);
List<ShardInfo> shardInfos = outputPageSink.commit();

inputShardsPerCompaction.add(uuids.size());
outputShardsPerCompaction.add(shardInfos.size());
sortedCompactionLatencyMillis.add(Duration.nanosSince(start).toMillis());

return shardInfos;
}
finally {
outputPageSink.flush();
Expand Down Expand Up @@ -236,4 +257,32 @@ private static boolean isNullOrEmptyPage(Page nextPage)
{
return nextPage == null || nextPage.getPositionCount() == 0;
}

@Managed
@Nested
public DistributionStat getInputShardsPerCompaction()
{
return inputShardsPerCompaction;
}

@Managed
@Nested
public DistributionStat getOutputShardsPerCompaction()
{
return outputShardsPerCompaction;
}

@Managed
@Nested
public DistributionStat getCompactionLatencyMillis()
{
return compactionLatencyMillis;
}

@Managed
@Nested
public DistributionStat getSortedCompactionLatencyMillis()
{
return sortedCompactionLatencyMillis;
}
}
Expand Up @@ -47,5 +47,7 @@ public void configure(Binder binder)

newExporter(binder).export(ShardRecoveryManager.class).as(generatedNameOf(ShardRecoveryManager.class, connectorId));
newExporter(binder).export(StorageManager.class).as(generatedNameOf(OrcStorageManager.class, connectorId));
newExporter(binder).export(ShardCompactionManager.class).as(generatedNameOf(ShardCompactionManager.class, connectorId));
newExporter(binder).export(ShardCompactor.class).as(generatedNameOf(ShardCompactor.class, connectorId));
}
}

0 comments on commit ee56589

Please sign in to comment.