Skip to content

Commit

Permalink
Support pushdown approx_distinct(x, e) into pinot
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored and highker committed Oct 4, 2020
1 parent 5c29aa2 commit 9f4203e
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,15 @@
public class PinotQueryGenerator
{
private static final Logger log = Logger.get(PinotQueryGenerator.class);
private static final double LOWEST_APPROX_DISTINCT_MAX_STANDARD_ERROR = 0.0040625;
private static final double HIGHEST_APPROX_DISTINCT_MAX_STANDARD_ERROR = 0.26000;
private static final Map<String, String> UNARY_AGGREGATION_MAP =
ImmutableMap.<String, String>builder()
.put("min", "min")
.put("max", "max")
.put("avg", "avg")
.put("sum", "sum")
.put("distinctcount", "DISTINCTCOUNT")
.put("approx_distinct", "DISTINCTCOUNTHLL")
.build();

private final PinotConfig pinotConfig;
Expand Down Expand Up @@ -350,6 +351,8 @@ private String handleAggregationFunction(CallExpression aggregation, Map<Variabl
break;
case "approx_percentile":
return handleApproxPercentile(aggregation, inputSelections);
case "approx_distinct":
return handleApproxDistinct(aggregation, inputSelections);
default:
if (UNARY_AGGREGATION_MAP.containsKey(prestoAggregation) && aggregation.getArguments().size() == 1) {
return format("%s(%s)", UNARY_AGGREGATION_MAP.get(prestoAggregation), inputSelections.get(getVariableReference(parameters.get(0))));
Expand Down Expand Up @@ -392,6 +395,76 @@ else if (fractionInput instanceof VariableReferenceExpression) {
return format("PERCENTILEEST%d(%s)", percentile, inputSelections.get(getVariableReference(inputs.get(0))));
}

private String handleApproxDistinct(CallExpression aggregation, Map<VariableReferenceExpression, Selection> inputSelections)
{
List<RowExpression> inputs = aggregation.getArguments();
if (inputs.isEmpty() || inputs.size() > 2) {
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Cannot handle approx_distinct function " + aggregation);
}
Selection selection = inputSelections.get(getVariableReference(inputs.get(0)));
if (inputs.size() == 1) {
return format("DISTINCTCOUNTHLL(%s)", selection);
}
RowExpression standardErrorInput = inputs.get(1);
String standardErrorString;
if (standardErrorInput instanceof ConstantExpression) {
standardErrorString = getLiteralAsString((ConstantExpression) standardErrorInput);
}
else if (standardErrorInput instanceof VariableReferenceExpression) {
Selection fraction = inputSelections.get(standardErrorInput);
if (fraction.getOrigin() != LITERAL) {
throw new PinotException(
PINOT_UNSUPPORTED_EXPRESSION,
Optional.empty(),
"Cannot handle approx_distinct standard error argument be a non literal " + aggregation);
}
standardErrorString = fraction.getDefinition();
}
else {
throw new PinotException(PINOT_UNSUPPORTED_EXPRESSION, Optional.empty(), "Expected the standard error to be a constant or a variable " + standardErrorInput);
}

double standardError;
try {
standardError = Double.parseDouble(standardErrorString);
if (standardError <= LOWEST_APPROX_DISTINCT_MAX_STANDARD_ERROR || standardError >= HIGHEST_APPROX_DISTINCT_MAX_STANDARD_ERROR) {
throw new PinotException(
PINOT_UNSUPPORTED_EXPRESSION,
Optional.empty(),
format("Cannot handle approx_distinct parsed as %f from input %s (function %s)", standardError, standardErrorString, aggregation));
}
}
catch (Exception e) {
throw new PinotException(
PINOT_UNSUPPORTED_EXPRESSION,
Optional.empty(),
format("Cannot handle approx_distinct parsing to numerical value from input %s (function %s)", standardErrorString, aggregation));
}
// Pinot uses DISTINCTCOUNTHLL to do distinct count estimation, with hyperloglog algorithm.
//
// The HyperLogLog (HLL) data structure is a probabilistic data structure used to estimate the cardinality
// of a data set.
// In order to construct HLL data structure, the parameter log2m is used which represents the number of
// registers used internally by HLL.
//
// If we want a higher accuracy, we need to set these to higher values. Such a configuration
// will have additional overhead because our HLL will occupy more memory. If we're fine with lower accuracy,
// we can lower those parameters, and our HLL will occupy less memory.
//
// The relative standard deviation of HyperLoglog is:
// rsd = 1.106 / sqrt(2^(log2m))
// So:
// log2m = 2 * log(1.106 / rsd) / log(2)
int log2m = (int) (2 * Math.log(1.106 / standardError) / Math.log(2));
if (log2m < 1) {
throw new PinotException(
PINOT_UNSUPPORTED_EXPRESSION,
Optional.empty(),
format("Cannot handle approx_distinct, the log2m generated from error is %d from input %s (function %s)", log2m, standardErrorString, aggregation));
}
return format("DISTINCTCOUNTHLL(%s, %d)", selection, log2m);
}

private int getValidPercentile(String fraction)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

public class TestPinotQueryGenerator
Expand Down Expand Up @@ -274,8 +275,32 @@ public void testPercentileAggregation()
public void testApproxDistinct()
{
testUnaryAggregationHelper((planBuilder, aggregationBuilder) -> aggregationBuilder.addAggregation(planBuilder.variable("agg"), getRowExpression("approx_distinct(fare)", defaultSessionHolder)), "DISTINCTCOUNTHLL(fare)");
testUnaryAggregationHelper((planBuilder, aggregationBuilder) -> aggregationBuilder.addAggregation(planBuilder.variable("agg"), getRowExpression("approx_distinct(fare, 0.1)", defaultSessionHolder)), "DISTINCTCOUNTHLL(fare, 6)");
testUnaryAggregationHelper((planBuilder, aggregationBuilder) -> aggregationBuilder.addAggregation(planBuilder.variable("agg"), getRowExpression("approx_distinct(fare, 0.02)", defaultSessionHolder)), "DISTINCTCOUNTHLL(fare, 11)");
testUnaryAggregationHelper((planBuilder, aggregationBuilder) -> aggregationBuilder.addAggregation(planBuilder.variable("agg"), getRowExpression("approx_distinct(fare, 0.01)", defaultSessionHolder)), "DISTINCTCOUNTHLL(fare, 13)");
testUnaryAggregationHelper((planBuilder, aggregationBuilder) -> aggregationBuilder.addAggregation(planBuilder.variable("agg"), getRowExpression("approx_distinct(fare, 0.005)", defaultSessionHolder)), "DISTINCTCOUNTHLL(fare, 15)");
}

@Test
public void testApproxDistinctWithInvalidParameters()
{
PlanNode justScan = buildPlan(planBuilder -> tableScan(planBuilder, pinotTable, regionId, secondsSinceEpoch, city, fare));
PlanNode approxPlanNode = buildPlan(planBuilder -> planBuilder.aggregation(aggBuilder -> aggBuilder.source(justScan).singleGroupingSet(variable("city")).addAggregation(planBuilder.variable("agg"), getRowExpression("approx_distinct(fare, 0)", defaultSessionHolder))));
Optional<PinotQueryGenerator.PinotQueryGeneratorResult> generatedQuery =
new PinotQueryGenerator(pinotConfig, typeManager, functionMetadataManager, standardFunctionResolution)
.generate(approxPlanNode, defaultSessionHolder.getConnectorSession());
assertFalse(generatedQuery.isPresent());
approxPlanNode = buildPlan(planBuilder -> planBuilder.aggregation(aggBuilder -> aggBuilder.source(justScan).singleGroupingSet(variable("city")).addAggregation(planBuilder.variable("agg"), getRowExpression("approx_distinct(fare, 0.004)", defaultSessionHolder))));
generatedQuery =
new PinotQueryGenerator(pinotConfig, typeManager, functionMetadataManager, standardFunctionResolution)
.generate(approxPlanNode, defaultSessionHolder.getConnectorSession());
assertFalse(generatedQuery.isPresent());
approxPlanNode = buildPlan(planBuilder -> planBuilder.aggregation(aggBuilder -> aggBuilder.source(justScan).singleGroupingSet(variable("city")).addAggregation(planBuilder.variable("agg"), getRowExpression("approx_distinct(fare, 1)", defaultSessionHolder))));
generatedQuery =
new PinotQueryGenerator(pinotConfig, typeManager, functionMetadataManager, standardFunctionResolution)
.generate(approxPlanNode, defaultSessionHolder.getConnectorSession());
assertFalse(generatedQuery.isPresent());
}
@Test
public void testAggWithUDFInGroupBy()
{
Expand Down

0 comments on commit 9f4203e

Please sign in to comment.