Skip to content

Commit

Permalink
Separate TransformationState and ProcessState
Browse files Browse the repository at this point in the history
ProcessorState was shared between Transformations and Processes. As a
result, certain states could be returned but didn't make sense for a
Process (those requesting more input data).

This commit separates the state classes so that no unsuitable value can
be returned.
  • Loading branch information
findepi committed Oct 31, 2018
1 parent 9658f59 commit b0760b6
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 211 deletions.
Expand Up @@ -16,7 +16,7 @@
import com.facebook.presto.execution.buffer.SerializedPage;
import com.facebook.presto.memory.context.LocalMemoryContext;
import com.facebook.presto.operator.HttpPageBufferClient.ClientCallback;
import com.facebook.presto.operator.WorkProcessor.ProcessorState;
import com.facebook.presto.operator.WorkProcessor.ProcessState;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -45,9 +45,6 @@
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.execution.buffer.PageCompression.UNCOMPRESSED;
import static com.facebook.presto.operator.WorkProcessor.ProcessorState.blocked;
import static com.facebook.presto.operator.WorkProcessor.ProcessorState.finished;
import static com.facebook.presto.operator.WorkProcessor.ProcessorState.yield;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static com.google.common.collect.Sets.newConcurrentHashSet;
Expand Down Expand Up @@ -185,18 +182,18 @@ public WorkProcessor<SerializedPage> pages()
SerializedPage page = pollPage();
if (page == null) {
if (isFinished()) {
return finished();
return ProcessState.finished();
}

ListenableFuture<?> blocked = isBlocked();
if (!blocked.isDone()) {
return blocked(blocked);
return ProcessState.blocked(blocked);
}

return yield();
return ProcessState.yield();
}

return ProcessorState.ofResult(page);
return ProcessState.ofResult(page);
});
}

Expand Down
Expand Up @@ -23,9 +23,6 @@
import java.util.function.BooleanSupplier;
import java.util.function.Function;

import static com.facebook.presto.operator.WorkProcessor.ProcessorState.Type.FINISHED;
import static com.facebook.presto.operator.WorkProcessor.ProcessorState.Type.NEEDS_MORE_DATA;
import static com.facebook.presto.operator.WorkProcessor.ProcessorState.Type.YIELD;
import static java.util.Objects.requireNonNull;

public interface WorkProcessor<T>
Expand Down Expand Up @@ -153,40 +150,37 @@ interface Transformation<T, R>
* @param elementOptional an element to be transformed. Will be empty
* when there are no more elements. In such case transformation should
* finish processing and flush any remaining data.
* @return a current transformation state.
* <ul>
* <li>if state is {@link ProcessorState#finished()} then transformation has finished
* and the process method won't be called again;</li>
* <li>if state is {@link ProcessorState#needsMoreData()} then transformation requires
* more data in order to continue. The process method will be called with
* a new input element or with {@link Optional#empty()} if there are no more elements;</li>
* <li>if state is {@link ProcessorState#yield()} then transformation has yielded.
* The process method will be called again with the same input element;</li>
* <li>if state is {@link ProcessorState#blocked(ListenableFuture)} then transformation is blocked.
* The process method will be called again with the same input element after blocked
* future is done;</li>
* <li>if state is {@link ProcessorState#ofResult(Object, boolean)} then the transformation
* has produced a result. If <code>needsMoreData</code> {@link ProcessorState#ofResult(Object, boolean)}
* parameter is <code>true</code> then the process method will be called again with a new element
* (or with {@link Optional#empty()} if there are no more elements).
* If <code>needsMoreData</code> parameter is <code>false</code> then the process method
* will be called again with the same input element.
* </ul>
* @return the current transformation state, optionally bearing a result
* @see TransformationState#needsMoreData()
* @see TransformationState#blocked(ListenableFuture)
* @see TransformationState#yield()
* @see TransformationState#ofResult(Object)
* @see TransformationState#ofResult(Object, boolean)
* @see TransformationState#finished()
*/
ProcessorState<R> process(Optional<T> elementOptional);
TransformationState<R> process(Optional<T> elementOptional);
}

interface Process<T>
{
ProcessorState<T> process();
/**
* Does some work and returns current state.
*
* @return the current state, optionally bearing a result
* @see ProcessState#blocked(ListenableFuture)
* @see ProcessState#yield()
* @see ProcessState#ofResult(Object)
* @see ProcessState#finished()
*/
ProcessState<T> process();
}

@Immutable
final class ProcessorState<T>
final class TransformationState<T>
{
private static final ProcessorState<?> NEEDS_MORE_DATE_STATE = new ProcessorState<>(NEEDS_MORE_DATA, true, Optional.empty(), Optional.empty());
private static final ProcessorState<?> YIELD_STATE = new ProcessorState<>(YIELD, false, Optional.empty(), Optional.empty());
private static final ProcessorState<?> FINISHED_STATE = new ProcessorState<>(FINISHED, false, Optional.empty(), Optional.empty());
private static final TransformationState<?> NEEDS_MORE_DATE_STATE = new TransformationState<>(Type.NEEDS_MORE_DATA, true, Optional.empty(), Optional.empty());
private static final TransformationState<?> YIELD_STATE = new TransformationState<>(Type.YIELD, false, Optional.empty(), Optional.empty());
private static final TransformationState<?> FINISHED_STATE = new TransformationState<>(Type.FINISHED, false, Optional.empty(), Optional.empty());

enum Type
{
Expand All @@ -197,50 +191,74 @@ enum Type
FINISHED
}

private final ProcessorState.Type type;
private final Type type;
private final boolean needsMoreData;
private final Optional<T> result;
private final Optional<ListenableFuture<?>> blocked;

private ProcessorState(Type type, boolean needsMoreData, Optional<T> result, Optional<ListenableFuture<?>> blocked)
private TransformationState(Type type, boolean needsMoreData, Optional<T> result, Optional<ListenableFuture<?>> blocked)
{
this.type = requireNonNull(type, "type is null");
this.needsMoreData = needsMoreData;
this.result = requireNonNull(result, "result is null");
this.blocked = requireNonNull(blocked, "blocked is null");
}

/**
* Signals that transformation requires more data in order to continue and no result has been produced.
* {@link #process()} will be called with a new input element or with {@link Optional#empty()} if there
* are no more elements.
*/
@SuppressWarnings("unchecked")
public static <T> ProcessorState<T> needsMoreData()
public static <T> TransformationState<T> needsMoreData()
{
return (ProcessorState<T>) NEEDS_MORE_DATE_STATE;
return (TransformationState<T>) NEEDS_MORE_DATE_STATE;
}

public static <T> ProcessorState<T> blocked(ListenableFuture<?> blocked)
/**
* Signals that transformation is blocked. {@link #process()} will be called again with the same input
* element after {@code blocked} future is done.
*/
public static <T> TransformationState<T> blocked(ListenableFuture<?> blocked)
{
return new ProcessorState<>(Type.BLOCKED, false, Optional.empty(), Optional.of(blocked));
return new TransformationState<>(Type.BLOCKED, false, Optional.empty(), Optional.of(blocked));
}

/**
* Signals that transformation has yielded. {@link #process()} will be called again with the same input element.
*/
@SuppressWarnings("unchecked")
public static <T> ProcessorState<T> yield()
public static <T> TransformationState<T> yield()
{
return (ProcessorState<T>) YIELD_STATE;
return (TransformationState<T>) YIELD_STATE;
}

public static <T> ProcessorState<T> ofResult(T result)
/**
* Signals that transformation has produced a result from its input. {@link #process()} will be called again with
* a new element or with {@link Optional#empty()} if there are no more elements.
*/
public static <T> TransformationState<T> ofResult(T result)
{
return ofResult(result, true);
}

public static <T> ProcessorState<T> ofResult(T result, boolean needsMoreData)
/**
* Signals that transformation has produced a result. If {@code needsMoreData}, {@link #process()} will be called again
* with a new element (or with {@link Optional#empty()} if there are no more elements). If not @{code needsMoreData},
* {@link #process()} will be called again with the same element.
*/
public static <T> TransformationState<T> ofResult(T result, boolean needsMoreData)
{
return new ProcessorState<>(Type.RESULT, needsMoreData, Optional.of(result), Optional.empty());
return new TransformationState<>(Type.RESULT, needsMoreData, Optional.of(result), Optional.empty());
}

/**
* Signals that transformation has finished. {@link #process()} method will not be called again.
*/
@SuppressWarnings("unchecked")
public static <T> ProcessorState<T> finished()
public static <T> TransformationState<T> finished()
{
return (ProcessorState<T>) FINISHED_STATE;
return (TransformationState<T>) FINISHED_STATE;
}

Type getType()
Expand All @@ -263,4 +281,79 @@ Optional<ListenableFuture<?>> getBlocked()
return blocked;
}
}

@Immutable
final class ProcessState<T>
{
private static final ProcessState<?> YIELD_STATE = new ProcessState<>(Type.YIELD, Optional.empty(), Optional.empty());
private static final ProcessState<?> FINISHED_STATE = new ProcessState<>(Type.FINISHED, Optional.empty(), Optional.empty());

enum Type
{
BLOCKED,
YIELD,
RESULT,
FINISHED
}

private final Type type;
private final Optional<T> result;
private final Optional<ListenableFuture<?>> blocked;

private ProcessState(Type type, Optional<T> result, Optional<ListenableFuture<?>> blocked)
{
this.type = requireNonNull(type, "type is null");
this.result = requireNonNull(result, "result is null");
this.blocked = requireNonNull(blocked, "blocked is null");
}

/**
* Signals that process is blocked. {@link #process()} will be called again after {@code blocked} future is done.
*/
public static <T> ProcessState<T> blocked(ListenableFuture<?> blocked)
{
return new ProcessState<>(Type.BLOCKED, Optional.empty(), Optional.of(blocked));
}

/**
* Signals that process has yielded. {@link #process()} will be called again later.
*/
@SuppressWarnings("unchecked")
public static <T> ProcessState<T> yield()
{
return (ProcessState<T>) YIELD_STATE;
}

/**
* Signals that process has produced a result. {@link #process()} will be called again.
*/
public static <T> ProcessState<T> ofResult(T result)
{
return new ProcessState<>(Type.RESULT, Optional.of(result), Optional.empty());
}

/**
* Signals that process has finished. {@link #process()} method will not be called again.
*/
@SuppressWarnings("unchecked")
public static <T> ProcessState<T> finished()
{
return (ProcessState<T>) FINISHED_STATE;
}

Type getType()
{
return type;
}

Optional<T> getResult()
{
return result;
}

Optional<ListenableFuture<?>> getBlocked()
{
return blocked;
}
}
}

0 comments on commit b0760b6

Please sign in to comment.