Skip to content

Commit

Permalink
Add spilled data to query stats, CLI, and Web UI
Browse files Browse the repository at this point in the history
This commit adds instrumentation about the spilled data size in the
query summary and the web UI.

This implementation adds an operator stat that is rolled up
to the query level. I followed the approach in the comment:
prestodb/presto#8010 (comment)

Extracted-From: https://github.com/prestodb/presto
  • Loading branch information
Anoop Johnson authored and dain committed Feb 5, 2019
1 parent d6fb5e0 commit d7d6166
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 5 deletions.
15 changes: 13 additions & 2 deletions presto-cli/src/main/java/io/prestosql/cli/StatusPrinter.java
Expand Up @@ -79,6 +79,7 @@ public StatusPrinter(StatementClient client, PrintStream out, boolean debug)
Per Node: 2.5 parallelism, 473K rows/s, 41.1MB/s
Parallelism: 2.5
Peak Memory: 1.97GB
Spilled: 20GB
0:13 [6.45M rows, 560MB] [ 473K rows/s, 41.1MB/s] [=========>> ] 20%
STAGES ROWS ROWS/s BYTES BYTES/s PEND RUN DONE
Expand Down Expand Up @@ -203,8 +204,13 @@ public void printFinalInfo()
// Parallelism: 5.3
out.println(format("Parallelism: %.1f", parallelism));

//Peak Memory: 1.97GB
// Peak Memory: 1.97GB
reprintLine("Peak Memory: " + formatDataSize(bytes(stats.getPeakMemoryBytes()), true));

// Spilled Data: 20GB
if (stats.getSpilledBytes() > 0) {
reprintLine("Spilled: " + formatDataSize(bytes(stats.getSpilledBytes()), true));
}
}

// 0:32 [2.12GB, 15M rows] [67MB/s, 463K rows/s]
Expand Down Expand Up @@ -294,8 +300,13 @@ private void printQueryInfo(QueryStatusInfo results, WarningsPrinter warningsPri
// Parallelism: 5.3
reprintLine(format("Parallelism: %.1f", parallelism));

//Peak Memory: 1.97GB
// Peak Memory: 1.97GB
reprintLine("Peak Memory: " + formatDataSize(bytes(stats.getPeakMemoryBytes()), true));

// Spilled Data: 20GB
if (stats.getSpilledBytes() > 0) {
reprintLine("Spilled: " + formatDataSize(bytes(stats.getSpilledBytes()), true));
}
}

verify(terminalWidth >= 75); // otherwise handled above
Expand Down
Expand Up @@ -43,6 +43,7 @@ public class StatementStats
private final long processedRows;
private final long processedBytes;
private final long peakMemoryBytes;
private final long spilledBytes;
private final StageStats rootStage;

@JsonCreator
Expand All @@ -62,6 +63,7 @@ public StatementStats(
@JsonProperty("processedRows") long processedRows,
@JsonProperty("processedBytes") long processedBytes,
@JsonProperty("peakMemoryBytes") long peakMemoryBytes,
@JsonProperty("spilledBytes") long spilledBytes,
@JsonProperty("rootStage") StageStats rootStage)
{
this.state = requireNonNull(state, "state is null");
Expand All @@ -79,6 +81,7 @@ public StatementStats(
this.processedRows = processedRows;
this.processedBytes = processedBytes;
this.peakMemoryBytes = peakMemoryBytes;
this.spilledBytes = spilledBytes;
this.rootStage = rootStage;
}

Expand Down Expand Up @@ -188,6 +191,12 @@ public OptionalDouble getProgressPercentage()
return OptionalDouble.of(min(100, (completedSplits * 100.0) / totalSplits));
}

@JsonProperty
public long getSpilledBytes()
{
return spilledBytes;
}

@Override
public String toString()
{
Expand All @@ -207,6 +216,7 @@ public String toString()
.add("processedRows", processedRows)
.add("processedBytes", processedBytes)
.add("peakMemoryBytes", peakMemoryBytes)
.add("spilledBytes", spilledBytes)
.add("rootStage", rootStage)
.toString();
}
Expand All @@ -233,6 +243,7 @@ public static class Builder
private long processedRows;
private long processedBytes;
private long peakMemoryBytes;
private long spilledBytes;
private StageStats rootStage;

private Builder() {}
Expand Down Expand Up @@ -327,6 +338,12 @@ public Builder setPeakMemoryBytes(long peakMemoryBytes)
return this;
}

public Builder setSpilledBytes(long spilledBytes)
{
this.spilledBytes = spilledBytes;
return this;
}

public Builder setRootStage(StageStats rootStage)
{
this.rootStage = rootStage;
Expand All @@ -351,6 +368,7 @@ public StatementStats build()
processedRows,
processedBytes,
peakMemoryBytes,
spilledBytes,
rootStage);
}
}
Expand Down
Expand Up @@ -90,7 +90,7 @@ private String newQueryResults(Integer partialCancelId, Integer nextUriId, List<
nextUriId == null ? null : server.url(format("/v1/statement/%s/%s", queryId, nextUriId)).uri(),
responseColumns,
data,
new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null),
new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null),
null,
ImmutableList.of(),
null,
Expand Down
Expand Up @@ -592,4 +592,12 @@ public OptionalDouble getProgressPercentage()
}
return OptionalDouble.of(min(100, (completedDrivers * 100.0) / totalDrivers));
}

@JsonProperty
public DataSize getSpilledDataSize()
{
return succinctBytes(operatorSummaries.stream()
.mapToLong(stats -> stats.getSpilledDataSize().toBytes())
.sum());
}
}
Expand Up @@ -81,7 +81,7 @@ public class OperatorContext

private final OperationTiming finishTiming = new OperationTiming();

private final SpillContext spillContext;
private final OperatorSpillContext spillContext;
private final AtomicReference<Supplier<OperatorInfo>> infoSupplier = new AtomicReference<>();

private final AtomicLong peakUserMemoryReservation = new AtomicLong();
Expand Down Expand Up @@ -480,6 +480,8 @@ public OperatorStats getOperatorStats()
succinctBytes(peakSystemMemoryReservation.get()),
succinctBytes(peakTotalMemoryReservation.get()),

succinctBytes(spillContext.getSpilledBytes()),

memoryFuture.get().isDone() ? Optional.empty() : Optional.of(WAITING_FOR_MEMORY),
info);
}
Expand Down Expand Up @@ -523,6 +525,7 @@ private static class OperatorSpillContext
{
private final DriverContext driverContext;
private final AtomicLong reservedBytes = new AtomicLong();
private final AtomicLong spilledBytes = new AtomicLong();

public OperatorSpillContext(DriverContext driverContext)
{
Expand All @@ -535,13 +538,19 @@ public void updateBytes(long bytes)
if (bytes >= 0) {
reservedBytes.addAndGet(bytes);
driverContext.reserveSpill(bytes);
spilledBytes.addAndGet(bytes);
}
else {
reservedBytes.accumulateAndGet(-bytes, this::decrementSpilledReservation);
driverContext.freeSpill(-bytes);
}
}

public long getSpilledBytes()
{
return spilledBytes.longValue();
}

private long decrementSpilledReservation(long reservedBytes, long bytesBeingFreed)
{
checkArgument(bytesBeingFreed >= 0);
Expand Down
19 changes: 19 additions & 0 deletions presto-main/src/main/java/io/prestosql/operator/OperatorStats.java
Expand Up @@ -74,6 +74,8 @@ public class OperatorStats
private final DataSize peakSystemMemoryReservation;
private final DataSize peakTotalMemoryReservation;

private final DataSize spilledDataSize;

private final Optional<BlockedReason> blockedReason;

private final OperatorInfo info;
Expand Down Expand Up @@ -119,6 +121,8 @@ public OperatorStats(
@JsonProperty("peakSystemMemoryReservation") DataSize peakSystemMemoryReservation,
@JsonProperty("peakTotalMemoryReservation") DataSize peakTotalMemoryReservation,

@JsonProperty("spilledDataSize") DataSize spilledDataSize,

@JsonProperty("blockedReason") Optional<BlockedReason> blockedReason,

@JsonProperty("info") OperatorInfo info)
Expand Down Expand Up @@ -167,6 +171,8 @@ public OperatorStats(
this.peakSystemMemoryReservation = requireNonNull(peakSystemMemoryReservation, "peakSystemMemoryReservation is null");
this.peakTotalMemoryReservation = requireNonNull(peakTotalMemoryReservation, "peakTotalMemoryReservation is null");

this.spilledDataSize = requireNonNull(spilledDataSize, "spilledDataSize is null");

this.blockedReason = blockedReason;

this.info = info;
Expand Down Expand Up @@ -358,6 +364,12 @@ public DataSize getPeakTotalMemoryReservation()
return peakTotalMemoryReservation;
}

@JsonProperty
public DataSize getSpilledDataSize()
{
return spilledDataSize;
}

@JsonProperty
public Optional<BlockedReason> getBlockedReason()
{
Expand Down Expand Up @@ -411,6 +423,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)
long peakSystemMemory = this.peakSystemMemoryReservation.toBytes();
long peakTotalMemory = this.peakTotalMemoryReservation.toBytes();

long spilledDataSize = this.spilledDataSize.toBytes();

Optional<BlockedReason> blockedReason = this.blockedReason;

Mergeable<OperatorInfo> base = getMergeableInfoOrNull(info);
Expand Down Expand Up @@ -451,6 +465,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)
peakSystemMemory = max(peakSystemMemory, operator.getPeakSystemMemoryReservation().toBytes());
peakTotalMemory = max(peakTotalMemory, operator.getPeakTotalMemoryReservation().toBytes());

spilledDataSize += operator.getSpilledDataSize().toBytes();

if (operator.getBlockedReason().isPresent()) {
blockedReason = operator.getBlockedReason();
}
Expand Down Expand Up @@ -501,6 +517,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)
succinctBytes(peakSystemMemory),
succinctBytes(peakTotalMemory),

succinctBytes(spilledDataSize),

blockedReason,

(OperatorInfo) base);
Expand Down Expand Up @@ -556,6 +574,7 @@ public OperatorStats summarize()
peakUserMemoryReservation,
peakSystemMemoryReservation,
peakTotalMemoryReservation,
spilledDataSize,
blockedReason,
(info != null && info.isFinal()) ? info : null);
}
Expand Down
Expand Up @@ -639,6 +639,7 @@ private static StatementStats toStatementStats(QueryInfo queryInfo)
.setProcessedRows(queryStats.getRawInputPositions())
.setProcessedBytes(queryStats.getRawInputDataSize().toBytes())
.setPeakMemoryBytes(queryStats.getPeakUserMemoryReservation().toBytes())
.setSpilledBytes(queryStats.getSpilledDataSize().toBytes())
.setRootStage(toStageStats(outputStage))
.build();
}
Expand Down
2 changes: 1 addition & 1 deletion presto-main/src/main/resources/webapp/dist/query.js

Large diffs are not rendered by default.

Expand Up @@ -1388,6 +1388,16 @@ export class QueryDetail extends React.Component {
{query.queryStats.physicalWrittenDataSize}
</td>
</tr>
{parseDataSize(query.queryStats.spilledDataSize) > 0 &&
<tr>
<td className="info-title">
Spilled Data
</td>
<td className="info-text">
{query.queryStats.spilledDataSize}
</td>
</tr>
}
</tbody>
</table>
</div>
Expand Down
Expand Up @@ -70,6 +70,7 @@ public class TestQueryStats
succinctBytes(127L),
succinctBytes(128L),
succinctBytes(129L),
succinctBytes(130L),
Optional.empty(),
null),
new OperatorStats(
Expand Down Expand Up @@ -104,6 +105,7 @@ public class TestQueryStats
succinctBytes(227L),
succinctBytes(228L),
succinctBytes(229L),
succinctBytes(230L),
Optional.empty(),
null),
new OperatorStats(
Expand Down Expand Up @@ -138,6 +140,7 @@ public class TestQueryStats
succinctBytes(327L),
succinctBytes(328L),
succinctBytes(329L),
succinctBytes(330L),
Optional.empty(),
null));

Expand Down Expand Up @@ -253,6 +256,7 @@ public static void assertExpectedQueryStats(QueryStats actual)
assertEquals(actual.getPeakTotalMemoryReservation(), new DataSize(21, BYTE));
assertEquals(actual.getPeakTaskUserMemory(), new DataSize(22, BYTE));
assertEquals(actual.getPeakTaskTotalMemory(), new DataSize(23, BYTE));
assertEquals(actual.getSpilledDataSize(), new DataSize(690, BYTE));

assertEquals(actual.getTotalScheduledTime(), new Duration(20, NANOSECONDS));
assertEquals(actual.getTotalCpuTime(), new Duration(21, NANOSECONDS));
Expand Down
Expand Up @@ -71,6 +71,7 @@ public class TestOperatorStats
new DataSize(22, BYTE),
new DataSize(23, BYTE),
new DataSize(24, BYTE),
new DataSize(25, BYTE),
Optional.empty(),
NON_MERGEABLE_INFO);

Expand Down Expand Up @@ -113,6 +114,7 @@ public class TestOperatorStats
new DataSize(22, BYTE),
new DataSize(23, BYTE),
new DataSize(24, BYTE),
new DataSize(25, BYTE),
Optional.empty(),
MERGEABLE_INFO);

Expand Down Expand Up @@ -164,6 +166,7 @@ public static void assertExpectedOperatorStats(OperatorStats actual)
assertEquals(actual.getPeakUserMemoryReservation(), new DataSize(22, BYTE));
assertEquals(actual.getPeakSystemMemoryReservation(), new DataSize(23, BYTE));
assertEquals(actual.getPeakTotalMemoryReservation(), new DataSize(24, BYTE));
assertEquals(actual.getSpilledDataSize(), new DataSize(25, BYTE));
assertEquals(actual.getInfo().getClass(), SplitOperatorInfo.class);
assertEquals(((SplitOperatorInfo) actual.getInfo()).getSplitInfo(), NON_MERGEABLE_INFO.getSplitInfo());
}
Expand Down Expand Up @@ -207,6 +210,7 @@ public void testAdd()
assertEquals(actual.getPeakUserMemoryReservation(), new DataSize(22, BYTE));
assertEquals(actual.getPeakSystemMemoryReservation(), new DataSize(23, BYTE));
assertEquals(actual.getPeakTotalMemoryReservation(), new DataSize(24, BYTE));
assertEquals(actual.getSpilledDataSize(), new DataSize(3 * 25, BYTE));
assertNull(actual.getInfo());
}

Expand Down Expand Up @@ -249,6 +253,7 @@ public void testAddMergeable()
assertEquals(actual.getPeakUserMemoryReservation(), new DataSize(22, BYTE));
assertEquals(actual.getPeakSystemMemoryReservation(), new DataSize(23, BYTE));
assertEquals(actual.getPeakTotalMemoryReservation(), new DataSize(24, BYTE));
assertEquals(actual.getSpilledDataSize(), new DataSize(3 * 25, BYTE));
assertEquals(actual.getInfo().getClass(), PartitionedOutputInfo.class);
assertEquals(((PartitionedOutputInfo) actual.getInfo()).getPagesAdded(), 3 * MERGEABLE_INFO.getPagesAdded());
}
Expand Down

0 comments on commit d7d6166

Please sign in to comment.