Implement array and map types in Raptor
cberner committed Aug 16, 2015
1 parent eaeda42 commit 7aa6d34
Showing 11 changed files with 389 additions and 58 deletions.
@@ -34,6 +34,8 @@
import java.io.File;
import java.io.IOException;
import java.util.BitSet;
import java.util.List;
import java.util.Map;

import static com.facebook.presto.raptor.util.Closer.closer;
import static io.airlift.slice.SizeOf.SIZE_OF_BYTE;
@@ -136,6 +138,21 @@ private static int uncompressedSize(Object object)
        if (object instanceof BytesWritable) {
            return ((BytesWritable) object).getLength();
        }
        if (object instanceof List<?>) {
            int size = 0;
            for (Object element : (Iterable<?>) object) {
                size += uncompressedSize(element);
            }
            return size;
        }
        if (object instanceof Map<?, ?>) {
            int size = 0;
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) object).entrySet()) {
                size += uncompressedSize(entry.getKey());
                size += uncompressedSize(entry.getValue());
            }
            return size;
        }
        throw new IOException("Unhandled ORC object: " + object.getClass().getName());
    }
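
Note: with the List and Map branches, the size estimate becomes recursive. Below is a standalone sketch of how it composes for nested values; the Text and LongWritable leaf cases are assumptions standing in for the primitive branches that sit outside this hunk, and only the List and Map branches mirror the added code.

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.List;
import java.util.Map;

final class UncompressedSizeSketch
{
    private UncompressedSizeSketch() {}

    // Hypothetical leaf handling: the real method's primitive branches are not shown in this hunk
    static int uncompressedSize(Object object)
            throws IOException
    {
        if (object instanceof Text) {
            return ((Text) object).getLength();
        }
        if (object instanceof LongWritable) {
            return 8; // SIZE_OF_LONG
        }
        // As in the commit: a list contributes the sum of its elements' sizes
        if (object instanceof List<?>) {
            int size = 0;
            for (Object element : (Iterable<?>) object) {
                size += uncompressedSize(element);
            }
            return size;
        }
        // As in the commit: a map contributes both its keys and its values
        if (object instanceof Map<?, ?>) {
            int size = 0;
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) object).entrySet()) {
                size += uncompressedSize(entry.getKey());
                size += uncompressedSize(entry.getValue());
            }
            return size;
        }
        throw new IOException("Unhandled ORC object: " + object.getClass().getName());
    }
}

So a value like {"k": [1, 2]} contributes 1 byte for the key plus 8 bytes per long, 17 bytes in total.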

@@ -31,8 +31,13 @@
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

import java.io.Closeable;
import java.io.File;
@@ -45,6 +50,10 @@

import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR;
import static com.facebook.presto.raptor.storage.Row.extractRow;
import static com.facebook.presto.raptor.storage.StorageType.arrayOf;
import static com.facebook.presto.raptor.storage.StorageType.mapOf;
import static com.facebook.presto.raptor.util.Types.isArrayType;
import static com.facebook.presto.raptor.util.Types.isMapType;
import static com.facebook.presto.spi.StandardErrorCode.INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Functions.toStringFunction;
@@ -56,12 +65,14 @@
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_COLUMN_TYPES;
import static org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import static org.apache.hadoop.hive.ql.io.orc.CompressionKind.SNAPPY;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.LIST;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.MAP;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category.PRIMITIVE;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardListObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardMapObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardStructObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaBooleanObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaByteArrayObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaLongObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.javaStringObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector;
import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.getPrimitiveTypeInfo;

public class OrcFileWriter
        implements Closeable
@@ -196,24 +207,30 @@ private static Constructor<? extends RecordWriter> getOrcWriterConstructor()

    private static List<ObjectInspector> getJavaObjectInspectors(List<StorageType> types)
    {
        return types.stream().map(OrcFileWriter::getJavaObjectInspector).collect(toList());
        return types.stream()
                .map(StorageType::getHiveTypeName)
                .map(TypeInfoUtils::getTypeInfoFromTypeString)
                .map(OrcFileWriter::getJavaObjectInspector)
                .collect(toList());
    }

    private static ObjectInspector getJavaObjectInspector(StorageType type)
    private static ObjectInspector getJavaObjectInspector(TypeInfo typeInfo)
    {
        switch (type) {
            case BOOLEAN:
                return javaBooleanObjectInspector;
            case LONG:
                return javaLongObjectInspector;
            case DOUBLE:
                return javaDoubleObjectInspector;
            case STRING:
                return javaStringObjectInspector;
            case BYTES:
                return javaByteArrayObjectInspector;
        }
        throw new PrestoException(INTERNAL_ERROR, "Unhandled storage type: " + type);
        Category category = typeInfo.getCategory();
        if (category == PRIMITIVE) {
            return getPrimitiveJavaObjectInspector(getPrimitiveTypeInfo(typeInfo.getTypeName()));
        }
        if (category == LIST) {
            ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
            return getStandardListObjectInspector(getJavaObjectInspector(listTypeInfo.getListElementTypeInfo()));
        }
        if (category == MAP) {
            MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
            return getStandardMapObjectInspector(
                    getJavaObjectInspector(mapTypeInfo.getMapKeyTypeInfo()),
                    getJavaObjectInspector(mapTypeInfo.getMapValueTypeInfo()));
        }
        throw new PrestoException(INTERNAL_ERROR, "Unhandled storage type: " + category);
    }
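
Taken together, the writer now goes Hive type string to TypeInfo to ObjectInspector, which is what lets nested types nest arbitrarily. A runnable sketch of that pipeline using the Hive serde APIs imported above; the type string and the helper name are illustrative, and the recursion mirrors the commit's getJavaObjectInspector.

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardListObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.getStandardMapObjectInspector;
import static org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector;
import static org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory.getPrimitiveTypeInfo;

final class InspectorSketch
{
    private InspectorSketch() {}

    public static void main(String[] args)
    {
        // A Hive type string of the kind StorageType.getHiveTypeName() is expected to emit
        TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString("map<string,array<bigint>>");
        // MAP recurses into (PRIMITIVE string, LIST of PRIMITIVE bigint)
        System.out.println(inspect(typeInfo).getTypeName()); // map<string,array<bigint>>
    }

    private static ObjectInspector inspect(TypeInfo typeInfo)
    {
        Category category = typeInfo.getCategory();
        if (category == Category.PRIMITIVE) {
            return getPrimitiveJavaObjectInspector(getPrimitiveTypeInfo(typeInfo.getTypeName()));
        }
        if (category == Category.LIST) {
            return getStandardListObjectInspector(inspect(((ListTypeInfo) typeInfo).getListElementTypeInfo()));
        }
        if (category == Category.MAP) {
            MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
            return getStandardMapObjectInspector(
                    inspect(mapTypeInfo.getMapKeyTypeInfo()),
                    inspect(mapTypeInfo.getMapValueTypeInfo()));
        }
        throw new IllegalArgumentException("Unhandled category: " + category);
    }
}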

    private static <T> boolean isUnique(Collection<T> items)
@@ -246,6 +263,12 @@ private static StorageType toStorageType(Type type)
                return StorageType.BYTES;
            }
        }
        if (isArrayType(type)) {
            return arrayOf(toStorageType(type.getTypeParameters().get(0)));
        }
        if (isMapType(type)) {
            return mapOf(toStorageType(type.getTypeParameters().get(0)), toStorageType(type.getTypeParameters().get(1)));
        }
        throw new PrestoException(NOT_SUPPORTED, "No storage type for type: " + type);
    }
}
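
StorageType itself is not part of this diff, so the following is a hypothetical sketch of what arrayOf and mapOf plausibly do: compose the Hive type names that getHiveTypeName() hands to TypeInfoUtils.getTypeInfoFromTypeString in the writer. Every name in this sketch is a guess, not the commit's code.

// Hypothetical sketch of the StorageType factories used by toStorageType above
final class StorageTypeSketch
{
    private final String hiveTypeName;

    private StorageTypeSketch(String hiveTypeName)
    {
        this.hiveTypeName = hiveTypeName;
    }

    static StorageTypeSketch arrayOf(StorageTypeSketch elementType)
    {
        return new StorageTypeSketch("array<" + elementType.hiveTypeName + ">");
    }

    static StorageTypeSketch mapOf(StorageTypeSketch keyType, StorageTypeSketch valueType)
    {
        return new StorageTypeSketch("map<" + keyType.hiveTypeName + "," + valueType.hiveTypeName + ">");
    }

    String getHiveTypeName()
    {
        return hiveTypeName; // e.g. map<bigint,array<varchar>> for a map(bigint, array(varchar)) column
    }
}
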
@@ -18,13 +18,15 @@
import com.facebook.presto.orc.LongVector;
import com.facebook.presto.orc.OrcDataSource;
import com.facebook.presto.orc.OrcRecordReader;
import com.facebook.presto.orc.SingleObjectVector;
import com.facebook.presto.orc.SliceVector;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.UpdatablePageSource;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.BlockBuilder;
import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.block.LazyArrayBlock;
import com.facebook.presto.spi.block.LazyBlockLoader;
import com.facebook.presto.spi.block.LazyFixedWidthBlock;
import com.facebook.presto.spi.block.LazySliceArrayBlock;
@@ -41,6 +43,8 @@

import static com.facebook.presto.orc.Vector.MAX_VECTOR_LENGTH;
import static com.facebook.presto.raptor.RaptorErrorCode.RAPTOR_ERROR;
import static com.facebook.presto.raptor.util.Types.isArrayType;
import static com.facebook.presto.raptor.util.Types.isMapType;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
@@ -177,6 +181,9 @@ else if (DOUBLE.equals(type)) {
            else if (VARCHAR.equals(type) || VARBINARY.equals(type)) {
                blocks[fieldId] = new LazySliceArrayBlock(batchSize, new LazySliceBlockLoader(columnIndexes[fieldId], batchSize));
            }
            else if (isArrayType(type) || isMapType(type)) {
                blocks[fieldId] = new LazyArrayBlock(new LazyStructuralBlockLoader(columnIndexes[fieldId], type));
            }
            else {
                throw new PrestoException(NOT_SUPPORTED, "Unsupported column type: " + type);
            }
@@ -420,4 +427,33 @@ public void load(LazySliceArrayBlock block)
            }
        }
    }

    private final class LazyStructuralBlockLoader
            implements LazyBlockLoader<LazyArrayBlock>
    {
        private final int expectedBatchId = batchId;
        private final int columnIndex;
        private final Type type;

        public LazyStructuralBlockLoader(int columnIndex, Type type)
        {
            this.columnIndex = columnIndex;
            this.type = type;
        }

        @Override
        public void load(LazyArrayBlock block)
        {
            checkState(batchId == expectedBatchId);
            try {
                SingleObjectVector vector = new SingleObjectVector();
                recordReader.readVector(type, columnIndex, vector);
                Block resultBlock = (Block) vector.object;
                block.copyFromBlock(resultBlock);
            }
            catch (IOException e) {
                throw new PrestoException(RAPTOR_ERROR, e);
            }
        }
    }
}
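
LazyStructuralBlockLoader follows the same contract as the fixed-width and slice loaders above it: nothing is read from ORC until the block is first accessed, and the batchId captured at construction guards against loading after the page source has advanced. A self-contained sketch of that guard with hypothetical names; this is not the Presto SPI.

// Hypothetical illustration of the lazy-load guard; names are invented for this sketch
final class LazyLoadGuardSketch
{
    interface Loader
    {
        void load(StringBuilder target);
    }

    private int batchId;

    Loader loaderForCurrentBatch()
    {
        int expectedBatchId = batchId; // captured, as LazyStructuralBlockLoader captures it
        return target -> {
            if (batchId != expectedBatchId) {
                throw new IllegalStateException("batch advanced before block was loaded");
            }
            // the expensive read (recordReader.readVector in the commit) runs here, on first access only
            target.append("structural column for batch ").append(expectedBatchId);
        };
    }

    void advanceBatch()
    {
        batchId++; // any loader handed out earlier now refuses to load
    }
}
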
@@ -35,7 +35,10 @@
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.TupleDomain;
import com.facebook.presto.spi.type.StandardTypes;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spi.type.TypeSignature;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
@@ -82,6 +85,7 @@
import static io.airlift.units.Duration.nanosSince;
import static java.lang.Math.min;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.util.Objects.requireNonNull;
import static org.joda.time.DateTimeZone.UTC;

public class OrcStorageManager
@@ -102,6 +106,7 @@ public class OrcStorageManager
    private final DataSize maxShardSize;
    private final DataSize maxBufferSize;
    private final StorageManagerStats stats;
    private final TypeManager typeManager;

    @Inject
    public OrcStorageManager(
@@ -110,7 +115,8 @@ public OrcStorageManager(
            Optional<BackupStore> backupStore,
            JsonCodec<ShardDelta> shardDeltaCodec,
            StorageManagerConfig config,
            ShardRecoveryManager recoveryManager)
            ShardRecoveryManager recoveryManager,
            TypeManager typeManager)
    {
        this(currentNodeId.toString(),
                storageService,
@@ -120,6 +126,7 @@ public OrcStorageManager(
                config.getOrcMaxReadSize(),
                config.getOrcStreamBufferSize(),
                recoveryManager,
                typeManager,
                config.getShardRecoveryTimeout(),
                config.getMaxShardRows(),
                config.getMaxShardSize(),
@@ -135,6 +142,7 @@ public OrcStorageManager(
            DataSize orcMaxReadSize,
            DataSize orcStreamBufferSize,
            ShardRecoveryManager recoveryManager,
            TypeManager typeManager,
            Duration shardRecoveryTimeout,
            long maxShardRows,
            DataSize maxShardSize,
@@ -156,6 +164,7 @@ public OrcStorageManager(
        this.maxShardSize = checkNotNull(maxShardSize, "maxShardSize is null");
        this.maxBufferSize = checkNotNull(maxBufferSize, "maxBufferSize is null");
        this.stats = new StorageManagerStats();
        this.typeManager = requireNonNull(typeManager, "typeManager is null");
    }

    @Override
@@ -351,27 +360,27 @@ private static OrcFileInfo rewriteFile(File input, File output, BitSet rowsToDel
        }
    }

    private static List<ColumnInfo> getColumnInfo(OrcReader reader)
    private List<ColumnInfo> getColumnInfo(OrcReader reader)
    {
        // TODO: These should be stored as proper metadata.
        // XXX: Relying on ORC types will not work when more Presto types are supported.

        List<String> names = reader.getColumnNames();
        List<OrcType> types = reader.getFooter().getTypes();
        types = types.subList(1, types.size()); // skip struct
        if (names.size() != types.size()) {
        Type rowType = getType(reader.getFooter().getTypes(), 0);
        if (names.size() != rowType.getTypeParameters().size()) {
            throw new PrestoException(RAPTOR_ERROR, "Column names and types do not match");
        }

        ImmutableList.Builder<ColumnInfo> list = ImmutableList.builder();
        for (int i = 0; i < names.size(); i++) {
            list.add(new ColumnInfo(Long.parseLong(names.get(i)), getType(types.get(i))));
            list.add(new ColumnInfo(Long.parseLong(names.get(i)), rowType.getTypeParameters().get(i)));
        }
        return list.build();
    }

    private static Type getType(OrcType type)
    private Type getType(List<OrcType> types, int index)
    {
        OrcType type = types.get(index);
        switch (type.getOrcTypeKind()) {
            case BOOLEAN:
                return BOOLEAN;
@@ -383,6 +392,19 @@ private static Type getType(OrcType type)
                return VARCHAR;
            case BINARY:
                return VARBINARY;
            case LIST:
                TypeSignature elementType = getType(types, type.getFieldTypeIndex(0)).getTypeSignature();
                return typeManager.getParameterizedType(StandardTypes.ARRAY, ImmutableList.of(elementType), ImmutableList.of());
            case MAP:
                TypeSignature keyType = getType(types, type.getFieldTypeIndex(0)).getTypeSignature();
                TypeSignature valueType = getType(types, type.getFieldTypeIndex(1)).getTypeSignature();
                return typeManager.getParameterizedType(StandardTypes.MAP, ImmutableList.of(keyType, valueType), ImmutableList.of());
            case STRUCT:
                ImmutableList.Builder<TypeSignature> fieldTypes = ImmutableList.builder();
                for (int i = 0; i < type.getFieldCount(); i++) {
                    fieldTypes.add(getType(types, type.getFieldTypeIndex(i)).getTypeSignature());
                }
                return typeManager.getParameterizedType(StandardTypes.ROW, fieldTypes.build(), ImmutableList.copyOf(type.getFieldNames()));
        }
        throw new PrestoException(RAPTOR_ERROR, "Unhandled ORC type: " + type);
    }
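
The ORC footer stores types as one flattened list in which a compound type refers to its children by index; that is why getType now takes the whole list plus an index, and why getColumnInfo reads the root row type at index 0 and matches column names against its parameters. A self-contained sketch of the same walk over a minimal stand-in for OrcType; the names here are illustrative, not the ORC API.

import java.util.Arrays;
import java.util.List;

final class OrcTypeWalkSketch
{
    enum Kind { STRUCT, LIST, MAP, LONG, STRING }

    // Minimal stand-in for OrcType: a kind plus child indexes into the flattened footer list
    static final class Node
    {
        final Kind kind;
        final List<Integer> children;

        Node(Kind kind, Integer... children)
        {
            this.kind = kind;
            this.children = Arrays.asList(children);
        }
    }

    // Recurses the way getType() does: follow child indexes back into the flat list
    static String render(List<Node> types, int index)
    {
        Node node = types.get(index);
        switch (node.kind) {
            case LONG:
                return "bigint";
            case STRING:
                return "varchar";
            case LIST:
                return "array<" + render(types, node.children.get(0)) + ">";
            case MAP:
                return "map<" + render(types, node.children.get(0)) + "," + render(types, node.children.get(1)) + ">";
            case STRUCT:
                StringBuilder fields = new StringBuilder();
                for (int i = 0; i < node.children.size(); i++) {
                    fields.append(i == 0 ? "" : ",").append(render(types, node.children.get(i)));
                }
                return "row<" + fields + ">";
        }
        throw new AssertionError();
    }

    public static void main(String[] args)
    {
        // Footer for a shard with columns (bigint, map<varchar,array<bigint>>), flattened preorder
        List<Node> types = Arrays.asList(
                new Node(Kind.STRUCT, 1, 2), // 0: the root row; getColumnInfo starts here
                new Node(Kind.LONG),         // 1: first column
                new Node(Kind.MAP, 3, 4),    // 2: second column
                new Node(Kind.STRING),       // 3: map key
                new Node(Kind.LIST, 5),      // 4: map value
                new Node(Kind.LONG));        // 5: list element
        System.out.println(render(types, 0)); // row<bigint,map<varchar,array<bigint>>>
    }
}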
