HiveVectorizedReader.java
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.mr.hive.vector;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.encoded.MemoryBufferOrBuffers;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.SyntheticFileId;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.hive.ql.io.orc.VectorizedOrcInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.VectorizedParquetInputFormat;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hive.iceberg.org.apache.orc.OrcConf;
import org.apache.hive.iceberg.org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.hive.iceberg.org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.hive.iceberg.org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.hive.iceberg.org.apache.parquet.schema.MessageType;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
import org.apache.iceberg.orc.VectorizedReadUtils;
import org.apache.iceberg.parquet.ParquetFooterInputFromCache;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.orc.impl.OrcTail;

/**
 * Utility class to create vectorized readers for Hive.
 * Based on the file format of the task, it creates a matching vectorized record reader that is already implemented
 * in Hive. It also applies some tweaks on the produced vectors for Iceberg's use, e.g. partition column handling.
 */
public class HiveVectorizedReader {

private HiveVectorizedReader() {
}
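
/**
 * Creates a vectorized record reader matching the file format of the scan task and wraps it into
 * an iterable of HiveBatchContext objects.
 */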
public static CloseableIterable<HiveBatchContext> reader(Path path, FileScanTask task,
Map<Integer, ?> idToConstant, TaskAttemptContext context, Expression residual) {
// Tweaks on jobConf here are relevant to this task only, so we need to copy it first, as the context's conf is reused.
JobConf job = new JobConf(context.getConfiguration());
FileFormat format = task.file().format();
Reporter reporter = ((MapredIcebergInputFormat.CompatibilityTaskAttemptContextImpl) context).getLegacyReporter();
// Hive by default requires partition columns to be read too. This is not required for identity partition
// columns, as we will add those as constants later.
int[] partitionColIndices = null;
Object[] partitionValues = null;
PartitionSpec partitionSpec = task.spec();
List<Integer> readColumnIds = ColumnProjectionUtils.getReadColumnIDs(job);
if (!partitionSpec.isUnpartitioned()) {
List<PartitionField> fields = partitionSpec.fields();
List<Integer> partitionColIndicesList = Lists.newLinkedList();
List<Object> partitionValuesList = Lists.newLinkedList();
for (PartitionField partitionField : fields) {
if (partitionField.transform().isIdentity()) {
// Get columns in read schema order (which matches those of readColumnIds) to find partition column indices
List<Types.NestedField> columns = task.spec().schema().columns();
for (int colIdx = 0; colIdx < columns.size(); ++colIdx) {
if (columns.get(colIdx).fieldId() == partitionField.sourceId()) {
// Skip reading identity partition columns from source file...
readColumnIds.remove((Integer) colIdx);
// ...and use the corresponding constant value instead
partitionColIndicesList.add(colIdx);
partitionValuesList.add(idToConstant.get(partitionField.sourceId()));
break;
}
}
}
}
partitionColIndices = ArrayUtils.toPrimitive(partitionColIndicesList.toArray(new Integer[0]));
partitionValues = partitionValuesList.toArray(new Object[0]);
ColumnProjectionUtils.setReadColumns(job, readColumnIds);
}
try {
long start = task.start();
long length = task.length();
// TODO: Iceberg currently does not track the last modification time of a file. Until that's added,
// we need to set Long.MIN_VALUE as last modification time in the fileId triplet.
SyntheticFileId fileId = new SyntheticFileId(path, task.file().fileSizeInBytes(), Long.MIN_VALUE);
fileId.toJobConf(job);
RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
switch (format) {
case ORC:
recordReader = orcRecordReader(job, reporter, task, path, start, length, readColumnIds,
fileId, residual);
break;
case PARQUET:
recordReader = parquetRecordReader(job, reporter, task, path, start, length, fileId);
break;
default:
throw new UnsupportedOperationException("Vectorized Hive reading unimplemented for format: " + format);
}
return createVectorizedRowBatchIterable(recordReader, job, partitionColIndices, partitionValues);
} catch (IOException ioe) {
throw new RuntimeException("Error creating vectorized record reader for " + path, ioe);
}
}
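
/**
 * Creates a vectorized ORC record reader, via LLAP when it is enabled and applicable, falling back to
 * VectorizedOrcInputFormat otherwise.
 */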
private static RecordReader<NullWritable, VectorizedRowBatch> orcRecordReader(JobConf job, Reporter reporter,
FileScanTask task, Path path, long start, long length, List<Integer> readColumnIds,
SyntheticFileId fileId, Expression residual) throws IOException {
RecordReader<NullWritable, VectorizedRowBatch> recordReader = null;
// Need to turn positional schema evolution off since we use column name based schema evolution for projection,
// and Iceberg will create a mapping between the file schema and the current reading schema.
job.setBoolean(OrcConf.FORCE_POSITIONAL_EVOLUTION.getHiveConfName(), false);
// Metadata information has to be passed along in the OrcSplit. Without specifying this, the vectorized
// reader will assume that the ORC file ends at the task's start + length, and might fail to read the tail.
ByteBuffer serializedOrcTail = VectorizedReadUtils.getSerializedOrcTail(path, fileId, job);
OrcTail orcTail = VectorizedReadUtils.deserializeToOrcTail(serializedOrcTail);
VectorizedReadUtils.handleIcebergProjection(task, job,
VectorizedReadUtils.deserializeToShadedOrcTail(serializedOrcTail).getSchema(), residual);
// If LLAP is enabled, try to retrieve an LLAP record reader - this might yield null in some special cases.
// TODO: add support for reading files with positional deletes with LLAP (LLAP would need to provide file row num)
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
LlapProxy.getIo() != null && task.deletes().isEmpty()) {
recordReader = LlapProxy.getIo().llapVectorizedOrcReaderForPath(fileId, path, null, readColumnIds,
job, start, length, reporter);
}
if (recordReader == null) {
InputSplit split = new OrcSplit(path, fileId, start, length, (String[]) null, orcTail,
false, false, com.google.common.collect.Lists.newArrayList(), 0, length, path.getParent(), null);
recordReader = new VectorizedOrcInputFormat().getRecordReader(split, job, reporter);
}
return recordReader;
}
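
/**
 * Creates a vectorized Parquet record reader. Reads the file footer (from the LLAP cache when available)
 * to map the Iceberg read schema onto the Parquet file schema.
 */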
private static RecordReader<NullWritable, VectorizedRowBatch> parquetRecordReader(JobConf job, Reporter reporter,
FileScanTask task, Path path, long start, long length, SyntheticFileId fileId) throws IOException {
InputSplit split = new FileSplit(path, start, length, job);
VectorizedParquetInputFormat inputFormat = new VectorizedParquetInputFormat();
MemoryBufferOrBuffers footerData = null;
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.LLAP_IO_ENABLED, LlapProxy.isDaemon()) &&
LlapProxy.getIo() != null) {
LlapProxy.getIo().initCacheOnlyInputFormat(inputFormat);
footerData = LlapProxy.getIo().getParquetFooterBuffersFromCache(path, job, fileId);
}
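// Read the Parquet footer, preferring the cached copy retrieved from LLAP when present.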
ParquetMetadata parquetMetadata = footerData != null ?
ParquetFileReader.readFooter(new ParquetFooterInputFromCache(footerData), ParquetMetadataConverter.NO_FILTER) :
ParquetFileReader.readFooter(job, path);
MessageType fileSchema = parquetMetadata.getFileMetaData().getSchema();
MessageType typeWithIds = null;
Schema expectedSchema = task.spec().schema();
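// Prune the file schema to the columns of the expected read schema; files written without Iceberg field ids
// get fallback ids assigned based on position.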
if (ParquetSchemaUtil.hasIds(fileSchema)) {
typeWithIds = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);
} else {
typeWithIds = ParquetSchemaUtil.pruneColumnsFallback(ParquetSchemaUtil.addFallbackIds(fileSchema),
expectedSchema);
}
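// Derive the column name list Hive expects from the pruned schema and pass it to the Parquet reader.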
ParquetSchemaFieldNameVisitor psv = new ParquetSchemaFieldNameVisitor(fileSchema);
TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), typeWithIds, psv);
job.set(IOConstants.COLUMNS, psv.retrieveColumnNameList());
return inputFormat.getRecordReader(split, job, reporter);
}
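
/**
 * Wraps the Hive record reader into a CloseableIterable of HiveBatchContext objects, filling the
 * identity partition columns with their constant values.
 */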
private static CloseableIterable<HiveBatchContext> createVectorizedRowBatchIterable(
RecordReader<NullWritable, VectorizedRowBatch> hiveRecordReader, JobConf job, int[] partitionColIndices,
Object[] partitionValues) {
HiveBatchIterator iterator =
new HiveBatchIterator(hiveRecordReader, job, partitionColIndices, partitionValues);
return new CloseableIterable<HiveBatchContext>() {
@Override
public CloseableIterator<HiveBatchContext> iterator() {
return iterator;
}
@Override
public void close() throws IOException {
iterator.close();
}
};
}
}