Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dynamicFilterSplitsProcessed to OperatorStats #3217

Merged
merged 1 commit into from Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -75,6 +75,7 @@ public void testJoinWithEmptyBuildSide()
OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch:lineitem");
// Probe-side is not scanned at all, due to dynamic filtering:
assertEquals(probeStats.getInputPositions(), 0L);
assertEquals(probeStats.getDynamicFilterSplitsProcessed(), probeStats.getTotalDrivers());
}

private OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId, String tableName)
Expand Down
Expand Up @@ -81,6 +81,8 @@ public class OperatorContext
private final CounterStat outputDataSize = new CounterStat();
private final CounterStat outputPositions = new CounterStat();

private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();

private final AtomicLong physicalWrittenDataSize = new AtomicLong();

private final AtomicReference<SettableFuture<?>> memoryFuture;
Expand Down Expand Up @@ -211,6 +213,11 @@ public void recordOutput(long sizeInBytes, long positions)
outputPositions.update(positions);
}

public void recordDynamicFilterSplitProcessed(long dynamicFilterSplits)
{
dynamicFilterSplitsProcessed.getAndAdd(dynamicFilterSplits);
}

public void recordPhysicalWrittenData(long sizeInBytes)
{
physicalWrittenDataSize.getAndAdd(sizeInBytes);
Expand Down Expand Up @@ -511,6 +518,8 @@ public OperatorStats getOperatorStats()
succinctBytes(outputDataSize.getTotalCount()),
outputPositions.getTotalCount(),

dynamicFilterSplitsProcessed.get(),

succinctBytes(physicalWrittenDataSize.get()),

new Duration(blockedWallNanos.get(), NANOSECONDS).convertToMostSuccinctTimeUnit(),
Expand Down
19 changes: 19 additions & 0 deletions presto-main/src/main/java/io/prestosql/operator/OperatorStats.java
Expand Up @@ -61,6 +61,8 @@ public class OperatorStats
private final DataSize outputDataSize;
private final long outputPositions;

private final long dynamicFilterSplitsProcessed;

private final DataSize physicalWrittenDataSize;

private final Duration blockedWall;
Expand Down Expand Up @@ -111,6 +113,8 @@ public OperatorStats(
@JsonProperty("outputDataSize") DataSize outputDataSize,
@JsonProperty("outputPositions") long outputPositions,

@JsonProperty("dynamicFilterSplitsProcessed") long dynamicFilterSplitsProcessed,

@JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize,

@JsonProperty("blockedWall") Duration blockedWall,
Expand Down Expand Up @@ -163,6 +167,8 @@ public OperatorStats(
checkArgument(outputPositions >= 0, "outputPositions is negative");
this.outputPositions = outputPositions;

this.dynamicFilterSplitsProcessed = dynamicFilterSplitsProcessed;

this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "writtenDataSize is null");

this.blockedWall = requireNonNull(blockedWall, "blockedWall is null");
Expand Down Expand Up @@ -319,6 +325,12 @@ public long getOutputPositions()
return outputPositions;
}

@JsonProperty
public long getDynamicFilterSplitsProcessed()
{
return dynamicFilterSplitsProcessed;
}

@JsonProperty
public DataSize getPhysicalWrittenDataSize()
{
Expand Down Expand Up @@ -437,6 +449,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)
long outputDataSize = this.outputDataSize.toBytes();
long outputPositions = this.outputPositions;

long dynamicFilterSplitsProcessed = this.dynamicFilterSplitsProcessed;

long physicalWrittenDataSize = this.physicalWrittenDataSize.toBytes();

long blockedWall = this.blockedWall.roundTo(NANOSECONDS);
Expand Down Expand Up @@ -481,6 +495,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)
outputDataSize += operator.getOutputDataSize().toBytes();
outputPositions += operator.getOutputPositions();

dynamicFilterSplitsProcessed += operator.getDynamicFilterSplitsProcessed();

physicalWrittenDataSize += operator.getPhysicalWrittenDataSize().toBytes();

finishCalls += operator.getFinishCalls();
Expand Down Expand Up @@ -537,6 +553,8 @@ public OperatorStats add(Iterable<OperatorStats> operators)
succinctBytes(outputDataSize),
outputPositions,

dynamicFilterSplitsProcessed,

succinctBytes(physicalWrittenDataSize),

new Duration(blockedWall, NANOSECONDS).convertToMostSuccinctTimeUnit(),
Expand Down Expand Up @@ -601,6 +619,7 @@ public OperatorStats summarize()
getOutputCpu,
outputDataSize,
outputPositions,
dynamicFilterSplitsProcessed,
physicalWrittenDataSize,
blockedWall,
finishCalls,
Expand Down
Expand Up @@ -75,6 +75,7 @@ public class ScanFilterAndProjectOperator
private long processedBytes;
private long physicalBytes;
private long readTimeNanos;
private long dynamicFilterSplitsProcessed;

private ScanFilterAndProjectOperator(
Session session,
Expand Down Expand Up @@ -150,6 +151,12 @@ public Duration getReadTime()
return new Duration(readTimeNanos, NANOSECONDS);
}

@Override
public long getDynamicFilterSplitsProcessed()
{
return dynamicFilterSplitsProcessed;
}

@Override
public WorkProcessor<Page> getOutputPages()
{
Expand Down Expand Up @@ -235,6 +242,10 @@ public TransformationState<WorkProcessor<Page>> process(Split split)

checkState(cursor == null && pageSource == null, "Table scan split already set");

if (!dynamicFilter.get().isAll()) {
dynamicFilterSplitsProcessed++;
}

ConnectorPageSource source;
if (split.getConnectorSplit() instanceof EmptySplit) {
source = new EmptyPageSource();
Expand Down
Expand Up @@ -293,6 +293,9 @@ public Page getOutput()
return null;
}
if (source == null) {
if (!dynamicFilter.get().isAll()) {
raunaqmorarka marked this conversation as resolved.
Show resolved Hide resolved
operatorContext.recordDynamicFilterSplitProcessed(1L);
}
source = pageSourceProvider.createPageSource(operatorContext.getSession(), split, table, columns, dynamicFilter);
}

Expand Down
Expand Up @@ -107,6 +107,12 @@ public long getInputPositions()
return splitToPages.getInputPositions();
}

@Override
public long getDynamicFilterSplitsProcessed()
{
return splitToPages.getDynamicFilterSplitsProcessed();
}

@Override
public Duration getReadTime()
{
Expand All @@ -132,6 +138,7 @@ private static class SplitToPages

long processedBytes;
long processedPositions;
long dynamicFilterSplitsProcessed;

@Nullable
ConnectorPageSource source;
Expand Down Expand Up @@ -160,6 +167,9 @@ public TransformationState<WorkProcessor<Page>> process(Split split)
}

checkState(source == null, "Table scan split already set");
if (!dynamicFilter.get().isAll()) {
dynamicFilterSplitsProcessed++;
}
source = pageSourceProvider.createPageSource(session, split, table, columns, dynamicFilter);
return TransformationState.ofResult(
WorkProcessor.create(new ConnectorPageSourceToPages(aggregatedMemoryContext, source))
Expand Down Expand Up @@ -204,6 +214,11 @@ long getInputPositions()
return processedPositions;
}

long getDynamicFilterSplitsProcessed()
{
return dynamicFilterSplitsProcessed;
}

Duration getReadTime()
{
if (source == null) {
Expand Down
Expand Up @@ -201,9 +201,12 @@ private void workProcessorOperatorStateMonitor(WorkProcessor.ProcessState<Page>

long deltaReadTimeNanos = deltaAndSet(context.readTimeNanos, sourceOperator.getReadTime().roundTo(NANOSECONDS));

long deltaDynamicFilterSplitsProcessed = deltaAndSet(context.dynamicFilterSplitsProcessed, sourceOperator.getDynamicFilterSplitsProcessed());

operatorContext.recordPhysicalInputWithTiming(deltaPhysicalInputDataSize, deltaPhysicalInputPositions, deltaReadTimeNanos);
operatorContext.recordNetworkInput(deltaInternalNetworkInputDataSize, deltaInternalNetworkInputPositions);
operatorContext.recordProcessedInput(deltaInputDataSize, deltaInputPositions);
operatorContext.recordDynamicFilterSplitProcessed(deltaDynamicFilterSplitsProcessed);
}

if (state.getType() == FINISHED) {
Expand Down Expand Up @@ -303,6 +306,8 @@ private List<OperatorStats> getNestedOperatorStats()
succinctBytes(context.outputDataSize.get()),
context.outputPositions.get(),

context.dynamicFilterSplitsProcessed.get(),

DataSize.ofBytes(0),

new Duration(context.blockedWallNanos.get(), NANOSECONDS),
Expand Down Expand Up @@ -641,6 +646,8 @@ private static class WorkProcessorOperatorContext
final AtomicLong outputDataSize = new AtomicLong();
final AtomicLong outputPositions = new AtomicLong();

final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong();

final AtomicLong peakUserMemoryReservation = new AtomicLong();
final AtomicLong peakSystemMemoryReservation = new AtomicLong();
final AtomicLong peakRevocableMemoryReservation = new AtomicLong();
Expand Down
Expand Up @@ -62,4 +62,9 @@ default Duration getReadTime()
{
return new Duration(0, NANOSECONDS);
}

default long getDynamicFilterSplitsProcessed()
{
return 0;
}
}
Expand Up @@ -51,6 +51,7 @@ public class WorkProcessorSourceOperatorAdapter
private long previousInputBytes;
private long previousInputPositions;
private long previousReadTimeNanos;
private long previousDynamicFilterSplitsProcessed;

public interface AdapterWorkProcessorSourceOperatorFactory
extends WorkProcessorSourceOperatorFactory
Expand Down Expand Up @@ -188,6 +189,8 @@ private void updateOperatorStats()
long currentInputBytes = sourceOperator.getInputDataSize().toBytes();
long currentInputPositions = sourceOperator.getInputPositions();

long currentDynamicFilterSplitsProcessed = sourceOperator.getDynamicFilterSplitsProcessed();

if (currentPhysicalInputBytes != previousPhysicalInputBytes
|| currentPhysicalInputPositions != previousPhysicalInputPositions
|| currentReadTimeNanos != previousReadTimeNanos) {
Expand Down Expand Up @@ -220,6 +223,11 @@ private void updateOperatorStats()
previousInputBytes = currentInputBytes;
previousInputPositions = currentInputPositions;
}

if (currentDynamicFilterSplitsProcessed != previousDynamicFilterSplitsProcessed) {
operatorContext.recordDynamicFilterSplitProcessed(currentDynamicFilterSplitsProcessed - previousDynamicFilterSplitsProcessed);
previousDynamicFilterSplitsProcessed = currentDynamicFilterSplitsProcessed;
}
}

private static class SplitBuffer
Expand Down
Expand Up @@ -60,6 +60,7 @@ public class TestQueryStats
new Duration(114, NANOSECONDS),
succinctBytes(116L),
117L,
1833,
succinctBytes(118L),
new Duration(119, NANOSECONDS),
120L,
Expand Down Expand Up @@ -98,6 +99,7 @@ public class TestQueryStats
new Duration(214, NANOSECONDS),
succinctBytes(216L),
217L,
2833,
succinctBytes(218L),
new Duration(219, NANOSECONDS),
220L,
Expand Down Expand Up @@ -136,6 +138,7 @@ public class TestQueryStats
new Duration(314, NANOSECONDS),
succinctBytes(316L),
317L,
3833,
succinctBytes(318L),
new Duration(319, NANOSECONDS),
320L,
Expand Down
Expand Up @@ -57,6 +57,7 @@ public class TestOperatorStats
new Duration(11, NANOSECONDS),
DataSize.ofBytes(12),
13,
533,

DataSize.ofBytes(14),

Expand Down Expand Up @@ -103,6 +104,7 @@ public class TestOperatorStats
new Duration(11, NANOSECONDS),
DataSize.ofBytes(12),
13,
533,

DataSize.ofBytes(14),

Expand Down Expand Up @@ -159,6 +161,8 @@ public static void assertExpectedOperatorStats(OperatorStats actual)
assertEquals(actual.getOutputDataSize(), DataSize.ofBytes(12));
assertEquals(actual.getOutputPositions(), 13);

assertEquals(actual.getDynamicFilterSplitsProcessed(), 533);

assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(14));

assertEquals(actual.getBlockedWall(), new Duration(15, NANOSECONDS));
Expand Down Expand Up @@ -207,6 +211,8 @@ public void testAdd()
assertEquals(actual.getOutputDataSize(), DataSize.ofBytes(3 * 12));
assertEquals(actual.getOutputPositions(), 3 * 13);

assertEquals(actual.getDynamicFilterSplitsProcessed(), 3 * 533);

assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(3 * 14));

assertEquals(actual.getBlockedWall(), new Duration(3 * 15, NANOSECONDS));
Expand Down Expand Up @@ -253,6 +259,8 @@ public void testAddMergeable()
assertEquals(actual.getOutputDataSize(), DataSize.ofBytes(3 * 12));
assertEquals(actual.getOutputPositions(), 3 * 13);

assertEquals(actual.getDynamicFilterSplitsProcessed(), 3 * 533);

assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(3 * 14));

assertEquals(actual.getBlockedWall(), new Duration(3 * 15, NANOSECONDS));
Expand Down