Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Security: streamnative/kop

docs/security.md

Security

KoP supports authentication and SSL connection.

Authentication

By default, KoP is installed with no encryption, authentication, and authorization. You can enable the authentication feature for KoP to improve security. Currently, this feature is only available in Java.

KoP authentication mechanism uses Kafka SASL mechanisms and achieves authentication with Pulsar token-based authentication mechanism. Consequently, if you want to enable the authentication feature for KoP, you need to enable authentication for the following components:

  • Pulsar brokers
  • KoP (some configurations of KoP rely on the configurations of Pulsar brokers)
  • Kafka clients

Currently, KoP supports the following SASL mechanisms:

They are both based on the JWT authentication, so you must configure authenticationProviders with AuthenticationProviderToken. See following chapters for more details.

PLAIN

If you want to enable the authentication feature for KoP using the PLAIN mechanism, follow the steps below.

  1. Enable authentication on Pulsar broker.

    For the PLAIN mechanism, the Kafka authentication is forwarded to the JWT authentication of Pulsar, so you need to configure the JWT authentication and set the following properties in the conf/broker.conf or conf/standalone.conf file.

    (1) Enable authentication for the Pulsar broker.

    authenticationEnabled=true
    authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken

    (2) Enable authentication between Pulsar broker and KoP.

    brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
    brokerClientAuthenticationParameters=token:<token-of-super-user-role>
    superUserRoles=<super-user-roles>

    (3) Specify the key.

    For more information, see Enable Token Authentication on Brokers.

    • If you use a secret key, set the property as below.

      tokenSecretKey=file:///path/to/secret.key

      The key can also be passed inline.

      tokenSecretKey=data:;base64,FLFyW0oLJ2Fi22KKCm21J18mbAdztfSHN/lAT5ucEKU=
    • If you use a public/private key, set the property as below.

      tokenPublicKey=file:///path/to/public.key
  2. Enable authentication on KoP.

    Set the following property in the conf/broker.conf or conf/standalone.conf file.

    saslAllowedMechanisms=PLAIN
  3. Enable authentication on Kafka client.

    To forward your credentials, SASL-PLAIN is used on the Kafka client side. To enable SASL-PLAIN, you need to set the following properties through Kafka JAAS.

Property Description Example value
username The username of Kafka JAAS is tenant/namespace or tenant, where Kafka’s topics are stored in Pulsar.

Note In KoP 2.9.0 or higher, the username can be used with kafkaEnableMultiTenantMetadata to implement multi-tenancy for metadata.
empty string
password password must be your token authentication parameters from Pulsar.

The token can be created by Pulsar token tools. The role is the subject for the token. It is embedded in the created token and the broker can get role by parsing this token.
token:xxx
```properties
security.protocol=SASL_PLAINTEXT  # or security.protocol=SASL_SSL if SSL connection is used
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="public/default" password="token:xxx";
```

OAUTHBEARER

If you want to enable the authentication feature for KoP using the OAUTHBEARER mechanism, follow the steps below.

  1. Enable authentication on Pulsar broker.

    Set the following properties in the conf/broker.conf or conf/standalone.conf file.

    The properties here are same to that of the PLAIN mechanism except brokerClientAuthenticationPlugin and brokerClientAuthenticationParameters.

    Property Description Required or optional Example value
    type OAuth 2.0 authentication type

    The default value is client_credentials
    Optional client_credentials
    privateKey URL to a JSON credential file

    The following pattern formats are supported:
    - file:///path/to/file
    - file:/path/to/file
    - data:application/json;base64,<base64-encoded value>
    Required file:///path/to/credentials_file.json
    issuerUrl URL of the authentication provider which allows the Pulsar client to obtain an access token Required https://accounts.google.com
    audience An OAuth 2.0 "resource server" identifier for the Pulsar cluster Optional https://broker.example.com
    scope The scope of the access request that is expressed as a list of space-delimited, case-sensitive strings Optional api://pulsar-cluster-1/.default
    authenticationEnabled=true
    authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
    superUserRoles=<super-user-roles>
    brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
    brokerClientAuthenticationParameters={"type":"client_credentials","privateKey":"file:///path/to/credentials_file.json","issuerUrl":"<issuer-url>","audience":"<audience>"}
    tokenPublicKey=<token-public-key>
  2. Enable authentication on KoP.

    (1) Set the following property in the conf/broker.conf or conf/standalone.conf file.

    saslAllowedMechanisms=OAUTHBEARER

    (2) Specify the Kafka server callback handler.

    For the OAUTHBEARER mechanism, you can use AuthenticationProviderToken or custom your authentication provider to process the access tokens from OAuth 2.0 server.

    KoP provides a built-in AuthenticateCallbackHandler that uses the authentication provider of Pulsar for authentication. You need to configure the following properties in the Pulsar broker's configuration file (e.g. conf/broker.conf)

    # Use KoP's built-in handler
    kopOauth2AuthenticateCallbackHandler=io.streamnative.pulsar.handlers.kop.security.oauth.OauthValidatorCallbackHandler
    
    # OauthValidatorCallbackHandler configuration file (Java Properties format)
    kopOauth2ConfigFile=conf/kop-handler.properties

(3) Specify the Authentication Method name of the provider (that is, oauth.validate.method) in the conf/kop-handler.properties file. By default, it uses the token authentication method (AuthenticationProviderToken).

  • If you use AuthenticationProviderToken, set oauth.validate.method to token (since AuthenticationProviderToken#getAuthMethodName() returns token).

  • If you use other providers, set the oauth.validate.method as the result of getAuthMethodName() of your AuthenticationProvider. For example, if your authentication provider in Pulsar is AuthenticationProviderAthenz, then set the following:

    oauth.validate.method=athenz

as it has the following code in it:

     ```java
         @Override
         public String getAuthMethodName() {
             return "athenz";
         }
     ```
  1. Enable authentication on Kafka client.

    (1) Install the KoP built-in callback handler to your local Maven repository.

    To get an access token from an OAuth 2.0 server, you need to use the KoP built-in callback handler instead of the Kafka login callback handler.

    mvn clean install -pl oauth-client -DskipTests

    (2) Add the following dependencies to the pom.xml file.

    For stable releases, the pulsar.version is the same to the kop.version.

    <dependency>
      <groupId>io.streamnative.pulsar.handlers</groupId>
      <artifactId>oauth-client</artifactId>
      <version>${kop.version}</version>
    </dependency>

    (3) Configure the producer or consumer with the following required properties.

    Property Description Example value Note
    sasl.login.callback.handler.class Class of SASL login callback handler io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler Set the value of this property as the same to the value in the example below.
    security.protocol Security protocol SASL_PLAINTEXT
    sasl.mechanism SASL mechanism OAUTHBEARER
    sasl.jaas.config JAAS configuration org.apache.kafka.coPropertymmon.security.oauthbearer.OAuthBearerLoginModule
    oauth.issuer.url URL of the authentication provider which allows the Pulsar client to obtain an access token. https://accounts.google.com This property is the same to the issuerUrl property in Pulsar client credentials
    oauth.credentials.url URL to a JSON credentials file.

    The following pattern formats are supported:
    - file:///path/to/file
    - file:/path/to/file
    - data:application/json;base64,[base64-encoded value]
    file:///path/to/credentials_file.json This property is the same to the privateKey property in Pulsar client credentials
    oauth.audience OAuth 2.0 "resource server" identifier for the Pulsar cluster. https://broker.example.com This property is the same to the audience property in Pulsar client credentials
    oauth.scope The scope of the access request that is expressed as a list of space-delimited, case-sensitive strings. api://pulsar-cluster-1/.default This property is the same to the scope property in Pulsar client credentials
    sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
    security.protocol=SASL_PLAINTEXT  # or security.protocol=SASL_SSL if SSL connection is used
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
         required oauth.issuer.url="https://accounts.google.com"\
         oauth.credentials.url="file:///path/to/credentials_file.json"\
         oauth.audience="https://broker.example.com";

    (4) Config the credentials_file.json. The client_id and client_secret is required fields. And the tenant and group_id is optional fields. When use group_id field and set kafkaEnableAuthorizationForceGroupIdCheck=true, then the client will only able to use this group id to consumer.

     {
       "client_id": "my-id",
       "client_secret": "my-secret",
       "tenant": "my-tenant",
       "group_id": "my-group-id"
     }

Authentication for the Schema Registry

KoP supports Confluent's Schema Registry since 2.11. See schema.md for a quick start.

When KoP enables the authentication, the Schema Registry also requires the authentication from the Kafka client. You need to set the following properties on the client side.

basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<tenant>:<token>

Together with Pulsar's authentication

Since KoP reuses Pulsar's authentication providers for authentication, you can enable KoP's authentication and Pulsar authentication at the same time.

For example, you can enable KoP's PLAIN SASL mechanism and Pulsar's TLS authentication simultaneously.

superUserRoles=admin
authenticationEnabled=true
# Enable both AuthenticationProviderToken and AuthenticationProviderTls here
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken,org.apache.pulsar.broker.authentication.AuthenticationProviderTls

# Enable JWT authentication at Pulsar side
tokenPublicKey=/path/to/my-public.key

# Enable PLAIN SASL mechanism for KoP, which reuses the JWT authentication
saslAllowedMechanisms=PLAIN

# Enable TLS authentication at Pulsar side
tlsEnabled=true
brokerServicePortTls=6651
webServicePortTls=8443
tlsRequireTrustedClientCertOnConnect=true
tlsCertificateFilePath=/path/to/broker.cert.pem
tlsKeyFilePath=/path/to/broker.key-pk8.pem
tlsTrustCertsFilePath=/path/to/ca.cert.pem

# Configure broker's built-in client
brokerClientTlsEnabled=true
brokerClientTlsTrustCertsFilePath=/path/to/ca.cert.pem
brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationTls
brokerClientAuthenticationParameters=tlsCertFile:/path/to/admin.cert.pem,tlsKeyFile:/path/to/my-ca/admin.key-pk8.pem

Note

tlsEnabled is actually not required to enable TLS authentication at the broker side. However, for some legacy versions of KoP, you have to enable it.

The following versions of KoP don't need to enable tlsEnabled:

  • Pulsar 3.x.y: KoP 3.0.0.1 or later
  • Pulsar 2.11.x: KoP 2.11.1.2 or later
  • Pulsar 2.10.x: KoP 2.10.4.3 or later

See Transport Encryption using TLS and Authentication using TLS for how to generate certificates and keys for TLS authentication.

Authorization

To enable authorization on KoP, please make sure the authentication is enabled.

Note: For more information, see Authorization.

  1. Enable authorization and assign superusers for the Pulsar broker.

    authorizationEnabled=true
  2. Generate JWT tokens.

    A token is the credential associated with a user. The association is done through the "principal" or "role". In the case of JWT tokens, this field is typically referred as subject, though they are exactly the same concept. Then, you need to use this command to require the generated token to have a subject field set.

    $ bin/pulsar tokens create --secret-key file:///path/to/secret.key \
         --subject <user-role>

    This command prints the token string on stdout.

  3. Grant permission to specific role.

    The token itself does not have any permission associated. The authorization engine determines whether the token should have permissions or not. Once you have created the token, you can grant permission for this token to do certain actions.
    The following is an example.

    $ bin/pulsar-admin --auth-plugin "org.apache.pulsar.client.impl.auth.AuthenticationToken" --auth-params "token:<token-of-super-user-role>" \
             namespaces grant-permission <tenant>/<namespace> \
             --role <user-role> \
             --actions produce,consume

SSL connection

KoP supports the following configuration types for Kafka listeners:

  • PLAINTEXT
  • SSL

Example

listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

Tip For how to configure SSL keys, see Kafka SSL.

The following example shows how to connect KoP through SSL.

  1. Create SSL related keys.

    This example creates the related CA and JKS files.

    # You need to input a password, for example "server-keystore".
    keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
    # You need to input a password, for example "server".
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    # You need to input a password, for example "server-truststore"
    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
    # You need to input a password, for example "client-truststore"
    keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
    # You must input the password of server.keystore.jks: "server-keystore"
    keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
    # NOTE: the password followed by `-passin pass:` is the password of ca-cert: "server"
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:server
    # You must input the password of server.keystore.jks: "server-keystore"
    keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
    # You must input the password of server.keystore.jks: "server-keystore"
    keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

    In above example, we have input four passwords:

    • server-keystore for server.keystore.jks file
    • server for ca-cert and ca-key files
    • server-truststore for server.truststore.jks file
    • client-truststore for client.truststore.jks file
  2. Configure the KoP broker.

    In the StreamNative Platform configuration file (${PLATFORM_HOME}/etc/pulsar/broker.conf or ${PLATFORM_HOME}/etc/pulsar/standalone.conf), add the related configurations that using the jks configurations created in Step 1:

    listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    
    # You need to use the full path of server.keystore.jks
    kopSslKeystoreLocation=server.keystore.jks
    kopSslKeystorePassword=server-keystore
    kopSslKeyPassword=server-keystore
    # You need to use the full path of server.truststore.jks
    kopSslTruststoreLocation=server.truststore.jks
    kopSslTruststorePassword=server-truststore
  3. Configure the Kafka client.

    (1) Prepare a file named client-ssl.properties. The file contains the following information.

    security.protocol=SSL
    # You need to use the full path of client.truststore.jks
    ssl.truststore.location=client.truststore.jks
    ssl.truststore.password=client-truststore
    # The identification algorithm must be empty
    ssl.endpoint.identification.algorithm=

    (2) Verify the console-producer and the console-consumer.

    kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
    kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties

    Tip For more information, see Configure Kafka client.