Skip to content

Commit

Permalink
HIVE-27455: Iceberg: Set COLUMN_STATS_ACCURATE after writing stats for Iceberg tables (Sourabh Badhya, reviewed by Denys Kuzmenko, Krisztian Kasa)
Browse files Browse the repository at this point in the history

Closes apache#4440
  • Loading branch information
SourabhBadhya authored and tarak271 committed Dec 19, 2023
1 parent 133a29b commit 5f4c861
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,11 @@ public boolean setColStatistics(org.apache.hadoop.hive.ql.metadata.Table hmsTabl
PuffinCompressionCodec.NONE,
ImmutableMap.of()));
writer.finish();
return true;
} catch (IOException e) {
LOG.error(String.valueOf(e));
return false;
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types.NestedField;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Assume;
Expand Down Expand Up @@ -207,6 +208,27 @@ public void testStatsRemoved() throws IOException {
checkColStat(identifier.name(), "customer_id", false);
}

/**
 * Enables stats auto-gathering, creates an unpartitioned "customers" table, inserts the customer
 * records and then checks the table parameters stored in the metastore: COLUMN_STATS_ACCURATE must
 * be present, basic stats must be reported as up to date, and so must the column stats of every
 * column in {@code CUSTOMER_SCHEMA}.
 */
@Test
public void testColumnStatsAccurate() throws Exception {
  final String dbName = "default";
  TableIdentifier customersId = TableIdentifier.of(dbName, "customers");

  // Column stats are only gathered on insert when auto-gather is switched on for the session.
  shell.setHiveSessionValue(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname, true);
  testTables.createTable(
      shell,
      customersId.name(),
      HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
      PartitionSpec.unpartitioned(),
      fileFormat,
      ImmutableList.of());

  shell.executeStatement(
      testTables.getInsertQuery(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, customersId, true));

  org.apache.hadoop.hive.metastore.api.Table storedTable =
      shell.metastore().getTable(dbName, customersId.name());

  // The accuracy marker must exist and must flag the basic stats as current.
  boolean hasAccuracyMarker = storedTable.getParameters().containsKey(StatsSetupConst.COLUMN_STATS_ACCURATE);
  Assert.assertTrue(hasAccuracyMarker);
  boolean basicStatsCurrent = StatsSetupConst.areBasicStatsUptoDate(storedTable.getParameters());
  Assert.assertTrue(basicStatsCurrent);

  // Every column of the schema must be flagged as having current column stats.
  for (NestedField column : HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA.columns()) {
    boolean columnStatsCurrent =
        StatsSetupConst.areColumnStatsUptoDate(storedTable.getParameters(), column.name());
    Assert.assertTrue(columnStatsCurrent);
  }
}

private void checkColStat(String tableName, String colName, boolean accurate) {
List<Object[]> rows = shell.executeStatement("DESCRIBE " + tableName + " " + colName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ Retention: 0
#### A masked pattern was here ####
Table Type: EXTERNAL_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"}
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"id\":\"true\",\"value\":\"true\"}}
EXTERNAL TRUE
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"int\"},{\"id\":2,\"name\":\"value\",\"required\":false,\"type\":\"string\"}]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public int process(Hive db, Table tbl) throws Exception {

LOG.info("Executing stats task");
table = tbl;
return aggregateStats(db);
return aggregateStats(db, tbl);
}

@Override
Expand Down Expand Up @@ -264,7 +264,7 @@ public void process(StatsAggregator statsAggregator) throws HiveException, MetaE
}
}

private int aggregateStats(Hive db) {
private int aggregateStats(Hive db, Table tbl) {

StatsAggregator statsAggregator = null;
int ret = 0;
Expand Down Expand Up @@ -314,6 +314,11 @@ private int aggregateStats(Hive db) {
}
LOG.info("Table " + tableFullName + " stats: [" + toString(p.getPartParameters()) + ']');

// Update the passed-in table object with the latest table object
// so that it can be used by ColStatsProcessor.
// This is only required for unpartitioned tables.
tbl.setTTable(res.getTTable());

} else {
// Partitioned table:
// Need to get the old stats of the partition
Expand Down
24 changes: 22 additions & 2 deletions ql/src/java/org/apache/hadoop/hive/ql/stats/ColStatsProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -34,6 +36,7 @@
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
Expand All @@ -60,7 +63,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ColStatsProcessor implements IStatsProcessor {
private static transient final Logger LOG = LoggerFactory.getLogger(ColStatsProcessor.class);

Expand Down Expand Up @@ -219,8 +221,12 @@ public int persistColumnStats(Hive db, Table tbl) throws HiveException, MetaExce

start = System. currentTimeMillis();
if (tbl != null && tbl.isNonNative() && tbl.getStorageHandler().canSetColStatistics(tbl)) {
tbl.getStorageHandler().setColStatistics(tbl, colStats);
boolean success = tbl.getStorageHandler().setColStatistics(tbl, colStats);
if (!(tbl.isMaterializedView() || tbl.isView() || tbl.isTemporary())) {
setOrRemoveColumnStatsAccurateProperty(db, tbl, colStatDesc.getColName(), success);
}
}
// TODO: Write stats for native tables only (See HIVE-27421)
db.setPartitionColumnStatistics(request);
end = System.currentTimeMillis();
LOG.info("Time taken to update " + colStats.size() + " stats : " + ((end - start)/1000F) + " seconds.");
Expand All @@ -232,6 +238,20 @@ public int persistColumnStats(Hive db, Table tbl) throws HiveException, MetaExce
public void setDpPartSpecs(Collection<Partition> dpPartSpecs) {
// No-op: the supplied partition specs are not used by this processor.
}

/**
 * Updates the column-stats state held in the table parameters for the given columns and persists
 * the table through {@code db.alterTable}.
 *
 * <p>Nothing happens when {@code colNames} is null/empty or when the stats descriptor of this
 * processor is not table-level. Otherwise the column stats state is set for {@code colNames} when
 * {@code success} is true and removed when it is false. The alter call is issued with an
 * environment context carrying {@code DO_NOT_UPDATE_STATS = TRUE}.
 *
 * @param db       Hive handle used to persist the altered table
 * @param tbl      table whose parameters are modified in place and then saved
 * @param colNames names of the columns whose stats state is set or removed
 * @param success  whether the preceding column statistics write succeeded
 * @throws HiveException propagated from the underlying calls (presumably {@code db.alterTable})
 */
private void setOrRemoveColumnStatsAccurateProperty(Hive db, Table tbl, List<String> colNames, boolean success) throws HiveException {
  boolean hasColumns = !CollectionUtils.isEmpty(colNames);
  if (hasColumns && colStatDesc.isTblLevel()) {
    if (!success) {
      StatsSetupConst.removeColumnStatsState(tbl.getParameters(), colNames);
    } else {
      StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames);
    }

    EnvironmentContext alterContext = new EnvironmentContext();
    alterContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
    db.alterTable(tbl.getFullyQualifiedName(), tbl, alterContext, false);
  }
}

/**
* Enumeration of column stats fields that can currently
* be computed. Each one has a field name associated.
Expand Down

0 comments on commit 5f4c861

Please sign in to comment.