Skip to content

Commit

Permalink
[MINOR] Improve sparksql test for bucket index bulk insert (apache#9014)
Browse files — browse the repository at this point in the history
  • Loading branch information
stream2000 committed Jun 20, 2023
1 parent 587a53f commit 822b5a1
Showing 1 changed file with 52 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1078,57 +1078,60 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
test("Test Bulk Insert Into Bucket Index Table") {
  // Force the bulk_insert write operation for every statement in this test.
  withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
    // Cover both table types (merge-on-read and copy-on-write) ...
    Seq("mor", "cow").foreach { tableType =>
      // ... and both bulk-insert code paths: the row-writer (Dataset/Row based)
      // path and the non-row-writer (RDD based) path.
      Seq("true", "false").foreach { bulkInsertAsRow =>
        withTempDir { tmp =>
          val tableName = generateTableName
          // Create a partitioned table with a BUCKET index; the hash field
          // deliberately spans two key columns ('id,name') and the data below
          // includes names containing commas to exercise key-field parsing.
          spark.sql(
            s"""
               |create table $tableName (
               |  id int,
               |  dt string,
               |  name string,
               |  price double,
               |  ts long
               |) using hudi
               | tblproperties (
               | primaryKey = 'id,name',
               | type = '$tableType',
               | preCombineField = 'ts',
               | hoodie.index.type = 'BUCKET',
               | hoodie.bucket.index.hash.field = 'id,name',
               | hoodie.datasource.write.row.writer.enable = '$bulkInsertAsRow')
               | partitioned by (dt)
               | location '${tmp.getCanonicalPath}'
               """.stripMargin)

          // Note: Do not write the field alias, the partition field must be placed last.
          spark.sql(
            s"""
               | insert into $tableName values
               | (1, 'a1,1', 10, 1000, "2021-01-05"),
               | (2, 'a2', 20, 2000, "2021-01-06"),
               | (3, 'a3,3', 30, 3000, "2021-01-07")
               """.stripMargin)

          // First batch lands intact.
          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
            Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
            Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
          )

          // Second bulk insert reuses ids 1 and 3 with different names;
          // since the primary key is (id, name), these are NEW records,
          // not updates — bulk_insert must append, not deduplicate.
          spark.sql(
            s"""
               | insert into $tableName values
               | (1, 'a1', 10, 1000, "2021-01-05"),
               | (3, "a3", 30, 3000, "2021-01-07")
               """.stripMargin)

          // All five rows must be visible: the original three plus the two
          // appended in the second batch.
          checkAnswer(s"select id, name, price, ts, dt from $tableName")(
            Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
            Seq(1, "a1", 10.0, 1000, "2021-01-05"),
            Seq(2, "a2", 20.0, 2000, "2021-01-06"),
            Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
            Seq(3, "a3", 30.0, 3000, "2021-01-07")
          )
        }
      }
    }
  }
}
Expand Down

0 comments on commit 822b5a1

Please sign in to comment.