
Consumer group not consuming due to multiple k6 init #37

Closed
timkersch opened this issue Mar 24, 2022 · 7 comments
Labels
📖 Documentation Improvements or additions to documentation

Comments

@timkersch

When using the consumer group feature to subscribe to all partitions in a topic, I ran into some very strange behaviour: some runs of the code consumed messages, while other runs were simply stuck and never got any messages back.

After a lot of debugging I found that k6 runs the init code (code outside of functions) multiple times: https://community.k6.io/t/why-init-called-4-times/973

This caused multiple consumer group instances to be created, which in turn meant that partitions were distributed between them. Thus, sometimes the consumer group created during the last init contained the partitions where my data was, and sometimes it did not contain the right partitions as they were assigned to consumers created in a previous run of init.

I solved this by using a UUID when naming my consumer group, thereby guaranteeing that my consumer group object was assigned to all partitions in a topic.

Although this is not per se a bug in xk6-kafka, the documentation makes no mention of this, and the examples all create consumers directly in the init code.
I would suggest either making this clear in the documentation or somehow altering the code to avoid this problem.
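
For reference, here is a minimal sketch of the workaround described above, assuming the xk6-kafka Reader API and the uuidv4 helper from the jslib k6-utils module (the broker address and topic name below are placeholders):

import { Reader } from "k6/x/kafka";
import { uuidv4 } from "https://jslib.k6.io/k6-utils/1.4.0/index.js";

// The UUID suffix makes the group name unique to this init run, so the Reader
// does not end up competing for partitions with Readers created by earlier
// passes over the init code.
const reader = new Reader({
    brokers: ["localhost:9092"], // placeholder broker
    groupID: "my-consumer-group-" + uuidv4(),
    groupTopics: ["my-topic"], // placeholder topic
});

export default function () {
    reader.consume({ limit: 10 });
}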

@mostafa
Owner

mostafa commented Mar 24, 2022

Hey @timkersch,

Thanks for spotting the issue! Could you make a PR with an example script and changes to the README with your experience? It would be much appreciated. 🙏

@timkersch
Author

Sure thing, will do it in the next few days! :)

@mostafa mostafa added the 📖 Documentation Improvements or additions to documentation label Apr 22, 2022
@mostafa
Owner

mostafa commented Aug 12, 2022

Addressed in 20859bc.

@mostafa mostafa closed this as completed Aug 12, 2022
@nj-apps

nj-apps commented Mar 20, 2023

Hello Mostafa,
I am trying to consume a topic with a consumer group but I am facing some problems:

  • If I set the consumer group to a constant string (i.e. const consumer_group = "my_group"), then only one VU consumes a partition.
  • If I add a UUID to the consumer group name (i.e. const consumer_group = "my_group-" + uuidv4()), then consumer_group is different for each VU. In this case all VUs are consuming messages, but each VU is attached to its own group and consumes all partitions... (instead of all VUs being attached to the same consumer group and each consuming one partition).

Can you please explain how to use a UUID to make consumer group usage work?
Thank you very much.
Regards

@mostafa
Owner

mostafa commented Mar 24, 2023

Hey @nj-apps,

Can you provide me with an example script for both cases, to help me reproduce the issue? And how did you figure out that a single VU is consuming a single partition in a consumer group?

@nj-apps

nj-apps commented Mar 27, 2023

Hi Mostafa,
I am using the "UI for Apache Kafka" to visualize how consumers are consuming topics. When the consumer group name doesn't use a UUID, we can see that there is a different VU per partition, but only some partitions are consumed. In the following example, only 3 are actually consumed:
[Screenshot, 2023-03-27: one VU per partition, only some partitions consumed]

If I add a UUID to the consumer group name, then it creates as many groups as there are pre-allocated VUs:

[Screenshot: one consumer group per pre-allocated VU]

If we have a look at these consumer groups, we can see that the same VU is consuming all partitions:
[Screenshot, 2023-03-27: the same VU consuming all partitions of one group]

Example script:

import * as kafka from "k6/x/kafka";
import { Writer, Reader, Connection, SchemaRegistry, SCHEMA_TYPE_STRING, TLS_1_2 } from "k6/x/kafka";
import { SharedArray } from 'k6/data';
import {uuidv4, randomString} from "../k6-utils/index.js";
import {b64encode} from "k6/encoding";

const BOOTSTRAP=["my_broker"];
const TOPIC="bench-sd-performance-testing_test-topic-k6-rueil";

// --->> change the type of consumer group here:
// STATIC consumer group
const consumer_group = "consumers-group";
// OR consumer group with UUID
// const consumer_group="consumers-group-"+uuidv4();

// TLS config
const tlsConfig = {
  enableTls: true,
  insecureSkipTlsVerify: false,
  minVersion: TLS_1_2,
  clientCertPem: "scripts/cert/client.cer",
  clientKeyPem: "scripts/cert/client.key",
  serverCaPem: "scripts/cert/ca.pem",
};

// Creates a new Writer object to produce messages to Kafka
const writer = new Writer({
    // WriterConfig object
    brokers: BOOTSTRAP,
    topic: TOPIC,
    tls: tlsConfig,
    autoCreateTopic: false,
    requiredAcks: 0,
});

const reader = new Reader({
    brokers: BOOTSTRAP,
    tls: tlsConfig,
    groupID: consumer_group, // consumer group ID
    groupTopics: [TOPIC],
});
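// Note: this Reader is created in the init context, which k6 runs once per VU;
// if consumer_group contains a UUID, each VU therefore joins its own consumer group.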

const connection = new Connection({ 
    // ConnectionConfig object
    address: BOOTSTRAP[0],
    tls: tlsConfig,
});


const NB_MESSAGES = 100;
const PAYLOAD = 1000;
const USECASES = 1;
function generate_messages() {
    const data_scenario = new Array(USECASES);
    for (let u = 0; u < USECASES; u++) {
        data_scenario[u] = new Array(NB_MESSAGES);
        for (let i = 0; i < NB_MESSAGES; i++) {
            data_scenario[u][i] = {
                key: b64encode(randomString(10)),
                value: b64encode(randomString(PAYLOAD)),
            };
        }
    }
    return data_scenario; // must be an array
}

// generate dataset
const data = new SharedArray('messages', generate_messages);

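// __VU is 0 during k6's first pass over the init code (and in setup/teardown),
// so this guard keeps createTopic out of the per-VU init runs.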
if (__VU == 0) {
    // Create a topic on initialization (before producing messages)
    connection.createTopic({
        topic: TOPIC,
        numPartitions: 12,
        replicationFactor: 4,
    });
}

export const options = {
  scenarios: {
    step_fill_topic: {
      exec: 'producer',
      startTime: '0s',
      duration: '1m',
      executor: 'constant-arrival-rate',
      rate: 10,
      preAllocatedVUs: 20,
      maxVUs: 60,
    },
    step_consume: {
      exec: 'consumer',
      startTime: '5s',
      duration: '1m',
      executor: 'constant-arrival-rate',
      rate: 100,
      preAllocatedVUs: 6,
      maxVUs: 6,
    },
  },
};



export function producer() {
    const msg = data[0];
    writer.produce({ messages: msg });
}

export function consumer() {
    let messages = reader.consume({ limit: 10 });
}


export function teardown(data) {
    // Delete the topic
    connection.deleteTopic(TOPIC);

    // Close all connections
    writer.close();
    reader.close();
    connection.close();
}

Do not hesitate to ask if you need more details.
Thanks

Repository owner deleted a comment from nj-apps Jun 30, 2023
Repository owner deleted a comment from nj-apps Jun 30, 2023
Repository owner deleted a comment from nj-apps Jun 30, 2023
@mostafa
Owner

mostafa commented Jun 30, 2023

Hey @nj-apps,

Do you still have this issue?
