Skip to content

Commit

Permalink
HIVE-27235: Iceberg: CREATE TAG SQL implementation (Butao Zhang, revi…
Browse files Browse the repository at this point in the history
…ewed by Denys Kuzmenko)

Closes apache#4372
  • Loading branch information
zhangbutao authored and tarak271 committed Dec 19, 2023
1 parent 21adc21 commit 133a29b
Show file tree
Hide file tree
Showing 27 changed files with 491 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
static final EnumSet<AlterTableType> SUPPORTED_ALTER_OPS = EnumSet.of(
AlterTableType.ADDCOLS, AlterTableType.REPLACE_COLUMNS, AlterTableType.RENAME_COLUMN,
AlterTableType.ADDPROPS, AlterTableType.DROPPROPS, AlterTableType.SETPARTITIONSPEC,
AlterTableType.UPDATE_COLUMNS, AlterTableType.RENAME, AlterTableType.EXECUTE, AlterTableType.CREATE_BRANCH);
AlterTableType.UPDATE_COLUMNS, AlterTableType.RENAME, AlterTableType.EXECUTE, AlterTableType.CREATE_BRANCH,
AlterTableType.CREATE_TAG);
private static final List<String> MIGRATION_ALLOWED_SOURCE_FORMATS = ImmutableList.of(
FileFormat.PARQUET.name().toLowerCase(),
FileFormat.ORC.name().toLowerCase(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@
import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.ql.parse.AlterTableBranchSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableSnapshotRefSpec;
import org.apache.hadoop.hive.ql.parse.PartitionTransform;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.StorageFormat;
Expand Down Expand Up @@ -789,25 +789,29 @@ private static ExecutorService getDeleteExecutorService(String completeName, int
}

@Override
public void alterTableBranchOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
AlterTableBranchSpec alterBranchSpec) {
public void alterTableSnapshotRefOperation(org.apache.hadoop.hive.ql.metadata.Table hmsTable,
AlterTableSnapshotRefSpec alterTableSnapshotRefSpec) {
TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
Table icebergTable = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
Optional.ofNullable(icebergTable.currentSnapshot()).orElseThrow(() ->
new UnsupportedOperationException(String.format("Cannot alter branch on iceberg table" +
" %s.%s which has no snapshot", hmsTable.getDbName(), hmsTable.getTableName())));
new UnsupportedOperationException(String.format("Cannot alter %s on iceberg table %s.%s which has no snapshot",
alterTableSnapshotRefSpec.getOperationType().getName(), hmsTable.getDbName(), hmsTable.getTableName())));

switch (alterBranchSpec.getOperationType()) {
switch (alterTableSnapshotRefSpec.getOperationType()) {
case CREATE_BRANCH:
AlterTableBranchSpec.CreateBranchSpec createBranchSpec =
(AlterTableBranchSpec.CreateBranchSpec) alterBranchSpec.getOperationParams();
AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createBranchSpec =
(AlterTableSnapshotRefSpec.CreateSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergBranchExec.createBranch(icebergTable, createBranchSpec);
break;
case CREATE_TAG:
AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createTagSpec =
(AlterTableSnapshotRefSpec.CreateSnapshotRefSpec) alterTableSnapshotRefSpec.getOperationParams();
IcebergTagExec.createTag(icebergTable, createTagSpec);
break;
default:
throw new UnsupportedOperationException(
String.format("Operation type %s is not supported", alterBranchSpec.getOperationType().name()));
throw new UnsupportedOperationException(String.format(
"Operation type %s is not supported", alterTableSnapshotRefSpec.getOperationType().getName()));
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.iceberg.mr.hive;

import org.apache.hadoop.hive.ql.parse.AlterTableBranchSpec;
import org.apache.hadoop.hive.ql.parse.AlterTableSnapshotRefSpec;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;
Expand All @@ -38,8 +38,8 @@ private IcebergBranchExec() {
* @param table the iceberg table
* @param createBranchSpec Get the basic parameters needed to create a branch
*/
public static void createBranch(Table table, AlterTableBranchSpec.CreateBranchSpec createBranchSpec) {
String branchName = createBranchSpec.getBranchName();
public static void createBranch(Table table, AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createBranchSpec) {
String branchName = createBranchSpec.getRefName();
Long snapshotId = null;
if (createBranchSpec.getSnapshotId() != null) {
snapshotId = createBranchSpec.getSnapshotId();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import org.apache.hadoop.hive.ql.parse.AlterTableSnapshotRefSpec;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.Table;
import org.apache.iceberg.util.SnapshotUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergTagExec {

private static final Logger LOG = LoggerFactory.getLogger(IcebergTagExec.class);

private IcebergTagExec() {
}

public static void createTag(Table table, AlterTableSnapshotRefSpec.CreateSnapshotRefSpec createTagSpec) {
String tagName = createTagSpec.getRefName();
Long snapshotId = null;
if (createTagSpec.getSnapshotId() != null) {
snapshotId = createTagSpec.getSnapshotId();
} else if (createTagSpec.getAsOfTime() != null) {
snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, createTagSpec.getAsOfTime());
} else {
snapshotId = table.currentSnapshot().snapshotId();
}
LOG.info("Creating tag {} on iceberg table {} with snapshotId {}", tagName, table.name(), snapshotId);
ManageSnapshots manageSnapshots = table.manageSnapshots();
manageSnapshots.createTag(tagName, snapshotId);
if (createTagSpec.getMaxRefAgeMs() != null) {
manageSnapshots.setMaxRefAgeMs(tagName, createTagSpec.getMaxRefAgeMs());
}

manageSnapshots.commit();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.mr.hive;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.iceberg.mr.hive.HiveIcebergTestUtils.timestampAfterSnapshot;

public class TestHiveIcebergTagOperation extends HiveIcebergStorageHandlerWithEngineBase {
@Test
public void testCreateTagWithDefaultConfig() throws InterruptedException, IOException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String tagName = "test_tag_1";
shell.executeStatement(String.format("ALTER TABLE customers CREATE TAG %s", tagName));
table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
Assert.assertNull(ref.maxRefAgeMs());

// creating a tag which is already exists will fail
try {
shell.executeStatement(String.format("ALTER TABLE customers CREATE TAG %s", tagName));
} catch (Throwable e) {
while (e.getCause() != null) {
e = e.getCause();
}
Assert.assertTrue(e.getMessage().contains("Ref test_tag_1 already exists"));
}
}

@Test
public void testCreateTagWithSnapshotId() throws InterruptedException, IOException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String tagName = "test_tag_1";
Long snapshotId = table.history().get(0).snapshotId();
shell.executeStatement(String.format("ALTER TABLE customers CREATE TAG %s FOR SYSTEM_VERSION AS OF %d",
tagName, snapshotId));
table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(snapshotId.longValue(), ref.snapshotId());
Assert.assertNull(ref.maxRefAgeMs());
}

@Test
public void testCreateTagWithTimeStamp() throws InterruptedException, IOException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String tagName = "test_tag_1";
Long snapshotId = table.history().get(0).snapshotId();

shell.executeStatement(String.format("ALTER TABLE customers CREATE TAG %s FOR SYSTEM_TIME AS OF '%s'",
tagName, timestampAfterSnapshot(table, 0)));
table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(snapshotId.longValue(), ref.snapshotId());
}

@Test
public void testCreateTagWithMaxRefAge() throws InterruptedException, IOException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String tagName = "test_tag_1";
long maxRefAge = 5L;
shell.executeStatement(String.format("ALTER TABLE customers CREATE TAG %s RETAIN %d DAYS", tagName, maxRefAge));
table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(table.currentSnapshot().snapshotId(), ref.snapshotId());
Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue());
}

@Test
public void testCreateTagWithAllCustomConfig() throws IOException, InterruptedException {
Table table =
testTables.createTableWithVersions(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);

String tagName = "test_tag_1";
Long snapshotId = table.history().get(0).snapshotId();
long maxRefAge = 5L;
shell.executeStatement(String.format("ALTER TABLE customers CREATE TAG %s FOR SYSTEM_VERSION AS OF %d RETAIN" +
" %d DAYS",
tagName, snapshotId, maxRefAge));
table.refresh();
SnapshotRef ref = table.refs().get(tagName);
Assert.assertEquals(snapshotId.longValue(), ref.snapshotId());
Assert.assertEquals(TimeUnit.DAYS.toMillis(maxRefAge), ref.maxRefAgeMs().longValue());
}

@Test
public void testCreateTagWithNonIcebergTable() {
shell.executeStatement("create table nonice_tbl (id int, name string)");

String tagName = "test_tag_1";
try {
shell.executeStatement(String.format("ALTER TABLE nonice_tbl CREATE TAG %s", tagName));
} catch (Throwable e) {
while (e.getCause() != null) {
e = e.getCause();
}
Assert.assertTrue(e.getMessage().contains("Not an iceberg table"));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
create table ice_tbl (id int, name string) Stored by Iceberg;

alter table ice_tbl create tag test_branch_1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- SORT_QUERY_RESULTS
set hive.explain.user=false;

create table iceTbl (id int, name string) Stored by Iceberg;

-- creating tag requires table to have current snapshot. here insert some values to generate current snapshot
insert into iceTbl values(1, 'jack');

-- create tag with default values based on the current snapshotId
explain alter table iceTbl create tag test_tag_1;
alter table iceTbl create tag test_tag_1;
select name, max_reference_age_in_ms from default.iceTbl.refs where type='TAG';

-- create a tag which could be retained 5 days based on the current snapshotId
insert into iceTbl values(2, 'bob');
explain alter table iceTbl create tag test_tag_2 retain 5 days;
alter table iceTbl create tag test_tag_2 retain 5 days;
select name, max_reference_age_in_ms from default.iceTbl.refs where type='TAG';

Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ POSTHOOK: Output: default@ice_tbl
PREHOOK: query: alter table ice_tbl create branch test_branch_1
PREHOOK: type: ALTERTABLE_CREATEBRANCH
PREHOOK: Input: default@ice_tbl
FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.ddl.DDLTask. java.lang.UnsupportedOperationException: Cannot alter branch on iceberg table default.ice_tbl which has no snapshot
FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.ddl.DDLTask. java.lang.UnsupportedOperationException: Cannot alter create branch on iceberg table default.ice_tbl which has no snapshot
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
PREHOOK: query: create table ice_tbl (id int, name string) Stored by Iceberg
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice_tbl
POSTHOOK: query: create table ice_tbl (id int, name string) Stored by Iceberg
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice_tbl
PREHOOK: query: alter table ice_tbl create tag test_branch_1
PREHOOK: type: ALTERTABLE_CREATETAG
PREHOOK: Input: default@ice_tbl
FAILED: Execution Error, return code 40000 from org.apache.hadoop.hive.ql.ddl.DDLTask. java.lang.UnsupportedOperationException: Cannot alter create tag on iceberg table default.ice_tbl which has no snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ STAGE DEPENDENCIES:

STAGE PLANS:
Stage: Stage-0
CreateBranch operation
CreateSnapshotRef Operation
table name: default.iceTbl
spec: AlterTableBranchSpec{operationType=CREATE_BRANCH, operationParams=CreateBranchSpec{branchName=test_branch_1, snapshotId=null, asOfTime=null, maxRefAgeMs=null, minSnapshotsToKeep=null, maxSnapshotAgeMs=null}}
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{branchName=test_branch_1}}

PREHOOK: query: alter table iceTbl create branch test_branch_1
PREHOOK: type: ALTERTABLE_CREATEBRANCH
Expand Down Expand Up @@ -63,9 +63,9 @@ STAGE DEPENDENCIES:

STAGE PLANS:
Stage: Stage-0
CreateBranch operation
CreateSnapshotRef Operation
table name: default.iceTbl
spec: AlterTableBranchSpec{operationType=CREATE_BRANCH, operationParams=CreateBranchSpec{branchName=test_branch_2, snapshotId=null, asOfTime=null, maxRefAgeMs=432000000, minSnapshotsToKeep=null, maxSnapshotAgeMs=null}}
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{branchName=test_branch_2, maxRefAgeMs=432000000}}

PREHOOK: query: alter table iceTbl create branch test_branch_2 retain 5 days
PREHOOK: type: ALTERTABLE_CREATEBRANCH
Expand Down Expand Up @@ -102,9 +102,9 @@ STAGE DEPENDENCIES:

STAGE PLANS:
Stage: Stage-0
CreateBranch operation
CreateSnapshotRef Operation
table name: default.iceTbl
spec: AlterTableBranchSpec{operationType=CREATE_BRANCH, operationParams=CreateBranchSpec{branchName=test_branch_3, snapshotId=null, asOfTime=null, maxRefAgeMs=null, minSnapshotsToKeep=5, maxSnapshotAgeMs=null}}
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{branchName=test_branch_3, minSnapshotsToKeep=5}}

PREHOOK: query: alter table iceTbl create branch test_branch_3 with snapshot retention 5 snapshots
PREHOOK: type: ALTERTABLE_CREATEBRANCH
Expand Down Expand Up @@ -142,9 +142,9 @@ STAGE DEPENDENCIES:

STAGE PLANS:
Stage: Stage-0
CreateBranch operation
CreateSnapshotRef Operation
table name: default.iceTbl
spec: AlterTableBranchSpec{operationType=CREATE_BRANCH, operationParams=CreateBranchSpec{branchName=test_branch_4, snapshotId=null, asOfTime=null, maxRefAgeMs=null, minSnapshotsToKeep=5, maxSnapshotAgeMs=432000000}}
spec: AlterTableSnapshotRefSpec{operationType=CREATE_BRANCH, operationParams=CreateSnapshotRefSpec{branchName=test_branch_4, minSnapshotsToKeep=5, maxSnapshotAgeMs=432000000}}

PREHOOK: query: alter table iceTbl create branch test_branch_4 with snapshot retention 5 snapshots 5 days
PREHOOK: type: ALTERTABLE_CREATEBRANCH
Expand Down
Loading

0 comments on commit 133a29b

Please sign in to comment.