diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/GenericUDFIcebergBucket.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/GenericUDFIcebergBucket.java
index 52b0a1edbf79..5f27319d12f8 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/GenericUDFIcebergBucket.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/GenericUDFIcebergBucket.java
@@ -18,13 +18,14 @@
 package org.apache.iceberg.mr.hive;
 
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.util.function.Function;
 
 import org.apache.hadoop.hive.ql.exec.Description;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.DateWritableV2;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
@@ -39,7 +40,6 @@
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.iceberg.transforms.Transform;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
@@ -80,13 +80,13 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
     argumentOI = (PrimitiveObjectInspector) arguments[0];
 
     PrimitiveObjectInspector.PrimitiveCategory inputType = argumentOI.getPrimitiveCategory();
-    ObjectInspector outputOI = null;
+    ObjectInspector outputOI;
     switch (inputType) {
       case CHAR:
       case VARCHAR:
       case STRING:
         converter = new PrimitiveObjectInspectorConverter.StringConverter(argumentOI);
-        Transform stringTransform = Transforms.bucket(Types.StringType.get(), numBuckets);
+        Function stringTransform = Transforms.bucket(numBuckets).bind(Types.StringType.get());
         evaluator = arg -> {
           String val = (String) converter.convert(arg.get());
           result.set(stringTransform.apply(val));
@@ -96,7 +96,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
       case BINARY:
         converter = new PrimitiveObjectInspectorConverter.BinaryConverter(argumentOI,
             PrimitiveObjectInspectorFactory.writableBinaryObjectInspector);
-        Transform byteBufferTransform = Transforms.bucket(Types.BinaryType.get(), numBuckets);
+        Function byteBufferTransform = Transforms.bucket(numBuckets).bind(Types.BinaryType.get());
         evaluator = arg -> {
           BytesWritable val = (BytesWritable) converter.convert(arg.get());
           ByteBuffer byteBuffer = ByteBuffer.wrap(val.getBytes(), 0, val.getLength());
@@ -107,7 +107,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
       case INT:
         converter = new PrimitiveObjectInspectorConverter.IntConverter(argumentOI,
             PrimitiveObjectInspectorFactory.writableIntObjectInspector);
-        Transform intTransform = Transforms.bucket(Types.IntegerType.get(), numBuckets);
+        Function intTransform = Transforms.bucket(numBuckets).bind(Types.IntegerType.get());
         evaluator = arg -> {
           IntWritable val = (IntWritable) converter.convert(arg.get());
           result.set(intTransform.apply(val.get()));
@@ -117,7 +117,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
       case LONG:
         converter = new PrimitiveObjectInspectorConverter.LongConverter(argumentOI,
             PrimitiveObjectInspectorFactory.writableLongObjectInspector);
-        Transform longTransform = Transforms.bucket(Types.LongType.get(), numBuckets);
+        Function longTransform = Transforms.bucket(numBuckets).bind(Types.LongType.get());
         evaluator = arg -> {
           LongWritable val = (LongWritable) converter.convert(arg.get());
           result.set(longTransform.apply(val.get()));
@@ -131,7 +131,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
 
         converter = new PrimitiveObjectInspectorConverter.HiveDecimalConverter(argumentOI,
             PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector);
-        Transform bigDecimalTransform = Transforms.bucket(decimalIcebergType, numBuckets);
+        Function bigDecimalTransform = Transforms.bucket(numBuckets).bind(decimalIcebergType);
        evaluator = arg -> {
           HiveDecimalWritable val = (HiveDecimalWritable) converter.convert(arg.get());
           result.set(bigDecimalTransform.apply(val.getHiveDecimal().bigDecimalValue()));
@@ -141,7 +141,7 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
       case FLOAT:
         converter = new PrimitiveObjectInspectorConverter.FloatConverter(argumentOI,
             PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
-        Transform floatTransform = Transforms.bucket(Types.FloatType.get(), numBuckets);
+        Function floatTransform = Transforms.bucket(numBuckets).bind(Types.FloatType.get());
         evaluator = arg -> {
           FloatWritable val = (FloatWritable) converter.convert(arg.get());
           result.set(floatTransform.apply(val.get()));
@@ -151,16 +151,26 @@ public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumen
       case DOUBLE:
         converter = new PrimitiveObjectInspectorConverter.DoubleConverter(argumentOI,
             PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
-        Transform doubleTransform = Transforms.bucket(Types.DoubleType.get(), numBuckets);
+        Function doubleTransform = Transforms.bucket(numBuckets).bind(Types.DoubleType.get());
         evaluator = arg -> {
           DoubleWritable val = (DoubleWritable) converter.convert(arg.get());
           result.set(doubleTransform.apply(val.get()));
         };
         break;
 
+      case DATE:
+        converter = new PrimitiveObjectInspectorConverter.DateConverter(argumentOI,
+            PrimitiveObjectInspectorFactory.writableDateObjectInspector);
+        Function dateTransform = Transforms.bucket(numBuckets).bind(Types.DateType.get());
+        evaluator = arg -> {
+          DateWritableV2 val = (DateWritableV2) converter.convert(arg.get());
+          result.set(dateTransform.apply(val.getDays()));
+        };
+        break;
+
       default:
         throw new UDFArgumentException(
-            " ICEBERG_BUCKET() only takes STRING/CHAR/VARCHAR/BINARY/INT/LONG/DECIMAL/FLOAT/DOUBLE" +
+            " ICEBERG_BUCKET() only takes STRING/CHAR/VARCHAR/BINARY/INT/LONG/DECIMAL/FLOAT/DOUBLE/DATE" +
                 " types as first argument, got " + inputType);
     }
 
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q
index 0cbb7d23d538..a70f19316156 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q
@@ -7,7 +7,7 @@ drop table if exists tbl_src;
 drop table if exists tbl_target_identity;
 drop table if exists tbl_target_bucket;
 drop table if exists tbl_target_mixed;
-
+drop table if exists tbl_bucket_date;
 create external table tbl_src (a int, b string, c bigint) stored by iceberg stored as orc;
 insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, 'EUR', 12), (5, 'HUF', 30),
  (6, 'USD', 10), (7, 'USD', 100), (8, 'PLN', 20), (9, 'PLN', 11), (10, 'CZK', 5), (12, NULL, NULL);
@@ -43,3 +43,14 @@ insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c =
 
 select * from tbl_target_mixed order by a, ccy;
 select * from default.tbl_target_mixed.files;
+
+--bucket partition transforms with DATE column type
+create external table tbl_bucket_date (id string, date_time_date date, year_partition int)
+ partitioned by spec (year_partition, bucket(1, date_time_date))
+stored by iceberg stored as parquet
+tblproperties ('parquet.compression'='snappy','format-version'='2');
+
+insert into tbl_bucket_date values (88669, '2018-05-27', 2018), (40568, '2018-02-12', 2018), (40568, '2018-07-03', 2018);
+update tbl_bucket_date set date_time_date = '2018-07-02' where date_time_date = '2018-07-03';
+
+select count(*) from tbl_bucket_date where date_time_date = '2018-07-02';
diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
index 037d1b439f73..1a93668ac126 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
@@ -14,6 +14,10 @@ PREHOOK: query: drop table if exists tbl_target_mixed
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table if exists tbl_target_mixed
 POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists tbl_bucket_date
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_bucket_date
+POSTHOOK: type: DROPTABLE
 PREHOOK: query: create external table tbl_src (a int, b string, c bigint) stored by iceberg stored as orc
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
@@ -551,3 +555,44 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 0 hdfs://### HDFS PATH ### ORC 0 {"ccy":"USD","c_bucket":2} 1 432 {1:6,2:12,3:6} {1:1,2:1,3:1} {1:0,2:0,3:0} {} {1:,2:USD,3: } {1:,2:USD,3: } NULL [3] NULL 0 {"a":{"column_size":6,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":6,"upper_bound":6},"c":{"column_size":6,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":10,"upper_bound":10},"ccy":{"column_size":12,"value_count":1,"null_value_count":0,"nan_value_count":null,"lower_bound":"USD","upper_bound":"USD"}}
+PREHOOK: query: create external table tbl_bucket_date (id string, date_time_date date, year_partition int)
+ partitioned by spec (year_partition, bucket(1, date_time_date))
+stored by iceberg stored as parquet
+tblproperties ('parquet.compression'='snappy','format-version'='2')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_bucket_date
+POSTHOOK: query: create external table tbl_bucket_date (id string, date_time_date date, year_partition int)
+ partitioned by spec (year_partition, bucket(1, date_time_date))
+stored by iceberg stored as parquet
+tblproperties ('parquet.compression'='snappy','format-version'='2')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_bucket_date
+PREHOOK: query: insert into tbl_bucket_date values (88669, '2018-05-27', 2018), (40568, '2018-02-12', 2018), (40568, '2018-07-03', 2018)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@tbl_bucket_date
+POSTHOOK: query: insert into tbl_bucket_date values (88669, '2018-05-27', 2018), (40568, '2018-02-12', 2018), (40568, '2018-07-03', 2018)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@tbl_bucket_date
+PREHOOK: query: update tbl_bucket_date set date_time_date = '2018-07-02' where date_time_date = '2018-07-03'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_bucket_date
+PREHOOK: Output: default@tbl_bucket_date
+PREHOOK: Output: default@tbl_bucket_date
+POSTHOOK: query: update tbl_bucket_date set date_time_date = '2018-07-02' where date_time_date = '2018-07-03'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_bucket_date
+POSTHOOK: Output: default@tbl_bucket_date
+POSTHOOK: Output: default@tbl_bucket_date
+PREHOOK: query: select count(*) from tbl_bucket_date where date_time_date = '2018-07-02'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_bucket_date
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select count(*) from tbl_bucket_date where date_time_date = '2018-07-02'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_bucket_date
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1
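Note on the API change in the UDF above: the patch switches from the older two-argument Transforms.bucket(type, numBuckets) overload, whose Transform.apply() was called directly, to the one-argument Transforms.bucket(numBuckets) factory that returns an unbound transform which is then bound to a concrete Iceberg type via bind(). Because Iceberg represents DATE values as the number of days since 1970-01-01, the new DATE branch feeds DateWritableV2.getDays() to the bound function. A minimal, illustrative sketch of the same pattern follows; the class name, bucket count, and sample value are hypothetical and not part of the patch.

    import java.util.function.Function;
    import org.apache.iceberg.transforms.Transforms;
    import org.apache.iceberg.types.Types;

    public class BucketBindSketch {          // hypothetical class name, illustration only
      public static void main(String[] args) {
        // Unbound 16-bucket transform, bound to the Iceberg DATE type
        // (raw Function, mirroring how the UDF stores the bound transform).
        Function dateBucket = Transforms.bucket(16).bind(Types.DateType.get());

        int daysSinceEpoch = 17684;          // 2018-06-02 expressed as days since epoch
        // Prints a bucket id in [0, 16), the same value ICEBERG_BUCKET() would produce.
        System.out.println(dateBucket.apply(daysSinceEpoch));
      }
    }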