Skip to content

Commit

Permalink
Merge pull request #1385 from dgyimesi/fix/topic-authorization-failed
Browse files Browse the repository at this point in the history
Revert topic when metadata fetching failes due to authorization
  • Loading branch information
Nevon committed Jun 27, 2022
2 parents 67ffe26 + 87abb46 commit b1365b8
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 4 deletions.
61 changes: 58 additions & 3 deletions src/admin/__tests__/createAcls.spec.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const createAdmin = require('../index')
const createProducer = require('../../producer/index')

const {
secureRandom,
Expand All @@ -13,7 +14,7 @@ const ACL_OPERATION_TYPES = require('../../protocol/aclOperationTypes')
const ACL_PERMISSION_TYPES = require('../../protocol/aclPermissionTypes')
const RESOURCE_PATTERN_TYPES = require('../../protocol/resourcePatternTypes')

const createSASLAdminClientForUser = ({ username, password }) => {
const createSASLClientForUser = createClient => ({ username, password }) => {
const saslConnectionOpts = () => {
return Object.assign(sslConnectionOpts(), {
port: 9094,
Expand All @@ -25,7 +26,7 @@ const createSASLAdminClientForUser = ({ username, password }) => {
})
}

const admin = createAdmin({
const client = createClient({
logger: newLogger(),
cluster: createCluster(
{
Expand All @@ -36,9 +37,12 @@ const createSASLAdminClientForUser = ({ username, password }) => {
),
})

return admin
return client
}

const createSASLAdminClientForUser = createSASLClientForUser(createAdmin)
const createSASLProducerClientForUser = createSASLClientForUser(createProducer)

describe('Admin', () => {
let admin

Expand Down Expand Up @@ -247,5 +251,56 @@ describe('Admin', () => {

await expect(admin.fetchTopicMetadata({ topics: [topicName] })).resolves.toBeTruthy()
})

test('can produce to allowed topic after failing to produce to not-allowed topic', async () => {
const allowedTopic = `allowed-${secureRandom()}`
const notAllowedTopic = `disallowed-${secureRandom()}`

admin = createSASLAdminClientForUser({ username: 'test', password: 'testtest' })

await admin.connect()
await admin.createTopics({
waitForLeaders: true,
topics: [allowedTopic, notAllowedTopic].map(topic => ({ topic, numPartitions: 1 })),
})
await admin.createAcls({
acl: [
{
resourceType: ACL_RESOURCE_TYPES.TOPIC,
resourceName: notAllowedTopic,
resourcePatternType: RESOURCE_PATTERN_TYPES.LITERAL,
principal: 'User:bob',
host: '*',
operation: ACL_OPERATION_TYPES.WRITE,
permissionType: ACL_PERMISSION_TYPES.DENY,
},
{
resourceType: ACL_RESOURCE_TYPES.TOPIC,
resourceName: allowedTopic,
resourcePatternType: RESOURCE_PATTERN_TYPES.LITERAL,
principal: 'User:bob',
host: '*',
operation: ACL_OPERATION_TYPES.WRITE,
permissionType: ACL_PERMISSION_TYPES.ALLOW,
},
],
})

await admin.disconnect()
const producer = createSASLProducerClientForUser({ username: 'bob', password: 'bobbob' })
await producer.connect()

await expect(
producer.send({ topic: allowedTopic, messages: [{ value: 'hello' }] })
).resolves.not.toBeUndefined()
await expect(
producer.send({ topic: notAllowedTopic, messages: [{ value: 'whoops' }] })
).rejects.not.toBeUndefined()
await expect(
producer.send({ topic: allowedTopic, messages: [{ value: 'world' }] })
).resolves.not.toBeUndefined()

await producer.disconnect()
})
})
})
6 changes: 5 additions & 1 deletion src/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,11 @@ module.exports = class Cluster {
try {
await this.refreshMetadata()
} catch (e) {
if (e.type === 'INVALID_TOPIC_EXCEPTION' || e.type === 'UNKNOWN_TOPIC_OR_PARTITION') {
if (
e.type === 'INVALID_TOPIC_EXCEPTION' ||
e.type === 'UNKNOWN_TOPIC_OR_PARTITION' ||
e.type === 'TOPIC_AUTHORIZATION_FAILED'
) {
this.targetTopics = previousTopics
}

Expand Down

0 comments on commit b1365b8

Please sign in to comment.