From ca1a69a40f8f89a04db5c770c2031008ab4b6b9c Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Sat, 14 Mar 2020 23:28:49 +0530 Subject: [PATCH] Add dynamicFilterSplitsProcessed to OperatorStats --- .../hive/TestHiveDistributedJoinQueries.java | 1 + .../prestosql/operator/OperatorContext.java | 9 +++++++++ .../io/prestosql/operator/OperatorStats.java | 19 +++++++++++++++++++ .../ScanFilterAndProjectOperator.java | 11 +++++++++++ .../prestosql/operator/TableScanOperator.java | 3 +++ .../TableScanWorkProcessorOperator.java | 15 +++++++++++++++ .../WorkProcessorPipelineSourceOperator.java | 7 +++++++ .../operator/WorkProcessorSourceOperator.java | 5 +++++ .../WorkProcessorSourceOperatorAdapter.java | 8 ++++++++ .../prestosql/execution/TestQueryStats.java | 3 +++ .../prestosql/operator/TestOperatorStats.java | 8 ++++++++ 11 files changed, 89 insertions(+) diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueries.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueries.java index a5480b10feac7..c21123c0bf08e 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueries.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestHiveDistributedJoinQueries.java @@ -75,6 +75,7 @@ public void testJoinWithEmptyBuildSide() OperatorStats probeStats = searchScanFilterAndProjectOperatorStats(result.getQueryId(), "tpch:lineitem"); // Probe-side is not scanned at all, due to dynamic filtering: assertEquals(probeStats.getInputPositions(), 0L); + assertEquals(probeStats.getDynamicFilterSplitsProcessed(), probeStats.getTotalDrivers()); } private OperatorStats searchScanFilterAndProjectOperatorStats(QueryId queryId, String tableName) diff --git a/presto-main/src/main/java/io/prestosql/operator/OperatorContext.java b/presto-main/src/main/java/io/prestosql/operator/OperatorContext.java index 46e91196d99d6..174e03bf75113 100644 --- a/presto-main/src/main/java/io/prestosql/operator/OperatorContext.java +++ b/presto-main/src/main/java/io/prestosql/operator/OperatorContext.java @@ -81,6 +81,8 @@ public class OperatorContext private final CounterStat outputDataSize = new CounterStat(); private final CounterStat outputPositions = new CounterStat(); + private final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong(); + private final AtomicLong physicalWrittenDataSize = new AtomicLong(); private final AtomicReference> memoryFuture; @@ -211,6 +213,11 @@ public void recordOutput(long sizeInBytes, long positions) outputPositions.update(positions); } + public void recordDynamicFilterSplitProcessed(long dynamicFilterSplits) + { + dynamicFilterSplitsProcessed.getAndAdd(dynamicFilterSplits); + } + public void recordPhysicalWrittenData(long sizeInBytes) { physicalWrittenDataSize.getAndAdd(sizeInBytes); @@ -511,6 +518,8 @@ public OperatorStats getOperatorStats() succinctBytes(outputDataSize.getTotalCount()), outputPositions.getTotalCount(), + dynamicFilterSplitsProcessed.get(), + succinctBytes(physicalWrittenDataSize.get()), new Duration(blockedWallNanos.get(), NANOSECONDS).convertToMostSuccinctTimeUnit(), diff --git a/presto-main/src/main/java/io/prestosql/operator/OperatorStats.java b/presto-main/src/main/java/io/prestosql/operator/OperatorStats.java index e5dab79292a80..3abb2866bcae9 100644 --- a/presto-main/src/main/java/io/prestosql/operator/OperatorStats.java +++ b/presto-main/src/main/java/io/prestosql/operator/OperatorStats.java @@ -61,6 +61,8 @@ public class OperatorStats private final DataSize outputDataSize; private final long outputPositions; + private final long dynamicFilterSplitsProcessed; + private final DataSize physicalWrittenDataSize; private final Duration blockedWall; @@ -111,6 +113,8 @@ public OperatorStats( @JsonProperty("outputDataSize") DataSize outputDataSize, @JsonProperty("outputPositions") long outputPositions, + @JsonProperty("dynamicFilterSplitsProcessed") long dynamicFilterSplitsProcessed, + @JsonProperty("physicalWrittenDataSize") DataSize physicalWrittenDataSize, @JsonProperty("blockedWall") Duration blockedWall, @@ -163,6 +167,8 @@ public OperatorStats( checkArgument(outputPositions >= 0, "outputPositions is negative"); this.outputPositions = outputPositions; + this.dynamicFilterSplitsProcessed = dynamicFilterSplitsProcessed; + this.physicalWrittenDataSize = requireNonNull(physicalWrittenDataSize, "writtenDataSize is null"); this.blockedWall = requireNonNull(blockedWall, "blockedWall is null"); @@ -319,6 +325,12 @@ public long getOutputPositions() return outputPositions; } + @JsonProperty + public long getDynamicFilterSplitsProcessed() + { + return dynamicFilterSplitsProcessed; + } + @JsonProperty public DataSize getPhysicalWrittenDataSize() { @@ -437,6 +449,8 @@ public OperatorStats add(Iterable operators) long outputDataSize = this.outputDataSize.toBytes(); long outputPositions = this.outputPositions; + long dynamicFilterSplitsProcessed = this.dynamicFilterSplitsProcessed; + long physicalWrittenDataSize = this.physicalWrittenDataSize.toBytes(); long blockedWall = this.blockedWall.roundTo(NANOSECONDS); @@ -481,6 +495,8 @@ public OperatorStats add(Iterable operators) outputDataSize += operator.getOutputDataSize().toBytes(); outputPositions += operator.getOutputPositions(); + dynamicFilterSplitsProcessed += operator.getDynamicFilterSplitsProcessed(); + physicalWrittenDataSize += operator.getPhysicalWrittenDataSize().toBytes(); finishCalls += operator.getFinishCalls(); @@ -537,6 +553,8 @@ public OperatorStats add(Iterable operators) succinctBytes(outputDataSize), outputPositions, + dynamicFilterSplitsProcessed, + succinctBytes(physicalWrittenDataSize), new Duration(blockedWall, NANOSECONDS).convertToMostSuccinctTimeUnit(), @@ -601,6 +619,7 @@ public OperatorStats summarize() getOutputCpu, outputDataSize, outputPositions, + dynamicFilterSplitsProcessed, physicalWrittenDataSize, blockedWall, finishCalls, diff --git a/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java b/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java index 6d70f2d5decce..ce448e07bbd19 100644 --- a/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java @@ -75,6 +75,7 @@ public class ScanFilterAndProjectOperator private long processedBytes; private long physicalBytes; private long readTimeNanos; + private long dynamicFilterSplitsProcessed; private ScanFilterAndProjectOperator( Session session, @@ -150,6 +151,12 @@ public Duration getReadTime() return new Duration(readTimeNanos, NANOSECONDS); } + @Override + public long getDynamicFilterSplitsProcessed() + { + return dynamicFilterSplitsProcessed; + } + @Override public WorkProcessor getOutputPages() { @@ -235,6 +242,10 @@ public TransformationState> process(Split split) checkState(cursor == null && pageSource == null, "Table scan split already set"); + if (!dynamicFilter.get().isAll()) { + dynamicFilterSplitsProcessed++; + } + ConnectorPageSource source; if (split.getConnectorSplit() instanceof EmptySplit) { source = new EmptyPageSource(); diff --git a/presto-main/src/main/java/io/prestosql/operator/TableScanOperator.java b/presto-main/src/main/java/io/prestosql/operator/TableScanOperator.java index 5ad9c33d47543..8a8522ff5e833 100644 --- a/presto-main/src/main/java/io/prestosql/operator/TableScanOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/TableScanOperator.java @@ -293,6 +293,9 @@ public Page getOutput() return null; } if (source == null) { + if (!dynamicFilter.get().isAll()) { + operatorContext.recordDynamicFilterSplitProcessed(1L); + } source = pageSourceProvider.createPageSource(operatorContext.getSession(), split, table, columns, dynamicFilter); } diff --git a/presto-main/src/main/java/io/prestosql/operator/TableScanWorkProcessorOperator.java b/presto-main/src/main/java/io/prestosql/operator/TableScanWorkProcessorOperator.java index eda458e6ed2e9..05215e28dd2a4 100644 --- a/presto-main/src/main/java/io/prestosql/operator/TableScanWorkProcessorOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/TableScanWorkProcessorOperator.java @@ -107,6 +107,12 @@ public long getInputPositions() return splitToPages.getInputPositions(); } + @Override + public long getDynamicFilterSplitsProcessed() + { + return splitToPages.getDynamicFilterSplitsProcessed(); + } + @Override public Duration getReadTime() { @@ -132,6 +138,7 @@ private static class SplitToPages long processedBytes; long processedPositions; + long dynamicFilterSplitsProcessed; @Nullable ConnectorPageSource source; @@ -160,6 +167,9 @@ public TransformationState> process(Split split) } checkState(source == null, "Table scan split already set"); + if (!dynamicFilter.get().isAll()) { + dynamicFilterSplitsProcessed++; + } source = pageSourceProvider.createPageSource(session, split, table, columns, dynamicFilter); return TransformationState.ofResult( WorkProcessor.create(new ConnectorPageSourceToPages(aggregatedMemoryContext, source)) @@ -204,6 +214,11 @@ long getInputPositions() return processedPositions; } + long getDynamicFilterSplitsProcessed() + { + return dynamicFilterSplitsProcessed; + } + Duration getReadTime() { if (source == null) { diff --git a/presto-main/src/main/java/io/prestosql/operator/WorkProcessorPipelineSourceOperator.java b/presto-main/src/main/java/io/prestosql/operator/WorkProcessorPipelineSourceOperator.java index a19074c7687e2..490ed70756660 100644 --- a/presto-main/src/main/java/io/prestosql/operator/WorkProcessorPipelineSourceOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/WorkProcessorPipelineSourceOperator.java @@ -201,9 +201,12 @@ private void workProcessorOperatorStateMonitor(WorkProcessor.ProcessState long deltaReadTimeNanos = deltaAndSet(context.readTimeNanos, sourceOperator.getReadTime().roundTo(NANOSECONDS)); + long deltaDynamicFilterSplitsProcessed = deltaAndSet(context.dynamicFilterSplitsProcessed, sourceOperator.getDynamicFilterSplitsProcessed()); + operatorContext.recordPhysicalInputWithTiming(deltaPhysicalInputDataSize, deltaPhysicalInputPositions, deltaReadTimeNanos); operatorContext.recordNetworkInput(deltaInternalNetworkInputDataSize, deltaInternalNetworkInputPositions); operatorContext.recordProcessedInput(deltaInputDataSize, deltaInputPositions); + operatorContext.recordDynamicFilterSplitProcessed(deltaDynamicFilterSplitsProcessed); } if (state.getType() == FINISHED) { @@ -303,6 +306,8 @@ private List getNestedOperatorStats() succinctBytes(context.outputDataSize.get()), context.outputPositions.get(), + context.dynamicFilterSplitsProcessed.get(), + DataSize.ofBytes(0), new Duration(context.blockedWallNanos.get(), NANOSECONDS), @@ -641,6 +646,8 @@ private static class WorkProcessorOperatorContext final AtomicLong outputDataSize = new AtomicLong(); final AtomicLong outputPositions = new AtomicLong(); + final AtomicLong dynamicFilterSplitsProcessed = new AtomicLong(); + final AtomicLong peakUserMemoryReservation = new AtomicLong(); final AtomicLong peakSystemMemoryReservation = new AtomicLong(); final AtomicLong peakRevocableMemoryReservation = new AtomicLong(); diff --git a/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperator.java b/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperator.java index 6bfc547c90162..723031d3c153a 100644 --- a/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperator.java @@ -62,4 +62,9 @@ default Duration getReadTime() { return new Duration(0, NANOSECONDS); } + + default long getDynamicFilterSplitsProcessed() + { + return 0; + } } diff --git a/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperatorAdapter.java b/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperatorAdapter.java index 77649adb3146b..9bfdad021d8ad 100644 --- a/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperatorAdapter.java +++ b/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperatorAdapter.java @@ -51,6 +51,7 @@ public class WorkProcessorSourceOperatorAdapter private long previousInputBytes; private long previousInputPositions; private long previousReadTimeNanos; + private long previousDynamicFilterSplitsProcessed; public interface AdapterWorkProcessorSourceOperatorFactory extends WorkProcessorSourceOperatorFactory @@ -188,6 +189,8 @@ private void updateOperatorStats() long currentInputBytes = sourceOperator.getInputDataSize().toBytes(); long currentInputPositions = sourceOperator.getInputPositions(); + long currentDynamicFilterSplitsProcessed = sourceOperator.getDynamicFilterSplitsProcessed(); + if (currentPhysicalInputBytes != previousPhysicalInputBytes || currentPhysicalInputPositions != previousPhysicalInputPositions || currentReadTimeNanos != previousReadTimeNanos) { @@ -220,6 +223,11 @@ private void updateOperatorStats() previousInputBytes = currentInputBytes; previousInputPositions = currentInputPositions; } + + if (currentDynamicFilterSplitsProcessed != previousDynamicFilterSplitsProcessed) { + operatorContext.recordDynamicFilterSplitProcessed(currentDynamicFilterSplitsProcessed - previousDynamicFilterSplitsProcessed); + previousDynamicFilterSplitsProcessed = currentDynamicFilterSplitsProcessed; + } } private static class SplitBuffer diff --git a/presto-main/src/test/java/io/prestosql/execution/TestQueryStats.java b/presto-main/src/test/java/io/prestosql/execution/TestQueryStats.java index 1e6a501b3888b..69431e183f16e 100644 --- a/presto-main/src/test/java/io/prestosql/execution/TestQueryStats.java +++ b/presto-main/src/test/java/io/prestosql/execution/TestQueryStats.java @@ -60,6 +60,7 @@ public class TestQueryStats new Duration(114, NANOSECONDS), succinctBytes(116L), 117L, + 1833, succinctBytes(118L), new Duration(119, NANOSECONDS), 120L, @@ -98,6 +99,7 @@ public class TestQueryStats new Duration(214, NANOSECONDS), succinctBytes(216L), 217L, + 2833, succinctBytes(218L), new Duration(219, NANOSECONDS), 220L, @@ -136,6 +138,7 @@ public class TestQueryStats new Duration(314, NANOSECONDS), succinctBytes(316L), 317L, + 3833, succinctBytes(318L), new Duration(319, NANOSECONDS), 320L, diff --git a/presto-main/src/test/java/io/prestosql/operator/TestOperatorStats.java b/presto-main/src/test/java/io/prestosql/operator/TestOperatorStats.java index 41af58720e31d..e0b7b96f292f9 100644 --- a/presto-main/src/test/java/io/prestosql/operator/TestOperatorStats.java +++ b/presto-main/src/test/java/io/prestosql/operator/TestOperatorStats.java @@ -57,6 +57,7 @@ public class TestOperatorStats new Duration(11, NANOSECONDS), DataSize.ofBytes(12), 13, + 533, DataSize.ofBytes(14), @@ -103,6 +104,7 @@ public class TestOperatorStats new Duration(11, NANOSECONDS), DataSize.ofBytes(12), 13, + 533, DataSize.ofBytes(14), @@ -159,6 +161,8 @@ public static void assertExpectedOperatorStats(OperatorStats actual) assertEquals(actual.getOutputDataSize(), DataSize.ofBytes(12)); assertEquals(actual.getOutputPositions(), 13); + assertEquals(actual.getDynamicFilterSplitsProcessed(), 533); + assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(14)); assertEquals(actual.getBlockedWall(), new Duration(15, NANOSECONDS)); @@ -207,6 +211,8 @@ public void testAdd() assertEquals(actual.getOutputDataSize(), DataSize.ofBytes(3 * 12)); assertEquals(actual.getOutputPositions(), 3 * 13); + assertEquals(actual.getDynamicFilterSplitsProcessed(), 3 * 533); + assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(3 * 14)); assertEquals(actual.getBlockedWall(), new Duration(3 * 15, NANOSECONDS)); @@ -253,6 +259,8 @@ public void testAddMergeable() assertEquals(actual.getOutputDataSize(), DataSize.ofBytes(3 * 12)); assertEquals(actual.getOutputPositions(), 3 * 13); + assertEquals(actual.getDynamicFilterSplitsProcessed(), 3 * 533); + assertEquals(actual.getPhysicalWrittenDataSize(), DataSize.ofBytes(3 * 14)); assertEquals(actual.getBlockedWall(), new Duration(3 * 15, NANOSECONDS));