[HUDI-5045] Adding support to configure index type with integ tests (apache#6982)

Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
2 people authored and Alexey Kudinkin committed Dec 14, 2022
1 parent daf8308 commit d62a4a2
Showing 4 changed files with 61 additions and 2 deletions.
54 changes: 54 additions & 0 deletions docker/demo/config/test-suite/spark-immutable-to-mutable.yaml
@@ -0,0 +1,54 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: spark-immutable-dataset.yaml
dag_rounds: 1
dag_intermittent_delay_mins: 0
dag_content:
first_bulk_insert:
config:
record_size: 200
num_partitions_insert: 10
repeat_count: 1
num_records_insert: 100
type: SparkBulkInsertNode
deps: none
first_validate:
config:
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
deps: first_bulk_insert
first_update:
config:
record_size: 200
num_partitions_upsert: 10
repeat_count: 1
num_records_upsert: 50
type: SparkUpsertNode
deps: first_validate
second_validate:
config:
validate_hive: false
delete_input_data: false
validate_full_data: true
type: ValidateDatasetNode
deps: first_update
last_validate:
config:
execute_itr_count: 1
delete_input_data: true
type: ValidateAsyncOperations
deps: second_validate
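
The DAG above chains five nodes through their deps edges: bulk insert -> validate -> upsert -> validate (full data) -> a final check of async table operations. As a hedged, self-contained illustration of the file's shape, the sketch below walks that node graph with SnakeYAML; the snakeyaml dependency, the DagYamlPeek class name, and the repo-relative path are illustrative assumptions, not how the test suite itself loads DAGs.

    import java.io.FileInputStream;
    import java.util.Map;
    import org.yaml.snakeyaml.Yaml;

    public class DagYamlPeek {
      public static void main(String[] args) throws Exception {
        // Load the workload DAG; the path assumes the repo root as working directory.
        Map<String, Object> dag = new Yaml().load(
            new FileInputStream("docker/demo/config/test-suite/spark-immutable-to-mutable.yaml"));
        System.out.println("dag_name = " + dag.get("dag_name") + ", rounds = " + dag.get("dag_rounds"));
        @SuppressWarnings("unchecked")
        Map<String, Map<String, Object>> nodes = (Map<String, Map<String, Object>>) dag.get("dag_content");
        // Each entry is one node; `type` names the node class, `deps` its upstream edge.
        nodes.forEach((name, node) ->
            System.out.println(name + " [" + node.get("type") + "] <- deps: " + node.get("deps")));
      }
    }
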
@@ -340,5 +340,8 @@ public static class HoodieTestSuiteConfig extends HoodieDeltaStreamer.Config {

@Parameter(names = {"--trino-jdbc-password"}, description = "Password corresponding to the username to use for authentication")
public String trinoPassword;

@Parameter(names = {"--index-type"}, description = "Index type to use for writes")
public String indexType = "SIMPLE";
}
}
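
The new @Parameter above rides on the JCommander parsing that HoodieDeltaStreamer.Config (which HoodieTestSuiteConfig extends) already uses; the diff only introduces the --index-type flag itself. A minimal sketch of how such a flag lands on a config object, assuming jcommander on the classpath and a hypothetical IndexTypeFlagSketch wrapper:

    import com.beust.jcommander.JCommander;
    import com.beust.jcommander.Parameter;

    public class IndexTypeFlagSketch {
      public static class Config {
        @Parameter(names = {"--index-type"}, description = "Index type to use for writes")
        public String indexType = "SIMPLE"; // default matches the diff
      }

      public static void main(String[] args) {
        Config cfg = new Config();
        // e.g. arguments forwarded from the test suite's spark-submit invocation
        JCommander.newBuilder().addObject(cfg).build().parse("--index-type", "GLOBAL_BLOOM");
        System.out.println(cfg.indexType); // prints GLOBAL_BLOOM
      }
    }
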
@@ -91,6 +91,7 @@ public HoodieWriteConfig getWriteConfig() {
}

private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, Properties props, String schema) {

HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath)
.withAutoCommit(false)
@@ -99,7 +100,7 @@ private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteJob.HoodieTestSuiteConfig cfg, Properties props, String schema) {
.withPayloadClass(cfg.payloadClassName)
.build())
.forTable(cfg.targetTableName)
- .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+ .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.valueOf(cfg.indexType)).build())
.withProps(props);
builder = builder.withSchema(schema);
return builder.build();
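
The one-line change above swaps the hard-coded BLOOM index for whatever HoodieIndex.IndexType.valueOf(cfg.indexType) resolves from the new flag. A self-contained sketch of that resolution step follows; the enum here is an illustrative subset, not the real org.apache.hudi.index.HoodieIndex.IndexType:

    public class IndexTypeResolution {
      // Illustrative subset; the real enum lives in org.apache.hudi.index.HoodieIndex.
      enum IndexType { SIMPLE, GLOBAL_SIMPLE, BLOOM, GLOBAL_BLOOM, INMEMORY }

      public static void main(String[] args) {
        // valueOf is case-sensitive and throws IllegalArgumentException on unknown
        // names, so a typo in --index-type fails fast at config-build time rather
        // than silently falling back to a default.
        IndexType resolved = IndexType.valueOf("GLOBAL_BLOOM");
        System.out.println("Writes will use index type: " + resolved);
      }
    }
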
@@ -21,7 +21,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes
import org.apache.avro.Schema
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.common.util.collection.Pair
- import org.apache.hudi.config.HoodieWriteConfig
+ import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats
@@ -70,6 +70,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "test_suite_source_ordering_field")
.option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
+ .option(HoodieIndexConfig.INDEX_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.indexType)
.option(DataSourceWriteOptions.OPERATION.key, getOperation())
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.mode(SaveMode.Append)
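
The Scala node above forwards cfg.indexType into the datasource write through HoodieIndexConfig.INDEX_TYPE.key, i.e. the hoodie.index.type property. Below is a hedged Java equivalent of that write path; the table name, field names, and base path are placeholders rather than values taken from the test suite:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class IndexTypeWriteSketch {
      public static void write(Dataset<Row> df, String indexType) {
        df.write()
          .format("hudi")
          .option("hoodie.table.name", "test_table")                      // placeholder table name
          .option("hoodie.datasource.write.recordkey.field", "_row_key")  // placeholder key field
          .option("hoodie.datasource.write.precombine.field", "ts")       // placeholder ordering field
          .option("hoodie.index.type", indexType)                         // carried over from --index-type
          .mode(SaveMode.Append)
          .save("file:///tmp/hudi/test_table");                           // placeholder base path
      }
    }
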
