
Unable to connect to Confluent Kafka #1241

Closed
harshik9 opened this issue Oct 12, 2020 · 25 comments
Labels
bug Something isn't working scaler-kafka-topic

Comments

@harshik9

harshik9 commented Oct 12, 2020

I am trying to implement KEDA for Confluent Kafka. I have tested different scenarios, but the error I get is:

2020-10-12T14:39:02.788Z	ERROR	controllers.ScaledObject	Failed to ensure HPA is correctly created for ScaledObject	{"ScaledObject.Namespace": "keda", "ScaledObject.Name": "kafka-scaledobject", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
github.com/go-logr/zapr.(*zapLogger).Error
	/go/pkg/mod/github.com/go-logr/zapr@v0.1.1/zapr.go:128
github.com/kedacore/keda/controllers.(*ScaledObjectReconciler).Reconcile
	/workspace/controllers/scaledobject_controller.go:146
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:235
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:209
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:188
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:155
k8s.io/apimachinery/pkg/util/wait.BackoffUntil
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:156
k8s.io/apimachinery/pkg/util/wait.JitterUntil
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:133
k8s.io/apimachinery/pkg/util/wait.Until
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:90
2020-10-12T14:39:02.796Z	ERROR	controller	Reconciler error	{"reconcilerGroup": "keda.sh", "reconcilerKind": "ScaledObject", "controller": "scaledobject", "name": "kafka-scaledobject", "namespace": "keda", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
github.com/go-logr/zapr.(*zapLogger).Error
	/go/pkg/mod/github.com/go-logr/zapr@v0.1.1/zapr.go:128
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:237
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:209
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker
	/go/pkg/mod/sigs.k8s.io/controller-runtime@v0.6.2/pkg/internal/controller/controller.go:188
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:155
k8s.io/apimachinery/pkg/util/wait.BackoffUntil
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:156
k8s.io/apimachinery/pkg/util/wait.JitterUntil
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:133
k8s.io/apimachinery/pkg/util/wait.Until
	/go/pkg/mod/k8s.io/apimachinery@v0.18.8/pkg/util/wait/wait.go:90

I deployed a scaled object as follows.

Scenario 1 without TriggerAuthentication

triggers:
- type: kafka
  metadata:
    bootstrapServers: kafka.svc:9092
    consumerGroup: my-group
    topic: test-topic
    lagThreshold: '5'

Scenario 2 with TriggerAuthentication

triggers:
- type: kafka
  metadata:
    bootstrapServers: kafka.svc:9092
    consumerGroup: my-group
    topic: test-topic
    lagThreshold: '5'
  authenticationRef:
    name: keda-trigger-auth-kafka-credential

apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: keda
data:
  sasl: "plaintext"
  username: "My confluent kafka API key"
  password: "My confluent kafka API key secret"

apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: keda
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password
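One thing worth double-checking in the Secret above: values under a Secret's data: field must be base64-encoded, while literal strings belong under stringData:. A quick sketch, assuming a POSIX shell (the names match the manifests above; the credential values are placeholders):

```shell
# Values under a Secret's `data:` field must be base64-encoded.
# Encode each value before putting it in the manifest:
printf '%s' plaintext | base64   # -> cGxhaW50ZXh0

# Alternatively, write the secret with `stringData:` and let the
# API server do the encoding:
cat <<'EOF'
apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: keda
stringData:
  sasl: plaintext
  username: <your Confluent API key>
  password: <your Confluent API secret>
EOF
```

If a value was pasted un-encoded under data:, the API server rejects the manifest or the scaler receives garbage credentials, so this is worth ruling out first.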

Expected Behavior

After deploying the ScaledObject, the corresponding HPA should be created.

Actual Behavior

ERROR controllers.ScaledObject Failed to ensure HPA is correctly created for ScaledObject {"ScaledObject.Namespace": "keda", "ScaledObject.Name": "kafka-scaledobject", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}

Steps to Reproduce the Problem

  1. Helm version 3.3.x; installed KEDA from https://github.com/kedacore/keda/releases/tag/v2.0.0-beta
  2. Deployed the ScaledObject as shown above
  3. Checked the KEDA operator logs; it errors out as shown above

Specifications

  • KEDA Version: 2.0.0-beta
  • Platform & Version: Azure Kubernetes Service
  • Kubernetes Version: 1.17.9
  • Scaler(s): Apache Kafka targeting Confluent Kafka

The API version on the ScaledObject and TriggerAuthentication is keda.sh/v1alpha1.

Scaledobjectsandtriggerauthentications.txt

@harshik9 added the bug (Something isn't working) label Oct 12, 2020
@tomkerkhove
Member

I'm not a Confluent Kafka expert; does it run inside your cluster, or do they manage it for you? From the looks of it, the address (kafka.svc:9092) points to a local service (which is also the one from our docs); is that correct?

@harshik9
Author

harshik9 commented Oct 13, 2020

Our Confluent Kafka does not run on the Kubernetes cluster. We are using the Confluent Cloud bootstrap servers (pkc-xxxxx.eastus2.azure.confluent.cloud:9092). We tried replacing the address, but we still get the same error.

@zroubalik
Member

@harshik9 okay, so that's the value that should be in the bootstrapServers section.

@harshik9
Author

@zroubalik, yes. We tried using that value in bootstrapServers. What should the sasl value be when connecting to Confluent Kafka?

@domminiks

Same issue here, running a Kafka cluster locally with KEDA installed on an on-premises K8s cluster. As a test, I created a Pod and was able to curl Kafka topics from inside that Pod on my K8s. Apparently the Kafka cluster is reachable from inside my K8s cluster (initially I thought that was the problem), but it feels like it is KEDA that cannot reach the Kafka endpoints.

Currently running Kafka without authentication, using Keda v2 and K8s 1.18!

@zroubalik
Member

zroubalik commented Oct 13, 2020

Same issue here, running a Kafka cluster locally with KEDA installed on an on-premises K8s cluster. As a test, I created a Pod and was able to curl Kafka topics from inside that Pod on my K8s. Apparently the Kafka cluster is reachable from inside my K8s cluster (initially I thought that was the problem), but it feels like it is KEDA that cannot reach the Kafka endpoints.

The scenario you described above: is that Kafka with or without authentication?

@domminiks

Same issue here, running a Kafka cluster locally with KEDA installed on an on-premises K8s cluster. As a test, I created a Pod and was able to curl Kafka topics from inside that Pod on my K8s. Apparently the Kafka cluster is reachable from inside my K8s cluster (initially I thought that was the problem), but it feels like it is KEDA that cannot reach the Kafka endpoints.

The scenario you described above: is that Kafka with or without authentication?

Without authentication.

@harshik9
Author

I was able to connect to Confluent Kafka with authentication by downgrading KEDA to version 1.5. https://keda.sh/docs/1.5/scalers/apache-kafka/
The authMode that I used is sasl_ssl_plain. I deployed all the files from the deploy folder here: https://github.com/kedacore/keda/releases/tag/v1.5.0

@zroubalik
Member

Hm, that's strange; unfortunately I am not able to reproduce it. I am using in-cluster Kafka (installed via Strimzi) and it is working (the same for our e2e tests, which are passing).

Is there anything non-standard in your Kafka config?

@zroubalik
Member

zroubalik commented Oct 19, 2020

I was able to connect to Confluent Kafka with authentication by downgrading KEDA to version 1.5. https://keda.sh/docs/1.5/scalers/apache-kafka/
The authMode that I used is sasl_ssl_plain. I deployed all the files from the deploy folder here: https://github.com/kedacore/keda/releases/tag/v1.5.0

@harshik9 for that, did you use the same ScaledObject and TriggerAuthentication configuration as mentioned above?

@harshik9
Author

@zroubalik Our Confluent Kafka is on Azure. When we used the KEDA 2.0 beta, the authentication parameters were sasl, username, and password; with sasl set to plaintext we were unable to connect. When I downgraded to KEDA 1.5, the authentication parameters were authMode, username, and password. The authMode I used was sasl_ssl_plain, and this worked for me. I used the same ScaledObject as in https://keda.sh/docs/1.5/scalers/apache-kafka/, but there was no need to provide a ca, cert, or key. The username was our Azure Confluent Kafka API key and the password was our Confluent Kafka API key secret.
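Sketched out, the 1.5-style configuration I am describing looks roughly like this (based on the 1.5 docs linked above; names and values are placeholders, not our real credentials):

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
data:
  authMode: c2FzbF9zc2xfcGxhaW4=   # base64 of "sasl_ssl_plain"
  username: <API key, base64>
  password: <API secret, base64>
---
apiVersion: keda.k8s.io/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
spec:
  secretTargetRef:
  - parameter: authMode
    name: keda-kafka-secrets
    key: authMode
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password
```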

@zroubalik
Member

I believe the issue was fixed by #1288; please reopen if the problem persists.

@silasfrigo

We are having the same issue over here. Can someone share some thoughts on how to fix it?
The KEDA logs show the following:

{
   "ScaledObject.Namespace":"channels-integrations",
   "ScaledObject.Name":"ch-int-mercadolivre-orders-inbound-scaledobject",
   "error":"error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
}

And the ScaledObject status shows an 'Unknown' state.

KEDA version 2.4

apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: my-namespace
data:
  sasl: "plaintext as b64"
  username: "username as b64"
  password: "password as b64"
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: my-namespace
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: my-scaled-object-name
  namespace: my-namespace
spec:
  scaleTargetRef:
    name: my-deployment-name
  cooldownPeriod: 30
  minReplicaCount: 0
  maxReplicaCount: 3
  pollingInterval: 10
  triggers:
  - type: kafka
    metadata:
      metricName: my-metric-name
      namespace: my-namespace
      bootstrapServers: our-broker-address.confluent.cloud:9092
      consumerGroup: consumer_group
      topic: my_topic
      lagThreshold: "10"
      offsetResetPolicy: earliest
    authenticationRef:
      name: keda-trigger-auth-kafka-credential

@harshitak27

I was also facing the same issue with the newer KEDA version, 2.4.

#2125

@zroubalik
Member

@harshitak27 the following approach might help you: #2101 (comment)

Let us know, and we can probably document this if it solves your issue.

@harshitak27

@zroubalik Thank you for the solution. Here's what I changed in the ScaledObject:

  1. I added the parameter "tls" and set its value to "enable". This was not required with the older versions (1.4)
  2. The value for the sasl parameter was set to "plaintext"

Except for the above changes and a few template changes, everything else remains the same.
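For anyone landing here later, the working combination described above amounts to adding a tls entry next to the existing sasl one, in both the Secret and the TriggerAuthentication (a sketch reusing the placeholder names from earlier in this thread; the base64 values are illustrative):

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: my-namespace
data:
  sasl: cGxhaW50ZXh0   # base64 of "plaintext"
  tls: ZW5hYmxl        # base64 of "enable"
  username: <username, base64>
  password: <password, base64>
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: my-namespace
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: tls
    name: keda-kafka-secrets
    key: tls
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password
```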

Also, I wanted to know: what do "ACTIVE" and "FALLBACK" mean? These show up when we list the ScaledObjects after deploying them.

Thanks again @zroubalik

@zroubalik
Member

Glad to hear that it works for you.

Active shows whether scaling is active, e.g. there is some traffic on the trigger. As for Fallback, you can read about it in the docs: https://keda.sh/docs/2.4/concepts/scaling-deployments/

@tomkerkhove
Member

@harshitak27 Is there something that can be improved in the docs of our scaler / FAQ that could have helped you?

Otherwise I think we can close this issue.

@sergioarmgpl

It worked for me for Confluent Cloud.

@danielloader

Just a question: is the broker hostname resolved relative to the namespace the ScaledObject is in, or the namespace the KEDA controller runs in?

I'm having to put the full hostname, kafka.kafkanamespace.svc.cluster.local:9092, to get connectivity, despite the ScaledObject and broker being in the same namespace. I'd have expected kafka:9092 to work, as in the documentation.
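If I understand the mechanics correctly, the lookup is done by the KEDA operator pod rather than by anything in the ScaledObject's namespace, which would explain why only the fully qualified name works. A sketch with placeholder names:

```yaml
triggers:
- type: kafka
  metadata:
    # Fully qualified service DNS name, so resolution does not
    # depend on which namespace the KEDA operator runs in:
    bootstrapServers: kafka.kafkanamespace.svc.cluster.local:9092
    consumerGroup: my-group
    topic: my-topic
    lagThreshold: '5'
```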

@edubois10

edubois10 commented Oct 11, 2023

Hm, that's strange; unfortunately I am not able to reproduce it. I am using in-cluster Kafka (installed via Strimzi) and it is working (the same for our e2e tests, which are passing).

Is there anything non-standard in your Kafka config?

@zroubalik I am having the same issue with my Kafka cluster deployed with Strimzi. I am not sure what I am doing wrong. I am using KEDA version 2.9.1 and want to authenticate using "scram_sha512".

I have the following CRs:

kind: Secret
apiVersion: v1
metadata:
  name: keda-kafka-secrets
  namespace: demo
data:
  sasl: c2NyYW1fc2hhNTEy
  ca: >-
    LS0tLS1CR ... QxM2dyQkg=
  username: <demo base64> # org.apache.kafka.common.security.scram.ScramLoginModule required username="demo" password="demo";
  password: <demo base64>
type: Opaque
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: kafka-scaledobject
  namespace: demo
spec:
  advanced:
    restoreToOriginalReplicaCount: true
    horizontalPodAutoscalerConfig:
      behavior:
        scaleDown:
          stabilizationWindowSeconds: 30
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: consumer-demo
  minReplicaCount: 0
  maxReplicaCount: 10
  pollingInterval: 5
  cooldownPeriod: 10
  triggers:
  - type: kafka
    metadata:
      bootstrapServers: dev-kafka-bootstrap-kafka.apps...com:443
      consumerGroup: demo
      topic: demo
      # Optional
      lagThreshold: '2'
    authenticationRef:
      name: keda-trigger-auth-kafka-credential
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: demo
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password
  - parameter: ca
    name: keda-kafka-secrets
    key: ca
Did you configure something specific for Strimzi, or am I missing a parameter for scram_sha512?

@zroubalik
Member

What do you see in KEDA operator logs?

@edubois10

edubois10 commented Oct 11, 2023

I get the following error message: 2023-10-11T13:14:40Z ERROR scalehandler Error getting scalers {"object": {"apiVersion": "keda.sh/v1alpha1", "kind": "ScaledObject", "namespace": "demo", "name": "kafka-scaledobject"}, "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: EOF"}

I currently removed TLS for my internal listener, but it doesn't change anything. As soon as I specify scram_sha512 for sasl, I get this error message.

When you tested with Strimzi, which Kafka version did you use? Did you have authentication set? Also, did you specify a specific version in the ScaledObject metadata?

@edubois10

OK, I managed to make it work with scram_sha512 and TLS. I will post my CRs and hope they will help some people:

ScaledObject

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: kafka-scaledobject
  namespace: demo
spec:
  advanced:
    horizontalPodAutoscalerConfig:
      behavior:
        scaleDown:
          stabilizationWindowSeconds: 30
    restoreToOriginalReplicaCount: true
  cooldownPeriod: 10
  maxReplicaCount: 10
  minReplicaCount: 1
  pollingInterval: 5
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: consumer-demo
  triggers:
    - authenticationRef:
        kind: TriggerAuthentication
        name: keda-trigger-auth-kafka-credential
      metadata:
        bootstrapServers: 'dev-kafka-bootstrap-kafka.apps.cluster-....com:443'
        consumerGroup: demo
        lagThreshold: '5'
        topic: demo
      type: kafka

TriggerAuthentication

apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: demo
spec:
  secretTargetRef:
    - key: username
      name: keda-kafka-secrets
      parameter: username
    - key: password
      name: keda-kafka-secrets
      parameter: password
    - key: sasl
      name: keda-kafka-secrets
      parameter: sasl
    - key: tls
      name: keda-kafka-secrets
      parameter: tls
    - key: ca
      name: keda-kafka-secrets
      parameter: ca

Secret

kind: Secret
apiVersion: v1
metadata:
  name: keda-kafka-secrets
  namespace: demo
data:
  ca: >-
    LS0tLS1CRUd ... hZUG9nVXBEeGE5WnhYCktLS0=
  password: T1VMekkd ... 2dyQkg=
  sasl: c2NyYW1fc2hhNTEy
  tls: ZW5hYmxl
  username: aGVsd ... w==
type: Opaque

@zroubalik
Copy link
Member

@edubois10 thanks for sharing. So the problem was the missing tls setting, right?


9 participants