Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logic to rename aggregation to avoid name collision #1095

Merged
merged 12 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pull request if there was one.
Current
-------
### Fixed:

- [Fix: Bad serialization of AllGranularity](https://github.com/yahoo/fili/issues/1093)
* Change to rollup formatter broke serialization of all timegrain.

Expand Down Expand Up @@ -62,6 +63,11 @@ Current
* Created `LegacyGenerator` as a bridge interface from the existing constructor based api request impls and the factory based value object usage.

### Added:
- [Add logic to rename aggregation to avoid name collision](https://github.com/yahoo/fili/issues/1095)
* Add renameIfConflicting logic for aggregations in BaseProtocolMetricMaker.
- Subclasses of `BaseProtocolMetricMaker` can override the `getRenamedMetricNameWithPrefix` method to supply a rename prefix unique to that maker.
- The default implementation adds the prefix `__renamed_` whenever there is an aggregation name collision.

- [Add ability in fili-sql to translate FilteredAggregation into SQL](https://github.com/yahoo/fili/pull/1083)
* Translate a Druid query with `n` FilteredAggregation into SQL using `(n + 1)` subquery unions.
- See PR description for details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class AggregationAverageMaker extends BaseProtocolMetricMaker {

private static final int DEPENDENT_METRICS_REQUIRED = 1;

private static final String RENAMED_AVERAGER_PREFIX = "__averager_renamed_";

public static final String PROTOCOL_NAME = ReaggregationProtocol.REAGGREGATION_CONTRACT_NAME;

public static final PostAggregation COUNT_INNER = new ConstantPostAggregation("one", 1);
Expand Down Expand Up @@ -97,6 +99,11 @@ public AggregationAverageMaker(
this.innerGrain = innerGrain;
}

/**
 * Builds the replacement name for a dependent metric by prepending the averager-specific rename prefix.
 *
 * @param name  The dependent metric name that needs to be renamed
 *
 * @return The given name with {@code RENAMED_AVERAGER_PREFIX} prepended
 */
@Override
protected String getRenamedMetricNameWithPrefix(String name) {
    final String prefixedName = RENAMED_AVERAGER_PREFIX + name;
    return prefixedName;
}

@Override
protected TemplateDruidQuery makePartialQuery(
final LogicalMetricInfo logicalMetricInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class ArithmeticMaker extends BaseProtocolMetricMaker {

private final Function<String, ResultSetMapper> resultSetMapperSupplier;

private static final String RENAMED_ARITHMETIC_PREFIX = "__airthmetic_renamed_";

/**
* Constructor.
*
Expand Down Expand Up @@ -92,6 +94,11 @@ public ArithmeticMaker(MetricDictionary metricDictionary, ArithmeticPostAggregat
);
}

/**
 * Builds the replacement name for a dependent metric by prepending the arithmetic-specific rename prefix.
 *
 * @param name  The dependent metric name that needs to be renamed
 *
 * @return The given name with {@code RENAMED_ARITHMETIC_PREFIX} prepended
 */
@Override
protected String getRenamedMetricNameWithPrefix(String name) {
    final String prefixedName = RENAMED_ARITHMETIC_PREFIX + name;
    return prefixedName;
}

@Override
public ResultSetMapper makeCalculation(LogicalMetricInfo logicalMetricInfo, List<LogicalMetric> dependentMetric) {
return resultSetMapperSupplier.apply(logicalMetricInfo.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@
import com.yahoo.bard.webservice.data.metric.TemplateDruidQuery;
import com.yahoo.bard.webservice.data.metric.mappers.ResultSetMapper;
import com.yahoo.bard.webservice.data.metric.protocol.DefaultSystemMetricProtocols;
import com.yahoo.bard.webservice.data.metric.protocol.GeneratedMetricInfo;
import com.yahoo.bard.webservice.data.metric.protocol.ProtocolMetric;
import com.yahoo.bard.webservice.data.metric.protocol.ProtocolSupport;
import com.yahoo.bard.webservice.data.metric.protocol.ProtocolMetricImpl;
import com.yahoo.bard.webservice.druid.model.MetricField;

import java.util.Collections;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* An expansion on the base {@link MetricMaker} contract that leverages the functionality of {@link ProtocolMetric}.
Expand All @@ -27,6 +33,8 @@ public abstract class BaseProtocolMetricMaker extends MetricMaker implements Mak
*/
protected final ProtocolSupport baseProtocolSupport;

private static final String DEFAULT_RENAMED_PREFIX = "__renamed_";

/**
* Construct a fully specified MetricMaker.
*
Expand Down Expand Up @@ -59,12 +67,75 @@ public LogicalMetric makeInnerWithResolvedDependencies(
LogicalMetricInfo logicalMetricInfo,
List<LogicalMetric> dependentMetrics
) {
TemplateDruidQuery partialQuery = makePartialQuery(logicalMetricInfo, dependentMetrics);
ResultSetMapper calculation = makeCalculation(logicalMetricInfo, dependentMetrics);
ProtocolSupport protocolSupport = makeProtocolSupport(logicalMetricInfo, dependentMetrics);
LogicalMetric dependentMetric = dependentMetrics.get(0);
LogicalMetric renamedDependentMetric = renameIfConflicting(logicalMetricInfo.getName(), dependentMetric);
List<LogicalMetric> renamedDependentMetrics =
dependentMetrics.size() == 1 ? Collections.singletonList(renamedDependentMetric) : dependentMetrics;
TemplateDruidQuery partialQuery = makePartialQuery(logicalMetricInfo, renamedDependentMetrics);
ResultSetMapper calculation = makeCalculation(logicalMetricInfo, renamedDependentMetrics);
ProtocolSupport protocolSupport = makeProtocolSupport(logicalMetricInfo, renamedDependentMetrics);

return new ProtocolMetricImpl(logicalMetricInfo, partialQuery, calculation, protocolSupport);
}

/**
 * Produces a dependent metric whose name does not clash with the name of the metric being built.
 * <p>
 * When the dependent metric's name differs from {@code finalOutputName} the dependent metric is handed back
 * untouched. When the two names are equal, a new name is derived via
 * {@link #getRenamedMetricNameWithPrefix(String)}; the prefix is applied repeatedly for as long as
 * {@link #ifConflicting(String, LogicalMetric)} reports that the candidate is already in use by the dependent
 * metric's query. The returned metric is a copy of the dependent metric carrying the new metric info.
 *
 * @param finalOutputName  The name of the metric being produced, which the dependent metric must not share
 * @param dependentMetric  The dependent metric whose name is checked against finalOutputName
 *
 * @return The original dependent metric if there is no name conflict, otherwise a renamed copy of it
 */
protected LogicalMetric renameIfConflicting(String finalOutputName, LogicalMetric dependentMetric) {
    LogicalMetricInfo originalInfo = dependentMetric.getLogicalMetricInfo();
    String originalName = originalInfo.getName();

    // Nothing to do unless the dependent metric shares the output name
    boolean collides = java.util.Objects.equals(finalOutputName, originalName);
    if (!collides) {
        return dependentMetric;
    }

    // Keep prefixing until the candidate is not used by any field of the dependent metric's query
    String candidate = getRenamedMetricNameWithPrefix(originalName);
    while (ifConflicting(candidate, dependentMetric)) {
        candidate = getRenamedMetricNameWithPrefix(candidate);
    }

    // Preserve the base metric name when the original info is a generated metric info
    LogicalMetricInfo renamedInfo = originalInfo instanceof GeneratedMetricInfo
            ? new GeneratedMetricInfo(candidate, ((GeneratedMetricInfo) originalInfo).getBaseMetricName())
            : new LogicalMetricInfo(candidate);

    return dependentMetric.withLogicalMetricInfo(renamedInfo);
}

/**
* Method to build new renamed metric name by adding prefix to it.
* This method would make sure all maker subclasses use unqiue prefix to build renamed name
* to avoid naming collision.
* @param name The dependent metric name need rename.
*
* @return Renamed metric name with specified prefix
*/
bharatmotwani marked this conversation as resolved.
Show resolved Hide resolved
protected String getRenamedMetricNameWithPrefix(String name) {
return DEFAULT_RENAMED_PREFIX + name;
}

/**
 * Determines whether a candidate name is already taken by a field of the dependent metric's query.
 *
 * @param newName  The candidate new name for the dependent metric
 * @param dependentMetric  The dependent LogicalMetric whose template query is inspected
 *
 * @return true if newName is the name of any aggregation or post aggregation in the dependent metric's template
 * query, otherwise false
 */
protected boolean ifConflicting(String newName, LogicalMetric dependentMetric) {
    TemplateDruidQuery template = dependentMetric.getTemplateDruidQuery();

    boolean usedByAggregation = template.getAggregations().stream()
            .map(MetricField::getName)
            .anyMatch(existing -> java.util.Objects.equals(existing, newName));

    boolean usedByPostAggregation = template.getPostAggregations().stream()
            .map(MetricField::getName)
            .anyMatch(existing -> java.util.Objects.equals(existing, newName));

    return usedByAggregation || usedByPostAggregation;
}

/**
* Create the post processing mapper for this LogicalMetric.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public class QueryPlanningConstraint extends BaseDataSourceConstraint {
* @param requestGranularity The requested granularity of on the requested table
*/
public QueryPlanningConstraint(
Set<Dimension> requestDimensions,
Set<Dimension> filterDimensions,
Set<Dimension> metricDimensions,
Set<String> metricNames,
ApiFilters apiFilters,
@NotNull Set<Dimension> requestDimensions,
@NotNull Set<Dimension> filterDimensions,
@NotNull Set<Dimension> metricDimensions,
@NotNull Set<String> metricNames,
@NotNull ApiFilters apiFilters,
LogicalTable logicalTable,
List<Interval> intervals,
Set<LogicalMetric> logicalMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.data.config.metric.makers

import com.yahoo.bard.webservice.data.metric.LogicalMetricImpl
import com.yahoo.bard.webservice.data.metric.protocol.GeneratedMetricInfo
import com.yahoo.bard.webservice.druid.model.aggregation.LongSumAggregation

import static com.yahoo.bard.webservice.data.metric.protocol.protocols.ReaggregationProtocol.REAGGREGATION_CONTRACT_NAME
import static com.yahoo.bard.webservice.data.time.DefaultTimeGrain.DAY

Expand Down Expand Up @@ -30,18 +34,23 @@ import spock.lang.Specification
class AggregationAverageMakerSpec extends Specification{

static final String NAME = "users"
static final String NAME_RENAMED = "__averager_renamed_users"
static final String DESCRIPTION = NAME
static final String ESTIMATE_NAME = "users_estimate"
static final String ESTIMATE_SUM_NAME = "users_estimate_sum"
static final String ESTIMATE_NAME_RENAMED = "__averager_renamed_users_estimate"
static final String ESTIMATE_SUM_NAME_RENAMED = "__averager_renamed_users_estimate_sum"
static final int SKETCH_SIZE = 16000
static final ZonelessTimeGrain INNER_GRAIN = DAY
AggregationAverageMaker averageMaker

@Shared
FieldConverters converter = FieldConverterSupplier.sketchConverter
Aggregation sketchMerge
Aggregation sketchMergeRenamed

// Fixture method: builds the sketch aggregations shared by the feature methods and installs the sketch converter.
def setup(){
// Aggregation constructed with NAME as both of its first two arguments
sketchMerge = new ThetaSketchAggregation(NAME, NAME, SKETCH_SIZE)
// Counterpart of sketchMerge constructed with NAME_RENAMED as its first argument; used where the tests expect
// the dependent metric to have been renamed
sketchMergeRenamed = new ThetaSketchAggregation(NAME_RENAMED, NAME, SKETCH_SIZE)
//Initializing the Sketch field converter
FieldConverterSupplier.sketchConverter = new ThetaSketchFieldConverter();
}
Expand All @@ -53,6 +62,7 @@ class AggregationAverageMakerSpec extends Specification{
def "Build a correct LogicalMetric when passed a sketch count aggregation."(){
given: "a logical metric for counting the number of users each day"
Aggregation userSketchCount = new ThetaSketchAggregation(NAME, NAME, 16000)
Aggregation userSketchCountRenamed = new ThetaSketchAggregation(NAME_RENAMED, NAME, 16000)
TemplateDruidQuery sketchQuery = new TemplateDruidQuery(
[userSketchCount] as Set,
[] as Set
Expand All @@ -67,8 +77,8 @@ class AggregationAverageMakerSpec extends Specification{
and: """a test-specific inner post aggregation and the expected metric. The test-specific inner post aggregation
estimates the size of userSketchCount."""
PostAggregation sketchEstimate = new ThetaSketchEstimatePostAggregation(
ESTIMATE_NAME,
new FieldAccessorPostAggregation(userSketchCount)
ESTIMATE_NAME_RENAMED,
new FieldAccessorPostAggregation(userSketchCountRenamed)
)
LogicalMetric expectedMetric = buildExpectedMetric(sketchEstimate)
LogicalMetric actual = maker.make(NAME, NAME)
Expand All @@ -81,9 +91,13 @@ class AggregationAverageMakerSpec extends Specification{
given: """A logical metric for counting the number of users each day, using a sketch merge and sketch
estimate rather than a sketch count."""
PostAggregation sketchEstimate = new ThetaSketchEstimatePostAggregation(
ESTIMATE_NAME,
ESTIMATE_NAME_RENAMED,
new FieldAccessorPostAggregation(sketchMerge)
)
PostAggregation sketchEstimateRenamed = new ThetaSketchEstimatePostAggregation(
ESTIMATE_NAME_RENAMED,
new FieldAccessorPostAggregation(sketchMergeRenamed)
)
TemplateDruidQuery sketchMergeAndEstimateQuery = new TemplateDruidQuery(
[sketchMerge] as Set,
[sketchEstimate] as Set
Expand All @@ -96,7 +110,7 @@ class AggregationAverageMakerSpec extends Specification{

and: """The expected metric. Identical to the expected metric from the previous test, except that the
sketchEstimate post aggregation is accessing a sketch merge, rather than a sketch count aggregation."""
LogicalMetric expectedMetric = buildExpectedMetric(sketchEstimate)
LogicalMetric expectedMetric = buildExpectedMetric(sketchEstimateRenamed)

expect:
maker.make(NAME, NAME).equals(expectedMetric)
Expand All @@ -105,6 +119,7 @@ class AggregationAverageMakerSpec extends Specification{
def "Build a correct LogicalMetric when passed only a sketch merge."(){
given: "A Logical Metric containing only a sketch estimate"
Aggregation sketchMerge = new ThetaSketchAggregation(NAME, NAME, SKETCH_SIZE)
Aggregation sketchMergeRenamed = new ThetaSketchAggregation(NAME_RENAMED, NAME, SKETCH_SIZE)
TemplateDruidQuery sketchEstimateQuery = new TemplateDruidQuery(
[sketchMerge] as Set,
[] as Set
Expand All @@ -118,15 +133,39 @@ class AggregationAverageMakerSpec extends Specification{
and: """the expected metric. Note that a sketch estimate is expected to be added automatically by the
AggregationAverageMaker."""
PostAggregation sketchEstimate = new ThetaSketchEstimatePostAggregation(
ESTIMATE_NAME,
new FieldAccessorPostAggregation(sketchMerge)
ESTIMATE_NAME_RENAMED,
new FieldAccessorPostAggregation(sketchMergeRenamed)
)
LogicalMetric expectedMetric = buildExpectedMetric(sketchEstimate)

expect:
maker.make(NAME, NAME).equals(expectedMetric)
}

// Verifies that renameIfConflicting renames a dependent metric whose name equals the requested output name.
// The feature runs once per row of the data table below, i.e. once for each kind of metric info.
def "When output name collides with dependent metric name, dependent metric must be renamed"() {
setup:
// The dependent metric name and the final output name are deliberately identical to force a collision
String metricName = "inputMetric"
String finalMetricName = "inputMetric"
LogicalMetric inputMetric = new LogicalMetricImpl(
info,
new TemplateDruidQuery([new LongSumAggregation("inputMetric", "unused")], []),
new NoOpResultSetMapper()
)
MetricMaker maker = new AggregationAverageMaker(new MetricDictionary(), INNER_GRAIN)
maker.metrics.add(inputMetric)

when:
LogicalMetric result = maker.renameIfConflicting(finalMetricName, inputMetric)

then:
// A single application of the averager prefix is expected
// NOTE(review): RENAMED_AVERAGER_PREFIX is declared private in AggregationAverageMaker; this access presumably
// relies on Groovy not enforcing private visibility - confirm
result.getName() == AggregationAverageMaker.RENAMED_AVERAGER_PREFIX + metricName

// infotype is a descriptive label for each row; it is not referenced in the feature body
where:
info | infotype
new LogicalMetricInfo("inputMetric") | "Logical Metric Info"
new GeneratedMetricInfo("inputMetric", "baseName") | "Generated Metric Info"
}

LogicalMetric buildDependentMetric(TemplateDruidQuery dependentQuery){
return new ProtocolMetricImpl(
new LogicalMetricInfo(NAME, DESCRIPTION),
Expand Down Expand Up @@ -162,15 +201,15 @@ class AggregationAverageMakerSpec extends Specification{
* @return The LogicalMetric expected by the tests
*/
LogicalMetric buildExpectedMetric(PostAggregation innerPostAggregation){
Set<Aggregation> innerAggregations = [sketchMerge] as LinkedHashSet
Set<Aggregation> innerAggregations = [sketchMergeRenamed] as LinkedHashSet
Set<PostAggregation> innerPostAggregations = [innerPostAggregation, AggregationAverageMaker.COUNT_INNER]
TemplateDruidQuery innerQueryTemplate = new TemplateDruidQuery(
innerAggregations,
innerPostAggregations,
DAY
)

Aggregation outerSum = new DoubleSumAggregation(ESTIMATE_SUM_NAME, ESTIMATE_NAME)
Aggregation outerSum = new DoubleSumAggregation(ESTIMATE_SUM_NAME_RENAMED, ESTIMATE_NAME_RENAMED)
FieldAccessorPostAggregation outerSumLookup = new FieldAccessorPostAggregation(outerSum)
PostAggregation average = new ArithmeticPostAggregation(
NAME,
Expand Down