From c316c292664f5340c4f0da5d5793ab45d03ab422 Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Fri, 6 May 2022 16:21:07 +1000 Subject: [PATCH 01/14] first cut at pluggable auth --- src/broker/saslAuthenticator/index.js | 27 ++++++++++++++++++++------- types/index.d.ts | 26 +++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/broker/saslAuthenticator/index.js b/src/broker/saslAuthenticator/index.js index 9d79d13e7..4ce5a4294 100644 --- a/src/broker/saslAuthenticator/index.js +++ b/src/broker/saslAuthenticator/index.js @@ -33,11 +33,11 @@ module.exports = class SASLAuthenticator { async authenticate() { const mechanism = this.connection.sasl.mechanism.toUpperCase() - if (!SUPPORTED_MECHANISMS.includes(mechanism)) { - throw new KafkaJSSASLAuthenticationError( - `SASL ${mechanism} mechanism is not supported by the client` - ) - } + // if (!SUPPORTED_MECHANISMS.includes(mechanism)) { + // throw new KafkaJSSASLAuthenticationError( + // `SASL ${mechanism} mechanism is not supported by the client` + // ) + // } const handshake = await this.connection.send(this.saslHandshake({ mechanism })) if (!handshake.enabledMechanisms.includes(mechanism)) { @@ -69,7 +69,20 @@ module.exports = class SASLAuthenticator { return this.connection.sendAuthRequest({ request, response, authExpectResponse }) } - const Authenticator = AUTHENTICATORS[mechanism] - await new Authenticator(this.connection, this.logger, saslAuthenticate).authenticate() + if (SUPPORTED_MECHANISMS.includes(mechanism)) { + const Authenticator = AUTHENTICATORS[mechanism] + await new Authenticator(this.connection, this.logger, saslAuthenticate).authenticate() + } else { + await this.connection.sasl + .authenticationProvider( + { + host: this.connection.host, + port: this.connection.port, + }, + this.logger.namespace(`SaslAuthenticator-${mechanism}`), + saslAuthenticate + ) + .authenticate() + } } } diff --git a/types/index.d.ts b/types/index.d.ts index 17c4fde90..9d42b3996 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -16,10 +16,34 @@ export class Kafka { export type BrokersFunction = () => string[] | Promise +type SaslAuthenticationRequest = { + encode: () => Buffer | Promise +} +type SaslAuthenticationResponse = { + decode: (rawResponse: Buffer) => Buffer | Promise + parse: (data: Buffer) => ParseResult +} + +export type Authenticator = { + authenticate: () => Promise +} + +export type Mechanism = { + mechanism: string + authenticationProvider: ( + connection: { host: string; port: number }, + logger: Logger, + saslAuthenticate: ( + request: SaslAuthenticationRequest, + response?: SaslAuthenticationResponse + ) => Promise + ) => Authenticator +} + export interface KafkaConfig { brokers: string[] | BrokersFunction ssl?: tls.ConnectionOptions | boolean - sasl?: SASLOptions + sasl?: SASLOptions | Mechanism clientId?: string connectionTimeout?: number authenticationTimeout?: number From 2307ddfdad6a60e05edd5e631c6f4af9b7cdd939 Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Wed, 25 May 2022 19:48:33 +1000 Subject: [PATCH 02/14] remove commented code --- src/broker/saslAuthenticator/index.js | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/broker/saslAuthenticator/index.js b/src/broker/saslAuthenticator/index.js index 4ce5a4294..61b39ea0b 100644 --- a/src/broker/saslAuthenticator/index.js +++ b/src/broker/saslAuthenticator/index.js @@ -33,12 +33,6 @@ module.exports = class SASLAuthenticator { async authenticate() { const mechanism = this.connection.sasl.mechanism.toUpperCase() - // if (!SUPPORTED_MECHANISMS.includes(mechanism)) { - // throw new KafkaJSSASLAuthenticationError( - // `SASL ${mechanism} mechanism is not supported by the client` - // ) - // } - const handshake = await this.connection.send(this.saslHandshake({ mechanism })) if (!handshake.enabledMechanisms.includes(mechanism)) { throw new KafkaJSSASLAuthenticationError( From cd3bbfcd80e0631d84a0f35256608138092cbbcb Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Wed, 25 May 2022 19:48:52 +1000 Subject: [PATCH 03/14] add unsupported mechanism test --- src/broker/__tests__/connect.spec.js | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/broker/__tests__/connect.spec.js b/src/broker/__tests__/connect.spec.js index 07b014d50..0c14d4d98 100644 --- a/src/broker/__tests__/connect.spec.js +++ b/src/broker/__tests__/connect.spec.js @@ -2,6 +2,7 @@ const { createConnectionPool, connectionOpts, saslSCRAM256ConnectionOpts, + sslConnectionOpts, newLogger, testIfKafkaAtLeast_1_1_0, describeIfOauthbearerDisabled, @@ -34,6 +35,29 @@ describe('Broker > connect', () => { expect(broker.versions).toBeTruthy() }) + test("throws if the mechanism isn't supported by the server", async () => { + broker = new Broker({ + connectionPool: createConnectionPool( + Object.assign(sslConnectionOpts(), { + port: 9094, + sasl: { + mechanism: 'fake-mechanism', + authenticationProvider: () => ({ + authenticate: async () => { + throw new Error('🥸') + }, + }), + }, + }) + ), + logger: newLogger(), + }) + + await expect(broker.connect()).rejects.toThrow( + 'The broker does not support the requested SASL mechanism' + ) + }) + for (const e of saslEntries) { test(`authenticate with SASL ${e.name} if configured`, async () => { broker = new Broker({ From 533db2f155ceb6fcdee39e87acc67e51782b1e68 Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Thu, 2 Jun 2022 11:39:05 +1000 Subject: [PATCH 04/14] Fix return type of request.encode to actually be a Buffer --- src/broker/saslAuthenticator/index.js | 2 +- src/broker/saslAuthenticator/scram.spec.js | 16 ++++++++-------- src/protocol/sasl/awsIam/request.js | 2 +- src/protocol/sasl/oauthBearer/request.js | 2 +- src/protocol/sasl/plain/request.js | 2 +- src/protocol/sasl/scram/finalMessage/request.js | 2 +- src/protocol/sasl/scram/firstMessage/request.js | 2 +- 7 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/broker/saslAuthenticator/index.js b/src/broker/saslAuthenticator/index.js index 61b39ea0b..c362d6e55 100644 --- a/src/broker/saslAuthenticator/index.js +++ b/src/broker/saslAuthenticator/index.js @@ -42,7 +42,7 @@ module.exports = class SASLAuthenticator { const saslAuthenticate = async ({ request, response, authExpectResponse }) => { if (this.protocolAuthentication) { - const { buffer: requestAuthBytes } = await request.encode() + const requestAuthBytes = await request.encode() const authResponse = await this.connection.send( this.protocolAuthentication({ authBytes: requestAuthBytes }) ) diff --git a/src/broker/saslAuthenticator/scram.spec.js b/src/broker/saslAuthenticator/scram.spec.js index 5578abca2..6ffde10cb 100644 --- a/src/broker/saslAuthenticator/scram.spec.js +++ b/src/broker/saslAuthenticator/scram.spec.js @@ -82,8 +82,8 @@ describe('Broker > SASL Authenticator > SCRAM', () => { }) const { request } = connection.authenticate.mock.calls[0][0] - const encoder = await request.encode() - const decoder = new Decoder(encoder.buffer) + const buffer = await request.encode() + const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual(`n,,n=user,r=${scram.currentNonce}`) }) @@ -97,8 +97,8 @@ describe('Broker > SASL Authenticator > SCRAM', () => { }) const { request } = connection.authenticate.mock.calls[0][0] - const encoder = await request.encode() - const decoder = new Decoder(encoder.buffer) + const buffer = await request.encode() + const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual(`n,,n=bob=2C,r=${scram.currentNonce}`) }) @@ -112,8 +112,8 @@ describe('Broker > SASL Authenticator > SCRAM', () => { }) const { request } = connection.authenticate.mock.calls[0][0] - const encoder = await request.encode() - const decoder = new Decoder(encoder.buffer) + const buffer = await request.encode() + const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual(`n,,n=bob=3D,r=${scram.currentNonce}`) }) }) @@ -137,8 +137,8 @@ describe('Broker > SASL Authenticator > SCRAM', () => { }) const { request } = connection.authenticate.mock.calls[0][0] - const encoder = await request.encode() - const decoder = new Decoder(encoder.buffer) + const buffer = await request.encode() + const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual( 'c=biws,r=rOprNGfwEbeRWgbNEkqO%hvYDpWUa2RaTCAfuxFIlj)hNlF$k0,p=dHzbZapWIk4jUhN+Ute9ytag9zjfMHgsqmmiz7AndVQ=' ) diff --git a/src/protocol/sasl/awsIam/request.js b/src/protocol/sasl/awsIam/request.js index 165591bb8..f97149c18 100644 --- a/src/protocol/sasl/awsIam/request.js +++ b/src/protocol/sasl/awsIam/request.js @@ -6,6 +6,6 @@ module.exports = ({ authorizationIdentity, accessKeyId, secretAccessKey, session encode: async () => { return new Encoder().writeBytes( [authorizationIdentity, accessKeyId, secretAccessKey, sessionToken].join(US_ASCII_NULL_CHAR) - ) + ).buffer }, }) diff --git a/src/protocol/sasl/oauthBearer/request.js b/src/protocol/sasl/oauthBearer/request.js index fac9b86e8..3cff0bb13 100644 --- a/src/protocol/sasl/oauthBearer/request.js +++ b/src/protocol/sasl/oauthBearer/request.js @@ -56,7 +56,7 @@ module.exports = async ({ authorizationIdentity = null }, oauthBearerToken) => { return { encode: async () => { - return new Encoder().writeBytes(Buffer.from(oauthMsg)) + return new Encoder().writeBytes(Buffer.from(oauthMsg)).buffer }, } } diff --git a/src/protocol/sasl/plain/request.js b/src/protocol/sasl/plain/request.js index 182dee5a6..a52c98321 100644 --- a/src/protocol/sasl/plain/request.js +++ b/src/protocol/sasl/plain/request.js @@ -23,6 +23,6 @@ module.exports = ({ authorizationIdentity = null, username, password }) => ({ encode: async () => { return new Encoder().writeBytes( [authorizationIdentity, username, password].join(US_ASCII_NULL_CHAR) - ) + ).buffer }, }) diff --git a/src/protocol/sasl/scram/finalMessage/request.js b/src/protocol/sasl/scram/finalMessage/request.js index bcb8f28a3..58d40aaa6 100644 --- a/src/protocol/sasl/scram/finalMessage/request.js +++ b/src/protocol/sasl/scram/finalMessage/request.js @@ -1,5 +1,5 @@ const Encoder = require('../../../encoder') module.exports = ({ finalMessage }) => ({ - encode: async () => new Encoder().writeBytes(finalMessage), + encode: async () => new Encoder().writeBytes(finalMessage).buffer, }) diff --git a/src/protocol/sasl/scram/firstMessage/request.js b/src/protocol/sasl/scram/firstMessage/request.js index 6213f450c..865f0e586 100644 --- a/src/protocol/sasl/scram/firstMessage/request.js +++ b/src/protocol/sasl/scram/firstMessage/request.js @@ -18,5 +18,5 @@ const Encoder = require('../../../encoder') module.exports = ({ clientFirstMessage }) => ({ - encode: async () => new Encoder().writeBytes(clientFirstMessage), + encode: async () => new Encoder().writeBytes(clientFirstMessage).buffer, }) From 53b26dbb2aa35cdacccfd78ae9ecd7bc8771bc7b Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Thu, 2 Jun 2022 11:39:44 +1000 Subject: [PATCH 05/14] flatten host and port args --- src/broker/saslAuthenticator/index.js | 6 ++---- types/index.d.ts | 3 ++- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/broker/saslAuthenticator/index.js b/src/broker/saslAuthenticator/index.js index c362d6e55..66fd930b1 100644 --- a/src/broker/saslAuthenticator/index.js +++ b/src/broker/saslAuthenticator/index.js @@ -69,10 +69,8 @@ module.exports = class SASLAuthenticator { } else { await this.connection.sasl .authenticationProvider( - { - host: this.connection.host, - port: this.connection.port, - }, + this.connection.host, + this.connection.port, this.logger.namespace(`SaslAuthenticator-${mechanism}`), saslAuthenticate ) diff --git a/types/index.d.ts b/types/index.d.ts index 9d42b3996..347f42628 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -31,7 +31,8 @@ export type Authenticator = { export type Mechanism = { mechanism: string authenticationProvider: ( - connection: { host: string; port: number }, + host: string, + port: number, logger: Logger, saslAuthenticate: ( request: SaslAuthenticationRequest, From e2dcb42def1cd530865b3278b2ee8da0d0e956fc Mon Sep 17 00:00:00 2001 From: Mark Gaylard <1952851+markgaylard@users.noreply.github.com> Date: Thu, 2 Jun 2022 16:01:41 +1000 Subject: [PATCH 06/14] Update built in authenticators (#1) --- src/broker/saslAuthenticator/awsIam.js | 68 ++++++++--------- src/broker/saslAuthenticator/awsIam.spec.js | 25 +++---- src/broker/saslAuthenticator/index.js | 47 ++++++------ src/broker/saslAuthenticator/oauthBearer.js | 74 +++++++++---------- .../saslAuthenticator/oauthBearer.spec.js | 10 ++- src/broker/saslAuthenticator/plain.js | 50 ++++++------- src/broker/saslAuthenticator/plain.spec.js | 6 +- src/broker/saslAuthenticator/scram.js | 17 +++-- src/broker/saslAuthenticator/scram.spec.js | 54 +++++++------- src/broker/saslAuthenticator/scram256.js | 9 ++- src/broker/saslAuthenticator/scram512.js | 9 ++- 11 files changed, 180 insertions(+), 189 deletions(-) diff --git a/src/broker/saslAuthenticator/awsIam.js b/src/broker/saslAuthenticator/awsIam.js index 795d0c076..583e5954c 100644 --- a/src/broker/saslAuthenticator/awsIam.js +++ b/src/broker/saslAuthenticator/awsIam.js @@ -1,43 +1,37 @@ -const awsIam = require('../../protocol/sasl/awsIam') +const { request, response } = require('../../protocol/sasl/awsIam') const { KafkaJSSASLAuthenticationError } = require('../../errors') -module.exports = class AWSIAMAuthenticator { - constructor(connection, logger, saslAuthenticate) { - this.connection = connection - this.logger = logger.namespace('SASLAWSIAMAuthenticator') - this.saslAuthenticate = saslAuthenticate - } - - async authenticate() { - const { sasl } = this.connection - if (!sasl.authorizationIdentity) { - throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity') - } - if (!sasl.accessKeyId) { - throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId') - } - if (!sasl.secretAccessKey) { - throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey') - } - if (!sasl.sessionToken) { - sasl.sessionToken = '' - } +const awsIAMAuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { + return { + authenticate: async () => { + if (!sasl.authorizationIdentity) { + throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity') + } + if (!sasl.accessKeyId) { + throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId') + } + if (!sasl.secretAccessKey) { + throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey') + } + if (!sasl.sessionToken) { + sasl.sessionToken = '' + } - const request = awsIam.request(sasl) - const response = awsIam.response - const { host, port } = this.connection - const broker = `${host}:${port}` + const broker = `${host}:${port}` - try { - this.logger.debug('Authenticate with SASL AWS-IAM', { broker }) - await this.saslAuthenticate({ request, response }) - this.logger.debug('SASL AWS-IAM authentication successful', { broker }) - } catch (e) { - const error = new KafkaJSSASLAuthenticationError( - `SASL AWS-IAM authentication failed: ${e.message}` - ) - this.logger.error(error.message, { broker }) - throw error - } + try { + logger.debug('Authenticate with SASL AWS-IAM', { broker }) + await saslAuthenticate({ request: request(sasl), response }) + logger.debug('SASL AWS-IAM authentication successful', { broker }) + } catch (e) { + const error = new KafkaJSSASLAuthenticationError( + `SASL AWS-IAM authentication failed: ${e.message}` + ) + logger.error(error.message, { broker }) + throw error + } + }, } } + +module.exports = awsIAMAuthenticatorProvider diff --git a/src/broker/saslAuthenticator/awsIam.spec.js b/src/broker/saslAuthenticator/awsIam.spec.js index 42b539a30..3d7b77392 100644 --- a/src/broker/saslAuthenticator/awsIam.spec.js +++ b/src/broker/saslAuthenticator/awsIam.spec.js @@ -1,32 +1,27 @@ const { newLogger } = require('testHelpers') -const AWSIAM = require('./awsIam') +const awsIAMAuthenticatorProvider = require('./awsIam') describe('Broker > SASL Authenticator > AWS-IAM', () => { it('throws KafkaJSSASLAuthenticationError for missing authorizationIdentity', async () => { - const awsIam = new AWSIAM({ sasl: {} }, newLogger()) + const awsIam = awsIAMAuthenticatorProvider({})('', 0, newLogger()) await expect(awsIam.authenticate()).rejects.toThrow( 'SASL AWS-IAM: Missing authorizationIdentity' ) }) it('throws KafkaJSSASLAuthenticationError for invalid accessKeyId', async () => { - const awsIam = new AWSIAM( - { - sasl: { - authorizationIdentity: '', - secretAccessKey: '', - }, - }, - newLogger() - ) + const awsIam = awsIAMAuthenticatorProvider({ + authorizationIdentity: '', + secretAccessKey: '', + })('', 0, newLogger()) await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing accessKeyId') }) it('throws KafkaJSSASLAuthenticationError for invalid secretAccessKey', async () => { - const awsIam = new AWSIAM( - { sasl: { authorizationIdentity: '', accessKeyId: '' } }, - newLogger() - ) + const awsIam = awsIAMAuthenticatorProvider({ + authorizationIdentity: '', + accessKeyId: '', + })('', 0, newLogger()) await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing secretAccessKey') }) }) diff --git a/src/broker/saslAuthenticator/index.js b/src/broker/saslAuthenticator/index.js index 66fd930b1..bd5dc32dd 100644 --- a/src/broker/saslAuthenticator/index.js +++ b/src/broker/saslAuthenticator/index.js @@ -1,21 +1,20 @@ const { requests, lookup } = require('../../protocol/requests') const apiKeys = require('../../protocol/requests/apiKeys') -const PlainAuthenticator = require('./plain') -const SCRAM256Authenticator = require('./scram256') -const SCRAM512Authenticator = require('./scram512') -const AWSIAMAuthenticator = require('./awsIam') -const OAuthBearerAuthenticator = require('./oauthBearer') +const plainAuthenticatorProvider = require('./plain') +const scram256AuthenticatorProvider = require('./scram256') +const scram512AuthenticatorProvider = require('./scram512') +const awsIAMAuthenticatorProvider = require('./awsIam') +const oauthBearerAuthenticatorProvider = require('./oauthBearer') const { KafkaJSSASLAuthenticationError } = require('../../errors') -const AUTHENTICATORS = { - PLAIN: PlainAuthenticator, - 'SCRAM-SHA-256': SCRAM256Authenticator, - 'SCRAM-SHA-512': SCRAM512Authenticator, - AWS: AWSIAMAuthenticator, - OAUTHBEARER: OAuthBearerAuthenticator, +const BUILT_IN_AUTHENTICATION_PROVIDERS = { + AWS: awsIAMAuthenticatorProvider, + PLAIN: plainAuthenticatorProvider, + OAUTHBEARER: oauthBearerAuthenticatorProvider, + 'SCRAM-SHA-256': scram256AuthenticatorProvider, + 'SCRAM-SHA-512': scram512AuthenticatorProvider, } -const SUPPORTED_MECHANISMS = Object.keys(AUTHENTICATORS) const UNLIMITED_SESSION_LIFETIME = '0' module.exports = class SASLAuthenticator { @@ -63,18 +62,18 @@ module.exports = class SASLAuthenticator { return this.connection.sendAuthRequest({ request, response, authExpectResponse }) } - if (SUPPORTED_MECHANISMS.includes(mechanism)) { - const Authenticator = AUTHENTICATORS[mechanism] - await new Authenticator(this.connection, this.logger, saslAuthenticate).authenticate() - } else { - await this.connection.sasl - .authenticationProvider( - this.connection.host, - this.connection.port, - this.logger.namespace(`SaslAuthenticator-${mechanism}`), - saslAuthenticate - ) - .authenticate() + if (Object.keys(BUILT_IN_AUTHENTICATION_PROVIDERS).includes(mechanism)) { + this.connection.sasl.authenticationProvider = BUILT_IN_AUTHENTICATION_PROVIDERS[mechanism]( + this.connection.sasl + ) } + await this.connection.sasl + .authenticationProvider( + this.connection.host, + this.connection.port, + this.logger.namespace(`SaslAuthenticator-${mechanism}`), + saslAuthenticate + ) + .authenticate() } } diff --git a/src/broker/saslAuthenticator/oauthBearer.js b/src/broker/saslAuthenticator/oauthBearer.js index 2ede35674..a22254d1b 100644 --- a/src/broker/saslAuthenticator/oauthBearer.js +++ b/src/broker/saslAuthenticator/oauthBearer.js @@ -10,47 +10,41 @@ * reused and refreshed when appropriate. */ -const oauthBearer = require('../../protocol/sasl/oauthBearer') +const { request, response } = require('../../protocol/sasl/oauthBearer') const { KafkaJSSASLAuthenticationError } = require('../../errors') -module.exports = class OAuthBearerAuthenticator { - constructor(connection, logger, saslAuthenticate) { - this.connection = connection - this.logger = logger.namespace('SASLOAuthBearerAuthenticator') - this.saslAuthenticate = saslAuthenticate - } - - async authenticate() { - const { sasl } = this.connection - if (sasl.oauthBearerProvider == null) { - throw new KafkaJSSASLAuthenticationError( - 'SASL OAUTHBEARER: Missing OAuth bearer token provider' - ) - } - - const { oauthBearerProvider } = sasl - - const oauthBearerToken = await oauthBearerProvider() - - if (oauthBearerToken.value == null) { - throw new KafkaJSSASLAuthenticationError('SASL OAUTHBEARER: Invalid OAuth bearer token') - } - - const request = await oauthBearer.request(sasl, oauthBearerToken) - const response = oauthBearer.response - const { host, port } = this.connection - const broker = `${host}:${port}` - - try { - this.logger.debug('Authenticate with SASL OAUTHBEARER', { broker }) - await this.saslAuthenticate({ request, response }) - this.logger.debug('SASL OAUTHBEARER authentication successful', { broker }) - } catch (e) { - const error = new KafkaJSSASLAuthenticationError( - `SASL OAUTHBEARER authentication failed: ${e.message}` - ) - this.logger.error(error.message, { broker }) - throw error - } +const oauthBearerAuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { + return { + authenticate: async () => { + const { oauthBearerProvider } = sasl + + if (oauthBearerProvider == null) { + throw new KafkaJSSASLAuthenticationError( + 'SASL OAUTHBEARER: Missing OAuth bearer token provider' + ) + } + + const oauthBearerToken = await oauthBearerProvider() + + if (oauthBearerToken.value == null) { + throw new KafkaJSSASLAuthenticationError('SASL OAUTHBEARER: Invalid OAuth bearer token') + } + + const broker = `${host}:${port}` + + try { + logger.debug('Authenticate with SASL OAUTHBEARER', { broker }) + await saslAuthenticate({ request: request(sasl, oauthBearerToken), response }) + logger.debug('SASL OAUTHBEARER authentication successful', { broker }) + } catch (e) { + const error = new KafkaJSSASLAuthenticationError( + `SASL OAUTHBEARER authentication failed: ${e.message}` + ) + logger.error(error.message, { broker }) + throw error + } + }, } } + +module.exports = oauthBearerAuthenticatorProvider diff --git a/src/broker/saslAuthenticator/oauthBearer.spec.js b/src/broker/saslAuthenticator/oauthBearer.spec.js index 7bf1b08a0..0b69790f5 100644 --- a/src/broker/saslAuthenticator/oauthBearer.spec.js +++ b/src/broker/saslAuthenticator/oauthBearer.spec.js @@ -1,9 +1,9 @@ const { newLogger } = require('testHelpers') -const OAuthBearer = require('./oauthBearer') +const oauthBearerAuthenticatorProvider = require('./oauthBearer') describe('Broker > SASL Authenticator > OAUTHBEARER', () => { it('throws KafkaJSSASLAuthenticationError for missing oauthBearerProvider', async () => { - const oauthBearer = new OAuthBearer({ sasl: {} }, newLogger()) + const oauthBearer = oauthBearerAuthenticatorProvider({})('', 0, newLogger()) await expect(oauthBearer.authenticate()).rejects.toThrow('Missing OAuth bearer token provider') }) @@ -12,7 +12,11 @@ describe('Broker > SASL Authenticator > OAUTHBEARER', () => { return {} } - const oauthBearer = new OAuthBearer({ sasl: { oauthBearerProvider } }, newLogger()) + const oauthBearer = oauthBearerAuthenticatorProvider({ oauthBearerProvider })( + '', + 0, + newLogger() + ) await expect(oauthBearer.authenticate()).rejects.toThrow('Invalid OAuth bearer token') }) }) diff --git a/src/broker/saslAuthenticator/plain.js b/src/broker/saslAuthenticator/plain.js index 2e06bc4ba..98c6f56a4 100644 --- a/src/broker/saslAuthenticator/plain.js +++ b/src/broker/saslAuthenticator/plain.js @@ -1,34 +1,28 @@ -const plain = require('../../protocol/sasl/plain') +const { request, response } = require('../../protocol/sasl/plain') const { KafkaJSSASLAuthenticationError } = require('../../errors') -module.exports = class PlainAuthenticator { - constructor(connection, logger, saslAuthenticate) { - this.connection = connection - this.logger = logger.namespace('SASLPlainAuthenticator') - this.saslAuthenticate = saslAuthenticate - } - - async authenticate() { - const { sasl } = this.connection - if (sasl.username == null || sasl.password == null) { - throw new KafkaJSSASLAuthenticationError('SASL Plain: Invalid username or password') - } +const plainAuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { + return { + authenticate: async () => { + if (sasl.username == null || sasl.password == null) { + throw new KafkaJSSASLAuthenticationError('SASL Plain: Invalid username or password') + } - const request = plain.request(sasl) - const response = plain.response - const { host, port } = this.connection - const broker = `${host}:${port}` + const broker = `${host}:${port}` - try { - this.logger.debug('Authenticate with SASL PLAIN', { broker }) - await this.saslAuthenticate({ request, response }) - this.logger.debug('SASL PLAIN authentication successful', { broker }) - } catch (e) { - const error = new KafkaJSSASLAuthenticationError( - `SASL PLAIN authentication failed: ${e.message}` - ) - this.logger.error(error.message, { broker }) - throw error - } + try { + logger.debug('Authenticate with SASL PLAIN', { broker }) + await saslAuthenticate({ request: request(sasl), response }) + logger.debug('SASL PLAIN authentication successful', { broker }) + } catch (e) { + const error = new KafkaJSSASLAuthenticationError( + `SASL PLAIN authentication failed: ${e.message}` + ) + logger.error(error.message, { broker }) + throw error + } + }, } } + +module.exports = plainAuthenticatorProvider diff --git a/src/broker/saslAuthenticator/plain.spec.js b/src/broker/saslAuthenticator/plain.spec.js index a1a95e52f..316f0b7de 100644 --- a/src/broker/saslAuthenticator/plain.spec.js +++ b/src/broker/saslAuthenticator/plain.spec.js @@ -1,14 +1,14 @@ const { newLogger } = require('testHelpers') -const Plain = require('./plain') +const plainAuthenticatorProvider = require('./plain') describe('Broker > SASL Authenticator > PLAIN', () => { it('throws KafkaJSSASLAuthenticationError for invalid username', async () => { - const plain = new Plain({ sasl: {} }, newLogger()) + const plain = plainAuthenticatorProvider({})('', 0, newLogger()) await expect(plain.authenticate()).rejects.toThrow('Invalid username or password') }) it('throws KafkaJSSASLAuthenticationError for invalid password', async () => { - const plain = new Plain({ sasl: { username: '' } }, newLogger()) + const plain = plainAuthenticatorProvider({ username: '' })('', 0, newLogger()) await expect(plain.authenticate()).rejects.toThrow('Invalid username or password') }) }) diff --git a/src/broker/saslAuthenticator/scram.js b/src/broker/saslAuthenticator/scram.js index 222d8384d..7c0e6164a 100644 --- a/src/broker/saslAuthenticator/scram.js +++ b/src/broker/saslAuthenticator/scram.js @@ -108,13 +108,15 @@ class SCRAM { } /** - * @param {Connection} connection + * @param {SASLOptions} sasl * @param {Logger} logger * @param {Function} saslAuthenticate * @param {DigestDefinition} digestDefinition */ - constructor(connection, logger, saslAuthenticate, digestDefinition) { - this.connection = connection + constructor(sasl, host, port, logger, saslAuthenticate, digestDefinition) { + this.sasl = sasl + this.host = host + this.port = port this.logger = logger this.saslAuthenticate = saslAuthenticate this.digestDefinition = digestDefinition @@ -127,10 +129,9 @@ class SCRAM { async authenticate() { const { PREFIX } = this - const { host, port, sasl } = this.connection - const broker = `${host}:${port}` + const broker = `${this.host}:${this.port}` - if (sasl.username == null || sasl.password == null) { + if (this.sasl.username == null || this.sasl.password == null) { throw new KafkaJSSASLAuthenticationError(`${this.PREFIX}: Invalid username or password`) } @@ -287,7 +288,7 @@ class SCRAM { * @private */ encodedUsername() { - const { username } = this.connection.sasl + const { username } = this.sasl return SCRAM.sanitizeString(username).toString('utf-8') } @@ -295,7 +296,7 @@ class SCRAM { * @private */ encodedPassword() { - const { password } = this.connection.sasl + const { password } = this.sasl return password.toString('utf-8') } diff --git a/src/broker/saslAuthenticator/scram.spec.js b/src/broker/saslAuthenticator/scram.spec.js index 6ffde10cb..35fb87e28 100644 --- a/src/broker/saslAuthenticator/scram.spec.js +++ b/src/broker/saslAuthenticator/scram.spec.js @@ -1,29 +1,33 @@ const Decoder = require('../../protocol/decoder') const { newLogger } = require('testHelpers') -const SCRAM256 = require('./scram256') +const scram256AuthenticatorProvider = require('./scram256') +const { SCRAM, DIGESTS } = require('./scram') describe('Broker > SASL Authenticator > SCRAM', () => { - let connection, saslAuthenticate, logger + let sasl, saslAuthenticate, logger, host, port beforeEach(() => { - connection = { - authenticate: jest.fn(), - sasl: { username: 'user', password: 'pencil' }, - } - saslAuthenticate = ({ request, response, authExpectResponse }) => - connection.authenticate({ request, response, authExpectResponse }) + sasl = { username: 'user', password: 'pencil' } + saslAuthenticate = jest.fn() + + host = 'host' + port = 9094 logger = { debug: jest.fn() } - logger.namespace = () => logger }) it('throws KafkaJSSASLAuthenticationError for invalid username', async () => { - const scram = new SCRAM256({ sasl: {} }, newLogger(), saslAuthenticate) + const scram = scram256AuthenticatorProvider({})('', 0, newLogger()) await expect(scram.authenticate()).rejects.toThrow('Invalid username or password') }) it('throws KafkaJSSASLAuthenticationError for invalid password', async () => { - const scram = new SCRAM256({ sasl: { username: '' } }, newLogger(), saslAuthenticate) + const scram = scram256AuthenticatorProvider({ username: '' })( + '', + 0, + newLogger(), + saslAuthenticate + ) await expect(scram.authenticate()).rejects.toThrow('Invalid username or password') }) @@ -31,11 +35,11 @@ describe('Broker > SASL Authenticator > SCRAM', () => { let scram beforeEach(() => { - scram = new SCRAM256(connection, logger, saslAuthenticate) + scram = new SCRAM(sasl, host, port, logger, saslAuthenticate, DIGESTS.SHA256) }) test('saltPassword', async () => { - connection.sasl.password = 'password' + sasl.password = 'password' const clientMessageResponse = { s: 'enBxNzV4aGphMjJmbnZ0ejF5M2o4Y3JjdA==', i: '4096', @@ -47,7 +51,7 @@ describe('Broker > SASL Authenticator > SCRAM', () => { }) test('clientKey', async () => { - connection.sasl.password = 'password' + sasl.password = 'password' const clientMessageResponse = { s: 'enBxNzV4aGphMjJmbnZ0ejF5M2o4Y3JjdA==', i: '4096', @@ -59,7 +63,7 @@ describe('Broker > SASL Authenticator > SCRAM', () => { }) test('storedKey', async () => { - connection.sasl.password = 'password' + sasl.password = 'password' const clientMessageResponse = { s: 'enBxNzV4aGphMjJmbnZ0ejF5M2o4Y3JjdA==', i: '4096', @@ -75,43 +79,43 @@ describe('Broker > SASL Authenticator > SCRAM', () => { test('regular use case', async () => { scram.currentNonce = 'rOprNGfwEbeRWgbNEkqO' await scram.sendClientFirstMessage() - expect(connection.authenticate).toHaveBeenCalledWith({ + expect(saslAuthenticate).toHaveBeenCalledWith({ authExpectResponse: true, request: expect.any(Object), response: expect.any(Object), }) - const { request } = connection.authenticate.mock.calls[0][0] + const { request } = saslAuthenticate.mock.calls[0][0] const buffer = await request.encode() const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual(`n,,n=user,r=${scram.currentNonce}`) }) test('username with comma', async () => { - connection.sasl.username = 'bob,' + sasl.username = 'bob,' await scram.sendClientFirstMessage() - expect(connection.authenticate).toHaveBeenCalledWith({ + expect(saslAuthenticate).toHaveBeenCalledWith({ authExpectResponse: true, request: expect.any(Object), response: expect.any(Object), }) - const { request } = connection.authenticate.mock.calls[0][0] + const { request } = saslAuthenticate.mock.calls[0][0] const buffer = await request.encode() const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual(`n,,n=bob=2C,r=${scram.currentNonce}`) }) test('username with equals', async () => { - connection.sasl.username = 'bob=' + sasl.username = 'bob=' await scram.sendClientFirstMessage() - expect(connection.authenticate).toHaveBeenCalledWith({ + expect(saslAuthenticate).toHaveBeenCalledWith({ authExpectResponse: true, request: expect.any(Object), response: expect.any(Object), }) - const { request } = connection.authenticate.mock.calls[0][0] + const { request } = saslAuthenticate.mock.calls[0][0] const buffer = await request.encode() const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual(`n,,n=bob=3D,r=${scram.currentNonce}`) @@ -130,13 +134,13 @@ describe('Broker > SASL Authenticator > SCRAM', () => { } await scram.sendClientFinalMessage(clientMessageResponse) - expect(connection.authenticate).toHaveBeenCalledWith({ + expect(saslAuthenticate).toHaveBeenCalledWith({ authExpectResponse: true, request: expect.any(Object), response: expect.any(Object), }) - const { request } = connection.authenticate.mock.calls[0][0] + const { request } = saslAuthenticate.mock.calls[0][0] const buffer = await request.encode() const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual( diff --git a/src/broker/saslAuthenticator/scram256.js b/src/broker/saslAuthenticator/scram256.js index 48fcb4d96..256c6f652 100644 --- a/src/broker/saslAuthenticator/scram256.js +++ b/src/broker/saslAuthenticator/scram256.js @@ -1,7 +1,10 @@ const { SCRAM, DIGESTS } = require('./scram') -module.exports = class SCRAM256Authenticator extends SCRAM { - constructor(connection, logger, saslAuthenticate) { - super(connection, logger.namespace('SCRAM256Authenticator'), saslAuthenticate, DIGESTS.SHA256) +const scram256AuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { + const scram = new SCRAM(sasl, host, port, logger, saslAuthenticate, DIGESTS.SHA256) + return { + authenticate: async () => await scram.authenticate(), } } + +module.exports = scram256AuthenticatorProvider diff --git a/src/broker/saslAuthenticator/scram512.js b/src/broker/saslAuthenticator/scram512.js index 6e5d7fad2..b2854d83e 100644 --- a/src/broker/saslAuthenticator/scram512.js +++ b/src/broker/saslAuthenticator/scram512.js @@ -1,7 +1,10 @@ const { SCRAM, DIGESTS } = require('./scram') -module.exports = class SCRAM512Authenticator extends SCRAM { - constructor(connection, logger, saslAuthenticate) { - super(connection, logger.namespace('SCRAM512Authenticator'), saslAuthenticate, DIGESTS.SHA512) +const scram512AuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { + const scram = new SCRAM(sasl, host, port, logger, saslAuthenticate, DIGESTS.SHA512) + return { + authenticate: async () => await scram.authenticate(), } } + +module.exports = scram512AuthenticatorProvider From e8ddaac9d7a96561de06a595ac81163a5d35a936 Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Thu, 2 Jun 2022 16:19:33 +1000 Subject: [PATCH 07/14] fix missing await --- src/broker/saslAuthenticator/oauthBearer.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/broker/saslAuthenticator/oauthBearer.js b/src/broker/saslAuthenticator/oauthBearer.js index a22254d1b..28fa027bb 100644 --- a/src/broker/saslAuthenticator/oauthBearer.js +++ b/src/broker/saslAuthenticator/oauthBearer.js @@ -10,7 +10,7 @@ * reused and refreshed when appropriate. */ -const { request, response } = require('../../protocol/sasl/oauthBearer') +const { request } = require('../../protocol/sasl/oauthBearer') const { KafkaJSSASLAuthenticationError } = require('../../errors') const oauthBearerAuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { @@ -34,7 +34,7 @@ const oauthBearerAuthenticatorProvider = sasl => (host, port, logger, saslAuthen try { logger.debug('Authenticate with SASL OAUTHBEARER', { broker }) - await saslAuthenticate({ request: request(sasl, oauthBearerToken), response }) + await saslAuthenticate({ request: await request(sasl, oauthBearerToken) }) logger.debug('SASL OAUTHBEARER authentication successful', { broker }) } catch (e) { const error = new KafkaJSSASLAuthenticationError( From 4e6b966c40592449079aad68f03f375c5c0f68ad Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Thu, 2 Jun 2022 16:24:28 +1000 Subject: [PATCH 08/14] remove authExpectResponse param from saslAuthenticate. Use presence of response to decide instead --- src/broker/saslAuthenticator/index.js | 6 +++--- src/broker/saslAuthenticator/scram.js | 2 -- src/broker/saslAuthenticator/scram.spec.js | 4 ---- src/network/connection.js | 4 ++-- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/src/broker/saslAuthenticator/index.js b/src/broker/saslAuthenticator/index.js index bd5dc32dd..f494c0e05 100644 --- a/src/broker/saslAuthenticator/index.js +++ b/src/broker/saslAuthenticator/index.js @@ -39,7 +39,7 @@ module.exports = class SASLAuthenticator { ) } - const saslAuthenticate = async ({ request, response, authExpectResponse }) => { + const saslAuthenticate = async ({ request, response }) => { if (this.protocolAuthentication) { const requestAuthBytes = await request.encode() const authResponse = await this.connection.send( @@ -50,7 +50,7 @@ module.exports = class SASLAuthenticator { // This is not present in SaslAuthenticateV0, so we default to `"0"` this.sessionLifetime = authResponse.sessionLifetimeMs || UNLIMITED_SESSION_LIFETIME - if (!authExpectResponse) { + if (!response) { return } @@ -59,7 +59,7 @@ module.exports = class SASLAuthenticator { return response.parse(payloadDecoded) } - return this.connection.sendAuthRequest({ request, response, authExpectResponse }) + return this.connection.sendAuthRequest({ request, response }) } if (Object.keys(BUILT_IN_AUTHENTICATION_PROVIDERS).includes(mechanism)) { diff --git a/src/broker/saslAuthenticator/scram.js b/src/broker/saslAuthenticator/scram.js index 7c0e6164a..355fdbcc9 100644 --- a/src/broker/saslAuthenticator/scram.js +++ b/src/broker/saslAuthenticator/scram.js @@ -170,7 +170,6 @@ class SCRAM { const response = scram.firstMessage.response return this.saslAuthenticate({ - authExpectResponse: true, request, response, }) @@ -203,7 +202,6 @@ class SCRAM { const response = scram.finalMessage.response return this.saslAuthenticate({ - authExpectResponse: true, request, response, }) diff --git a/src/broker/saslAuthenticator/scram.spec.js b/src/broker/saslAuthenticator/scram.spec.js index 35fb87e28..5edeb8476 100644 --- a/src/broker/saslAuthenticator/scram.spec.js +++ b/src/broker/saslAuthenticator/scram.spec.js @@ -80,7 +80,6 @@ describe('Broker > SASL Authenticator > SCRAM', () => { scram.currentNonce = 'rOprNGfwEbeRWgbNEkqO' await scram.sendClientFirstMessage() expect(saslAuthenticate).toHaveBeenCalledWith({ - authExpectResponse: true, request: expect.any(Object), response: expect.any(Object), }) @@ -95,7 +94,6 @@ describe('Broker > SASL Authenticator > SCRAM', () => { sasl.username = 'bob,' await scram.sendClientFirstMessage() expect(saslAuthenticate).toHaveBeenCalledWith({ - authExpectResponse: true, request: expect.any(Object), response: expect.any(Object), }) @@ -110,7 +108,6 @@ describe('Broker > SASL Authenticator > SCRAM', () => { sasl.username = 'bob=' await scram.sendClientFirstMessage() expect(saslAuthenticate).toHaveBeenCalledWith({ - authExpectResponse: true, request: expect.any(Object), response: expect.any(Object), }) @@ -135,7 +132,6 @@ describe('Broker > SASL Authenticator > SCRAM', () => { await scram.sendClientFinalMessage(clientMessageResponse) expect(saslAuthenticate).toHaveBeenCalledWith({ - authExpectResponse: true, request: expect.any(Object), response: expect.any(Object), }) diff --git a/src/network/connection.js b/src/network/connection.js index d2aa20e97..2ac65c5c2 100644 --- a/src/network/connection.js +++ b/src/network/connection.js @@ -319,8 +319,8 @@ module.exports = class Connection { * @public * @returns {Promise} */ - sendAuthRequest({ authExpectResponse = false, request, response }) { - this.authExpectResponse = authExpectResponse + sendAuthRequest({ request, response }) { + this.authExpectResponse = !!response /** * TODO: rewrite removing the async promise executor From 845e36fcc80e326f552759b17b5fdf3c658837ad Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Thu, 2 Jun 2022 17:05:32 +1000 Subject: [PATCH 09/14] add custom authenticator docs --- docs/Configuration.md | 17 +++ docs/CustomAuthenticationMechanism.md | 151 ++++++++++++++++++++++++++ 2 files changed, 168 insertions(+) create mode 100644 docs/CustomAuthenticationMechanism.md diff --git a/docs/Configuration.md b/docs/Configuration.md index dfce30348..51523f0a8 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -265,6 +265,23 @@ A complete breakdown can be found in the IAM User Guide's It is **highly recommended** that you use SSL for encryption when using `PLAIN` or `AWS`, otherwise credentials will be transmitted in cleartext! +### Custom Authentication Mechanisms + +If an authentication mechanism is not supported out of the box in KafkaJS, a custom authentication +mechanism can be introduced as a plugin: + +```js +{ + sasl: { + mechanism: , + authenticationProvider: (host, port, logger, saslAuthenticate) => { authenticate: () => Promise } + } +} +``` + +See [Custom Authentication Mechanisms](CustomAuthenticationMechanism.md) for more information on how to implement your own +authentication mechanism. + ## Connection Timeout Time in milliseconds to wait for a successful connection. The default value is: `1000`. diff --git a/docs/CustomAuthenticationMechanism.md b/docs/CustomAuthenticationMechanism.md new file mode 100644 index 000000000..b479c134b --- /dev/null +++ b/docs/CustomAuthenticationMechanism.md @@ -0,0 +1,151 @@ +--- +id: custom-authentication-mechanism +title: Custom Authentication Mechanisms +--- + +To use an authentication mechanism that is not supported out of the box by KafkaJS, +custom authentication mechanisms can be introduced: + +```js +{ + sasl: { + mechanism: , + authenticationProvider: (host, port, logger, saslAuthenticate) => { authenticate: () => Promise } + } +} +``` + +`` needs to match the SASL mechanism configured in the `sasl.enabled.mechanisms` +property in `server.properties`. See the Kafka documentation for information on how to +configure your brokers. + +## Writing a custom authentication mechanism + +A custom authentication mechanism needs to fulfill the following interface: + +```ts +type AuthenticationProvider = ( + host: string, + port: number, + logger: Logger, + saslAuthenticate: (request: SaslAuthenticationRequest, response?: SaslAuthenticationResponse) => Promise +) => Authenticator + +type Authenticator = { + authenticate(): Promise +} +type SaslAuthenticationRequest = { + encode: () => Buffer | Promise +} +type SaslAuthenticationResponse = { + decode: (rawResponse: Buffer) => Buffer | Promise + parse: (data: Buffer) => ParseResult +} +``` +* `host` - Hostname of the specific broker to connect to +* `port` - Port of the specific broker to connect to +* `logger` - A logger instance namespaced to the authentication mechanism +* `saslAuthenticate` - an async function to make [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate) +requests towards the broker. The `request` and `response` functions are used to encode the `auth_bytes` of the request, and to optionally +decode and parse the `auth_bytes` in the response. `response` can be omitted if no response `auth_bytes` are expected. +### Example +In this example we will create a custom authentication mechanism called `simon`. The general +flow will be: +1. Send a [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate) +request with the value of `says` as `auth_bytes`. +2. Read the response from the broker. If `says` starts with "Simon says", the response `auth_bytes` +should equal `says`, if it does not start with "Simon says", it should be an empty string. +```js +const simonAuthenticator = says = (host, port, logger, saslAuthenticate) => { + const INT32_SIZE = 4 + + const request = { + /** + * Encodes the value for `auth_bytes` in SaslAuthenticate request + * @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate + * + * In this example, we are just sending `says` as a string, + * with the length of the string in bytes prepended as an int32 + **/ + encode: () => { + const byteLength = Buffer.byteLength(says, 'utf8') + const buf = Buffer.alloc(INT32_SIZE + byteLength) + buf.writeUInt32BE(byteLength, 0) + buf.write(says, INT32_SIZE, byteLength, 'utf8') + return buf + }, + } + const response = { + /** + * Decodes the `auth_bytes` in SaslAuthenticate response + * @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate + * + * This is essentially the reverse of `request.encode`, where + * we read the length of the string as an int32 and then read + * that many bytes + */ + decode: rawData => { + const byteLength = rawData.readInt32BE(0) + return rawData.slice(INT32_SIZE, INT32_SIZE + byteLength) + }, + /** + * The return value from `response.decode` is passed into + * this function, which is responsible for interpreting + * the data. In this case, we just turn the buffer back + * into a string + */ + parse: data => { + return data.toString() + }, + } + return { + /** + * This function is responsible for orchestrating the authentication flow. + * Essentially we will send a SaslAuthenticate request with the + * value of `sasl.says` to the broker, and expect to + * get the same value back. + * + * Other authentication methods may do any other operations they + * like, but communication with the brokers goes through + * the SaslAuthenticate request. + */ + authenticate: async () => { + if (says == null) { + throw new Error('SASL Simon: Invalid "says"') + } + const broker = `${host}:${port}` + try { + logger.info('Authenticate with SASL Simon', { broker }) + const authenticateResponse = await saslAuthenticate({ request, response }) + + const saidSimon = says.startsWith("Simon says ") + const expectedResponse = saidSimon ? says : "" + if (authenticateResponse !== expectedResponse) { + throw new Error("Mismatching response from broker") + } + logger.info('SASL Simon authentication successful', { broker }) + } catch (e) { + const error = new Error( + `SASL Simon authentication failed: ${e.message}` + ) + logger.error(error.message, { broker }) + throw error + } + }, + } +} +``` + +The `response` argument to `saslAuthenticate` is optional, in case the authentication +method does not require the `auth_bytes` in the response. + +In the example above, we expect the client to be configured as such: + +```js +const config = { + sasl: { + mechanism: 'simon' + authenticationProvider: simonAuthenticator('Simon says authenticate me') + } +} +``` \ No newline at end of file From 2bf7f10b76f42f601385321123ec4f2c3c9e5c80 Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Wed, 6 Jul 2022 08:18:17 +1000 Subject: [PATCH 10/14] add comment to auth example to hopefully make it clear it isn't a real auth mechanism --- docs/CustomAuthenticationMechanism.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/CustomAuthenticationMechanism.md b/docs/CustomAuthenticationMechanism.md index b479c134b..9b31151eb 100644 --- a/docs/CustomAuthenticationMechanism.md +++ b/docs/CustomAuthenticationMechanism.md @@ -55,6 +55,11 @@ flow will be: request with the value of `says` as `auth_bytes`. 2. Read the response from the broker. If `says` starts with "Simon says", the response `auth_bytes` should equal `says`, if it does not start with "Simon says", it should be an empty string. + +**This is a made up example!** + +It is a non-existent authentication mechanism just made up to show how to implement the expected interface. It is not a real authentication mechanism. + ```js const simonAuthenticator = says = (host, port, logger, saslAuthenticate) => { const INT32_SIZE = 4 From f291de33293249e71e654ee11560f2bccf1349eb Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Wed, 6 Jul 2022 09:04:24 +1000 Subject: [PATCH 11/14] use user provided authentication provider if present for built in authenticators --- src/broker/__tests__/connect.spec.js | 21 +++++++++++++++++++++ src/broker/saslAuthenticator/index.js | 5 ++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/broker/__tests__/connect.spec.js b/src/broker/__tests__/connect.spec.js index 0c14d4d98..c13f61044 100644 --- a/src/broker/__tests__/connect.spec.js +++ b/src/broker/__tests__/connect.spec.js @@ -58,6 +58,27 @@ describe('Broker > connect', () => { ) }) + test('user provided authenticator overrides built in ones', async () => { + broker = new Broker({ + connectionPool: createConnectionPool( + Object.assign(sslConnectionOpts(), { + port: 9094, + sasl: { + mechanism: 'PLAIN', + authenticationProvider: () => ({ + authenticate: async () => { + throw new Error('test error') + }, + }), + }, + }) + ), + logger: newLogger(), + }) + + await expect(broker.connect()).rejects.toThrow('test error') + }) + for (const e of saslEntries) { test(`authenticate with SASL ${e.name} if configured`, async () => { broker = new Broker({ diff --git a/src/broker/saslAuthenticator/index.js b/src/broker/saslAuthenticator/index.js index f494c0e05..c32abb7ef 100644 --- a/src/broker/saslAuthenticator/index.js +++ b/src/broker/saslAuthenticator/index.js @@ -62,7 +62,10 @@ module.exports = class SASLAuthenticator { return this.connection.sendAuthRequest({ request, response }) } - if (Object.keys(BUILT_IN_AUTHENTICATION_PROVIDERS).includes(mechanism)) { + if ( + !this.connection.sasl.authenticationProvider && + Object.keys(BUILT_IN_AUTHENTICATION_PROVIDERS).includes(mechanism) + ) { this.connection.sasl.authenticationProvider = BUILT_IN_AUTHENTICATION_PROVIDERS[mechanism]( this.connection.sasl ) From 3bbd0b62caa6e5edf64b8fb86c63c49f28927e1b Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Wed, 6 Jul 2022 09:18:46 +1000 Subject: [PATCH 12/14] pass authentication provider args as an object rather than positional --- docs/CustomAuthenticationMechanism.md | 24 +++++++++++++------ src/broker/saslAuthenticator/awsIam.js | 2 +- src/broker/saslAuthenticator/awsIam.spec.js | 6 ++--- src/broker/saslAuthenticator/index.js | 12 +++++----- src/broker/saslAuthenticator/oauthBearer.js | 2 +- .../saslAuthenticator/oauthBearer.spec.js | 16 ++++++++----- src/broker/saslAuthenticator/plain.js | 2 +- src/broker/saslAuthenticator/plain.spec.js | 8 +++++-- src/broker/saslAuthenticator/scram.spec.js | 14 +++++------ src/broker/saslAuthenticator/scram256.js | 2 +- src/broker/saslAuthenticator/scram512.js | 2 +- types/index.d.ts | 24 ++++++++++--------- 12 files changed, 67 insertions(+), 47 deletions(-) diff --git a/docs/CustomAuthenticationMechanism.md b/docs/CustomAuthenticationMechanism.md index 9b31151eb..b4435e39d 100644 --- a/docs/CustomAuthenticationMechanism.md +++ b/docs/CustomAuthenticationMechanism.md @@ -24,19 +24,29 @@ configure your brokers. A custom authentication mechanism needs to fulfill the following interface: ```ts -type AuthenticationProvider = ( - host: string, - port: number, - logger: Logger, - saslAuthenticate: (request: SaslAuthenticationRequest, response?: SaslAuthenticationResponse) => Promise -) => Authenticator +type AuthenticationProviderArgs = { + host: string + port: number + logger: Logger + saslAuthenticate: ( + request: SaslAuthenticationRequest, + response?: SaslAuthenticationResponse + ) => Promise +} + +type Mechanism = { + mechanism: string + authenticationProvider: (args: AuthenticationProviderArgs) => Authenticator +} type Authenticator = { authenticate(): Promise } + type SaslAuthenticationRequest = { encode: () => Buffer | Promise } + type SaslAuthenticationResponse = { decode: (rawResponse: Buffer) => Buffer | Promise parse: (data: Buffer) => ParseResult @@ -61,7 +71,7 @@ should equal `says`, if it does not start with "Simon says", it should be an emp It is a non-existent authentication mechanism just made up to show how to implement the expected interface. It is not a real authentication mechanism. ```js -const simonAuthenticator = says = (host, port, logger, saslAuthenticate) => { +const simonAuthenticator = says = ({ host, port, logger, saslAuthenticate }) => { const INT32_SIZE = 4 const request = { diff --git a/src/broker/saslAuthenticator/awsIam.js b/src/broker/saslAuthenticator/awsIam.js index 583e5954c..45e166fa6 100644 --- a/src/broker/saslAuthenticator/awsIam.js +++ b/src/broker/saslAuthenticator/awsIam.js @@ -1,7 +1,7 @@ const { request, response } = require('../../protocol/sasl/awsIam') const { KafkaJSSASLAuthenticationError } = require('../../errors') -const awsIAMAuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { +const awsIAMAuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { return { authenticate: async () => { if (!sasl.authorizationIdentity) { diff --git a/src/broker/saslAuthenticator/awsIam.spec.js b/src/broker/saslAuthenticator/awsIam.spec.js index 3d7b77392..e7b29ad2a 100644 --- a/src/broker/saslAuthenticator/awsIam.spec.js +++ b/src/broker/saslAuthenticator/awsIam.spec.js @@ -3,7 +3,7 @@ const awsIAMAuthenticatorProvider = require('./awsIam') describe('Broker > SASL Authenticator > AWS-IAM', () => { it('throws KafkaJSSASLAuthenticationError for missing authorizationIdentity', async () => { - const awsIam = awsIAMAuthenticatorProvider({})('', 0, newLogger()) + const awsIam = awsIAMAuthenticatorProvider({})({ host: '', port: 0, logger: newLogger() }) await expect(awsIam.authenticate()).rejects.toThrow( 'SASL AWS-IAM: Missing authorizationIdentity' ) @@ -13,7 +13,7 @@ describe('Broker > SASL Authenticator > AWS-IAM', () => { const awsIam = awsIAMAuthenticatorProvider({ authorizationIdentity: '', secretAccessKey: '', - })('', 0, newLogger()) + })({ host: '', port: 0, logger: newLogger() }) await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing accessKeyId') }) @@ -21,7 +21,7 @@ describe('Broker > SASL Authenticator > AWS-IAM', () => { const awsIam = awsIAMAuthenticatorProvider({ authorizationIdentity: '', accessKeyId: '', - })('', 0, newLogger()) + })({ host: '', port: 0, logger: newLogger() }) await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing secretAccessKey') }) }) diff --git a/src/broker/saslAuthenticator/index.js b/src/broker/saslAuthenticator/index.js index c32abb7ef..ddd1f906c 100644 --- a/src/broker/saslAuthenticator/index.js +++ b/src/broker/saslAuthenticator/index.js @@ -71,12 +71,12 @@ module.exports = class SASLAuthenticator { ) } await this.connection.sasl - .authenticationProvider( - this.connection.host, - this.connection.port, - this.logger.namespace(`SaslAuthenticator-${mechanism}`), - saslAuthenticate - ) + .authenticationProvider({ + host: this.connection.host, + port: this.connection.port, + logger: this.logger.namespace(`SaslAuthenticator-${mechanism}`), + saslAuthenticate, + }) .authenticate() } } diff --git a/src/broker/saslAuthenticator/oauthBearer.js b/src/broker/saslAuthenticator/oauthBearer.js index 28fa027bb..086a17aa9 100644 --- a/src/broker/saslAuthenticator/oauthBearer.js +++ b/src/broker/saslAuthenticator/oauthBearer.js @@ -13,7 +13,7 @@ const { request } = require('../../protocol/sasl/oauthBearer') const { KafkaJSSASLAuthenticationError } = require('../../errors') -const oauthBearerAuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { +const oauthBearerAuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { return { authenticate: async () => { const { oauthBearerProvider } = sasl diff --git a/src/broker/saslAuthenticator/oauthBearer.spec.js b/src/broker/saslAuthenticator/oauthBearer.spec.js index 0b69790f5..189fa2f7f 100644 --- a/src/broker/saslAuthenticator/oauthBearer.spec.js +++ b/src/broker/saslAuthenticator/oauthBearer.spec.js @@ -3,7 +3,11 @@ const oauthBearerAuthenticatorProvider = require('./oauthBearer') describe('Broker > SASL Authenticator > OAUTHBEARER', () => { it('throws KafkaJSSASLAuthenticationError for missing oauthBearerProvider', async () => { - const oauthBearer = oauthBearerAuthenticatorProvider({})('', 0, newLogger()) + const oauthBearer = oauthBearerAuthenticatorProvider({})({ + host: '', + port: 0, + logger: newLogger(), + }) await expect(oauthBearer.authenticate()).rejects.toThrow('Missing OAuth bearer token provider') }) @@ -12,11 +16,11 @@ describe('Broker > SASL Authenticator > OAUTHBEARER', () => { return {} } - const oauthBearer = oauthBearerAuthenticatorProvider({ oauthBearerProvider })( - '', - 0, - newLogger() - ) + const oauthBearer = oauthBearerAuthenticatorProvider({ oauthBearerProvider })({ + host: '', + port: 0, + logger: newLogger(), + }) await expect(oauthBearer.authenticate()).rejects.toThrow('Invalid OAuth bearer token') }) }) diff --git a/src/broker/saslAuthenticator/plain.js b/src/broker/saslAuthenticator/plain.js index 98c6f56a4..fb63463d7 100644 --- a/src/broker/saslAuthenticator/plain.js +++ b/src/broker/saslAuthenticator/plain.js @@ -1,7 +1,7 @@ const { request, response } = require('../../protocol/sasl/plain') const { KafkaJSSASLAuthenticationError } = require('../../errors') -const plainAuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { +const plainAuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { return { authenticate: async () => { if (sasl.username == null || sasl.password == null) { diff --git a/src/broker/saslAuthenticator/plain.spec.js b/src/broker/saslAuthenticator/plain.spec.js index 316f0b7de..a3ff0768d 100644 --- a/src/broker/saslAuthenticator/plain.spec.js +++ b/src/broker/saslAuthenticator/plain.spec.js @@ -3,12 +3,16 @@ const plainAuthenticatorProvider = require('./plain') describe('Broker > SASL Authenticator > PLAIN', () => { it('throws KafkaJSSASLAuthenticationError for invalid username', async () => { - const plain = plainAuthenticatorProvider({})('', 0, newLogger()) + const plain = plainAuthenticatorProvider({})({ host: '', port: 0, logger: newLogger() }) await expect(plain.authenticate()).rejects.toThrow('Invalid username or password') }) it('throws KafkaJSSASLAuthenticationError for invalid password', async () => { - const plain = plainAuthenticatorProvider({ username: '' })('', 0, newLogger()) + const plain = plainAuthenticatorProvider({ username: '' })({ + host: '', + port: 0, + logger: newLogger(), + }) await expect(plain.authenticate()).rejects.toThrow('Invalid username or password') }) }) diff --git a/src/broker/saslAuthenticator/scram.spec.js b/src/broker/saslAuthenticator/scram.spec.js index 5edeb8476..5abbbfaae 100644 --- a/src/broker/saslAuthenticator/scram.spec.js +++ b/src/broker/saslAuthenticator/scram.spec.js @@ -17,17 +17,17 @@ describe('Broker > SASL Authenticator > SCRAM', () => { }) it('throws KafkaJSSASLAuthenticationError for invalid username', async () => { - const scram = scram256AuthenticatorProvider({})('', 0, newLogger()) + const scram = scram256AuthenticatorProvider({})({ host: '', port: 0, logger: newLogger() }) await expect(scram.authenticate()).rejects.toThrow('Invalid username or password') }) it('throws KafkaJSSASLAuthenticationError for invalid password', async () => { - const scram = scram256AuthenticatorProvider({ username: '' })( - '', - 0, - newLogger(), - saslAuthenticate - ) + const scram = scram256AuthenticatorProvider({ username: '' })({ + host: '', + port: 0, + logger: newLogger(), + saslAuthenticate, + }) await expect(scram.authenticate()).rejects.toThrow('Invalid username or password') }) diff --git a/src/broker/saslAuthenticator/scram256.js b/src/broker/saslAuthenticator/scram256.js index 256c6f652..34030f4a2 100644 --- a/src/broker/saslAuthenticator/scram256.js +++ b/src/broker/saslAuthenticator/scram256.js @@ -1,6 +1,6 @@ const { SCRAM, DIGESTS } = require('./scram') -const scram256AuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { +const scram256AuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { const scram = new SCRAM(sasl, host, port, logger, saslAuthenticate, DIGESTS.SHA256) return { authenticate: async () => await scram.authenticate(), diff --git a/src/broker/saslAuthenticator/scram512.js b/src/broker/saslAuthenticator/scram512.js index b2854d83e..961ec0d53 100644 --- a/src/broker/saslAuthenticator/scram512.js +++ b/src/broker/saslAuthenticator/scram512.js @@ -1,6 +1,6 @@ const { SCRAM, DIGESTS } = require('./scram') -const scram512AuthenticatorProvider = sasl => (host, port, logger, saslAuthenticate) => { +const scram512AuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { const scram = new SCRAM(sasl, host, port, logger, saslAuthenticate, DIGESTS.SHA512) return { authenticate: async () => await scram.authenticate(), diff --git a/types/index.d.ts b/types/index.d.ts index 347f42628..04d8e9c0d 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -28,17 +28,19 @@ export type Authenticator = { authenticate: () => Promise } +export type AuthenticationProviderArgs = { + host: string + port: number + logger: Logger + saslAuthenticate: ( + request: SaslAuthenticationRequest, + response?: SaslAuthenticationResponse + ) => Promise +} + export type Mechanism = { mechanism: string - authenticationProvider: ( - host: string, - port: number, - logger: Logger, - saslAuthenticate: ( - request: SaslAuthenticationRequest, - response?: SaslAuthenticationResponse - ) => Promise - ) => Authenticator + authenticationProvider: (args: AuthenticationProviderArgs) => Authenticator } export interface KafkaConfig { @@ -119,8 +121,8 @@ export type DefaultPartitioner = ICustomPartitioner export type LegacyPartitioner = ICustomPartitioner export const Partitioners: { - DefaultPartitioner: DefaultPartitioner, - LegacyPartitioner: LegacyPartitioner, + DefaultPartitioner: DefaultPartitioner + LegacyPartitioner: LegacyPartitioner /** * @deprecated Use DefaultPartitioner instead * From 09e8163f9d25481cfdbecd3f89eb5155f3f77f80 Mon Sep 17 00:00:00 2001 From: Mark Gaylard Date: Wed, 6 Jul 2022 11:40:50 +1000 Subject: [PATCH 13/14] only run auth override test when plain auth mechanism is available --- src/broker/__tests__/connect.spec.js | 38 +++++++++++++++------------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/broker/__tests__/connect.spec.js b/src/broker/__tests__/connect.spec.js index c13f61044..9987704d7 100644 --- a/src/broker/__tests__/connect.spec.js +++ b/src/broker/__tests__/connect.spec.js @@ -58,25 +58,27 @@ describe('Broker > connect', () => { ) }) - test('user provided authenticator overrides built in ones', async () => { - broker = new Broker({ - connectionPool: createConnectionPool( - Object.assign(sslConnectionOpts(), { - port: 9094, - sasl: { - mechanism: 'PLAIN', - authenticationProvider: () => ({ - authenticate: async () => { - throw new Error('test error') - }, - }), - }, - }) - ), - logger: newLogger(), - }) + describeIfOauthbearerDisabled('when PLAIN is configured', () => { + test('user provided authenticator overrides built in ones', async () => { + broker = new Broker({ + connectionPool: createConnectionPool( + Object.assign(sslConnectionOpts(), { + port: 9094, + sasl: { + mechanism: 'PLAIN', + authenticationProvider: () => ({ + authenticate: async () => { + throw new Error('test error') + }, + }), + }, + }) + ), + logger: newLogger(), + }) - await expect(broker.connect()).rejects.toThrow('test error') + await expect(broker.connect()).rejects.toThrow('test error') + }) }) for (const e of saslEntries) { From a3d2cc6f94283d4d38bebe7ffdf9ae595d331ac2 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Wed, 6 Jul 2022 07:59:48 +0200 Subject: [PATCH 14/14] Update docs with new authenticationProvider interface --- docs/Configuration.md | 2 +- docs/CustomAuthenticationMechanism.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/Configuration.md b/docs/Configuration.md index 51523f0a8..a40cb7d1c 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -274,7 +274,7 @@ mechanism can be introduced as a plugin: { sasl: { mechanism: , - authenticationProvider: (host, port, logger, saslAuthenticate) => { authenticate: () => Promise } + authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise } } } ``` diff --git a/docs/CustomAuthenticationMechanism.md b/docs/CustomAuthenticationMechanism.md index b4435e39d..f02eced77 100644 --- a/docs/CustomAuthenticationMechanism.md +++ b/docs/CustomAuthenticationMechanism.md @@ -10,7 +10,7 @@ custom authentication mechanisms can be introduced: { sasl: { mechanism: , - authenticationProvider: (host, port, logger, saslAuthenticate) => { authenticate: () => Promise } + authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise } } } ```