Skip to content

Commit

Permalink
KAFKA-13202: KIP-768: Extend SASL/OAUTHBEARER with Support for OIDC (a…
Browse files Browse the repository at this point in the history
…pache#11284)

This task is to provide a concrete implementation of the interfaces defined in KIP-255 to allow Kafka to connect to an OAuth/OIDC identity provider for authentication and token retrieval. While KIP-255 provides an unsecured JWT example for development, this will fill in the gap and provide a production-grade implementation.

The OAuth/OIDC work will allow out-of-the-box configuration by any Apache Kafka users to connect to an external identity provider service (e.g. Okta, Auth0, Azure, etc.). The code will implement the standard OAuth client credentials grant type.

The proposed change is largely composed of a pair of AuthenticateCallbackHandler implementations: one to login on the client and one to validate on the broker.

See the following for more detail:

KIP-768
KAFKA-13202

Reviewers: Yi Ding <dingyi.zj@gmail.com>, Ismael Juma <ismael@juma.me.uk>, Jun Rao <junrao@gmail.com>
  • Loading branch information
Kirk True committed Oct 28, 2021
1 parent 2c0364f commit 7b37953
Show file tree
Hide file tree
Showing 48 changed files with 5,896 additions and 5 deletions.
6 changes: 6 additions & 0 deletions build.gradle
Expand Up @@ -829,6 +829,7 @@ project(':core') {
implementation libs.jacksonDataformatCsv
implementation libs.jacksonJDK8Datatypes
implementation libs.joptSimple
implementation libs.jose4j
implementation libs.metrics
implementation libs.scalaCollectionCompat
implementation libs.scalaJava8Compat
Expand Down Expand Up @@ -1200,6 +1201,7 @@ project(':clients') {

compileOnly libs.jacksonDatabind // for SASL/OAUTHBEARER bearer token parsing
compileOnly libs.jacksonJDK8Datatypes
compileOnly libs.jose4j // for SASL/OAUTHBEARER JWT validation; only used by broker

testImplementation libs.bcpkix
testImplementation libs.junitJupiter
Expand All @@ -1208,6 +1210,7 @@ project(':clients') {
testRuntimeOnly libs.slf4jlog4j
testRuntimeOnly libs.jacksonDatabind
testRuntimeOnly libs.jacksonJDK8Datatypes
testImplementation libs.jose4j
testImplementation libs.jacksonJaxrsJsonProvider
}

Expand Down Expand Up @@ -1604,6 +1607,7 @@ project(':tools') {
implementation libs.slf4jApi
implementation libs.log4j

implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonJaxrsJsonProvider

testImplementation project(':clients')
Expand Down Expand Up @@ -1701,6 +1705,7 @@ project(':shell') {
implementation project(':metadata')
implementation project(':raft')

implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonJaxrsJsonProvider

testImplementation project(':clients')
Expand Down Expand Up @@ -2377,6 +2382,7 @@ project(':connect:runtime') {

implementation libs.slf4jApi
implementation libs.log4j
implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation
implementation libs.jacksonAnnotations
implementation libs.jacksonJaxrsJsonProvider
implementation libs.jerseyContainerServlet
Expand Down
4 changes: 4 additions & 0 deletions checkstyle/import-control.xml
Expand Up @@ -139,6 +139,9 @@
</subpackage>
<subpackage name="oauthbearer">
<allow pkg="com.fasterxml.jackson.databind" />
<subpackage name="secured">
<allow pkg="org.jose4j" />
</subpackage>
</subpackage>
</subpackage>

Expand Down Expand Up @@ -338,6 +341,7 @@
<allow pkg="org.apache.kafka.clients.producer" />
<allow pkg="org.apache.kafka.clients.consumer" />
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.jose4j" />
<allow pkg="net.sourceforge.argparse4j" />
<allow pkg="org.apache.log4j" />
</subpackage>
Expand Down
117 changes: 112 additions & 5 deletions clients/src/main/java/org/apache/kafka/common/config/SaslConfigs.java
Expand Up @@ -19,6 +19,9 @@
import org.apache.kafka.common.config.ConfigDef.Range;

public class SaslConfigs {

private static final String OAUTHBEARER_NOTE = " Currently applies only to OAUTHBEARER.";

/*
* NOTE: DO NOT CHANGE EITHER CONFIG NAMES AS THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
*/
Expand Down Expand Up @@ -75,30 +78,120 @@ public class SaslConfigs {
public static final String SASL_LOGIN_REFRESH_WINDOW_FACTOR_DOC = "Login refresh thread will sleep until the specified window factor relative to the"
+ " credential's lifetime has been reached, at which time it will try to refresh the credential."
+ " Legal values are between 0.5 (50%) and 1.0 (100%) inclusive; a default value of 0.8 (80%) is used"
+ " if no value is specified. Currently applies only to OAUTHBEARER.";
+ " if no value is specified."
+ OAUTHBEARER_NOTE;
public static final double DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR = 0.80;

public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER = "sasl.login.refresh.window.jitter";
public static final String SASL_LOGIN_REFRESH_WINDOW_JITTER_DOC = "The maximum amount of random jitter relative to the credential's lifetime"
+ " that is added to the login refresh thread's sleep time. Legal values are between 0 and 0.25 (25%) inclusive;"
+ " a default value of 0.05 (5%) is used if no value is specified. Currently applies only to OAUTHBEARER.";
+ " a default value of 0.05 (5%) is used if no value is specified."
+ OAUTHBEARER_NOTE;
public static final double DEFAULT_LOGIN_REFRESH_WINDOW_JITTER = 0.05;

public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS = "sasl.login.refresh.min.period.seconds";
public static final String SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_DOC = "The desired minimum time for the login refresh thread to wait before refreshing a credential,"
+ " in seconds. Legal values are between 0 and 900 (15 minutes); a default value of 60 (1 minute) is used if no value is specified. This value and "
+ " sasl.login.refresh.buffer.seconds are both ignored if their sum exceeds the remaining lifetime of a credential."
+ " Currently applies only to OAUTHBEARER.";
+ OAUTHBEARER_NOTE;
public static final short DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS = 60;

public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS = "sasl.login.refresh.buffer.seconds";
public static final String SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC = "The amount of buffer time before credential expiration to maintain when refreshing a credential,"
+ " in seconds. If a refresh would otherwise occur closer to expiration than the number of buffer seconds then the refresh will be moved up to maintain"
+ " as much of the buffer time as possible. Legal values are between 0 and 3600 (1 hour); a default value of 300 (5 minutes) is used if no value is specified."
+ " This value and sasl.login.refresh.min.period.seconds are both ignored if their sum exceeds the remaining lifetime of a credential."
+ " Currently applies only to OAUTHBEARER.";
+ OAUTHBEARER_NOTE;
public static final short DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS = 300;

public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS = "sasl.login.connect.timeout.ms";
public static final String SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider connection timeout."
+ OAUTHBEARER_NOTE;

public static final String SASL_LOGIN_READ_TIMEOUT_MS = "sasl.login.read.timeout.ms";
public static final String SASL_LOGIN_READ_TIMEOUT_MS_DOC = "The (optional) value in milliseconds for the external authentication provider read timeout."
+ OAUTHBEARER_NOTE;

private static final String LOGIN_EXPONENTIAL_BACKOFF_NOTE = " Login uses an exponential backoff algorithm with an initial wait based on the"
+ " sasl.login.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the"
+ " sasl.login.retry.backoff.max.ms setting."
+ OAUTHBEARER_NOTE;

public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS = "sasl.login.retry.backoff.max.ms";
public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS = 10000;
public static final String SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC = "The (optional) value in milliseconds for the maximum wait between login attempts to the"
+ " external authentication provider."
+ LOGIN_EXPONENTIAL_BACKOFF_NOTE;

public static final String SASL_LOGIN_RETRY_BACKOFF_MS = "sasl.login.retry.backoff.ms";
public static final long DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS = 100;
public static final String SASL_LOGIN_RETRY_BACKOFF_MS_DOC = "The (optional) value in milliseconds for the initial wait between login attempts to the external"
+ " authentication provider."
+ LOGIN_EXPONENTIAL_BACKOFF_NOTE;

public static final String SASL_OAUTHBEARER_SCOPE_CLAIM_NAME = "sasl.oauthbearer.scope.claim.name";
public static final String DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME = "scope";
public static final String SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC = "The OAuth claim for the scope is often named \"" + DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME + "\", but this (optional)"
+ " setting can provide a different name to use for the scope included in the JWT payload's claims if the OAuth/OIDC provider uses a different"
+ " name for that claim.";

public static final String SASL_OAUTHBEARER_SUB_CLAIM_NAME = "sasl.oauthbearer.sub.claim.name";
public static final String DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME = "sub";
public static final String SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC = "The OAuth claim for the subject is often named \"" + DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME + "\", but this (optional)"
+ " setting can provide a different name to use for the subject included in the JWT payload's claims if the OAuth/OIDC provider uses a different"
+ " name for that claim.";

public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url";
public static final String SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC = "The URL for the OAuth/OIDC identity provider. If the URL is HTTP(S)-based, it is the issuer's token"
+ " endpoint URL to which requests will be made to login based on the configuration in " + SASL_JAAS_CONFIG + ". If the URL is file-based, it"
+ " specifies a file containing an access token (in JWT serialized form) issued by the OAuth/OIDC identity provider to use for authorization.";

public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_URL = "sasl.oauthbearer.jwks.endpoint.url";
public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC = "The OAuth/OIDC provider URL from which the provider's"
+ " <a href=\"https://datatracker.ietf.org/doc/html/rfc7517#section-5\">JWKS (JSON Web Key Set)</a> can be retrieved. The URL can be HTTP(S)-based or file-based."
+ " If the URL is HTTP(S)-based, the JWKS data will be retrieved from the OAuth/OIDC provider via the configured URL on broker startup. All then-current"
+ " keys will be cached on the broker for incoming requests. If an authentication request is received for a JWT that includes a \"kid\" header claim value that"
+ " isn't yet in the cache, the JWKS endpoint will be queried again on demand. However, the broker polls the URL every sasl.oauthbearer.jwks.endpoint.refresh.ms"
+ " milliseconds to refresh the cache with any forthcoming keys before any JWT requests that include them are received."
+ " If the URL is file-based, the broker will load the JWKS file from a configured location on startup. In the event that the JWT includes a \"kid\" header"
+ " value that isn't in the JWKS file, the broker will reject the JWT and authentication will fail.";

public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS = "sasl.oauthbearer.jwks.endpoint.refresh.ms";
public static final long DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS = 60 * 60 * 1000;
public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC = "The (optional) value in milliseconds for the broker to wait between refreshing its JWKS (JSON Web Key Set)"
+ " cache that contains the keys to verify the signature of the JWT.";

private static final String JWKS_EXPONENTIAL_BACKOFF_NOTE = " JWKS retrieval uses an exponential backoff algorithm with an initial wait based on the"
+ " sasl.oauthbearer.jwks.endpoint.retry.backoff.ms setting and will double in wait length between attempts up to a maximum wait length specified by the"
+ " sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms setting.";

public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS = "sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms";
public static final long DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS = 10000;
public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC = "The (optional) value in milliseconds for the maximum wait between attempts to retrieve the JWKS (JSON Web Key Set)"
+ " from the external authentication provider."
+ JWKS_EXPONENTIAL_BACKOFF_NOTE;

public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS = "sasl.oauthbearer.jwks.endpoint.retry.backoff.ms";
public static final long DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS = 100;
public static final String SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC = "The (optional) value in milliseconds for the initial wait between JWKS (JSON Web Key Set) retrieval attempts from the external"
+ " authentication provider."
+ JWKS_EXPONENTIAL_BACKOFF_NOTE;

public static final String SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS = "sasl.oauthbearer.clock.skew.seconds";
public static final int DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS = 30;
public static final String SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC = "The (optional) value in seconds to allow for differences between the time of the OAuth/OIDC identity provider and"
+ " the broker.";

public static final String SASL_OAUTHBEARER_EXPECTED_AUDIENCE = "sasl.oauthbearer.expected.audience";
public static final String SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC = "The (optional) comma-delimited setting for the broker to use to verify that the JWT was issued for one of the"
+ " expected audiences. The JWT will be inspected for the standard OAuth \"aud\" claim and if this value is set, the broker will match the value from JWT's \"aud\" claim "
+ " to see if there is an exact match. If there is no match, the broker will reject the JWT and authentication will fail.";

public static final String SASL_OAUTHBEARER_EXPECTED_ISSUER = "sasl.oauthbearer.expected.issuer";
public static final String SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC = "The (optional) setting for the broker to use to verify that the JWT was created by the expected issuer. The JWT will"
+ " be inspected for the standard OAuth \"iss\" claim and if this value is set, the broker will match it exactly against what is in the JWT's \"iss\" claim. If there is no"
+ " match, the broker will reject the JWT and authentication will fail.";

public static void addClientSaslSupport(ConfigDef config) {
config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
Expand All @@ -113,6 +206,20 @@ public static void addClientSaslSupport(ConfigDef config) {
.define(SaslConfigs.SASL_JAAS_CONFIG, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_JAAS_CONFIG_DOC)
.define(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS_DOC)
.define(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS_DOC)
.define(SaslConfigs.SASL_LOGIN_CLASS, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_LOGIN_CLASS_DOC);
.define(SaslConfigs.SASL_LOGIN_CLASS, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_LOGIN_CLASS_DOC)
.define(SaslConfigs.SASL_LOGIN_CONNECT_TIMEOUT_MS, ConfigDef.Type.INT, null, ConfigDef.Importance.LOW, SASL_LOGIN_CONNECT_TIMEOUT_MS_DOC)
.define(SaslConfigs.SASL_LOGIN_READ_TIMEOUT_MS, ConfigDef.Type.INT, null, ConfigDef.Importance.LOW, SASL_LOGIN_READ_TIMEOUT_MS_DOC)
.define(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MAX_MS, ConfigDef.Type.LONG, DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MAX_MS, ConfigDef.Importance.LOW, SASL_LOGIN_RETRY_BACKOFF_MAX_MS_DOC)
.define(SaslConfigs.SASL_LOGIN_RETRY_BACKOFF_MS, ConfigDef.Type.LONG, DEFAULT_SASL_LOGIN_RETRY_BACKOFF_MS, ConfigDef.Importance.LOW, SASL_LOGIN_RETRY_BACKOFF_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, ConfigDef.Type.STRING, DEFAULT_SASL_OAUTHBEARER_SCOPE_CLAIM_NAME, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_SCOPE_CLAIM_NAME_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_SUB_CLAIM_NAME, ConfigDef.Type.STRING, DEFAULT_SASL_OAUTHBEARER_SUB_CLAIM_NAME, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_SUB_CLAIM_NAME_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SASL_OAUTHBEARER_JWKS_ENDPOINT_URL_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, ConfigDef.Type.LONG, DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_JWKS_ENDPOINT_REFRESH_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, ConfigDef.Type.LONG, DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, ConfigDef.Type.LONG, DEFAULT_SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, ConfigDef.Type.INT, DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);
}
}

0 comments on commit 7b37953

Please sign in to comment.