Skip to content

Commit

Permalink
Fix race condition in query queues
Browse files Browse the repository at this point in the history
Let's consider an example from #3637:
 "user.${USER}": {
 "maxConcurrent": 2,
 "maxQueued": 2
 },
 "dashboard.${USER}": {
 "maxConcurrent": 1,
 "maxQueued": 1
 },
When two queries are scheduled at the same time, both of them get a permit
and most likely both of them will try to get enqueued.
This situation led to an error in which one of them was not able
to enqueue - "Entering secondary queue failed".
In order to solve that, I removed the check of the size of queued queries.
It is not needed, as it is already covered by checking the permits in the reserve method.
Once a query gets a permit, it is assumed that it can be enqueued. As a side effect, a query queue can
momentarily hold more queries in state QUEUED than is set in the configuration.

Task: #3637

Test Plan: Running unit tests
  • Loading branch information
kokosing committed Oct 9, 2015
1 parent 2941779 commit 19755be
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 18 deletions.
Expand Up @@ -28,7 +28,6 @@

public class QueryQueue
{
private final int maxQueuedQueries;
private final AtomicInteger queryQueueSize = new AtomicInteger();
private final AtomicInteger queuePermits;
private final AsyncSemaphore<QueueEntry> asyncSemaphore;
Expand All @@ -39,7 +38,6 @@ public class QueryQueue
checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");

this.maxQueuedQueries = maxQueuedQueries;
this.queuePermits = new AtomicInteger(maxQueuedQueries + maxConcurrentQueries);
this.asyncSemaphore = new AsyncSemaphore<>(maxConcurrentQueries,
queryExecutor,
Expand Down Expand Up @@ -70,12 +68,9 @@ public boolean reserve(QueryExecution queryExecution)
return true;
}

public boolean enqueue(QueuedExecution queuedExecution)
public void enqueue(QueuedExecution queuedExecution)
{
if (queryQueueSize.incrementAndGet() > maxQueuedQueries) {
queryQueueSize.decrementAndGet();
return false;
}
queryQueueSize.incrementAndGet();

// Add a callback to dequeue the entry if it is ever completed.
// This enables us to remove the entry sooner if is cancelled before starting,
Expand All @@ -84,7 +79,6 @@ public boolean enqueue(QueuedExecution queuedExecution)
queuedExecution.getCompletionFuture().addListener(entry::dequeue, MoreExecutors.directExecutor());

asyncSemaphore.submit(entry);
return true;
}

private static class QueueEntry
Expand Down
Expand Up @@ -70,9 +70,7 @@ public void start()
});
}
else {
if (!nextQueues.get(0).enqueue(new QueuedExecution(queryExecution, nextQueues.subList(1, nextQueues.size()), executor, stats, listenableFuture))) {
queryExecution.fail(new IllegalStateException("Entering secondary queue failed"));
}
nextQueues.get(0).enqueue(new QueuedExecution(queryExecution, nextQueues.subList(1, nextQueues.size()), executor, stats, listenableFuture));
}
}
}
Expand Up @@ -162,7 +162,8 @@ public boolean submit(QueryExecution queryExecution, Executor executor, SqlQuery
}
}

return queues.get(0).enqueue(createQueuedExecution(queryExecution, queues.subList(1, queues.size()), executor, stats));
queues.get(0).enqueue(createQueuedExecution(queryExecution, queues.subList(1, queues.size()), executor, stats));
return true;
}

// Queues returned have already been created and added queryQueues
Expand Down
Expand Up @@ -17,9 +17,11 @@
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.testng.annotations.Test;

import java.util.Map;
import java.util.Set;

import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.QUEUED;
Expand All @@ -40,6 +42,8 @@ public class TestQueues
.setSource("dashboard")
.build();

private static final String LONG_LASTING_QUERY = "SELECT COUNT(*) FROM lineitem";

@Test(timeOut = 240_000)
public void testSqlQueryQueueManager()
throws Exception
Expand All @@ -50,25 +54,25 @@ public void testSqlQueryQueueManager()

try (DistributedQueryRunner queryRunner = createQueryRunner(properties)) {
// submit first "dashboard" query
QueryId firstDashboardQuery = createQuery(queryRunner, DASHBOARD_SESSION, "SELECT COUNT(*) FROM lineitem");
QueryId firstDashboardQuery = createQuery(queryRunner, DASHBOARD_SESSION, LONG_LASTING_QUERY);

// wait for the first "dashboard" query to start
waitForQueryState(queryRunner, firstDashboardQuery, RUNNING);

// submit second "dashboard" query
QueryId secondDashboardQuery = createQuery(queryRunner, DASHBOARD_SESSION, "SELECT COUNT(*) FROM lineitem");
QueryId secondDashboardQuery = createQuery(queryRunner, DASHBOARD_SESSION, LONG_LASTING_QUERY);

// wait for the second "dashboard" query to be queued ("dashboard.${USER}" queue strategy only allows one "dashboard" query to be accepted for execution)
waitForQueryState(queryRunner, secondDashboardQuery, QUEUED);

// submit first non "dashboard" query
QueryId firstNonDashboardQuery = createQuery(queryRunner, SESSION, "SELECT COUNT(*) FROM lineitem");
QueryId firstNonDashboardQuery = createQuery(queryRunner, SESSION, LONG_LASTING_QUERY);

// wait for the first non "dashboard" query to start
waitForQueryState(queryRunner, firstNonDashboardQuery, RUNNING);

// submit second non "dashboard" query
QueryId secondNonDashboardQuery = createQuery(queryRunner, SESSION, "SELECT COUNT(*) FROM lineitem");
QueryId secondNonDashboardQuery = createQuery(queryRunner, SESSION, LONG_LASTING_QUERY);

// wait for the second non "dashboard" query to be queued ("user.${USER}" queue strategy only allows three user queries to be accepted for execution,
// two "dashboard" and one non "dashboard" queries are already accepted by "user.${USER}" queue)
Expand All @@ -82,6 +86,44 @@ public void testSqlQueryQueueManager()
}
}

@Test(timeOut = 240_000)
public void testSqlQueryQueueManagerWithTwoDashboardQueriesRequestedAtTheSameTime()
        throws Exception
{
    // Start the query runner with the dashboard queue configuration file.
    Map<String, String> queueProperties = ImmutableMap.of(
            "query.queue-config-file", getResourceFilePath("queue_config_dashboard.json"));

    try (DistributedQueryRunner runner = createQueryRunner(queueProperties)) {
        // Submit both "dashboard" queries back to back, without waiting for the
        // first one to change state before creating the second one.
        QueryId firstQuery = createQuery(runner, DASHBOARD_SESSION, LONG_LASTING_QUERY);
        QueryId secondQuery = createQuery(runner, DASHBOARD_SESSION, LONG_LASTING_QUERY);

        // Either state is accepted for each query; the test only waits until
        // both queries have reached QUEUED or RUNNING.
        Set<QueryState> acceptedStates = ImmutableSet.of(QUEUED, RUNNING);
        waitForQueryState(runner, firstQuery, acceptedStates);
        waitForQueryState(runner, secondQuery, acceptedStates);
    }
}

@Test(timeOut = 240_000)
public void testSqlQueryQueueManagerWithTooManyQueriesScheduled()
throws Exception
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("query.queue-config-file", getResourceFilePath("queue_config_dashboard.json"))
.build();

try (DistributedQueryRunner queryRunner = createQueryRunner(properties)) {
QueryId firstDashboardQuery = createQuery(queryRunner, DASHBOARD_SESSION, LONG_LASTING_QUERY);
waitForQueryState(queryRunner, firstDashboardQuery, RUNNING);

QueryId secondDashboardQuery = createQuery(queryRunner, DASHBOARD_SESSION, LONG_LASTING_QUERY);
waitForQueryState(queryRunner, secondDashboardQuery, QUEUED);

QueryId thirdDashboardQuery = createQuery(queryRunner, DASHBOARD_SESSION, LONG_LASTING_QUERY);
waitForQueryState(queryRunner, thirdDashboardQuery, FAILED);
}
}

private static QueryId createQuery(DistributedQueryRunner queryRunner, Session session, String sql)
{
return queryRunner.getCoordinator().getQueryManager().createQuery(session, sql).getQueryId();
Expand All @@ -92,13 +134,20 @@ private static void cancelQuery(DistributedQueryRunner queryRunner, QueryId quer
queryRunner.getCoordinator().getQueryManager().cancelQuery(queryId);
}

private static void waitForQueryState(DistributedQueryRunner queryRunner, QueryId queryId, QueryState queryState)
private static void waitForQueryState(DistributedQueryRunner queryRunner, QueryId queryId, QueryState expectedQueryState)
throws InterruptedException
{
waitForQueryState(queryRunner, queryId, ImmutableSet.of(expectedQueryState));
}

private static void waitForQueryState(DistributedQueryRunner queryRunner, QueryId queryId, Set<QueryState> expectedQueryStates)
throws InterruptedException
{
QueryManager queryManager = queryRunner.getCoordinator().getQueryManager();
do {
MILLISECONDS.sleep(500);
}
while (queryRunner.getCoordinator().getQueryManager().getQueryInfo(queryId).getState() != queryState);
while (!expectedQueryStates.contains(queryManager.getQueryInfo(queryId).getState()));
}

private String getResourceFilePath(String fileName)
Expand Down

0 comments on commit 19755be

Please sign in to comment.