Skip to content

Commit

Permalink
Collect statistics for intermediate data writes separately
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed May 10, 2019
1 parent f7ce54f commit 3b3524f
Show file tree
Hide file tree
Showing 18 changed files with 97 additions and 25 deletions.
Expand Up @@ -161,6 +161,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur
0,
0,
0,
0,
ImmutableList.of(),
0,
true,
Expand Down Expand Up @@ -233,6 +234,7 @@ private QueryStatistics createQueryStatistics(QueryInfo queryInfo)
queryStats.getOutputPositions(),
queryStats.getWrittenOutputLogicalDataSize().toBytes(),
queryStats.getWrittenOutputPositions(),
queryStats.getWrittenIntermediatePhysicalDataSize().toBytes(),
queryStats.getCumulativeUserMemory(),
queryStats.getStageGcStatistics(),
queryStats.getCompletedDrivers(),
Expand Down
Expand Up @@ -439,6 +439,8 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
long writtenOutputLogicalDataSize = 0;
long writtenOutputPhysicalDataSize = 0;

long writtenIntermediatePhysicalDataSize = 0;

ImmutableList.Builder<StageGcStatistics> stageGcStatistics = ImmutableList.builder();

boolean fullyBlocked = rootStage.isPresent();
Expand Down Expand Up @@ -469,24 +471,31 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
blockedReasons.addAll(stageStats.getBlockedReasons());
}

Optional<PlanFragment> plan = stageInfo.getPlan();
if (plan.isPresent() && plan.get().getPartitionedSourceNodes().stream().anyMatch(TableScanNode.class::isInstance)) {
rawInputDataSize += stageStats.getRawInputDataSize().toBytes();
rawInputPositions += stageStats.getRawInputPositions();
if (stageInfo.getPlan().isPresent()) {
PlanFragment plan = stageInfo.getPlan().get();
if (plan.getPartitionedSourceNodes().stream().anyMatch(TableScanNode.class::isInstance)) {
rawInputDataSize += stageStats.getRawInputDataSize().toBytes();
rawInputPositions += stageStats.getRawInputPositions();

processedInputDataSize += stageStats.getProcessedInputDataSize().toBytes();
processedInputPositions += stageStats.getProcessedInputPositions();
}
processedInputDataSize += stageStats.getProcessedInputDataSize().toBytes();
processedInputPositions += stageStats.getProcessedInputPositions();
}

writtenOutputPositions += stageInfo.getStageStats().getOperatorSummaries().stream()
.filter(stats -> stats.getOperatorType().equals(TableWriterOperator.class.getSimpleName()))
.mapToLong(OperatorStats::getInputPositions)
.sum();
writtenOutputLogicalDataSize += stageInfo.getStageStats().getOperatorSummaries().stream()
.filter(stats -> stats.getOperatorType().equals(TableWriterOperator.class.getSimpleName()))
.mapToLong(stats -> stats.getInputDataSize().toBytes())
.sum();
writtenOutputPhysicalDataSize += stageStats.getPhysicalWrittenDataSize().toBytes();
// Physical bytes written by a fragment that is the source of a materialized
// exchange are intermediate data, not query output, so they are accumulated
// in their own counter instead of being mixed into the output statistics.
if (plan.isMaterializedExchangeSource()) {
    writtenIntermediatePhysicalDataSize += stageStats.getPhysicalWrittenDataSize().toBytes();
}
else {
    // Logical output figures are taken from the TableWriterOperator summaries of this stage.
    writtenOutputPositions += stageInfo.getStageStats().getOperatorSummaries().stream()
            .filter(stats -> stats.getOperatorType().equals(TableWriterOperator.class.getSimpleName()))
            .mapToLong(OperatorStats::getInputPositions)
            .sum();
    writtenOutputLogicalDataSize += stageInfo.getStageStats().getOperatorSummaries().stream()
            .filter(stats -> stats.getOperatorType().equals(TableWriterOperator.class.getSimpleName()))
            .mapToLong(stats -> stats.getInputDataSize().toBytes())
            .sum();
    writtenOutputPhysicalDataSize += stageStats.getPhysicalWrittenDataSize().toBytes();
}
}

stageGcStatistics.add(stageStats.getGcInfo());

Expand Down Expand Up @@ -553,6 +562,8 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
succinctBytes(writtenOutputLogicalDataSize),
succinctBytes(writtenOutputPhysicalDataSize),

succinctBytes(writtenIntermediatePhysicalDataSize),

stageGcStatistics.build(),

operatorStatsSummary.build());
Expand Down Expand Up @@ -1019,6 +1030,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getWrittenOutputPositions(),
queryStats.getWrittenOutputLogicalDataSize(),
queryStats.getWrittenOutputPhysicalDataSize(),
queryStats.getWrittenIntermediatePhysicalDataSize(),
queryStats.getStageGcStatistics(),
ImmutableList.of()); // Remove the operator summaries as OperatorInfo (especially ExchangeClientStatus) can hold onto a large amount of memory
}
Expand Down
Expand Up @@ -91,6 +91,8 @@ public class QueryStats
private final DataSize writtenOutputLogicalDataSize;
private final DataSize writtenOutputPhysicalDataSize;

private final DataSize writtenIntermediatePhysicalDataSize;

private final List<StageGcStatistics> stageGcStatistics;

private final List<OperatorStats> operatorSummaries;
Expand Down Expand Up @@ -148,6 +150,8 @@ public QueryStats(
@JsonProperty("writtenOutputLogicalDataSize") DataSize writtenOutputLogicalDataSize,
@JsonProperty("writtenOutputPhysicalDataSize") DataSize writtenOutputPhysicalDataSize,

@JsonProperty("writtenIntermediatePhysicalDataSize") DataSize writtenIntermediatePhysicalDataSize,

@JsonProperty("stageGcStatistics") List<StageGcStatistics> stageGcStatistics,

@JsonProperty("operatorSummaries") List<OperatorStats> operatorSummaries)
Expand Down Expand Up @@ -213,6 +217,7 @@ public QueryStats(
this.writtenOutputPositions = writtenOutputPositions;
this.writtenOutputLogicalDataSize = requireNonNull(writtenOutputLogicalDataSize, "writtenOutputLogicalDataSize is null");
this.writtenOutputPhysicalDataSize = requireNonNull(writtenOutputPhysicalDataSize, "writtenOutputPhysicalDataSize is null");
this.writtenIntermediatePhysicalDataSize = requireNonNull(writtenIntermediatePhysicalDataSize, "writtenIntermediatePhysicalDataSize is null");

this.stageGcStatistics = ImmutableList.copyOf(requireNonNull(stageGcStatistics, "stageGcStatistics is null"));

Expand Down Expand Up @@ -264,6 +269,7 @@ public static QueryStats immediateFailureQueryStats()
0,
new DataSize(0, BYTE),
new DataSize(0, BYTE),
new DataSize(0, BYTE),
ImmutableList.of(),
ImmutableList.of());
}
Expand Down Expand Up @@ -515,6 +521,12 @@ public DataSize getWrittenOutputPhysicalDataSize()
return writtenOutputPhysicalDataSize;
}

/**
 * Returns the physical size of intermediate data written by the query,
 * exactly as supplied to the constructor (which rejects {@code null}).
 * Exposed as a JSON property alongside the written-output statistics.
 */
@JsonProperty
public DataSize getWrittenIntermediatePhysicalDataSize()
{
return writtenIntermediatePhysicalDataSize;
}

@JsonProperty
public List<StageGcStatistics> getStageGcStatistics()
{
Expand Down
Expand Up @@ -52,6 +52,7 @@ public class PlanFragment
private final List<RemoteSourceNode> remoteSourceNodes;
private final PartitioningScheme partitioningScheme;
private final StageExecutionDescriptor stageExecutionDescriptor;
private final boolean materializedExchangeSource;
private final StatsAndCosts statsAndCosts;
private final Optional<String> jsonRepresentation;

Expand All @@ -64,6 +65,7 @@ public PlanFragment(
@JsonProperty("partitionedSources") List<PlanNodeId> partitionedSources,
@JsonProperty("partitioningScheme") PartitioningScheme partitioningScheme,
@JsonProperty("stageExecutionDescriptor") StageExecutionDescriptor stageExecutionDescriptor,
@JsonProperty("materializedExchangeSource") boolean materializedExchangeSource,
@JsonProperty("statsAndCosts") StatsAndCosts statsAndCosts,
@JsonProperty("jsonRepresentation") Optional<String> jsonRepresentation)
{
Expand All @@ -74,6 +76,7 @@ public PlanFragment(
this.partitionedSources = ImmutableList.copyOf(requireNonNull(partitionedSources, "partitionedSources is null"));
this.partitionedSourcesSet = ImmutableSet.copyOf(partitionedSources);
this.stageExecutionDescriptor = requireNonNull(stageExecutionDescriptor, "stageExecutionDescriptor is null");
this.materializedExchangeSource = materializedExchangeSource;
this.statsAndCosts = requireNonNull(statsAndCosts, "statsAndCosts is null");
this.jsonRepresentation = requireNonNull(jsonRepresentation, "jsonRepresentation is null");

Expand Down Expand Up @@ -141,6 +144,12 @@ public StageExecutionDescriptor getStageExecutionDescriptor()
return stageExecutionDescriptor;
}

/**
 * Returns the {@code materializedExchangeSource} flag this fragment was
 * constructed with. Exposed as a JSON property so the flag is serialized
 * together with the rest of the fragment.
 */
@JsonProperty
public boolean isMaterializedExchangeSource()
{
return materializedExchangeSource;
}

@JsonProperty
public StatsAndCosts getStatsAndCosts()
{
Expand Down Expand Up @@ -206,17 +215,17 @@ private static void findRemoteSourceNodes(PlanNode node, Builder<RemoteSourceNod

public PlanFragment withBucketToPartition(Optional<int[]> bucketToPartition)
{
return new PlanFragment(id, root, symbols, partitioning, partitionedSources, partitioningScheme.withBucketToPartition(bucketToPartition), stageExecutionDescriptor, statsAndCosts, jsonRepresentation);
return new PlanFragment(id, root, symbols, partitioning, partitionedSources, partitioningScheme.withBucketToPartition(bucketToPartition), stageExecutionDescriptor, materializedExchangeSource, statsAndCosts, jsonRepresentation);
}

public PlanFragment withFixedLifespanScheduleGroupedExecution(List<PlanNodeId> capableTableScanNodes)
{
return new PlanFragment(id, root, symbols, partitioning, partitionedSources, partitioningScheme, StageExecutionDescriptor.fixedLifespanScheduleGroupedExecution(capableTableScanNodes), statsAndCosts, jsonRepresentation);
return new PlanFragment(id, root, symbols, partitioning, partitionedSources, partitioningScheme, StageExecutionDescriptor.fixedLifespanScheduleGroupedExecution(capableTableScanNodes), materializedExchangeSource, statsAndCosts, jsonRepresentation);
}

public PlanFragment withDynamicLifespanScheduleGroupedExecution(List<PlanNodeId> capableTableScanNodes)
{
return new PlanFragment(id, root, symbols, partitioning, partitionedSources, partitioningScheme, StageExecutionDescriptor.dynamicLifespanScheduleGroupedExecution(capableTableScanNodes), statsAndCosts, jsonRepresentation);
return new PlanFragment(id, root, symbols, partitioning, partitionedSources, partitioningScheme, StageExecutionDescriptor.dynamicLifespanScheduleGroupedExecution(capableTableScanNodes), materializedExchangeSource, statsAndCosts, jsonRepresentation);
}

@Override
Expand Down
Expand Up @@ -149,7 +149,9 @@ public SubPlan createSubPlans(Session session, Plan plan, boolean forceSingleNod
idAllocator,
new SymbolAllocator(plan.getTypes().allTypes()));

FragmentProperties properties = new FragmentProperties(new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), plan.getRoot().getOutputSymbols()));
FragmentProperties properties = new FragmentProperties(
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), plan.getRoot().getOutputSymbols()),
false);
if (forceSingleNode || isForceSingleNodeOutput(session)) {
properties = properties.setSingleNodeDistribution();
}
Expand Down Expand Up @@ -239,6 +241,7 @@ private SubPlan reassignPartitioningHandleIfNecessaryHelper(Session session, Sub
outputPartitioningScheme.isReplicateNullsAndAny(),
outputPartitioningScheme.getBucketToPartition()),
fragment.getStageExecutionDescriptor(),
fragment.isMaterializedExchangeSource(),
fragment.getStatsAndCosts(),
fragment.getJsonRepresentation());

Expand Down Expand Up @@ -315,6 +318,7 @@ private SubPlan buildFragment(PlanNode root, FragmentProperties properties, Plan
schedulingOrder,
properties.getPartitioningScheme(),
StageExecutionDescriptor.ungroupedExecution(),
properties.isMaterializedExchangeSource(),
statsAndCosts.getForSubplan(root),
Optional.of(jsonFragmentPlan(root, fragmentSymbolTypes, metadata.getFunctionManager(), session)));

Expand Down Expand Up @@ -416,7 +420,9 @@ else if (exchange.getType() == ExchangeNode.Type.REPARTITION) {

ImmutableList.Builder<SubPlan> builder = ImmutableList.builder();
for (int sourceIndex = 0; sourceIndex < exchange.getSources().size(); sourceIndex++) {
FragmentProperties childProperties = new FragmentProperties(partitioningScheme.translateOutputLayout(exchange.getInputs().get(sourceIndex)));
FragmentProperties childProperties = new FragmentProperties(
partitioningScheme.translateOutputLayout(exchange.getInputs().get(sourceIndex)),
context.get().isMaterializedExchangeSource());
builder.add(buildSubPlan(exchange.getSources().get(sourceIndex), childProperties, context));
}

Expand Down Expand Up @@ -476,9 +482,9 @@ private PlanNode createRemoteMaterializedExchange(ExchangeNode exchange, Rewrite
partitioningSymbolAssignments.getConstants(),
partitioningMetadata);

FragmentProperties writeProperties = new FragmentProperties(new PartitioningScheme(
Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()),
write.getOutputSymbols()));
FragmentProperties writeProperties = new FragmentProperties(
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), write.getOutputSymbols()),
true);
writeProperties.setCoordinatorOnlyDistribution();

List<SubPlan> children = ImmutableList.of(buildSubPlan(write, writeProperties, context));
Expand Down Expand Up @@ -675,13 +681,15 @@ private static class FragmentProperties
private final List<SubPlan> children = new ArrayList<>();

private final PartitioningScheme partitioningScheme;
private final boolean materializedExchangeSource;

private Optional<PartitioningHandle> partitioningHandle = Optional.empty();
private final Set<PlanNodeId> partitionedSources = new HashSet<>();

public FragmentProperties(PartitioningScheme partitioningScheme)
public FragmentProperties(PartitioningScheme partitioningScheme, boolean materializedExchangeSource)
{
this.partitioningScheme = partitioningScheme;
this.materializedExchangeSource = materializedExchangeSource;
}

public List<SubPlan> getChildren()
Expand Down Expand Up @@ -800,6 +808,11 @@ public PartitioningScheme getPartitioningScheme()
return partitioningScheme;
}

/**
 * Returns the {@code materializedExchangeSource} flag passed to the
 * constructor; the flag is final and never changes afterwards.
 */
public boolean isMaterializedExchangeSource()
{
return materializedExchangeSource;
}

public PartitioningHandle getPartitioningHandle()
{
return partitioningHandle.get();
Expand Down
Expand Up @@ -324,6 +324,7 @@ public static String graphvizLogicalPlan(PlanNode plan, TypeProvider types, Sess
ImmutableList.of(plan.getId()),
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), plan.getOutputSymbols()),
StageExecutionDescriptor.ungroupedExecution(),
false,
StatsAndCosts.empty(),
Optional.empty());
return GraphvizPrinter.printLogical(ImmutableList.of(fragment), session);
Expand Down
Expand Up @@ -155,6 +155,7 @@ public QueryInfo getQueryInfo()
30,
new DataSize(31, BYTE),
new DataSize(32, BYTE),
new DataSize(33, BYTE),
ImmutableList.of(),
ImmutableList.of()),
Optional.empty(),
Expand Down
Expand Up @@ -120,6 +120,7 @@ public MockRemoteTask createTableScanTask(TaskId taskId, InternalNode newNode, L
ImmutableList.of(sourceId),
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol)),
StageExecutionDescriptor.ungroupedExecution(),
false,
StatsAndCosts.empty(),
Optional.empty());

Expand Down
Expand Up @@ -102,6 +102,7 @@ private TaskTestUtils()
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(SYMBOL))
.withBucketToPartition(Optional.of(new int[1])),
StageExecutionDescriptor.ungroupedExecution(),
false,
StatsAndCosts.empty(),
Optional.empty());

Expand Down
Expand Up @@ -190,6 +190,8 @@ public class TestQueryStats
new DataSize(31, BYTE),
new DataSize(32, BYTE),

new DataSize(33, BYTE),

ImmutableList.of(new StageGcStatistics(
101,
102,
Expand Down Expand Up @@ -260,9 +262,12 @@ public static void assertExpectedQueryStats(QueryStats actual)
assertEquals(actual.getOutputPositions(), 29);

assertEquals(actual.getWrittenOutputPositions(), 30);

assertEquals(actual.getWrittenOutputLogicalDataSize(), new DataSize(31, BYTE));
assertEquals(actual.getWrittenOutputPhysicalDataSize(), new DataSize(32, BYTE));

assertEquals(actual.getWrittenIntermediatePhysicalDataSize(), new DataSize(33, BYTE));

assertEquals(actual.getStageGcStatistics().size(), 1);
StageGcStatistics gcStatistics = actual.getStageGcStatistics().get(0);
assertEquals(gcStatistics.getStageId(), 101);
Expand Down
Expand Up @@ -177,6 +177,7 @@ private static PlanFragment createExchangePlanFragment()
ImmutableList.of(planNode.getId()),
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), planNode.getOutputSymbols()),
StageExecutionDescriptor.ungroupedExecution(),
false,
StatsAndCosts.empty(),
Optional.empty());
}
Expand Down
Expand Up @@ -333,6 +333,7 @@ private static PlanFragment createValuesPlan()
ImmutableList.of(valuesNodeId),
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol)),
StageExecutionDescriptor.ungroupedExecution(),
false,
StatsAndCosts.empty(),
Optional.empty());

Expand Down
Expand Up @@ -264,6 +264,7 @@ private static PlanFragment createFragment(PlanNode planNode)
ImmutableList.of(planNode.getId()),
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), planNode.getOutputSymbols()),
StageExecutionDescriptor.ungroupedExecution(),
false,
StatsAndCosts.empty(),
Optional.empty());
}
Expand Down
Expand Up @@ -481,6 +481,7 @@ private static SubPlan createPlan()
ImmutableList.of(TABLE_SCAN_NODE_ID),
new PartitioningScheme(Partitioning.create(SINGLE_DISTRIBUTION, ImmutableList.of()), ImmutableList.of(symbol)),
StageExecutionDescriptor.ungroupedExecution(),
false,
StatsAndCosts.empty(),
Optional.empty());

Expand Down

0 comments on commit 3b3524f

Please sign in to comment.