Skip to content

Commit

Permalink
Add Block JSON serde
Browse files Browse the repository at this point in the history
  • Loading branch information
erichwang committed Nov 4, 2015
1 parent 7f46e6c commit 8c13b28
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 7 deletions.
@@ -0,0 +1,82 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.block;

import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.SliceOutput;
import io.airlift.slice.Slices;

import javax.inject.Inject;

import java.io.IOException;
import java.util.Base64;

import static com.facebook.presto.block.BlockSerdeUtil.readBlock;
import static com.facebook.presto.block.BlockSerdeUtil.writeBlock;
import static java.util.Objects.requireNonNull;

public final class BlockJsonSerde
{
private BlockJsonSerde() {}

public static class Serializer
extends JsonSerializer<Block>
{
private final BlockEncodingSerde blockEncodingSerde;

@Inject
public Serializer(BlockEncodingSerde blockEncodingSerde)
{
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
}

@Override
public void serialize(Block block, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
throws IOException
{
SliceOutput output = new DynamicSliceOutput(64);
writeBlock(blockEncodingSerde, output, block);
String encoded = Base64.getEncoder().encodeToString(output.slice().getBytes());
jsonGenerator.writeString(encoded);
}
}

public static class Deserializer
extends JsonDeserializer<Block>
{
private final BlockEncodingSerde blockEncodingSerde;

@Inject
public Deserializer(BlockEncodingSerde blockEncodingSerde)
{
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
}

@Override
public Block deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException
{
byte[] decoded = Base64.getDecoder().decode(jsonParser.readValueAs(String.class));
return readBlock(blockEncodingSerde, Slices.wrappedBuffer(decoded));
}
}
}
Expand Up @@ -34,11 +34,22 @@ private BlockSerdeUtil()

public static Block readBlock(BlockEncodingSerde blockEncodingSerde, Slice slice)
{
SliceInput input = slice.getInput();
return readBlock(blockEncodingSerde, slice.getInput());
}

public static Block readBlock(BlockEncodingSerde blockEncodingSerde, SliceInput input)
{
BlockEncoding blockEncoding = blockEncodingSerde.readBlockEncoding(input);
return blockEncoding.readBlock(input);
}

public static void writeBlock(BlockEncodingSerde blockEncodingSerde, SliceOutput output, Block block)
{
BlockEncoding encoding = block.getEncoding();
blockEncodingSerde.writeBlockEncoding(output, encoding);
encoding.writeBlock(output, block);
}

// This class is only used in LiteralInterpreter for magic literal. Most likely, you shouldn't use it from anywhere else.
public static void writeBlock(SliceOutput output, Block block)
{
Expand Down
Expand Up @@ -15,14 +15,15 @@

import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockEncoding;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.google.common.collect.AbstractIterator;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;

import java.util.Iterator;

import static com.facebook.presto.block.BlockSerdeUtil.readBlock;
import static com.facebook.presto.block.BlockSerdeUtil.writeBlock;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;

Expand Down Expand Up @@ -79,9 +80,7 @@ public PagesWriter append(Page page)
output.writeInt(page.getPositionCount());
output.writeInt(blocks.length);
for (int i = 0; i < blocks.length; i++) {
BlockEncoding encoding = blocks[i].getEncoding();
serde.writeBlockEncoding(output, encoding);
encoding.writeBlock(output, blocks[i]);
writeBlock(serde, output, blocks[i]);
}

return this;
Expand Down Expand Up @@ -111,8 +110,7 @@ protected Page computeNext()
int numberOfBlocks = input.readInt();
Block[] blocks = new Block[numberOfBlocks];
for (int i = 0; i < blocks.length; i++) {
BlockEncoding encoding = serde.readBlockEncoding(input);
blocks[i] = encoding.readBlock(input);
blocks[i] = readBlock(serde, input);
}

@SuppressWarnings("UnnecessaryLocalVariable")
Expand Down
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.GroupByHashPageIndexerFactory;
import com.facebook.presto.PagesIndexPageSorter;
import com.facebook.presto.block.BlockEncodingManager;
import com.facebook.presto.block.BlockJsonSerde;
import com.facebook.presto.client.NodeVersion;
import com.facebook.presto.client.QueryResults;
import com.facebook.presto.client.ServerInfo;
Expand Down Expand Up @@ -63,6 +64,7 @@
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.PageIndexerFactory;
import com.facebook.presto.spi.PageSorter;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockEncodingFactory;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.type.Type;
Expand Down Expand Up @@ -313,6 +315,8 @@ protected void setup(Binder binder)
binder.bind(BlockEncodingManager.class).in(Scopes.SINGLETON);
binder.bind(BlockEncodingSerde.class).to(BlockEncodingManager.class).in(Scopes.SINGLETON);
newSetBinder(binder, new TypeLiteral<BlockEncodingFactory<?>>() {});
jsonBinder(binder).addSerializerBinding(Block.class).to(BlockJsonSerde.Serializer.class);
jsonBinder(binder).addDeserializerBinding(Block.class).to(BlockJsonSerde.Deserializer.class);

// thread visualizer
jaxrsBinder(binder).bind(ThreadResource.class);
Expand Down

0 comments on commit 8c13b28

Please sign in to comment.