Skip to content

Commit

Permalink
Convert StatementResource to use async HTTP responses
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Oct 7, 2017
1 parent d07c78e commit b305f35
Show file tree
Hide file tree
Showing 21 changed files with 508 additions and 319 deletions.
Expand Up @@ -113,6 +113,7 @@ public QueryResults(
this.nextUri = nextUri;
this.columns = (columns != null) ? ImmutableList.copyOf(columns) : null;
this.data = (data != null) ? unmodifiableIterable(data) : null;
checkArgument(data == null || columns != null, "data present without columns");
this.stats = requireNonNull(stats, "stats is null");
this.error = error;
this.updateType = updateType;
Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -140,10 +141,15 @@ public void onFailure(Throwable throwable)
}

@Override
public Duration waitForStateChange(QueryState currentState, Duration maxWait)
throws InterruptedException
public ListenableFuture<QueryOutputInfo> getOutputInfo()
{
return stateMachine.waitForStateChange(currentState, maxWait);
return SettableFuture.create();
}

@Override
public ListenableFuture<QueryState> getStateChange(QueryState currentState)
{
return stateMachine.getStateChange(currentState);
}

@Override
Expand Down
Expand Up @@ -22,6 +22,8 @@
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;

import java.net.URI;
Expand All @@ -30,6 +32,7 @@
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.memory.LocalMemoryManager.GENERAL_POOL;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.util.Objects.requireNonNull;

public class FailedQueryExecution
Expand Down Expand Up @@ -111,10 +114,15 @@ public void start()
}

@Override
public Duration waitForStateChange(QueryState currentState, Duration maxWait)
throws InterruptedException
public ListenableFuture<QueryOutputInfo> getOutputInfo()
{
return maxWait;
return SettableFuture.create();
}

@Override
public ListenableFuture<QueryState> getStateChange(QueryState currentState)
{
return immediateFuture(queryInfo.getState());
}

@Override
Expand Down
Expand Up @@ -13,20 +13,29 @@
*/
package com.facebook.presto.execution;

import com.facebook.presto.OutputBuffers.OutputBufferId;
import com.facebook.presto.Session;
import com.facebook.presto.execution.StateMachine.StateChangeListener;
import com.facebook.presto.memory.VersionedMemoryPoolId;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.QueryType;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.Statement;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;

import java.net.URI;
import java.util.List;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public interface QueryExecution
{
QueryId getQueryId();
Expand All @@ -35,15 +44,16 @@ public interface QueryExecution

QueryState getState();

ListenableFuture<QueryState> getStateChange(QueryState currentState);

ListenableFuture<QueryOutputInfo> getOutputInfo();

Optional<ResourceGroupId> getResourceGroup();

void setResourceGroup(ResourceGroupId resourceGroupId);

Plan getQueryPlan();

Duration waitForStateChange(QueryState currentState, Duration maxWait)
throws InterruptedException;

VersionedMemoryPoolId getMemoryPool();

void setMemoryPool(VersionedMemoryPoolId poolId);
Expand Down Expand Up @@ -77,4 +87,33 @@ interface QueryExecutionFactory<T extends QueryExecution>
}

Optional<QueryType> getQueryType();

class QueryOutputInfo
{
private final List<String> columnNames;
private final List<Type> columnTypes;
private final SetMultimap<OutputBufferId, URI> bufferLocations;

public QueryOutputInfo(List<String> columnNames, List<Type> columnTypes, SetMultimap<OutputBufferId, URI> bufferLocations)
{
this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null"));
this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
this.bufferLocations = ImmutableSetMultimap.copyOf(requireNonNull(bufferLocations, "bufferLocations is null"));
}

public List<String> getColumnNames()
{
return columnNames;
}

public List<Type> getColumnTypes()
{
return columnTypes;
}

public SetMultimap<OutputBufferId, URI> getBufferLocations()
{
return bufferLocations;
}
}
}
Expand Up @@ -13,11 +13,12 @@
*/
package com.facebook.presto.execution;

import com.facebook.presto.execution.QueryExecution.QueryOutputInfo;
import com.facebook.presto.server.SessionContext;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.resourceGroups.ResourceGroupId;
import com.facebook.presto.sql.planner.Plan;
import io.airlift.units.Duration;
import com.google.common.util.concurrent.ListenableFuture;

import java.util.List;
import java.util.Optional;
Expand All @@ -26,8 +27,9 @@ public interface QueryManager
{
List<QueryInfo> getAllQueryInfo();

Duration waitForStateChange(QueryId queryId, QueryState currentState, Duration maxWait)
throws InterruptedException;
ListenableFuture<QueryOutputInfo> getOutputInfo(QueryId queryId);

ListenableFuture<QueryState> getStateChange(QueryId queryId, QueryState currentState);

QueryInfo getQueryInfo(QueryId queryId);

Expand Down
Expand Up @@ -701,10 +701,9 @@ public void addQueryInfoStateChangeListener(StateChangeListener<QueryInfo> state
fireOnceStateChangeListener.stateChanged(finalQueryInfo.get());
}

public Duration waitForStateChange(QueryState currentState, Duration maxWait)
throws InterruptedException
public ListenableFuture<QueryState> getStateChange(QueryState currentState)
{
return queryState.waitForStateChange(currentState, maxWait);
return queryState.getStateChange(currentState);
}

public void recordHeartbeat()
Expand Down
Expand Up @@ -74,6 +74,10 @@
import com.facebook.presto.sql.tree.Statement;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.SetThreadName;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -134,6 +138,7 @@ public final class SqlQueryExecution
private final ExecutionPolicy executionPolicy;
private final List<Expression> parameters;
private final SplitSchedulerStats schedulerStats;
private final SettableFuture<QueryOutputInfo> outputInfo = SettableFuture.create();

public SqlQueryExecution(QueryId queryId,
String query,
Expand Down Expand Up @@ -442,6 +447,20 @@ private void planDistribution(PlanRoot plan)

queryScheduler.set(scheduler);

Futures.addCallback(scheduler.getRootStageOutputBufferLocations(), new FutureCallback<QueryOutputInfo>()
{
@Override
public void onSuccess(QueryOutputInfo result)
{
outputInfo.set(result);
}

@Override
public void onFailure(Throwable t)
{
}
});

// if query was canceled during scheduler creation, abort the scheduler
// directly since the callback may have already fired
if (stateMachine.isDone()) {
Expand Down Expand Up @@ -494,12 +513,15 @@ public void fail(Throwable cause)
}

@Override
public Duration waitForStateChange(QueryState currentState, Duration maxWait)
throws InterruptedException
public ListenableFuture<QueryOutputInfo> getOutputInfo()
{
try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
return stateMachine.waitForStateChange(currentState, maxWait);
}
return Futures.nonCancellationPropagating(outputInfo);
}

@Override
public ListenableFuture<QueryState> getStateChange(QueryState currentState)
{
return stateMachine.getStateChange(currentState);
}

@Override
Expand Down
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.event.query.QueryMonitor;
import com.facebook.presto.execution.QueryExecution.QueryExecutionFactory;
import com.facebook.presto.execution.QueryExecution.QueryOutputInfo;
import com.facebook.presto.execution.SqlQueryExecution.SqlQueryExecutionFactory;
import com.facebook.presto.execution.resourceGroups.QueryQueueFullException;
import com.facebook.presto.execution.scheduler.NodeSchedulerConfig;
Expand All @@ -38,6 +39,7 @@
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.Statement;
import com.facebook.presto.transaction.TransactionManager;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
Expand Down Expand Up @@ -80,6 +82,7 @@
import static com.facebook.presto.sql.planner.ExpressionInterpreter.verifyExpressionIsConstant;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.airlift.units.Duration.nanosSince;
import static java.lang.String.format;
Expand Down Expand Up @@ -271,18 +274,29 @@ public List<QueryInfo> getAllQueryInfo()
}

@Override
public Duration waitForStateChange(QueryId queryId, QueryState currentState, Duration maxWait)
throws InterruptedException
public ListenableFuture<QueryOutputInfo> getOutputInfo(QueryId queryId)
{
requireNonNull(queryId, "queryId is null");
requireNonNull(maxWait, "maxWait is null");

QueryExecution query = queries.get(queryId);
if (query == null) {
return maxWait;
return immediateFailedFuture(new NoSuchElementException());
}

return query.waitForStateChange(currentState, maxWait);
return query.getOutputInfo();
}

@Override
public ListenableFuture<QueryState> getStateChange(QueryId queryId, QueryState currentState)
{
requireNonNull(queryId, "queryId is null");

QueryExecution query = queries.get(queryId);
if (query == null) {
return immediateFailedFuture(new NoSuchElementException());
}

return query.getStateChange(currentState);
}

@Override
Expand Down
Expand Up @@ -146,6 +146,11 @@ public PlanFragment getFragment()
return stateMachine.getFragment();
}

public OutputBuffers getOutputBuffers()
{
return outputBuffers.get();
}

public void beginScheduling()
{
stateMachine.transitionToScheduling();
Expand Down
Expand Up @@ -19,7 +19,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.log.Logger;
import io.airlift.units.Duration;

import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
Expand All @@ -38,7 +37,6 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
* Simple state machine which holds a single state. Callers can register for
Expand Down Expand Up @@ -274,39 +272,6 @@ public void addStateChangeListener(StateChangeListener<T> stateChangeListener)
}
}

/**
* Wait for the state to not be {@code .equals()} to the specified current state.
*/
public Duration waitForStateChange(T currentState, Duration maxWait)
throws InterruptedException
{
checkState(!Thread.holdsLock(lock), "Can not wait for state change while holding the lock");
requireNonNull(currentState, "currentState is null");
requireNonNull(maxWait, "maxWait is null");

// don't wait if the state has already changed, or we are in a terminal state
if (isPossibleStateChange(currentState)) {
return maxWait;
}

// wait for task state to change
long remainingNanos = maxWait.roundTo(NANOSECONDS);
long start = System.nanoTime();
long end = start + remainingNanos;

synchronized (lock) {
while (remainingNanos > 0 && !isPossibleStateChange(currentState)) {
// wait for timeout or notification
NANOSECONDS.timedWait(lock, remainingNanos);
remainingNanos = end - System.nanoTime();
}
}
if (remainingNanos < 0) {
remainingNanos = 0;
}
return new Duration(remainingNanos, NANOSECONDS);
}

private boolean isPossibleStateChange(T currentState)
{
return !state.equals(currentState) || isTerminalState(state);
Expand Down

0 comments on commit b305f35

Please sign in to comment.