Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Support aggregations min, max #541

Merged
merged 18 commits into from Sep 30, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -178,6 +178,14 @@ public Aggregator count(Expression... expressions) {
return aggregate(BuiltinFunctionName.COUNT, expressions);
}

public Aggregator min(Expression... expressions) {
return aggregate(BuiltinFunctionName.MIN, expressions);
}

public Aggregator max(Expression... expressions) {
return aggregate(BuiltinFunctionName.MAX, expressions);
}

private FunctionExpression function(BuiltinFunctionName functionName, Expression... expressions) {
return (FunctionExpression) repository.compile(
functionName.getName(), Arrays.asList(expressions));
Expand Down
Expand Up @@ -52,6 +52,8 @@ public static void register(BuiltinFunctionRepository repository) {
repository.register(avg());
repository.register(sum());
repository.register(count());
repository.register(min());
repository.register(max());
}

private static FunctionResolver avg() {
Expand Down Expand Up @@ -106,4 +108,41 @@ private static FunctionResolver sum() {
.build()
);
}

private static FunctionResolver min() {
FunctionName functionName = BuiltinFunctionName.MIN.getName();
return new FunctionResolver(
functionName,
new ImmutableMap.Builder<FunctionSignature, FunctionBuilder>()
.put(new FunctionSignature(functionName, Collections.singletonList(INTEGER)),
arguments -> new MinAggregator(arguments, INTEGER))
.put(new FunctionSignature(functionName, Collections.singletonList(LONG)),
arguments -> new MinAggregator(arguments, LONG))
.put(new FunctionSignature(functionName, Collections.singletonList(FLOAT)),
arguments -> new MinAggregator(arguments, FLOAT))
.put(new FunctionSignature(functionName, Collections.singletonList(DOUBLE)),
arguments -> new MinAggregator(arguments, DOUBLE))
.put(new FunctionSignature(functionName, Collections.singletonList(STRING)),
arguments -> new MinAggregator(arguments, STRING))
.build());
}

private static FunctionResolver max() {
FunctionName functionName = BuiltinFunctionName.MAX.getName();
return new FunctionResolver(
functionName,
new ImmutableMap.Builder<FunctionSignature, FunctionBuilder>()
.put(new FunctionSignature(functionName, Collections.singletonList(INTEGER)),
arguments -> new MaxAggregator(arguments, INTEGER))
.put(new FunctionSignature(functionName, Collections.singletonList(LONG)),
arguments -> new MaxAggregator(arguments, LONG))
.put(new FunctionSignature(functionName, Collections.singletonList(FLOAT)),
arguments -> new MaxAggregator(arguments, FLOAT))
.put(new FunctionSignature(functionName, Collections.singletonList(DOUBLE)),
arguments -> new MaxAggregator(arguments, DOUBLE))
.put(new FunctionSignature(functionName, Collections.singletonList(STRING)),
arguments -> new MaxAggregator(arguments, STRING))
.build()
);
}
}
Expand Up @@ -46,9 +46,8 @@ public AvgState create() {
public AvgState iterate(BindingTuple tuple, AvgState state) {
Expression expression = getArguments().get(0);
ExprValue value = expression.valueOf(tuple);
if (value.isNull() || value.isMissing()) {
state.isNullResult = true;
} else {
if (!(value.isNull() || value.isMissing())) {
state.isEmptyCollection = false;
state.count++;
state.total += ExprValueUtils.getDoubleValue(value);
}
Expand All @@ -63,19 +62,19 @@ public String toString() {
/**
* Average State.
*/
protected class AvgState implements AggregationState {
protected static class AvgState implements AggregationState {
private int count;
private double total;
private boolean isNullResult = false;
private boolean isEmptyCollection = true;

public AvgState() {
AvgState() {
this.count = 0;
this.total = 0d;
}

@Override
public ExprValue result() {
return isNullResult ? ExprNullValue.of() : ExprValueUtils.doubleValue(total / count);
return isEmptyCollection ? ExprNullValue.of() : ExprValueUtils.doubleValue(total / count);
chloe-zh marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Up @@ -56,10 +56,10 @@ public String toString() {
/**
* Count State.
*/
protected class CountState implements AggregationState {
protected static class CountState implements AggregationState {
private int count;

public CountState() {
CountState() {
this.count = 0;
}

Expand Down
@@ -0,0 +1,106 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.expression.aggregation;

import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.LITERAL_NULL;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.doubleValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.floatValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.getDoubleValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.getFloatValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.getIntegerValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.getLongValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.getStringValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.integerValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.longValue;
import static com.amazon.opendistroforelasticsearch.sql.utils.ExpressionUtils.format;

import com.amazon.opendistroforelasticsearch.sql.data.model.ExprNullValue;
import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue;
import com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType;
import com.amazon.opendistroforelasticsearch.sql.exception.ExpressionEvaluationException;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionName;
import com.amazon.opendistroforelasticsearch.sql.storage.bindingtuple.BindingTuple;
import java.util.List;

public class MaxAggregator extends Aggregator<MaxAggregator.MaxState> {

public MaxAggregator(List<Expression> arguments, ExprCoreType returnType) {
super(BuiltinFunctionName.MAX.getName(), arguments, returnType);
}

@Override
public MaxState create() {
return new MaxState(returnType);
}

@Override
public MaxState iterate(BindingTuple tuple, MaxState state) {
Expression expression = getArguments().get(0);
ExprValue value = expression.valueOf(tuple);
if (!(value.isNull() || value.isMissing())) {
state.isEmptyCollection = false;
state.max(value);
}
return state;
}

@Override
public String toString() {
return String.format("max(%s)", format(getArguments()));
}

protected static class MaxState implements AggregationState {
private final ExprCoreType type;
private ExprValue maxResult;
private boolean isEmptyCollection;

MaxState(ExprCoreType type) {
this.type = type;
maxResult = type.equals(ExprCoreType.STRING) ? LITERAL_NULL : doubleValue(Double.MIN_VALUE);
isEmptyCollection = true;
}

public void max(ExprValue value) {
switch (type) {
case INTEGER:
maxResult = integerValue(Math.max(getIntegerValue(maxResult), getIntegerValue(value)));
break;
case LONG:
maxResult = longValue(Math.max(getLongValue(maxResult), getLongValue(value)));
break;
case FLOAT:
maxResult = floatValue(Math.max(getFloatValue(maxResult), getFloatValue(value)));
break;
case DOUBLE:
maxResult = doubleValue(Math.max(getDoubleValue(maxResult), getDoubleValue(value)));
break;
case STRING:
maxResult = maxResult.isNull() ? value : getStringValue(maxResult)
.compareTo(getStringValue(value)) < 0 ? value : maxResult;
break;
default:
throw new ExpressionEvaluationException(
String.format("unexpected type [%s] in max aggregation", type));
}
}

@Override
public ExprValue result() {
return isEmptyCollection ? ExprNullValue.of() : maxResult;
}
}
}
@@ -0,0 +1,111 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.sql.expression.aggregation;

import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.LITERAL_NULL;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.doubleValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.floatValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.getDoubleValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.getFloatValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.getIntegerValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.getLongValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.getStringValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.integerValue;
import static com.amazon.opendistroforelasticsearch.sql.data.model.ExprValueUtils.longValue;
import static com.amazon.opendistroforelasticsearch.sql.utils.ExpressionUtils.format;

import com.amazon.opendistroforelasticsearch.sql.data.model.ExprNullValue;
import com.amazon.opendistroforelasticsearch.sql.data.model.ExprValue;
import com.amazon.opendistroforelasticsearch.sql.data.type.ExprCoreType;
import com.amazon.opendistroforelasticsearch.sql.exception.ExpressionEvaluationException;
import com.amazon.opendistroforelasticsearch.sql.expression.Expression;
import com.amazon.opendistroforelasticsearch.sql.expression.function.BuiltinFunctionName;
import com.amazon.opendistroforelasticsearch.sql.storage.bindingtuple.BindingTuple;
import java.util.List;

/**
* The minimum aggregator aggregate the value evaluated by the expression.
* If the expression evaluated result is NULL or MISSING, then the result is NULL.
*/
public class MinAggregator extends Aggregator<MinAggregator.MinState> {
dai-chen marked this conversation as resolved.
Show resolved Hide resolved

public MinAggregator(List<Expression> arguments, ExprCoreType returnType) {
super(BuiltinFunctionName.MIN.getName(), arguments, returnType);
}


@Override
public MinState create() {
return new MinState(returnType);
}

@Override
public MinState iterate(BindingTuple tuple, MinState state) {
Expression expression = getArguments().get(0);
ExprValue value = expression.valueOf(tuple);
if (!(value.isNull() || value.isMissing())) {
state.isEmptyCollection = false;
state.min(value);
}
return state;
}

@Override
public String toString() {
return String.format("min(%s)", format(getArguments()));
}

protected static class MinState implements AggregationState {
private final ExprCoreType type;
private ExprValue minResult;
private boolean isEmptyCollection;

MinState(ExprCoreType type) {
this.type = type;
minResult = type.equals(ExprCoreType.STRING) ? LITERAL_NULL : doubleValue(Double.MAX_VALUE);
isEmptyCollection = true;
}

public void min(ExprValue value) {
switch (type) {
case INTEGER:
minResult = integerValue(Math.min(getIntegerValue(minResult), getIntegerValue(value)));
break;
case LONG:
minResult = longValue(Math.min(getLongValue(minResult), getLongValue(value)));
break;
case FLOAT:
minResult = floatValue(Math.min(getFloatValue(minResult), getFloatValue(value)));
break;
case DOUBLE:
minResult = doubleValue(Math.min(getDoubleValue(minResult), getDoubleValue(value)));
break;
case STRING:
minResult = minResult.isNull() ? value : getStringValue(minResult)
.compareTo(getStringValue(value)) > 0 ? value : minResult;
break;
default:
throw new ExpressionEvaluationException(
String.format("unexpected type [%s] in min aggregation", type));
}
}

@Override
public ExprValue result() {
return isEmptyCollection ? ExprNullValue.of() : minResult;
chloe-zh marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Up @@ -56,9 +56,8 @@ public SumState create() {
public SumState iterate(BindingTuple tuple, SumState state) {
Expression expression = getArguments().get(0);
ExprValue value = expression.valueOf(tuple);
if (value.isNull() || value.isMissing()) {
state.isNullResult = true;
} else {
if (!(value.isNull() || value.isMissing())) {
state.isEmptyCollection = false;
state.add(value);
}
return state;
Expand All @@ -72,15 +71,16 @@ public String toString() {
/**
* Sum State.
*/
protected class SumState implements AggregationState {
protected static class SumState implements AggregationState {

private final ExprCoreType type;
private ExprValue sumResult;
private boolean isNullResult = false;
private boolean isEmptyCollection;

public SumState(ExprCoreType type) {
SumState(ExprCoreType type) {
this.type = type;
sumResult = ExprValueUtils.integerValue(0);
isEmptyCollection = true;
}

/**
Expand Down Expand Up @@ -108,7 +108,7 @@ public void add(ExprValue value) {

@Override
public ExprValue result() {
return isNullResult ? ExprNullValue.of() : sumResult;
return isEmptyCollection ? ExprNullValue.of() : sumResult;
}
}
}
Expand Up @@ -64,7 +64,9 @@ public enum BuiltinFunctionName {
*/
AVG(FunctionName.of("avg")),
SUM(FunctionName.of("sum")),
COUNT(FunctionName.of("count"));
COUNT(FunctionName.of("count")),
MIN(FunctionName.of("min")),
MAX(FunctionName.of("max"));

private final FunctionName name;

Expand Down