Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for custom query prerequisite logic #16073

Merged
merged 2 commits into from May 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -42,7 +42,7 @@

public class TestPrestoExceptionClassifier
{
private static final QueryStats QUERY_STATS = new QueryStats("id", "", false, false, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, Optional.empty());
private static final QueryStats QUERY_STATS = new QueryStats("id", "", false, false, false, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, Optional.empty());

private final SqlExceptionClassifier classifier = new PrestoExceptionClassifier(ImmutableSet.of());

Expand Down
Expand Up @@ -29,6 +29,7 @@
public class StatementStats
{
private final String state;
private final boolean waitingForPrerequisites;
private final boolean queued;
private final boolean scheduled;
private final int nodes;
Expand All @@ -38,6 +39,7 @@ public class StatementStats
private final int completedSplits;
private final long cpuTimeMillis;
private final long wallTimeMillis;
private final long waitingForPrerequisitesTimeMillis;
private final long queuedTimeMillis;
private final long elapsedTimeMillis;
private final long processedRows;
Expand All @@ -51,6 +53,7 @@ public class StatementStats
@JsonCreator
public StatementStats(
@JsonProperty("state") String state,
@JsonProperty("waitingForPrerequisites") boolean waitingForPrerequisites,
@JsonProperty("queued") boolean queued,
@JsonProperty("scheduled") boolean scheduled,
@JsonProperty("nodes") int nodes,
Expand All @@ -60,6 +63,7 @@ public StatementStats(
@JsonProperty("completedSplits") int completedSplits,
@JsonProperty("cpuTimeMillis") long cpuTimeMillis,
@JsonProperty("wallTimeMillis") long wallTimeMillis,
@JsonProperty("waitingForPrerequisitesTimeMillis") long waitingForPrerequisitesTimeMillis,
@JsonProperty("queuedTimeMillis") long queuedTimeMillis,
@JsonProperty("elapsedTimeMillis") long elapsedTimeMillis,
@JsonProperty("processedRows") long processedRows,
Expand All @@ -71,6 +75,7 @@ public StatementStats(
@JsonProperty("rootStage") StageStats rootStage)
{
this.state = requireNonNull(state, "state is null");
this.waitingForPrerequisites = waitingForPrerequisites;
this.queued = queued;
this.scheduled = scheduled;
this.nodes = nodes;
Expand All @@ -80,6 +85,7 @@ public StatementStats(
this.completedSplits = completedSplits;
this.cpuTimeMillis = cpuTimeMillis;
this.wallTimeMillis = wallTimeMillis;
this.waitingForPrerequisitesTimeMillis = waitingForPrerequisitesTimeMillis;
this.queuedTimeMillis = queuedTimeMillis;
this.elapsedTimeMillis = elapsedTimeMillis;
this.processedRows = processedRows;
Expand All @@ -97,6 +103,12 @@ public String getState()
return state;
}

@JsonProperty
public boolean isWaitingForPrerequisites()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this if we already have state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am maintaining symmetry in this class. We already have isQueued despite the state being there and hence added this. One of the reasons why I feel this might have been done is that state is a String and not an enum - and hence this could provide people an easy way without figuring out the strings if the query has started or not.

{
return waitingForPrerequisites;
}

@JsonProperty
public boolean isQueued()
{
Expand Down Expand Up @@ -151,6 +163,12 @@ public long getWallTimeMillis()
return wallTimeMillis;
}

@JsonProperty
public long getWaitingForPrerequisitesTimeMillis()
{
return waitingForPrerequisitesTimeMillis;
}

@JsonProperty
public long getQueuedTimeMillis()
{
Expand Down Expand Up @@ -220,6 +238,7 @@ public String toString()
{
return toStringHelper(this)
.add("state", state)
.add("waitingForPrerequisites", waitingForPrerequisites)
.add("queued", queued)
.add("scheduled", scheduled)
.add("nodes", nodes)
Expand All @@ -229,6 +248,7 @@ public String toString()
.add("completedSplits", completedSplits)
.add("cpuTimeMillis", cpuTimeMillis)
.add("wallTimeMillis", wallTimeMillis)
.add("waitingForPrerequisitesTimeMillis", waitingForPrerequisitesTimeMillis)
.add("queuedTimeMillis", queuedTimeMillis)
.add("elapsedTimeMillis", elapsedTimeMillis)
.add("processedRows", processedRows)
Expand All @@ -249,6 +269,7 @@ public static Builder builder()
public static class Builder
{
private String state;
private boolean waitingForPrerequisites;
private boolean queued;
private boolean scheduled;
private int nodes;
Expand All @@ -258,6 +279,7 @@ public static class Builder
private int completedSplits;
private long cpuTimeMillis;
private long wallTimeMillis;
private long waitingForPrerequisitesTimeMillis;
private long queuedTimeMillis;
private long elapsedTimeMillis;
private long processedRows;
Expand All @@ -282,6 +304,12 @@ public Builder setNodes(int nodes)
return this;
}

public Builder setWaitingForPrerequisites(boolean waitingForPrerequisites)
{
this.waitingForPrerequisites = waitingForPrerequisites;
return this;
}

public Builder setQueued(boolean queued)
{
this.queued = queued;
Expand Down Expand Up @@ -330,6 +358,12 @@ public Builder setWallTimeMillis(long wallTimeMillis)
return this;
}

public Builder setWaitingForPrerequisitesTimeMillis(long waitingForPrerequisitesTimeMillis)
{
this.waitingForPrerequisitesTimeMillis = waitingForPrerequisitesTimeMillis;
return this;
}

public Builder setQueuedTimeMillis(long queuedTimeMillis)
{
this.queuedTimeMillis = queuedTimeMillis;
Expand Down Expand Up @@ -388,6 +422,7 @@ public StatementStats build()
{
return new StatementStats(
state,
waitingForPrerequisites,
queued,
scheduled,
nodes,
Expand All @@ -397,6 +432,7 @@ public StatementStats build()
completedSplits,
cpuTimeMillis,
wallTimeMillis,
waitingForPrerequisitesTimeMillis,
queuedTimeMillis,
elapsedTimeMillis,
processedRows,
Expand Down
Expand Up @@ -25,6 +25,7 @@ public final class QueryStats
{
private final String queryId;
private final String state;
private final boolean waitingForPrerequisites;
private final boolean queued;
private final boolean scheduled;
private final int nodes;
Expand All @@ -34,6 +35,7 @@ public final class QueryStats
private final int completedSplits;
private final long cpuTimeMillis;
private final long wallTimeMillis;
private final long waitingForPrerequisitesTimeMillis;
private final long queuedTimeMillis;
private final long elapsedTimeMillis;
private final long processedRows;
Expand All @@ -46,6 +48,7 @@ public final class QueryStats
public QueryStats(
String queryId,
String state,
boolean waitingForPrerequisites,
boolean queued,
boolean scheduled,
int nodes,
Expand All @@ -55,6 +58,7 @@ public QueryStats(
int completedSplits,
long cpuTimeMillis,
long wallTimeMillis,
long waitingForPrerequisitesTimeMillis,
long queuedTimeMillis,
long elapsedTimeMillis,
long processedRows,
Expand All @@ -66,6 +70,7 @@ public QueryStats(
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.state = requireNonNull(state, "state is null");
this.waitingForPrerequisites = waitingForPrerequisites;
this.queued = queued;
this.scheduled = scheduled;
this.nodes = nodes;
Expand All @@ -75,6 +80,7 @@ public QueryStats(
this.completedSplits = completedSplits;
this.cpuTimeMillis = cpuTimeMillis;
this.wallTimeMillis = wallTimeMillis;
this.waitingForPrerequisitesTimeMillis = waitingForPrerequisitesTimeMillis;
this.queuedTimeMillis = queuedTimeMillis;
this.elapsedTimeMillis = elapsedTimeMillis;
this.processedRows = processedRows;
Expand All @@ -90,6 +96,7 @@ static QueryStats create(String queryId, StatementStats stats)
return new QueryStats(
queryId,
stats.getState(),
stats.isWaitingForPrerequisites(),
stats.isQueued(),
stats.isScheduled(),
stats.getNodes(),
Expand All @@ -99,6 +106,7 @@ static QueryStats create(String queryId, StatementStats stats)
stats.getCompletedSplits(),
stats.getCpuTimeMillis(),
stats.getWallTimeMillis(),
stats.getWaitingForPrerequisitesTimeMillis(),
stats.getQueuedTimeMillis(),
stats.getElapsedTimeMillis(),
stats.getProcessedRows(),
Expand All @@ -119,6 +127,11 @@ public String getState()
return state;
}

public boolean isWaitingForPrerequisites()
{
return waitingForPrerequisites;
}

public boolean isQueued()
{
return queued;
Expand Down Expand Up @@ -164,6 +177,11 @@ public long getWallTimeMillis()
return wallTimeMillis;
}

public long getWaitingForPrerequisitesTimeMillis()
{
return waitingForPrerequisitesTimeMillis;
}

public long getQueuedTimeMillis()
{
return queuedTimeMillis;
Expand Down
Expand Up @@ -70,10 +70,11 @@ private List<String> createResults()
{
List<Column> columns = ImmutableList.of(new Column("_col0", BigintType.BIGINT));
return ImmutableList.<String>builder()
.add(newQueryResults(null, 1, null, null, "QUEUED"))
.add(newQueryResults(1, 2, columns, null, "RUNNING"))
.add(newQueryResults(null, 1, null, null, "WAITING_FOR_PREREQUISITES"))
.add(newQueryResults(null, 2, null, null, "QUEUED"))
.add(newQueryResults(1, 3, columns, null, "RUNNING"))
.add(newQueryResults(0, 4, columns, ImmutableList.of(ImmutableList.of(253161)), "RUNNING"))
.add(newQueryResults(1, 4, columns, null, "RUNNING"))
.add(newQueryResults(0, 5, columns, ImmutableList.of(ImmutableList.of(253161)), "RUNNING"))
.add(newQueryResults(null, null, columns, null, "FINISHED"))
.build();
}
Expand All @@ -89,7 +90,7 @@ private String newQueryResults(Integer partialCancelId, Integer nextUriId, List<
nextUriId == null ? null : server.url(format("/v1/statement/%s/%s", queryId, nextUriId)).uri(),
responseColumns,
data,
new StatementStats(state, state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null),
new StatementStats(state, state.equals("WAITING_FOR_PREREQUISITES"), state.equals("QUEUED"), true, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, null),
null,
ImmutableList.of(),
null,
Expand Down Expand Up @@ -128,7 +129,7 @@ public void test()

List<QueryStats> queryStatsList = progressMonitor.finish();
assertGreaterThanOrEqual(queryStatsList.size(), 5); // duplicate stats is possible
assertEquals(queryStatsList.get(0).getState(), "QUEUED");
assertEquals(queryStatsList.get(0).getState(), "WAITING_FOR_PREREQUISITES");
assertEquals(queryStatsList.get(queryStatsList.size() - 1).getState(), "FINISHED");
}
}
Expand Down
@@ -0,0 +1,34 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.dispatcher;

import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.prerequisites.QueryPrerequisites;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesContext;

import java.util.concurrent.CompletableFuture;

import static java.util.concurrent.CompletableFuture.completedFuture;

public class DefaultQueryPrerequisites
implements QueryPrerequisites
{
private static final CompletableFuture<?> COMPLETED_FUTURE = completedFuture(null);
mayankgarg1990 marked this conversation as resolved.
Show resolved Hide resolved

@Override
public CompletableFuture<?> waitForPrerequisites(QueryId queryId, QueryPrerequisitesContext context)
{
return COMPLETED_FUTURE;
}
}
Expand Up @@ -25,30 +25,45 @@ public class DispatchInfo
private final Optional<CoordinatorLocation> coordinatorLocation;
private final Optional<ExecutionFailureInfo> failureInfo;
private final Duration elapsedTime;
private final Duration queuedTime;
private final Duration waitingForPrerequisitesTime;
private final Optional<Duration> queuedTime;

public static DispatchInfo queued(Duration elapsedTime, Duration queuedTime)
public static DispatchInfo waitingForPrerequisites(Duration elapsedTime, Duration waitingForPrerequisitesTime)
{
return new DispatchInfo(Optional.empty(), Optional.empty(), elapsedTime, queuedTime);
return new DispatchInfo(Optional.empty(), Optional.empty(), elapsedTime, waitingForPrerequisitesTime, Optional.empty());
}

public static DispatchInfo dispatched(CoordinatorLocation coordinatorLocation, Duration elapsedTime, Duration queuedTime)
public static DispatchInfo queued(Duration elapsedTime, Duration waitingForPrerequisitesTime, Duration queuedTime)
{
requireNonNull(queuedTime, "queuedTime is null");
return new DispatchInfo(Optional.empty(), Optional.empty(), elapsedTime, waitingForPrerequisitesTime, Optional.of(queuedTime));
}

public static DispatchInfo dispatched(CoordinatorLocation coordinatorLocation, Duration elapsedTime, Duration waitingForPrerequisitesTime, Duration queuedTime)
{
requireNonNull(coordinatorLocation, "coordinatorLocation is null");
return new DispatchInfo(Optional.of(coordinatorLocation), Optional.empty(), elapsedTime, queuedTime);
requireNonNull(queuedTime, "queuedTime is null");
return new DispatchInfo(Optional.of(coordinatorLocation), Optional.empty(), elapsedTime, waitingForPrerequisitesTime, Optional.of(queuedTime));
}

public static DispatchInfo failed(ExecutionFailureInfo failureInfo, Duration elapsedTime, Duration queuedTime)
public static DispatchInfo failed(ExecutionFailureInfo failureInfo, Duration elapsedTime, Duration waitingForPrerequisitesTime, Duration queuedTime)
{
requireNonNull(failureInfo, "coordinatorLocation is null");
return new DispatchInfo(Optional.empty(), Optional.of(failureInfo), elapsedTime, queuedTime);
requireNonNull(queuedTime, "queuedTime is null");
return new DispatchInfo(Optional.empty(), Optional.of(failureInfo), elapsedTime, waitingForPrerequisitesTime, Optional.of(queuedTime));
}

private DispatchInfo(Optional<CoordinatorLocation> coordinatorLocation, Optional<ExecutionFailureInfo> failureInfo, Duration elapsedTime, Duration queuedTime)
private DispatchInfo(
Optional<CoordinatorLocation> coordinatorLocation,
Optional<ExecutionFailureInfo> failureInfo,
Duration elapsedTime,
Duration waitingForPrerequisitesTime,
Optional<Duration> queuedTime)
{
this.coordinatorLocation = requireNonNull(coordinatorLocation, "coordinatorLocation is null");
this.failureInfo = requireNonNull(failureInfo, "failureInfo is null");
this.elapsedTime = requireNonNull(elapsedTime, "elapsedTime is null");
this.waitingForPrerequisitesTime = requireNonNull(waitingForPrerequisitesTime, "waitingForPrerequisitesTime is null");
this.queuedTime = requireNonNull(queuedTime, "queuedTime is null");
}

Expand All @@ -67,7 +82,12 @@ public Duration getElapsedTime()
return elapsedTime;
}

public Duration getQueuedTime()
public Duration getWaitingForPrerequisitesTime()
{
return waitingForPrerequisitesTime;
}

public Optional<Duration> getQueuedTime()
{
return queuedTime;
}
Expand Down
Expand Up @@ -213,13 +213,14 @@ private <C> void createQueryInternal(QueryId queryId, String slug, int retryCoun
retryCount,
selectionContext.getResourceGroupId(),
queryType,
warningCollector);
warningCollector,
(dq) -> resourceGroupManager.submit(preparedQuery.getStatement(), dq, selectionContext, queryExecutor));
mayankgarg1990 marked this conversation as resolved.
Show resolved Hide resolved

boolean queryAdded = queryCreated(dispatchQuery);
if (queryAdded && !dispatchQuery.isDone()) {
try {
clusterStatusSender.registerQuery(dispatchQuery);
resourceGroupManager.submit(preparedQuery.getStatement(), dispatchQuery, selectionContext, queryExecutor);
dispatchQuery.startWaitingForPrerequisites();
}
catch (Throwable e) {
// dispatch query has already been registered, so just fail it directly
Expand Down