diff --git a/docs/Configuration.md b/docs/Configuration.md index dfce30348..a40cb7d1c 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -265,6 +265,23 @@ A complete breakdown can be found in the IAM User Guide's It is **highly recommended** that you use SSL for encryption when using `PLAIN` or `AWS`, otherwise credentials will be transmitted in cleartext! +### Custom Authentication Mechanisms + +If an authentication mechanism is not supported out of the box in KafkaJS, a custom authentication +mechanism can be introduced as a plugin: + +```js +{ + sasl: { + mechanism: , + authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise } + } +} +``` + +See [Custom Authentication Mechanisms](CustomAuthenticationMechanism.md) for more information on how to implement your own +authentication mechanism. + ## Connection Timeout Time in milliseconds to wait for a successful connection. The default value is: `1000`. diff --git a/docs/CustomAuthenticationMechanism.md b/docs/CustomAuthenticationMechanism.md new file mode 100644 index 000000000..f02eced77 --- /dev/null +++ b/docs/CustomAuthenticationMechanism.md @@ -0,0 +1,166 @@ +--- +id: custom-authentication-mechanism +title: Custom Authentication Mechanisms +--- + +To use an authentication mechanism that is not supported out of the box by KafkaJS, +custom authentication mechanisms can be introduced: + +```js +{ + sasl: { + mechanism: , + authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise } + } +} +``` + +`` needs to match the SASL mechanism configured in the `sasl.enabled.mechanisms` +property in `server.properties`. See the Kafka documentation for information on how to +configure your brokers. + +## Writing a custom authentication mechanism + +A custom authentication mechanism needs to fulfill the following interface: + +```ts +type AuthenticationProviderArgs = { + host: string + port: number + logger: Logger + saslAuthenticate: ( + request: SaslAuthenticationRequest, + response?: SaslAuthenticationResponse + ) => Promise +} + +type Mechanism = { + mechanism: string + authenticationProvider: (args: AuthenticationProviderArgs) => Authenticator +} + +type Authenticator = { + authenticate(): Promise +} + +type SaslAuthenticationRequest = { + encode: () => Buffer | Promise +} + +type SaslAuthenticationResponse = { + decode: (rawResponse: Buffer) => Buffer | Promise + parse: (data: Buffer) => ParseResult +} +``` +* `host` - Hostname of the specific broker to connect to +* `port` - Port of the specific broker to connect to +* `logger` - A logger instance namespaced to the authentication mechanism +* `saslAuthenticate` - an async function to make [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate) +requests towards the broker. The `request` and `response` functions are used to encode the `auth_bytes` of the request, and to optionally +decode and parse the `auth_bytes` in the response. `response` can be omitted if no response `auth_bytes` are expected. +### Example +In this example we will create a custom authentication mechanism called `simon`. The general +flow will be: +1. Send a [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate) +request with the value of `says` as `auth_bytes`. +2. Read the response from the broker. If `says` starts with "Simon says", the response `auth_bytes` +should equal `says`, if it does not start with "Simon says", it should be an empty string. + +**This is a made up example!** + +It is a non-existent authentication mechanism just made up to show how to implement the expected interface. It is not a real authentication mechanism. + +```js +const simonAuthenticator = says = ({ host, port, logger, saslAuthenticate }) => { + const INT32_SIZE = 4 + + const request = { + /** + * Encodes the value for `auth_bytes` in SaslAuthenticate request + * @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate + * + * In this example, we are just sending `says` as a string, + * with the length of the string in bytes prepended as an int32 + **/ + encode: () => { + const byteLength = Buffer.byteLength(says, 'utf8') + const buf = Buffer.alloc(INT32_SIZE + byteLength) + buf.writeUInt32BE(byteLength, 0) + buf.write(says, INT32_SIZE, byteLength, 'utf8') + return buf + }, + } + const response = { + /** + * Decodes the `auth_bytes` in SaslAuthenticate response + * @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate + * + * This is essentially the reverse of `request.encode`, where + * we read the length of the string as an int32 and then read + * that many bytes + */ + decode: rawData => { + const byteLength = rawData.readInt32BE(0) + return rawData.slice(INT32_SIZE, INT32_SIZE + byteLength) + }, + /** + * The return value from `response.decode` is passed into + * this function, which is responsible for interpreting + * the data. In this case, we just turn the buffer back + * into a string + */ + parse: data => { + return data.toString() + }, + } + return { + /** + * This function is responsible for orchestrating the authentication flow. + * Essentially we will send a SaslAuthenticate request with the + * value of `sasl.says` to the broker, and expect to + * get the same value back. + * + * Other authentication methods may do any other operations they + * like, but communication with the brokers goes through + * the SaslAuthenticate request. + */ + authenticate: async () => { + if (says == null) { + throw new Error('SASL Simon: Invalid "says"') + } + const broker = `${host}:${port}` + try { + logger.info('Authenticate with SASL Simon', { broker }) + const authenticateResponse = await saslAuthenticate({ request, response }) + + const saidSimon = says.startsWith("Simon says ") + const expectedResponse = saidSimon ? says : "" + if (authenticateResponse !== expectedResponse) { + throw new Error("Mismatching response from broker") + } + logger.info('SASL Simon authentication successful', { broker }) + } catch (e) { + const error = new Error( + `SASL Simon authentication failed: ${e.message}` + ) + logger.error(error.message, { broker }) + throw error + } + }, + } +} +``` + +The `response` argument to `saslAuthenticate` is optional, in case the authentication +method does not require the `auth_bytes` in the response. + +In the example above, we expect the client to be configured as such: + +```js +const config = { + sasl: { + mechanism: 'simon' + authenticationProvider: simonAuthenticator('Simon says authenticate me') + } +} +``` \ No newline at end of file diff --git a/src/broker/__tests__/connect.spec.js b/src/broker/__tests__/connect.spec.js index 07b014d50..9987704d7 100644 --- a/src/broker/__tests__/connect.spec.js +++ b/src/broker/__tests__/connect.spec.js @@ -2,6 +2,7 @@ const { createConnectionPool, connectionOpts, saslSCRAM256ConnectionOpts, + sslConnectionOpts, newLogger, testIfKafkaAtLeast_1_1_0, describeIfOauthbearerDisabled, @@ -34,6 +35,52 @@ describe('Broker > connect', () => { expect(broker.versions).toBeTruthy() }) + test("throws if the mechanism isn't supported by the server", async () => { + broker = new Broker({ + connectionPool: createConnectionPool( + Object.assign(sslConnectionOpts(), { + port: 9094, + sasl: { + mechanism: 'fake-mechanism', + authenticationProvider: () => ({ + authenticate: async () => { + throw new Error('🥸') + }, + }), + }, + }) + ), + logger: newLogger(), + }) + + await expect(broker.connect()).rejects.toThrow( + 'The broker does not support the requested SASL mechanism' + ) + }) + + describeIfOauthbearerDisabled('when PLAIN is configured', () => { + test('user provided authenticator overrides built in ones', async () => { + broker = new Broker({ + connectionPool: createConnectionPool( + Object.assign(sslConnectionOpts(), { + port: 9094, + sasl: { + mechanism: 'PLAIN', + authenticationProvider: () => ({ + authenticate: async () => { + throw new Error('test error') + }, + }), + }, + }) + ), + logger: newLogger(), + }) + + await expect(broker.connect()).rejects.toThrow('test error') + }) + }) + for (const e of saslEntries) { test(`authenticate with SASL ${e.name} if configured`, async () => { broker = new Broker({ diff --git a/src/broker/saslAuthenticator/awsIam.js b/src/broker/saslAuthenticator/awsIam.js index 795d0c076..45e166fa6 100644 --- a/src/broker/saslAuthenticator/awsIam.js +++ b/src/broker/saslAuthenticator/awsIam.js @@ -1,43 +1,37 @@ -const awsIam = require('../../protocol/sasl/awsIam') +const { request, response } = require('../../protocol/sasl/awsIam') const { KafkaJSSASLAuthenticationError } = require('../../errors') -module.exports = class AWSIAMAuthenticator { - constructor(connection, logger, saslAuthenticate) { - this.connection = connection - this.logger = logger.namespace('SASLAWSIAMAuthenticator') - this.saslAuthenticate = saslAuthenticate - } - - async authenticate() { - const { sasl } = this.connection - if (!sasl.authorizationIdentity) { - throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity') - } - if (!sasl.accessKeyId) { - throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId') - } - if (!sasl.secretAccessKey) { - throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey') - } - if (!sasl.sessionToken) { - sasl.sessionToken = '' - } +const awsIAMAuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { + return { + authenticate: async () => { + if (!sasl.authorizationIdentity) { + throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity') + } + if (!sasl.accessKeyId) { + throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId') + } + if (!sasl.secretAccessKey) { + throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey') + } + if (!sasl.sessionToken) { + sasl.sessionToken = '' + } - const request = awsIam.request(sasl) - const response = awsIam.response - const { host, port } = this.connection - const broker = `${host}:${port}` + const broker = `${host}:${port}` - try { - this.logger.debug('Authenticate with SASL AWS-IAM', { broker }) - await this.saslAuthenticate({ request, response }) - this.logger.debug('SASL AWS-IAM authentication successful', { broker }) - } catch (e) { - const error = new KafkaJSSASLAuthenticationError( - `SASL AWS-IAM authentication failed: ${e.message}` - ) - this.logger.error(error.message, { broker }) - throw error - } + try { + logger.debug('Authenticate with SASL AWS-IAM', { broker }) + await saslAuthenticate({ request: request(sasl), response }) + logger.debug('SASL AWS-IAM authentication successful', { broker }) + } catch (e) { + const error = new KafkaJSSASLAuthenticationError( + `SASL AWS-IAM authentication failed: ${e.message}` + ) + logger.error(error.message, { broker }) + throw error + } + }, } } + +module.exports = awsIAMAuthenticatorProvider diff --git a/src/broker/saslAuthenticator/awsIam.spec.js b/src/broker/saslAuthenticator/awsIam.spec.js index 42b539a30..e7b29ad2a 100644 --- a/src/broker/saslAuthenticator/awsIam.spec.js +++ b/src/broker/saslAuthenticator/awsIam.spec.js @@ -1,32 +1,27 @@ const { newLogger } = require('testHelpers') -const AWSIAM = require('./awsIam') +const awsIAMAuthenticatorProvider = require('./awsIam') describe('Broker > SASL Authenticator > AWS-IAM', () => { it('throws KafkaJSSASLAuthenticationError for missing authorizationIdentity', async () => { - const awsIam = new AWSIAM({ sasl: {} }, newLogger()) + const awsIam = awsIAMAuthenticatorProvider({})({ host: '', port: 0, logger: newLogger() }) await expect(awsIam.authenticate()).rejects.toThrow( 'SASL AWS-IAM: Missing authorizationIdentity' ) }) it('throws KafkaJSSASLAuthenticationError for invalid accessKeyId', async () => { - const awsIam = new AWSIAM( - { - sasl: { - authorizationIdentity: '', - secretAccessKey: '', - }, - }, - newLogger() - ) + const awsIam = awsIAMAuthenticatorProvider({ + authorizationIdentity: '', + secretAccessKey: '', + })({ host: '', port: 0, logger: newLogger() }) await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing accessKeyId') }) it('throws KafkaJSSASLAuthenticationError for invalid secretAccessKey', async () => { - const awsIam = new AWSIAM( - { sasl: { authorizationIdentity: '', accessKeyId: '' } }, - newLogger() - ) + const awsIam = awsIAMAuthenticatorProvider({ + authorizationIdentity: '', + accessKeyId: '', + })({ host: '', port: 0, logger: newLogger() }) await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing secretAccessKey') }) }) diff --git a/src/broker/saslAuthenticator/index.js b/src/broker/saslAuthenticator/index.js index 9d79d13e7..ddd1f906c 100644 --- a/src/broker/saslAuthenticator/index.js +++ b/src/broker/saslAuthenticator/index.js @@ -1,21 +1,20 @@ const { requests, lookup } = require('../../protocol/requests') const apiKeys = require('../../protocol/requests/apiKeys') -const PlainAuthenticator = require('./plain') -const SCRAM256Authenticator = require('./scram256') -const SCRAM512Authenticator = require('./scram512') -const AWSIAMAuthenticator = require('./awsIam') -const OAuthBearerAuthenticator = require('./oauthBearer') +const plainAuthenticatorProvider = require('./plain') +const scram256AuthenticatorProvider = require('./scram256') +const scram512AuthenticatorProvider = require('./scram512') +const awsIAMAuthenticatorProvider = require('./awsIam') +const oauthBearerAuthenticatorProvider = require('./oauthBearer') const { KafkaJSSASLAuthenticationError } = require('../../errors') -const AUTHENTICATORS = { - PLAIN: PlainAuthenticator, - 'SCRAM-SHA-256': SCRAM256Authenticator, - 'SCRAM-SHA-512': SCRAM512Authenticator, - AWS: AWSIAMAuthenticator, - OAUTHBEARER: OAuthBearerAuthenticator, +const BUILT_IN_AUTHENTICATION_PROVIDERS = { + AWS: awsIAMAuthenticatorProvider, + PLAIN: plainAuthenticatorProvider, + OAUTHBEARER: oauthBearerAuthenticatorProvider, + 'SCRAM-SHA-256': scram256AuthenticatorProvider, + 'SCRAM-SHA-512': scram512AuthenticatorProvider, } -const SUPPORTED_MECHANISMS = Object.keys(AUTHENTICATORS) const UNLIMITED_SESSION_LIFETIME = '0' module.exports = class SASLAuthenticator { @@ -33,12 +32,6 @@ module.exports = class SASLAuthenticator { async authenticate() { const mechanism = this.connection.sasl.mechanism.toUpperCase() - if (!SUPPORTED_MECHANISMS.includes(mechanism)) { - throw new KafkaJSSASLAuthenticationError( - `SASL ${mechanism} mechanism is not supported by the client` - ) - } - const handshake = await this.connection.send(this.saslHandshake({ mechanism })) if (!handshake.enabledMechanisms.includes(mechanism)) { throw new KafkaJSSASLAuthenticationError( @@ -46,9 +39,9 @@ module.exports = class SASLAuthenticator { ) } - const saslAuthenticate = async ({ request, response, authExpectResponse }) => { + const saslAuthenticate = async ({ request, response }) => { if (this.protocolAuthentication) { - const { buffer: requestAuthBytes } = await request.encode() + const requestAuthBytes = await request.encode() const authResponse = await this.connection.send( this.protocolAuthentication({ authBytes: requestAuthBytes }) ) @@ -57,7 +50,7 @@ module.exports = class SASLAuthenticator { // This is not present in SaslAuthenticateV0, so we default to `"0"` this.sessionLifetime = authResponse.sessionLifetimeMs || UNLIMITED_SESSION_LIFETIME - if (!authExpectResponse) { + if (!response) { return } @@ -66,10 +59,24 @@ module.exports = class SASLAuthenticator { return response.parse(payloadDecoded) } - return this.connection.sendAuthRequest({ request, response, authExpectResponse }) + return this.connection.sendAuthRequest({ request, response }) } - const Authenticator = AUTHENTICATORS[mechanism] - await new Authenticator(this.connection, this.logger, saslAuthenticate).authenticate() + if ( + !this.connection.sasl.authenticationProvider && + Object.keys(BUILT_IN_AUTHENTICATION_PROVIDERS).includes(mechanism) + ) { + this.connection.sasl.authenticationProvider = BUILT_IN_AUTHENTICATION_PROVIDERS[mechanism]( + this.connection.sasl + ) + } + await this.connection.sasl + .authenticationProvider({ + host: this.connection.host, + port: this.connection.port, + logger: this.logger.namespace(`SaslAuthenticator-${mechanism}`), + saslAuthenticate, + }) + .authenticate() } } diff --git a/src/broker/saslAuthenticator/oauthBearer.js b/src/broker/saslAuthenticator/oauthBearer.js index 2ede35674..086a17aa9 100644 --- a/src/broker/saslAuthenticator/oauthBearer.js +++ b/src/broker/saslAuthenticator/oauthBearer.js @@ -10,47 +10,41 @@ * reused and refreshed when appropriate. */ -const oauthBearer = require('../../protocol/sasl/oauthBearer') +const { request } = require('../../protocol/sasl/oauthBearer') const { KafkaJSSASLAuthenticationError } = require('../../errors') -module.exports = class OAuthBearerAuthenticator { - constructor(connection, logger, saslAuthenticate) { - this.connection = connection - this.logger = logger.namespace('SASLOAuthBearerAuthenticator') - this.saslAuthenticate = saslAuthenticate - } - - async authenticate() { - const { sasl } = this.connection - if (sasl.oauthBearerProvider == null) { - throw new KafkaJSSASLAuthenticationError( - 'SASL OAUTHBEARER: Missing OAuth bearer token provider' - ) - } - - const { oauthBearerProvider } = sasl - - const oauthBearerToken = await oauthBearerProvider() - - if (oauthBearerToken.value == null) { - throw new KafkaJSSASLAuthenticationError('SASL OAUTHBEARER: Invalid OAuth bearer token') - } - - const request = await oauthBearer.request(sasl, oauthBearerToken) - const response = oauthBearer.response - const { host, port } = this.connection - const broker = `${host}:${port}` - - try { - this.logger.debug('Authenticate with SASL OAUTHBEARER', { broker }) - await this.saslAuthenticate({ request, response }) - this.logger.debug('SASL OAUTHBEARER authentication successful', { broker }) - } catch (e) { - const error = new KafkaJSSASLAuthenticationError( - `SASL OAUTHBEARER authentication failed: ${e.message}` - ) - this.logger.error(error.message, { broker }) - throw error - } +const oauthBearerAuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { + return { + authenticate: async () => { + const { oauthBearerProvider } = sasl + + if (oauthBearerProvider == null) { + throw new KafkaJSSASLAuthenticationError( + 'SASL OAUTHBEARER: Missing OAuth bearer token provider' + ) + } + + const oauthBearerToken = await oauthBearerProvider() + + if (oauthBearerToken.value == null) { + throw new KafkaJSSASLAuthenticationError('SASL OAUTHBEARER: Invalid OAuth bearer token') + } + + const broker = `${host}:${port}` + + try { + logger.debug('Authenticate with SASL OAUTHBEARER', { broker }) + await saslAuthenticate({ request: await request(sasl, oauthBearerToken) }) + logger.debug('SASL OAUTHBEARER authentication successful', { broker }) + } catch (e) { + const error = new KafkaJSSASLAuthenticationError( + `SASL OAUTHBEARER authentication failed: ${e.message}` + ) + logger.error(error.message, { broker }) + throw error + } + }, } } + +module.exports = oauthBearerAuthenticatorProvider diff --git a/src/broker/saslAuthenticator/oauthBearer.spec.js b/src/broker/saslAuthenticator/oauthBearer.spec.js index 7bf1b08a0..189fa2f7f 100644 --- a/src/broker/saslAuthenticator/oauthBearer.spec.js +++ b/src/broker/saslAuthenticator/oauthBearer.spec.js @@ -1,9 +1,13 @@ const { newLogger } = require('testHelpers') -const OAuthBearer = require('./oauthBearer') +const oauthBearerAuthenticatorProvider = require('./oauthBearer') describe('Broker > SASL Authenticator > OAUTHBEARER', () => { it('throws KafkaJSSASLAuthenticationError for missing oauthBearerProvider', async () => { - const oauthBearer = new OAuthBearer({ sasl: {} }, newLogger()) + const oauthBearer = oauthBearerAuthenticatorProvider({})({ + host: '', + port: 0, + logger: newLogger(), + }) await expect(oauthBearer.authenticate()).rejects.toThrow('Missing OAuth bearer token provider') }) @@ -12,7 +16,11 @@ describe('Broker > SASL Authenticator > OAUTHBEARER', () => { return {} } - const oauthBearer = new OAuthBearer({ sasl: { oauthBearerProvider } }, newLogger()) + const oauthBearer = oauthBearerAuthenticatorProvider({ oauthBearerProvider })({ + host: '', + port: 0, + logger: newLogger(), + }) await expect(oauthBearer.authenticate()).rejects.toThrow('Invalid OAuth bearer token') }) }) diff --git a/src/broker/saslAuthenticator/plain.js b/src/broker/saslAuthenticator/plain.js index 2e06bc4ba..fb63463d7 100644 --- a/src/broker/saslAuthenticator/plain.js +++ b/src/broker/saslAuthenticator/plain.js @@ -1,34 +1,28 @@ -const plain = require('../../protocol/sasl/plain') +const { request, response } = require('../../protocol/sasl/plain') const { KafkaJSSASLAuthenticationError } = require('../../errors') -module.exports = class PlainAuthenticator { - constructor(connection, logger, saslAuthenticate) { - this.connection = connection - this.logger = logger.namespace('SASLPlainAuthenticator') - this.saslAuthenticate = saslAuthenticate - } - - async authenticate() { - const { sasl } = this.connection - if (sasl.username == null || sasl.password == null) { - throw new KafkaJSSASLAuthenticationError('SASL Plain: Invalid username or password') - } +const plainAuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { + return { + authenticate: async () => { + if (sasl.username == null || sasl.password == null) { + throw new KafkaJSSASLAuthenticationError('SASL Plain: Invalid username or password') + } - const request = plain.request(sasl) - const response = plain.response - const { host, port } = this.connection - const broker = `${host}:${port}` + const broker = `${host}:${port}` - try { - this.logger.debug('Authenticate with SASL PLAIN', { broker }) - await this.saslAuthenticate({ request, response }) - this.logger.debug('SASL PLAIN authentication successful', { broker }) - } catch (e) { - const error = new KafkaJSSASLAuthenticationError( - `SASL PLAIN authentication failed: ${e.message}` - ) - this.logger.error(error.message, { broker }) - throw error - } + try { + logger.debug('Authenticate with SASL PLAIN', { broker }) + await saslAuthenticate({ request: request(sasl), response }) + logger.debug('SASL PLAIN authentication successful', { broker }) + } catch (e) { + const error = new KafkaJSSASLAuthenticationError( + `SASL PLAIN authentication failed: ${e.message}` + ) + logger.error(error.message, { broker }) + throw error + } + }, } } + +module.exports = plainAuthenticatorProvider diff --git a/src/broker/saslAuthenticator/plain.spec.js b/src/broker/saslAuthenticator/plain.spec.js index a1a95e52f..a3ff0768d 100644 --- a/src/broker/saslAuthenticator/plain.spec.js +++ b/src/broker/saslAuthenticator/plain.spec.js @@ -1,14 +1,18 @@ const { newLogger } = require('testHelpers') -const Plain = require('./plain') +const plainAuthenticatorProvider = require('./plain') describe('Broker > SASL Authenticator > PLAIN', () => { it('throws KafkaJSSASLAuthenticationError for invalid username', async () => { - const plain = new Plain({ sasl: {} }, newLogger()) + const plain = plainAuthenticatorProvider({})({ host: '', port: 0, logger: newLogger() }) await expect(plain.authenticate()).rejects.toThrow('Invalid username or password') }) it('throws KafkaJSSASLAuthenticationError for invalid password', async () => { - const plain = new Plain({ sasl: { username: '' } }, newLogger()) + const plain = plainAuthenticatorProvider({ username: '' })({ + host: '', + port: 0, + logger: newLogger(), + }) await expect(plain.authenticate()).rejects.toThrow('Invalid username or password') }) }) diff --git a/src/broker/saslAuthenticator/scram.js b/src/broker/saslAuthenticator/scram.js index 222d8384d..355fdbcc9 100644 --- a/src/broker/saslAuthenticator/scram.js +++ b/src/broker/saslAuthenticator/scram.js @@ -108,13 +108,15 @@ class SCRAM { } /** - * @param {Connection} connection + * @param {SASLOptions} sasl * @param {Logger} logger * @param {Function} saslAuthenticate * @param {DigestDefinition} digestDefinition */ - constructor(connection, logger, saslAuthenticate, digestDefinition) { - this.connection = connection + constructor(sasl, host, port, logger, saslAuthenticate, digestDefinition) { + this.sasl = sasl + this.host = host + this.port = port this.logger = logger this.saslAuthenticate = saslAuthenticate this.digestDefinition = digestDefinition @@ -127,10 +129,9 @@ class SCRAM { async authenticate() { const { PREFIX } = this - const { host, port, sasl } = this.connection - const broker = `${host}:${port}` + const broker = `${this.host}:${this.port}` - if (sasl.username == null || sasl.password == null) { + if (this.sasl.username == null || this.sasl.password == null) { throw new KafkaJSSASLAuthenticationError(`${this.PREFIX}: Invalid username or password`) } @@ -169,7 +170,6 @@ class SCRAM { const response = scram.firstMessage.response return this.saslAuthenticate({ - authExpectResponse: true, request, response, }) @@ -202,7 +202,6 @@ class SCRAM { const response = scram.finalMessage.response return this.saslAuthenticate({ - authExpectResponse: true, request, response, }) @@ -287,7 +286,7 @@ class SCRAM { * @private */ encodedUsername() { - const { username } = this.connection.sasl + const { username } = this.sasl return SCRAM.sanitizeString(username).toString('utf-8') } @@ -295,7 +294,7 @@ class SCRAM { * @private */ encodedPassword() { - const { password } = this.connection.sasl + const { password } = this.sasl return password.toString('utf-8') } diff --git a/src/broker/saslAuthenticator/scram.spec.js b/src/broker/saslAuthenticator/scram.spec.js index 5578abca2..5abbbfaae 100644 --- a/src/broker/saslAuthenticator/scram.spec.js +++ b/src/broker/saslAuthenticator/scram.spec.js @@ -1,29 +1,33 @@ const Decoder = require('../../protocol/decoder') const { newLogger } = require('testHelpers') -const SCRAM256 = require('./scram256') +const scram256AuthenticatorProvider = require('./scram256') +const { SCRAM, DIGESTS } = require('./scram') describe('Broker > SASL Authenticator > SCRAM', () => { - let connection, saslAuthenticate, logger + let sasl, saslAuthenticate, logger, host, port beforeEach(() => { - connection = { - authenticate: jest.fn(), - sasl: { username: 'user', password: 'pencil' }, - } - saslAuthenticate = ({ request, response, authExpectResponse }) => - connection.authenticate({ request, response, authExpectResponse }) + sasl = { username: 'user', password: 'pencil' } + saslAuthenticate = jest.fn() + + host = 'host' + port = 9094 logger = { debug: jest.fn() } - logger.namespace = () => logger }) it('throws KafkaJSSASLAuthenticationError for invalid username', async () => { - const scram = new SCRAM256({ sasl: {} }, newLogger(), saslAuthenticate) + const scram = scram256AuthenticatorProvider({})({ host: '', port: 0, logger: newLogger() }) await expect(scram.authenticate()).rejects.toThrow('Invalid username or password') }) it('throws KafkaJSSASLAuthenticationError for invalid password', async () => { - const scram = new SCRAM256({ sasl: { username: '' } }, newLogger(), saslAuthenticate) + const scram = scram256AuthenticatorProvider({ username: '' })({ + host: '', + port: 0, + logger: newLogger(), + saslAuthenticate, + }) await expect(scram.authenticate()).rejects.toThrow('Invalid username or password') }) @@ -31,11 +35,11 @@ describe('Broker > SASL Authenticator > SCRAM', () => { let scram beforeEach(() => { - scram = new SCRAM256(connection, logger, saslAuthenticate) + scram = new SCRAM(sasl, host, port, logger, saslAuthenticate, DIGESTS.SHA256) }) test('saltPassword', async () => { - connection.sasl.password = 'password' + sasl.password = 'password' const clientMessageResponse = { s: 'enBxNzV4aGphMjJmbnZ0ejF5M2o4Y3JjdA==', i: '4096', @@ -47,7 +51,7 @@ describe('Broker > SASL Authenticator > SCRAM', () => { }) test('clientKey', async () => { - connection.sasl.password = 'password' + sasl.password = 'password' const clientMessageResponse = { s: 'enBxNzV4aGphMjJmbnZ0ejF5M2o4Y3JjdA==', i: '4096', @@ -59,7 +63,7 @@ describe('Broker > SASL Authenticator > SCRAM', () => { }) test('storedKey', async () => { - connection.sasl.password = 'password' + sasl.password = 'password' const clientMessageResponse = { s: 'enBxNzV4aGphMjJmbnZ0ejF5M2o4Y3JjdA==', i: '4096', @@ -75,45 +79,42 @@ describe('Broker > SASL Authenticator > SCRAM', () => { test('regular use case', async () => { scram.currentNonce = 'rOprNGfwEbeRWgbNEkqO' await scram.sendClientFirstMessage() - expect(connection.authenticate).toHaveBeenCalledWith({ - authExpectResponse: true, + expect(saslAuthenticate).toHaveBeenCalledWith({ request: expect.any(Object), response: expect.any(Object), }) - const { request } = connection.authenticate.mock.calls[0][0] - const encoder = await request.encode() - const decoder = new Decoder(encoder.buffer) + const { request } = saslAuthenticate.mock.calls[0][0] + const buffer = await request.encode() + const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual(`n,,n=user,r=${scram.currentNonce}`) }) test('username with comma', async () => { - connection.sasl.username = 'bob,' + sasl.username = 'bob,' await scram.sendClientFirstMessage() - expect(connection.authenticate).toHaveBeenCalledWith({ - authExpectResponse: true, + expect(saslAuthenticate).toHaveBeenCalledWith({ request: expect.any(Object), response: expect.any(Object), }) - const { request } = connection.authenticate.mock.calls[0][0] - const encoder = await request.encode() - const decoder = new Decoder(encoder.buffer) + const { request } = saslAuthenticate.mock.calls[0][0] + const buffer = await request.encode() + const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual(`n,,n=bob=2C,r=${scram.currentNonce}`) }) test('username with equals', async () => { - connection.sasl.username = 'bob=' + sasl.username = 'bob=' await scram.sendClientFirstMessage() - expect(connection.authenticate).toHaveBeenCalledWith({ - authExpectResponse: true, + expect(saslAuthenticate).toHaveBeenCalledWith({ request: expect.any(Object), response: expect.any(Object), }) - const { request } = connection.authenticate.mock.calls[0][0] - const encoder = await request.encode() - const decoder = new Decoder(encoder.buffer) + const { request } = saslAuthenticate.mock.calls[0][0] + const buffer = await request.encode() + const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual(`n,,n=bob=3D,r=${scram.currentNonce}`) }) }) @@ -130,15 +131,14 @@ describe('Broker > SASL Authenticator > SCRAM', () => { } await scram.sendClientFinalMessage(clientMessageResponse) - expect(connection.authenticate).toHaveBeenCalledWith({ - authExpectResponse: true, + expect(saslAuthenticate).toHaveBeenCalledWith({ request: expect.any(Object), response: expect.any(Object), }) - const { request } = connection.authenticate.mock.calls[0][0] - const encoder = await request.encode() - const decoder = new Decoder(encoder.buffer) + const { request } = saslAuthenticate.mock.calls[0][0] + const buffer = await request.encode() + const decoder = new Decoder(buffer) expect(decoder.readBytes().toString()).toEqual( 'c=biws,r=rOprNGfwEbeRWgbNEkqO%hvYDpWUa2RaTCAfuxFIlj)hNlF$k0,p=dHzbZapWIk4jUhN+Ute9ytag9zjfMHgsqmmiz7AndVQ=' ) diff --git a/src/broker/saslAuthenticator/scram256.js b/src/broker/saslAuthenticator/scram256.js index 48fcb4d96..34030f4a2 100644 --- a/src/broker/saslAuthenticator/scram256.js +++ b/src/broker/saslAuthenticator/scram256.js @@ -1,7 +1,10 @@ const { SCRAM, DIGESTS } = require('./scram') -module.exports = class SCRAM256Authenticator extends SCRAM { - constructor(connection, logger, saslAuthenticate) { - super(connection, logger.namespace('SCRAM256Authenticator'), saslAuthenticate, DIGESTS.SHA256) +const scram256AuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { + const scram = new SCRAM(sasl, host, port, logger, saslAuthenticate, DIGESTS.SHA256) + return { + authenticate: async () => await scram.authenticate(), } } + +module.exports = scram256AuthenticatorProvider diff --git a/src/broker/saslAuthenticator/scram512.js b/src/broker/saslAuthenticator/scram512.js index 6e5d7fad2..961ec0d53 100644 --- a/src/broker/saslAuthenticator/scram512.js +++ b/src/broker/saslAuthenticator/scram512.js @@ -1,7 +1,10 @@ const { SCRAM, DIGESTS } = require('./scram') -module.exports = class SCRAM512Authenticator extends SCRAM { - constructor(connection, logger, saslAuthenticate) { - super(connection, logger.namespace('SCRAM512Authenticator'), saslAuthenticate, DIGESTS.SHA512) +const scram512AuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => { + const scram = new SCRAM(sasl, host, port, logger, saslAuthenticate, DIGESTS.SHA512) + return { + authenticate: async () => await scram.authenticate(), } } + +module.exports = scram512AuthenticatorProvider diff --git a/src/network/connection.js b/src/network/connection.js index d2aa20e97..2ac65c5c2 100644 --- a/src/network/connection.js +++ b/src/network/connection.js @@ -319,8 +319,8 @@ module.exports = class Connection { * @public * @returns {Promise} */ - sendAuthRequest({ authExpectResponse = false, request, response }) { - this.authExpectResponse = authExpectResponse + sendAuthRequest({ request, response }) { + this.authExpectResponse = !!response /** * TODO: rewrite removing the async promise executor diff --git a/src/protocol/sasl/awsIam/request.js b/src/protocol/sasl/awsIam/request.js index 165591bb8..f97149c18 100644 --- a/src/protocol/sasl/awsIam/request.js +++ b/src/protocol/sasl/awsIam/request.js @@ -6,6 +6,6 @@ module.exports = ({ authorizationIdentity, accessKeyId, secretAccessKey, session encode: async () => { return new Encoder().writeBytes( [authorizationIdentity, accessKeyId, secretAccessKey, sessionToken].join(US_ASCII_NULL_CHAR) - ) + ).buffer }, }) diff --git a/src/protocol/sasl/oauthBearer/request.js b/src/protocol/sasl/oauthBearer/request.js index fac9b86e8..3cff0bb13 100644 --- a/src/protocol/sasl/oauthBearer/request.js +++ b/src/protocol/sasl/oauthBearer/request.js @@ -56,7 +56,7 @@ module.exports = async ({ authorizationIdentity = null }, oauthBearerToken) => { return { encode: async () => { - return new Encoder().writeBytes(Buffer.from(oauthMsg)) + return new Encoder().writeBytes(Buffer.from(oauthMsg)).buffer }, } } diff --git a/src/protocol/sasl/plain/request.js b/src/protocol/sasl/plain/request.js index 182dee5a6..a52c98321 100644 --- a/src/protocol/sasl/plain/request.js +++ b/src/protocol/sasl/plain/request.js @@ -23,6 +23,6 @@ module.exports = ({ authorizationIdentity = null, username, password }) => ({ encode: async () => { return new Encoder().writeBytes( [authorizationIdentity, username, password].join(US_ASCII_NULL_CHAR) - ) + ).buffer }, }) diff --git a/src/protocol/sasl/scram/finalMessage/request.js b/src/protocol/sasl/scram/finalMessage/request.js index bcb8f28a3..58d40aaa6 100644 --- a/src/protocol/sasl/scram/finalMessage/request.js +++ b/src/protocol/sasl/scram/finalMessage/request.js @@ -1,5 +1,5 @@ const Encoder = require('../../../encoder') module.exports = ({ finalMessage }) => ({ - encode: async () => new Encoder().writeBytes(finalMessage), + encode: async () => new Encoder().writeBytes(finalMessage).buffer, }) diff --git a/src/protocol/sasl/scram/firstMessage/request.js b/src/protocol/sasl/scram/firstMessage/request.js index 6213f450c..865f0e586 100644 --- a/src/protocol/sasl/scram/firstMessage/request.js +++ b/src/protocol/sasl/scram/firstMessage/request.js @@ -18,5 +18,5 @@ const Encoder = require('../../../encoder') module.exports = ({ clientFirstMessage }) => ({ - encode: async () => new Encoder().writeBytes(clientFirstMessage), + encode: async () => new Encoder().writeBytes(clientFirstMessage).buffer, }) diff --git a/types/index.d.ts b/types/index.d.ts index 23f33ba38..b5e041a31 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -16,10 +16,37 @@ export class Kafka { export type BrokersFunction = () => string[] | Promise +type SaslAuthenticationRequest = { + encode: () => Buffer | Promise +} +type SaslAuthenticationResponse = { + decode: (rawResponse: Buffer) => Buffer | Promise + parse: (data: Buffer) => ParseResult +} + +export type Authenticator = { + authenticate: () => Promise +} + +export type AuthenticationProviderArgs = { + host: string + port: number + logger: Logger + saslAuthenticate: ( + request: SaslAuthenticationRequest, + response?: SaslAuthenticationResponse + ) => Promise +} + +export type Mechanism = { + mechanism: string + authenticationProvider: (args: AuthenticationProviderArgs) => Authenticator +} + export interface KafkaConfig { brokers: string[] | BrokersFunction ssl?: tls.ConnectionOptions | boolean - sasl?: SASLOptions + sasl?: SASLOptions | Mechanism clientId?: string connectionTimeout?: number authenticationTimeout?: number @@ -94,8 +121,8 @@ export type DefaultPartitioner = ICustomPartitioner export type LegacyPartitioner = ICustomPartitioner export const Partitioners: { - DefaultPartitioner: DefaultPartitioner, - LegacyPartitioner: LegacyPartitioner, + DefaultPartitioner: DefaultPartitioner + LegacyPartitioner: LegacyPartitioner /** * @deprecated Use DefaultPartitioner instead *