How to correctly disconnect multiple consumers? #66

Closed
al66 opened this issue May 29, 2018 · 5 comments

al66 commented May 29, 2018

I'm having trouble disconnecting multiple consumers that I started with different groupIds.
Hopefully you can help me.

This is my code for disconnecting...

/**
 * Service stopped lifecycle event handler
 */
async stopped() {

    // Pause fetching on every subscription's topic
    await Promise.all(this.subscriptions.map(async (subscription) => {
        try {
            await subscription.consumer.pause([{ topic: subscription.topic }]);
            this.logger.info(`Consumer for subscription ${subscription.id} paused`);
        } catch (err) {
            this.logger.warn(`Pause consumer for subscription ${subscription.id} failed`);
        }
    }));
    // Disconnect all consumers in parallel; a failure is logged, not rethrown
    await Promise.all(this.subscriptions.map(async (subscription) => {
        try {
            await subscription.consumer.disconnect();
            this.logger.info(`Consumer for subscription ${subscription.id} disconnected`);
        } catch (err) {
            this.logger.warn(`Disconnecting consumer for subscription ${subscription.id} failed`);
        }
    }));
    this.logger.info(`All consumers disconnected`);

}

I'm getting the following log:

  console.log lib\flow.subscriber.js:139
    ConsumerGroup INFO Pausing fetching from 1 topics { timestamp: '2018-05-29T19:10:29.954Z',
      logger: 'kafkajs',
      message: 'Pausing fetching from 1 topics',
      topics: [ 'events' ] }

  console.log lib\flow.subscriber.js:139
    ConsumerGroup INFO Pausing fetching from 1 topics { timestamp: '2018-05-29T19:10:29.959Z',
      logger: 'kafkajs',
      message: 'Pausing fetching from 1 topics',
      topics: [ 'events' ] }
  console.log lib\flow.subscriber.js:139
    Runner DEBUG stop consumer group { timestamp: '2018-05-29T19:10:29.965Z',
      logger: 'kafkajs',
      message: 'stop consumer group',
      groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db',
      memberId: 'flow.subscriber1527621028637-9bef2c45-b84f-40f5-b670-c8b687240b51' }

  console.log lib\flow.subscriber.js:139
    Runner DEBUG stop consumer group { timestamp: '2018-05-29T19:10:29.981Z',
      logger: 'kafkajs',
      message: 'stop consumer group',
      groupId: '6bb861e2-e250-4f79-af08-3e9acecd2d45',
      memberId: 'flow.subscriber1527621028637-80fc7d53-6b5f-471c-b9de-504d750e8804' }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Request LeaveGroup(key: 13, version: 0) { timestamp: '2018-05-29T19:10:29.983Z',
      logger: 'kafkajs',
      message: 'Request LeaveGroup(key: 13, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 12,
      size: 147 }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Request LeaveGroup(key: 13, version: 0) { timestamp: '2018-05-29T19:10:29.985Z',
      logger: 'kafkajs',
      message: 'Request LeaveGroup(key: 13, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 12,
      size: 147 }

  console.info node_modules\moleculer\src\logger.js:112
    [2018-05-29T19:10:29.987Z] INFO  nbtpt510-al-7796/FLOW.PUBLISHER: Producer disconnectied

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Response Fetch(key: 1, version: 2) { timestamp: '2018-05-29T19:10:34.452Z',
      logger: 'kafkajs',
      message: 'Response Fetch(key: 1, version: 2)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 11,
      size: 42,
      data: '[filtered]' }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Request Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.481Z',
      logger: 'kafkajs',
      message: 'Request Heartbeat(key: 12, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 13,
      size: 151 }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Response LeaveGroup(key: 13, version: 0) { timestamp: '2018-05-29T19:10:34.487Z',
      logger: 'kafkajs',
      message: 'Response LeaveGroup(key: 13, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 12,
      size: 6,
      data: { errorCode: 0 } }

  console.log lib\flow.subscriber.js:139
    Consumer DEBUG consumer has stopped, disconnecting { timestamp: '2018-05-29T19:10:34.491Z',
      logger: 'kafkajs',
      message: 'consumer has stopped, disconnecting',
      groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db' }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG disconnecting... { timestamp: '2018-05-29T19:10:34.495Z',
      logger: 'kafkajs',
      message: 'disconnecting...',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637' }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG disconnected { timestamp: '2018-05-29T19:10:34.499Z',
      logger: 'kafkajs',
      message: 'disconnected',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637' }

  console.log lib\flow.subscriber.js:139
    Consumer INFO Stopped { timestamp: '2018-05-29T19:10:34.500Z',
      logger: 'kafkajs',
      message: 'Stopped',
      groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db' }

  console.info node_modules\moleculer\src\logger.js:112
    [2018-05-29T19:10:34.502Z] INFO  nbtpt510-al-7796/FLOW.SUBSCRIBER: Consumer for subscription f0c54a7b-1cbd-4248-8471-ae788c3566db disconnected

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Response Fetch(key: 1, version: 2) { timestamp: '2018-05-29T19:10:34.503Z',
      logger: 'kafkajs',
      message: 'Response Fetch(key: 1, version: 2)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 11,
      size: 42,
      data: '[filtered]' }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Request Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.505Z',
      logger: 'kafkajs',
      message: 'Request Heartbeat(key: 12, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      correlationId: 13,
      size: 151 }

Now something went wrong...

  console.log lib\flow.subscriber.js:139
    Connection ERROR Response Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.508Z',
      logger: 'kafkajs',
      message: 'Response Heartbeat(key: 12, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      error: 'The coordinator is not aware of this member',
      correlationId: 13,
      size: 6 }

  console.log lib\flow.subscriber.js:139
    Connection DEBUG Response Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.511Z',
      logger: 'kafkajs',
      message: 'Response Heartbeat(key: 12, version: 0)',
      broker: '192.168.2.124:9092',
      clientId: 'flow.subscriber1527621028637',
      error: 'The coordinator is not aware of this member',
      correlationId: 13,
      payload: <Buffer 00 19> }

  console.log lib\flow.subscriber.js:139
    Runner ERROR The coordinator is not aware of this member, re-joining the group { timestamp: '2018-05-29T19:10:34.515Z',
      logger: 'kafkajs',
      message: 'The coordinator is not aware of this member, re-joining the group',
      groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db',
      memberId: 'flow.subscriber1527621028637-9bef2c45-b84f-40f5-b670-c8b687240b51',
      error: 'The coordinator is not aware of this member',
      retryCount: 0,
      retryTime: 51 }
Owner

tulios commented May 29, 2018

Hi @al66, you can just call disconnect. It waits for your consumers to finish the current set of messages and then disconnects. You don't need pause for that. Example:

const consumers = [...] // your consumer instances
await Promise.all(consumers.map(consumer => consumer.disconnect()))
console.log('disconnected')
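
As a slightly fuller sketch of the same idea, the snippet below disconnects every consumer in parallel and reports which ones failed, so one failing consumer does not hide the others. The `disconnectAll` helper and the `{ id, consumer }` subscription shape are assumptions made for illustration (the shape mirrors the `this.subscriptions` entries in the question). The only kafkajs call it relies on is `consumer.disconnect()`.

```javascript
// Hypothetical helper, not part of kafkajs: disconnects all consumers in
// parallel and collects per-subscription outcomes instead of throwing.
async function disconnectAll(subscriptions, logger = console) {
  // The async wrapper turns a synchronous throw into a rejected promise,
  // so allSettled sees every outcome.
  const results = await Promise.allSettled(
    subscriptions.map(async ({ consumer }) => consumer.disconnect())
  );

  const failed = [];
  results.forEach((result, index) => {
    const { id } = subscriptions[index];
    if (result.status === 'fulfilled') {
      logger.info(`Consumer for subscription ${id} disconnected`);
    } else {
      failed.push(id);
      logger.warn(`Disconnecting consumer for subscription ${id} failed: ${result.reason}`);
    }
  });

  // Ids of the subscriptions whose disconnect() did not succeed
  return failed;
}
```

Inside the `stopped()` handler this could be called as `await disconnectAll(this.subscriptions, this.logger)`. `Promise.allSettled` is used rather than `Promise.all` so that every disconnect is attempted and awaited even if an earlier one rejects.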

Author

al66 commented May 30, 2018

Hi @tulios. That was my first try, but it gave the same result.

I think the reason is that the cluster is also disconnected by the first call.
Would it be possible to add a "stop" method that doesn't disconnect the cluster directly?

  /**
   * @return {Promise}
   */
  const disconnect = async () => {
    try {
      if (runner) {
        await runner.stop()
        logger.debug('consumer has stopped, disconnecting', { groupId })
      }
      await cluster.disconnect()
    } catch (e) {}
    logger.info('Stopped', { groupId })
  }

Here is my original code:

/**
 * Service stopped lifecycle event handler
 */
async stopped() {

    // Disconnect all consumers in parallel; a failure is logged, not rethrown
    await Promise.all(this.subscriptions.map(async (subscription) => {
        try {
            await subscription.consumer.disconnect();
            this.logger.info(`Consumer for subscription ${subscription.id} stopped`);
        } catch (err) {
            this.logger.warn(`Stopping consumer for subscription ${subscription.id} failed`);
        }
    }));
    this.logger.info(`All consumers disconnected`);

}

Owner

tulios commented May 30, 2018

You are right. I think we usually subscribe to different topics instead of creating several consumers. I'll create an issue to add the stop method. In the meantime, you can create a new client for each consumer (new Kafka(...)).
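
A minimal sketch of that workaround, assuming the standard kafkajs `new Kafka({ clientId, brokers })` constructor and `kafka.consumer({ groupId })`. The `createIsolatedConsumers` helper and its `createClient` factory parameter are hypothetical names introduced here so the sketch runs without a broker. In real code the factory would return a fresh `Kafka` instance.

```javascript
// Hypothetical helper: builds one consumer per group id, each from its own
// client instance, so no two consumers share a client.
function createIsolatedConsumers(createClient, groupIds) {
  return groupIds.map((groupId) => {
    const client = createClient(groupId); // a new client for every consumer
    return { id: groupId, consumer: client.consumer({ groupId }) };
  });
}

// Usage with kafkajs (assumed setup; group ids and clientId are placeholders,
// the broker address is the one that appears in the log above):
//
// const { Kafka } = require('kafkajs');
// const subscriptions = createIsolatedConsumers(
//   (groupId) => new Kafka({
//     clientId: `flow.subscriber-${groupId}`,
//     brokers: ['192.168.2.124:9092'],
//   }),
//   ['group-a', 'group-b']
// );
```

Passing the client factory in as a parameter keeps the helper independent of connection settings and makes it easy to exercise with a stub.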

Owner

tulios commented May 30, 2018

You can follow issue #67

Owner

tulios commented Jun 2, 2018

This wasn't necessary; please check issue #67 (noted here for documentation).

tulios closed this as completed Jun 2, 2018