Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added listGroup method to admin interface #645

Merged
merged 6 commits into from
Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions docs/Admin.md
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,29 @@ Example response:
throttleTime: 0,
}
```

## <a name="list-groups"></a> List groups

List groups available on the broker.

```javascript
await admin.listGroups()
```

Example:

```javascript
const { ResourceTypes } = require('kafkajs')

await admin.listGroups()
```

Example response:

```javascript
{
groups: [
{groupId: 'testgroup', protocolType: 'consumer'}
]
}
```
73 changes: 73 additions & 0 deletions src/admin/__tests__/listGroups.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
const createAdmin = require('../index')
const createConsumer = require('../../consumer')
const createProducer = require('../../producer')

const {
createCluster,
newLogger,
createTopic,
secureRandom,
createModPartitioner,
waitForConsumerToJoinGroup,
waitForMessages,
} = require('testHelpers')

describe('Admin', () => {
let admin, topicName, groupId, cluster, consumer, producer

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
groupId = `consumer-group-id-${secureRandom()}`

cluster = createCluster()
admin = createAdmin({ cluster: cluster, logger: newLogger() })
consumer = createConsumer({
cluster,
groupId,
maxWaitTimeInMs: 100,
logger: newLogger(),
})

producer = createProducer({
cluster,
createPartitioner: createModPartitioner,
logger: newLogger(),
})

await Promise.all([admin.connect(), consumer.connect(), producer.connect()])
})

afterEach(async () => {
// Checking that they exist first, in case the test is skipped or failed before instantiating the admin/consumer
admin && (await admin.disconnect())
consumer && (await consumer.disconnect())
producer && (await producer.disconnect())
})

describe('listGroups', () => {
test('list groups', async () => {
await createTopic({ topic: topicName })

const messagesConsumed = []
dfilkovi marked this conversation as resolved.
Show resolved Hide resolved
await consumer.subscribe({ topic: topicName, fromBeginning: true })
consumer.run({ eachMessage: async event => messagesConsumed.push(event) })
await waitForConsumerToJoinGroup(consumer)

const messages = Array(1)
.fill()
.map(() => {
const value = secureRandom()
return { key: `key-${value}`, value: `value-${value}` }
})

await producer.send({ acks: 1, topic: topicName, messages })
await waitForMessages(messagesConsumed, { number: messages.length })

const listGroupResponse = await admin.listGroups()

expect(listGroupResponse.groups).toEqual(
expect.arrayContaining([expect.objectContaining({ groupId })])
)
})
})
})
26 changes: 26 additions & 0 deletions src/admin/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,31 @@ module.exports = ({
}
}

/**
* List groups in a broker
*
* @return {Promise<ListGroups>}
*
* @typedef {Object} ListGroups
* @property {Array<ListGroup>} groups
*
* @typedef {Object} ListGroup
* @property {string} groupId
* @property {string} protocolType
*/
const listGroups = async () => {
let groups = []
for (var nodeId in cluster.brokerPool.brokers) {
await cluster.refreshMetadata()

const broker = await cluster.findBroker({ nodeId })
const response = await broker.listGroups()
groups = groups.concat(response.groups)
}

return { groups }
}

/**
* @param {string} eventName
* @param {Function} listener
Expand Down Expand Up @@ -637,5 +662,6 @@ module.exports = ({
alterConfigs,
on,
logger: getLogger,
listGroups,
}
}
10 changes: 10 additions & 0 deletions src/broker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,16 @@ module.exports = class Broker {
)
}

/**
* Send request for list of groups
* @public
* @returns {Promise}
*/
async listGroups() {
const listGroups = this.lookupRequest(apiKeys.ListGroups, requests.ListGroups)
return await this.connection.send(listGroups())
}

/***
* @private
*/
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/requests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const requests = {
LeaveGroup: require('./leaveGroup'),
SyncGroup: require('./syncGroup'),
DescribeGroups: require('./describeGroups'),
ListGroups: {},
ListGroups: require('./listGroups'),
SaslHandshake: require('./saslHandshake'),
ApiVersions: require('./apiVersions'),
CreateTopics: require('./createTopics'),
Expand Down
27 changes: 27 additions & 0 deletions src/protocol/requests/listGroups/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
const versions = {
0: () => {
const request = require('./v0/request')
const response = require('./v0/response')
return { request: request(), response }
},
1: () => {
const request = require('./v1/request')
const response = require('./v1/response')
return { request: request(), response }
},
2: () => {
const request = require('./v2/request')
const response = require('./v2/response')
return { request: request(), response }
},
3: () => {
const request = require('./v3/request')
const response = require('./v3/response')
return { request: request(), response }
},
}

module.exports = {
versions: Object.keys(versions),
protocol: ({ version }) => versions[version],
}
17 changes: 17 additions & 0 deletions src/protocol/requests/listGroups/v0/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
const Encoder = require('../../../encoder')
const { ListGroups: apiKey } = require('../../apiKeys')

/**
* ListGroups Request (Version: 0)
*/

/**
*/
module.exports = () => ({
apiKey,
apiVersion: 0,
apiName: 'ListGroups',
encode: async () => {
return new Encoder()
},
})
40 changes: 40 additions & 0 deletions src/protocol/requests/listGroups/v0/response.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const Decoder = require('../../../decoder')
const { failure, createErrorFromCode } = require('../../../error')

/**
* ListGroups Response (Version: 0) => error_code [groups]
* error_code => INT16
* groups => group_id protocol_type
* group_id => STRING
* protocol_type => STRING
*/

const decodeGroup = decoder => ({
groupId: decoder.readString(),
protocolType: decoder.readString(),
})

const decode = async rawData => {
const decoder = new Decoder(rawData)
const errorCode = decoder.readInt16()
const groups = decoder.readArray(decodeGroup)

return {
errorCode,
groups,
}
}

const parse = async data => {
if (failure(data.errorCode)) {
throw createErrorFromCode(data.errorCode)
}

return data
}

module.exports = {
decodeGroup,
decode,
parse,
}
7 changes: 7 additions & 0 deletions src/protocol/requests/listGroups/v1/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const requestV0 = require('../v0/request')

/**
* ListGroups Request (Version: 1)
*/

module.exports = () => Object.assign(requestV0(), { apiVersion: 1 })
30 changes: 30 additions & 0 deletions src/protocol/requests/listGroups/v1/response.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
const responseV0 = require('../v0/response')

const Decoder = require('../../../decoder')

/**
* ListGroups Response (Version: 1) => error_code [groups]
* throttle_time_ms => INT32
* error_code => INT16
* groups => group_id protocol_type
* group_id => STRING
* protocol_type => STRING
*/

const decode = async rawData => {
const decoder = new Decoder(rawData)
const throttleTime = decoder.readInt32()
const errorCode = decoder.readInt16()
const groups = decoder.readArray(responseV0.decodeGroup)

return {
throttleTime,
errorCode,
groups,
}
}

module.exports = {
decode,
parse: responseV0.parse,
}
7 changes: 7 additions & 0 deletions src/protocol/requests/listGroups/v2/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const requestV1 = require('../v1/request')

/**
* ListGroups Request (Version: 2)
*/

module.exports = () => Object.assign(requestV1(), { apiVersion: 2 })
12 changes: 12 additions & 0 deletions src/protocol/requests/listGroups/v2/response.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const responseV1 = require('../v1/response')

/**
* ListGroups Response (Version: 2) => error_code [groups]
* throttle_time_ms => INT32
* error_code => INT16
* groups => group_id protocol_type
* group_id => STRING
* protocol_type => STRING
*/

module.exports = responseV1
7 changes: 7 additions & 0 deletions src/protocol/requests/listGroups/v3/request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const requestV2 = require('../v2/request')

/**
* ListGroups Request (Version: 3)
*/

module.exports = () => Object.assign(requestV2(), { apiVersion: 3 })
34 changes: 34 additions & 0 deletions src/protocol/requests/listGroups/v3/response.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
const responseV2 = require('../v2/response')
const Decoder = require('../../../decoder')

/**
* ListGroups Response (Version: 3) => throttle_time_ms error_code [groups] TAG_BUFFER
* throttle_time_ms => INT32
* error_code => INT16
* groups => group_id protocol_type TAG_BUFFER
Nevon marked this conversation as resolved.
Show resolved Hide resolved
* group_id => COMPACT_STRING
* protocol_type => COMPACT_STRING
*/

const decodeGroup = decoder => ({
groupId: decoder.readVarIntString(),
protocolType: decoder.readVarIntString(),
})

const decode = async rawData => {
const decoder = new Decoder(rawData)
const throttleTime = decoder.readInt32()
const errorCode = decoder.readInt16()
const groups = decoder.readArray(decodeGroup)

return {
throttleTime,
errorCode,
groups,
}
}

module.exports = {
decode,
parse: responseV2.parse,
}