Skip to content

Commit

Permalink
Drop cancelled queries from throttler queue
Browse files Browse the repository at this point in the history
Two changes here:
1) Reformat the query throttler to better integrate the big query stuff
2) Enable query throttler to drop cancelled queued queries
  • Loading branch information
erichwang committed Jan 22, 2015
1 parent 91cfd99 commit 9e464a4
Showing 1 changed file with 108 additions and 67 deletions.
Expand Up @@ -21,9 +21,8 @@
import com.facebook.presto.sql.parser.ParsingException; import com.facebook.presto.sql.parser.ParsingException;
import com.facebook.presto.sql.parser.SqlParser; import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.tree.Statement; import com.facebook.presto.sql.tree.Statement;
import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import io.airlift.concurrent.AsyncSemaphore; import io.airlift.concurrent.AsyncSemaphore;
import io.airlift.concurrent.SetThreadName; import io.airlift.concurrent.SetThreadName;
Expand Down Expand Up @@ -54,12 +53,16 @@
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;


import static com.facebook.presto.SystemSessionProperties.isBigQueryEnabled; import static com.facebook.presto.SystemSessionProperties.isBigQueryEnabled;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_QUEUE_FULL; import static com.facebook.presto.spi.StandardErrorCode.QUERY_QUEUE_FULL;
import static com.facebook.presto.spi.StandardErrorCode.USER_CANCELED; import static com.facebook.presto.spi.StandardErrorCode.USER_CANCELED;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList; import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static io.airlift.concurrent.Threads.threadsNamed; import static io.airlift.concurrent.Threads.threadsNamed;
import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newCachedThreadPool;
Expand Down Expand Up @@ -373,106 +376,144 @@ private QueryInfo createFailedQuery(Session session, String query, QueryId query
return execution.getQueryInfo(); return execution.getQueryInfo();
} }


private static class QueryStarter /**
* Set up a callback to fire when a query is completed. The callback will be called at most once.
*/
private static void addCompletionCallback(QueryExecution queryExecution, Runnable callback)
{ {
private final int maxQueuedQueries; AtomicBoolean taskExecuted = new AtomicBoolean();
private final AtomicInteger queryQueueSize = new AtomicInteger(); queryExecution.addStateChangeListener(newValue -> {
private final AsyncSemaphore<QueryExecution> queryAsyncSemaphore; if (newValue.isDone() && taskExecuted.compareAndSet(false, true)) {
callback.run();
}
});
// Need to do this check in case the state changed before we added the previous state change listener
if (queryExecution.getQueryInfo().getState().isDone() && taskExecuted.compareAndSet(false, true)) {
callback.run();
}
}


private final int maxQueuedBigQueries; @ThreadSafe
private final AtomicInteger bigQueryQueueSize = new AtomicInteger(); private static class QueryStarter
private final AsyncSemaphore<QueryExecution> bigQueryAsyncSemaphore; {
private final QueryQueue queryQueue;
private final QueryQueue bigQueryQueue;


public QueryStarter(Executor queryExecutor, SqlQueryManagerStats stats, QueryManagerConfig config) public QueryStarter(Executor queryExecutor, SqlQueryManagerStats stats, QueryManagerConfig config)
{ {
checkNotNull(queryExecutor, "queryExecutor is null"); checkNotNull(queryExecutor, "queryExecutor is null");
checkNotNull(stats, "stats is null"); checkNotNull(stats, "stats is null");
checkNotNull(config, "config is null");


this.maxQueuedQueries = config.getMaxQueuedQueries(); this.queryQueue = new QueryQueue(queryExecutor, stats, config.getMaxQueuedQueries(), config.getMaxConcurrentQueries());
this.queryAsyncSemaphore = new AsyncSemaphore<>(config.getMaxConcurrentQueries(), queryExecutor, new QuerySubmitter(queryExecutor, stats, queryQueueSize)); this.bigQueryQueue = new QueryQueue(queryExecutor, stats, config.getMaxQueuedBigQueries(), config.getMaxConcurrentBigQueries());
this.maxQueuedBigQueries = config.getMaxQueuedBigQueries();
this.bigQueryAsyncSemaphore = new AsyncSemaphore<>(config.getMaxConcurrentBigQueries(), queryExecutor, new QuerySubmitter(queryExecutor, stats, bigQueryQueueSize));
} }


public boolean submit(QueryExecution queryExecution) public boolean submit(QueryExecution queryExecution)
{ {
AtomicInteger queueSize;
int maxQueueSize;
AsyncSemaphore<QueryExecution> asyncSemaphore;
if (isBigQueryEnabled(queryExecution.getQueryInfo().getSession(), false)) { if (isBigQueryEnabled(queryExecution.getQueryInfo().getSession(), false)) {
queueSize = bigQueryQueueSize; return bigQueryQueue.enqueue(queryExecution);
maxQueueSize = maxQueuedBigQueries;
asyncSemaphore = bigQueryAsyncSemaphore;
} }
else { else {
queueSize = queryQueueSize; return queryQueue.enqueue(queryExecution);
maxQueueSize = maxQueuedQueries;
asyncSemaphore = queryAsyncSemaphore;
} }
if (queueSize.incrementAndGet() > maxQueueSize) {
queueSize.decrementAndGet();
return false;
}
asyncSemaphore.submit(queryExecution);
return true;
} }


public int getQueryQueueSize() public int getQueryQueueSize()
{ {
return queryQueueSize.get(); return queryQueue.getQueueSize();
} }


public int getBigQueryQueueSize() public int getBigQueryQueueSize()
{ {
return bigQueryQueueSize.get(); return bigQueryQueue.getQueueSize();
} }


private static class QuerySubmitter private static class QueryQueue
implements Function<QueryExecution, ListenableFuture<?>>
{ {
private final Executor queryExecutor; private final int maxQueuedQueries;
private final SqlQueryManagerStats stats; private final AtomicInteger queryQueueSize = new AtomicInteger();
private final AtomicInteger queueSize; private final AsyncSemaphore<QueueEntry> asyncSemaphore;


public QuerySubmitter(Executor queryExecutor, SqlQueryManagerStats stats, AtomicInteger queueSize) private QueryQueue(Executor queryExecutor, SqlQueryManagerStats stats, int maxQueuedQueries, int maxConcurrentQueries)
{ {
this.queryExecutor = checkNotNull(queryExecutor, "queryExecutor is null"); checkNotNull(queryExecutor, "queryExecutor is null");
this.stats = checkNotNull(stats, "stats is null"); checkNotNull(stats, "stats is null");
this.queueSize = checkNotNull(queueSize, "queueSize is null"); checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");

this.maxQueuedQueries = maxQueuedQueries;
this.asyncSemaphore = new AsyncSemaphore<>(maxConcurrentQueries,
queryExecutor,
queueEntry -> {
QueryExecution queryExecution = queueEntry.dequeue();
if (queryExecution == null) {
// Entry was dequeued earlier and so this query is already done
return Futures.immediateFuture(null);
}
else {
SettableFuture<?> settableFuture = SettableFuture.create();
addCompletionCallback(queryExecution, () -> settableFuture.set(null));
if (!settableFuture.isDone()) { // Only execute if the query is not already completed (e.g. cancelled)
queryExecutor.execute(() -> {
try (SetThreadName setThreadName = new SetThreadName("Query-%s", queryExecution.getQueryInfo().getQueryId())) {
stats.queryStarted();
queryExecution.start();
}
});
}
return settableFuture;
}
});
} }


@Override public int getQueueSize()
public ListenableFuture<?> apply(final QueryExecution queryExecution) {
return queryQueueSize.get();
}

public boolean enqueue(QueryExecution queryExecution)
{
if (queryQueueSize.incrementAndGet() > maxQueuedQueries) {
queryQueueSize.decrementAndGet();
return false;
}

QueueEntry queueEntry = new QueueEntry(queryExecution, aVoid -> queryQueueSize.decrementAndGet());
// Add a callback to dequeue the entry if it is ever completed.
// This enables us to remove the entry sooner if is cancelled before starting,
// and has no effect if called after starting.
addCompletionCallback(queryExecution, queueEntry::dequeue);
asyncSemaphore.submit(queueEntry);
return true;
}

private static class QueueEntry
{ {
queueSize.decrementAndGet(); private final AtomicBoolean dequeued = new AtomicBoolean();
final SettableFuture<?> settableFuture = SettableFuture.create(); private final AtomicReference<QueryExecution> queryExecution;
queryExecution.addStateChangeListener(new StateChangeListener<QueryState>() private final Consumer<Void> onDequeue;

private QueueEntry(QueryExecution queryExecution, Consumer<Void> onDequeue)
{ {
@Override checkNotNull(queryExecution, "queryExecution is null");
public void stateChanged(QueryState newValue) checkNotNull(onDequeue, "onDequeue is null");
{
if (newValue.isDone()) { this.queryExecution = new AtomicReference<>(queryExecution);
settableFuture.set(null); this.onDequeue = onDequeue;
}
}
});
if (queryExecution.getQueryInfo().getState().isDone()) {
settableFuture.set(null);
} }
else {
queryExecutor.execute(new Runnable() /**
{ * Can be called multiple times on the same QueueEntry, but the onDequeue Consumer will only be called once
@Override * and only one caller will get the QueryExecution.
public void run() */
{ public QueryExecution dequeue()
try (SetThreadName setThreadName = new SetThreadName("Query-%s", queryExecution.getQueryInfo().getQueryId())) { {
stats.queryStarted(); if (dequeued.compareAndSet(false, true)) {
queryExecution.start(); onDequeue.accept(null);
} }
} return queryExecution.getAndSet(null);
});
} }
return settableFuture;
} }
} }
} }
Expand Down

0 comments on commit 9e464a4

Please sign in to comment.