Skip to content

Commit

Permalink
Add dynamicFilterSplitsProcessed to OperatorStats
Browse files Browse the repository at this point in the history
  • Loading branch information
raunaqmorarka committed Mar 24, 2020
1 parent 4ac2fd4 commit 14bdcc1
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 0 deletions.
Expand Up @@ -75,6 +75,7 @@ public void testJoinWithEmptyBuildSide()
OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch:lineitem");
// Probe-side is not scanned at all, due to dynamic filtering:
assertEquals(probeStats.getInputPositions(), 0L);
assertEquals(probeStats.getDynamicFilterSplitsProcessed(), probeStats.getTotalDrivers());
}

private OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId, String tableName)
Expand Down
Expand Up @@ -81,6 +81,8 @@ public class OperatorContext
private final CounterStat outputDataSize = new CounterStat();
private final CounterStat outputPositions = new CounterStat();

private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();

private final AtomicLong physicalWrittenDataSize = new AtomicLong();

private final AtomicReference<SettableFuture<?>> memoryFuture;
Expand Down Expand Up @@ -211,6 +213,11 @@ public void recordOutput(long sizeInBytes, long positions)
outputPositions.update(positions);
}

public void recordDynamicFilterSplitProcessed(long dynamicFilterSplits)
{
dynamicFilterSplitsProcessed.getAndAdd(dynamicFilterSplits);
}

public void recordPhysicalWrittenData(long sizeInBytes)
{
physicalWrittenDataSize.getAndAdd(sizeInBytes);
Expand Down Expand Up @@ -511,6 +518,8 @@ public OperatorStats getOperatorStats()
succinctBytes(outputDataSize.getTotalCount()),
outputPositions.getTotalCount(),

dynamicFilterSplitsProcessed.get(),

succinctBytes(physicalWrittenDataSize.get()),

new Duration(blockedWallNanos.get(), NANOSECONDS).convertToMostSuccinctTimeUnit(),
Expand Down
19 changes: 19 additions & 0 deletions presto-main/src/main/java/io/prestosql/operator/OperatorStats.java
Expand Up @@ -61,6 +61,8 @@ public class OperatorStats
private final DataSize outputDataSize;
private final long outputPositions;

private final long dynamicFilterSplitsProcessed;

private final DataSize physicalWrittenDataSize;

private final Duration blockedWall;
Expand Down Expand Up @@ -111,6 +113,8 @@ public OperatorStats(
@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("outputPositions") long outputPositions,

@JsonProperty("dynamicFilterSplitsProcessed") long dynamicFilterSplitsProcessed,

@JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize,

@JsonProperty("blockedWall") Duration blockedWall,
Expand Down Expand Up @@ -163,6 +167,8 @@ public OperatorStats(
checkArgument(outputPositions >= 0, "outputPositions is negative");
this.outputPositions = outputPositions;

this.dynamicFilterSplitsProcessed = dynamicFilterSplitsProcessed;

this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "writtenDataSize is null");

this.blockedWall = requireNonNull(blockedWall, "blockedWall is null");
Expand Down Expand Up @@ -319,6 +325,12 @@ public long getOutputPositions()
return outputPositions;
}

@JsonProperty
public long getDynamicFilterSplitsProcessed()
{
return dynamicFilterSplitsProcessed;
}

@JsonProperty
public DataSize getPhysicalWrittenDataSize()
{
Expand Down Expand Up @@ -437,6 +449,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)
long outputDataSize = this.outputDataSize.toBytes();
long outputPositions = this.outputPositions;

long dynamicFilterSplitsProcessed = this.dynamicFilterSplitsProcessed;

long physicalWrittenDataSize = this.physicalWrittenDataSize.toBytes();

long blockedWall = this.blockedWall.roundTo(NANOSECONDS);
Expand Down Expand Up @@ -481,6 +495,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)
outputDataSize += operator.getOutputDataSize().toBytes();
outputPositions += operator.getOutputPositions();

dynamicFilterSplitsProcessed += operator.getDynamicFilterSplitsProcessed();

physicalWrittenDataSize += operator.getPhysicalWrittenDataSize().toBytes();

finishCalls += operator.getFinishCalls();
Expand Down Expand Up @@ -537,6 +553,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)
succinctBytes(outputDataSize),
outputPositions,

dynamicFilterSplitsProcessed,

succinctBytes(physicalWrittenDataSize),

new Duration(blockedWall, NANOSECONDS).convertToMostSuccinctTimeUnit(),
Expand Down Expand Up @@ -601,6 +619,7 @@ public OperatorStats summarize()
getOutputCpu,
outputDataSize,
outputPositions,
dynamicFilterSplitsProcessed,
physicalWrittenDataSize,
blockedWall,
finishCalls,
Expand Down
Expand Up @@ -75,6 +75,7 @@ public class ScanFilterAndProjectOperator
private long processedBytes;
private long physicalBytes;
private long readTimeNanos;
private long dynamicFilterSplitsProcessed;

private ScanFilterAndProjectOperator(
Session session,
Expand Down Expand Up @@ -150,6 +151,12 @@ public Duration getReadTime()
return new Duration(readTimeNanos, NANOSECONDS);
}

@Override
public long getDynamicFilterSplitsProcessed()
{
return dynamicFilterSplitsProcessed;
}

@Override
public WorkProcessor<Page> getOutputPages()
{
Expand Down Expand Up @@ -235,6 +242,10 @@ public TransformationState<WorkProcessor<Page>> process(Split split)

checkState(cursor == null && pageSource == null, "Table scan split already set");

if (!dynamicFilter.get().isAll()) {
dynamicFilterSplitsProcessed++;
}

ConnectorPageSource source;
if (split.getConnectorSplit() instanceof EmptySplit) {
source = new EmptyPageSource();
Expand Down
Expand Up @@ -293,6 +293,9 @@ public Page getOutput()
return null;
}
if (source == null) {
if (!dynamicFilter.get().isAll()) {
operatorContext.recordDynamicFilterSplitProcessed(1L);
}
source = pageSourceProvider.createPageSource(operatorContext.getSession(), split, table, columns, dynamicFilter);
}

Expand Down
Expand Up @@ -107,6 +107,12 @@ public long getInputPositions()
return splitToPages.getInputPositions();
}

@Override
public long getDynamicFilterSplitsProcessed()
{
return splitToPages.getDynamicFilterSplitsProcessed();
}

@Override
public Duration getReadTime()
{
Expand All @@ -132,6 +138,7 @@ private static class SplitToPages

long processedBytes;
long processedPositions;
long dynamicFilterSplitsProcessed;

@Nullable
ConnectorPageSource source;
Expand Down Expand Up @@ -160,6 +167,9 @@ public TransformationState<WorkProcessor<Page>> process(Split split)
}

checkState(source == null, "Table scan split already set");
if (!dynamicFilter.get().isAll()) {
dynamicFilterSplitsProcessed++;
}
source = pageSourceProvider.createPageSource(session, split, table, columns, dynamicFilter);
return TransformationState.ofResult(
WorkProcessor.create(new ConnectorPageSourceToPages(aggregatedMemoryContext, source))
Expand Down Expand Up @@ -204,6 +214,11 @@ long getInputPositions()
return processedPositions;
}

long getDynamicFilterSplitsProcessed()
{
return dynamicFilterSplitsProcessed;
}

Duration getReadTime()
{
if (source == null) {
Expand Down
Expand Up @@ -201,9 +201,12 @@ private void workProcessorOperatorStateMonitor(WorkProcessor.ProcessState<Page>

long deltaReadTimeNanos = deltaAndSet(context.readTimeNanos, sourceOperator.getReadTime().roundTo(NANOSECONDS));

long deltaDynamicFilterSplitsProcessed = deltaAndSet(context.dynamicFilterSplitsProcessed, sourceOperator.getDynamicFilterSplitsProcessed());

operatorContext.recordPhysicalInputWithTiming(deltaPhysicalInputDataSize, deltaPhysicalInputPositions, deltaReadTimeNanos);
operatorContext.recordNetworkInput(deltaInternalNetworkInputDataSize, deltaInternalNetworkInputPositions);
operatorContext.recordProcessedInput(deltaInputDataSize, deltaInputPositions);
operatorContext.recordDynamicFilterSplitProcessed(deltaDynamicFilterSplitsProcessed);
}

if (state.getType() == FINISHED) {
Expand Down Expand Up @@ -303,6 +306,8 @@ private List<OperatorStats> getNestedOperatorStats()
succinctBytes(context.outputDataSize.get()),
context.outputPositions.get(),

context.dynamicFilterSplitsProcessed.get(),

DataSize.ofBytes(0),

new Duration(context.blockedWallNanos.get(), NANOSECONDS),
Expand Down Expand Up @@ -641,6 +646,8 @@ private static class WorkProcessorOperatorContext
final AtomicLong outputDataSize = new AtomicLong();
final AtomicLong outputPositions = new AtomicLong();

final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();

final AtomicLong peakUserMemoryReservation = new AtomicLong();
final AtomicLong peakSystemMemoryReservation = new AtomicLong();
final AtomicLong peakRevocableMemoryReservation = new AtomicLong();
Expand Down
Expand Up @@ -62,4 +62,9 @@ default Duration getReadTime()
{
return new Duration(0, NANOSECONDS);
}

default long getDynamicFilterSplitsProcessed()
{
return 0;
}
}
Expand Up @@ -51,6 +51,7 @@ public class WorkProcessorSourceOperatorAdapter
private long previousInputBytes;
private long previousInputPositions;
private long previousReadTimeNanos;
private long previousDynamicFilterSplitsProcessed;

public interface AdapterWorkProcessorSourceOperatorFactory
extends WorkProcessorSourceOperatorFactory
Expand Down Expand Up @@ -188,6 +189,8 @@ private void updateOperatorStats()
long currentInputBytes = sourceOperator.getInputDataSize().toBytes();
long currentInputPositions = sourceOperator.getInputPositions();

long currentDynamicFilterSplitsProcessed = sourceOperator.getDynamicFilterSplitsProcessed();

if (currentPhysicalInputBytes != previousPhysicalInputBytes
|| currentPhysicalInputPositions != previousPhysicalInputPositions
|| currentReadTimeNanos != previousReadTimeNanos) {
Expand Down Expand Up @@ -220,6 +223,11 @@ private void updateOperatorStats()
previousInputBytes = currentInputBytes;
previousInputPositions = currentInputPositions;
}

if (currentDynamicFilterSplitsProcessed != previousDynamicFilterSplitsProcessed) {
operatorContext.recordDynamicFilterSplitProcessed(currentDynamicFilterSplitsProcessed - previousDynamicFilterSplitsProcessed);
previousDynamicFilterSplitsProcessed = currentDynamicFilterSplitsProcessed;
}
}

private static class SplitBuffer
Expand Down
Expand Up @@ -60,6 +60,7 @@ public class TestQueryStats
new Duration(114, NANOSECONDS),
succinctBytes(116L),
117L,
1833,
succinctBytes(118L),
new Duration(119, NANOSECONDS),
120L,
Expand Down Expand Up @@ -98,6 +99,7 @@ public class TestQueryStats
new Duration(214, NANOSECONDS),
succinctBytes(216L),
217L,
2833,
succinctBytes(218L),
new Duration(219, NANOSECONDS),
220L,
Expand Down Expand Up @@ -136,6 +138,7 @@ public class TestQueryStats
new Duration(314, NANOSECONDS),
succinctBytes(316L),
317L,
3833,
succinctBytes(318L),
new Duration(319, NANOSECONDS),
320L,
Expand Down
Expand Up @@ -57,6 +57,7 @@ public class TestOperatorStats
new Duration(11, NANOSECONDS),
DataSize.ofBytes(12),
13,
533,

DataSize.ofBytes(14),

Expand Down Expand Up @@ -103,6 +104,7 @@ public class TestOperatorStats
new Duration(11, NANOSECONDS),
DataSize.ofBytes(12),
13,
533,

DataSize.ofBytes(14),

Expand Down Expand Up @@ -159,6 +161,8 @@ public static void assertExpectedOperatorStats(OperatorStats actual)
assertEquals(actual.getOutputDataSize(), DataSize.ofBytes(12));
assertEquals(actual.getOutputPositions(), 13);

assertEquals(actual.getDynamicFilterSplitsProcessed(), 533);

assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(14));

assertEquals(actual.getBlockedWall(), new Duration(15, NANOSECONDS));
Expand Down Expand Up @@ -207,6 +211,8 @@ public void testAdd()
assertEquals(actual.getOutputDataSize(), DataSize.ofBytes(3 * 12));
assertEquals(actual.getOutputPositions(), 3 * 13);

assertEquals(actual.getDynamicFilterSplitsProcessed(), 3 * 533);

assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(3 * 14));

assertEquals(actual.getBlockedWall(), new Duration(3 * 15, NANOSECONDS));
Expand Down Expand Up @@ -253,6 +259,8 @@ public void testAddMergeable()
assertEquals(actual.getOutputDataSize(), DataSize.ofBytes(3 * 12));
assertEquals(actual.getOutputPositions(), 3 * 13);

assertEquals(actual.getDynamicFilterSplitsProcessed(), 3 * 533);

assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(3 * 14));

assertEquals(actual.getBlockedWall(), new Duration(3 * 15, NANOSECONDS));
Expand Down

0 comments on commit 14bdcc1

Please sign in to comment.