Skip to content

Commit

Permalink
Introduce yield signals to operators
Browse files Browse the repository at this point in the history
Splits can run for hours even after they have been killed. This is due
to expensive functions (e.g., regexp) or cross joins. Generally, an
operator does not stop until it hits the page limit. The stop
condition is not subject to time constraints because time checking is
done at the driver level. This patch adds a yield signal that is set when
the time is up. Operators actively check the yield signal and
can stop even when the page is not full.
The patch adds yield signals to lookup join operators and cursor
processors.
  • Loading branch information
highker committed Aug 20, 2017
1 parent 2dc00ed commit e13a99d
Show file tree
Hide file tree
Showing 13 changed files with 431 additions and 87 deletions.
Expand Up @@ -234,6 +234,7 @@ public ListenableFuture<?> processFor(Duration duration)

Optional<ListenableFuture<?>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
driverContext.startProcessTimer();
driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
try {
long start = System.nanoTime();
do {
Expand All @@ -245,6 +246,7 @@ public ListenableFuture<?> processFor(Duration duration)
while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
}
finally {
driverContext.getYieldSignal().reset();
driverContext.recordProcessed();
}
return NOT_BLOCKED;
Expand Down
Expand Up @@ -85,6 +85,8 @@ public class DriverContext
private final AtomicLong systemMemoryReservation = new AtomicLong();
private final AtomicLong revocableMemoryReservation = new AtomicLong();

private final DriverYieldSignal yieldSignal;

private final List<OperatorContext> operatorContexts = new CopyOnWriteArrayList<>();
private final boolean partitioned;

Expand All @@ -94,6 +96,7 @@ public DriverContext(PipelineContext pipelineContext, Executor notificationExecu
this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
this.yieldExecutor = requireNonNull(yieldExecutor, "scheduler is null");
this.partitioned = partitioned;
this.yieldSignal = new DriverYieldSignal();
}

public TaskId getTaskId()
Expand Down Expand Up @@ -272,6 +275,11 @@ public void freeSpill(long bytes)
pipelineContext.freeSpill(bytes);
}

/**
 * Returns the yield signal that was created for this driver context in its
 * constructor. The same instance is returned on every call.
 */
public DriverYieldSignal getYieldSignal()
{
    return this.yieldSignal;
}

public long getSystemMemoryUsage()
{
return systemMemoryReservation.get();
Expand Down Expand Up @@ -457,6 +465,11 @@ public boolean isPartitioned()
return partitioned;
}

/**
 * Returns the scheduled executor that was passed to the constructor as
 * {@code yieldExecutor}.
 */
public ScheduledExecutorService getYieldExecutor()
{
    return this.yieldExecutor;
}

private long currentThreadUserTime()
{
if (!isCpuTimerEnabled()) {
Expand Down
@@ -0,0 +1,95 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator;

import com.google.common.annotations.VisibleForTesting;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
 * A yield flag that is raised by a timer and polled by operators.
 * <p>
 * {@link #setWithDelay} schedules a task that raises the flag once the given
 * delay has elapsed; {@link #reset} clears the flag and cancels that task.
 * Code that wants to cooperate checks {@link #isSet} and stops early when it
 * returns {@code true}.
 * <p>
 * Methods setWithDelay and reset should be used in pairs;
 * usually follow the following idiom:
 * <pre> {@code
 * DriverYieldSignal signal = ...;
 * signal.setWithDelay(duration, executor);
 * try {
 *     // block
 * } finally {
 *     signal.reset();
 * }
 * }</pre>
 */
@ThreadSafe
public class DriverYieldSignal
{
    // Incremented on every setWithDelay call so that a timer task left over from
    // an earlier setWithDelay/reset pair can tell that it is stale.
    @GuardedBy("this")
    private long runningSequence;

    // Non-null only between setWithDelay and the matching reset.
    @GuardedBy("this")
    private ScheduledFuture<?> yieldFuture;

    // Kept in an atomic so isSet() can be polled without taking the monitor.
    private final AtomicBoolean yield = new AtomicBoolean();

    /**
     * Schedules the yield flag to be raised after {@code maxRunNanos} nanoseconds.
     *
     * @param maxRunNanos delay, in nanoseconds, before the flag is raised
     * @param executor executor on which the flag-raising task is scheduled
     * @throws IllegalStateException if a previous call has not been paired with
     * {@link #reset}, or if the flag is already set
     */
    public synchronized void setWithDelay(long maxRunNanos, ScheduledExecutorService executor)
    {
        checkState(yieldFuture == null, "there is an ongoing yield");
        checkState(!isSet(), "yield while driver was not running");

        this.runningSequence++;
        long expectedRunningSequence = this.runningSequence;
        yieldFuture = executor.schedule(() -> {
            synchronized (this) {
                // Raise the flag only if this task still belongs to the current
                // setWithDelay/reset pair: reset() nulls yieldFuture, and a later
                // setWithDelay() bumps runningSequence.
                if (expectedRunningSequence == runningSequence && yieldFuture != null) {
                    yield.set(true);
                }
            }
        }, maxRunNanos, NANOSECONDS);
    }

    /**
     * Clears the yield flag and cancels the task scheduled by {@link #setWithDelay}.
     *
     * @throws IllegalStateException if there is no scheduled yield to reset
     */
    public synchronized void reset()
    {
        checkState(yieldFuture != null, "there is no ongoing yield");
        yield.set(false);
        yieldFuture.cancel(true);
        yieldFuture = null;
    }

    /**
     * Returns whether the yield flag is currently raised. Does not take the monitor.
     */
    public boolean isSet()
    {
        return yield.get();
    }

    @Override
    public synchronized String toString()
    {
        return toStringHelper(this)
                .add("yieldScheduled", yieldFuture != null)
                .add("yield", yield.get())
                .toString();
    }

    /**
     * Raises the yield flag immediately, without scheduling a task. Test-only.
     */
    @VisibleForTesting
    public synchronized void forceYieldForTesting()
    {
        yield.set(true);
    }
}
Expand Up @@ -32,8 +32,6 @@
public class LookupJoinOperator
implements Operator, Closeable
{
private static final int MAX_POSITIONS_EVALUATED_PER_CALL = 10000;

private final OperatorContext operatorContext;
private final List<Type> types;
private final ListenableFuture<? extends LookupSource> lookupSourceFuture;
Expand Down Expand Up @@ -153,11 +151,11 @@ public Page getOutput()
}

// join probe page with the lookup source
Counter lookupPositionsConsidered = new Counter();
DriverYieldSignal yieldSignal = operatorContext.getDriverContext().getYieldSignal();
if (probe != null) {
while (true) {
while (!yieldSignal.isSet()) {
if (probe.getPosition() >= 0) {
if (!joinCurrentPosition(lookupPositionsConsidered)) {
if (!joinCurrentPosition(yieldSignal)) {
break;
}
if (!currentProbePositionProducedRow) {
Expand Down Expand Up @@ -210,11 +208,10 @@ public void close()
*
* @return true if all eligible rows have been produced; false otherwise (because pageBuilder became full)
*/
private boolean joinCurrentPosition(Counter lookupPositionsConsidered)
private boolean joinCurrentPosition(DriverYieldSignal yieldSignal)
{
// while we have a position on lookup side to join against...
while (joinPosition >= 0) {
lookupPositionsConsidered.increment();
if (lookupSource.isJoinPositionEligible(joinPosition, probe.getPosition(), probe.getPage())) {
currentProbePositionProducedRow = true;

Expand All @@ -229,10 +226,7 @@ private boolean joinCurrentPosition(Counter lookupPositionsConsidered)
// get next position on lookup side for this probe row
joinPosition = lookupSource.getNextJoinPosition(joinPosition, probe.getPosition(), probe.getPage());

if (lookupPositionsConsidered.get() >= MAX_POSITIONS_EVALUATED_PER_CALL) {
return false;
}
if (pageBuilder.isFull()) {
if (yieldSignal.isSet() || pageBuilder.isFull()) {
return false;
}
}
Expand Down Expand Up @@ -278,20 +272,4 @@ private boolean outerJoinCurrentPosition()
}
return true;
}

// This class needs to be public because LookupJoinOperator is isolated.
public static class Counter
{
private int count;

public void increment()
{
count++;
}

public int get()
{
return count;
}
}
}
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.memory.LocalMemoryContext;
import com.facebook.presto.metadata.Split;
import com.facebook.presto.operator.project.CursorProcessor;
import com.facebook.presto.operator.project.CursorProcessorOutput;
import com.facebook.presto.operator.project.PageProcessor;
import com.facebook.presto.operator.project.PageProcessorOutput;
import com.facebook.presto.spi.ColumnHandle;
Expand Down Expand Up @@ -50,8 +51,6 @@
public class ScanFilterAndProjectOperator
implements SourceOperator, Closeable
{
private static final int ROWS_PER_PAGE = 16384;

private final OperatorContext operatorContext;
private final PlanNodeId planNodeId;
private final PageSourceProvider pageSourceProvider;
Expand Down Expand Up @@ -236,20 +235,17 @@ public Page getOutput()

private Page processColumnSource()
{
if (!finishing) {
int rowsProcessed = cursorProcessor.process(operatorContext.getSession().toConnectorSession(), cursor, ROWS_PER_PAGE, pageBuilder);

DriverYieldSignal yieldSignal = operatorContext.getDriverContext().getYieldSignal();
if (!finishing && !yieldSignal.isSet()) {
CursorProcessorOutput output = cursorProcessor.process(operatorContext.getSession().toConnectorSession(), yieldSignal, cursor, pageBuilder);
pageSourceMemoryContext.setBytes(cursor.getSystemMemoryUsage());

long bytesProcessed = cursor.getCompletedBytes() - completedBytes;
long elapsedNanos = cursor.getReadTimeNanos() - readTimeNanos;
operatorContext.recordGeneratedInput(bytesProcessed, rowsProcessed, elapsedNanos);
operatorContext.recordGeneratedInput(bytesProcessed, output.getProcessedRows(), elapsedNanos);
completedBytes = cursor.getCompletedBytes();
readTimeNanos = cursor.getReadTimeNanos();

if (rowsProcessed == 0) {
finishing = true;
}
finishing = output.isNoMoreRows();
}

// only return a page if buffer is full or we are finishing
Expand Down
Expand Up @@ -13,14 +13,12 @@
*/
package com.facebook.presto.operator.project;

import com.facebook.presto.operator.DriverYieldSignal;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.RecordCursor;

public interface CursorProcessor
{
/**
* @return 0 if processing is complete
*/
int process(ConnectorSession session, RecordCursor cursor, int count, PageBuilder pageBuilder);
CursorProcessorOutput process(ConnectorSession session, DriverYieldSignal yieldSignal, RecordCursor cursor, PageBuilder pageBuilder);
}
@@ -0,0 +1,49 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.operator.project;

import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;

/**
 * Immutable result of a single cursor-processing call: how many rows were
 * processed, and whether the caller should expect no further rows.
 */
public class CursorProcessorOutput
{
    private final int rowCount;
    private final boolean noMoreRows;

    /**
     * @param processedRows number of rows processed by the call; must not be negative
     * @param finished {@code true} if there are no more rows to process
     * @throws IllegalArgumentException if {@code processedRows} is negative
     */
    public CursorProcessorOutput(int processedRows, boolean finished)
    {
        checkArgument(processedRows >= 0, "processedRows should be no smaller than 0");
        this.rowCount = processedRows;
        this.noMoreRows = finished;
    }

    /**
     * Returns the number of rows processed, as given to the constructor.
     */
    public int getProcessedRows()
    {
        return this.rowCount;
    }

    /**
     * Returns the {@code finished} flag given to the constructor.
     */
    public boolean isNoMoreRows()
    {
        return this.noMoreRows;
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("processedRows", this.rowCount)
                .add("finished", this.noMoreRows)
                .toString();
    }
}
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.operator.DriverYieldSignal;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PageBuilder;
import com.facebook.presto.spi.RecordCursor;
Expand All @@ -40,6 +41,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.Boolean.TRUE;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;

public class InterpretedCursorProcessor
implements CursorProcessor
Expand Down Expand Up @@ -91,19 +93,19 @@ private static ExpressionInterpreter getExpressionInterpreter(
}

@Override
public int process(ConnectorSession session, RecordCursor cursor, int count, PageBuilder pageBuilder)
public CursorProcessorOutput process(ConnectorSession session, DriverYieldSignal yieldSignal, RecordCursor cursor, PageBuilder pageBuilder)
{
checkArgument(!pageBuilder.isFull(), "page builder can't be full");
checkArgument(count > 0, "count must be > 0");
requireNonNull(yieldSignal, "yieldSignal is null");

int position = 0;
for (; position < count; position++) {
if (pageBuilder.isFull()) {
break;
while (true) {
if (pageBuilder.isFull() || yieldSignal.isSet()) {
return new CursorProcessorOutput(position, false);
}

if (!cursor.advanceNextPosition()) {
break;
return new CursorProcessorOutput(position, true);
}

if (filter(cursor)) {
Expand All @@ -112,8 +114,8 @@ public int process(ConnectorSession session, RecordCursor cursor, int count, Pag
project(cursor, channel, pageBuilder);
}
}
position++;
}
return position;
}

private boolean filter(RecordCursor cursor)
Expand Down

0 comments on commit e13a99d

Please sign in to comment.