Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report dynamic filtering stats in QueryStats #4440

Merged
merged 1 commit into from
Aug 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -38,6 +38,7 @@
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.execution.QueryState.FAILED;
import static io.prestosql.memory.LocalMemoryManager.GENERAL_POOL;
import static io.prestosql.server.DynamicFilterService.DynamicFiltersStats;
import static io.prestosql.util.Failures.toFailure;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -295,6 +296,7 @@ private static QueryStats immediateFailureQueryStats()
0,
DataSize.ofBytes(0),
ImmutableList.of(),
DynamicFiltersStats.EMPTY,
ImmutableList.of());
}
}
Expand Up @@ -70,6 +70,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand All @@ -88,6 +89,7 @@
import static io.prestosql.execution.QueryState.WAITING_FOR_RESOURCES;
import static io.prestosql.execution.StageInfo.getAllStages;
import static io.prestosql.memory.LocalMemoryManager.GENERAL_POOL;
import static io.prestosql.server.DynamicFilterService.DynamicFiltersStats;
import static io.prestosql.spi.StandardErrorCode.NOT_FOUND;
import static io.prestosql.spi.StandardErrorCode.USER_CANCELED;
import static io.prestosql.util.Failures.toFailure;
Expand Down Expand Up @@ -160,6 +162,10 @@ public class QueryStateMachine

private final Optional<QueryType> queryType;

@GuardedBy("dynamicFiltersStatsSupplierLock")
private Supplier<DynamicFiltersStats> dynamicFiltersStatsSupplier = () -> DynamicFiltersStats.EMPTY;
private final Object dynamicFiltersStatsSupplierLock = new Object();

private QueryStateMachine(
String query,
Optional<String> preparedQuery,
Expand Down Expand Up @@ -614,6 +620,8 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)

stageGcStatistics.build(),

getDynamicFiltersStats(),

operatorStatsSummary.build());
}

Expand Down Expand Up @@ -666,6 +674,20 @@ public void setRoutines(List<RoutineInfo> routines)
this.routines.set(ImmutableList.copyOf(routines));
}

private DynamicFiltersStats getDynamicFiltersStats()
{
synchronized (dynamicFiltersStatsSupplierLock) {
return dynamicFiltersStatsSupplier.get();
}
}

public void setDynamicFiltersStatsSupplier(Supplier<DynamicFiltersStats> dynamicFiltersStatsSupplier)
{
synchronized (dynamicFiltersStatsSupplierLock) {
this.dynamicFiltersStatsSupplier = requireNonNull(dynamicFiltersStatsSupplier, "dynamicFiltersStatsSupplier is null");
}
}

public Map<String, String> getSetSessionProperties()
{
return setSessionProperties;
Expand Down Expand Up @@ -1131,6 +1153,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getOutputPositions(),
queryStats.getPhysicalWrittenDataSize(),
queryStats.getStageGcStatistics(),
queryStats.getDynamicFiltersStats(),
ImmutableList.of()); // Remove the operator summaries as OperatorInfo (especially ExchangeClientStatus) can hold onto a large amount of memory
}

Expand Down
13 changes: 13 additions & 0 deletions presto-main/src/main/java/io/prestosql/execution/QueryStats.java
Expand Up @@ -33,6 +33,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.succinctBytes;
import static io.prestosql.server.DynamicFilterService.DynamicFiltersStats;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -102,6 +103,8 @@ public class QueryStats

private final List<StageGcStatistics> stageGcStatistics;

private final DynamicFiltersStats dynamicFiltersStats;

private final List<OperatorStats> operatorSummaries;

@JsonCreator
Expand Down Expand Up @@ -169,6 +172,8 @@ public QueryStats(

@JsonProperty("stageGcStatistics") List<StageGcStatistics> stageGcStatistics,

@JsonProperty("dynamicFiltersStats") DynamicFiltersStats dynamicFiltersStats,

@JsonProperty("operatorSummaries") List<OperatorStats> operatorSummaries)
{
this.createTime = requireNonNull(createTime, "createTime is null");
Expand Down Expand Up @@ -246,6 +251,8 @@ public QueryStats(

this.stageGcStatistics = ImmutableList.copyOf(requireNonNull(stageGcStatistics, "stageGcStatistics is null"));

this.dynamicFiltersStats = requireNonNull(dynamicFiltersStats, "dynamicFiltersStats is null");

this.operatorSummaries = ImmutableList.copyOf(requireNonNull(operatorSummaries, "operatorSummaries is null"));
}

Expand Down Expand Up @@ -569,6 +576,12 @@ public List<StageGcStatistics> getStageGcStatistics()
return stageGcStatistics;
}

@JsonProperty
public DynamicFiltersStats getDynamicFiltersStats()
{
return dynamicFiltersStats;
}

@JsonProperty
public List<OperatorStats> getOperatorSummaries()
{
Expand Down
Expand Up @@ -88,6 +88,7 @@
import static io.prestosql.execution.buffer.OutputBuffers.BROADCAST_PARTITION_ID;
import static io.prestosql.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static io.prestosql.execution.scheduler.SqlQueryScheduler.createSqlQueryScheduler;
import static io.prestosql.server.DynamicFilterService.DynamicFiltersStats;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.sql.ParameterUtils.parameterExtractor;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -179,7 +180,8 @@ private SqlQueryExecution(
// analyze query
this.analysis = analyze(preparedQuery, stateMachine, metadata, accessControl, sqlParser, queryExplainer, warningCollector);

stateMachine.addStateChangeListener(state -> unregisterDynamicFilteringQuery());
stateMachine.addStateChangeListener(state -> unregisterDynamicFilteringQuery(
dynamicFilterService.getDynamicFilteringStats(stateMachine.getQueryId(), stateMachine.getSession())));

// when the query finishes cache the final query info, and clear the reference to the output stage
AtomicReference<SqlQueryScheduler> queryScheduler = this.queryScheduler;
Expand Down Expand Up @@ -211,14 +213,18 @@ private synchronized void registerDynamicFilteringQuery()
}

dynamicFilterService.registerQuery(this);
stateMachine.setDynamicFiltersStatsSupplier(
() -> dynamicFilterService.getDynamicFilteringStats(
stateMachine.getQueryId(),
stateMachine.getSession()));
}

private synchronized void unregisterDynamicFilteringQuery()
private synchronized void unregisterDynamicFilteringQuery(DynamicFiltersStats finalDynamicFiltersStats)
{
if (!isDone()) {
return;
}

stateMachine.setDynamicFiltersStatsSupplier(() -> finalDynamicFiltersStats);
dynamicFilterService.removeQuery(stateMachine.getQueryId());
}

Expand Down
Expand Up @@ -13,19 +13,24 @@
*/
package io.prestosql.server;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import io.prestosql.Session;
import io.prestosql.execution.SqlQueryExecution;
import io.prestosql.execution.StageState;
import io.prestosql.spi.QueryId;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.DynamicFilter;
import io.prestosql.spi.predicate.DiscreteValues;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.Ranges;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.sql.DynamicFilters;
import io.prestosql.sql.analyzer.FeaturesConfig;
Expand All @@ -42,6 +47,7 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -55,6 +61,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.concurrent.MoreFutures.getDone;
import static io.airlift.concurrent.MoreFutures.toCompletableFuture;
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
Expand Down Expand Up @@ -156,6 +163,39 @@ void registerQuery(
}
}

public DynamicFiltersStats getDynamicFilteringStats(QueryId queryId, Session session)
{
Map<DynamicFilterId, SettableFuture<Domain>> dynamicFilterFutures = dynamicFilterSummaries.getOrDefault(queryId, ImmutableMap.of());
int numRepartitionedFilters = queryRepartitionedDynamicFilters.getOrDefault(queryId, ImmutableSet.of()).size();
int numReplicatedFilters = queryReplicatedDynamicFilters.getOrDefault(queryId, ImmutableSet.of()).size();
int numTotalDynamicFilters = dynamicFilterFutures.size();

List<DynamicFilterDomainStats> dynamicFilterDomainStats = dynamicFilterFutures.entrySet().stream()
.filter(entry -> entry.getValue().isDone())
.map(entry -> {
DynamicFilterId dynamicFilterId = entry.getKey();
Domain domain = getDone(entry.getValue());
// simplify for readability
String simplifiedDomain = domain.simplify(1).toString(session.toConnectorSession());
int rangeCount = domain.getValues().getValuesProcessor().transform(
Ranges::getRangeCount,
discreteValues -> 0,
allOrNone -> 0);
int discreteValuesCount = domain.getValues().getValuesProcessor().transform(
ranges -> 0,
DiscreteValues::getValuesCount,
allOrNone -> 0);
return new DynamicFilterDomainStats(dynamicFilterId, simplifiedDomain, rangeCount, discreteValuesCount);
})
.collect(toImmutableList());
return new DynamicFiltersStats(
dynamicFilterDomainStats,
numRepartitionedFilters,
numReplicatedFilters,
numTotalDynamicFilters,
dynamicFilterDomainStats.size());
}

public synchronized void removeQuery(QueryId queryId)
{
dynamicFilterSummaries.remove(queryId);
Expand Down Expand Up @@ -350,4 +390,150 @@ private List<Map<DynamicFilterId, Domain>> getTaskDynamicFilters()
return taskDynamicFilters;
}
}

public static class DynamicFiltersStats
{
public static final DynamicFiltersStats EMPTY = new DynamicFiltersStats(ImmutableList.of(), 0, 0, 0, 0);

private final List<DynamicFilterDomainStats> dynamicFilterDomainStats;
private final int numRepartitionedDynamicFilters;
raunaqmorarka marked this conversation as resolved.
Show resolved Hide resolved
private final int numReplicatedDynamicFilters;
private final int numTotalDynamicFilters;
private final int numDynamicFiltersCompleted;

@JsonCreator
public DynamicFiltersStats(
@JsonProperty("dynamicFilterDomainStats") List<DynamicFilterDomainStats> dynamicFilterDomainStats,
@JsonProperty("numRepartitionedDynamicFilters") int numRepartitionedDynamicFilters,
@JsonProperty("numReplicatedDynamicFilters") int numReplicatedDynamicFilters,
@JsonProperty("numTotalDynamicFilters") int numTotalDynamicFilters,
@JsonProperty("numDynamicFiltersCompleted") int numDynamicFiltersCompleted)
{
this.dynamicFilterDomainStats = dynamicFilterDomainStats;
this.numRepartitionedDynamicFilters = numRepartitionedDynamicFilters;
this.numReplicatedDynamicFilters = numReplicatedDynamicFilters;
this.numTotalDynamicFilters = numTotalDynamicFilters;
this.numDynamicFiltersCompleted = numDynamicFiltersCompleted;
}

@JsonProperty
public List<DynamicFilterDomainStats> getDynamicFilterDomainStats()
{
return dynamicFilterDomainStats;
}

@JsonProperty
public int getNumRepartitionedDynamicFilters()
{
return numRepartitionedDynamicFilters;
}

@JsonProperty
public int getNumReplicatedDynamicFilters()
{
return numReplicatedDynamicFilters;
}

@JsonProperty
public int getNumTotalDynamicFilters()
{
return numTotalDynamicFilters;
}

@JsonProperty
public int getNumDynamicFiltersCompleted()
{
return numDynamicFiltersCompleted;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DynamicFiltersStats that = (DynamicFiltersStats) o;
return numRepartitionedDynamicFilters == that.numRepartitionedDynamicFilters &&
numReplicatedDynamicFilters == that.numReplicatedDynamicFilters &&
numTotalDynamicFilters == that.numTotalDynamicFilters &&
numDynamicFiltersCompleted == that.numDynamicFiltersCompleted &&
Objects.equals(dynamicFilterDomainStats, that.dynamicFilterDomainStats);
}

@Override
public int hashCode()
{
return Objects.hash(dynamicFilterDomainStats, numRepartitionedDynamicFilters, numReplicatedDynamicFilters, numTotalDynamicFilters, numDynamicFiltersCompleted);
}
}

public static class DynamicFilterDomainStats
{
private final DynamicFilterId dynamicFilterId;
private final String simplifiedDomain;
private final int rangeCount;
private final int discreteValuesCount;

@JsonCreator
public DynamicFilterDomainStats(
@JsonProperty("dynamicFilterId") DynamicFilterId dynamicFilterId,
@JsonProperty("simplifiedDomain") String simplifiedDomain,
@JsonProperty("rangeCount") int rangeCount,
@JsonProperty("discreteValuesCount") int discreteValuesCount)
{
this.dynamicFilterId = dynamicFilterId;
this.simplifiedDomain = simplifiedDomain;
this.rangeCount = rangeCount;
this.discreteValuesCount = discreteValuesCount;
}

@JsonProperty
public DynamicFilterId getDynamicFilterId()
{
return dynamicFilterId;
}

@JsonProperty
public String getSimplifiedDomain()
{
return simplifiedDomain;
}

@JsonProperty
public int getRangeCount()
{
return rangeCount;
}

@JsonProperty
public int getDiscreteValuesCount()
{
return discreteValuesCount;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DynamicFilterDomainStats that = (DynamicFilterDomainStats) o;
return rangeCount == that.rangeCount &&
discreteValuesCount == that.discreteValuesCount &&
Objects.equals(dynamicFilterId, that.dynamicFilterId) &&
Objects.equals(simplifiedDomain, that.simplifiedDomain);
}

@Override
public int hashCode()
{
return Objects.hash(dynamicFilterId, simplifiedDomain, rangeCount, discreteValuesCount);
}
}
}