Skip to content

Commit

Permalink
changefeedccl: refactor buildDialConfig
Browse files Browse the repository at this point in the history
We are introducing azure-event-hub:// scheme for kafka streaming in
cockroachdb#115806. This patch refactors
specific sections of the existing code into functions to simplify code review.
Note that this patch does not change existing logic or functionality.

Part of: cockroachdb#103901, cockroachdb#110558

Release Note: none
  • Loading branch information
wenyihu6 committed Jan 18, 2024
1 parent b37123f commit 25b3793
Showing 1 changed file with 111 additions and 50 deletions.
161 changes: 111 additions & 50 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,22 @@ type kafkaDialConfig struct {
saslGrantType string
}

// TODO(cdc): refactor this function by splitting it up. There is a large number
// of security-related params and it's hard to tell what combinations work,
// which combinations work but should not work, which combinations should not be
// passed together but still work etc. It makes sense to split based on SASL
// mechanism and separate the TLS related params (which I believe are common to
// all the SASL schemes). It would also make sense to update the docs to reflect
// these cases rather than having a huge table of auth params like we have now.
func buildDialConfig(u sinkURL) (kafkaDialConfig, error) {
switch u.Scheme {
case changefeedbase.SinkSchemeConfluentKafka:
return buildConfluentKafkaConfig(u)
default:
return buildDefaultKafkaConfig(u)
}
}

// TODO: refactor this function by splitting it up.
// There is a large number of security-related params and it's hard to tell what
// combinations work, which combinations work but should not work, which
Expand All @@ -901,11 +917,7 @@ type kafkaDialConfig struct {
// related params (which I believe are common to all the SASL schemes). It would
// also make sense to update the docs to reflect these cases
// rather than having a huge table of auth params like we have now.
func buildDialConfig(u sinkURL) (kafkaDialConfig, error) {
if u.Scheme == changefeedbase.SinkSchemeConfluentKafka {
return buildConfluentKafkaConfig(u)
}

func buildDefaultKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
dialConfig := kafkaDialConfig{}

if _, err := u.consumeBool(changefeedbase.SinkParamTLSEnabled, &dialConfig.tlsEnabled); err != nil {
Expand Down Expand Up @@ -1006,6 +1018,89 @@ func buildDialConfig(u sinkURL) (kafkaDialConfig, error) {
return dialConfig, nil
}

// newMissingParameterError returns an error message for missing parameters in
// sinkURL.
func newMissingParameterError(scheme string, param string) error {
return errors.Newf("scheme %s requires parameter %s", scheme, param)
}

// newInvalidParameterError returns an error message for invalid parameters in
// sinkURL.
func newInvalidParameterError(scheme string, invalidParams string) error {
return errors.Newf("invalid query parameters %s for scheme %s", invalidParams, scheme)
}

// newUnsupportedValueForParameterError returns an error message for using
// unsupported values for parameters in sinkURL.
func newUnsupportedValueForParameterError(
param string, unsupportedValue, allowedValue string,
) error {
return errors.Newf("unsupported value %s for parameter %s, please use %s instead",
unsupportedValue, param, allowedValue)
}

// validateAndConsumeParams consumes and validates if the given sinkURL contains
// any unsupported values for the parameters using paramsWithAcceptedValues.
func validateAndConsumeParamsIfSet(u *sinkURL, paramsWithAcceptedValues map[string]string) error {
for param, allowedValue := range paramsWithAcceptedValues {
if v := u.consumeParam(param); v != "" && v != allowedValue {
return newUnsupportedValueForParameterError(param, /*param*/
v /*unsupportedValue*/, allowedValue /*allowedBoolValue*/)
}
}
return nil
}

// validateAndConsumeBoolParamsIfSet mirrors validateAndConsumeParamsIfSet but
// specifically for boolean parameters.
func validateAndConsumeBoolParamsIfSet(u *sinkURL, paramsWithAcceptedValues map[string]bool) error {
for param, allowedBoolValue := range paramsWithAcceptedValues {
var dest bool
wasSet, err := u.consumeBool(param, &dest)
if err != nil {
return err
}
if wasSet && dest != allowedBoolValue {
return newUnsupportedValueForParameterError(
param /*param*/, fmt.Sprintf("%t", dest), /*unsupportedValue*/
fmt.Sprintf("%t", allowedBoolValue) /*allowedBoolValue*/)
}
}
return nil
}

// setDefaultParametersForConfluentAndAzure populates the given kafkaDialConfig with other
// parameters from the sinkURL. Additionally, it validates options based on the
// given sinkURL and returns an error for unsupported values.
func setDefaultParametersForConfluentAndAzure(
u *sinkURL, dialConfig kafkaDialConfig,
) (kafkaDialConfig, error) {
// Check required values for parameters.
boolParamsWithRequiredValues := map[string]bool{
changefeedbase.SinkParamSASLEnabled: true,
changefeedbase.SinkParamTLSEnabled: true,
changefeedbase.SinkParamSASLHandshake: true,
}
if err := validateAndConsumeBoolParamsIfSet(u, boolParamsWithRequiredValues); err != nil {
return kafkaDialConfig{}, err
}
stringParamsWithRequiredValues := map[string]string{
changefeedbase.SinkParamSASLMechanism: sarama.SASLTypePlaintext,
}
if err := validateAndConsumeParamsIfSet(u, stringParamsWithRequiredValues); err != nil {
return kafkaDialConfig{}, err
}

// Set values.
dialConfig.tlsEnabled = true
dialConfig.saslHandshake = true
dialConfig.saslEnabled = true
dialConfig.saslMechanism = sarama.SASLTypePlaintext

// Ignore all other configurations.
return dialConfig, nil
}

// buildConfluentKafkaConfig constructs a simple dial config which is supported
// by kafka on confluent cloud. The dial config should have `api_key` and
// `api_secret`. It automatically sets should also have sasl_enabled=true,
Expand All @@ -1016,64 +1111,30 @@ func buildDialConfig(u sinkURL) (kafkaDialConfig, error) {
// instructions when you go to the `Clients` page of the cluster in confluent
// cloud.
func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
newMissingParameterError := func(param string) error {
return errors.Newf("scheme %s requires parameter %s", changefeedbase.SinkSchemeConfluentKafka, param)
}
newRequiredValueError := func(param string, unsupportedValue, allowedValue string) error {
return errors.Newf("unsupported value %s for parameter %s, please use %s instead", unsupportedValue,
param, allowedValue)
}

// Check for api_key and api_secret.
dialConfig := kafkaDialConfig{}
if dialConfig.saslUser = u.consumeParam(changefeedbase.SinkParamConfluentAPIKey); dialConfig.saslUser == `` {
return kafkaDialConfig{}, newMissingParameterError(changefeedbase.SinkParamConfluentAPIKey)
// Check for api_key and api_secret.
if dialConfig.saslUser = u.consumeParam(changefeedbase.SinkParamConfluentAPIKey); dialConfig.saslUser == "" {
return kafkaDialConfig{},
newMissingParameterError(u.Scheme /*scheme*/, changefeedbase.SinkParamConfluentAPIKey /*param*/)
}
if dialConfig.saslPassword = u.consumeParam(changefeedbase.SinkParamConfluentAPISecret); dialConfig.saslPassword == `` {
return kafkaDialConfig{}, newMissingParameterError(changefeedbase.SinkParamConfluentAPISecret)
if dialConfig.saslPassword = u.consumeParam(changefeedbase.SinkParamConfluentAPISecret); dialConfig.saslPassword == "" {
return kafkaDialConfig{},
newMissingParameterError(u.Scheme /*scheme*/, changefeedbase.SinkParamConfluentAPISecret /*param*/)
}

// If sasl_enabled is specified, it must be set to true.
if wasSet, err := u.consumeBool(changefeedbase.SinkParamSASLEnabled, &dialConfig.saslEnabled); err != nil {
return kafkaDialConfig{}, err
} else if wasSet && !dialConfig.saslEnabled {
return kafkaDialConfig{}, newRequiredValueError(changefeedbase.SinkParamSASLEnabled, "false",
"true")
}
// If sasl_mechanism is specified, it must be set to PLAIN.
if dialConfig.saslMechanism = u.consumeParam(changefeedbase.SinkParamSASLMechanism); dialConfig.saslMechanism != `` &&
dialConfig.saslMechanism != sarama.SASLTypePlaintext {
return kafkaDialConfig{}, newRequiredValueError(changefeedbase.SinkParamSASLMechanism, dialConfig.saslMechanism,
sarama.SASLTypePlaintext)
}
// If tls_enabled is specified, it must be set to true.
if wasSet, err := u.consumeBool(changefeedbase.SinkParamTLSEnabled, &dialConfig.tlsEnabled); err != nil {
return kafkaDialConfig{}, err
} else if wasSet && !dialConfig.tlsEnabled {
return kafkaDialConfig{}, newRequiredValueError(changefeedbase.SinkParamTLSEnabled, "false", "true")
}
// If sasl_handshake is specified, it must be set to true.
if wasSet, err := u.consumeBool(changefeedbase.SinkParamSASLHandshake, &dialConfig.saslHandshake); err != nil {
dialConfig, err := setDefaultParametersForConfluentAndAzure(&u, dialConfig)
if err != nil {
return kafkaDialConfig{}, err
} else if wasSet && !dialConfig.saslHandshake {
return kafkaDialConfig{}, newRequiredValueError(changefeedbase.SinkParamSASLHandshake, "false", "true")
}

if _, err := u.consumeBool(changefeedbase.SinkParamSkipTLSVerify, &dialConfig.tlsSkipVerify); err != nil {
return kafkaDialConfig{}, err
}

dialConfig.saslEnabled = true
dialConfig.saslMechanism = sarama.SASLTypePlaintext
dialConfig.tlsEnabled = true
dialConfig.saslHandshake = true

remaining := u.remainingQueryParams()
if len(remaining) > 0 {
return kafkaDialConfig{}, errors.Newf("invalid query parameters for scheme %s", remaining, changefeedbase.SinkParamConfluentAPISecret)
return kafkaDialConfig{}, newInvalidParameterError(u.Scheme, /*scheme*/
fmt.Sprintf("%v", remaining) /*invalidParams*/)
}

// Ignore all other configurations.
return dialConfig, nil
}

Expand Down

0 comments on commit 25b3793

Please sign in to comment.