Skip to content

Commit

Permalink
Report dynamic filtering stats in QueryStats
Browse files Browse the repository at this point in the history
  • Loading branch information
raunaqmorarka committed Aug 19, 2020
1 parent d28bc51 commit 28b84ce
Show file tree
Hide file tree
Showing 11 changed files with 318 additions and 5 deletions.
Expand Up @@ -38,6 +38,7 @@
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.prestosql.execution.QueryState.FAILED;
import static io.prestosql.memory.LocalMemoryManager.GENERAL_POOL;
import static io.prestosql.server.DynamicFilterService.DynamicFiltersStats;
import static io.prestosql.util.Failures.toFailure;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -295,6 +296,7 @@ private static QueryStats immediateFailureQueryStats()
0,
DataSize.ofBytes(0),
ImmutableList.of(),
DynamicFiltersStats.EMPTY,
ImmutableList.of());
}
}
Expand Up @@ -70,6 +70,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand All @@ -88,6 +89,7 @@
import static io.prestosql.execution.QueryState.WAITING_FOR_RESOURCES;
import static io.prestosql.execution.StageInfo.getAllStages;
import static io.prestosql.memory.LocalMemoryManager.GENERAL_POOL;
import static io.prestosql.server.DynamicFilterService.DynamicFiltersStats;
import static io.prestosql.spi.StandardErrorCode.NOT_FOUND;
import static io.prestosql.spi.StandardErrorCode.USER_CANCELED;
import static io.prestosql.util.Failures.toFailure;
Expand Down Expand Up @@ -160,6 +162,10 @@ public class QueryStateMachine

private final Optional<QueryType> queryType;

@GuardedBy("dynamicFiltersStatsSupplierLock")
private Supplier<DynamicFiltersStats> dynamicFiltersStatsSupplier = () -> DynamicFiltersStats.EMPTY;
private final Object dynamicFiltersStatsSupplierLock = new Object();

private QueryStateMachine(
String query,
Optional<String> preparedQuery,
Expand Down Expand Up @@ -614,6 +620,8 @@ private QueryStats getQueryStats(Optional<StageInfo> rootStage)

stageGcStatistics.build(),

getDynamicFiltersStats(),

operatorStatsSummary.build());
}

Expand Down Expand Up @@ -666,6 +674,20 @@ public void setRoutines(List<RoutineInfo> routines)
this.routines.set(ImmutableList.copyOf(routines));
}

private DynamicFiltersStats getDynamicFiltersStats()
{
synchronized (dynamicFiltersStatsSupplierLock) {
return dynamicFiltersStatsSupplier.get();
}
}

public void setDynamicFiltersStatsSupplier(Supplier<DynamicFiltersStats> dynamicFiltersStatsSupplier)
{
synchronized (dynamicFiltersStatsSupplierLock) {
this.dynamicFiltersStatsSupplier = requireNonNull(dynamicFiltersStatsSupplier, "dynamicFiltersStatsSupplier is null");
}
}

public Map<String, String> getSetSessionProperties()
{
return setSessionProperties;
Expand Down Expand Up @@ -1131,6 +1153,7 @@ private static QueryStats pruneQueryStats(QueryStats queryStats)
queryStats.getOutputPositions(),
queryStats.getPhysicalWrittenDataSize(),
queryStats.getStageGcStatistics(),
queryStats.getDynamicFiltersStats(),
ImmutableList.of()); // Remove the operator summaries as OperatorInfo (especially ExchangeClientStatus) can hold onto a large amount of memory
}

Expand Down
13 changes: 13 additions & 0 deletions presto-main/src/main/java/io/prestosql/execution/QueryStats.java
Expand Up @@ -33,6 +33,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.succinctBytes;
import static io.prestosql.server.DynamicFilterService.DynamicFiltersStats;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -102,6 +103,8 @@ public class QueryStats

private final List<StageGcStatistics> stageGcStatistics;

private final DynamicFiltersStats dynamicFiltersStats;

private final List<OperatorStats> operatorSummaries;

@JsonCreator
Expand Down Expand Up @@ -169,6 +172,8 @@ public QueryStats(

@JsonProperty("stageGcStatistics") List<StageGcStatistics> stageGcStatistics,

@JsonProperty("dynamicFiltersStats") DynamicFiltersStats dynamicFiltersStats,

@JsonProperty("operatorSummaries") List<OperatorStats> operatorSummaries)
{
this.createTime = requireNonNull(createTime, "createTime is null");
Expand Down Expand Up @@ -246,6 +251,8 @@ public QueryStats(

this.stageGcStatistics = ImmutableList.copyOf(requireNonNull(stageGcStatistics, "stageGcStatistics is null"));

this.dynamicFiltersStats = requireNonNull(dynamicFiltersStats, "dynamicFiltersStats is null");

this.operatorSummaries = ImmutableList.copyOf(requireNonNull(operatorSummaries, "operatorSummaries is null"));
}

Expand Down Expand Up @@ -569,6 +576,12 @@ public List<StageGcStatistics> getStageGcStatistics()
return stageGcStatistics;
}

@JsonProperty
public DynamicFiltersStats getDynamicFiltersStats()
{
return dynamicFiltersStats;
}

@JsonProperty
public List<OperatorStats> getOperatorSummaries()
{
Expand Down
Expand Up @@ -88,6 +88,7 @@
import static io.prestosql.execution.buffer.OutputBuffers.BROADCAST_PARTITION_ID;
import static io.prestosql.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static io.prestosql.execution.scheduler.SqlQueryScheduler.createSqlQueryScheduler;
import static io.prestosql.server.DynamicFilterService.DynamicFiltersStats;
import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.prestosql.sql.ParameterUtils.parameterExtractor;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -179,7 +180,8 @@ private SqlQueryExecution(
// analyze query
this.analysis = analyze(preparedQuery, stateMachine, metadata, accessControl, sqlParser, queryExplainer, warningCollector);

stateMachine.addStateChangeListener(state -> unregisterDynamicFilteringQuery());
stateMachine.addStateChangeListener(state -> unregisterDynamicFilteringQuery(
dynamicFilterService.getDynamicFilteringStats(stateMachine.getQueryId(), stateMachine.getSession())));

// when the query finishes cache the final query info, and clear the reference to the output stage
AtomicReference<SqlQueryScheduler> queryScheduler = this.queryScheduler;
Expand Down Expand Up @@ -211,14 +213,18 @@ private synchronized void registerDynamicFilteringQuery()
}

dynamicFilterService.registerQuery(this);
stateMachine.setDynamicFiltersStatsSupplier(
() -> dynamicFilterService.getDynamicFilteringStats(
stateMachine.getQueryId(),
stateMachine.getSession()));
}

private synchronized void unregisterDynamicFilteringQuery()
private synchronized void unregisterDynamicFilteringQuery(DynamicFiltersStats finalDynamicFiltersStats)
{
if (!isDone()) {
return;
}

stateMachine.setDynamicFiltersStatsSupplier(() -> finalDynamicFiltersStats);
dynamicFilterService.removeQuery(stateMachine.getQueryId());
}

Expand Down
Expand Up @@ -13,19 +13,24 @@
*/
package io.prestosql.server;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.units.Duration;
import io.prestosql.Session;
import io.prestosql.execution.SqlQueryExecution;
import io.prestosql.execution.StageState;
import io.prestosql.spi.QueryId;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.DynamicFilter;
import io.prestosql.spi.predicate.DiscreteValues;
import io.prestosql.spi.predicate.Domain;
import io.prestosql.spi.predicate.Ranges;
import io.prestosql.spi.predicate.TupleDomain;
import io.prestosql.sql.DynamicFilters;
import io.prestosql.sql.analyzer.FeaturesConfig;
Expand All @@ -42,6 +47,7 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -55,6 +61,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.concurrent.MoreFutures.getDone;
import static io.airlift.concurrent.MoreFutures.toCompletableFuture;
import static io.airlift.concurrent.MoreFutures.tryGetFutureValue;
import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
Expand Down Expand Up @@ -156,6 +163,39 @@ void registerQuery(
}
}

public DynamicFiltersStats getDynamicFilteringStats(QueryId queryId, Session session)
{
Map<DynamicFilterId, SettableFuture<Domain>> dynamicFilterFutures = dynamicFilterSummaries.getOrDefault(queryId, ImmutableMap.of());
int numRepartitionedFilters = queryRepartitionedDynamicFilters.getOrDefault(queryId, ImmutableSet.of()).size();
int numReplicatedFilters = queryReplicatedDynamicFilters.getOrDefault(queryId, ImmutableSet.of()).size();
int numTotalDynamicFilters = dynamicFilterFutures.size();

List<DynamicFilterDomainStats> dynamicFilterDomainStats = dynamicFilterFutures.entrySet().stream()
.filter(entry -> entry.getValue().isDone())
.map(entry -> {
DynamicFilterId dynamicFilterId = entry.getKey();
Domain domain = getDone(entry.getValue());
// simplify for readability
String simplifiedDomain = domain.simplify(1).toString(session.toConnectorSession());
int rangeCount = domain.getValues().getValuesProcessor().transform(
Ranges::getRangeCount,
discreteValues -> 0,
allOrNone -> 0);
int discreteValuesCount = domain.getValues().getValuesProcessor().transform(
ranges -> 0,
DiscreteValues::getValuesCount,
allOrNone -> 0);
return new DynamicFilterDomainStats(dynamicFilterId, simplifiedDomain, rangeCount, discreteValuesCount);
})
.collect(toImmutableList());
return new DynamicFiltersStats(
dynamicFilterDomainStats,
numRepartitionedFilters,
numReplicatedFilters,
numTotalDynamicFilters,
dynamicFilterDomainStats.size());
}

public synchronized void removeQuery(QueryId queryId)
{
dynamicFilterSummaries.remove(queryId);
Expand Down Expand Up @@ -350,4 +390,150 @@ private List<Map<DynamicFilterId, Domain>> getTaskDynamicFilters()
return taskDynamicFilters;
}
}

public static class DynamicFiltersStats
{
public static final DynamicFiltersStats EMPTY = new DynamicFiltersStats(ImmutableList.of(), 0, 0, 0, 0);

private final List<DynamicFilterDomainStats> dynamicFilterDomainStats;
private final int numRepartitionedDynamicFilters;
private final int numReplicatedDynamicFilters;
private final int numTotalDynamicFilters;
private final int numDynamicFiltersCompleted;

@JsonCreator
public DynamicFiltersStats(
@JsonProperty("dynamicFilterDomainStats") List<DynamicFilterDomainStats> dynamicFilterDomainStats,
@JsonProperty("numRepartitionedDynamicFilters") int numRepartitionedDynamicFilters,
@JsonProperty("numReplicatedDynamicFilters") int numReplicatedDynamicFilters,
@JsonProperty("numTotalDynamicFilters") int numTotalDynamicFilters,
@JsonProperty("numDynamicFiltersCompleted") int numDynamicFiltersCompleted)
{
this.dynamicFilterDomainStats = dynamicFilterDomainStats;
this.numRepartitionedDynamicFilters = numRepartitionedDynamicFilters;
this.numReplicatedDynamicFilters = numReplicatedDynamicFilters;
this.numTotalDynamicFilters = numTotalDynamicFilters;
this.numDynamicFiltersCompleted = numDynamicFiltersCompleted;
}

@JsonProperty
public List<DynamicFilterDomainStats> getDynamicFilterDomainStats()
{
return dynamicFilterDomainStats;
}

@JsonProperty
public int getNumRepartitionedDynamicFilters()
{
return numRepartitionedDynamicFilters;
}

@JsonProperty
public int getNumReplicatedDynamicFilters()
{
return numReplicatedDynamicFilters;
}

@JsonProperty
public int getNumTotalDynamicFilters()
{
return numTotalDynamicFilters;
}

@JsonProperty
public int getNumDynamicFiltersCompleted()
{
return numDynamicFiltersCompleted;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DynamicFiltersStats that = (DynamicFiltersStats) o;
return numRepartitionedDynamicFilters == that.numRepartitionedDynamicFilters &&
numReplicatedDynamicFilters == that.numReplicatedDynamicFilters &&
numTotalDynamicFilters == that.numTotalDynamicFilters &&
numDynamicFiltersCompleted == that.numDynamicFiltersCompleted &&
Objects.equals(dynamicFilterDomainStats, that.dynamicFilterDomainStats);
}

@Override
public int hashCode()
{
return Objects.hash(dynamicFilterDomainStats, numRepartitionedDynamicFilters, numReplicatedDynamicFilters, numTotalDynamicFilters, numDynamicFiltersCompleted);
}
}

public static class DynamicFilterDomainStats
{
private final DynamicFilterId dynamicFilterId;
private final String simplifiedDomain;
private final int rangeCount;
private final int discreteValuesCount;

@JsonCreator
public DynamicFilterDomainStats(
@JsonProperty("dynamicFilterId") DynamicFilterId dynamicFilterId,
@JsonProperty("simplifiedDomain") String simplifiedDomain,
@JsonProperty("rangeCount") int rangeCount,
@JsonProperty("discreteValuesCount") int discreteValuesCount)
{
this.dynamicFilterId = dynamicFilterId;
this.simplifiedDomain = simplifiedDomain;
this.rangeCount = rangeCount;
this.discreteValuesCount = discreteValuesCount;
}

@JsonProperty
public DynamicFilterId getDynamicFilterId()
{
return dynamicFilterId;
}

@JsonProperty
public String getSimplifiedDomain()
{
return simplifiedDomain;
}

@JsonProperty
public int getRangeCount()
{
return rangeCount;
}

@JsonProperty
public int getDiscreteValuesCount()
{
return discreteValuesCount;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DynamicFilterDomainStats that = (DynamicFilterDomainStats) o;
return rangeCount == that.rangeCount &&
discreteValuesCount == that.discreteValuesCount &&
Objects.equals(dynamicFilterId, that.dynamicFilterId) &&
Objects.equals(simplifiedDomain, that.simplifiedDomain);
}

@Override
public int hashCode()
{
return Objects.hash(dynamicFilterId, simplifiedDomain, rangeCount, discreteValuesCount);
}
}
}

0 comments on commit 28b84ce

Please sign in to comment.