Skip to content

Commit

Permalink
Merge pull request #1372 from markgaylard/pluggable-auth
Browse files Browse the repository at this point in the history
Ability to use custom authenticators
  • Loading branch information
Nevon committed Jul 7, 2022
2 parents ad9abd1 + a3d2cc6 commit 7e9970b
Show file tree
Hide file tree
Showing 21 changed files with 471 additions and 213 deletions.
17 changes: 17 additions & 0 deletions docs/Configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,23 @@ A complete breakdown can be found in the IAM User Guide's
It is **highly recommended** that you use SSL for encryption when using `PLAIN` or `AWS`,
otherwise credentials will be transmitted in cleartext!

### Custom Authentication Mechanisms

If an authentication mechanism is not supported out of the box in KafkaJS, a custom authentication
mechanism can be introduced as a plugin:

```js
{
sasl: {
mechanism: <mechanism name>,
authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise<void> }
}
}
```

See [Custom Authentication Mechanisms](CustomAuthenticationMechanism.md) for more information on how to implement your own
authentication mechanism.

## Connection Timeout

Time in milliseconds to wait for a successful connection. The default value is: `1000`.
Expand Down
166 changes: 166 additions & 0 deletions docs/CustomAuthenticationMechanism.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
---
id: custom-authentication-mechanism
title: Custom Authentication Mechanisms
---

To use an authentication mechanism that is not supported out of the box by KafkaJS,
custom authentication mechanisms can be introduced:

```js
{
sasl: {
mechanism: <mechanism name>,
authenticationProvider: ({ host, port, logger, saslAuthenticate }) => { authenticate: () => Promise<void> }
}
}
```

`<mechanism name>` needs to match the SASL mechanism configured in the `sasl.enabled.mechanisms`
property in `server.properties`. See the Kafka documentation for information on how to
configure your brokers.

## Writing a custom authentication mechanism

A custom authentication mechanism needs to fulfill the following interface:

```ts
type AuthenticationProviderArgs = {
host: string
port: number
logger: Logger
saslAuthenticate: <ParseResult>(
request: SaslAuthenticationRequest,
response?: SaslAuthenticationResponse<ParseResult>
) => Promise<ParseResult | void>
}

type Mechanism = {
mechanism: string
authenticationProvider: (args: AuthenticationProviderArgs) => Authenticator
}

type Authenticator = {
authenticate(): Promise<void>
}

type SaslAuthenticationRequest = {
encode: () => Buffer | Promise<Buffer>
}

type SaslAuthenticationResponse<ParseResult> = {
decode: (rawResponse: Buffer) => Buffer | Promise<Buffer>
parse: (data: Buffer) => ParseResult
}
```
* `host` - Hostname of the specific broker to connect to
* `port` - Port of the specific broker to connect to
* `logger` - A logger instance namespaced to the authentication mechanism
* `saslAuthenticate` - an async function to make [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate)
requests towards the broker. The `request` and `response` functions are used to encode the `auth_bytes` of the request, and to optionally
decode and parse the `auth_bytes` in the response. `response` can be omitted if no response `auth_bytes` are expected.
### Example
In this example we will create a custom authentication mechanism called `simon`. The general
flow will be:
1. Send a [`SaslAuthenticate`](https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate)
request with the value of `says` as `auth_bytes`.
2. Read the response from the broker. If `says` starts with "Simon says", the response `auth_bytes`
should equal `says`, if it does not start with "Simon says", it should be an empty string.
**This is a made up example!**
It is a non-existent authentication mechanism just made up to show how to implement the expected interface. It is not a real authentication mechanism.
```js
const simonAuthenticator = says = ({ host, port, logger, saslAuthenticate }) => {
const INT32_SIZE = 4

const request = {
/**
* Encodes the value for `auth_bytes` in SaslAuthenticate request
* @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
*
* In this example, we are just sending `says` as a string,
* with the length of the string in bytes prepended as an int32
**/
encode: () => {
const byteLength = Buffer.byteLength(says, 'utf8')
const buf = Buffer.alloc(INT32_SIZE + byteLength)
buf.writeUInt32BE(byteLength, 0)
buf.write(says, INT32_SIZE, byteLength, 'utf8')
return buf
},
}
const response = {
/**
* Decodes the `auth_bytes` in SaslAuthenticate response
* @see https://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
*
* This is essentially the reverse of `request.encode`, where
* we read the length of the string as an int32 and then read
* that many bytes
*/
decode: rawData => {
const byteLength = rawData.readInt32BE(0)
return rawData.slice(INT32_SIZE, INT32_SIZE + byteLength)
},
/**
* The return value from `response.decode` is passed into
* this function, which is responsible for interpreting
* the data. In this case, we just turn the buffer back
* into a string
*/
parse: data => {
return data.toString()
},
}
return {
/**
* This function is responsible for orchestrating the authentication flow.
* Essentially we will send a SaslAuthenticate request with the
* value of `sasl.says` to the broker, and expect to
* get the same value back.
*
* Other authentication methods may do any other operations they
* like, but communication with the brokers goes through
* the SaslAuthenticate request.
*/
authenticate: async () => {
if (says == null) {
throw new Error('SASL Simon: Invalid "says"')
}
const broker = `${host}:${port}`
try {
logger.info('Authenticate with SASL Simon', { broker })
const authenticateResponse = await saslAuthenticate({ request, response })

const saidSimon = says.startsWith("Simon says ")
const expectedResponse = saidSimon ? says : ""
if (authenticateResponse !== expectedResponse) {
throw new Error("Mismatching response from broker")
}
logger.info('SASL Simon authentication successful', { broker })
} catch (e) {
const error = new Error(
`SASL Simon authentication failed: ${e.message}`
)
logger.error(error.message, { broker })
throw error
}
},
}
}
```

The `response` argument to `saslAuthenticate` is optional, in case the authentication
method does not require the `auth_bytes` in the response.

In the example above, we expect the client to be configured as such:

```js
const config = {
sasl: {
mechanism: 'simon'
authenticationProvider: simonAuthenticator('Simon says authenticate me')
}
}
```
47 changes: 47 additions & 0 deletions src/broker/__tests__/connect.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ const {
createConnectionPool,
connectionOpts,
saslSCRAM256ConnectionOpts,
sslConnectionOpts,
newLogger,
testIfKafkaAtLeast_1_1_0,
describeIfOauthbearerDisabled,
Expand Down Expand Up @@ -34,6 +35,52 @@ describe('Broker > connect', () => {
expect(broker.versions).toBeTruthy()
})

test("throws if the mechanism isn't supported by the server", async () => {
broker = new Broker({
connectionPool: createConnectionPool(
Object.assign(sslConnectionOpts(), {
port: 9094,
sasl: {
mechanism: 'fake-mechanism',
authenticationProvider: () => ({
authenticate: async () => {
throw new Error('🥸')
},
}),
},
})
),
logger: newLogger(),
})

await expect(broker.connect()).rejects.toThrow(
'The broker does not support the requested SASL mechanism'
)
})

describeIfOauthbearerDisabled('when PLAIN is configured', () => {
test('user provided authenticator overrides built in ones', async () => {
broker = new Broker({
connectionPool: createConnectionPool(
Object.assign(sslConnectionOpts(), {
port: 9094,
sasl: {
mechanism: 'PLAIN',
authenticationProvider: () => ({
authenticate: async () => {
throw new Error('test error')
},
}),
},
})
),
logger: newLogger(),
})

await expect(broker.connect()).rejects.toThrow('test error')
})
})

for (const e of saslEntries) {
test(`authenticate with SASL ${e.name} if configured`, async () => {
broker = new Broker({
Expand Down
68 changes: 31 additions & 37 deletions src/broker/saslAuthenticator/awsIam.js
Original file line number Diff line number Diff line change
@@ -1,43 +1,37 @@
const awsIam = require('../../protocol/sasl/awsIam')
const { request, response } = require('../../protocol/sasl/awsIam')
const { KafkaJSSASLAuthenticationError } = require('../../errors')

module.exports = class AWSIAMAuthenticator {
constructor(connection, logger, saslAuthenticate) {
this.connection = connection
this.logger = logger.namespace('SASLAWSIAMAuthenticator')
this.saslAuthenticate = saslAuthenticate
}

async authenticate() {
const { sasl } = this.connection
if (!sasl.authorizationIdentity) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity')
}
if (!sasl.accessKeyId) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId')
}
if (!sasl.secretAccessKey) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey')
}
if (!sasl.sessionToken) {
sasl.sessionToken = ''
}
const awsIAMAuthenticatorProvider = sasl => ({ host, port, logger, saslAuthenticate }) => {
return {
authenticate: async () => {
if (!sasl.authorizationIdentity) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing authorizationIdentity')
}
if (!sasl.accessKeyId) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing accessKeyId')
}
if (!sasl.secretAccessKey) {
throw new KafkaJSSASLAuthenticationError('SASL AWS-IAM: Missing secretAccessKey')
}
if (!sasl.sessionToken) {
sasl.sessionToken = ''
}

const request = awsIam.request(sasl)
const response = awsIam.response
const { host, port } = this.connection
const broker = `${host}:${port}`
const broker = `${host}:${port}`

try {
this.logger.debug('Authenticate with SASL AWS-IAM', { broker })
await this.saslAuthenticate({ request, response })
this.logger.debug('SASL AWS-IAM authentication successful', { broker })
} catch (e) {
const error = new KafkaJSSASLAuthenticationError(
`SASL AWS-IAM authentication failed: ${e.message}`
)
this.logger.error(error.message, { broker })
throw error
}
try {
logger.debug('Authenticate with SASL AWS-IAM', { broker })
await saslAuthenticate({ request: request(sasl), response })
logger.debug('SASL AWS-IAM authentication successful', { broker })
} catch (e) {
const error = new KafkaJSSASLAuthenticationError(
`SASL AWS-IAM authentication failed: ${e.message}`
)
logger.error(error.message, { broker })
throw error
}
},
}
}

module.exports = awsIAMAuthenticatorProvider
25 changes: 10 additions & 15 deletions src/broker/saslAuthenticator/awsIam.spec.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
const { newLogger } = require('testHelpers')
const AWSIAM = require('./awsIam')
const awsIAMAuthenticatorProvider = require('./awsIam')

describe('Broker > SASL Authenticator > AWS-IAM', () => {
it('throws KafkaJSSASLAuthenticationError for missing authorizationIdentity', async () => {
const awsIam = new AWSIAM({ sasl: {} }, newLogger())
const awsIam = awsIAMAuthenticatorProvider({})({ host: '', port: 0, logger: newLogger() })
await expect(awsIam.authenticate()).rejects.toThrow(
'SASL AWS-IAM: Missing authorizationIdentity'
)
})

it('throws KafkaJSSASLAuthenticationError for invalid accessKeyId', async () => {
const awsIam = new AWSIAM(
{
sasl: {
authorizationIdentity: '<authorizationIdentity>',
secretAccessKey: '<secretAccessKey>',
},
},
newLogger()
)
const awsIam = awsIAMAuthenticatorProvider({
authorizationIdentity: '<authorizationIdentity>',
secretAccessKey: '<secretAccessKey>',
})({ host: '', port: 0, logger: newLogger() })
await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing accessKeyId')
})

it('throws KafkaJSSASLAuthenticationError for invalid secretAccessKey', async () => {
const awsIam = new AWSIAM(
{ sasl: { authorizationIdentity: '<authorizationIdentity>', accessKeyId: '<accessKeyId>' } },
newLogger()
)
const awsIam = awsIAMAuthenticatorProvider({
authorizationIdentity: '<authorizationIdentity>',
accessKeyId: '<accessKeyId>',
})({ host: '', port: 0, logger: newLogger() })
await expect(awsIam.authenticate()).rejects.toThrow('SASL AWS-IAM: Missing secretAccessKey')
})
})
Loading

0 comments on commit 7e9970b

Please sign in to comment.