Simplify construction of HiveRecordWriter
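This change splits partition bookkeeping out of HiveRecordWriter and into a small private HiveWriter wrapper. HiveWriter now carries the partition name, the new-partition flag, and the write and target paths, and is the one that builds the PartitionUpdate; HiveRecordWriter receives only what it needs to serialize and write rows: the output path, input column names, compression flag, storage format, serde, table schema, type manager, and job configuration. Serializer initialization moves from HiveWriteUtils into HivePageSink, and DataColumn is no longer exposed for testing.

In outline (a sketch condensed from the diff below; names abbreviated, not verbatim code):

    // before: one constructor mixed file writing with partition metadata
    new HiveRecordWriter(partitionName, compress, isNew, dataColumns, outputFormat,
            serDe, schema, fileName, writePath, targetPath, typeManager, conf);

    // after: the record writer only writes; HiveWriter carries the metadata
    // and delegates addRow/commit/rollback to it
    new HiveWriter(
            new HiveRecordWriter(path, columnNames, compress, outputFormat,
                    serDe, schema, typeManager, conf),
            partitionName, isNew, fileNameWithExtension, writePath, targetPath);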
dain committed Oct 29, 2016
1 parent a6636ca commit 87e8b99
Showing 2 changed files with 93 additions and 72 deletions.
150 changes: 91 additions & 59 deletions presto-hive/src/main/java/com/facebook/presto/hive/HivePageSink.java
@@ -27,9 +27,9 @@
 import com.facebook.presto.spi.block.Block;
 import com.facebook.presto.spi.type.Type;
 import com.facebook.presto.spi.type.TypeManager;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
@@ -39,6 +39,7 @@
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,6 +49,7 @@
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
 import org.apache.hadoop.hive.serde2.columnar.OptimizedLazyBinaryColumnarSerde;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -83,7 +85,6 @@
 import static com.facebook.presto.hive.HiveWriteUtils.createRecordWriter;
 import static com.facebook.presto.hive.HiveWriteUtils.getField;
 import static com.facebook.presto.hive.HiveWriteUtils.getRowColumnInspectors;
-import static com.facebook.presto.hive.HiveWriteUtils.initializeSerializer;
 import static com.facebook.presto.hive.metastore.MetastoreUtil.getHiveSchema;
 import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
 import static com.google.common.base.MoreObjects.toStringHelper;
@@ -141,8 +142,8 @@ public class HivePageSink
     private final boolean immutablePartitions;
     private final boolean compress;
 
-    private HiveRecordWriter[] writers;
-    private final List<Int2ObjectMap<HiveRecordWriter>> bucketWriters;
+    private HiveWriter[] writers;
+    private final List<Int2ObjectMap<HiveWriter>> bucketWriters;
     private int bucketWriterCount;
 
     private final ConnectorSession session;
@@ -248,7 +249,7 @@ public HivePageSink(
             this.bucketColumns = null;
             bucketFunction = null;
             bucketWriters = null;
-            writers = new HiveRecordWriter[0];
+            writers = new HiveWriter[0];
         }
 
         this.pageIndexer = pageIndexerFactory.createPageIndexer(this.partitionColumnTypes);
@@ -284,7 +285,7 @@ private ImmutableList<Slice> doFinish()
     {
         ImmutableList.Builder<Slice> partitionUpdates = ImmutableList.builder();
         if (!bucketCount.isPresent()) {
-            for (HiveRecordWriter writer : writers) {
+            for (HiveWriter writer : writers) {
                 if (writer != null) {
                     writer.commit();
                     PartitionUpdate partitionUpdate = writer.getPartitionUpdate();
@@ -293,10 +294,10 @@ private ImmutableList<Slice> doFinish()
             }
         }
         else {
-            for (Int2ObjectMap<HiveRecordWriter> writers : bucketWriters) {
+            for (Int2ObjectMap<HiveWriter> writers : bucketWriters) {
                 PartitionUpdate firstPartitionUpdate = null;
                 ImmutableList.Builder<String> fileNamesBuilder = ImmutableList.builder();
-                for (HiveRecordWriter writer : writers.values()) {
+                for (HiveWriter writer : writers.values()) {
                     writer.commit();
                     PartitionUpdate partitionUpdate = writer.getPartitionUpdate();
                     if (firstPartitionUpdate == null) {
@@ -335,15 +336,15 @@ public void abort()
     private void doAbort()
     {
         if (!bucketCount.isPresent()) {
-            for (HiveRecordWriter writer : writers) {
+            for (HiveWriter writer : writers) {
                 if (writer != null) {
                     writer.rollback();
                 }
             }
         }
         else {
-            for (Int2ObjectMap<HiveRecordWriter> writers : bucketWriters) {
-                for (HiveRecordWriter writer : writers.values()) {
+            for (Int2ObjectMap<HiveWriter> writers : bucketWriters) {
+                for (HiveWriter writer : writers.values()) {
                     writer.rollback();
                 }
             }
@@ -377,7 +378,7 @@ private CompletableFuture<?> doAppend(Page page)
         }
         for (int position = 0; position < page.getPositionCount(); position++) {
             int writerIndex = indexes[position];
-            HiveRecordWriter writer = writers[writerIndex];
+            HiveWriter writer = writers[writerIndex];
             if (writer == null) {
                 writer = createWriter(partitionColumns, position, OptionalInt.empty());
                 writers[writerIndex] = writer;
@@ -394,9 +395,9 @@ private CompletableFuture<?> doAppend(Page page)
             Page bucketColumnsPage = extractColumns(page, bucketColumns);
             for (int position = 0; position < page.getPositionCount(); position++) {
                 int writerIndex = indexes[position];
-                Int2ObjectMap<HiveRecordWriter> writers = bucketWriters.get(writerIndex);
+                Int2ObjectMap<HiveWriter> writers = bucketWriters.get(writerIndex);
                 int bucket = bucketFunction.getBucket(bucketColumnsPage, position);
-                HiveRecordWriter writer = writers.get(bucket);
+                HiveWriter writer = writers.get(bucket);
                 if (writer == null) {
                     if (bucketWriterCount >= maxOpenPartitions) {
                         throw new PrestoException(HIVE_TOO_MANY_OPEN_PARTITIONS, "Too many open writers for partitions and buckets");
@@ -417,7 +418,7 @@ public static String computeBucketedFileName(String filePrefix, int bucket)
         return filePrefix + "_bucket-" + Strings.padStart(Integer.toString(bucket), BUCKET_NUMBER_PADDING, '0');
     }
 
-    private HiveRecordWriter createWriter(Page partitionColumns, int position, OptionalInt bucketNumber)
+    private HiveWriter createWriter(Page partitionColumns, int position, OptionalInt bucketNumber)
     {
         String fileName;
         if (bucketNumber.isPresent()) {
@@ -532,19 +533,19 @@ private HiveRecordWriter createWriter(Page partitionColumns, int position, OptionalInt bucketNumber)
 
         validateSchema(partitionName, schema);
 
-        return new HiveRecordWriter(
-                partitionName.orElse(""),
+        String fileNameWithExtension = fileName + getFileExtension(conf, outputFormat);
+        HiveRecordWriter hiveRecordWriter = new HiveRecordWriter(
+                new Path(write, fileNameWithExtension),
+                dataColumns.stream()
+                        .map(DataColumn::getName)
+                        .collect(toList()),
                 compress,
-                isNew,
-                dataColumns,
                 outputFormat,
                 serDe,
                 schema,
-                fileName + getFileExtension(conf, outputFormat),
-                write.toString(),
-                target.toString(),
                 typeManager,
                 conf);
+        return new HiveWriter(hiveRecordWriter, partitionName, isNew, fileNameWithExtension, write.toString(), target.toString());
     }
 
     private void validateSchema(Optional<String> partitionName, Properties schema)
@@ -650,14 +651,54 @@ private static Page extractColumns(Page page, int[] columns)
         return new Page(page.getPositionCount(), blocks);
     }
 
-    @VisibleForTesting
-    public static class HiveRecordWriter
+    private static class HiveWriter
     {
-        private final String partitionName;
+        private final HiveRecordWriter hiveRecordWriter;
+        private final Optional<String> partitionName;
         private final boolean isNew;
         private final String fileName;
         private final String writePath;
         private final String targetPath;
+
+        public HiveWriter(HiveRecordWriter hiveRecordWriter, Optional<String> partitionName, boolean isNew, String fileName, String writePath, String targetPath)
+        {
+            this.hiveRecordWriter = hiveRecordWriter;
+            this.partitionName = partitionName;
+            this.isNew = isNew;
+            this.fileName = fileName;
+            this.writePath = writePath;
+            this.targetPath = targetPath;
+        }
+
+        public void addRow(Block[] columns, int position)
+        {
+            hiveRecordWriter.addRow(columns, position);
+        }
+
+        public void commit()
+        {
+            hiveRecordWriter.commit();
+        }
+
+        public void rollback()
+        {
+            hiveRecordWriter.rollback();
+        }
+
+        public PartitionUpdate getPartitionUpdate()
+        {
+            return new PartitionUpdate(
+                    partitionName.orElse(""),
+                    isNew,
+                    writePath,
+                    targetPath,
+                    ImmutableList.of(fileName));
+        }
+    }
+
+    public static class HiveRecordWriter
+    {
+        private final Path path;
         private final int fieldCount;
         @SuppressWarnings("deprecation")
         private final Serializer serializer;
@@ -668,53 +709,44 @@ public static class HiveRecordWriter
         private final FieldSetter[] setters;
 
         public HiveRecordWriter(
-                String partitionName,
+                Path path,
+                List<String> inputColumnNames,
                 boolean compress,
-                boolean isNew,
-                List<DataColumn> inputColumns,
                 String outputFormat,
                 String serDe,
                 Properties schema,
-                String fileName,
-                String writePath,
-                String targetPath,
                 TypeManager typeManager,
                 JobConf conf)
         {
-            this.partitionName = partitionName;
-            this.isNew = isNew;
-            this.fileName = fileName;
-            this.writePath = writePath;
-            this.targetPath = targetPath;
+            this.path = requireNonNull(path, "path is null");
 
             // existing tables may have columns in a different order
             List<String> fileColumnNames = Splitter.on(',').trimResults().omitEmptyStrings().splitToList(schema.getProperty(META_TABLE_COLUMNS, ""));
-            List<HiveType> fileColumnHiveTypes = toHiveTypes(schema.getProperty(META_TABLE_COLUMN_TYPES, ""));
+            List<Type> fileColumnTypes = toHiveTypes(schema.getProperty(META_TABLE_COLUMN_TYPES, "")).stream()
+                    .map(hiveType -> hiveType.getType(typeManager))
+                    .collect(toList());
 
             fieldCount = fileColumnNames.size();
 
             if (serDe.equals(LazyBinaryColumnarSerDe.class.getName())) {
                 serDe = OptimizedLazyBinaryColumnarSerde.class.getName();
             }
             serializer = initializeSerializer(conf, schema, serDe);
-            recordWriter = createRecordWriter(new Path(writePath, fileName), conf, compress, schema, outputFormat);
+            recordWriter = createRecordWriter(path, conf, compress, schema, outputFormat);
 
-            List<Type> fileColumnTypes = fileColumnHiveTypes.stream()
-                    .map(hiveType -> hiveType.getType(typeManager))
-                    .collect(toList());
-            tableInspector = getStandardStructObjectInspector(fileColumnNames, getRowColumnInspectors(fileColumnTypes));
+            List<ObjectInspector> objectInspectors = getRowColumnInspectors(fileColumnTypes);
+            tableInspector = getStandardStructObjectInspector(fileColumnNames, objectInspectors);
 
             // reorder (and possibly reduce) struct fields to match input
-            structFields = ImmutableList.copyOf(inputColumns.stream()
-                    .map(DataColumn::getName)
-                    .map(tableInspector::getStructFieldRef)
-                    .collect(toList()));
+            structFields = ImmutableList.copyOf(inputColumnNames.stream()
+                    .map(tableInspector::getStructFieldRef)
+                    .collect(toList()));
 
             row = tableInspector.create();
 
             setters = new FieldSetter[structFields.size()];
             for (int i = 0; i < setters.length; i++) {
-                setters[i] = createFieldSetter(tableInspector, row, structFields.get(i), inputColumns.get(i).getType());
+                setters[i] = createFieldSetter(tableInspector, row, structFields.get(i), fileColumnTypes.get(structFields.get(i).getFieldID()));
             }
         }
 
@@ -757,29 +789,29 @@ public void rollback()
             }
         }
 
-        public PartitionUpdate getPartitionUpdate()
+        @SuppressWarnings("deprecation")
+        private static Serializer initializeSerializer(Configuration conf, Properties properties, String serializerName)
        {
-            return new PartitionUpdate(
-                    partitionName,
-                    isNew,
-                    writePath,
-                    targetPath,
-                    ImmutableList.of(fileName));
+            try {
+                Serializer result = (Serializer) Class.forName(serializerName).getConstructor().newInstance();
+                result.initialize(conf, properties);
+                return result;
+            }
+            catch (SerDeException | ReflectiveOperationException e) {
+                throw Throwables.propagate(e);
+            }
         }
 
         @Override
         public String toString()
         {
             return toStringHelper(this)
-                    .add("partitionName", partitionName)
-                    .add("writePath", writePath)
-                    .add("fileName", fileName)
+                    .add("path", path)
                     .toString();
         }
     }
 
-    @VisibleForTesting
-    public static class DataColumn
+    private static class DataColumn
     {
         private final String name;
         private final Type type;

15 changes: 2 additions & 13 deletions
@@ -19,7 +19,6 @@
 import com.facebook.presto.hive.HdfsEnvironment;
 import com.facebook.presto.hive.HiveColumnHandle;
 import com.facebook.presto.hive.HiveCompressionCodec;
-import com.facebook.presto.hive.HivePageSink.DataColumn;
 import com.facebook.presto.hive.HivePageSink.HiveRecordWriter;
 import com.facebook.presto.hive.HivePageSourceFactory;
 import com.facebook.presto.hive.HiveRecordCursorProvider;
@@ -391,25 +390,15 @@ public RecordFormatWriter(File targetFile,
                 HiveStorageFormat format)
         {
             JobConf config = new JobConf(conf);
-            TypeTranslator typeTranslator = new HiveTypeTranslator();
             configureCompression(config, compressionCodec);
 
-            List<DataColumn> dataColumns = new ArrayList<>(columnNames.size());
-            for (int i = 0; i < columnNames.size(); i++) {
-                dataColumns.add(new DataColumn(columnNames.get(i), columnTypes.get(i), HiveType.toHiveType(typeTranslator, columnTypes.get(i))));
-            }
-
             recordWriter = new HiveRecordWriter(
-                    null,
+                    new Path(targetFile.toURI()),
+                    columnNames,
                     compressionCodec != HiveCompressionCodec.NONE,
-                    true,
-                    dataColumns,
                     format.getOutputFormat(),
                     format.getSerDe(),
                     createSchema(format, columnNames, columnTypes),
-                    targetFile.getName(),
-                    targetFile.getParent(),
-                    targetFile.toString(),
                     TYPE_MANAGER,
                     config);
         }
