From 822b5a1284ed580d2a4000da39a001ae64ba3a6a Mon Sep 17 00:00:00 2001
From: StreamingFlames <18889897088@163.com>
Date: Tue, 20 Jun 2023 16:29:43 +0800
Subject: [PATCH] [MINOR] Improve sparksql test for bucket index bulk insert
 (#9014)

---
 .../spark/sql/hudi/TestInsertTable.scala      | 101 +++++++++---------
 1 file changed, 52 insertions(+), 49 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index f0a30a9406ca..a9fd0a4a0304 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1078,57 +1078,60 @@
   test("Test Bulk Insert Into Bucket Index Table") {
     withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
       Seq("mor", "cow").foreach { tableType =>
-        withTempDir { tmp =>
-          val tableName = generateTableName
-          // Create a partitioned table
-          spark.sql(
-            s"""
-               |create table $tableName (
-               |  id int,
-               |  dt string,
-               |  name string,
-               |  price double,
-               |  ts long
-               |) using hudi
-               | tblproperties (
-               | primaryKey = 'id,name',
-               | type = '$tableType',
-               | preCombineField = 'ts',
-               | hoodie.index.type = 'BUCKET',
-               | hoodie.bucket.index.hash.field = 'id,name')
-               | partitioned by (dt)
-               | location '${tmp.getCanonicalPath}'
+        Seq("true", "false").foreach { bulkInsertAsRow =>
+          withTempDir { tmp =>
+            val tableName = generateTableName
+            // Create a partitioned table
+            spark.sql(
+              s"""
+                 |create table $tableName (
+                 |  id int,
+                 |  dt string,
+                 |  name string,
+                 |  price double,
+                 |  ts long
+                 |) using hudi
+                 | tblproperties (
+                 | primaryKey = 'id,name',
+                 | type = '$tableType',
+                 | preCombineField = 'ts',
+                 | hoodie.index.type = 'BUCKET',
+                 | hoodie.bucket.index.hash.field = 'id,name',
+                 | hoodie.datasource.write.row.writer.enable = '$bulkInsertAsRow')
+                 | partitioned by (dt)
+                 | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+            // Note: Do not write the field alias, the partition field must be placed last.
+            spark.sql(
+              s"""
+                 | insert into $tableName values
+                 | (1, 'a1,1', 10, 1000, "2021-01-05"),
+                 | (2, 'a2', 20, 2000, "2021-01-06"),
+                 | (3, 'a3,3', 30, 3000, "2021-01-07")
+        """.stripMargin)
+
+            checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+              Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+              Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+              Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+            )
+
+            spark.sql(
+              s"""
+                 | insert into $tableName values
+                 | (1, 'a1', 10, 1000, "2021-01-05"),
+                 | (3, "a3", 30, 3000, "2021-01-07")
        """.stripMargin)
 
-          // Note: Do not write the field alias, the partition field must be placed last.
-          spark.sql(
-            s"""
-               | insert into $tableName values
-               | (1, 'a1,1', 10, 1000, "2021-01-05"),
-               | (2, 'a2', 20, 2000, "2021-01-06"),
-               | (3, 'a3,3', 30, 3000, "2021-01-07")
-        """.stripMargin)
-
-          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-            Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
-            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-            Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
-          )
-
-          spark.sql(
-            s"""
-               | insert into $tableName values
-               | (1, 'a1', 10, 1000, "2021-01-05"),
-               | (3, "a3", 30, 3000, "2021-01-07")
-        """.stripMargin)
-
-          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
-            Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
-            Seq(1, "a1", 10.0, 1000, "2021-01-05"),
-            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
-            Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
-            Seq(3, "a3", 30.0, 3000, "2021-01-07")
-          )
+            checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+              Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+              Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+              Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+              Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+              Seq(3, "a3", 30.0, 3000, "2021-01-07")
+            )
+          }
         }
       }
     }