Skip to content

Commit

Permalink
Add logic to rename aggregation to avoid name collision (#1095)
Browse files Browse the repository at this point in the history
* Add logic to rename aggregation to avoid name collision

* fix tests

* fix typo

* Review comments

* fix build failure

* addressed review comments

* fix style errors

* update changelog

* retry build.

* retry build

* fix checkstyle error

* fix constructor

Co-authored-by: Bharat Motwani <>
  • Loading branch information
bharatmotwani committed Oct 13, 2020
1 parent d5bd70a commit 7c022c7
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 17 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pull request if there was one.
Current
-------
### Fixed:

- [Fix: Bad serialization of AllGranularity](https://github.com/yahoo/fili/issues/1093)
* Change to rollup formatter broke serialization of all timegrain.

Expand Down Expand Up @@ -62,6 +63,11 @@ Current
* Created `LegacyGenerator` as a bridge interface from the existing constructor based api request impls and the factory based value object usage.

### Added:
- [Add logic to rename aggregation to avoid name collision](https://github.com/yahoo/fili/issues/1095)
* Add renameIfConflicting logic for aggregations in BaseProtocolMetricMaker.
- Subclasses of `BaseProtocolMetricMaker` can override the `getRenamedMetricNameWithPrefix` method to provide a unique rename prefix for the corresponding maker.
- The default implementation adds the prefix `__renamed_` whenever there is an aggregation name collision.

- [Add ability in fili-sql to translate FilteredAggregation into SQL](https://github.com/yahoo/fili/pull/1083)
* Translate a Druid query with `n` FilteredAggregation into SQL using `(n + 1)` subquery unions.
- See PR description for details
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class AggregationAverageMaker extends BaseProtocolMetricMaker {

private static final int DEPENDENT_METRICS_REQUIRED = 1;

private static final String RENAMED_AVERAGER_PREFIX = "__averager_renamed_";

public static final String PROTOCOL_NAME = ReaggregationProtocol.REAGGREGATION_CONTRACT_NAME;

public static final PostAggregation COUNT_INNER = new ConstantPostAggregation("one", 1);
Expand Down Expand Up @@ -97,6 +99,11 @@ public AggregationAverageMaker(
this.innerGrain = innerGrain;
}

/**
 * Builds a renamed metric name by prepending this maker's averager-specific rename prefix.
 *
 * @param name  The metric name that needs to be renamed
 *
 * @return the given name prefixed with {@code RENAMED_AVERAGER_PREFIX}
 */
@Override
protected String getRenamedMetricNameWithPrefix(String name) {
    String prefixedName = RENAMED_AVERAGER_PREFIX + name;
    return prefixedName;
}

@Override
protected TemplateDruidQuery makePartialQuery(
final LogicalMetricInfo logicalMetricInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class ArithmeticMaker extends BaseProtocolMetricMaker {

private final Function<String, ResultSetMapper> resultSetMapperSupplier;

private static final String RENAMED_ARITHMETIC_PREFIX = "__airthmetic_renamed_";

/**
* Constructor.
*
Expand Down Expand Up @@ -92,6 +94,11 @@ public ArithmeticMaker(MetricDictionary metricDictionary, ArithmeticPostAggregat
);
}

/**
 * Builds a renamed metric name by prepending this maker's arithmetic-specific rename prefix.
 *
 * @param name  The metric name that needs to be renamed
 *
 * @return the given name prefixed with {@code RENAMED_ARITHMETIC_PREFIX}
 */
@Override
protected String getRenamedMetricNameWithPrefix(String name) {
    String prefixedName = RENAMED_ARITHMETIC_PREFIX + name;
    return prefixedName;
}

@Override
public ResultSetMapper makeCalculation(LogicalMetricInfo logicalMetricInfo, List<LogicalMetric> dependentMetric) {
return resultSetMapperSupplier.apply(logicalMetricInfo.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@
import com.yahoo.bard.webservice.data.metric.TemplateDruidQuery;
import com.yahoo.bard.webservice.data.metric.mappers.ResultSetMapper;
import com.yahoo.bard.webservice.data.metric.protocol.DefaultSystemMetricProtocols;
import com.yahoo.bard.webservice.data.metric.protocol.GeneratedMetricInfo;
import com.yahoo.bard.webservice.data.metric.protocol.ProtocolMetric;
import com.yahoo.bard.webservice.data.metric.protocol.ProtocolSupport;
import com.yahoo.bard.webservice.data.metric.protocol.ProtocolMetricImpl;
import com.yahoo.bard.webservice.druid.model.MetricField;

import java.util.Collections;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* An expansion on the base {@link MetricMaker} contract that leverages the functionality of {@link ProtocolMetric}.
Expand All @@ -27,6 +33,8 @@ public abstract class BaseProtocolMetricMaker extends MetricMaker implements Mak
*/
protected final ProtocolSupport baseProtocolSupport;

private static final String DEFAULT_RENAMED_PREFIX = "__renamed_";

/**
* Construct a fully specified MetricMaker.
*
Expand Down Expand Up @@ -59,12 +67,75 @@ public LogicalMetric makeInnerWithResolvedDependencies(
LogicalMetricInfo logicalMetricInfo,
List<LogicalMetric> dependentMetrics
) {
TemplateDruidQuery partialQuery = makePartialQuery(logicalMetricInfo, dependentMetrics);
ResultSetMapper calculation = makeCalculation(logicalMetricInfo, dependentMetrics);
ProtocolSupport protocolSupport = makeProtocolSupport(logicalMetricInfo, dependentMetrics);
LogicalMetric dependentMetric = dependentMetrics.get(0);
LogicalMetric renamedDependentMetric = renameIfConflicting(logicalMetricInfo.getName(), dependentMetric);
List<LogicalMetric> renamedDependentMetrics =
dependentMetrics.size() == 1 ? Collections.singletonList(renamedDependentMetric) : dependentMetrics;
TemplateDruidQuery partialQuery = makePartialQuery(logicalMetricInfo, renamedDependentMetrics);
ResultSetMapper calculation = makeCalculation(logicalMetricInfo, renamedDependentMetrics);
ProtocolSupport protocolSupport = makeProtocolSupport(logicalMetricInfo, renamedDependentMetrics);

return new ProtocolMetricImpl(logicalMetricInfo, partialQuery, calculation, protocolSupport);
}

/**
 * Gives the dependent metric a new name when its current name equals the final desired output name.
 * <p>
 * When the two names differ, the dependent metric is returned untouched. When they match, a replacement name is
 * produced by {@link #getRenamedMetricNameWithPrefix(String)}, and the prefix is applied again for as long as
 * {@link #ifConflicting(String, LogicalMetric)} reports the candidate as taken. The renamed metric keeps the
 * base metric name when its info is a {@link GeneratedMetricInfo}; otherwise a plain {@link LogicalMetricInfo}
 * built from the new name alone is used.
 *
 * @param finalOutputName  The name to check for conflicts on
 * @param dependentMetric  The dependent metric to check for a conflicting name
 *
 * @return The dependent metric itself if there is no conflict, otherwise a copy carrying the new name
 */
protected LogicalMetric renameIfConflicting(String finalOutputName, LogicalMetric dependentMetric) {
    LogicalMetricInfo originalInfo = dependentMetric.getLogicalMetricInfo();
    String originalName = originalInfo.getName();

    // Nothing to do unless the dependent metric shares the output name.
    if (!java.util.Objects.equals(finalOutputName, originalName)) {
        return dependentMetric;
    }

    // Keep prefixing until the candidate no longer clashes with a field of the dependent metric's query.
    String candidateName = getRenamedMetricNameWithPrefix(originalName);
    while (ifConflicting(candidateName, dependentMetric)) {
        candidateName = getRenamedMetricNameWithPrefix(candidateName);
    }

    if (originalInfo instanceof GeneratedMetricInfo) {
        String baseMetricName = ((GeneratedMetricInfo) originalInfo).getBaseMetricName();
        return dependentMetric.withLogicalMetricInfo(new GeneratedMetricInfo(candidateName, baseMetricName));
    }
    return dependentMetric.withLogicalMetricInfo(new LogicalMetricInfo(candidateName));
}

/**
 * Builds a renamed metric name by prepending a prefix to the given name.
 * <p>
 * Maker subclasses may override this method to supply their own unique prefix, so that names renamed by
 * different makers do not collide with each other. This default implementation uses
 * {@code DEFAULT_RENAMED_PREFIX}.
 *
 * @param name  The dependent metric name that needs to be renamed
 *
 * @return The given name with the rename prefix prepended
 */
protected String getRenamedMetricNameWithPrefix(String name) {
    String prefixedName = DEFAULT_RENAMED_PREFIX + name;
    return prefixedName;
}

/**
 * Determines whether a candidate name is already in use within the dependent metric's template druid query.
 *
 * @param newName  The candidate new name for the dependent metric
 * @param dependentMetric  The dependent LogicalMetric whose aggregations and post aggregations are inspected
 *
 * @return true if an aggregation or a post aggregation of the dependent metric's query is already named newName,
 * false otherwise
 */
protected boolean ifConflicting(String newName, LogicalMetric dependentMetric) {
    TemplateDruidQuery dependentQuery = dependentMetric.getTemplateDruidQuery();

    boolean takenByAggregation = dependentQuery.getAggregations().stream()
            .map(MetricField::getName)
            .anyMatch(fieldName -> java.util.Objects.equals(fieldName, newName));

    boolean takenByPostAggregation = dependentQuery.getPostAggregations().stream()
            .map(MetricField::getName)
            .anyMatch(fieldName -> java.util.Objects.equals(fieldName, newName));

    return takenByAggregation || takenByPostAggregation;
}

/**
* Create the post processing mapper for this LogicalMetric.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public class QueryPlanningConstraint extends BaseDataSourceConstraint {
* @param requestGranularity The requested granularity on the requested table
*/
public QueryPlanningConstraint(
Set<Dimension> requestDimensions,
Set<Dimension> filterDimensions,
Set<Dimension> metricDimensions,
Set<String> metricNames,
ApiFilters apiFilters,
@NotNull Set<Dimension> requestDimensions,
@NotNull Set<Dimension> filterDimensions,
@NotNull Set<Dimension> metricDimensions,
@NotNull Set<String> metricNames,
@NotNull ApiFilters apiFilters,
LogicalTable logicalTable,
List<Interval> intervals,
Set<LogicalMetric> logicalMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms.
package com.yahoo.bard.webservice.data.config.metric.makers

import com.yahoo.bard.webservice.data.metric.LogicalMetricImpl
import com.yahoo.bard.webservice.data.metric.protocol.GeneratedMetricInfo
import com.yahoo.bard.webservice.druid.model.aggregation.LongSumAggregation

import static com.yahoo.bard.webservice.data.metric.protocol.protocols.ReaggregationProtocol.REAGGREGATION_CONTRACT_NAME
import static com.yahoo.bard.webservice.data.time.DefaultTimeGrain.DAY

Expand Down Expand Up @@ -30,18 +34,23 @@ import spock.lang.Specification
class AggregationAverageMakerSpec extends Specification{

static final String NAME = "users"
static final String NAME_RENAMED = "__averager_renamed_users"
static final String DESCRIPTION = NAME
static final String ESTIMATE_NAME = "users_estimate"
static final String ESTIMATE_SUM_NAME = "users_estimate_sum"
static final String ESTIMATE_NAME_RENAMED = "__averager_renamed_users_estimate"
static final String ESTIMATE_SUM_NAME_RENAMED = "__averager_renamed_users_estimate_sum"
static final int SKETCH_SIZE = 16000
static final ZonelessTimeGrain INNER_GRAIN = DAY
AggregationAverageMaker averageMaker

@Shared
FieldConverters converter = FieldConverterSupplier.sketchConverter
Aggregation sketchMerge
Aggregation sketchMergeRenamed

def setup(){
sketchMerge = new ThetaSketchAggregation(NAME, NAME, SKETCH_SIZE)
sketchMergeRenamed = new ThetaSketchAggregation(NAME_RENAMED, NAME, SKETCH_SIZE)
//Initializing the Sketch field converter
FieldConverterSupplier.sketchConverter = new ThetaSketchFieldConverter();
}
Expand All @@ -53,6 +62,7 @@ class AggregationAverageMakerSpec extends Specification{
def "Build a correct LogicalMetric when passed a sketch count aggregation."(){
given: "a logical metric for counting the number of users each day"
Aggregation userSketchCount = new ThetaSketchAggregation(NAME, NAME, 16000)
Aggregation userSketchCountRenamed = new ThetaSketchAggregation(NAME_RENAMED, NAME, 16000)
TemplateDruidQuery sketchQuery = new TemplateDruidQuery(
[userSketchCount] as Set,
[] as Set
Expand All @@ -67,8 +77,8 @@ class AggregationAverageMakerSpec extends Specification{
and: """a test-specific inner post aggregation and the expected metric. The test-specific inner post aggregation
estimates the size of userSketchCount."""
PostAggregation sketchEstimate = new ThetaSketchEstimatePostAggregation(
ESTIMATE_NAME,
new FieldAccessorPostAggregation(userSketchCount)
ESTIMATE_NAME_RENAMED,
new FieldAccessorPostAggregation(userSketchCountRenamed)
)
LogicalMetric expectedMetric = buildExpectedMetric(sketchEstimate)
LogicalMetric actual = maker.make(NAME, NAME)
Expand All @@ -81,9 +91,13 @@ class AggregationAverageMakerSpec extends Specification{
given: """A logical metric for counting the number of users each day, using a sketch merge and sketch
estimate rather than a sketch count."""
PostAggregation sketchEstimate = new ThetaSketchEstimatePostAggregation(
ESTIMATE_NAME,
ESTIMATE_NAME_RENAMED,
new FieldAccessorPostAggregation(sketchMerge)
)
PostAggregation sketchEstimateRenamed = new ThetaSketchEstimatePostAggregation(
ESTIMATE_NAME_RENAMED,
new FieldAccessorPostAggregation(sketchMergeRenamed)
)
TemplateDruidQuery sketchMergeAndEstimateQuery = new TemplateDruidQuery(
[sketchMerge] as Set,
[sketchEstimate] as Set
Expand All @@ -96,7 +110,7 @@ class AggregationAverageMakerSpec extends Specification{

and: """The expected metric. Identical to the expected metric from the previous test, except that the
sketchEstimate post aggregation is accessing a sketch merge, rather than a sketch count aggregation."""
LogicalMetric expectedMetric = buildExpectedMetric(sketchEstimate)
LogicalMetric expectedMetric = buildExpectedMetric(sketchEstimateRenamed)

expect:
maker.make(NAME, NAME).equals(expectedMetric)
Expand All @@ -105,6 +119,7 @@ class AggregationAverageMakerSpec extends Specification{
def "Build a correct LogicalMetric when passed only a sketch merge."(){
given: "A Logical Metric containing only a sketch estimate"
Aggregation sketchMerge = new ThetaSketchAggregation(NAME, NAME, SKETCH_SIZE)
Aggregation sketchMergeRenamed = new ThetaSketchAggregation(NAME_RENAMED, NAME, SKETCH_SIZE)
TemplateDruidQuery sketchEstimateQuery = new TemplateDruidQuery(
[sketchMerge] as Set,
[] as Set
Expand All @@ -118,15 +133,39 @@ class AggregationAverageMakerSpec extends Specification{
and: """the expected metric. Note that a sketch estimate is expected to be added automatically by the
AggregationAverageMaker."""
PostAggregation sketchEstimate = new ThetaSketchEstimatePostAggregation(
ESTIMATE_NAME,
new FieldAccessorPostAggregation(sketchMerge)
ESTIMATE_NAME_RENAMED,
new FieldAccessorPostAggregation(sketchMergeRenamed)
)
LogicalMetric expectedMetric = buildExpectedMetric(sketchEstimate)

expect:
maker.make(NAME, NAME).equals(expectedMetric)
}

// Verifies that renameIfConflicting renames a dependent metric whose name equals the requested output name,
// using the AggregationAverageMaker rename prefix. Runs once per row of the where: table below.
def "When output name collides with dependent metric name, dependent metric must be renamed"() {
setup:
// The dependent metric name and the final output name are deliberately identical to force a collision.
String metricName = "inputMetric"
String finalMetricName = "inputMetric"
// `info` comes from the where: table, so the same scenario runs against each metric info type listed there.
LogicalMetric inputMetric = new LogicalMetricImpl(
info,
new TemplateDruidQuery([new LongSumAggregation("inputMetric", "unused")], []),
new NoOpResultSetMapper()
)
MetricMaker maker = new AggregationAverageMaker(new MetricDictionary(), INNER_GRAIN)
maker.metrics.add(inputMetric)

when:
LogicalMetric result = maker.renameIfConflicting(finalMetricName, inputMetric)

then:
// The renamed metric must carry the averager prefix followed by the original name.
result.getName() == AggregationAverageMaker.RENAMED_AVERAGER_PREFIX + metricName

// `infotype` is a human-readable label for each row; it is not referenced in the feature body.
where:
info | infotype
new LogicalMetricInfo("inputMetric") | "Logical Metric Info"
new GeneratedMetricInfo("inputMetric", "baseName") | "Generated Metric Info"
}

LogicalMetric buildDependentMetric(TemplateDruidQuery dependentQuery){
return new ProtocolMetricImpl(
new LogicalMetricInfo(NAME, DESCRIPTION),
Expand Down Expand Up @@ -162,15 +201,15 @@ class AggregationAverageMakerSpec extends Specification{
* @return The LogicalMetric expected by the tests
*/
LogicalMetric buildExpectedMetric(PostAggregation innerPostAggregation){
Set<Aggregation> innerAggregations = [sketchMerge] as LinkedHashSet
Set<Aggregation> innerAggregations = [sketchMergeRenamed] as LinkedHashSet
Set<PostAggregation> innerPostAggregations = [innerPostAggregation, AggregationAverageMaker.COUNT_INNER]
TemplateDruidQuery innerQueryTemplate = new TemplateDruidQuery(
innerAggregations,
innerPostAggregations,
DAY
)

Aggregation outerSum = new DoubleSumAggregation(ESTIMATE_SUM_NAME, ESTIMATE_NAME)
Aggregation outerSum = new DoubleSumAggregation(ESTIMATE_SUM_NAME_RENAMED, ESTIMATE_NAME_RENAMED)
FieldAccessorPostAggregation outerSumLookup = new FieldAccessorPostAggregation(outerSum)
PostAggregation average = new ArithmeticPostAggregation(
NAME,
Expand Down

0 comments on commit 7c022c7

Please sign in to comment.