Skip to content

Commit

Permalink
fix: static check issues
Browse files Browse the repository at this point in the history
Signed-off-by: Souyama Debnath <souyama.debnath@atos.net>
  • Loading branch information
sansmoraxz committed Jul 15, 2023
1 parent ed6b0ae commit 6b0c624
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 58 deletions.
100 changes: 46 additions & 54 deletions pkg/scalers/kafka_x_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (

sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4"
"github.com/go-logr/logr"
kedautil "github.com/kedacore/keda/v2/pkg/util"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/aws_msk_iam"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
v2 "k8s.io/api/autoscaling/v2"
"k8s.io/metrics/pkg/apis/external_metrics"

kedautil "github.com/kedacore/keda/v2/pkg/util"
)

type kafkaXScaler struct {
Expand Down Expand Up @@ -53,7 +54,6 @@ type kafkaXMetadata struct {
awsEndpoint string
awsAuthorization awsAuthorizationMetadata


// TLS
enableTLS bool
cert string
Expand Down Expand Up @@ -164,46 +164,43 @@ func parseKafkaXAuthParams(config *ScalerConfig, meta *kafkaXMetadata) error {

if saslAuthType != "" {
saslAuthType = strings.TrimSpace(saslAuthType)
mode := kafkaSaslType(saslAuthType)

if mode == KafkaSASLTypeMskIam {
switch mode := kafkaSaslType(saslAuthType); mode {
case KafkaSASLTypeMskIam:
meta.saslType = mode
if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}
if !meta.enableTLS {
return errors.New("TLS is required for MSK")
}

if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" {
meta.awsRegion = val
} else {
return fmt.Errorf("no awsRegion given")
return errors.New("no awsRegion given")
}

if val, ok := config.TriggerMetadata["awsEndpoint"]; ok {
meta.awsEndpoint = val
}

auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv)
if err != nil {
return err
}
meta.awsAuthorization = auth
meta.saslType = mode
} else if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 {
if config.AuthParams["username"] == "" {
case KafkaSASLTypePlaintext:
fallthrough
case KafkaSASLTypeSCRAMSHA256:
fallthrough
case KafkaSASLTypeSCRAMSHA512:
if val, ok := config.AuthParams["username"]; ok {
meta.username = strings.TrimSpace(val)
} else {
return errors.New("no username given")
}
meta.username = strings.TrimSpace(config.AuthParams["username"])

if config.AuthParams["password"] == "" {
if val, ok := config.AuthParams["password"]; ok {
meta.password = strings.TrimSpace(val)
} else {
return errors.New("no password given")
}
meta.password = strings.TrimSpace(config.AuthParams["password"])
meta.saslType = mode
} else if mode == KafkaSASLTypeOAuthbearer {
// TODO: implement
return fmt.Errorf("SASL/OAUTHBEARER is not implemented yet")
} else {
return fmt.Errorf("err SASL mode %s given", mode)
}
case KafkaSASLTypeOAuthbearer:
return errors.New("SASL/OAUTHBEARER is not implemented yet")
}
}

return nil
Expand Down Expand Up @@ -328,9 +325,8 @@ func parseKafkaXMetadata(config *ScalerConfig, logger logr.Logger) (kafkaXMetada
}

func getKafkaXClient(metadata kafkaXMetadata, logger logr.Logger) (*kafka.Client, error) {

var saslMechanism sasl.Mechanism = nil
var tlsConfig *tls.Config = nil
var saslMechanism sasl.Mechanism
var tlsConfig *tls.Config
var err error

logger.V(4).Info(fmt.Sprintf("Kafka SASL type %s", metadata.saslType))
Expand All @@ -341,32 +337,32 @@ func getKafkaXClient(metadata kafkaXMetadata, logger logr.Logger) (*kafka.Client
}
}

if metadata.saslType == KafkaSASLTypePlaintext {
switch metadata.saslType {
case KafkaSASLTypePlaintext:
saslMechanism = plain.Mechanism{
Username: metadata.username,
Password: metadata.password,
}
} else if metadata.saslType == KafkaSASLTypeSCRAMSHA256 {
case KafkaSASLTypeSCRAMSHA256:
saslMechanism, err = scram.Mechanism(scram.SHA256, metadata.username, metadata.password)
if err != nil {
return nil, err
}
} else if metadata.saslType == KafkaSASLTypeSCRAMSHA512 {
case KafkaSASLTypeSCRAMSHA512:
saslMechanism, err = scram.Mechanism(scram.SHA512, metadata.username, metadata.password)
if err != nil {
return nil, err
}
} else if metadata.saslType == KafkaSASLTypeOAuthbearer {
// TODO: implement
return nil, fmt.Errorf("SASL/OAUTHBEARER is not implemented yet")
} else if metadata.saslType == KafkaSASLTypeMskIam {
case KafkaSASLTypeOAuthbearer:
return nil, errors.New("SASL/OAUTHBEARER is not implemented yet")
case KafkaSASLTypeMskIam:
_, config := getAwsConfig(metadata.awsRegion,
metadata.awsEndpoint,
metadata.awsAuthorization)

saslMechanism = &aws_msk_iam.Mechanism{
Signer: sigv4.NewSigner(config.Credentials),
Region: metadata.awsRegion,
Signer: sigv4.NewSigner(config.Credentials),
Region: metadata.awsRegion,
}
}

Expand Down Expand Up @@ -424,22 +420,21 @@ func (s *kafkaXScaler) getTopicPartitions() (map[string][]int, error) {
result[topic.Name] = partitions
}
return result, nil
} else {
result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
if kedautil.Contains(s.metadata.topic, topic.Name) {
for _, partition := range topic.Partitions {
if (len(s.metadata.partitionLimitation) == 0) ||
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
partitions = append(partitions, partition.ID)
}
}
result := make(map[string][]int)
for _, topic := range metadata.Topics {
partitions := make([]int, 0)
if kedautil.Contains(s.metadata.topic, topic.Name) {
for _, partition := range topic.Partitions {
if (len(s.metadata.partitionLimitation) == 0) ||
(len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) {
partitions = append(partitions, partition.ID)
}
}
result[topic.Name] = partitions
}
return result, nil
result[topic.Name] = partitions
}
return result, nil
}

func (s *kafkaXScaler) getConsumerOffsets(topicPartitions map[string][]int) (map[string]map[int]int64, error) {
Expand Down Expand Up @@ -535,8 +530,6 @@ func (s *kafkaXScaler) Close(context.Context) error {
if transport != nil {
transport.CloseIdleConnections()
}
//s.client = nil
//s.transport = nil
return nil
}

Expand Down Expand Up @@ -568,7 +561,6 @@ type kafkaXProducerOffsetResult struct {
err error
}


// getConsumerAndProducerOffsets returns (consumerOffsets, producerOffsets, error)
func (s *kafkaXScaler) getConsumerAndProducerOffsets(topicPartitions map[string][]int) (map[string]map[int]int64, map[string]map[int]int64, error) {
consumerChan := make(chan kafkaXConsumerOffsetResult, 1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/scalers/kafka_x_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ var parseKafkaXMetadataTestDataset = []parseKafkaXMetadataTestData{
// success, no limitation
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false},
// TODO: remove failure, version not supported??
//{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
// failure, lagThreshold is negative value
{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false},
// failure, lagThreshold is 0
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestKafkaXAuthParams(t *testing.T) {
t.Errorf("Test case: %#v. Expected error but got success", id)
}
if !testData.isError {
if testData.metadata["tls"] == "true" && !meta.enableTLS {
if testData.metadata["tls"] == stringTrue && !meta.enableTLS {
t.Errorf("Test case: %#v. Expected tls to be set to %#v but got %#v\n", id, testData.metadata["tls"], meta.enableTLS)
}
if meta.enableTLS {
Expand Down
1 change: 0 additions & 1 deletion tests/scalers/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@ func testEarliestPolicy(t *testing.T, kc *kubernetes.Clientset, data templateDat

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, topicPartitions, 60, 2),
"replica count should be %d after 2 minute", messages)

}

func testLatestPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
Expand Down
1 change: 0 additions & 1 deletion tests/scalers/kafka_x/kafka_x_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,6 @@ func testEarliestPolicy(t *testing.T, kc *kubernetes.Clientset, data templateDat

assert.True(t, WaitForDeploymentReplicaReadyCount(t, kc, deploymentName, testNamespace, topicPartitions, 60, 2),
"replica count should be %d after 2 minute", messages)

}

func testLatestPolicy(t *testing.T, kc *kubernetes.Clientset, data templateData) {
Expand Down

0 comments on commit 6b0c624

Please sign in to comment.