Skip to content

Commit

Permalink
Simplify Block serialization/deserialization
Browse files Browse the repository at this point in the history
Previously, there were 3 sections in a serialized Block:
* Name - written by BlockEncodingSerde
  * e.g. "ARRAY" indicating it's an ArrayBlock
* Metadata - written by BlockEncodingFactory
  * e.g. "INT_ARRAY" indicating the inner Block type of an ArrayBlock
  * e.g. 4 indicating the entry size of FixedWidthBlock
* Data - written by BlockEncoding

This 3-section scheme makes things more complicated:
* Not recursive - For an ArrayBlock with IntArrayBlock inside, the encoding
  of IntArrayBlock isn't the same as a stand-alone IntArrayBlock. Specifically,
  part of it is put in the metadata section, and part of it in the data section.
  This requires each individual encoding to handle nested blocks,
  which introduces both code duplication and complexity.
* It is harder to learn and understand the implementation of each Block
  when it is split across two interfaces (BlockEncodingFactory and BlockEncoding)
  that don't have a very clear separation of responsibility.

This commit solves both problems:
* Only a single class (BlockEncoding) is now needed.
* A block inside another block now serializes the exact same way as a standalone one.
  The outer block no longer needs to handle any of the details of the inner one.

It also resolves the TODO item that replacementBlockForWrite was not respected for
nested Blocks.
  • Loading branch information
haozhun committed Apr 20, 2018
1 parent 9f4cbde commit 90a52a8
Show file tree
Hide file tree
Showing 51 changed files with 314 additions and 735 deletions.
Expand Up @@ -13,29 +13,31 @@
*/
package com.facebook.presto.block;

import com.facebook.presto.spi.block.ArrayBlockEncoding.ArrayBlockEncodingFactory;
import com.facebook.presto.spi.block.ArrayBlockEncoding;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockEncoding;
import com.facebook.presto.spi.block.BlockEncodingFactory;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.block.ByteArrayBlockEncoding.ByteArrayBlockEncodingFactory;
import com.facebook.presto.spi.block.DictionaryBlockEncoding.DictionaryBlockEncodingFactory;
import com.facebook.presto.spi.block.FixedWidthBlockEncoding.FixedWidthBlockEncodingFactory;
import com.facebook.presto.spi.block.IntArrayBlockEncoding.IntArrayBlockEncodingFactory;
import com.facebook.presto.spi.block.LongArrayBlockEncoding.LongArrayBlockEncodingFactory;
import com.facebook.presto.spi.block.MapBlockEncoding.MapBlockEncodingFactory;
import com.facebook.presto.spi.block.RowBlockEncoding.RowBlockEncodingFactory;
import com.facebook.presto.spi.block.RunLengthBlockEncoding.RunLengthBlockEncodingFactory;
import com.facebook.presto.spi.block.ShortArrayBlockEncoding.ShortArrayBlockEncodingFactory;
import com.facebook.presto.spi.block.SingleMapBlockEncoding.SingleMapBlockEncodingFactory;
import com.facebook.presto.spi.block.SingleRowBlockEncoding.SingleRowBlockEncodingFactory;
import com.facebook.presto.spi.block.VariableWidthBlockEncoding.VariableWidthBlockEncodingFactory;
import com.facebook.presto.spi.block.ByteArrayBlockEncoding;
import com.facebook.presto.spi.block.DictionaryBlockEncoding;
import com.facebook.presto.spi.block.FixedWidthBlockEncoding;
import com.facebook.presto.spi.block.IntArrayBlockEncoding;
import com.facebook.presto.spi.block.LazyBlockEncoding;
import com.facebook.presto.spi.block.LongArrayBlockEncoding;
import com.facebook.presto.spi.block.MapBlockEncoding;
import com.facebook.presto.spi.block.RowBlockEncoding;
import com.facebook.presto.spi.block.RunLengthBlockEncoding;
import com.facebook.presto.spi.block.ShortArrayBlockEncoding;
import com.facebook.presto.spi.block.SingleMapBlockEncoding;
import com.facebook.presto.spi.block.SingleRowBlockEncoding;
import com.facebook.presto.spi.block.VariableWidthBlockEncoding;
import com.facebook.presto.spi.type.TypeManager;
import com.google.common.collect.ImmutableSet;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;

import javax.inject.Inject;

import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -47,73 +49,85 @@
public final class BlockEncodingManager
implements BlockEncodingSerde
{
private final ConcurrentMap<String, BlockEncodingFactory<?>> blockEncodings = new ConcurrentHashMap<>();
private final ConcurrentMap<String, BlockEncoding> blockEncodings = new ConcurrentHashMap<>();

public BlockEncodingManager(TypeManager typeManager, BlockEncodingFactory<?>... blockEncodingFactories)
public BlockEncodingManager(TypeManager typeManager, BlockEncoding... blockEncodings)
{
this(typeManager, ImmutableSet.copyOf(blockEncodingFactories));
this(typeManager, ImmutableSet.copyOf(blockEncodings));
}

@Inject
public BlockEncodingManager(TypeManager typeManager, Set<BlockEncodingFactory<?>> blockEncodingFactories)
public BlockEncodingManager(TypeManager typeManager, Set<BlockEncoding> blockEncodings)
{
// This function should be called from Guice and tests only

// always add the built-in BlockEncodingFactories
addBlockEncodingFactory(new VariableWidthBlockEncodingFactory());
addBlockEncodingFactory(new FixedWidthBlockEncodingFactory());
addBlockEncodingFactory(new ByteArrayBlockEncodingFactory());
addBlockEncodingFactory(new ShortArrayBlockEncodingFactory());
addBlockEncodingFactory(new IntArrayBlockEncodingFactory());
addBlockEncodingFactory(new LongArrayBlockEncodingFactory());
addBlockEncodingFactory(new DictionaryBlockEncodingFactory());
addBlockEncodingFactory(new ArrayBlockEncodingFactory());
addBlockEncodingFactory(new MapBlockEncodingFactory(typeManager));
addBlockEncodingFactory(new SingleMapBlockEncodingFactory(typeManager));
addBlockEncodingFactory(new RowBlockEncodingFactory());
addBlockEncodingFactory(new SingleRowBlockEncodingFactory());
addBlockEncodingFactory(new RunLengthBlockEncodingFactory());

for (BlockEncodingFactory<?> factory : requireNonNull(blockEncodingFactories, "blockEncodingFactories is null")) {
addBlockEncodingFactory(factory);
// always add the built-in BlockEncodings
addBlockEncoding(new VariableWidthBlockEncoding());
addBlockEncoding(new FixedWidthBlockEncoding());
addBlockEncoding(new ByteArrayBlockEncoding());
addBlockEncoding(new ShortArrayBlockEncoding());
addBlockEncoding(new IntArrayBlockEncoding());
addBlockEncoding(new LongArrayBlockEncoding());
addBlockEncoding(new DictionaryBlockEncoding());
addBlockEncoding(new ArrayBlockEncoding());
addBlockEncoding(new MapBlockEncoding(typeManager));
addBlockEncoding(new SingleMapBlockEncoding(typeManager));
addBlockEncoding(new RowBlockEncoding());
addBlockEncoding(new SingleRowBlockEncoding());
addBlockEncoding(new RunLengthBlockEncoding());
addBlockEncoding(new LazyBlockEncoding());

for (BlockEncoding blockEncoding : requireNonNull(blockEncodings, "blockEncodings is null")) {
addBlockEncoding(blockEncoding);
}
}

public void addBlockEncodingFactory(BlockEncodingFactory<?> blockEncoding)
public void addBlockEncoding(BlockEncoding blockEncoding)
{
requireNonNull(blockEncoding, "blockEncoding is null");
BlockEncodingFactory<?> existingEntry = blockEncodings.putIfAbsent(blockEncoding.getName(), blockEncoding);
BlockEncoding existingEntry = blockEncodings.putIfAbsent(blockEncoding.getName(), blockEncoding);
checkArgument(existingEntry == null, "Encoding %s is already registered", blockEncoding.getName());
}

@Override
public BlockEncoding readBlockEncoding(SliceInput input)
public Block readBlock(SliceInput input)
{
// read the encoding name
String encodingName = readLengthPrefixedString(input);

// look up the BlockEncoding
BlockEncodingFactory<?> blockEncoding = blockEncodings.get(encodingName);
BlockEncoding blockEncoding = blockEncodings.get(encodingName);
checkArgument(blockEncoding != null, "Unknown block encoding %s", encodingName);

// read the block from the input stream using the resolved encoding
return blockEncoding.readEncoding(this, input);
return blockEncoding.readBlock(this, input);
}

@Override
public void writeBlockEncoding(SliceOutput output, BlockEncoding encoding)
public void writeBlock(SliceOutput output, Block block)
{
// get the encoding name
String encodingName = encoding.getName();
while (true) {
// get the encoding name
String encodingName = block.getEncodingName();

// look up the encoding factory
BlockEncodingFactory blockEncoding = blockEncodings.get(encodingName);
// look up the BlockEncoding
BlockEncoding blockEncoding = blockEncodings.get(encodingName);

// see if a replacement block should be written instead
Optional<Block> replacementBlock = blockEncoding.replacementBlockForWrite(block);
if (replacementBlock.isPresent()) {
block = replacementBlock.get();
continue;
}

// write the name to the output
writeLengthPrefixedString(output, encodingName);
// write the name to the output
writeLengthPrefixedString(output, encodingName);

// write the encoding to the output
blockEncoding.writeEncoding(this, output, encoding);
// write the block to the output
blockEncoding.writeBlock(this, output, block);

break;
}
}

private static String readLengthPrefixedString(SliceInput input)
Expand Down
Expand Up @@ -14,14 +14,12 @@
package com.facebook.presto.block;

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockEncoding;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;

import java.lang.invoke.MethodHandle;
import java.util.Optional;

import static com.facebook.presto.util.Reflection.methodHandle;

Expand All @@ -40,26 +38,11 @@ public static Block readBlock(BlockEncodingSerde blockEncodingSerde, Slice slice

public static Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput input)
{
BlockEncoding blockEncoding = blockEncodingSerde.readBlockEncoding(input);
return blockEncoding.readBlock(input);
return blockEncodingSerde.readBlock(input);
}

public static void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput output, Block block)
{
while (true) {
// TODO: respect replacementBlockForWrite for Blocks nested in other Block
// A proposed simplified design for block encoding decoding will address this to-do item.

BlockEncoding encoding = block.getEncoding();
Optional<Block> replaceBlock = encoding.replacementBlockForWrite(block);
if (!replaceBlock.isPresent()) {
break;
}
block = replaceBlock.get();
}

BlockEncoding encoding = block.getEncoding();
blockEncodingSerde.writeBlockEncoding(output, encoding);
encoding.writeBlock(output, block);
blockEncodingSerde.writeBlock(output, block);
}
}
Expand Up @@ -15,7 +15,6 @@

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockEncoding;
import io.airlift.slice.Slice;
import org.openjdk.jol.info.ClassLayout;

Expand Down Expand Up @@ -190,9 +189,9 @@ public void retainedBytesForEachPart(BiConsumer<Object, Long> consumer)
}

@Override
public BlockEncoding getEncoding()
public String getEncodingName()
{
return block.getEncoding();
throw new UnsupportedOperationException("GroupByIdBlock does not support serialization");
}

@Override
Expand Down
Expand Up @@ -21,7 +21,7 @@
import com.facebook.presto.security.AccessControlManager;
import com.facebook.presto.server.security.PasswordAuthenticatorManager;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.block.BlockEncodingFactory;
import com.facebook.presto.spi.block.BlockEncoding;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
Expand Down Expand Up @@ -175,9 +175,9 @@ private void loadPlugin(URLClassLoader pluginClassLoader)

public void installPlugin(Plugin plugin)
{
for (BlockEncodingFactory<?> blockEncodingFactory : plugin.getBlockEncodingFactories(blockEncodingManager)) {
log.info("Registering block encoding %s", blockEncodingFactory.getName());
blockEncodingManager.addBlockEncodingFactory(blockEncodingFactory);
for (BlockEncoding blockEncoding : plugin.getBlockEncodings()) {
log.info("Registering block encoding %s", blockEncoding.getName());
blockEncodingManager.addBlockEncoding(blockEncoding);
}

for (Type type : plugin.getTypes()) {
Expand Down
Expand Up @@ -104,7 +104,7 @@
import com.facebook.presto.spi.PageIndexerFactory;
import com.facebook.presto.spi.PageSorter;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockEncodingFactory;
import com.facebook.presto.spi.block.BlockEncoding;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
Expand Down Expand Up @@ -472,7 +472,7 @@ protected void setup(Binder binder)
// block encodings
binder.bind(BlockEncodingManager.class).in(Scopes.SINGLETON);
binder.bind(BlockEncodingSerde.class).to(BlockEncodingManager.class).in(Scopes.SINGLETON);
newSetBinder(binder, new TypeLiteral<BlockEncodingFactory<?>>() {});
newSetBinder(binder, BlockEncoding.class);
jsonBinder(binder).addSerializerBinding(Block.class).to(BlockJsonSerde.Serializer.class);
jsonBinder(binder).addDeserializerBinding(Block.class).to(BlockJsonSerde.Deserializer.class);

Expand Down
Expand Up @@ -13,11 +13,15 @@
*/
package com.facebook.presto.block;

import com.facebook.presto.metadata.FunctionRegistry;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.block.BlockEncoding;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.block.DictionaryId;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.type.TypeRegistry;
import com.google.common.collect.ImmutableList;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
Expand Down Expand Up @@ -51,6 +55,13 @@
@Test
public abstract class AbstractTestBlock
{
private static final TypeManager TYPE_MANAGER = new TypeRegistry();
private static final BlockEncodingSerde BLOCK_ENCODING_SERDE = new BlockEncodingManager(TYPE_MANAGER);
static {
// associate TYPE_MANAGER with a function registry
new FunctionRegistry(TYPE_MANAGER, new BlockEncodingManager(TYPE_MANAGER), new FeaturesConfig());
}

protected <T> void assertBlock(Block block, Supplier<BlockBuilder> newBlockBuilder, T[] expectedValues)
{
assertBlockPositions(block, newBlockBuilder, expectedValues);
Expand Down Expand Up @@ -363,9 +374,8 @@ protected boolean isSliceAccessSupported()
private static Block copyBlockViaBlockSerde(Block block)
{
DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1024);
BlockEncoding blockEncoding = block.getEncoding();
blockEncoding.writeBlock(sliceOutput, block);
return blockEncoding.readBlock(sliceOutput.slice().getInput());
BLOCK_ENCODING_SERDE.writeBlock(sliceOutput, block);
return BLOCK_ENCODING_SERDE.readBlock(sliceOutput.slice().getInput());
}

private static Block copyBlockViaWritePositionTo(Block block, Supplier<BlockBuilder> newBlockBuilder)
Expand Down
Expand Up @@ -13,10 +13,14 @@
*/
package com.facebook.presto.block;

import com.facebook.presto.metadata.FunctionRegistry;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockEncoding;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.block.DictionaryBlock;
import com.facebook.presto.spi.block.RunLengthEncodedBlock;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.type.TypeRegistry;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;

Expand All @@ -29,6 +33,13 @@

final class ColumnarTestUtils
{
private static final TypeManager TYPE_MANAGER = new TypeRegistry();
private static final BlockEncodingSerde BLOCK_ENCODING_SERDE = new BlockEncodingManager(TYPE_MANAGER);
static {
// associate TYPE_MANAGER with a function registry
new FunctionRegistry(TYPE_MANAGER, new BlockEncodingManager(TYPE_MANAGER), new FeaturesConfig());
}

private ColumnarTestUtils() {}

public static <T> void assertBlock(Block block, T[] expectedValues)
Expand Down Expand Up @@ -109,9 +120,8 @@ public static <T> T[] alternatingNullValues(T[] objects)
private static Block copyBlock(Block block)
{
DynamicSliceOutput sliceOutput = new DynamicSliceOutput(1024);
BlockEncoding blockEncoding = block.getEncoding();
blockEncoding.writeBlock(sliceOutput, block);
return blockEncoding.readBlock(sliceOutput.slice().getInput());
BLOCK_ENCODING_SERDE.writeBlock(sliceOutput, block);
return BLOCK_ENCODING_SERDE.readBlock(sliceOutput.slice().getInput());
}

public static DictionaryBlock createTestDictionaryBlock(Block block)
Expand Down
5 changes: 2 additions & 3 deletions presto-spi/src/main/java/com/facebook/presto/spi/Plugin.java
Expand Up @@ -13,8 +13,7 @@
*/
package com.facebook.presto.spi;

import com.facebook.presto.spi.block.BlockEncodingFactory;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.block.BlockEncoding;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerFactory;
Expand All @@ -36,7 +35,7 @@ default Iterable<ConnectorFactory> getConnectorFactories()
return emptyList();
}

default Iterable<BlockEncodingFactory<?>> getBlockEncodingFactories(BlockEncodingSerde serde)
default Iterable<BlockEncoding> getBlockEncodings()
{
return emptyList();
}
Expand Down
Expand Up @@ -36,9 +36,9 @@ int getOffset(int position)
}

@Override
public BlockEncoding getEncoding()
public String getEncodingName()
{
return new ArrayBlockEncoding(getValues().getEncoding());
return ArrayBlockEncoding.NAME;
}

@Override
Expand Down
Expand Up @@ -145,9 +145,9 @@ public void writePositionTo(int position, BlockBuilder blockBuilder)
}

@Override
public BlockEncoding getEncoding()
public String getEncodingName()
{
return new FixedWidthBlockEncoding(fixedSize);
return FixedWidthBlockEncoding.NAME;
}

@Override
Expand Down

0 comments on commit 90a52a8

Please sign in to comment.