Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split out queued phase from QueryManager #12176

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,6 +28,10 @@ public enum QueryState
* Query is waiting for the required resources (beta).
*/
WAITING_FOR_RESOURCES(false),
/**
* Query is being dispatched to a coordinator.
*/
DISPATCHING(false),
/**
* Query is being planned.
*/
Expand Down
Expand Up @@ -66,6 +66,7 @@
import java.util.function.Consumer;

import static com.facebook.presto.execution.BasicStageStats.EMPTY_STAGE_STATS;
import static com.facebook.presto.execution.QueryState.DISPATCHING;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.FINISHED;
import static com.facebook.presto.execution.QueryState.FINISHING;
Expand Down Expand Up @@ -472,6 +473,7 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)
queryStateTimer.getElapsedTime(),
queryStateTimer.getQueuedTime(),
queryStateTimer.getResourceWaitingTime(),
queryStateTimer.getDispatchingTime(),
queryStateTimer.getExecutionTime(),
queryStateTimer.getAnalysisTime(),
queryStateTimer.getDistributedPlanningTime(),
Expand Down Expand Up @@ -656,6 +658,12 @@ public boolean transitionToWaitingForResources()
return queryState.setIf(WAITING_FOR_RESOURCES, currentState -> currentState.ordinal() < WAITING_FOR_RESOURCES.ordinal());
}

public boolean transitionToDispatching()
{
queryStateTimer.beginDispatching();
return queryState.setIf(DISPATCHING, currentState -> currentState.ordinal() < DISPATCHING.ordinal());
}

public boolean transitionToPlanning()
{
queryStateTimer.beginPlanning();
Expand Down Expand Up @@ -947,6 +955,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getElapsedTime(),
queryStats.getQueuedTime(),
queryStats.getResourceWaitingTime(),
queryStats.getDispatchingTime(),
queryStats.getExecutionTime(),
queryStats.getAnalysisTime(),
queryStats.getDistributedPlanningTime(),
Expand Down
Expand Up @@ -33,12 +33,14 @@ class QueryStateTimer

private final long createNanos;
private final AtomicReference<Long> beginResourceWaitingNanos = new AtomicReference<>();
private final AtomicReference<Long> beginDispatchingNanos = new AtomicReference<>();
private final AtomicReference<Long> beginPlanningNanos = new AtomicReference<>();
private final AtomicReference<Long> beginFinishingNanos = new AtomicReference<>();
private final AtomicReference<Long> endNanos = new AtomicReference<>();

private final AtomicReference<Duration> queuedTime = new AtomicReference<>();
private final AtomicReference<Duration> resourceWaitingTime = new AtomicReference<>();
private final AtomicReference<Duration> dispatchingTime = new AtomicReference<>();
private final AtomicReference<Duration> executionTime = new AtomicReference<>();
private final AtomicReference<Duration> planningTime = new AtomicReference<>();
private final AtomicReference<Duration> finishingTime = new AtomicReference<>();
Expand Down Expand Up @@ -73,15 +75,27 @@ private void beginWaitingForResources(long now)
beginResourceWaitingNanos.compareAndSet(null, now);
}

public void beginDispatching()
{
beginDispatching(tickerNanos());
}

private void beginDispatching(long now)
{
beginWaitingForResources(now);
resourceWaitingTime.compareAndSet(null, nanosSince(beginResourceWaitingNanos, now));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming resource waiting time is deliberately 0 for this commit.

beginDispatchingNanos.compareAndSet(null, now);
}

public void beginPlanning()
{
beginPlanning(tickerNanos());
}

private void beginPlanning(long now)
{
beginWaitingForResources(now);
resourceWaitingTime.compareAndSet(null, nanosSince(beginResourceWaitingNanos, now));
beginDispatching(now);
dispatchingTime.compareAndSet(null, nanosSince(beginDispatchingNanos, now));
beginPlanningNanos.compareAndSet(null, now);
}

Expand Down Expand Up @@ -197,6 +211,11 @@ public Duration getResourceWaitingTime()
return getDuration(resourceWaitingTime, beginResourceWaitingNanos);
}

public Duration getDispatchingTime()
{
return getDuration(dispatchingTime, beginDispatchingNanos);
}

public Duration getPlanningTime()
{
return getDuration(planningTime, beginPlanningNanos);
Expand Down
Expand Up @@ -48,8 +48,9 @@ public class QueryStats

private final Duration elapsedTime;
private final Duration queuedTime;
private final Duration executionTime;
private final Duration resourceWaitingTime;
private final Duration dispatchingTime;
private final Duration executionTime;
private final Duration analysisTime;
private final Duration distributedPlanningTime;
private final Duration totalPlanningTime;
Expand Down Expand Up @@ -104,6 +105,7 @@ public QueryStats(
@JsonProperty("elapsedTime") Duration elapsedTime,
@JsonProperty("queuedTime") Duration queuedTime,
@JsonProperty("resourceWaitingTime") Duration resourceWaitingTime,
@JsonProperty("dispatchingTime") Duration dispatchingTime,
@JsonProperty("executionTime") Duration executionTime,
@JsonProperty("analysisTime") Duration analysisTime,
@JsonProperty("distributedPlanningTime") Duration distributedPlanningTime,
Expand Down Expand Up @@ -157,6 +159,7 @@ public QueryStats(
this.elapsedTime = requireNonNull(elapsedTime, "elapsedTime is null");
this.queuedTime = requireNonNull(queuedTime, "queuedTime is null");
this.resourceWaitingTime = requireNonNull(resourceWaitingTime, "resourceWaitingTime is null");
this.dispatchingTime = requireNonNull(dispatchingTime, "dispatchingTime is null");
this.executionTime = requireNonNull(executionTime, "executionTime is null");
this.analysisTime = requireNonNull(analysisTime, "analysisTime is null");
this.distributedPlanningTime = requireNonNull(distributedPlanningTime, "distributedPlanningTime is null");
Expand Down Expand Up @@ -229,6 +232,7 @@ public static QueryStats immediateFailureQueryStats()
new Duration(0, MILLISECONDS),
new Duration(0, MILLISECONDS),
new Duration(0, MILLISECONDS),
new Duration(0, MILLISECONDS),
0,
0,
0,
Expand Down Expand Up @@ -297,6 +301,12 @@ public Duration getResourceWaitingTime()
return resourceWaitingTime;
}

@JsonProperty
public Duration getDispatchingTime()
{
return dispatchingTime;
}

@JsonProperty
public Duration getQueuedTime()
{
Expand Down
Expand Up @@ -113,6 +113,7 @@ public QueryInfo getQueryInfo()
new Duration(6, NANOSECONDS),
new Duration(5, NANOSECONDS),
new Duration(31, NANOSECONDS),
new Duration(32, NANOSECONDS),
new Duration(41, NANOSECONDS),
new Duration(7, NANOSECONDS),
new Duration(8, NANOSECONDS),
Expand Down
Expand Up @@ -46,6 +46,7 @@
import java.util.function.Consumer;

import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
import static com.facebook.presto.execution.QueryState.DISPATCHING;
import static com.facebook.presto.execution.QueryState.FAILED;
import static com.facebook.presto.execution.QueryState.FINISHED;
import static com.facebook.presto.execution.QueryState.FINISHING;
Expand Down Expand Up @@ -100,6 +101,9 @@ public void testBasicStateChanges()
QueryStateMachine stateMachine = createQueryStateMachine();
assertState(stateMachine, QUEUED);

assertTrue(stateMachine.transitionToDispatching());
assertState(stateMachine, DISPATCHING);

assertTrue(stateMachine.transitionToPlanning());
assertState(stateMachine, PLANNING);

Expand All @@ -123,6 +127,9 @@ public void testStateChangesWithResourceWaiting()
assertTrue(stateMachine.transitionToWaitingForResources());
assertState(stateMachine, WAITING_FOR_RESOURCES);

assertTrue(stateMachine.transitionToDispatching());
assertState(stateMachine, DISPATCHING);

assertTrue(stateMachine.transitionToPlanning());
assertState(stateMachine, PLANNING);

Expand All @@ -143,6 +150,7 @@ public void testQueued()
// all time before the first state transition is accounted to queueing
assertAllTimeSpentInQueueing(QUEUED, queryStateMachine -> {});
assertAllTimeSpentInQueueing(WAITING_FOR_RESOURCES, QueryStateMachine::transitionToWaitingForResources);
assertAllTimeSpentInQueueing(DISPATCHING, QueryStateMachine::transitionToDispatching);
assertAllTimeSpentInQueueing(PLANNING, QueryStateMachine::transitionToPlanning);
assertAllTimeSpentInQueueing(STARTING, QueryStateMachine::transitionToStarting);
assertAllTimeSpentInQueueing(RUNNING, QueryStateMachine::transitionToRunning);
Expand All @@ -167,6 +175,7 @@ private void assertAllTimeSpentInQueueing(QueryState expectedState, Consumer<Que
QueryStats queryStats = stateMachine.getQueryInfo(Optional.empty()).getQueryStats();
assertEquals(queryStats.getQueuedTime(), new Duration(7, MILLISECONDS));
assertEquals(queryStats.getResourceWaitingTime(), new Duration(0, MILLISECONDS));
assertEquals(queryStats.getDispatchingTime(), new Duration(0, MILLISECONDS));
assertEquals(queryStats.getTotalPlanningTime(), new Duration(0, MILLISECONDS));
assertEquals(queryStats.getExecutionTime(), new Duration(0, MILLISECONDS));
assertEquals(queryStats.getFinishingTime(), new Duration(0, MILLISECONDS));
Expand All @@ -179,6 +188,9 @@ public void testPlanning()
assertTrue(stateMachine.transitionToPlanning());
assertState(stateMachine, PLANNING);

assertFalse(stateMachine.transitionToDispatching());
assertState(stateMachine, PLANNING);

assertFalse(stateMachine.transitionToPlanning());
assertState(stateMachine, PLANNING);

Expand Down Expand Up @@ -209,6 +221,9 @@ public void testStarting()
assertTrue(stateMachine.transitionToStarting());
assertState(stateMachine, STARTING);

assertFalse(stateMachine.transitionToDispatching());
assertState(stateMachine, STARTING);

assertFalse(stateMachine.transitionToPlanning());
assertState(stateMachine, STARTING);

Expand Down Expand Up @@ -237,6 +252,9 @@ public void testRunning()
assertTrue(stateMachine.transitionToRunning());
assertState(stateMachine, RUNNING);

assertFalse(stateMachine.transitionToDispatching());
assertState(stateMachine, RUNNING);

assertFalse(stateMachine.transitionToPlanning());
assertState(stateMachine, RUNNING);

Expand Down Expand Up @@ -288,10 +306,14 @@ public void testPlanningTimeDuration()
QueryStateMachine stateMachine = createQueryStateMachineWithTicker(mockTicker);
assertState(stateMachine, QUEUED);

mockTicker.increment(50, MILLISECONDS);
mockTicker.increment(25, MILLISECONDS);
assertTrue(stateMachine.transitionToWaitingForResources());
assertState(stateMachine, WAITING_FOR_RESOURCES);

mockTicker.increment(50, MILLISECONDS);
assertTrue(stateMachine.transitionToDispatching());
assertState(stateMachine, DISPATCHING);

mockTicker.increment(100, MILLISECONDS);
assertTrue(stateMachine.transitionToPlanning());
assertState(stateMachine, PLANNING);
Expand All @@ -310,9 +332,10 @@ public void testPlanningTimeDuration()
assertState(stateMachine, FINISHED);

QueryStats queryStats = stateMachine.getQueryInfo(Optional.empty()).getQueryStats();
assertEquals(queryStats.getElapsedTime().toMillis(), 1050);
assertEquals(queryStats.getQueuedTime().toMillis(), 50);
assertEquals(queryStats.getResourceWaitingTime().toMillis(), 100);
assertEquals(queryStats.getElapsedTime().toMillis(), 1075);
assertEquals(queryStats.getQueuedTime().toMillis(), 25);
assertEquals(queryStats.getResourceWaitingTime().toMillis(), 50);
assertEquals(queryStats.getDispatchingTime().toMillis(), 100);
assertEquals(queryStats.getTotalPlanningTime().toMillis(), 200);
// there is no way to induce finishing time without a transaction and connector
assertEquals(queryStats.getFinishingTime().toMillis(), 0);
Expand Down Expand Up @@ -356,6 +379,9 @@ private static void assertFinalState(QueryStateMachine stateMachine, QueryState
assertTrue(expectedState.isDone());
assertState(stateMachine, expectedState, expectedException);

assertFalse(stateMachine.transitionToDispatching());
assertState(stateMachine, expectedState, expectedException);

assertFalse(stateMachine.transitionToPlanning());
assertState(stateMachine, expectedState, expectedException);

Expand Down Expand Up @@ -404,12 +430,13 @@ private static void assertState(QueryStateMachine stateMachine, QueryState expec
assertNotNull(queryStats.getElapsedTime());
assertNotNull(queryStats.getQueuedTime());
assertNotNull(queryStats.getResourceWaitingTime());
assertNotNull(queryStats.getDispatchingTime());
assertNotNull(queryStats.getExecutionTime());
assertNotNull(queryStats.getTotalPlanningTime());
assertNotNull(queryStats.getFinishingTime());

assertNotNull(queryStats.getCreateTime());
if (queryInfo.getState() == QUEUED || queryInfo.getState() == WAITING_FOR_RESOURCES) {
if (queryInfo.getState() == QUEUED || queryInfo.getState() == WAITING_FOR_RESOURCES || queryInfo.getState() == DISPATCHING) {
assertNull(queryStats.getExecutionStartTime());
}
else {
Expand Down
Expand Up @@ -143,6 +143,7 @@ public class TestQueryStats
new Duration(6, NANOSECONDS),
new Duration(5, NANOSECONDS),
new Duration(31, NANOSECONDS),
new Duration(32, NANOSECONDS),
new Duration(41, NANOSECONDS),
new Duration(7, NANOSECONDS),
new Duration(8, NANOSECONDS),
Expand Down Expand Up @@ -216,6 +217,8 @@ public static void assertExpectedQueryStats(QueryStats actual)

assertEquals(actual.getElapsedTime(), new Duration(6, NANOSECONDS));
assertEquals(actual.getQueuedTime(), new Duration(5, NANOSECONDS));
assertEquals(actual.getResourceWaitingTime(), new Duration(31, NANOSECONDS));
assertEquals(actual.getDispatchingTime(), new Duration(32, NANOSECONDS));
assertEquals(actual.getExecutionTime(), new Duration(41, NANOSECONDS));
assertEquals(actual.getAnalysisTime(), new Duration(7, NANOSECONDS));
assertEquals(actual.getDistributedPlanningTime(), new Duration(8, NANOSECONDS));
Expand Down
Expand Up @@ -59,6 +59,7 @@ public void testConstructor()
Duration.valueOf("8m"),
Duration.valueOf("7m"),
Duration.valueOf("34m"),
Duration.valueOf("35m"),
Duration.valueOf("44m"),
Duration.valueOf("9m"),
Duration.valueOf("10m"),
Expand Down
Expand Up @@ -110,6 +110,7 @@ private QueryInfo createQueryInfo(String queryId, QueryState state, String query
Duration.valueOf("8m"),
Duration.valueOf("7m"),
Duration.valueOf("34m"),
Duration.valueOf("35m"),
Duration.valueOf("44m"),
Duration.valueOf("9m"),
Duration.valueOf("10m"),
Expand Down