Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to SASL authenticate protocol #229

Merged
merged 10 commits into from
Dec 5, 2018
4 changes: 3 additions & 1 deletion examples/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ const run = async () => {
})
}

run().catch(e => console.error(`[example/consumer] ${e.message}`, e))
run()
.catch(e => console.error(`[example/consumer] ${e.message}`, e))
.then(() => consumer.disconnect())

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
Expand Down
4 changes: 3 additions & 1 deletion examples/producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ const run = async () => {
setInterval(sendMessage, 3000)
}

run().catch(console.error)
run()
.catch(e => console.error(`[example/producer] ${e.message}`, e))
.then(() => producer.disconnect())

const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
Expand Down
55 changes: 55 additions & 0 deletions src/broker/__tests__/connect.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
saslSCRAM256ConnectionOpts,
saslSCRAM512ConnectionOpts,
newLogger,
testIfKafka_1_1_0,
} = require('testHelpers')

const Broker = require('../index')
Expand Down Expand Up @@ -124,4 +125,58 @@ describe('Broker > connect', () => {
expect(broker.isConnected()).toEqual(true)
})
})

describe('when SaslAuthenticate protocol is available', () => {
testIfKafka_1_1_0('authenticate with SASL PLAIN if configured', async () => {
broker = new Broker({
connection: createConnection(saslConnectionOpts()),
logger: newLogger(),
})
expect(broker.authenticated).toEqual(false)
await broker.connect()
expect(broker.authenticated).toEqual(true)
expect(broker.supportAuthenticationProtocol).toEqual(true)
})

testIfKafka_1_1_0('authenticate with SASL SCRAM 256 if configured', async () => {
broker = new Broker({
connection: createConnection(saslSCRAM256ConnectionOpts()),
logger: newLogger(),
})
expect(broker.authenticated).toEqual(false)
await broker.connect()
expect(broker.authenticated).toEqual(true)
expect(broker.supportAuthenticationProtocol).toEqual(true)
})

testIfKafka_1_1_0('authenticate with SASL SCRAM 512 if configured', async () => {
broker = new Broker({
connection: createConnection(saslSCRAM512ConnectionOpts()),
logger: newLogger(),
})
expect(broker.authenticated).toEqual(false)
await broker.connect()
expect(broker.authenticated).toEqual(true)
expect(broker.supportAuthenticationProtocol).toEqual(true)
})

testIfKafka_1_1_0('parallel calls to connect using SCRAM', async () => {
broker = new Broker({
connection: createConnection(saslSCRAM256ConnectionOpts()),
logger: newLogger(),
})

expect(broker.authenticated).toEqual(false)

await Promise.all([
broker.connect(),
broker.connect(),
broker.connect(),
broker.connect(),
broker.connect(),
])

expect(broker.authenticated).toEqual(true)
})
})
})
31 changes: 28 additions & 3 deletions src/broker/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const SASLAuthenticator = require('./saslAuthenticator')
* @param {boolean} [allowAutoTopicCreation=true] If this and the broker config 'auto.create.topics.enable'
* are true, topics that don't exist will be created when
* fetching metadata.
* @param {boolean} [supportAuthenticationProtocol=null] If the server supports the SASLAuthenticate protocol
*/
module.exports = class Broker {
constructor({
Expand All @@ -27,22 +28,25 @@ module.exports = class Broker {
versions = null,
authenticationTimeout = 1000,
allowAutoTopicCreation = true,
supportAuthenticationProtocol = null,
}) {
this.connection = connection
this.nodeId = nodeId
this.rootLogger = logger
this.logger = logger.namespace('Broker')
this.versions = versions
this.allowExperimentalV011 = allowExperimentalV011
this.authenticationTimeout = authenticationTimeout
this.allowAutoTopicCreation = allowAutoTopicCreation
this.supportAuthenticationProtocol = supportAuthenticationProtocol
this.authenticated = false

const lockTimeout = this.connection.connectionTimeout + this.authenticationTimeout
const brokerAddress = `${this.connection.host}:${this.connection.port}`
this.brokerAddress = `${this.connection.host}:${this.connection.port}`

this.lock = new Lock({
timeout: lockTimeout,
description: `connect to broker ${brokerAddress}`,
description: `connect to broker ${this.brokerAddress}`,
})

this.lookupRequest = () => {
Expand Down Expand Up @@ -80,8 +84,29 @@ module.exports = class Broker {

this.lookupRequest = lookup(this.versions, this.allowExperimentalV011)

if (this.supportAuthenticationProtocol === null) {
try {
this.lookupRequest(apiKeys.SaslAuthenticate, requests.SaslAuthenticate)
this.supportAuthenticationProtocol = true
} catch (_) {
this.supportAuthenticationProtocol = false
}

this.logger.debug(`Verified support for SaslAuthenticate`, {
broker: this.brokerAddress,
supportAuthenticationProtocol: this.supportAuthenticationProtocol,
})
}

if (!this.authenticated && this.connection.sasl) {
await new SASLAuthenticator(this.connection, this.rootLogger, this.versions).authenticate()
const authenticator = new SASLAuthenticator(
this.connection,
this.rootLogger,
this.versions,
this.supportAuthenticationProtocol
)

await authenticator.authenticate()
this.authenticated = true
}
} finally {
Expand Down
30 changes: 27 additions & 3 deletions src/broker/saslAuthenticator/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,15 @@ const AUTHENTICATORS = {
const SUPPORTED_MECHANISMS = Object.keys(AUTHENTICATORS)

module.exports = class SASLAuthenticator {
constructor(connection, logger, versions) {
constructor(connection, logger, versions, supportAuthenticationProtocol) {
this.connection = connection
this.logger = logger
this.saslHandshake = lookup(versions)(apiKeys.SaslHandshake, requests.SaslHandshake)

const lookupRequest = lookup(versions)
this.saslHandshake = lookupRequest(apiKeys.SaslHandshake, requests.SaslHandshake)
this.protocolAuthentication = supportAuthenticationProtocol
? lookupRequest(apiKeys.SaslAuthenticate, requests.SaslAuthenticate)
: null
}

async authenticate() {
Expand All @@ -35,7 +40,26 @@ module.exports = class SASLAuthenticator {
)
}

const saslAuthenticate = async ({ request, response, authExpectResponse }) => {
if (this.protocolAuthentication) {
const { buffer: requestAuthBytes } = await request.encode()
const authResponse = await this.connection.send(
this.protocolAuthentication({ authBytes: requestAuthBytes })
)

if (!authExpectResponse) {
return
}

const { authBytes: responseAuthBytes } = authResponse
const payloadDecoded = await response.decode(responseAuthBytes)
return response.parse(payloadDecoded)
}

return this.connection.authenticate({ request, response, authExpectResponse })
}

const Authenticator = AUTHENTICATORS[mechanism]
await new Authenticator(this.connection, this.logger).authenticate()
await new Authenticator(this.connection, this.logger, saslAuthenticate).authenticate()
}
}
5 changes: 3 additions & 2 deletions src/broker/saslAuthenticator/plain.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ const plain = require('../../protocol/sasl/plain')
const { KafkaJSSASLAuthenticationError } = require('../../errors')

module.exports = class PlainAuthenticator {
constructor(connection, logger) {
constructor(connection, logger, saslAuthenticate) {
this.connection = connection
this.logger = logger.namespace('SASLPlainAuthenticator')
this.saslAuthenticate = saslAuthenticate
}

async authenticate() {
Expand All @@ -20,7 +21,7 @@ module.exports = class PlainAuthenticator {

try {
this.logger.debug('Authenticate with SASL PLAIN', { broker })
await this.connection.authenticate({ request, response })
await this.saslAuthenticate({ request, response })
this.logger.debug('SASL PLAIN authentication successful', { broker })
} catch (e) {
const error = new KafkaJSSASLAuthenticationError(
Expand Down
8 changes: 5 additions & 3 deletions src/broker/saslAuthenticator/scram.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,13 @@ class SCRAM {
/**
* @param {Connection} connection
* @param {Logger} logger
* @param {Function} saslAuthenticate
* @param {DigestDefinition} digestDefinition
*/
constructor(connection, logger, digestDefinition) {
constructor(connection, logger, saslAuthenticate, digestDefinition) {
this.connection = connection
this.logger = logger
this.saslAuthenticate = saslAuthenticate
this.digestDefinition = digestDefinition

const digestType = digestDefinition.type.toUpperCase()
Expand Down Expand Up @@ -166,7 +168,7 @@ class SCRAM {
const request = scram.firstMessage.request({ clientFirstMessage })
const response = scram.firstMessage.response

return this.connection.authenticate({
return this.saslAuthenticate({
authExpectResponse: true,
request,
response,
Expand Down Expand Up @@ -199,7 +201,7 @@ class SCRAM {
const request = scram.finalMessage.request({ finalMessage })
const response = scram.finalMessage.response

return this.connection.authenticate({
return this.saslAuthenticate({
authExpectResponse: true,
request,
response,
Expand Down
10 changes: 6 additions & 4 deletions src/broker/saslAuthenticator/scram.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,35 @@ const { newLogger } = require('testHelpers')
const SCRAM256 = require('./scram256')

describe('Broker > SASL Authenticator > SCRAM', () => {
let connection, logger
let connection, saslAuthenticate, logger

beforeEach(() => {
connection = {
authenticate: jest.fn(),
sasl: { username: 'user', password: 'pencil' },
}
saslAuthenticate = ({ request, response, authExpectResponse }) =>
connection.authenticate({ request, response, authExpectResponse })

logger = { debug: jest.fn() }
logger.namespace = () => logger
})

it('throws KafkaJSSASLAuthenticationError for invalid username', async () => {
const scram = new SCRAM256({ sasl: {} }, newLogger())
const scram = new SCRAM256({ sasl: {} }, newLogger(), saslAuthenticate)
await expect(scram.authenticate()).rejects.toThrow('Invalid username or password')
})

it('throws KafkaJSSASLAuthenticationError for invalid password', async () => {
const scram = new SCRAM256({ sasl: { username: '<username>' } }, newLogger())
const scram = new SCRAM256({ sasl: { username: '<username>' } }, newLogger(), saslAuthenticate)
await expect(scram.authenticate()).rejects.toThrow('Invalid username or password')
})

describe('SCRAM 256', () => {
let scram

beforeEach(() => {
scram = new SCRAM256(connection, logger)
scram = new SCRAM256(connection, logger, saslAuthenticate)
})

test('saltPassword', async () => {
Expand Down
4 changes: 2 additions & 2 deletions src/broker/saslAuthenticator/scram256.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { SCRAM, DIGESTS } = require('./scram')

module.exports = class SCRAM256Authenticator extends SCRAM {
constructor(connection, logger) {
super(connection, logger.namespace('SCRAM256Authenticator'), DIGESTS.SHA256)
constructor(connection, logger, saslAuthenticate) {
super(connection, logger.namespace('SCRAM256Authenticator'), saslAuthenticate, DIGESTS.SHA256)
}
}
4 changes: 2 additions & 2 deletions src/broker/saslAuthenticator/scram512.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { SCRAM, DIGESTS } = require('./scram')

module.exports = class SCRAM512Authenticator extends SCRAM {
constructor(connection, logger) {
super(connection, logger.namespace('SCRAM512Authenticator'), DIGESTS.SHA512)
constructor(connection, logger, saslAuthenticate) {
super(connection, logger.namespace('SCRAM512Authenticator'), saslAuthenticate, DIGESTS.SHA512)
}
}
4 changes: 4 additions & 0 deletions src/cluster/brokerPool.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ module.exports = class BrokerPool {
this.metadata = null
this.metadataExpireAt = null
this.versions = null
this.supportAuthenticationProtocol = null
}

/**
Expand Down Expand Up @@ -98,9 +99,11 @@ module.exports = class BrokerPool {
async disconnect() {
await this.seedBroker.disconnect()
await Promise.all(values(this.brokers).map(broker => broker.disconnect()))

this.brokers = {}
this.metadata = null
this.versions = null
this.supportAuthenticationProtocol = null
}

/**
Expand Down Expand Up @@ -133,6 +136,7 @@ module.exports = class BrokerPool {
[nodeId]: this.createBroker({
logger: this.rootLogger,
versions: this.versions,
supportAuthenticationProtocol: this.supportAuthenticationProtocol,
connection: this.connectionBuilder.build({ host, port, rack }),
nodeId,
}),
Expand Down
9 changes: 9 additions & 0 deletions src/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ class KafkaJSStaleTopicMetadataAssignment extends KafkaJSError {
}
}

class KafkaJSServerDoesNotSupportApiKey extends KafkaJSNonRetriableError {
constructor(e, { apiKey, apiName } = {}) {
super(e)
this.apiKey = apiKey
this.apiName = apiName
}
}

class KafkaJSBrokerNotFound extends KafkaJSError {}
class KafkaJSPartialMessageError extends KafkaJSNonRetriableError {}
class KafkaJSSASLAuthenticationError extends KafkaJSNonRetriableError {}
Expand All @@ -100,4 +108,5 @@ module.exports = {
KafkaJSStaleTopicMetadataAssignment,
KafkaJSTimeout,
KafkaJSLockTimeout,
KafkaJSServerDoesNotSupportApiKey,
}
Loading