-
Notifications
You must be signed in to change notification settings - Fork 96
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Reduce makers #127
Reduce makers #127
Changes from all commits
57a9ff3
0dd0ead
7e9a453
2a7cb5f
c9eecc3
43d85da
7ba6157
1c3a9bf
af2f6be
7f88179
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,12 +2,13 @@ | |
// Licensed under the terms of the Apache license. Please see LICENSE.md file distributed with this work for terms. | ||
package com.yahoo.bard.webservice.data.config.metric.makers; | ||
|
||
import static com.yahoo.bard.webservice.druid.model.postaggregation.ArithmeticPostAggregation.ArithmeticPostAggregationFunction.DIVIDE; | ||
import static com.yahoo.bard.webservice.druid.model.postaggregation.ArithmeticPostAggregation | ||
.ArithmeticPostAggregationFunction.DIVIDE; | ||
import static com.yahoo.bard.webservice.druid.util.FieldConverterSupplier.sketchConverter; | ||
|
||
import com.yahoo.bard.webservice.data.metric.LogicalMetric; | ||
import com.yahoo.bard.webservice.data.metric.MetricDictionary; | ||
import com.yahoo.bard.webservice.data.metric.TemplateDruidQuery; | ||
import com.yahoo.bard.webservice.data.metric.mappers.NoOpResultSetMapper; | ||
import com.yahoo.bard.webservice.data.time.ZonelessTimeGrain; | ||
import com.yahoo.bard.webservice.druid.model.MetricField; | ||
import com.yahoo.bard.webservice.druid.model.aggregation.Aggregation; | ||
|
@@ -18,13 +19,15 @@ | |
import com.yahoo.bard.webservice.druid.model.postaggregation.ConstantPostAggregation; | ||
import com.yahoo.bard.webservice.druid.model.postaggregation.FieldAccessorPostAggregation; | ||
import com.yahoo.bard.webservice.druid.model.postaggregation.PostAggregation; | ||
import com.yahoo.bard.webservice.druid.util.FieldConverterSupplier; | ||
|
||
import com.google.common.collect.ImmutableSet; | ||
|
||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.LinkedHashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import javax.validation.constraints.NotNull; | ||
|
||
|
@@ -35,6 +38,15 @@ | |
* takes aggregated data from a finer time grain (i.e. DefaultTimeGrain.DAY) and computes an average across a coarser | ||
* time grain (i.e. DefaultTimeGrain.WEEK). For example, given the total number of visitors to www.example.com for each | ||
* day of the week, we can compute the average number of daily visitors to example.com for the entire week. | ||
* <p> | ||
* A nested average requires the following columns: | ||
* <ul> | ||
* <li>an inner query constant one</li> | ||
* <li>an inner query numeric aggregation or post aggregation field to sum</li> | ||
* <li>an outer query sum of the constant, which provides a row count</li> | ||
* <li>an outer query sum of the inner numeric summand</li> | ||
* <li>finally, a post aggregation in the outer query dividing the sum by the count</li> | ||
* </ul> | ||
*/ | ||
public class AggregationAverageMaker extends MetricMaker { | ||
|
||
|
@@ -70,7 +82,7 @@ protected LogicalMetric makeInner(String metricName, List<String> dependentMetri | |
TemplateDruidQuery innerQuery = buildInnerQuery(sourceMetric, dependentMetric.getTemplateDruidQuery()); | ||
TemplateDruidQuery outerQuery = buildOuterQuery(metricName, sourceMetric, innerQuery); | ||
|
||
return new LogicalMetric(outerQuery, new NoOpResultSetMapper(), metricName); | ||
return new LogicalMetric(outerQuery, NO_OP_MAPPER, metricName); | ||
} | ||
|
||
/** | ||
|
@@ -113,80 +125,75 @@ private TemplateDruidQuery buildOuterQuery( | |
* @return A template query representing the inner aggregation | ||
*/ | ||
private TemplateDruidQuery buildInnerQuery(MetricField sourceMetric, TemplateDruidQuery innerDependentQuery) { | ||
|
||
Set<Aggregation> newInnerAggregations = convertSketchesToSketchMerges(innerDependentQuery.getAggregations()); | ||
Set<PostAggregation> newInnerPostAggregations = new LinkedHashSet<>(); | ||
|
||
// If the sourceMetric is a Post Aggregator, we need to preserve it in the inner query | ||
if (sourceMetric instanceof PostAggregation) { | ||
newInnerPostAggregations.add((PostAggregation) sourceMetric); | ||
} | ||
Set<PostAggregation> newInnerPostAggregations = (sourceMetric instanceof PostAggregation) ? | ||
ImmutableSet.of((PostAggregation) sourceMetric) : | ||
Collections.emptySet(); | ||
|
||
// Build the inner query with the new aggregations and with the count | ||
TemplateDruidQuery innerQuery = innerDependentQuery; | ||
innerQuery = innerQuery.withAggregations(newInnerAggregations); | ||
innerQuery = innerQuery.withPostAggregations(newInnerPostAggregations); | ||
innerQuery = innerQuery.merge(buildTimeGrainCounterQuery()); | ||
return innerQuery; | ||
return innerDependentQuery.withAggregations(newInnerAggregations) | ||
.withPostAggregations(newInnerPostAggregations) | ||
.merge(buildTimeGrainCounterQuery()); | ||
} | ||
|
||
/** | ||
* If the aggregation being averaged is a sketch, it will become a sketch merge, and need to be estimated for | ||
* summing to work at the outer level. | ||
* If the aggregation being averaged is a sketch, the inner query must convert it to a numerical type so that it | ||
* can be summed. | ||
* | ||
* @param originalSourceMetric The metric being targeted for sums | ||
* | ||
* @return Either the original MetricField, or a new SketchEstimate post aggregation | ||
*/ | ||
private MetricField convertToSketchEstimateIfNeeded(MetricField originalSourceMetric) { | ||
if (originalSourceMetric instanceof SketchAggregation) { | ||
return FieldConverterSupplier.sketchConverter.asSketchEstimate((SketchAggregation) originalSourceMetric); | ||
} | ||
return originalSourceMetric; | ||
return originalSourceMetric instanceof SketchAggregation ? | ||
sketchConverter.asSketchEstimate((SketchAggregation) originalSourceMetric) : | ||
originalSourceMetric; | ||
} | ||
|
||
/** | ||
* Copy a set of aggregations, replacing any sketch aggregations with sketchMerge aggregations. | ||
* This is an artifact of earlier sketch code which didn't have a single sketch type that could be used without | ||
* finalization in inner queries. | ||
* | ||
* @param originalAggregations The read-only original aggregations | ||
* | ||
* @return The new aggregation set | ||
* | ||
* @deprecated This will become unnecessary when the old sketch library is removed | ||
*/ | ||
@Deprecated | ||
private Set<Aggregation> convertSketchesToSketchMerges(Set<Aggregation> originalAggregations) { | ||
Set<Aggregation> result = new LinkedHashSet<>(); | ||
for (Aggregation agg : originalAggregations) { | ||
if (agg.isSketch()) { | ||
result.add(FieldConverterSupplier.sketchConverter.asInnerSketch((SketchAggregation) agg)); | ||
} else { | ||
result.add(agg); | ||
} | ||
} | ||
return result; | ||
return originalAggregations.stream() | ||
.map(agg -> agg.isSketch() ? sketchConverter.asInnerSketch((SketchAggregation) agg) : agg) | ||
.collect(Collectors.toCollection(LinkedHashSet::new)); | ||
} | ||
|
||
/** | ||
* Create an Aggregation for summing on a metric from an inner query. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should probably toss in a blank line and a `<p>`. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That hasn't been our rule. The summary ends with a period and newline. Further commentary may be offset, but need not be. The sentence end will be caught by the javadoc tool and pulled out to a summary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When one is adding an extra line or two to the summary, I'm fine with that. But since this is a rather detailed paragraph, I was just thinking that it would look better if it was offset into its own paragraph from the summary, in the method's more detailed view. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair enough |
||
* <p> | ||
* If the original metric that is being averaged is used together in a query with the averaging metric, then both | ||
* the original aggregator name and the summing aggregator used by the average metric may appear together in the | ||
* outer query. If they share the same name but different definitions, there would be a conflict. This can arise | ||
* if the original metric used a post aggregation or if it contained a non numeric (e.g. sketch) aggregation. | ||
* <p> | ||
* We use the convention of changing the name for the average summing aggregation by adding the suffix '_sum'. | ||
* | ||
* @param innerMetric The metric on the inner query being summed | ||
* | ||
* @return The aggregator that sums across the inner query quantity | ||
*/ | ||
private @NotNull Aggregation createSummingAggregator(MetricField innerMetric) { | ||
// Pick a name for the outer (summing) aggregation name | ||
String outerSummingFieldName; | ||
// TODO: Explain why we have to go through these naming hoops. Likely due to druid naming requirement issues | ||
if (!innerMetric.isSketch() && innerMetric instanceof Aggregation) { | ||
outerSummingFieldName = innerMetric.getName(); | ||
} else { | ||
outerSummingFieldName = innerMetric.getName() + "_sum"; | ||
} | ||
String outerSummingName = (!innerMetric.isSketch() && innerMetric instanceof Aggregation) ? | ||
innerMetric.getName() : | ||
innerMetric.getName() + "_sum"; | ||
|
||
// Make sure we don't drop precision | ||
if (innerMetric.isFloatingPoint()) { | ||
return new DoubleSumAggregation(outerSummingFieldName, innerMetric.getName()); | ||
} | ||
return innerMetric.isFloatingPoint() ? | ||
new DoubleSumAggregation(outerSummingName, innerMetric.getName()) : | ||
new LongSumAggregation(outerSummingName, innerMetric.getName()); | ||
|
||
return new LongSumAggregation(outerSummingFieldName, innerMetric.getName()); | ||
} | ||
|
||
@Override | ||
|
@@ -200,8 +207,6 @@ protected int getDependentMetricsRequired() { | |
* @return The created query | ||
*/ | ||
private TemplateDruidQuery buildTimeGrainCounterQuery() { | ||
Set<Aggregation> timedAggs = Collections.emptySet(); | ||
Set<PostAggregation> timedPostAggs = Collections.singleton(COUNT_INNER); | ||
return new TemplateDruidQuery(timedAggs, timedPostAggs, innerGrain); | ||
return new TemplateDruidQuery(Collections.emptySet(), Collections.singleton(COUNT_INNER), innerGrain); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,7 @@ | |
import com.yahoo.bard.webservice.data.metric.MetricDictionary; | ||
import com.yahoo.bard.webservice.data.metric.TemplateDruidQuery; | ||
import com.yahoo.bard.webservice.data.metric.mappers.ColumnMapper; | ||
import com.yahoo.bard.webservice.data.metric.mappers.SketchRoundUpMapper; | ||
import com.yahoo.bard.webservice.data.metric.mappers.ResultSetMapper; | ||
import com.yahoo.bard.webservice.druid.model.postaggregation.ArithmeticPostAggregation; | ||
import com.yahoo.bard.webservice.druid.model.postaggregation.ArithmeticPostAggregation.ArithmeticPostAggregationFunction; | ||
import com.yahoo.bard.webservice.druid.model.postaggregation.PostAggregation; | ||
|
@@ -17,6 +17,7 @@ | |
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.function.Function; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
|
@@ -28,26 +29,49 @@ public class ArithmeticMaker extends MetricMaker { | |
|
||
private static final Logger LOG = LoggerFactory.getLogger(ArithmeticMaker.class); | ||
|
||
final ArithmeticPostAggregationFunction function; | ||
private final ArithmeticPostAggregationFunction function; | ||
|
||
final ColumnMapper resultSetMapper; | ||
private final Function<String, ResultSetMapper> resultSetMapperSupplier; | ||
|
||
/** | ||
* Build a fully specified ArithmeticMaker. | ||
* Constructor. | ||
* | ||
* @param metricDictionary The dictionary used to resolve dependent metrics when building the LogicalMetric | ||
* @param function The arithmetic operation performed by the LogicalMetrics constructed by this maker | ||
* @param resultSetMapper The function to be applied to the result that is returned by the query | ||
that is built from the LogicalMetric which is built by this maker. | ||
* @param resultSetMapperSupplier A function that takes a metric column name and produces, at build time, a result | ||
* set mapper. | ||
*/ | ||
public ArithmeticMaker( | ||
protected ArithmeticMaker( | ||
MetricDictionary metricDictionary, | ||
ArithmeticPostAggregationFunction function, | ||
ColumnMapper resultSetMapper | ||
Function<String, ResultSetMapper> resultSetMapperSupplier | ||
) { | ||
super(metricDictionary); | ||
this.function = function; | ||
this.resultSetMapper = resultSetMapper; | ||
this.resultSetMapperSupplier = resultSetMapperSupplier; | ||
} | ||
|
||
/** | ||
* Constructor. | ||
* | ||
* @param metricDictionary The dictionary used to resolve dependent metrics when building the LogicalMetric | ||
* @param function The arithmetic operation performed by the LogicalMetrics constructed by this maker | ||
* @param resultSetMapper The mapping function to be applied to the result that is returned by the query that is | ||
* built from the LogicalMetric which is built by this maker. | ||
* | ||
* @deprecated to override default mapping, use the Function constructor | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we're deprecating this, we should also:
Alternatively, if we think there's value in the existence of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deprecating |
||
*/ | ||
@Deprecated | ||
public ArithmeticMaker( | ||
MetricDictionary metricDictionary, | ||
ArithmeticPostAggregationFunction function, | ||
ColumnMapper resultSetMapper | ||
) { | ||
this( | ||
metricDictionary, | ||
function, | ||
(Function<String, ResultSetMapper>) (String name) -> (ResultSetMapper) resultSetMapper | ||
); | ||
} | ||
|
||
/** | ||
|
@@ -57,33 +81,29 @@ public ArithmeticMaker( | |
* @param function The arithmetic operation performed by the LogicalMetrics constructed by this maker | ||
*/ | ||
public ArithmeticMaker(MetricDictionary metricDictionary, ArithmeticPostAggregationFunction function) { | ||
// TODO: Deprecate me, mappers should always be specified at creation time, not implicitly | ||
this(metricDictionary, function, new SketchRoundUpMapper()); | ||
this( | ||
metricDictionary, | ||
function, | ||
(Function<String, ResultSetMapper>) ignore -> NO_OP_MAPPER | ||
); | ||
} | ||
|
||
@Override | ||
protected LogicalMetric makeInner(String metricName, List<String> dependentMetrics) { | ||
TemplateDruidQuery mergedQuery = getMergedQuery(dependentMetrics); | ||
|
||
// Get the ArithmeticPostAggregation operands from the dependent metrics | ||
List<PostAggregation> operands = dependentMetrics.stream() | ||
.map(this::getNumericField) | ||
.collect(Collectors.toList()); | ||
|
||
// Create the ArithmeticPostAggregation | ||
PostAggregation resultPostAgg = new ArithmeticPostAggregation(metricName, function, operands); | ||
Set<PostAggregation> postAggs = Collections.singleton(resultPostAgg); | ||
|
||
TemplateDruidQuery query = new TemplateDruidQuery( | ||
mergedQuery.getAggregations(), | ||
postAggs, | ||
mergedQuery.getInnerQuery(), | ||
mergedQuery.getTimeGrain() | ||
); | ||
|
||
// Note: We need to pass everything through ColumnMapper | ||
// We need to refactor this to be a list. | ||
return new LogicalMetric(query, resultSetMapper.withColumnName(metricName), metricName); | ||
Set<PostAggregation> postAggregations = Collections.singleton(new ArithmeticPostAggregation( | ||
metricName, | ||
function, | ||
operands | ||
)); | ||
|
||
TemplateDruidQuery query = getMergedQuery(dependentMetrics).withPostAggregations(postAggregations); | ||
return new LogicalMetric(query, resultSetMapperSupplier.apply(metricName), metricName); | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is an artifact and is no longer needed, should we deprecate it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not until we remove that deprecated code, I believe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If that's the case, can we
@Deprecate
this method as well, so that when we do the next sweep to remove deprecated code, this will be flagged and we can remove it?