HIVE-25948: Optimize Iceberg writes by directing records either to Clustered or Fanout writer #5789

Merged (3 commits) on May 22, 2025. Changes from all commits.
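
For context on the title: Iceberg partitioned writes can go through either a clustered writer, which keeps a single data file open at a time and therefore needs its input grouped (sorted) by partition key, or a fanout writer, which keeps one open file per partition and accepts rows in any order at the cost of more open files and memory. Below is a minimal illustrative sketch of that distinction; the class and method names are hypothetical stand-ins, not Hive's or Iceberg's actual writer APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: FileAppender and both writer shapes are simplified stand-ins.
class WriterStrategySketch {

  interface FileAppender {
    void add(String record);
    void close();
  }

  /** Clustered style: one open appender; rows must arrive grouped by partition key. */
  static class ClusteredStyleWriter {
    private String currentPartition;
    private FileAppender current;

    void write(String partition, String record) {
      if (!partition.equals(currentPartition)) {
        if (current != null) {
          current.close();                  // the previous partition is considered finished
        }
        current = newAppender(partition);   // reopening an already-closed partition is not allowed
        currentPartition = partition;
      }
      current.add(record);
    }
  }

  /** Fanout style: one open appender per partition; row order does not matter. */
  static class FanoutStyleWriter {
    private final Map<String, FileAppender> openAppenders = new HashMap<>();

    void write(String partition, String record) {
      openAppenders.computeIfAbsent(partition, WriterStrategySketch::newAppender).add(record);
    }
  }

  static FileAppender newAppender(String partition) {
    return new FileAppender() {
      public void add(String record) { /* buffer rows for this partition's data file */ }
      public void close() { /* finalize the data file */ }
    };
  }
}
```

The changes below wire this choice into Hive: when the fanout path is enabled for a table, the SerDe no longer has to force SortedDynPartitionOptimizer to pre-sort rows by partition key.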
@@ -122,7 +122,8 @@ static Expression getFilterExpr(Configuration conf, ExprNodeGenericFuncDesc expr
try {
return HiveIcebergFilterFactory.generateFilterExpression(sarg);
} catch (UnsupportedOperationException e) {
LOG.warn("Unable to create Iceberg filter, continuing without filter (will be applied by Hive later): ", e);
LOG.warn("Unable to create Iceberg filter, proceeding without it (will be applied by Hive later): {}",
e.getMessage());
}
}
return null;
@@ -456,7 +456,7 @@ private void commitTable(FileIO io, ExecutorService executor, OutputTable output

for (JobContext jobContext : jobContexts) {
JobConf conf = jobContext.getJobConf();
table = Optional.ofNullable(table).orElse(Catalogs.loadTable(conf, catalogProperties));
table = Optional.ofNullable(table).orElseGet(() -> Catalogs.loadTable(conf, catalogProperties));
branchName = conf.get(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF);
snapshotId = getSnapshotId(outputTable.table, branchName);

@@ -857,7 +857,6 @@ public List<FileStatus> getOutputFiles(List<JobContext> jobContexts) throws IOEx
.onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc))
.run(output -> {
for (JobContext jobContext : outputs.get(output)) {
LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
Table table = output.table;
FileSystem fileSystem = new Path(table.location()).getFileSystem(jobConf);
String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
@@ -897,22 +896,21 @@ public List<FileStatus> getOutputFiles(List<JobContext> jobContexts) throws IOEx
* @return Returns the list of file statuses of the output files in the jobContexts
* @throws IOException Throws IOException
*/
public List<ContentFile> getOutputContentFiles(List<JobContext> jobContexts) throws IOException {
public List<ContentFile<?>> getOutputContentFiles(List<JobContext> jobContexts) throws IOException {
Multimap<OutputTable, JobContext> outputs = collectOutputs(jobContexts);
JobConf jobConf = jobContexts.get(0).getJobConf();

ExecutorService fileExecutor = fileExecutor(jobConf);
ExecutorService tableExecutor = tableExecutor(jobConf, outputs.keySet().size());

Collection<ContentFile> files = new ConcurrentLinkedQueue<>();
Collection<ContentFile<?>> files = new ConcurrentLinkedQueue<>();
try {
Tasks.foreach(outputs.keySet())
.suppressFailureWhenFinished()
.executeWith(tableExecutor)
.onFailure((output, exc) -> LOG.warn("Failed to retrieve merge input file for the table {}", output, exc))
.run(output -> {
for (JobContext jobContext : outputs.get(output)) {
LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
Table table = output.table;
String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
// list jobLocation to get number of forCommit files
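
One note on the `orElse` to `orElseGet` switch in `commitTable` above: `Optional.orElse` always evaluates its argument, so `Catalogs.loadTable(conf, catalogProperties)` ran on every pass through the loop even when `table` was already set, while `orElseGet` only invokes the supplier when the Optional is empty. A standalone illustration of this standard `java.util.Optional` behavior (not Hive code):

```java
import java.util.Optional;

public class OrElseVsOrElseGet {

  static String expensiveLoad() {
    System.out.println("loading...");      // stands in for a costly call such as a catalog table load
    return "loaded";
  }

  public static void main(String[] args) {
    Optional<String> present = Optional.of("cached");

    // orElse: expensiveLoad() runs even though a value is present.
    String a = present.orElse(expensiveLoad());                       // prints "loading..."

    // orElseGet: the supplier is invoked only when the Optional is empty.
    String b = present.orElseGet(OrElseVsOrElseGet::expensiveLoad);   // prints nothing

    System.out.println(a + " " + b);                                  // cached cached
  }
}
```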
@@ -25,15 +25,13 @@
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.security.authorization.HiveCustomStorageHandlerUtils;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
@@ -78,9 +76,9 @@ public class HiveIcebergSerDe extends AbstractSerDe {
private final Map<String, String> jobConf = Maps.newHashMap();

@Override
public void initialize(@Nullable Configuration configuration, Properties serDeProperties,
Properties partitionProperties) throws SerDeException {
super.initialize(configuration, serDeProperties, partitionProperties);
public void initialize(Configuration conf, Properties serDeProperties,
Properties partitionProperties) throws SerDeException {
super.initialize(conf, serDeProperties, partitionProperties);

// HiveIcebergSerDe.initialize is called multiple places in Hive code:
// - When we are trying to create a table - HiveDDL data is stored at the serDeProperties, but no Iceberg table
@@ -103,7 +101,7 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr
}
} else {
try {
Table table = IcebergTableUtil.getTable(configuration, serDeProperties);
Table table = IcebergTableUtil.getTable(conf, serDeProperties);
// always prefer the original table schema if there is one
this.tableSchema = table.schema();
this.partitionColumns = table.spec().fields().stream().map(PartitionField::name).collect(Collectors.toList());
@@ -115,7 +113,7 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr

if (serDeProperties.get("metadata_location") != null) {
// If metadata location is provided, extract the schema details from it.
try (FileIO fileIO = new HadoopFileIO(configuration)) {
try (FileIO fileIO = new HadoopFileIO(conf)) {
TableMetadata metadata = TableMetadataParser.read(fileIO, serDeProperties.getProperty("metadata_location"));
this.tableSchema = metadata.schema();
this.partitionColumns =
@@ -126,41 +124,41 @@ public void initialize(@Nullable Configuration configuration, Properties serDePr
}
}
} else {
boolean autoConversion = configuration.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
boolean autoConversion = conf.getBoolean(InputFormatConfig.SCHEMA_AUTO_CONVERSION, false);
// If we can not load the table try the provided hive schema
this.tableSchema = hiveSchemaOrThrow(e, autoConversion);
// This is only for table creation, it is ok to have an empty partition column list
this.partitionColumns = ImmutableList.of();
}
if (e instanceof NoSuchTableException &&
HiveTableUtil.isCtas(serDeProperties) &&
!Catalogs.hiveCatalog(configuration, serDeProperties)) {
HiveTableUtil.isCtas(serDeProperties) && !Catalogs.hiveCatalog(conf, serDeProperties)) {
throw new SerDeException(CTAS_EXCEPTION_MSG);
}
}
}

this.projectedSchema =
projectedSchema(configuration, serDeProperties.getProperty(Catalogs.NAME), tableSchema, jobConf);
projectedSchema(conf, serDeProperties.getProperty(Catalogs.NAME), tableSchema, jobConf);

// Currently ClusteredWriter is used which requires that records are ordered by partition keys.
// Here we ensure that SortedDynPartitionOptimizer will kick in and do the sorting.
// TODO: remove once we have both Fanout and ClusteredWriter available: HIVE-25948
HiveConf.setIntVar(configuration, HiveConf.ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD, 1);
HiveConf.setVar(configuration, HiveConf.ConfVars.DYNAMIC_PARTITIONING_MODE, "nonstrict");
if (!IcebergTableUtil.isFanoutEnabled(Maps.fromProperties(serDeProperties))) {
// ClusteredWriter requires that records are ordered by partition keys.
// Here we ensure that SortedDynPartitionOptimizer will kick in and do the sorting.
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD, 1);
}
HiveConf.setVar(conf, HiveConf.ConfVars.DYNAMIC_PARTITIONING_MODE, "nonstrict");
try {
this.inspector = IcebergObjectInspector.create(projectedSchema);
} catch (Exception e) {
throw new SerDeException(e);
}
}

private static Schema projectedSchema(Configuration configuration, String tableName, Schema tableSchema,
Map<String, String> jobConfs) {
Context.Operation operation = HiveCustomStorageHandlerUtils.getWriteOperation(configuration::get, tableName);
private static Schema projectedSchema(Configuration conf, String tableName, Schema tableSchema,
Map<String, String> jobConf) {
Context.Operation operation = HiveCustomStorageHandlerUtils.getWriteOperation(conf::get, tableName);
if (operation == null) {
jobConfs.put(InputFormatConfig.CASE_SENSITIVE, "false");
String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(configuration);
jobConf.put(InputFormatConfig.CASE_SENSITIVE, "false");
String[] selectedColumns = ColumnProjectionUtils.getReadColumnNames(conf);
// When same table is joined multiple times, it is possible some selected columns are duplicated,
// in this case wrong recordStructField position leads wrong value or ArrayIndexOutOfBoundException
String[] distinctSelectedColumns = Arrays.stream(selectedColumns).distinct().toArray(String[]::new);
@@ -175,7 +173,7 @@ private static Schema projectedSchema(Configuration configuration, String tableN
return projectedSchema;
}
}
boolean isCOW = IcebergTableUtil.isCopyOnWriteMode(operation, configuration::get);
boolean isCOW = IcebergTableUtil.isCopyOnWriteMode(operation, conf::get);
if (isCOW) {
return IcebergAcidUtil.createSerdeSchemaForDelete(tableSchema.columns());
}
@@ -212,16 +210,9 @@ public Writable serialize(Object o, ObjectInspector objectInspector) {
return row;
}

@Override
public SerDeStats getSerDeStats() {
return null;
}

@Override
public void handleJobLevelConfiguration(HiveConf conf) {
for (Map.Entry<String, String> confs : jobConf.entrySet()) {
conf.set(confs.getKey(), confs.getValue());
}
jobConf.forEach(conf::set);
}

@Override
@@ -268,7 +259,7 @@ private Schema hiveSchemaOrThrow(Exception previousException, boolean autoConver

/**
* If the table already exists then returns the list of the current Iceberg partition column names.
* If the table is not partitioned by Iceberg, or the table does not exists yet then returns an empty list.
* If the table is not partitioned by Iceberg, or the table does not exist yet then returns an empty list.
* @return The name of the Iceberg partition columns.
*/
public Collection<String> partitionColumns() {
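
The new branch above keys off `IcebergTableUtil.isFanoutEnabled(...)`, and the qtest at the end of this diff toggles the behavior through the `write.fanout.enabled` table property. A hedged sketch of what such a check could look like; the property name comes from this PR, but the helper body and its default value are assumptions, not the actual `IcebergTableUtil` implementation:

```java
import java.util.Map;

final class FanoutCheckSketch {

  // Property name taken from the qtest in this PR; the default below is an assumption.
  private static final String WRITE_FANOUT_ENABLED = "write.fanout.enabled";
  private static final boolean WRITE_FANOUT_ENABLED_DEFAULT = true;

  private FanoutCheckSketch() {
  }

  static boolean isFanoutEnabled(Map<String, String> tableProps) {
    String value = tableProps.get(WRITE_FANOUT_ENABLED);
    return value == null ? WRITE_FANOUT_ENABLED_DEFAULT : Boolean.parseBoolean(value);
  }
}
```

When the check returns false, the SerDe keeps the old behavior and forces `HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD` to 1 so the clustered writer still receives partition-ordered rows.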
@@ -41,7 +41,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.MapUtils;
@@ -92,7 +91,6 @@
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.DummyPartition;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
@@ -194,7 +192,6 @@
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.base.Throwables;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -348,7 +345,7 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
boolean isMergeTaskEnabled = Boolean.parseBoolean(tableDesc.getProperty(
HiveCustomStorageHandlerUtils.MERGE_TASK_ENABLED + tableName));
if (isMergeTaskEnabled) {
HiveCustomStorageHandlerUtils.setMergeTaskEnabled(jobConf, tableName, isMergeTaskEnabled);
HiveCustomStorageHandlerUtils.setMergeTaskEnabled(jobConf, tableName, true);
}
String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES);
tables = tables == null ? tableName : tables + TABLE_NAME_SEPARATOR + tableName;
@@ -366,7 +363,7 @@ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
Utilities.addDependencyJars(jobConf, HiveIcebergStorageHandler.class);
}
} catch (IOException e) {
Throwables.propagate(e);
throw new RuntimeException(e);
}
}

@@ -606,14 +603,10 @@ private static Map<String, String> getPartishSummary(Partish partish, Table tabl
}

private Table getTable(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
Table table;
final Optional<QueryState> queryState = SessionStateUtil.getQueryState(conf);
if (!queryState.isPresent() || queryState.get().getNumModifiedRows() > 0) {
table = IcebergTableUtil.getTable(conf, hmsTable.getTTable(), true);
} else {
table = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
}
return table;
boolean skipCache = SessionStateUtil.getQueryState(conf)
.map(queryState -> queryState.getNumModifiedRows() > 0)
.orElse(true);
return IcebergTableUtil.getTable(conf, hmsTable.getTTable(), skipCache);
}

@Override
@@ -919,11 +912,10 @@ public DynamicPartitionCtx createDPContext(
DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(Maps.newLinkedHashMap(),
hiveConf.getVar(ConfVars.DEFAULT_PARTITION_NAME),
hiveConf.getIntVar(ConfVars.DYNAMIC_PARTITION_MAX_PARTS_PER_NODE));
List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs = Lists.newLinkedList();
dpCtx.setCustomSortExpressions(customSortExprs);

if (table.spec().isPartitioned()) {
addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getPartitionTransformSpec(hmsTable));
if (table.spec().isPartitioned() &&
hiveConf.getIntVar(ConfVars.HIVE_OPT_SORT_DYNAMIC_PARTITION_THRESHOLD) >= 0) {
addCustomSortExpr(table, hmsTable, writeOperation, dpCtx, getPartitionTransformSpec(hmsTable));
}

SortOrder sortOrder = table.sortOrder();
@@ -951,15 +943,14 @@ public DynamicPartitionCtx createDPContext(
}
}

addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getSortTransformSpec(table));
addCustomSortExpr(table, hmsTable, writeOperation, dpCtx, getSortTransformSpec(table));
}
dpCtx.setHasCustomSortExprs(!customSortExprs.isEmpty());

return dpCtx;
}

private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.Table hmsTable,
Operation writeOperation, List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
Operation writeOperation, DynamicPartitionCtx dpCtx,
List<TransformSpec> transformSpecs) {
Map<String, Integer> fieldOrderMap = Maps.newHashMap();
List<Types.NestedField> fields = table.schema().columns();
@@ -970,7 +961,7 @@ private void addCustomSortExpr(Table table, org.apache.hadoop.hive.ql.metadata.
int offset = (shouldOverwrite(hmsTable, writeOperation) ?
ACID_VIRTUAL_COLS_AS_FIELD_SCHEMA : acidSelectColumns(hmsTable, writeOperation)).size();

customSortExprs.addAll(transformSpecs.stream().map(spec ->
dpCtx.addCustomSortExpressions(transformSpecs.stream().map(spec ->
IcebergTransformSortFunctionUtil.getCustomSortExprs(spec, fieldOrderMap.get(spec.getColumnName()) + offset)
).collect(Collectors.toList()));
}
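
The `>= 0` guard added to `createDPContext` above means custom sort expressions are only registered when dynamic-partition sorting is actually available; with `hive.optimize.sort.dynamic.partition.threshold=-1` (as in the qtest below) no pre-sorting happens and the fanout writer has to absorb unordered rows. A small hedged sketch of that decision; the helper is illustrative glue, not the actual HiveIcebergStorageHandler code:

```java
// Illustrative only: mirrors the condition in the diff, not real Hive code.
final class SortExprGuardSketch {
  static boolean shouldAddPartitionSortExprs(boolean tableIsPartitioned, int sortDynPartThreshold) {
    // A threshold below zero means hive.optimize.sort.dynamic.partition.threshold is disabled,
    // so rows are not pre-sorted by partition key and the writer must handle them as they arrive.
    return tableIsPartitioned && sortDynPartThreshold >= 0;
  }
}
```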
@@ -1511,7 +1502,7 @@ public boolean addDynamicSplitPruningEdge(org.apache.hadoop.hive.ql.metadata.Tab
ExprNodeDesc syntheticFilterPredicate) {
try {
Collection<String> partitionColumns = ((HiveIcebergSerDe) table.getDeserializer()).partitionColumns();
if (partitionColumns.size() > 0) {
if (!partitionColumns.isEmpty()) {
// The filter predicate contains ExprNodeDynamicListDesc object(s) for places where we will substitute
// dynamic values later during execution. For Example:
// GenericUDFIn(Column[ss_sold_date_sk], RS[5] <-- This is an ExprNodeDynamicListDesc)
@@ -1805,19 +1796,16 @@ private String collectColumnAndReplaceDummyValues(ExprNodeDesc node, String foun
* </ul>
* @param tableProps table properties, must be not null
*/
private static void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps) {
private void fallbackToNonVectorizedModeBasedOnProperties(Properties tableProps) {
Schema tableSchema = SchemaParser.fromJson(tableProps.getProperty(InputFormatConfig.TABLE_SCHEMA));
String tableMeta = tableProps.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY);
boolean isMetaTable = tableMeta != null && IcebergMetadataTables.isValidMetaTable(tableMeta);

if (FileFormat.AVRO.name().equalsIgnoreCase(tableProps.getProperty(TableProperties.DEFAULT_FILE_FORMAT)) ||
isMetaTable ||
isValidMetadataTable(tableProps.getProperty(IcebergAcidUtil.META_TABLE_PROPERTY)) ||
hasOrcTimeInSchema(tableProps, tableSchema) ||
!hasParquetNestedTypeWithinListOrMap(tableProps, tableSchema)) {
try {
Hive.get(false).getConf().setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false);
} catch (HiveException e) {
throw new RuntimeException(e);
}
// disable vectorization
SessionStateUtil.getQueryState(conf).ifPresent(queryState ->
queryState.getConf().setBoolVar(ConfVars.HIVE_VECTORIZATION_ENABLED, false));
}
}

@@ -49,7 +49,7 @@ public Properties getSplitProperties() throws IOException {
if (jobContextList.isEmpty()) {
return null;
}
List<ContentFile> contentFiles = HiveIcebergOutputCommitter.getInstance().getOutputContentFiles(jobContextList);
List<ContentFile<?>> contentFiles = HiveIcebergOutputCommitter.getInstance().getOutputContentFiles(jobContextList);
Properties pathToContentFile = new Properties();
contentFiles.forEach(contentFile ->
pathToContentFile.put(new Path(String.valueOf(contentFile.path())), contentFile));
@@ -19,12 +19,24 @@ insert into source values (1, 'one', 3);
insert into source values (1, 'two', 4);

explain extended
create external table tbl_ice partitioned by spec (bucket(16, a), truncate(3, b)) stored by iceberg stored as orc tblproperties ('format-version'='2') as
select a, b, c from source;
create external table tbl_ice partitioned by spec (bucket(16, a), truncate(3, b)) stored by iceberg stored as orc
as select a, b, c from source;

create external table tbl_ice partitioned by spec (bucket(16, a), truncate(3, b)) stored by iceberg stored as orc tblproperties ('format-version'='2') as
select a, b, c from source;
create external table tbl_ice partitioned by spec (bucket(16, a), truncate(3, b)) stored by iceberg stored as orc
as select a, b, c from source;

describe formatted tbl_ice;

select * from tbl_ice;

set hive.stats.autogather=false;
set hive.optimize.sort.dynamic.partition.threshold=-1;

explain
create external table tbl_ice partitioned by (c) stored by iceberg
as select * from source;

explain
create external table tbl_ice partitioned by (c) stored by iceberg
tblproperties ('write.fanout.enabled'='false')
as select * from source;