Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xk6-kafka supports SASL_SSL authentication to confluent cloud? #56

Closed
fdahunsibread opened this issue May 9, 2022 · 22 comments
Closed
Labels
🐛 Bug Something isn't working ❓ Question Further information is requested

Comments

@fdahunsibread
Copy link

fdahunsibread commented May 9, 2022

I'm connecting to a kafka topic on confluent with this config details.

ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule

I'm getting the error
There was an error fetching messages: could not successfully authenticate to pkc-ep9mm.us-east-2.aws.confluent.cloud:9092 with SASL: SASL handshake failed: EOF

Test Script

import { check } from 'k6';
import { reader, consume } from 'k6/x/kafka'; // import kafka extension

const bootstrapServers = ['subdomain.us-east-2.aws.confluent.cloud:9092'];
const kafkaTopic = 'topicName';

const auth = JSON.stringify({
  username: 'apiclient',
  password: 'apiSecret',
  algorithm: 'plain'
});

const offset = 0;
// partition and groupID are mutually exclusive
const partition = '';
const groupID = '';

const consumer = reader(bootstrapServers, kafkaTopic, partition, groupID, offset, auth);

export default function () {
  // Read 1 message only
  let messages = consume(consumer, 1);
  check(messages, {
    '1 message returned': (msgs) => msgs.length == 1
  });
}

export function teardown(data) {
  consumer.close();
}

Does xk6-kafka not support SASL_SSL or am I missing something in my test script?

@fdahunsibread
Copy link
Author

fdahunsibread commented May 9, 2022

Response from @mostafa
It does, if you provide extra options here:

xk6-kafka/auth.go

Lines 24 to 31 in 0f4a3b8

type Credentials struct {
Username string `json:"username"`
Password string `json:"password"`
Algorithm string `json:"algorithm"`
ClientCertPem string `json:"clientCertPem"`
ClientKeyPem string `json:"clientKeyPem"`
ServerCaPem string `json:"serverCaPem"`
}

The options go here:

const auth = JSON.stringify({
username: "username",
password: "password",
algorithm: "plain",
});

@mostafa mostafa added the ❓ Question Further information is requested label May 9, 2022
@thanapat-sk
Copy link

@fdahunsibread hi friend, i recommend to you use xk6-kafka@0.8.0 cuz of last commit change something about sasl_ssl and not still stable.
you can download it from release at v0.8.0.

@mostafa
Copy link
Owner

mostafa commented May 9, 2022

@thanapat-sk What is your experience on this? Have you run into issues? I'd be happy to know more.

@thanapat-sk
Copy link

@mostafa hello, i have to same issue about cannot connect confluence kafka. but i solved by revision to 0.8.0

@mostafa
Copy link
Owner

mostafa commented May 10, 2022

@thanapat-sk Can you please test the new version and tell me your experience, as I am actively adding new features and fixing bugs, and I need feedback to make these possible. I need to find the regression, but my fixes don't work so far. I can't reproduce the issue you are facing as long as I don't have access to sample code and terminal output, and so on.

@mostafa
Copy link
Owner

mostafa commented May 10, 2022

@fdahunsibread How was your experience? Have you faced any issues, bugs, regressions, or anything?

@fdahunsibread
Copy link
Author

@mostafa I haven't been able to get the ssl certificates yet. @thanapat-sk Could you please share how you're connecting on 0.8.0?

@mostafa
Copy link
Owner

mostafa commented May 21, 2022

@fdahunsibread @thanapat-sk
Could you please test the latest main by building it from source with xk6?

@anjosanap
Copy link

@fdahunsibread @thanapat-sk Could you please test the latest main by building it from source with xk6?

Hi guys, I tried to use the latest version (my script is practically the same as fdahunsibread) and received this error message:
msg="kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address"

So I updated my script passing the producer like this:
const [producer, _writerError] = writer(bootstrapServers, kafkaTopic, auth);

And now I received an error to connect to my kafka topic:
ERROR read tcp XX->XX: read: connection reset by peer

Then I found this issue and tried to downgrade to version 0.8.0 and my tests ran successfully (And I used my producer in the same way as before).
const producer = writer(bootstrapServers, kafkaTopic, auth);

@mostafa
Copy link
Owner

mostafa commented Jun 4, 2022

@anjosanap Thanks for your feedback! 🙏
I'm going to release a version soon, and in that release, the APIs are changed. So, please update your scripts after the release and file an issue with the details of the bug so that I can test and reproduce the bug. And please provide as much information about your Kafka and your script as possible.

@Momotoculteur
Copy link

Momotoculteur commented Jun 8, 2022

Hi guys,
i got a similar issue as @anjosanap

I'm trying to send data with your extension to an Azure Event Hub via sasl ssl plain authentication. With your v0.8.0 it's working, but got issue with v0.10.0

My script v0.8.0:
https://gist.github.com/Momotoculteur/f905195f5034d52e657e1230ca0d0401
My script v0.10.0
https://gist.github.com/Momotoculteur/253f4bc3373ca171bda5183e771cbfde

Even if i follow your changes in your new version i got this error :
ERRO[0002] kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address error="kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address"
or weird :
ERRO[0001] unexpected EOF error="unexpected EOF"

I will then stay with this older version :(

@mostafa
Copy link
Owner

mostafa commented Jun 8, 2022

@Momotoculteur @anjosanap @thanapat-sk @fdahunsibread
I think I found the problem. Previously the code used to enable TLS with a default config, that is, &tls.Config{}. Now it requires certificates and keys to be provided, which causes the TLS to be disabled. I'll fix this.

@mostafa
Copy link
Owner

mostafa commented Jun 8, 2022

I suppose this is the flowchart of what we want to achieve:

graph LR;
    A[Credentials]-- Yes -->B[Algorithm];
    A-- No -->C[TLS];
    C-- Yes -->D[Unauthenticated dialer with default TLS config];
    C-- No -->E[Unauthenticated dialer without TLS config];
    B-- Yes -->F[Certificate and keys];
    B-- No -->D;
    F-- Yes -->G[Authenticated dialer with TLS];
    F-- No -->D;

Supported algorithms are:

  • SASL/PLAIN
  • SASL/SCRAM
  • SASL/SSL (WIP)

Unsupported ones are:

  • GSSAPI (Kerberos)
  • LDAP
  • OAuth bearer token
  • JAAS
  • Basic Auth
  • mTLS

Ref:

@mostafa
Copy link
Owner

mostafa commented Jun 10, 2022

Update:
I am still working on this to fix the authentication + TLS issue. I refactored the auth.go file and will test it properly against an account I created a Confluent Cloud.

mostafa added a commit that referenced this issue Jun 13, 2022
* Fix SASL and TLS issues reported in #56 and #84
* Replace dead status code
* Fix error message
* Exclude gosec G402 error
* Update JS API docs
* Add API docs with typedoc (#87)
@mostafa
Copy link
Owner

mostafa commented Jun 13, 2022

I tried my best to fix it on #86. I merged the code to the main branch, so feel free to test it by building from source.

@Momotoculteur
Copy link

Hey @mostafa, thanks for your quick fix !

I'm beginner with golang env & k6 tool ; but it still not work for me. I'm trying to following your example script, i got this error:

Goja stack:
native
ERRO[0000] panic: runtime error: invalid memory address or nil pointer dereference
goroutine 16 [running]:
runtime/debug.Stack()
	runtime/debug/stack.go:24 +0x68
go.k6.io/k6/js/common.RunWithPanicCatching.func1()
	go.k6.io/k6@v0.38.3/js/common/util.go:101 +0x150
panic({0x1055b1aa0, 0x1060cfce0})
	runtime/panic.go:838 +0x204
github.com/dop251/goja.AssertFunction.func1.1()
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/runtime.go:2300 +0x84
panic({0x1055b1aa0, 0x1060cfce0})
	runtime/panic.go:838 +0x204
github.com/dop251/goja.(*vm).try.func1()
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/vm.go:537 +0x590
panic({0x1055b1aa0, 0x1060cfce0})
	runtime/panic.go:838 +0x204
github.com/segmentio/kafka-go.(*Writer).stats(0x5000?)
	github.com/segmentio/kafka-go@v0.4.31/writer.go:832 +0x1c
github.com/segmentio/kafka-go.(*Writer).Stats(_)
	github.com/segmentio/kafka-go@v0.4.31/writer.go:850 +0x3c
github.com/mostafa/xk6-kafka.(*Kafka).produceInternal(0x14001dc52c0, 0x10471d0d4?, {0x14000fd6090, 0x6, 0x14000fe28c8?}, {{{0x0, 0x0}, {0x0, 0x0}}, {{0x0, ...}, ...}, ...}, ...)
	github.com/mostafa/xk6-kafka@v0.10.0/producer.go:144 +0x7b8
github.com/mostafa/xk6-kafka.(*Kafka).Produce(0x14000f33620?, 0x14000f50000?, {0x14000fd6090?, 0x10?, 0x14000fe2968?}, {0x0?, 0x8?}, {0x0?, 0x9?})
	github.com/mostafa/xk6-kafka@v0.10.0/producer.go:59 +0x40
reflect.Value.call({0x1056b24a0?, 0x14001dc52c0?, 0x10661cf18?}, {0x10518488a, 0x4}, {0x140005ef200, 0x4, 0x0?})
	reflect/value.go:556 +0x5e4
reflect.Value.Call({0x1056b24a0?, 0x14001dc52c0?, 0x14000f9ecc0?}, {0x140005ef200, 0x4, 0x4})
	reflect/value.go:339 +0x98
github.com/dop251/goja.(*Runtime).wrapReflectFunc.func1({{0x10573d340, 0x106121420}, {0x14000fb40c0, 0x2, 0x26}})
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/runtime.go:1886 +0x150
github.com/dop251/goja.(*vm)._nativeCall(0x14000494000, 0x14000fbb540, 0x2)
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/vm.go:2727 +0x1f8
github.com/dop251/goja.call.exec(0x494000?, 0x14000494000)
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/vm.go:2699 +0x92c
github.com/dop251/goja.(*vm).run(0x14000494000)
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/vm.go:408 +0x9c
github.com/dop251/goja.(*baseJsFuncObject)._call(0x14000d940b0, {{0x10573d340, 0x106121420}, {0x140010fc210, 0x1, 0x1}}, {0x0?, 0x0}, {0x10573d340?, 0x106121420?})
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/func.go:193 +0x394
github.com/dop251/goja.(*baseJsFuncObject).call(...)
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/func.go:203
github.com/dop251/goja.(*baseJsFuncObject).Call(0x140005f7548?, {{0x10573d340, 0x106121420}, {0x140010fc210, 0x1, 0x1}})
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/func.go:156 +0xb4
github.com/dop251/goja.AssertFunction.func1.2()
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/runtime.go:2305 +0x6c
github.com/dop251/goja.(*vm).try(0x140005f76a8?, 0x10470d590?)
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/vm.go:545 +0x108
github.com/dop251/goja.AssertFunction.func1({0x10573d340?, 0x106121420?}, {0x140010fc210?, 0x1?, 0x1?})
	github.com/dop251/goja@v0.0.0-20220405120441-9037c2b61cbf/runtime.go:2304 +0xb8
go.k6.io/k6/js.(*VU).runFn.func1.1()
	go.k6.io/k6@v0.38.3/js/runner.go:800 +0x54
go.k6.io/k6/js/eventloop.(*EventLoop).Start(0x14000d800f0, 0x140005e2780)
	go.k6.io/k6@v0.38.3/js/eventloop/eventloop.go:112 +0x134
go.k6.io/k6/js.(*VU).runFn.func1()
	go.k6.io/k6@v0.38.3/js/runner.go:799 +0xe8
go.k6.io/k6/js/common.RunWithPanicCatching({0x10573dc98?, 0x14000313b20?}, 0x140005f7948?, 0x1046cfc00?)
	go.k6.io/k6@v0.38.3/js/common/util.go:105 +0x78
go.k6.io/k6/js.(*VU).runFn(0x14000dbf220, {0x105734f28, 0x14000efa2c0}, 0xe0?, 0x14000b5e4b0, 0x140010fc200, {0x140010fc210, 0x1, 0x1})
	go.k6.io/k6@v0.38.3/js/runner.go:798 +0x208
go.k6.io/k6/js.(*ActiveVU).RunOnce(0x14000efa280)
	go.k6.io/k6@v0.38.3/js/runner.go:750 +0x36c
go.k6.io/k6/lib/executor.getIterationRunner.func1({0x105734fd0, 0x14000ead830}, {0x10572b9f8?, 0x14000efa280?})
	go.k6.io/k6@v0.38.3/lib/executor/helpers.go:145 +0x50
go.k6.io/k6/lib/executor.ConstantVUs.Run.func3({0x105730c40, 0x14000dbf220})
	go.k6.io/k6@v0.38.3/lib/executor/constant_vus.go:206 +0x2c4
created by go.k6.io/k6/lib/executor.ConstantVUs.Run
	go.k6.io/k6@v0.38.3/lib/executor/constant_vus.go:217 +0x700

Goja stack:
native
ERRO[0000] Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character 'o' looking for beginning of value 2})  error="Unable to unmarshal credentials, OriginalError: %!w(*json.SyntaxError=&{invalid character 'o' looking for beginning of value 2})"
ERRO[0003] TypeError: Cannot read property 'close' of undefined or null
running at teardown (file:///Users/momotoculteur/Github/nightly-tests/ground-load-testing/src/tests/event-processor/event-processor-beta.js:47:4(3))
default at native  hint="script exception"

I'm creating my credentials like this :

const credentials = {
    username: "$ConnectionString",
    password: getEventhubServerAdress(),
};

Obvioulsy the getEventhubServerAdress return a string like "Endpoint=.....". And as i said before it's working like a charm with your version in 0.8 :( Am i wrong ?

Have a good day,
Bastien

@anjosanap
Copy link

I had the same error here. I followed the same steps that I did to run on fix-sasl-and-tls-config branch.

But when I try to run my test, I receive the same error message that was reported by @Momotoculteur here

I used the same script that I used to validate on fix-sasl-and-tls-config branch (which the tests had passed).

import { check } from "k6";
import { writer, produce, reader, consume, createTopic, deleteTopic, listTopics } from "k6/x/kafka"; // import kafka extension

const bootstrapServers = ["localhost:9092"];
const kafkaTopic = "mytopic";

const saslConfig = {
username: "USERNAME",
password: "PASSWORD",
algorithm: "sasl_plain",
};

const tlsConfig = {
enableTLS: true,
insecureSkipTLSVerify: false,
minVersion: "TLSv1.2",
};

const [producer, _writerError] = writer(bootstrapServers, kafkaTopic, saslConfig, tlsConfig);

Versions:
K6 = latest (v0.38.3)
xK6 = latest (v.10.0)
Go = I try with 1.18.3 and 1.18.1

@mostafa
Copy link
Owner

mostafa commented Jun 14, 2022

I know it is a bit unconventional, but the main branch is the development branch, unless tagged with a version. The latest tagged version is two commits behind. This is why @latest means v0.10.0, and it doesn't include the latest commits.

I'll update the README to mention this approach.

@mostafa
Copy link
Owner

mostafa commented Jun 14, 2022

FYI: https://github.com/mostafa/xk6-kafka#the-release-process

@anjosanap
Copy link

My bad 😅 hahahah following these steps I still can run my tests successfully. No news here ✅

@mostafa
Copy link
Owner

mostafa commented Jun 16, 2022

I suppose this issue is resolved, as tested by @anjosanap and myself. Feel free to re-open the issue if it persists.

@mostafa mostafa closed this as completed Jun 16, 2022
@mostafa
Copy link
Owner

mostafa commented Jun 20, 2022

Released in v0.11.0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
🐛 Bug Something isn't working ❓ Question Further information is requested
Projects
Status: Release
Development

No branches or pull requests

5 participants