From ae7b97150e11a15d6591e225f3b15ef584526532 Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Tue, 4 Jun 2019 22:48:48 +0200 Subject: [PATCH] Port ScanFilterAndProjectOperator to WorkProcessorSourceOperator --- .../hive/TestOrcPageSourceMemoryTracking.java | 1 + .../ScanFilterAndProjectOperator.java | 260 ++++++++---------- .../WorkProcessorSourceOperatorAdapter.java | 245 +++++++++++++++++ ...BenchmarkScanFilterAndProjectOperator.java | 1 + 4 files changed, 360 insertions(+), 147 deletions(-) create mode 100644 presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperatorAdapter.java diff --git a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestOrcPageSourceMemoryTracking.java b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestOrcPageSourceMemoryTracking.java index dbf49cf79faca..2b716c4c9e4d8 100644 --- a/presto-hive/src/test/java/io/prestosql/plugin/hive/TestOrcPageSourceMemoryTracking.java +++ b/presto-hive/src/test/java/io/prestosql/plugin/hive/TestOrcPageSourceMemoryTracking.java @@ -527,6 +527,7 @@ public SourceOperator newScanFilterAndProjectOperator(DriverContext driverContex 0); SourceOperator operator = sourceOperatorFactory.createOperator(driverContext); operator.addSplit(new Split(new CatalogName("test"), TestingSplit.createLocalSplit(), Lifespan.taskWide())); + operator.noMoreSplits(); return operator; } diff --git a/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java b/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java index 107e2c867ec49..51d38ef4f560d 100644 --- a/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/ScanFilterAndProjectOperator.java @@ -14,14 +14,16 @@ package io.prestosql.operator; import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import io.airlift.units.DataSize; +import io.airlift.units.Duration; +import io.prestosql.Session; import io.prestosql.memory.context.AggregatedMemoryContext; import io.prestosql.memory.context.LocalMemoryContext; +import io.prestosql.memory.context.MemoryTrackingContext; import io.prestosql.metadata.Split; import io.prestosql.metadata.TableHandle; import io.prestosql.operator.WorkProcessor.ProcessState; +import io.prestosql.operator.WorkProcessor.TransformationState; import io.prestosql.operator.project.CursorProcessor; import io.prestosql.operator.project.CursorProcessorOutput; import io.prestosql.operator.project.PageProcessor; @@ -38,7 +40,6 @@ import io.prestosql.split.PageSourceProvider; import io.prestosql.sql.planner.plan.PlanNodeId; -import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; @@ -48,33 +49,33 @@ import static com.google.common.base.Preconditions.checkState; import static io.airlift.concurrent.MoreFutures.toListenableFuture; +import static io.airlift.units.DataSize.Unit.BYTE; import static io.prestosql.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static io.prestosql.operator.PageUtils.recordMaterializedBytes; +import static io.prestosql.operator.WorkProcessor.TransformationState.finished; +import static io.prestosql.operator.WorkProcessor.TransformationState.ofResult; import static io.prestosql.operator.project.MergePages.mergePages; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.NANOSECONDS; public class ScanFilterAndProjectOperator - implements SourceOperator, Closeable + implements WorkProcessorSourceOperator { - private final OperatorContext operatorContext; - private final PlanNodeId planNodeId; - private final SettableFuture blockedOnSplits = SettableFuture.create(); + private final WorkProcessor pages; - private WorkProcessor pages; private RecordCursor cursor; private ConnectorPageSource pageSource; - private Split split; - private boolean operatorFinishing; - - private long deltaPositionsCount; - private long deltaPhysicalBytes; - private long deltaPhysicalReadTimeNanos; - private long deltaProcessedBytes; + private long processedPositions; + private long processedBytes; + private long physicalBytes; + private long readTimeNanos; private ScanFilterAndProjectOperator( - OperatorContext operatorContext, - PlanNodeId sourceId, + Session session, + MemoryTrackingContext memoryTrackingContext, + DriverYieldSignal yieldSignal, + WorkProcessor splits, PageSourceProvider pageSourceProvider, CursorProcessor cursorProcessor, PageProcessor pageProcessor, @@ -84,54 +85,24 @@ private ScanFilterAndProjectOperator( DataSize minOutputPageSize, int minOutputPageRowCount) { - this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); - this.planNodeId = requireNonNull(sourceId, "sourceId is null"); - - pages = WorkProcessor.create( + pages = splits.flatTransform( new SplitToPages( + session, + yieldSignal, pageSourceProvider, cursorProcessor, pageProcessor, table, columns, types, - operatorContext.aggregateSystemMemoryContext(), + requireNonNull(memoryTrackingContext, "memoryTrackingContext is null").aggregateSystemMemoryContext(), minOutputPageSize, - minOutputPageRowCount)) - .transformProcessor(WorkProcessor::flatten) - .finishWhen(() -> operatorFinishing); + minOutputPageRowCount)); } @Override - public OperatorContext getOperatorContext() + public Supplier> getUpdatablePageSourceSupplier() { - return operatorContext; - } - - @Override - public PlanNodeId getSourceId() - { - return planNodeId; - } - - @Override - public Supplier> addSplit(Split split) - { - requireNonNull(split, "split is null"); - checkState(this.split == null, "Table scan split already set"); - - if (operatorFinishing) { - return Optional::empty; - } - - this.split = split; - - Object splitInfo = split.getInfo(); - if (splitInfo != null) { - operatorContext.setInfoSupplier(() -> new SplitOperatorInfo(splitInfo)); - } - blockedOnSplits.set(null); - return () -> { if (pageSource instanceof UpdatablePageSource) { return Optional.of((UpdatablePageSource) pageSource); @@ -141,79 +112,62 @@ public Supplier> addSplit(Split split) } @Override - public void noMoreSplits() - { - blockedOnSplits.set(null); - } - - @Override - public void close() + public DataSize getPhysicalInputDataSize() { - if (pageSource != null) { - try { - pageSource.close(); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - else if (cursor != null) { - cursor.close(); - } + return new DataSize(physicalBytes, BYTE); } @Override - public void finish() + public long getPhysicalInputPositions() { - blockedOnSplits.set(null); - operatorFinishing = true; + return processedPositions; } @Override - public final boolean isFinished() + public DataSize getInputDataSize() { - return pages.isFinished(); + return new DataSize(processedBytes, BYTE); } @Override - public ListenableFuture isBlocked() + public long getInputPositions() { - if (pages.isBlocked()) { - return pages.getBlockedFuture(); - } - - return NOT_BLOCKED; + return processedPositions; } @Override - public final boolean needsInput() + public Duration getReadTime() { - return false; + return new Duration(readTimeNanos, NANOSECONDS); } @Override - public final void addInput(Page page) + public WorkProcessor getOutputPages() { - throw new UnsupportedOperationException(); + return pages; } @Override - public Page getOutput() + public void close() { - if (!pages.process()) { - return null; + if (pageSource != null) { + try { + pageSource.close(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } } - - if (pages.isFinished()) { - return null; + else if (cursor != null) { + cursor.close(); } - - return pages.getResult(); } private class SplitToPages - implements WorkProcessor.Process> + implements WorkProcessor.Transformation> { + final Session session; + final DriverYieldSignal yieldSignal; final PageSourceProvider pageSourceProvider; final CursorProcessor cursorProcessor; final PageProcessor pageProcessor; @@ -227,9 +181,9 @@ private class SplitToPages final DataSize minOutputPageSize; final int minOutputPageRowCount; - boolean finished; - SplitToPages( + Session session, + DriverYieldSignal yieldSignal, PageSourceProvider pageSourceProvider, CursorProcessor cursorProcessor, PageProcessor pageProcessor, @@ -240,6 +194,8 @@ private class SplitToPages DataSize minOutputPageSize, int minOutputPageRowCount) { + this.session = requireNonNull(session, "session is null"); + this.yieldSignal = requireNonNull(yieldSignal, "yieldSignal is null"); this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null"); this.cursorProcessor = requireNonNull(cursorProcessor, "cursorProcessor is null"); this.pageProcessor = requireNonNull(pageProcessor, "pageProcessor is null"); @@ -255,42 +211,38 @@ private class SplitToPages } @Override - public ProcessState> process() + public TransformationState> process(Split split) { - if (finished) { + if (split == null) { memoryContext.close(); - return ProcessState.finished(); - } - - if (!blockedOnSplits.isDone()) { - return ProcessState.blocked(blockedOnSplits); + return finished(); } - finished = true; + checkState(cursor == null && pageSource == null, "Table scan split already set"); ConnectorPageSource source; if (split.getConnectorSplit() instanceof EmptySplit) { source = new EmptySplitPageSource(); } else { - source = pageSourceProvider.createPageSource(operatorContext.getSession(), split, table, columns); + source = pageSourceProvider.createPageSource(session, split, table, columns); } if (source instanceof RecordPageSource) { cursor = ((RecordPageSource) source).getCursor(); - return ProcessState.ofResult(processColumnSource()); + return ofResult(processColumnSource()); } else { pageSource = source; - return ProcessState.ofResult(processPageSource()); + return ofResult(processPageSource()); } } WorkProcessor processColumnSource() { return WorkProcessor - .create(new RecordCursorToPages(cursorProcessor, types, pageSourceMemoryContext, outputMemoryContext)) - .yielding(() -> operatorContext.getDriverContext().getYieldSignal().isSet()) + .create(new RecordCursorToPages(session, yieldSignal, cursorProcessor, types, pageSourceMemoryContext, outputMemoryContext)) + .yielding(yieldSignal::isSet) .withProcessStateMonitor(state -> memoryContext.setBytes(localAggregatedMemoryContext.getBytes())); } @@ -298,39 +250,39 @@ WorkProcessor processPageSource() { return WorkProcessor .create(new ConnectorPageSourceToPages(pageSourceMemoryContext)) - .yielding(() -> operatorContext.getDriverContext().getYieldSignal().isSet()) + .yielding(yieldSignal::isSet) .flatMap(page -> pageProcessor.createWorkProcessor( - operatorContext.getSession().toConnectorSession(), - operatorContext.getDriverContext().getYieldSignal(), + session.toConnectorSession(), + yieldSignal, outputMemoryContext, page)) .transformProcessor(processor -> mergePages(types, minOutputPageSize.toBytes(), minOutputPageRowCount, processor, localAggregatedMemoryContext)) - .withProcessStateMonitor(state -> { - memoryContext.setBytes(localAggregatedMemoryContext.getBytes()); - operatorContext.recordPhysicalInputWithTiming(deltaPhysicalBytes, deltaPositionsCount, deltaPhysicalReadTimeNanos); - operatorContext.recordProcessedInput(deltaProcessedBytes, deltaPositionsCount); - deltaPositionsCount = 0; - deltaPhysicalBytes = 0; - deltaPhysicalReadTimeNanos = 0; - deltaProcessedBytes = 0; - }); + .withProcessStateMonitor(state -> memoryContext.setBytes(localAggregatedMemoryContext.getBytes())); } } private class RecordCursorToPages implements WorkProcessor.Process { + final Session session; + final DriverYieldSignal yieldSignal; final CursorProcessor cursorProcessor; final PageBuilder pageBuilder; final LocalMemoryContext pageSourceMemoryContext; final LocalMemoryContext outputMemoryContext; - long completedBytes; - long readTimeNanos; boolean finished; - RecordCursorToPages(CursorProcessor cursorProcessor, List types, LocalMemoryContext pageSourceMemoryContext, LocalMemoryContext outputMemoryContext) + RecordCursorToPages( + Session session, + DriverYieldSignal yieldSignal, + CursorProcessor cursorProcessor, + List types, + LocalMemoryContext pageSourceMemoryContext, + LocalMemoryContext outputMemoryContext) { + this.session = session; + this.yieldSignal = yieldSignal; this.cursorProcessor = cursorProcessor; this.pageBuilder = new PageBuilder(types); this.pageSourceMemoryContext = pageSourceMemoryContext; @@ -341,16 +293,13 @@ private class RecordCursorToPages public ProcessState process() { if (!finished) { - DriverYieldSignal yieldSignal = operatorContext.getDriverContext().getYieldSignal(); - CursorProcessorOutput output = cursorProcessor.process(operatorContext.getSession().toConnectorSession(), yieldSignal, cursor, pageBuilder); + CursorProcessorOutput output = cursorProcessor.process(session.toConnectorSession(), yieldSignal, cursor, pageBuilder); pageSourceMemoryContext.setBytes(cursor.getSystemMemoryUsage()); - long bytesProcessed = cursor.getCompletedBytes() - completedBytes; - long elapsedNanos = cursor.getReadTimeNanos() - readTimeNanos; - operatorContext.recordPhysicalInputWithTiming(bytesProcessed, output.getProcessedRows(), elapsedNanos); + processedPositions += output.getProcessedRows(); // TODO: derive better values for cursors - operatorContext.recordProcessedInput(bytesProcessed, output.getProcessedRows()); - completedBytes = cursor.getCompletedBytes(); + processedBytes = cursor.getCompletedBytes(); + physicalBytes = cursor.getCompletedBytes(); readTimeNanos = cursor.getReadTimeNanos(); if (output.isNoMoreRows()) { finished = true; @@ -380,9 +329,6 @@ private class ConnectorPageSourceToPages { final LocalMemoryContext pageSourceMemoryContext; - long completedBytes; - long readTimeNanos; - ConnectorPageSourceToPages(LocalMemoryContext pageSourceMemoryContext) { this.pageSourceMemoryContext = pageSourceMemoryContext; @@ -412,23 +358,19 @@ public ProcessState process() } } - page = recordMaterializedBytes(page, sizeInBytes -> deltaProcessedBytes += sizeInBytes); + page = recordMaterializedBytes(page, sizeInBytes -> processedBytes += sizeInBytes); // update operator stats - long endCompletedBytes = pageSource.getCompletedBytes(); - long endReadTimeNanos = pageSource.getReadTimeNanos(); - deltaPositionsCount += page.getPositionCount(); - deltaPhysicalBytes += endCompletedBytes - completedBytes; - deltaPhysicalReadTimeNanos += endReadTimeNanos - readTimeNanos; - completedBytes = endCompletedBytes; - readTimeNanos = endReadTimeNanos; + processedPositions += page.getPositionCount(); + physicalBytes = pageSource.getCompletedBytes(); + readTimeNanos = pageSource.getReadTimeNanos(); return ProcessState.ofResult(page); } } public static class ScanFilterAndProjectOperatorFactory - implements SourceOperatorFactory + implements SourceOperatorFactory, WorkProcessorSourceOperatorFactory { private final int operatorId; private final PlanNodeId planNodeId; @@ -469,20 +411,44 @@ public ScanFilterAndProjectOperatorFactory( this.minOutputPageRowCount = minOutputPageRowCount; } + @Override + public int getOperatorId() + { + return operatorId; + } + @Override public PlanNodeId getSourceId() { return sourceId; } + @Override + public String getOperatorType() + { + return ScanFilterAndProjectOperator.class.getSimpleName(); + } + @Override public SourceOperator createOperator(DriverContext driverContext) { checkState(!closed, "Factory is already closed"); - OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, ScanFilterAndProjectOperator.class.getSimpleName()); + OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, getOperatorType()); + return new WorkProcessorSourceOperatorAdapter(operatorContext, this); + } + + @Override + public WorkProcessorSourceOperator create( + Session session, + MemoryTrackingContext memoryTrackingContext, + DriverYieldSignal yieldSignal, + WorkProcessor splits) + { return new ScanFilterAndProjectOperator( - operatorContext, - sourceId, + session, + memoryTrackingContext, + yieldSignal, + splits, pageSourceProvider, cursorProcessor.get(), pageProcessor.get(), diff --git a/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperatorAdapter.java b/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperatorAdapter.java new file mode 100644 index 0000000000000..4b888e3b1abfc --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/operator/WorkProcessorSourceOperatorAdapter.java @@ -0,0 +1,245 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.operator; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.prestosql.memory.context.MemoryTrackingContext; +import io.prestosql.metadata.Split; +import io.prestosql.spi.Page; +import io.prestosql.spi.connector.UpdatablePageSource; +import io.prestosql.sql.planner.plan.PlanNodeId; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Supplier; + +import static io.prestosql.operator.WorkProcessor.ProcessState.blocked; +import static io.prestosql.operator.WorkProcessor.ProcessState.finished; +import static io.prestosql.operator.WorkProcessor.ProcessState.ofResult; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +public class WorkProcessorSourceOperatorAdapter + implements SourceOperator +{ + private final OperatorContext operatorContext; + private final PlanNodeId sourceId; + private final WorkProcessorSourceOperator sourceOperator; + private final WorkProcessor pages; + private final SplitBuffer splitBuffer; + + private boolean operatorFinishing; + + private long previousPhysicalInputBytes; + private long previousPhysicalInputPositions; + private long previousInternalNetworkInputBytes; + private long previousInternalNetworkPositions; + private long previousInputBytes; + private long previousInputPositions; + private long previousReadTimeNanos; + + public WorkProcessorSourceOperatorAdapter(OperatorContext operatorContext, WorkProcessorSourceOperatorFactory sourceOperatorFactory) + { + this.operatorContext = requireNonNull(operatorContext, "operatorContext is null"); + this.sourceId = requireNonNull(sourceOperatorFactory, "sourceOperatorFactory is null").getSourceId(); + this.splitBuffer = new SplitBuffer(); + this.sourceOperator = sourceOperatorFactory + .create( + operatorContext.getSession(), + new MemoryTrackingContext( + operatorContext.aggregateUserMemoryContext(), + operatorContext.aggregateRevocableMemoryContext(), + operatorContext.aggregateSystemMemoryContext()), + operatorContext.getDriverContext().getYieldSignal(), + WorkProcessor.create(splitBuffer)); + this.pages = sourceOperator.getOutputPages() + .map(Page::getLoadedPage) + .withProcessStateMonitor(state -> updateOperatorStats()) + .finishWhen(() -> operatorFinishing); + } + + @Override + public PlanNodeId getSourceId() + { + return sourceId; + } + + @Override + public Supplier> addSplit(Split split) + { + if (operatorFinishing) { + return Optional::empty; + } + + Object splitInfo = split.getInfo(); + if (splitInfo != null) { + operatorContext.setInfoSupplier(() -> new SplitOperatorInfo(splitInfo)); + } + + splitBuffer.add(split); + return sourceOperator.getUpdatablePageSourceSupplier(); + } + + @Override + public void noMoreSplits() + { + splitBuffer.noMoreSplits(); + } + + @Override + public OperatorContext getOperatorContext() + { + return operatorContext; + } + + @Override + public ListenableFuture isBlocked() + { + if (!pages.isBlocked()) { + return NOT_BLOCKED; + } + + return pages.getBlockedFuture(); + } + + @Override + public boolean needsInput() + { + return false; + } + + @Override + public void addInput(Page page) + { + throw new UnsupportedOperationException(); + } + + @Override + public Page getOutput() + { + if (!pages.process()) { + return null; + } + + if (pages.isFinished()) { + return null; + } + + return pages.getResult(); + } + + @Override + public void finish() + { + operatorFinishing = true; + noMoreSplits(); + } + + @Override + public boolean isFinished() + { + return pages.isFinished(); + } + + @Override + public void close() + throws Exception + { + sourceOperator.close(); + } + + private void updateOperatorStats() + { + long currentPhysicalInputBytes = sourceOperator.getPhysicalInputDataSize().toBytes(); + long currentPhysicalInputPositions = sourceOperator.getPhysicalInputPositions(); + long currentReadTimeNanos = sourceOperator.getReadTime().roundTo(NANOSECONDS); + + long currentInternalNetworkInputBytes = sourceOperator.getInternalNetworkInputDataSize().toBytes(); + long currentInternalNetworkPositions = sourceOperator.getInternalNetworkPositions(); + + long currentInputBytes = sourceOperator.getInputDataSize().toBytes(); + long currentInputPositions = sourceOperator.getInputPositions(); + + if (currentPhysicalInputBytes != previousPhysicalInputBytes + || currentPhysicalInputPositions != previousPhysicalInputPositions + || currentReadTimeNanos != previousReadTimeNanos) { + operatorContext.recordPhysicalInputWithTiming( + currentPhysicalInputBytes - previousPhysicalInputBytes, + currentPhysicalInputPositions - previousPhysicalInputPositions, + currentReadTimeNanos - previousReadTimeNanos); + + previousPhysicalInputBytes = currentPhysicalInputBytes; + previousPhysicalInputPositions = currentPhysicalInputPositions; + previousReadTimeNanos = currentReadTimeNanos; + } + + if (currentInternalNetworkInputBytes != previousInternalNetworkInputBytes + || currentInternalNetworkPositions != previousInternalNetworkPositions) { + operatorContext.recordNetworkInput( + currentInternalNetworkInputBytes - previousInternalNetworkInputBytes, + currentInternalNetworkPositions - previousInternalNetworkPositions); + + previousInternalNetworkInputBytes = currentInternalNetworkInputBytes; + previousInternalNetworkPositions = currentInternalNetworkPositions; + } + + if (currentInputBytes != previousInputBytes + || currentInputPositions != previousInputPositions) { + operatorContext.recordProcessedInput( + currentInputBytes - previousInputBytes, + currentInputPositions - previousInputPositions); + + previousInputBytes = currentInputBytes; + previousInputPositions = currentInputPositions; + } + } + + private class SplitBuffer + implements WorkProcessor.Process + { + private final List pendingSplits = new ArrayList<>(); + + private SettableFuture blockedOnSplits = SettableFuture.create(); + private boolean noMoreSplits; + + @Override + public WorkProcessor.ProcessState process() + { + if (pendingSplits.isEmpty()) { + if (noMoreSplits) { + return finished(); + } + + blockedOnSplits = SettableFuture.create(); + return blocked(blockedOnSplits); + } + + return ofResult(pendingSplits.remove(0)); + } + + void add(Split split) + { + pendingSplits.add(split); + blockedOnSplits.set(null); + } + + void noMoreSplits() + { + noMoreSplits = true; + blockedOnSplits.set(null); + } + } +} diff --git a/presto-main/src/test/java/io/prestosql/operator/BenchmarkScanFilterAndProjectOperator.java b/presto-main/src/test/java/io/prestosql/operator/BenchmarkScanFilterAndProjectOperator.java index 8b01363565895..0db80b54acae2 100644 --- a/presto-main/src/test/java/io/prestosql/operator/BenchmarkScanFilterAndProjectOperator.java +++ b/presto-main/src/test/java/io/prestosql/operator/BenchmarkScanFilterAndProjectOperator.java @@ -261,6 +261,7 @@ public List benchmarkColumnOriented(Context context) ImmutableList.Builder outputPages = ImmutableList.builder(); operator.addSplit(new Split(new CatalogName("test"), createLocalSplit(), Lifespan.taskWide())); + operator.noMoreSplits(); for (int loops = 0; !operator.isFinished() && loops < 1_000_000; loops++) { Page outputPage = operator.getOutput();