From 5d7d4a10a8d0b45eb40aad1cb012f3c3b2da30dd Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Thu, 9 May 2024 16:00:33 -0500 Subject: [PATCH 1/2] rename --- src/connector/src/connector_common/common.rs | 14 +- src/connector/src/with_options_test.rs | 8 +- src/connector/with_options_sink.yaml | 144 +++++++++++++------ src/connector/with_options_source.yaml | 130 ++++++++++++----- 4 files changed, 212 insertions(+), 84 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 7a5e3ba5f8b2..58afb9914005 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -61,16 +61,28 @@ use aws_types::SdkConfig; /// A flatten config map for aws auth. #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct AwsAuthProps { + #[serde(rename = "aws.region", alias = "region")] pub region: Option, - #[serde(alias = "endpoint_url")] + + #[serde( + rename = "aws.endpoint_url", + alias = "endpoint_url", + alias = "endpoint" + )] pub endpoint: Option, + #[serde(rename = "aws.credentials.access_key_id", alias = "access_key")] pub access_key: Option, + #[serde(rename = "aws.credentials.secret_access_key", alias = "secret_key")] pub secret_key: Option, + #[serde(rename = "aws.credentials.session_token", alias = "session_token")] pub session_token: Option, /// IAM role + #[serde(rename = "aws.credentials.role.arn", alias = "arn")] pub arn: Option, /// external ID in IAM role trust policy + #[serde(rename = "aws.credentials.role.external_id", alias = "external_id")] pub external_id: Option, + #[serde(rename = "aws.region.profile", alias = "profile")] pub profile: Option, } diff --git a/src/connector/src/with_options_test.rs b/src/connector/src/with_options_test.rs index 155932ce9045..160964d7920c 100644 --- a/src/connector/src/with_options_test.rs +++ b/src/connector/src/with_options_test.rs @@ -191,15 +191,15 @@ struct FieldInfo { #[serde(skip_serializing_if = "Option::is_none")] default: Option, - #[serde(skip_serializing_if = "Option::is_none")] - alias: Option, + #[serde(skip_serializing_if = "Vec::is_empty")] + alias: Vec, } #[derive(Default)] struct SerdeProperties { default_func: Option, rename: Option, - alias: Option, + alias: Vec, } #[derive(Debug, Serialize, Default)] @@ -270,7 +270,7 @@ fn extract_serde_properties(field: &Field) -> SerdeProperties { } } else if path.is_ident("alias") { if let Lit::Str(lit_str) = lit { - serde_props.alias = Some(lit_str.value()); + serde_props.alias.push(lit_str.value()); } } else if path.is_ident("default") { if let Lit::Str(lit_str) = lit { diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 219d7bc8a733..c6331f5e53c0 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -21,33 +21,49 @@ BigQueryConfig: field_type: usize required: false default: '1024' - - name: region + - name: aws.region field_type: String required: false - - name: endpoint + alias: + - region + - name: aws.endpoint_url field_type: String required: false - alias: endpoint_url - - name: access_key + alias: + - endpoint_url + - endpoint + - name: aws.credentials.access_key_id field_type: String required: false - - name: secret_key + alias: + - access_key + - name: aws.credentials.secret_access_key field_type: String required: false - - name: session_token + alias: + - secret_key + - name: aws.credentials.session_token field_type: String required: false - - name: arn + alias: + - session_token + - name: aws.credentials.role.arn field_type: String comments: IAM role required: false - - name: external_id + alias: + - arn + - name: aws.credentials.role.external_id field_type: String comments: external ID in IAM role trust policy required: false - - name: profile + alias: + - external_id + - name: aws.region.profile field_type: String required: false + alias: + - profile - name: r#type field_type: String required: true @@ -197,11 +213,13 @@ KafkaConfig: - name: properties.bootstrap.server field_type: String required: true - alias: kafka.brokers + alias: + - kafka.brokers - name: topic field_type: String required: true - alias: kafka.topic + alias: + - kafka.topic - name: properties.sync.call.timeout field_type: Duration required: false @@ -352,67 +370,91 @@ KafkaConfig: field_type: HashMap comments: This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users. required: false - - name: region + - name: aws.region field_type: String required: false - - name: endpoint + alias: + - region + - name: aws.endpoint_url field_type: String required: false - alias: endpoint_url - - name: access_key + alias: + - endpoint_url + - endpoint + - name: aws.credentials.access_key_id field_type: String required: false - - name: secret_key + alias: + - access_key + - name: aws.credentials.secret_access_key field_type: String required: false - - name: session_token + alias: + - secret_key + - name: aws.credentials.session_token field_type: String required: false - - name: arn + alias: + - session_token + - name: aws.credentials.role.arn field_type: String comments: IAM role required: false - - name: external_id + alias: + - arn + - name: aws.credentials.role.external_id field_type: String comments: external ID in IAM role trust policy required: false - - name: profile + alias: + - external_id + - name: aws.region.profile field_type: String required: false + alias: + - profile KinesisSinkConfig: fields: - name: stream field_type: String required: true - alias: kinesis.stream.name + alias: + - kinesis.stream.name - name: aws.region field_type: String required: true - alias: kinesis.stream.region + alias: + - kinesis.stream.region - name: endpoint field_type: String required: false - alias: kinesis.endpoint + alias: + - kinesis.endpoint - name: aws.credentials.access_key_id field_type: String required: false - alias: kinesis.credentials.access + alias: + - kinesis.credentials.access - name: aws.credentials.secret_access_key field_type: String required: false - alias: kinesis.credentials.secret + alias: + - kinesis.credentials.secret - name: aws.credentials.session_token field_type: String required: false - alias: kinesis.credentials.session_token + alias: + - kinesis.credentials.session_token - name: aws.credentials.role.arn field_type: String required: false - alias: kinesis.assumerole.arn + alias: + - kinesis.assumerole.arn - name: aws.credentials.role.external_id field_type: String required: false - alias: kinesis.assumerole.external_id + alias: + - kinesis.assumerole.external_id MqttConfig: fields: - name: url @@ -525,11 +567,13 @@ PulsarConfig: - name: topic field_type: String required: true - alias: pulsar.topic + alias: + - pulsar.topic - name: service.url field_type: String required: true - alias: pulsar.service.url + alias: + - pulsar.service.url - name: auth.token field_type: String required: false @@ -545,33 +589,49 @@ PulsarConfig: - name: oauth.scope field_type: String required: false - - name: region + - name: aws.region field_type: String required: false - - name: endpoint + alias: + - region + - name: aws.endpoint_url field_type: String required: false - alias: endpoint_url - - name: access_key + alias: + - endpoint_url + - endpoint + - name: aws.credentials.access_key_id field_type: String required: false - - name: secret_key + alias: + - access_key + - name: aws.credentials.secret_access_key field_type: String required: false - - name: session_token + alias: + - secret_key + - name: aws.credentials.session_token field_type: String required: false - - name: arn + alias: + - session_token + - name: aws.credentials.role.arn field_type: String comments: IAM role required: false - - name: external_id + alias: + - arn + - name: aws.credentials.role.external_id field_type: String comments: external ID in IAM role trust policy required: false - - name: profile + alias: + - external_id + - name: aws.region.profile field_type: String required: false + alias: + - profile - name: properties.batch.size field_type: u32 required: false @@ -645,12 +705,14 @@ StarrocksConfig: field_type: String comments: The port to the MySQL server of `StarRocks` FE. required: true - alias: starrocks.query_port + alias: + - starrocks.query_port - name: starrocks.httpport field_type: String comments: The port to the HTTP server of `StarRocks` FE. required: true - alias: starrocks.http_port + alias: + - starrocks.http_port - name: starrocks.user field_type: String comments: The user name used to access the `StarRocks` database. diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index c62db228eeb0..b3e1fefcb155 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -80,20 +80,25 @@ KafkaProperties: field_type: String comments: This parameter is not intended to be exposed to users. This parameter specifies only for one parallelism. The parallelism of kafka source is equal to the parallelism passed into compute nodes. So users need to calculate how many bytes will be consumed in total across all the parallelism by themselves. required: false - alias: kafka.bytes.per.second + alias: + - kafka.bytes.per.second - name: max.num.messages field_type: String comments: This parameter is not intended to be exposed to users. This parameter specifies only for one parallelism. The parallelism of kafka source is equal to the parallelism passed into compute nodes. So users need to calculate how many messages will be consumed in total across all the parallelism by themselves. required: false - alias: kafka.max.num.messages + alias: + - kafka.max.num.messages - name: scan.startup.mode field_type: String required: false - alias: kafka.scan.startup.mode + alias: + - kafka.scan.startup.mode - name: scan.startup.timestamp.millis field_type: String required: false - alias: scan.startup.timestamp_millis + alias: + - kafka.time.offset + - scan.startup.timestamp_millis - name: upsert field_type: String comments: 'This parameter is used to tell `KafkaSplitReader` to produce `UpsertMessage`s, which combine both key and value fields of the Kafka message. TODO: Currently, `Option` can not be parsed here.' @@ -101,11 +106,13 @@ KafkaProperties: - name: properties.bootstrap.server field_type: String required: true - alias: kafka.brokers + alias: + - kafka.brokers - name: topic field_type: String required: true - alias: kafka.topic + alias: + - kafka.topic - name: properties.sync.call.timeout field_type: Duration required: false @@ -214,74 +221,99 @@ KafkaProperties: field_type: HashMap comments: This is generated from `private_link_targets` and `private_link_endpoint` in frontend, instead of given by users. required: false - - name: region + - name: aws.region field_type: String required: false - - name: endpoint + alias: + - region + - name: aws.endpoint_url field_type: String required: false - alias: endpoint_url - - name: access_key + alias: + - endpoint_url + - endpoint + - name: aws.credentials.access_key_id field_type: String required: false - - name: secret_key + alias: + - access_key + - name: aws.credentials.secret_access_key field_type: String required: false - - name: session_token + alias: + - secret_key + - name: aws.credentials.session_token field_type: String required: false - - name: arn + alias: + - session_token + - name: aws.credentials.role.arn field_type: String comments: IAM role required: false - - name: external_id + alias: + - arn + - name: aws.credentials.role.external_id field_type: String comments: external ID in IAM role trust policy required: false - - name: profile + alias: + - external_id + - name: aws.region.profile field_type: String required: false + alias: + - profile KinesisProperties: fields: - name: scan.startup.mode field_type: String required: false - alias: kinesis.scan.startup.mode + alias: + - kinesis.scan.startup.mode - name: scan.startup.timestamp.millis field_type: i64 required: false - name: stream field_type: String required: true - alias: kinesis.stream.name + alias: + - kinesis.stream.name - name: aws.region field_type: String required: true - alias: kinesis.stream.region + alias: + - kinesis.stream.region - name: endpoint field_type: String required: false - alias: kinesis.endpoint + alias: + - kinesis.endpoint - name: aws.credentials.access_key_id field_type: String required: false - alias: kinesis.credentials.access + alias: + - kinesis.credentials.access - name: aws.credentials.secret_access_key field_type: String required: false - alias: kinesis.credentials.secret + alias: + - kinesis.credentials.secret - name: aws.credentials.session_token field_type: String required: false - alias: kinesis.credentials.session_token + alias: + - kinesis.credentials.session_token - name: aws.credentials.role.arn field_type: String required: false - alias: kinesis.assumerole.arn + alias: + - kinesis.assumerole.arn - name: aws.credentials.role.external_id field_type: String required: false - alias: kinesis.assumerole.external_id + alias: + - kinesis.assumerole.external_id MqttProperties: fields: - name: url @@ -377,7 +409,8 @@ NatsProperties: - name: scan.startup.timestamp.millis field_type: String required: false - alias: scan.startup.timestamp_millis + alias: + - scan.startup.timestamp_millis - name: stream field_type: String required: true @@ -619,19 +652,24 @@ PulsarProperties: - name: scan.startup.mode field_type: String required: false - alias: pulsar.scan.startup.mode + alias: + - pulsar.scan.startup.mode - name: scan.startup.timestamp.millis field_type: String required: false - alias: scan.startup.timestamp_millis + alias: + - pulsar.time.offset + - scan.startup.timestamp_millis - name: topic field_type: String required: true - alias: pulsar.topic + alias: + - pulsar.topic - name: service.url field_type: String required: true - alias: pulsar.service.url + alias: + - pulsar.service.url - name: auth.token field_type: String required: false @@ -647,33 +685,49 @@ PulsarProperties: - name: oauth.scope field_type: String required: false - - name: region + - name: aws.region field_type: String required: false - - name: endpoint + alias: + - region + - name: aws.endpoint_url field_type: String required: false - alias: endpoint_url - - name: access_key + alias: + - endpoint_url + - endpoint + - name: aws.credentials.access_key_id field_type: String required: false - - name: secret_key + alias: + - access_key + - name: aws.credentials.secret_access_key field_type: String required: false - - name: session_token + alias: + - secret_key + - name: aws.credentials.session_token field_type: String required: false - - name: arn + alias: + - session_token + - name: aws.credentials.role.arn field_type: String comments: IAM role required: false - - name: external_id + alias: + - arn + - name: aws.credentials.role.external_id field_type: String comments: external ID in IAM role trust policy required: false - - name: profile + alias: + - external_id + - name: aws.region.profile field_type: String required: false + alias: + - profile - name: iceberg.enabled field_type: bool required: false From 6c3008fd3e04af4a59b6bba0642b74a8005cb14f Mon Sep 17 00:00:00 2001 From: Yuhao Su Date: Fri, 10 May 2024 11:27:25 -0500 Subject: [PATCH 2/2] fix --- src/connector/src/connector_common/common.rs | 2 +- src/connector/with_options_sink.yaml | 6 +++--- src/connector/with_options_source.yaml | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/connector/src/connector_common/common.rs b/src/connector/src/connector_common/common.rs index 58afb9914005..e5aa7529e3e9 100644 --- a/src/connector/src/connector_common/common.rs +++ b/src/connector/src/connector_common/common.rs @@ -82,7 +82,7 @@ pub struct AwsAuthProps { /// external ID in IAM role trust policy #[serde(rename = "aws.credentials.role.external_id", alias = "external_id")] pub external_id: Option, - #[serde(rename = "aws.region.profile", alias = "profile")] + #[serde(rename = "aws.profile", alias = "profile")] pub profile: Option, } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index c6331f5e53c0..3dbe5d394a8e 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -59,7 +59,7 @@ BigQueryConfig: required: false alias: - external_id - - name: aws.region.profile + - name: aws.profile field_type: String required: false alias: @@ -408,7 +408,7 @@ KafkaConfig: required: false alias: - external_id - - name: aws.region.profile + - name: aws.profile field_type: String required: false alias: @@ -627,7 +627,7 @@ PulsarConfig: required: false alias: - external_id - - name: aws.region.profile + - name: aws.profile field_type: String required: false alias: diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index b3e1fefcb155..68ed02624199 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -259,7 +259,7 @@ KafkaProperties: required: false alias: - external_id - - name: aws.region.profile + - name: aws.profile field_type: String required: false alias: @@ -723,7 +723,7 @@ PulsarProperties: required: false alias: - external_id - - name: aws.region.profile + - name: aws.profile field_type: String required: false alias: