Skip to content

Commit

Permalink
Rewrite arbitrary to improve performance
Browse files Browse the repository at this point in the history
By removing usage of getSingleValueBlock
  • Loading branch information
haozhun committed Sep 15, 2015
1 parent eacface commit 8c0f3df
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 216 deletions.
Expand Up @@ -18,14 +18,21 @@
import com.facebook.presto.metadata.FunctionRegistry; import com.facebook.presto.metadata.FunctionRegistry;
import com.facebook.presto.metadata.ParametricAggregation; import com.facebook.presto.metadata.ParametricAggregation;
import com.facebook.presto.metadata.Signature; import com.facebook.presto.metadata.Signature;
import com.facebook.presto.operator.aggregation.state.ArbitraryAggregationState; import com.facebook.presto.operator.aggregation.state.AccumulatorState;
import com.facebook.presto.operator.aggregation.state.ArbitraryAggregationStateFactory; import com.facebook.presto.operator.aggregation.state.AccumulatorStateSerializer;
import com.facebook.presto.operator.aggregation.state.ArbitraryAggregationStateSerializer; import com.facebook.presto.operator.aggregation.state.BlockState;
import com.facebook.presto.operator.aggregation.state.BlockStateSerializer;
import com.facebook.presto.operator.aggregation.state.NullableBooleanState;
import com.facebook.presto.operator.aggregation.state.NullableDoubleState;
import com.facebook.presto.operator.aggregation.state.NullableLongState;
import com.facebook.presto.operator.aggregation.state.SliceState;
import com.facebook.presto.operator.aggregation.state.StateCompiler;
import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder; import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager; import com.facebook.presto.spi.type.TypeManager;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice;


import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandle;
import java.util.List; import java.util.List;
Expand All @@ -44,11 +51,22 @@ public class ArbitraryAggregation
{ {
public static final ArbitraryAggregation ARBITRARY_AGGREGATION = new ArbitraryAggregation(); public static final ArbitraryAggregation ARBITRARY_AGGREGATION = new ArbitraryAggregation();
private static final String NAME = "arbitrary"; private static final String NAME = "arbitrary";
private static final MethodHandle OUTPUT_FUNCTION = methodHandle(ArbitraryAggregation.class, "output", Type.class, ArbitraryAggregationState.class, BlockBuilder.class);
private static final MethodHandle INPUT_FUNCTION = methodHandle(ArbitraryAggregation.class, "input", ArbitraryAggregationState.class, Block.class, int.class);
private static final MethodHandle COMBINE_FUNCTION = methodHandle(ArbitraryAggregation.class, "combine", ArbitraryAggregationState.class, ArbitraryAggregationState.class);
private static final Signature SIGNATURE = new Signature(NAME, ImmutableList.of(typeParameter("T")), "T", ImmutableList.of("T"), false, false); private static final Signature SIGNATURE = new Signature(NAME, ImmutableList.of(typeParameter("T")), "T", ImmutableList.of("T"), false, false);


private static final MethodHandle LONG_INPUT_FUNCTION = methodHandle(ArbitraryAggregation.class, "input", Type.class, NullableLongState.class, Block.class, int.class);
private static final MethodHandle DOUBLE_INPUT_FUNCTION = methodHandle(ArbitraryAggregation.class, "input", Type.class, NullableDoubleState.class, Block.class, int.class);
private static final MethodHandle SLICE_INPUT_FUNCTION = methodHandle(ArbitraryAggregation.class, "input", Type.class, SliceState.class, Block.class, int.class);
private static final MethodHandle BOOLEAN_INPUT_FUNCTION = methodHandle(ArbitraryAggregation.class, "input", Type.class, NullableBooleanState.class, Block.class, int.class);
private static final MethodHandle BLOCK_INPUT_FUNCTION = methodHandle(ArbitraryAggregation.class, "input", Type.class, BlockState.class, Block.class, int.class);

private static final MethodHandle LONG_OUTPUT_FUNCTION = methodHandle(NullableLongState.class, "write", Type.class, NullableLongState.class, BlockBuilder.class);
private static final MethodHandle DOUBLE_OUTPUT_FUNCTION = methodHandle(NullableDoubleState.class, "write", Type.class, NullableDoubleState.class, BlockBuilder.class);
private static final MethodHandle SLICE_OUTPUT_FUNCTION = methodHandle(SliceState.class, "write", Type.class, SliceState.class, BlockBuilder.class);
private static final MethodHandle BOOLEAN_OUTPUT_FUNCTION = methodHandle(NullableBooleanState.class, "write", Type.class, NullableBooleanState.class, BlockBuilder.class);
private static final MethodHandle BLOCK_OUTPUT_FUNCTION = methodHandle(BlockState.class, "write", Type.class, BlockState.class, BlockBuilder.class);

private static final StateCompiler compiler = new StateCompiler();

@Override @Override
public Signature getSignature() public Signature getSignature()
{ {
Expand All @@ -70,60 +88,114 @@ public FunctionInfo specialize(Map<String, Type> types, int arity, TypeManager t
return new FunctionInfo(signature, getDescription(), aggregation); return new FunctionInfo(signature, getDescription(), aggregation);
} }


private static InternalAggregationFunction generateAggregation(Type valueType) private static InternalAggregationFunction generateAggregation(Type type)
{ {
DynamicClassLoader classLoader = new DynamicClassLoader(ArbitraryAggregation.class.getClassLoader()); DynamicClassLoader classLoader = new DynamicClassLoader(ArbitraryAggregation.class.getClassLoader());


ArbitraryAggregationStateSerializer stateSerializer = new ArbitraryAggregationStateSerializer(valueType); List<Type> inputTypes = ImmutableList.of(type);
Type intermediateType = stateSerializer.getSerializedType();
MethodHandle inputFunction;
MethodHandle outputFunction;
Class<? extends AccumulatorState> stateInterface;
AccumulatorStateSerializer<?> stateSerializer;


List<Type> inputTypes = ImmutableList.of(valueType); if (type.getJavaType() == long.class) {
stateInterface = NullableLongState.class;
stateSerializer = compiler.generateStateSerializer(stateInterface, classLoader);
inputFunction = LONG_INPUT_FUNCTION;
outputFunction = LONG_OUTPUT_FUNCTION;
}
else if (type.getJavaType() == double.class) {
stateInterface = NullableDoubleState.class;
stateSerializer = compiler.generateStateSerializer(stateInterface, classLoader);
inputFunction = DOUBLE_INPUT_FUNCTION;
outputFunction = DOUBLE_OUTPUT_FUNCTION;
}
else if (type.getJavaType() == Slice.class) {
stateInterface = SliceState.class;
stateSerializer = compiler.generateStateSerializer(stateInterface, classLoader);
inputFunction = SLICE_INPUT_FUNCTION;
outputFunction = SLICE_OUTPUT_FUNCTION;
}
else if (type.getJavaType() == boolean.class) {
stateInterface = NullableBooleanState.class;
stateSerializer = compiler.generateStateSerializer(stateInterface, classLoader);
inputFunction = BOOLEAN_INPUT_FUNCTION;
outputFunction = BOOLEAN_OUTPUT_FUNCTION;
}
else {
stateInterface = BlockState.class;
stateSerializer = new BlockStateSerializer(type);
inputFunction = BLOCK_INPUT_FUNCTION;
outputFunction = BLOCK_OUTPUT_FUNCTION;
}
inputFunction = inputFunction.bindTo(type);


ArbitraryAggregationStateFactory stateFactory = new ArbitraryAggregationStateFactory(); Type intermediateType = stateSerializer.getSerializedType();
List<ParameterMetadata> inputParameterMetadata = createInputParameterMetadata(type);
AggregationMetadata metadata = new AggregationMetadata( AggregationMetadata metadata = new AggregationMetadata(
generateAggregationName(NAME, valueType, inputTypes), generateAggregationName(NAME, type, inputTypes),
createInputParameterMetadata(valueType), inputParameterMetadata,
INPUT_FUNCTION, inputFunction,
null, inputParameterMetadata,
inputFunction,
null, null,
COMBINE_FUNCTION, outputFunction.bindTo(type),
OUTPUT_FUNCTION.bindTo(valueType), stateInterface,
ArbitraryAggregationState.class,
stateSerializer, stateSerializer,
stateFactory, compiler.generateStateFactory(stateInterface, classLoader),
valueType, type,
false); false);


GenericAccumulatorFactoryBinder factory = new AccumulatorCompiler().generateAccumulatorFactoryBinder(metadata, classLoader); GenericAccumulatorFactoryBinder factory = new AccumulatorCompiler().generateAccumulatorFactoryBinder(metadata, classLoader);
return new InternalAggregationFunction(NAME, inputTypes, intermediateType, valueType, true, false, factory); return new InternalAggregationFunction(NAME, inputTypes, intermediateType, type, true, false, factory);
} }


private static List<ParameterMetadata> createInputParameterMetadata(Type value) private static List<ParameterMetadata> createInputParameterMetadata(Type value)
{ {
return ImmutableList.of(new ParameterMetadata(STATE), new ParameterMetadata(BLOCK_INPUT_CHANNEL, value), new ParameterMetadata(BLOCK_INDEX)); return ImmutableList.of(new ParameterMetadata(STATE), new ParameterMetadata(BLOCK_INPUT_CHANNEL, value), new ParameterMetadata(BLOCK_INDEX));
} }


public static void input(ArbitraryAggregationState state, Block value, int position) public static void input(Type type, NullableDoubleState state, Block block, int position)
{ {
if (state.getValue() == null) { if (!state.isNull()) {
state.setValue(value.getSingleValueBlock(position)); return;
} }
state.setNull(false);
state.setDouble(type.getDouble(block, position));
} }


public static void combine(ArbitraryAggregationState state, ArbitraryAggregationState otherState) public static void input(Type type, NullableLongState state, Block block, int position)
{ {
if (state.getValue() == null && otherState.getValue() != null) { if (!state.isNull()) {
state.setValue(otherState.getValue()); return;
} }
state.setNull(false);
state.setLong(type.getLong(block, position));
} }


public static void output(Type type, ArbitraryAggregationState state, BlockBuilder out) public static void input(Type type, SliceState state, Block block, int position)
{ {
if (state.getValue() == null) { if (state.getSlice() != null) {
out.appendNull(); return;
} }
else { state.setSlice(type.getSlice(block, position));
type.appendTo(state.getValue(), 0, out); }

public static void input(Type type, NullableBooleanState state, Block block, int position)
{
if (!state.isNull()) {
return;
}
state.setNull(false);
state.setBoolean(type.getBoolean(block, position));
}

public static void input(Type type, BlockState state, Block block, int position)
{
if (state.getBlock() != null) {
return;
} }
state.setBlock((Block) type.getObject(block, position));
} }
} }

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit 8c0f3df

Please sign in to comment.