HIVE-27522: Iceberg: Bucket partition transformation date type support (Denys Kuzmenko, reviewed by Attila Turoczy, Ayush Saxena, Butao Zhang, Sourabh Badhya)

Closes apache#4507
deniskuzZ authored and tarak271 committed Dec 19, 2023
1 parent 3bf6395 commit 24f2417
Showing 3 changed files with 78 additions and 12 deletions.
@@ -18,13 +18,14 @@

package org.apache.iceberg.mr.hive;

-import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.util.function.Function;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
@@ -39,7 +40,6 @@
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -80,13 +80,13 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
argumentOI = (PrimitiveObjectInspector) arguments[0];

PrimitiveObjectInspector.PrimitiveCategory inputType = argumentOI.getPrimitiveCategory();
-ObjectInspector outputOI = null;
+ObjectInspector outputOI;
switch (inputType) {
case CHAR:
case VARCHAR:
case STRING:
converter = new PrimitiveObjectInspectorConverter.StringConverter(argumentOI);
-Transform<String, Integer> stringTransform = Transforms.bucket(Types.StringType.get(), numBuckets);
+Function<Object, Integer> stringTransform = Transforms.bucket(numBuckets).bind(Types.StringType.get());
evaluator = arg -> {
String val = (String) converter.convert(arg.get());
result.set(stringTransform.apply(val));
@@ -96,7 +96,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
case BINARY:
converter = new PrimitiveObjectInspectorConverter.BinaryConverter(argumentOI,
PrimitiveObjectInspectorFactory.writableBinaryObjectInspector);
-Transform<ByteBuffer, Integer> byteBufferTransform = Transforms.bucket(Types.BinaryType.get(), numBuckets);
+Function<Object, Integer> byteBufferTransform = Transforms.bucket(numBuckets).bind(Types.BinaryType.get());
evaluator = arg -> {
BytesWritable val = (BytesWritable) converter.convert(arg.get());
ByteBuffer byteBuffer = ByteBuffer.wrap(val.getBytes(), 0, val.getLength());
@@ -107,7 +107,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
case INT:
converter = new PrimitiveObjectInspectorConverter.IntConverter(argumentOI,
PrimitiveObjectInspectorFactory.writableIntObjectInspector);
-Transform<Integer, Integer> intTransform = Transforms.bucket(Types.IntegerType.get(), numBuckets);
+Function<Object, Integer> intTransform = Transforms.bucket(numBuckets).bind(Types.IntegerType.get());
evaluator = arg -> {
IntWritable val = (IntWritable) converter.convert(arg.get());
result.set(intTransform.apply(val.get()));
@@ -117,7 +117,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
case LONG:
converter = new PrimitiveObjectInspectorConverter.LongConverter(argumentOI,
PrimitiveObjectInspectorFactory.writableLongObjectInspector);
-Transform<Long, Integer> longTransform = Transforms.bucket(Types.LongType.get(), numBuckets);
+Function<Object, Integer> longTransform = Transforms.bucket(numBuckets).bind(Types.LongType.get());
evaluator = arg -> {
LongWritable val = (LongWritable) converter.convert(arg.get());
result.set(longTransform.apply(val.get()));
@@ -131,7 +131,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen

converter = new PrimitiveObjectInspectorConverter.HiveDecimalConverter(argumentOI,
PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector);
-Transform<BigDecimal, Integer> bigDecimalTransform = Transforms.bucket(decimalIcebergType, numBuckets);
+Function<Object, Integer> bigDecimalTransform = Transforms.bucket(numBuckets).bind(decimalIcebergType);
evaluator = arg -> {
HiveDecimalWritable val = (HiveDecimalWritable) converter.convert(arg.get());
result.set(bigDecimalTransform.apply(val.getHiveDecimal().bigDecimalValue()));
@@ -141,7 +141,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
case FLOAT:
converter = new PrimitiveObjectInspectorConverter.FloatConverter(argumentOI,
PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
-Transform<Float, Integer> floatTransform = Transforms.bucket(Types.FloatType.get(), numBuckets);
+Function<Object, Integer> floatTransform = Transforms.bucket(numBuckets).bind(Types.FloatType.get());
evaluator = arg -> {
FloatWritable val = (FloatWritable) converter.convert(arg.get());
result.set(floatTransform.apply(val.get()));
@@ -151,16 +151,26 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
case DOUBLE:
converter = new PrimitiveObjectInspectorConverter.DoubleConverter(argumentOI,
PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
-Transform<Double, Integer> doubleTransform = Transforms.bucket(Types.DoubleType.get(), numBuckets);
+Function<Object, Integer> doubleTransform = Transforms.bucket(numBuckets).bind(Types.DoubleType.get());
evaluator = arg -> {
DoubleWritable val = (DoubleWritable) converter.convert(arg.get());
result.set(doubleTransform.apply(val.get()));
};
break;

+case DATE:
+converter = new PrimitiveObjectInspectorConverter.DateConverter(argumentOI,
+PrimitiveObjectInspectorFactory.writableDateObjectInspector);
+Function<Object, Integer> dateTransform = Transforms.bucket(numBuckets).bind(Types.DateType.get());
+evaluator = arg -> {
+DateWritableV2 val = (DateWritableV2) converter.convert(arg.get());
+result.set(dateTransform.apply(val.getDays()));
+};
+break;

default:
throw new UDFArgumentException(
" ICEBERG_BUCKET() only takes STRING/CHAR/VARCHAR/BINARY/INT/LONG/DECIMAL/FLOAT/DOUBLE" +
" ICEBERG_BUCKET() only takes STRING/CHAR/VARCHAR/BINARY/INT/LONG/DECIMAL/FLOAT/DOUBLE/DATE" +
" types as first argument, got " + inputType);
}

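The edits above swap the older two-argument Transforms.bucket(type, numBuckets) calls for the unbound Transforms.bucket(numBuckets) transform bound to a concrete type via bind(), and the new DATE branch feeds the bound function Iceberg's internal date representation, the epoch-day count returned by DateWritableV2.getDays(). Below is a minimal standalone sketch of that call shape, assuming an Iceberg runtime on the classpath; the class name, bucket count, and sample date are illustrative and not part of the commit:

    import java.time.LocalDate;
    import java.util.function.Function;
    import org.apache.iceberg.transforms.Transforms;
    import org.apache.iceberg.types.Types;

    public class IcebergBucketDateSketch {
      public static void main(String[] args) {
        int numBuckets = 16; // illustrative; the UDF receives the bucket count as an argument

        // Bind the unbound bucket transform to the DATE type, as the new DATE branch above does.
        Function<Object, Integer> dateBucket =
            Transforms.bucket(numBuckets).bind(Types.DateType.get());

        // Iceberg stores dates as days since 1970-01-01; DateWritableV2.getDays() yields the same value.
        int days = (int) LocalDate.parse("2018-07-02").toEpochDay();

        System.out.println(dateBucket.apply(days)); // a bucket id in [0, numBuckets)
      }
    }

The same bind() pattern is what the STRING, BINARY, INT, LONG, DECIMAL, FLOAT, and DOUBLE branches now use, so every type shares one evaluator shape: convert the Hive writable, then apply the bound function.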
@@ -7,7 +7,7 @@ drop table if exists tbl_src;
drop table if exists tbl_target_identity;
drop table if exists tbl_target_bucket;
drop table if exists tbl_target_mixed;

+drop table if exists tbl_bucket_date;

create external table tbl_src (a int, b string, c bigint) stored by iceberg stored as orc;
insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, 'EUR', 12), (5, 'HUF', 30), (6, 'USD', 10), (7, 'USD', 100), (8, 'PLN', 20), (9, 'PLN', 11), (10, 'CZK', 5), (12, NULL, NULL);
@@ -43,3 +43,14 @@ insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c =

select * from tbl_target_mixed order by a, ccy;
select * from default.tbl_target_mixed.files;
+
+--bucket partition transforms with DATE column type
+create external table tbl_bucket_date (id string, date_time_date date, year_partition int)
+partitioned by spec (year_partition, bucket(1, date_time_date))
+stored by iceberg stored as parquet
+tblproperties ('parquet.compression'='snappy','format-version'='2');
+
+insert into tbl_bucket_date values (88669, '2018-05-27', 2018), (40568, '2018-02-12', 2018), (40568, '2018-07-03', 2018);
+update tbl_bucket_date set date_time_date = '2018-07-02' where date_time_date = '2018-07-03';
+
+select count(*) from tbl_bucket_date where date_time_date = '2018-07-02';
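The new test creates tbl_bucket_date with a single-bucket transform on the date column, so every row hashes to bucket 0; a metadata query in the style of the earlier tbl_target_mixed.files check could make that layout visible. This is a sketch only, not part of the committed .q file, and the date_time_date_bucket partition field name is assumed from Iceberg's default naming for bucket transforms:

    -- not part of this commit: with bucket(1, date_time_date) all rows share one bucket,
    -- so each data file should report a partition like {"year_partition":2018,"date_time_date_bucket":0}
    select * from default.tbl_bucket_date.files;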
@@ -14,6 +14,10 @@ PREHOOK: query: drop table if exists tbl_target_mixed
PREHOOK: type: DROPTABLE
POSTHOOK: query: drop table if exists tbl_target_mixed
POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists tbl_bucket_date
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_bucket_date
+POSTHOOK: type: DROPTABLE
PREHOOK: query: create external table tbl_src (a int, b string, c bigint) stored by iceberg stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
@@ -551,3 +555,44 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"USD","c_bucket":2} 1 432 {1:6,2:12,3:6} {1:1,2:1,3:1} {1:0,2:0,3:0} {} {1:,2:USD,3:
} {1:,2:USD,3:
} NULL [3] NULL 0 {"a":{"column_size":6,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":6,"upper_bound":6},"c":{"column_size":6,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":10,"upper_bound":10},"ccy":{"column_size":12,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":"USD","upper_bound":"USD"}}
+PREHOOK: query: create external table tbl_bucket_date (id string, date_time_date date, year_partition int)
+partitioned by spec (year_partition, bucket(1, date_time_date))
+stored by iceberg stored as parquet
+tblproperties ('parquet.compression'='snappy','format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_bucket_date
+POSTHOOK: query: create external table tbl_bucket_date (id string, date_time_date date, year_partition int)
+partitioned by spec (year_partition, bucket(1, date_time_date))
+stored by iceberg stored as parquet
+tblproperties ('parquet.compression'='snappy','format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_bucket_date
+PREHOOK: query: insert into tbl_bucket_date values (88669, '2018-05-27', 2018), (40568, '2018-02-12', 2018), (40568, '2018-07-03', 2018)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_bucket_date
+POSTHOOK: query: insert into tbl_bucket_date values (88669, '2018-05-27', 2018), (40568, '2018-02-12', 2018), (40568, '2018-07-03', 2018)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_bucket_date
+PREHOOK: query: update tbl_bucket_date set date_time_date = '2018-07-02' where date_time_date = '2018-07-03'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_bucket_date
+PREHOOK: Output: default@tbl_bucket_date
+PREHOOK: Output: default@tbl_bucket_date
+POSTHOOK: query: update tbl_bucket_date set date_time_date = '2018-07-02' where date_time_date = '2018-07-03'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_bucket_date
+POSTHOOK: Output: default@tbl_bucket_date
+POSTHOOK: Output: default@tbl_bucket_date
+PREHOOK: query: select count(*) from tbl_bucket_date where date_time_date = '2018-07-02'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_bucket_date
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) from tbl_bucket_date where date_time_date = '2018-07-02'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_bucket_date
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
