diff --git a/.github/scripts/setVersions.sh b/.github/scripts/setVersions.sh
index 522c8cf..3e15fb0 100755
--- a/.github/scripts/setVersions.sh
+++ b/.github/scripts/setVersions.sh
@@ -8,3 +8,4 @@ fi
 mvn -B versions:set -DgenerateBackupPoms=false -DnewVersion=${1}
 sed --in-place --regexp-extended "s|flink-examples-data-generator:([^[:space:]]*)|flink-examples-data-generator:${2}|g" tutorials/recommendation-app/data-generator.yaml
 sed --in-place --regexp-extended "s|flink-sql-runner-with-flink-udf-currency-converter:([^[:space:]]*)|flink-sql-runner-with-flink-udf-currency-converter:${2}|g" tutorials/user-defined-functions/standalone-etl-udf-deployment.yaml
+sed --in-place --regexp-extended "s|flink-examples-data-generator:([^[:space:]]*)|flink-examples-data-generator:${2}|g" tutorials/secure-kafka/data-generator/data-generator.yaml
diff --git a/docs/secure-kafka/_index.md b/docs/secure-kafka/_index.md
new file mode 100644
index 0000000..a183a2f
--- /dev/null
+++ b/docs/secure-kafka/_index.md
@@ -0,0 +1,658 @@
++++
+title = 'Connecting to Apache Kafka securely using Flink SQL'
++++
+
+> Note: This tutorial is mainly focused on securing connections between Flink SQL and Kafka.
+> For detailed information on working with [Flink ETL Jobs](https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/learn-flink/etl/)
+> and [Session Clusters](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments),
+> look at the [Interactive ETL example](../interactive-etl/_index.md).
+
+[Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/overview/) is a powerful tool for data exploration, manipulation and interconnection.
+It allows you to access the power of Flink's distributed stream processing abilities with a familiar interface.
+In this tutorial, we go over ways to securely connect to Kafka from Flink SQL.
+
+In the sections below, we cover all the supported secure connection options when running Kafka with the [Strimzi](https://strimzi.io) Kubernetes operator.
+In each case, we go over how to configure the data generator script so that it sets up an example Kafka cluster with the appropriate secure listener, and then we show the configuration needed to connect to that secure listener in Flink SQL.
+
+The tutorial is based on the StreamsHub [Flink SQL Examples](https://github.com/streamshub/flink-sql-examples) repository and the code can be found under the [`tutorials/secure-kafka`](https://github.com/streamshub/flink-sql-examples/tree/main/tutorials/secure-kafka) directory.
+
+> Note:
+> - This tutorial only covers authentication, not authorization. For information on configuring authorization for the `KafkaUser`s used in this tutorial, see the [Strimzi documentation](https://strimzi.io/docs/operators/latest/deploying#assembly-securing-access-str).
+> - All the commands below are meant to be run from the `tutorials` directory.
+> - Re-running the `data-gen-setup.sh` script with different `SECURE_KAFKA` values on the same Kubernetes cluster may lead to a variety of errors.
+> - The Flink SQL commands assume the query is being run on our Flink distribution, `quay.io/streamshub/flink-sql-runner`, which:
+>   - Includes [Strimzi's OAuth 2.0 callback handler](https://github.com/strimzi/strimzi-kafka-oauth).
+>   - Shades the Flink Kafka dependencies (this is done to prevent dependency conflicts).
+> - Only relevant lines are included in the code blocks.
+> - The `tutorials/secure-kafka` directory contains the complete deployment files.
+> - For greater detail on what is covered in this tutorial, you can read the following:
+>   - [“Security” section in the Apache Kafka SQL Connector documentation](https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/connectors/table/kafka/#security)
+>   - ["Securing access to a Kafka cluster" in the Strimzi documentation](https://strimzi.io/docs/operators/latest/deploying#assembly-securing-access-str)
+>   - [“Security” section in the Apache Kafka documentation](https://kafka.apache.org/documentation/#security)
+
+## Unsecure
+
+### PLAINTEXT
+
+- No encryption.
+- No authentication.
+
+Set up the demo application:
+
+```shell
+# Note: PLAINTEXT is the default option if you don't pass SECURE_KAFKA.
+# Note: This sets up Kafka, Flink, recommendation-app (generates example data), etc.
+SECURE_KAFKA=PLAINTEXT ./scripts/data-gen-setup.sh
+
+# This creates a standalone Flink job
+kubectl -n flink apply -f recommendation-app/flink-deployment.yaml
+```
+
+The commands above apply the following:
+
+```yaml
+apiVersion: kafka.strimzi.io/v1beta2
+kind: Kafka
+metadata:
+  name: my-cluster
+spec:
+  kafka:
+    listeners:
+      - name: plain
+        port: 9092
+        type: internal
+        tls: false # Plain listener with no encryption
+```
+
+We can connect to the plain listener, like in the other Flink SQL tutorials, using the query below:
+
+```sql
+CREATE TABLE SalesRecordTable (
+    invoice_id STRING,
+    user_id STRING,
+    product_id STRING,
+    quantity STRING,
+    unit_cost STRING,
+    `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp',
+    WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND
+) WITH (
+    'connector' = 'kafka',
+    'topic' = 'flink.sales.records',
+
+    -- Connect over PLAINTEXT (commented out because it is the default)
+    -- 'properties.security.protocol' = 'PLAINTEXT',
+
+    -- Point to our plain listener
+    'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092',
+
+    'properties.group.id' = 'sales-record-group',
+    'value.format' = 'avro-confluent',
+    'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6',
+    'scan.startup.mode' = 'latest-offset'
+);
+```
+
+You can verify that the test data is flowing correctly by querying the Kafka topic using the console consumer:
+
+```shell
+kubectl exec -it my-cluster-dual-role-0 -n flink -- /bin/bash \
+./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic flink.sales.records
+```
+
+## Secure
+
+> Note:
+> - Each example code block is `diff`ed against the PLAINTEXT example above.
+> - The mTLS example below provides a `KafkaUser`. Each `KafkaUser` after that is `diff`ed against it.
+> - A TLS listener is always included to allow the `recommendation-app` to send example data.
+> - A `recommendation-app-kafka-user` `KafkaUser` is also created and, if authorization is enabled, it is always added to the `superUsers` list in the Kafka cluster.
+
+### Testing (preamble)
+
+You can verify that each of the authentication methods shown below is working by doing the following:
+
+- Run the commands in the example, as instructed.
+
+- Port forward the Flink Job Manager pod so we can access it:
+
+  ```shell
+  kubectl -n flink port-forward <job-manager-pod-name> 8081:8081
+  ```
+
+  The job manager pod will have a name of the format `standalone-etl-secure-`, so your `kubectl` should tab-complete the name.
+  If it doesn't, you can find the job manager pod name by running `kubectl -n flink get pods`.
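+
+  Alternatively, you can select the pod by label (a sketch; this assumes the Flink operator's default `component=jobmanager` pod label is present):
+
+  ```shell
+  # Pick the first JobManager pod and forward the Flink REST port
+  JM_POD=$(kubectl -n flink get pods -l component=jobmanager -o name | head -n 1)
+  kubectl -n flink port-forward "$JM_POD" 8081:8081
+  ```
+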
+ +- Make an API request to the [JobManager REST API](https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/ops/rest_api/) and parse the JSON response with [`jq`](https://jqlang.org/) like so: + + ```shell + # Note: There should only be one continuously running job + RUNNING_JOB_ID=$(curl -s localhost:8081/jobs/ | \ + jq -r '.jobs[] | select(.status == "RUNNING") | .id') + + # Get the number of records written to the output table/topic + curl -s localhost:8081/jobs/$RUNNING_JOB_ID | \ + jq '.vertices[] | select(.name | contains("Writer")) | + { + "write-records": .metrics."write-records", + "write-records-complete": .metrics."write-records-complete" + }' + ``` + + This should output the following result: + + ```jsonc + { + "write-records": 10, + "write-records-complete": true + } + ``` + + > Note: If the result is different, you might've made the API request too quickly. Try again in a few seconds. + +### TLS + +- Encrypted using TLS (e.g. `TLSv1.3` and `TLS_AES_256_GCM_SHA384`). +- Server is authenticated by client. + +Set up the demo application: + +```shell +SECURE_KAFKA=TLS ./scripts/data-gen-setup.sh + +# Note: This creates a standalone Flink job that connects to the Kafka listener +# below and copies 10 records from an existing topic to a newly created one +kubectl -n flink apply -f secure-kafka/TLS/standalone-etl-secure-deployment.yaml +``` + +The commands above apply the following: + +```diff +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster +spec: + kafka: + listeners: +- - name: plain +- port: 9092 +- type: internal +- tls: false + ++ - name: tls ++ port: 9093 ++ type: internal ++ tls: true # Enable TLS encryption on listener +``` + +To securely connect to a listener with TLS enabled, we need to mount the `my-cluster-cluster-ca-cert` secret generated by Strimzi into our Flink pod, so that we can instruct Flink to trust the Kafka cluster's TLS certificate. + +> Note: The secret contains more than one CA certificate during the [CA certificate renewal period](https://strimzi.io/docs/operators/latest/deploying#con-certificate-renewal-str) and all of them must be added to the truststore. +> You must handle this in production. + +In `standalone-etl-secure-deployment.yaml`, we do this in the following way (secrets in other examples are mounted similarly): + +```yaml +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-secure +spec: + image: ... + podTemplate: + kind: Pod + spec: + volumes: + - name: my-cluster-cluster-ca-cert + secret: + secretName: my-cluster-cluster-ca-cert + items: + - key: ca.crt # We only need the current cluster CA + path: ca.crt # certificate for the examples. 
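+              # Note: during Strimzi's CA renewal period this secret can hold more
+              # than one CA certificate (see the note above); in that case, list
+              # every certificate key here (or mount the whole secret) so that
+              # each CA ends up in the truststore Flink reads.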
+ containers: + - name: flink-main-container + volumeMounts: + - name: my-cluster-cluster-ca-cert + mountPath: "/opt/my-cluster-cluster-ca-cert" + readOnly: true +``` + +We can connect to the listener using the query below: + +```diff +-- NOTE: This line is included in standalone-etl-secure-deployment.yaml +CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND +) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', + +- 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', + ++ -- Change to secure listener ++ 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9093', + ++ -- Connect over SSL ++ 'properties.security.protocol' = 'SSL', + ++ -- Provide path to mounted secret containing the ++ -- Kafka cluster CA certificate generated by Strimzi ++ 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + ++ -- Indicate certificate is of type PEM (as opposed to JKS or PKCS12) ++ 'properties.ssl.truststore.type' = 'PEM', + + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' +); +``` + +### mTLS + +- Encrypted using TLS (e.g. `TLSv1.3` and `TLS_AES_256_GCM_SHA384`). +- Both server and client are authenticated. + +Set up the demo application: + +```shell +SECURE_KAFKA=mTLS ./scripts/data-gen-setup.sh + +kubectl -n flink apply -f secure-kafka/mTLS/standalone-etl-secure-deployment.yaml +``` + +The commands above apply the following: + +```diff +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster +spec: + kafka: + listeners: +- - name: plain +- port: 9092 +- type: internal +- tls: false + ++ - name: mtls ++ port: 9094 ++ type: internal ++ tls: true ++ authentication: ++ type: tls # Enable TLS client authentication +``` + +```yaml +# KafkaUser added for client authentication examples, +# not necessary for Kafka listeners without +# 'authentication' property. +apiVersion: kafka.strimzi.io/v1beta1 +kind: KafkaUser +metadata: + name: my-user + labels: + strimzi.io/cluster: my-cluster +spec: + authentication: + type: tls # This will generate a 'my-user' Secret containing credentials +``` + +To securely connect to a listener with mTLS enabled, we need to mount the `my-cluster-cluster-ca-cert` secret onto our Flink pod _and_ pass the contents of the certificate and key from the `my-user` secret generated by Strimzi to our Flink SQL statements. +By doing this, we can instruct Flink to trust the Kafka cluster's TLS certificate _and_ to send on our own credentials. + +> Note: The `FlinkDeployment` volumes are the same as before. 
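+
+Before wiring these values into the SQL, you can check that the Strimzi User Operator has populated the `my-user` secret (a quick sanity check; for a `tls` user the secret should contain `user.crt` and `user.key` alongside the other keys):
+
+```shell
+# List the data keys of the 'my-user' secret; expect ca.crt, user.crt, user.key, user.p12 and user.password
+kubectl -n flink get secret my-user -o json | jq -r '.data | keys[]'
+```
+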
+ +We can connect to the listener using the query below: + +```diff +-- NOTE: This line is included in standalone-etl-secure-deployment.yaml +CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND +) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', +- 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', ++ 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', ++ 'properties.security.protocol' = 'SSL', ++ 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', ++ 'properties.ssl.truststore.type' = 'PEM', + ++ -- Provide contents of the user certificate and key from the ++ -- 'my-user' secret generated by the Strimzi User Operator ++ -- for the 'my-user' KafkaUser ++ 'properties.ssl.keystore.certificate.chain' = '{{secret:flink/my-user/user.crt}}', ++ 'properties.ssl.keystore.key' = '{{secret:flink/my-user/user.key}}', ++ 'properties.ssl.keystore.type' = 'PEM', + + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' +); +``` + +### SCRAM-SHA-512 + +A TLS connection will lose its security if the Certificate Authority that issued the TLS certificate is compromised. +[SCRAM](https://datatracker.ietf.org/doc/html/rfc7804) is a family of authentication mechanisms that aim to prevent this using a combination of challenge-response, salting, hashing, channel binding, etc. + +Set up the demo application: + +```shell +SECURE_KAFKA=SCRAM ./scripts/data-gen-setup.sh + +kubectl -n flink apply -f secure-kafka/SCRAM/standalone-etl-secure-deployment.yaml +``` + +The commands above apply the following: + +```diff +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster +spec: + kafka: + listeners: +- - name: plain +- port: 9092 +- type: internal +- tls: false + ++ - name: scram ++ port: 9094 ++ type: internal ++ tls: true ++ authentication: ++ type: scram-sha-512 # Specify SCRAM authentication +``` + +```diff +apiVersion: kafka.strimzi.io/v1beta1 +kind: KafkaUser +metadata: + name: my-user + labels: + strimzi.io/cluster: my-cluster +spec: +- authentication: +- type: tls ++ authentication: ++ type: scram-sha-512 # Specify SCRAM authentication +``` + +We can connect to the listener using the query below: + +```diff +-- NOTE: This line is included in standalone-etl-secure-deployment.yaml +CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND +) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', +- 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', ++ 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', ++ 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', ++ 'properties.ssl.truststore.type' = 'PEM', + ++ -- Connect over SASL_SSL, this allows us to specify a SASL mechanism ++ 'properties.security.protocol' = 'SASL_SSL', + ++ -- Connect using SCRAM mechanism ++ 'properties.sasl.mechanism' = 'SCRAM-SHA-512', + ++ -- Connect using Kafka's 
ScramLoginModule ++ -- Provide `user.password` from the generated `my-user` secret. ++ 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule ++ required ++ username="my-user" ++ password="{{secret:flink/my-user/password}}" ++ ;', + + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' +); +``` + +### OAuth 2.0 + +OAuth 2.0 is a standardized protocol for _authorization_, but is commonly used +[inside of authentication protocols](https://oauth.net/articles/authentication/). + +In this example, the `data-gen-setup.sh` script creates a secure [Keycloak](https://www.keycloak.org/) deployment as our OAuth 2.0 provider. +It is an open-source piece of Identity and Access Management software that is [built on top of the OAuth 2.0 specification](https://www.keycloak.org/docs/latest/authorization_services/index.html#_service_overview). +The script automatically creates a self-signed TLS certificate for Keycloak's HTTPS endpoints. + +Since our `data-gen-setup.sh` script sets up a Kafka cluster using Strimzi, this example uses the Keycloak config and realm from the [`strimzi-kafka-operator` Keycloak example](https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/security/keycloak-authorization/kafka-authz-realm.json). + +> Note: +> - Keycloak Web UI credentials: +> - Username: `admin` +> - Password: `admin` + +Set up the demo application: + +```shell +# Note: For OAuth 2.0, a secure Keycloak deployment with +# a self-signed HTTPS TLS certificate is generated +SECURE_KAFKA=OAuth2 ./scripts/data-gen-setup.sh + +kubectl -n flink apply -f secure-kafka/OAuth2/standalone-etl-secure-deployment.yaml +``` + +The commands above apply the following: + +```diff +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster +spec: + kafka: + listeners: +- - name: plain +- port: 9092 +- type: internal +- tls: false + ++ - name: oauth2 ++ port: 9094 ++ type: internal ++ tls: true ++ authentication: ++ type: oauth # Specify OAuth 2.0 authentication + ++ # Specify OAuth 2.0 JWKS/JWT details ++ validIssuerUri: https://keycloak.flink.svc:8443/realms/kafka-authz ++ jwksEndpointUri: https://keycloak.flink.svc:8443/realms/kafka-authz/protocol/openid-connect/certs ++ userNameClaim: preferred_username + ++ # Trust self-signed TLS certificate used by Keycloak HTTPS endpoint ++ tlsTrustedCertificates: ++ - secretName: keycloak-cert ++ certificate: tls.crt +``` + +```diff +# This KafkaUser is only needed if the Kafka listener is using simple authorization +apiVersion: kafka.strimzi.io/v1beta1 +kind: KafkaUser +metadata: +- name: my-user ++ name: service-account-kafka # Create KafkaUser for user in the Keycloak realm + labels: + strimzi.io/cluster: my-cluster +spec: +- authentication: # Remove authentication +- type: tls +``` + +The end of a typical OAuth 2.0 flow involves granting the user an [access token](https://www.oauth.com/oauth2-servers/access-tokens/) after successful authorization and redirecting them to a [callback URL](https://www.oauth.com/oauth2-servers/redirect-uris/). +Since we're using Strimzi, we need to instruct Flink to use [Strimzi's OAuth 2.0 callback handler](https://github.com/strimzi/strimzi-kafka-oauth) to handle this. 
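+
+Under the hood, the callback handler obtains an access token from Keycloak's token endpoint using the configured client credentials. If you want to check those credentials independently of Flink, you can request a token manually (a sketch; run the `curl` from a pod inside the cluster, since `keycloak.flink.svc` only resolves there, and `-k` is only acceptable here because the example uses a self-signed certificate):
+
+```shell
+# Client ID and token endpoint match the JAAS configuration shown below;
+# substitute the client secret stored in the 'keycloak-kafka-client-secret' Secret.
+curl -sk -X POST \
+  -d 'grant_type=client_credentials' \
+  -d 'client_id=kafka' \
+  -d 'client_secret=<client secret>' \
+  https://keycloak.flink.svc:8443/realms/kafka-authz/protocol/openid-connect/token
+```
+
+A successful response is a JSON document containing an `access_token`, which is the token the callback handler presents to Kafka over the `OAUTHBEARER` mechanism.
+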
+ +> Note: As mentioned at the top of this tutorial, the StreamsHub Flink SQL Runner distribution comes with Strimzi's OAuth 2.0 callback handler included. + +We can connect to the listener using the query below: + +```diff +-- NOTE: This line is included in standalone-etl-secure-deployment.yaml +CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND +) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', +- 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', ++ 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', ++ 'properties.security.protocol' = 'SASL_SSL', ++ 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', ++ 'properties.ssl.truststore.type' = 'PEM', + ++ -- Connect using OAUTHBEARER mechanism (OAuth 2.0 with Bearer/access token) ++ 'properties.sasl.mechanism' = 'OAUTHBEARER', + ++ -- Connect using Kafka's OAuthBearerLoginModule (shaded in our Flink distribution to prevent dependency conflicts) ++ -- Provide path to mounted secret containing Keycloak's self-signed TLS certificate ++ -- Provide OAuth 2.0 client and endpoint details for/from Keycloak realm ++ 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule ++ required ++ oauth.ssl.truststore.location="/opt/keycloak-ca-cert/ca.crt" ++ oauth.ssl.truststore.type="PEM" ++ oauth.client.id="kafka" ++ oauth.client.secret="{{secret:flink/keycloak-kafka-client-secret/secret}}" ++ oauth.token.endpoint.uri="https://keycloak.flink.svc:8443/realms/kafka-authz/protocol/openid-connect/token" ++ ;', + ++ -- Use Strimzi's OAuth 2.0 callback handler ++ 'properties.sasl.login.callback.handler.class' = 'io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler', + + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' +); +``` + +### Custom + +Custom authentication allows wide flexibility in how authentication is carried out. +For the sake of simplicity, this example shows how to use a custom TLS client authentication truststore. 
+ +Set up the demo application: + +```shell +# Note: For custom authentication, a self-signed +# 'my-user-custom-cert' TLS certificate is created +SECURE_KAFKA=custom ./scripts/data-gen-setup.sh + +kubectl -n flink apply -f secure-kafka/custom/standalone-etl-secure-deployment.yaml +``` + +The commands above apply the following: + +```diff +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster +spec: + kafka: + listeners: +- - name: plain +- port: 9092 +- type: internal +- tls: false + ++ - name: custom ++ port: 9094 ++ type: internal ++ tls: true ++ authentication: ++ type: custom # Specify custom authentication ++ sasl: false # Disable SASL since it is unnecessary ++ listenerConfig: ++ ssl.client.auth: required # Require TLS client authentication + ++ # Provide path to self-signed client TLS truststore ++ ssl.truststore.location: /mnt/my-user-custom-cert/ca.crt ++ ssl.truststore.type: PEM +``` + +```diff +apiVersion: kafka.strimzi.io/v1beta1 +kind: KafkaUser +metadata: + name: my-user + labels: + strimzi.io/cluster: my-cluster +spec: +- authentication: +- type: tls ++ authentication: ++ type: tls-external # Specify external TLS authentication ++ # (we use our own self-signed certificate) +``` + +We can connect to the listener using the query below: + +```diff +-- NOTE: This line is included in standalone-etl-secure-deployment.yaml +CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND +) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', +- 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9092', ++ 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', ++ 'properties.security.protocol' = 'SSL', ++ 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', ++ 'properties.ssl.truststore.type' = 'PEM', + ++ -- Provide contents of the self-signed TLS certificate and key from the ++ -- 'my-user-custom-cert' secret generated by the Cert-Manager Operator ++ -- for our 'my-user-custom-cert' Certificate ++ 'properties.ssl.keystore.certificate.chain' = '{{secret:flink/my-user-custom-cert/tls.crt}}', ++ 'properties.ssl.keystore.key' = '{{secret:flink/my-user-custom-cert/tls.key}}', ++ 'properties.ssl.keystore.type' = 'PEM', + + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' +); +``` diff --git a/tutorials/scripts/data-gen-setup.sh b/tutorials/scripts/data-gen-setup.sh index f4e4804..efce9bf 100755 --- a/tutorials/scripts/data-gen-setup.sh +++ b/tutorials/scripts/data-gen-setup.sh @@ -8,6 +8,8 @@ KUBE_CMD=${KUBE_CMD:-kubectl} TIMEOUT=${TIMEOUT:-180} FLINK_OPERATOR_VERSION="1.12.1" CERT_MANAGER_VERSION="1.18.2" +KEYCLOAK_OPERATOR_VERSION="26.3.3" +SECURE_KAFKA=${SECURE_KAFKA:-"PLAINTEXT"} printf "\n\n\e[32mInstalling example components into namespace: %s\e[0m\n\n" "${NAMESPACE}" @@ -60,32 +62,161 @@ else ${KUBE_CMD} create -f 'https://strimzi.io/install/latest?namespace=flink' -n "${NAMESPACE}" fi -printf "\n\e[32mCreating Kafka cluster\e[0m\n" -${KUBE_CMD} apply -f https://strimzi.io/examples/latest/kafka/kafka-single-node.yaml -n "${NAMESPACE}" +printf "\n\e[32mChecking that this script is being run from within the 
tutorial directory\e[0m\n" +if [ -f "scripts/data-gen-setup.sh" ]; then + printf "\e[32mFound scripts/data-gen-setup.sh\e[0m\n" +else + printf "\e[31mError: scripts/data-gen-setup.sh file not found. Please make sure to run this script from the tutorial directory.\e[0m\n" + exit 1 +fi + +case $SECURE_KAFKA in + "PLAINTEXT") + printf "\n\e[32mCreating Kafka pool and cluster (PLAINTEXT)\e[0m\n" + ${KUBE_CMD} apply -f https://strimzi.io/examples/latest/kafka/kafka-single-node.yaml -n "${NAMESPACE}" + ;; + + "TLS") + printf "\n\e[32mCreating Kafka pool (TLS)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/kafka-pool.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating Kafka cluster (TLS)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/TLS/kafka.yaml -n "${NAMESPACE}" + ;; + + "mTLS") + printf "\n\e[32mCreating Kafka pool (mTLS)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/kafka-pool.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating Kafka cluster (mTLS)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/mTLS/kafka.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating Kafka user (mTLS)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/mTLS/kafka-user.yaml -n "${NAMESPACE}" + + printf "\n\e[32mWaiting for Kafka user Secret to be generated (mTLS)...\e[0m\n" + ${KUBE_CMD} -n "${NAMESPACE}" wait --for=create --timeout="${TIMEOUT}"s secret my-user + ;; + + "SCRAM") + printf "\n\e[32mCreating Kafka pool (SCRAM)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/kafka-pool.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating Kafka cluster (SCRAM)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/SCRAM/kafka.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating Kafka user (SCRAM)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/SCRAM/kafka-user.yaml -n "${NAMESPACE}" + + printf "\n\e[32mWaiting for Kafka user Secret to be generated (SCRAM)...\e[0m\n" + ${KUBE_CMD} -n "${NAMESPACE}" wait --for=create --timeout="${TIMEOUT}"s secret my-user + ;; + + "OAuth2") + printf "\n\e[32mCreating Kafka pool (OAuth2)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/kafka-pool.yaml -n "${NAMESPACE}" + + printf "\n\e[32mInstalling Keycloak Operator CRDs (OAuth2)\e[0m\n" + ${KUBE_CMD} apply -f "https://raw.githubusercontent.com/keycloak/keycloak-k8s-resources/${KEYCLOAK_OPERATOR_VERSION}/kubernetes/keycloaks.k8s.keycloak.org-v1.yml" -n "${NAMESPACE}" + ${KUBE_CMD} apply -f "https://raw.githubusercontent.com/keycloak/keycloak-k8s-resources/${KEYCLOAK_OPERATOR_VERSION}/kubernetes/keycloakrealmimports.k8s.keycloak.org-v1.yml" -n "${NAMESPACE}" + + printf "\n\e[32mInstalling Keycloak Operator deployment (OAuth2)\e[0m\n" + ${KUBE_CMD} apply -f "https://raw.githubusercontent.com/keycloak/keycloak-k8s-resources/${KEYCLOAK_OPERATOR_VERSION}/kubernetes/kubernetes.yml" -n "${NAMESPACE}" + + printf "\n\e[32mCreating self-signed CA issuer using cert-manager (OAuth2)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/selfsigned-ca.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating self-signed Keycloak TLS certificate using cert-manager (OAuth2)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/keycloak/keycloak-cert.yaml -n "${NAMESPACE}" + + printf "\n\e[32mWaiting for self-signed Keycloak TLS certificate Secret to be generated (OAuth2)\e[0m\n" + ${KUBE_CMD} wait --for=create --timeout="${TIMEOUT}"s secret keycloak-cert -n "${NAMESPACE}" + + printf "\n\e[32mCreating Keycloak \"kafka\" client Secret (OAuth2)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/keycloak/keycloak-kafka-client-secret.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating Keycloak instance (OAuth2)\e[0m\n" + ${KUBE_CMD} apply 
-f secure-kafka/keycloak/keycloak.yaml -n "${NAMESPACE}" + + printf "\n\e[32mImporting Keycloak realm \"kafka-authz-realm\" (OAuth2)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/keycloak/kafka-authz-realm.yaml -n "${NAMESPACE}" + + printf "\n\e[32mWaiting for Keycloak realm \"kafka-authz-realm\" to be done importing (OAuth2)\e[0m\n" + ${KUBE_CMD} wait --for=condition=Done --timeout="${TIMEOUT}"s keycloakrealmimports kafka-authz-realm -n "${NAMESPACE}" + + printf "\n\e[32mCleaning up done \"kafka-authz-realm\" import resources (OAuth2)\e[0m\n" + ${KUBE_CMD} delete keycloakrealmimports kafka-authz-realm -n "${NAMESPACE}" + + printf "\n\e[32mWaiting for Keycloak instance to be ready (OAuth2)\e[0m\n" + ${KUBE_CMD} wait --for=condition=Ready --timeout="${TIMEOUT}"s keycloak keycloak -n "${NAMESPACE}" + + printf "\n\e[32mCreating Kafka cluster (OAuth2)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/OAuth2/kafka.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating Kafka user (OAuth2)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/OAuth2/kafka-user.yaml -n "${NAMESPACE}" + ;; + + "custom") + printf "\n\e[32mCreating Kafka pool (custom)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/kafka-pool.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating self-signed CA issuer using cert-manager (custom)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/selfsigned-ca.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating self-signed custom user TLS certificate for using cert-manager (custom)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/custom/my-user-custom-cert.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating Kafka cluster (custom)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/custom/kafka.yaml -n "${NAMESPACE}" + + printf "\n\e[32mCreating Kafka user (custom)\e[0m\n" + ${KUBE_CMD} apply -f secure-kafka/custom/kafka-user.yaml -n "${NAMESPACE}" + ;; + + *) + printf "\n\e[31mError: Unknown value passed for SECURE_KAFKA environment variable.\e[0m\n" + exit 1 + ;; +esac printf "\n\e[32mWaiting for Kafka to be ready...\e[0m\n" ${KUBE_CMD} -n "${NAMESPACE}" wait --for=condition=Ready --timeout="${TIMEOUT}"s kafka my-cluster -printf "\n\e[32mChecking for Apicurio Registry configuration file\e[0m\n" -if [ -f "apicurio-registry.yaml" ]; then - printf "\n\e[32mInstalling Apicurio Registry\e[0m\n" - ${KUBE_CMD} apply -f apicurio-registry.yaml -n "${NAMESPACE}" -else - printf "\n\e[31mError: apicurio-registry.yaml file not found. Please make sure to run this script from the tutorial directory.\e[0m\n" - exit 1 -fi +printf "\n\e[32mInstalling Apicurio Registry\e[0m\n" +${KUBE_CMD} apply -f apicurio-registry.yaml -n "${NAMESPACE}" printf "\n\e[32mWaiting for Apicurio to be ready...\e[0m\n" ${KUBE_CMD} -n "${NAMESPACE}" wait --for=condition=Available --timeout="${TIMEOUT}"s deployment apicurio-registry -printf "\n\e[32mChecking for data generator configuration file\e[0m\n" -if [ -f "recommendation-app/data-generator.yaml" ]; then +case $SECURE_KAFKA in + "PLAINTEXT") printf "\n\e[32mDeploying data generation application...\e[0m\n" ${KUBE_CMD} -n "${NAMESPACE}" apply -f recommendation-app/data-generator.yaml -else - printf "\n\e[31mError: recommendation-app/data-generator.yaml file not found. 
Please make sure to run this script from the tutorial directory.\e[0m\n" + ;; + + "TLS" | "mTLS" | "SCRAM" | "OAuth2" | "custom") + printf "\n\e[32mCreating secure data generation Kafka user...\e[0m\n" + ${KUBE_CMD} -n "${NAMESPACE}" apply -f secure-kafka/data-generator/kafka-user.yaml + + printf "\n\e[32mWaiting for Kafka user Secret to be generated...\e[0m\n" + ${KUBE_CMD} -n "${NAMESPACE}" wait --for=create --timeout="${TIMEOUT}"s secret recommendation-app-kafka-user + + printf "\n\e[32mDeploying secure data generation application...\e[0m\n" + ${KUBE_CMD} -n "${NAMESPACE}" apply -f secure-kafka/data-generator/data-generator.yaml + + printf "\n\e[32mCreating Role that allows Secret reading (if it doesn't exist)...\e[0m\n" + ${KUBE_CMD} -n "${NAMESPACE}" create role secret-getter --dry-run=client -o yaml --verb=get --verb=list --verb=watch --resource=secrets | ${KUBE_CMD} -n "${NAMESPACE}" apply -f - + + printf "\n\e[32mCreating RoleBinding to allow flink service account to read Secrets (if it doesn't exist)...\e[0m\n" + ${KUBE_CMD} -n "${NAMESPACE}" create rolebinding allow-flink-secret-getting --dry-run=client -o yaml --role=secret-getter --serviceaccount=flink:flink | ${KUBE_CMD} -n "${NAMESPACE}" apply -f - + ;; + + *) + printf "\n\e[31mError: Unknown value passed for SECURE_KAFKA environment variable.\e[0m\n" exit 1 -fi + ;; +esac printf "\n\e[32mWaiting for Flink operator to be ready...\e[0m\n" ${KUBE_CMD} -n "${NAMESPACE}" wait --for=condition=Available --timeout="${TIMEOUT}"s deployment flink-kubernetes-operator diff --git a/tutorials/secure-kafka/OAuth2/kafka-user.yaml b/tutorials/secure-kafka/OAuth2/kafka-user.yaml new file mode 100644 index 0000000..4370682 --- /dev/null +++ b/tutorials/secure-kafka/OAuth2/kafka-user.yaml @@ -0,0 +1,31 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: KafkaUser +metadata: + name: service-account-kafka + labels: + strimzi.io/cluster: my-cluster +spec: + # spec.authorization.acls is required for topic access + # if simple Kafka spec.kafka.authorization was specified + authorization: + type: simple + acls: + - resource: + type: topic + name: flink.sales.records + operations: + - Describe + - Read + - resource: + type: topic + name: flink.oauth2.auth.sales.records + operations: + - Describe + - Create + - Write + - Read + - resource: + type: group + name: sales-record-group + operations: + - Read diff --git a/tutorials/secure-kafka/OAuth2/kafka.yaml b/tutorials/secure-kafka/OAuth2/kafka.yaml new file mode 100644 index 0000000..60fec1e --- /dev/null +++ b/tutorials/secure-kafka/OAuth2/kafka.yaml @@ -0,0 +1,45 @@ +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster + annotations: + strimzi.io/node-pools: enabled + strimzi.io/kraft: enabled +spec: + kafka: + version: 4.0.0 + metadataVersion: 4.0-IV3 + listeners: + - name: tls + port: 9093 + type: internal + tls: true + authentication: + type: tls + - name: oauth2 + port: 9094 + type: internal + tls: true + authentication: + type: oauth + validIssuerUri: https://keycloak.flink.svc:8443/realms/kafka-authz + jwksEndpointUri: https://keycloak.flink.svc:8443/realms/kafka-authz/protocol/openid-connect/certs + userNameClaim: preferred_username + tlsTrustedCertificates: + - secretName: keycloak-cert + certificate: tls.crt + # You can remove spec.kafka.authorization + # to grant all users unlimited access rights + authorization: + type: simple + superUsers: + - CN=recommendation-app-kafka-user + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 
1 + transaction.state.log.min.isr: 1 + default.replication.factor: 1 + min.insync.replicas: 1 + entityOperator: + topicOperator: {} + userOperator: {} diff --git a/tutorials/secure-kafka/OAuth2/standalone-etl-secure-deployment.yaml b/tutorials/secure-kafka/OAuth2/standalone-etl-secure-deployment.yaml new file mode 100644 index 0000000..5fa5de7 --- /dev/null +++ b/tutorials/secure-kafka/OAuth2/standalone-etl-secure-deployment.yaml @@ -0,0 +1,119 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-secure +spec: + image: quay.io/streamshub/flink-sql-runner:0.3.0 + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + podTemplate: + kind: Pod + spec: + volumes: + - name: my-cluster-cluster-ca-cert + secret: + secretName: my-cluster-cluster-ca-cert + items: + - key: ca.crt + path: ca.crt + - name: keycloak-cert + secret: + secretName: keycloak-cert + items: + - key: ca.crt + path: ca.crt + containers: + - name: flink-main-container + volumeMounts: + - name: my-cluster-cluster-ca-cert + mountPath: "/opt/my-cluster-cluster-ca-cert" + readOnly: true + - name: keycloak-cert + mountPath: "/opt/keycloak-ca-cert" + readOnly: true + env: + - name: SQL_STATEMENTS + value: | + CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', + 'properties.security.protocol' = 'SASL_SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.sasl.mechanism' = 'OAUTHBEARER', + 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule + required + oauth.ssl.truststore.location="/opt/keycloak-ca-cert/ca.crt" + oauth.ssl.truststore.type="PEM" + oauth.client.id="kafka" + oauth.client.secret="{{secret:flink/keycloak-kafka-client-secret/secret}}" + oauth.token.endpoint.uri="https://keycloak.flink.svc:8443/realms/kafka-authz/protocol/openid-connect/token" + \;', + 'properties.sasl.login.callback.handler.class' = 'io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler', + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' + ); + + CREATE TABLE OAuth2AuthSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + PRIMARY KEY (`user_id`) NOT ENFORCED + ) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.oauth2.auth.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', + 'properties.security.protocol' = 'SASL_SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.sasl.mechanism' = 'OAUTHBEARER', + 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule + required + oauth.ssl.truststore.location="/opt/keycloak-ca-cert/ca.crt" + oauth.ssl.truststore.type="PEM" + oauth.client.id="kafka" + 
oauth.client.secret="{{secret:flink/keycloak-kafka-client-secret/secret}}" + oauth.token.endpoint.uri="https://keycloak.flink.svc:8443/realms/kafka-authz/protocol/openid-connect/token" + \;', + 'properties.sasl.login.callback.handler.class' = 'io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler', + 'properties.client.id' = 'sql-secure-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' + ); + + INSERT INTO OAuth2AuthSalesRecordTable + SELECT + invoice_id, + user_id, + product_id, + quantity + FROM SalesRecordTable + LIMIT 10; + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/streamshub/flink-sql-runner.jar + parallelism: 1 + upgradeMode: stateless diff --git a/tutorials/secure-kafka/SCRAM/kafka-user.yaml b/tutorials/secure-kafka/SCRAM/kafka-user.yaml new file mode 100644 index 0000000..5660d87 --- /dev/null +++ b/tutorials/secure-kafka/SCRAM/kafka-user.yaml @@ -0,0 +1,33 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: KafkaUser +metadata: + name: my-user + labels: + strimzi.io/cluster: my-cluster +spec: + authentication: + type: scram-sha-512 + # spec.authorization.acls is required for topic access + # if simple Kafka spec.kafka.authorization was specified + authorization: + type: simple + acls: + - resource: + type: topic + name: flink.sales.records + operations: + - Describe + - Read + - resource: + type: topic + name: flink.scram.auth.sales.records + operations: + - Describe + - Create + - Write + - Read + - resource: + type: group + name: sales-record-group + operations: + - Read diff --git a/tutorials/secure-kafka/SCRAM/kafka.yaml b/tutorials/secure-kafka/SCRAM/kafka.yaml new file mode 100644 index 0000000..2fa9ca2 --- /dev/null +++ b/tutorials/secure-kafka/SCRAM/kafka.yaml @@ -0,0 +1,39 @@ +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster + annotations: + strimzi.io/node-pools: enabled + strimzi.io/kraft: enabled +spec: + kafka: + version: 4.0.0 + metadataVersion: 4.0-IV3 + listeners: + - name: tls + port: 9093 + type: internal + tls: true + authentication: + type: tls + - name: scram + port: 9094 + type: internal + tls: true + authentication: + type: scram-sha-512 + # You can remove spec.kafka.authorization + # to grant all users unlimited access rights + authorization: + type: simple + superUsers: + - CN=recommendation-app-kafka-user + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + default.replication.factor: 1 + min.insync.replicas: 1 + entityOperator: + topicOperator: {} + userOperator: {} diff --git a/tutorials/secure-kafka/SCRAM/standalone-etl-secure-deployment.yaml b/tutorials/secure-kafka/SCRAM/standalone-etl-secure-deployment.yaml new file mode 100644 index 0000000..0258c92 --- /dev/null +++ b/tutorials/secure-kafka/SCRAM/standalone-etl-secure-deployment.yaml @@ -0,0 +1,102 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-secure +spec: + image: quay.io/streamshub/flink-sql-runner:0.3.0 + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + podTemplate: + kind: Pod + spec: + volumes: + - name: my-cluster-cluster-ca-cert + secret: + secretName: my-cluster-cluster-ca-cert + items: + - key: ca.crt + path: ca.crt + containers: + - name: flink-main-container + volumeMounts: + - name: 
my-cluster-cluster-ca-cert + mountPath: "/opt/my-cluster-cluster-ca-cert" + readOnly: true + env: + - name: SQL_STATEMENTS + value: | + CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', + 'properties.security.protocol' = 'SASL_SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.sasl.mechanism' = 'SCRAM-SHA-512', + 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule + required + username="my-user" + password="{{secret:flink/my-user/password}}" + \;', + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' + ); + + CREATE TABLE ScramAuthSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + PRIMARY KEY (`user_id`) NOT ENFORCED + ) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.scram.auth.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', + 'properties.security.protocol' = 'SASL_SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.sasl.mechanism' = 'SCRAM-SHA-512', + 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule + required + username="my-user" + password="{{secret:flink/my-user/password}}" + \;', + 'properties.client.id' = 'sql-secure-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' + ); + + INSERT INTO ScramAuthSalesRecordTable + SELECT + invoice_id, + user_id, + product_id, + quantity + FROM SalesRecordTable + LIMIT 10; + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/streamshub/flink-sql-runner.jar + parallelism: 1 + upgradeMode: stateless diff --git a/tutorials/secure-kafka/TLS/kafka.yaml b/tutorials/secure-kafka/TLS/kafka.yaml new file mode 100644 index 0000000..f1bfacf --- /dev/null +++ b/tutorials/secure-kafka/TLS/kafka.yaml @@ -0,0 +1,25 @@ +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster + annotations: + strimzi.io/node-pools: enabled + strimzi.io/kraft: enabled +spec: + kafka: + version: 4.0.0 + metadataVersion: 4.0-IV3 + listeners: + - name: tls + port: 9093 + type: internal + tls: true + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + default.replication.factor: 1 + min.insync.replicas: 1 + entityOperator: + topicOperator: {} + userOperator: {} diff --git a/tutorials/secure-kafka/TLS/standalone-etl-secure-deployment.yaml b/tutorials/secure-kafka/TLS/standalone-etl-secure-deployment.yaml new file mode 100644 index 0000000..b7bf3d7 --- /dev/null +++ b/tutorials/secure-kafka/TLS/standalone-etl-secure-deployment.yaml @@ -0,0 +1,90 @@ +apiVersion: 
flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-secure +spec: + image: quay.io/streamshub/flink-sql-runner:0.3.0 + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + podTemplate: + kind: Pod + spec: + volumes: + - name: my-cluster-cluster-ca-cert + secret: + secretName: my-cluster-cluster-ca-cert + items: + - key: ca.crt + path: ca.crt + containers: + - name: flink-main-container + volumeMounts: + - name: my-cluster-cluster-ca-cert + mountPath: "/opt/my-cluster-cluster-ca-cert" + readOnly: true + env: + - name: SQL_STATEMENTS + value: | + CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9093', + 'properties.security.protocol' = 'SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' + ); + + CREATE TABLE TlsAuthSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + PRIMARY KEY (`user_id`) NOT ENFORCED + ) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.mtls.auth.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9093', + 'properties.security.protocol' = 'SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.client.id' = 'sql-secure-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' + ); + + INSERT INTO TlsAuthSalesRecordTable + SELECT + invoice_id, + user_id, + product_id, + quantity + FROM SalesRecordTable + LIMIT 10; + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/streamshub/flink-sql-runner.jar + parallelism: 1 + upgradeMode: stateless diff --git a/tutorials/secure-kafka/custom/kafka-user.yaml b/tutorials/secure-kafka/custom/kafka-user.yaml new file mode 100644 index 0000000..cd32d60 --- /dev/null +++ b/tutorials/secure-kafka/custom/kafka-user.yaml @@ -0,0 +1,33 @@ +apiVersion: kafka.strimzi.io/v1beta1 +kind: KafkaUser +metadata: + name: my-user + labels: + strimzi.io/cluster: my-cluster +spec: + authentication: + type: tls-external + # spec.authorization.acls is required for topic access + # if simple Kafka spec.kafka.authorization was specified + authorization: + type: simple + acls: + - resource: + type: topic + name: flink.sales.records + operations: + - Describe + - Read + - resource: + type: topic + name: flink.custom.auth.sales.records + operations: + - Describe + - Create + - Write + - Read + - resource: + type: group + name: sales-record-group + operations: + - Read diff --git a/tutorials/secure-kafka/custom/kafka.yaml b/tutorials/secure-kafka/custom/kafka.yaml new file mode 100644 index 0000000..d2f9ad4 --- /dev/null +++ b/tutorials/secure-kafka/custom/kafka.yaml @@ -0,0 +1,57 @@ 
+apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: my-cluster + annotations: + strimzi.io/node-pools: enabled + strimzi.io/kraft: enabled +spec: + kafka: + version: 4.0.0 + metadataVersion: 4.0-IV3 + listeners: + - name: tls + port: 9093 + type: internal + tls: true + authentication: + type: tls + - name: custom + port: 9094 + type: internal + tls: true + authentication: + type: custom + sasl: false + listenerConfig: + ssl.client.auth: required + ssl.truststore.location: /mnt/my-user-custom-cert/ca.crt + ssl.truststore.type: PEM + # You can remove spec.kafka.authorization + # to grant all users unlimited access rights + authorization: + type: simple + superUsers: + - CN=recommendation-app-kafka-user + template: + pod: + volumes: + - name: my-user-custom-cert + secret: + secretName: my-user-custom-cert + items: + - key: ca.crt + path: ca.crt + kafkaContainer: + volumeMounts: + - name: my-user-custom-cert + mountPath: /mnt/my-user-custom-cert + config: + offsets.topic.replication.factor: 1 + transaction.state.log.replication.factor: 1 + transaction.state.log.min.isr: 1 + default.replication.factor: 1 + min.insync.replicas: 1 + entityOperator: + topicOperator: {} + userOperator: {} diff --git a/tutorials/secure-kafka/custom/my-user-custom-cert.yaml b/tutorials/secure-kafka/custom/my-user-custom-cert.yaml new file mode 100644 index 0000000..0c3e238 --- /dev/null +++ b/tutorials/secure-kafka/custom/my-user-custom-cert.yaml @@ -0,0 +1,14 @@ +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: my-user-custom-cert +spec: + secretName: my-user-custom-cert + issuerRef: + name: my-selfsigned-ca-issuer + kind: Issuer + commonName: my-user + privateKey: + algorithm: RSA + encoding: PKCS8 + size: 2048 diff --git a/tutorials/secure-kafka/custom/standalone-etl-secure-deployment.yaml b/tutorials/secure-kafka/custom/standalone-etl-secure-deployment.yaml new file mode 100644 index 0000000..bbdc1df --- /dev/null +++ b/tutorials/secure-kafka/custom/standalone-etl-secure-deployment.yaml @@ -0,0 +1,96 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-secure +spec: + image: quay.io/streamshub/flink-sql-runner:0.3.0 + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + serviceAccount: flink + podTemplate: + kind: Pod + spec: + volumes: + - name: my-cluster-cluster-ca-cert + secret: + secretName: my-cluster-cluster-ca-cert + items: + - key: ca.crt + path: ca.crt + containers: + - name: flink-main-container + volumeMounts: + - name: my-cluster-cluster-ca-cert + mountPath: "/opt/my-cluster-cluster-ca-cert" + readOnly: true + env: + - name: SQL_STATEMENTS + value: | + CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', + 'properties.security.protocol' = 'SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.ssl.keystore.certificate.chain' = '{{secret:flink/my-user-custom-cert/tls.crt}}', + 'properties.ssl.keystore.key' = '{{secret:flink/my-user-custom-cert/tls.key}}', + 'properties.ssl.keystore.type' = 'PEM', + 'properties.group.id' = 'sales-record-group', + 
'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' + ); + + CREATE TABLE CustomAuthSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + PRIMARY KEY (`user_id`) NOT ENFORCED + ) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.custom.auth.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', + 'properties.security.protocol' = 'SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.ssl.keystore.certificate.chain' = '{{secret:flink/my-user-custom-cert/tls.crt}}', + 'properties.ssl.keystore.key' = '{{secret:flink/my-user-custom-cert/tls.key}}', + 'properties.ssl.keystore.type' = 'PEM', + 'properties.client.id' = 'sql-secure-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' + ); + + INSERT INTO CustomAuthSalesRecordTable + SELECT + invoice_id, + user_id, + product_id, + quantity + FROM SalesRecordTable + LIMIT 10; + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/streamshub/flink-sql-runner.jar + parallelism: 1 + upgradeMode: stateless diff --git a/tutorials/secure-kafka/data-generator/data-generator.yaml b/tutorials/secure-kafka/data-generator/data-generator.yaml new file mode 100644 index 0000000..76e8c12 --- /dev/null +++ b/tutorials/secure-kafka/data-generator/data-generator.yaml @@ -0,0 +1,61 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: recommendation-app-data + labels: + app: recommendation-app-data +spec: + replicas: 1 + selector: + matchLabels: + app: recommendation-app-data + template: + metadata: + labels: + app: recommendation-app-data + spec: + containers: + - name: data-generator + image: quay.io/streamshub/flink-examples-data-generator:main + env: + - name: KAFKA_BOOTSTRAP_SERVERS + value: "my-cluster-kafka-bootstrap.flink.svc:9093" + - name: KAFKA_SECURITY_PROTOCOL + value: "SSL" + - name: KAFKA_SSL_TRUSTSTORE_LOCATION + value: "/opt/my-cluster-cluster-ca-cert/ca.crt" + - name: KAFKA_SSL_TRUSTSTORE_TYPE + value: "PEM" + - name: KAFKA_SSL_KEYSTORE_LOCATION + value: "/opt/recommendation-app-kafka-user/user.p12" + - name: KAFKA_SSL_KEYSTORE_PASSWORD + valueFrom: + secretKeyRef: + name: recommendation-app-kafka-user + key: user.password + - name: DATA + value: "clickStream,sales,internationalSales" + - name: USE_APICURIO_REGISTRY + value: "true" + - name: REGISTRY_URL + value: "http://apicurio-registry-service.flink.svc:8080/apis/registry/v2" + volumeMounts: + - name: my-cluster-cluster-ca-cert + mountPath: "/opt/my-cluster-cluster-ca-cert" + readOnly: true + - name: recommendation-app-kafka-user + mountPath: "/opt/recommendation-app-kafka-user" + readOnly: true + volumes: + - name: my-cluster-cluster-ca-cert + secret: + secretName: my-cluster-cluster-ca-cert + items: + - key: ca.crt + path: ca.crt + - name: recommendation-app-kafka-user + secret: + secretName: recommendation-app-kafka-user + items: + - key: user.p12 + path: user.p12 \ No newline at end of file diff --git a/tutorials/secure-kafka/data-generator/kafka-user.yaml b/tutorials/secure-kafka/data-generator/kafka-user.yaml new file mode 100644 index 0000000..2e09fb4 --- /dev/null +++ 
diff --git a/tutorials/secure-kafka/data-generator/kafka-user.yaml b/tutorials/secure-kafka/data-generator/kafka-user.yaml
new file mode 100644
index 0000000..2e09fb4
--- /dev/null
+++ b/tutorials/secure-kafka/data-generator/kafka-user.yaml
@@ -0,0 +1,9 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaUser
+metadata:
+  name: recommendation-app-kafka-user
+  labels:
+    strimzi.io/cluster: my-cluster
+spec:
+  authentication:
+    type: tls
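Strimzi's User Operator reacts to this `KafkaUser` by creating a secret of the same name holding the client credentials; the data generator above mounts its `user.p12` keystore and reads `user.password` from it. A quick way to confirm the credentials exist before the generator starts, assuming the `flink` namespace:

```shell
# The KafkaUser should be Ready and its secret should list user.p12 / user.password keys
kubectl -n flink get kafkauser recommendation-app-kafka-user
kubectl -n flink describe secret recommendation-app-kafka-user
```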
[ "/ClusterManager-my-cluster Group" ] + }, + { + "username" : "service-account-team-a-client", + "enabled" : true, + "serviceAccountClientId" : "team-a-client", + "realmRoles" : [ "offline_access", "Dev Team A" ], + "clientRoles" : { + "account" : [ "manage-account", "view-profile" ] + }, + "groups" : [ ] + }, + { + "username" : "service-account-team-b-client", + "enabled" : true, + "serviceAccountClientId" : "team-b-client", + "realmRoles" : [ "offline_access", "Dev Team B" ], + "clientRoles" : { + "account" : [ "manage-account", "view-profile" ] + }, + "groups" : [ ] + } + ], + "clients": [ + { + "clientId": "team-a-client", + "enabled": true, + "clientAuthenticatorType": "client-secret", + "secret": "team-a-client-secret", + "bearerOnly": false, + "consentRequired": false, + "standardFlowEnabled": false, + "implicitFlowEnabled": false, + "directAccessGrantsEnabled": true, + "serviceAccountsEnabled": true, + "publicClient": false, + "fullScopeAllowed": true + }, + { + "clientId": "team-b-client", + "enabled": true, + "clientAuthenticatorType": "client-secret", + "secret": "team-b-client-secret", + "bearerOnly": false, + "consentRequired": false, + "standardFlowEnabled": false, + "implicitFlowEnabled": false, + "directAccessGrantsEnabled": true, + "serviceAccountsEnabled": true, + "publicClient": false, + "fullScopeAllowed": true + }, + { + "clientId": "kafka", + "enabled": true, + "clientAuthenticatorType": "client-secret", + "secret": "kafka-secret", + "bearerOnly": false, + "consentRequired": false, + "standardFlowEnabled": false, + "implicitFlowEnabled": false, + "directAccessGrantsEnabled": true, + "serviceAccountsEnabled": true, + "authorizationServicesEnabled": true, + "publicClient": false, + "fullScopeAllowed": true, + "authorizationSettings": { + "allowRemoteResourceManagement": true, + "policyEnforcementMode": "ENFORCING", + "resources": [ + { + "name": "Topic:a_*", + "type": "Topic", + "ownerManagedAccess": false, + "displayName": "Topics that start with a_", + "attributes": {}, + "uris": [], + "scopes": [ + { + "name": "Create" + }, + { + "name": "Delete" + }, + { + "name": "Describe" + }, + { + "name": "Write" + }, + { + "name": "Read" + }, + { + "name": "Alter" + }, + { + "name": "DescribeConfigs" + }, + { + "name": "AlterConfigs" + } + ] + }, + { + "name": "Group:x_*", + "type": "Group", + "ownerManagedAccess": false, + "displayName": "Consumer groups that start with x_", + "attributes": {}, + "uris": [], + "scopes": [ + { + "name": "Describe" + }, + { + "name": "Delete" + }, + { + "name": "Read" + } + ] + }, + { + "name": "Topic:x_*", + "type": "Topic", + "ownerManagedAccess": false, + "displayName": "Topics that start with x_", + "attributes": {}, + "uris": [], + "scopes": [ + { + "name": "Create" + }, + { + "name": "Describe" + }, + { + "name": "Delete" + }, + { + "name": "Write" + }, + { + "name": "Read" + }, + { + "name": "Alter" + }, + { + "name": "DescribeConfigs" + }, + { + "name": "AlterConfigs" + } + ] + }, + { + "name": "Group:a_*", + "type": "Group", + "ownerManagedAccess": false, + "displayName": "Groups that start with a_", + "attributes": {}, + "uris": [], + "scopes": [ + { + "name": "Describe" + }, + { + "name": "Read" + } + ] + }, + { + "name": "Group:*", + "type": "Group", + "ownerManagedAccess": false, + "displayName": "Any group", + "attributes": {}, + "uris": [], + "scopes": [ + { + "name": "Describe" + }, + { + "name": "Read" + }, + { + "name": "DescribeConfigs" + }, + { + "name": "AlterConfigs" + } + ] + }, + { + "name": "Topic:*", + "type": 
"Topic", + "ownerManagedAccess": false, + "displayName": "Any topic", + "attributes": {}, + "uris": [], + "scopes": [ + { + "name": "Create" + }, + { + "name": "Delete" + }, + { + "name": "Describe" + }, + { + "name": "Write" + }, + { + "name": "Read" + }, + { + "name": "Alter" + }, + { + "name": "DescribeConfigs" + }, + { + "name": "AlterConfigs" + } + ] + }, + { + "name": "kafka-cluster:my-cluster,Topic:b_*", + "type": "Topic", + "ownerManagedAccess": false, + "attributes": {}, + "uris": [], + "scopes": [ + { + "name": "Create" + }, + { + "name": "Delete" + }, + { + "name": "Describe" + }, + { + "name": "Write" + }, + { + "name": "Read" + }, + { + "name": "Alter" + }, + { + "name": "DescribeConfigs" + }, + { + "name": "AlterConfigs" + } + ] + }, + { + "name": "kafka-cluster:my-cluster,Cluster:*", + "type": "Cluster", + "ownerManagedAccess": false, + "displayName": "Cluster scope on my-cluster", + "attributes": {}, + "uris": [], + "scopes": [ + { + "name": "DescribeConfigs" + }, + { + "name": "AlterConfigs" + }, + { + "name": "ClusterAction" + }, + { + "name": "IdempotentWrite" + } + ] + }, + { + "name": "kafka-cluster:my-cluster,Group:*", + "type": "Group", + "ownerManagedAccess": false, + "displayName": "Any group on my-cluster", + "attributes": {}, + "uris": [], + "scopes": [ + { + "name": "Delete" + }, + { + "name": "Describe" + }, + { + "name": "Read" + }, + { + "name": "DescribeConfigs" + }, + { + "name": "AlterConfigs" + } + ] + }, + { + "name": "kafka-cluster:my-cluster,Topic:*", + "type": "Topic", + "ownerManagedAccess": false, + "displayName": "Any topic on my-cluster", + "attributes": {}, + "uris": [], + "scopes": [ + { + "name": "Create" + }, + { + "name": "Delete" + }, + { + "name": "Describe" + }, + { + "name": "Write" + }, + { + "name": "IdempotentWrite" + }, + { + "name": "Read" + }, + { + "name": "Alter" + }, + { + "name": "DescribeConfigs" + }, + { + "name": "AlterConfigs" + } + ] + }, + { + "name" : "Cluster:*", + "type" : "Cluster", + "ownerManagedAccess" : false, + "attributes" : { }, + "uris" : [ ], + "scopes": [ + { + "name": "DescribeConfigs" + }, + { + "name": "AlterConfigs" + }, + { + "name": "ClusterAction" + }, + { + "name": "IdempotentWrite" + } + ] + } + ], + "policies": [ + { + "name": "Dev Team A", + "type": "role", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "roles": "[{\"id\":\"Dev Team A\",\"required\":true}]" + } + }, + { + "name": "Dev Team B", + "type": "role", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "roles": "[{\"id\":\"Dev Team B\",\"required\":true}]" + } + }, + { + "name": "Ops Team", + "type": "role", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "roles": "[{\"id\":\"Ops Team\",\"required\":true}]" + } + }, + { + "name" : "ClusterManager Group", + "type" : "group", + "logic" : "POSITIVE", + "decisionStrategy" : "UNANIMOUS", + "config" : { + "groups" : "[{\"path\":\"/ClusterManager Group\",\"extendChildren\":false}]" + } + }, { + "name" : "ClusterManager of my-cluster Group", + "type" : "group", + "logic" : "POSITIVE", + "decisionStrategy" : "UNANIMOUS", + "config" : { + "groups" : "[{\"path\":\"/ClusterManager-my-cluster Group\",\"extendChildren\":false}]" + } + }, + { + "name": "Dev Team A owns topics that start with a_ on any cluster", + "type": "resource", + "logic": "POSITIVE", + "decisionStrategy": "UNANIMOUS", + "config": { + "resources": "[\"Topic:a_*\"]", + "applyPolicies": "[\"Dev Team A\"]" + } + }, + { + "name": "Dev Team A can write to topics 
+                "name": "Dev Team A can write to topics that start with x_ on any cluster",
+                "type": "scope",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"Topic:x_*\"]",
+                  "scopes": "[\"Describe\",\"Write\"]",
+                  "applyPolicies": "[\"Dev Team A\"]"
+                }
+              },
+              {
+                "name": "Dev Team A can do IdempotentWrites on cluster my-cluster",
+                "type": "scope",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"kafka-cluster:my-cluster,Cluster:*\"]",
+                  "scopes": "[\"IdempotentWrite\"]",
+                  "applyPolicies": "[\"Dev Team A\"]"
+                }
+              },
+              {
+                "name": "Dev Team B owns topics that start with b_ on cluster my-cluster",
+                "type": "resource",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"kafka-cluster:my-cluster,Topic:b_*\"]",
+                  "applyPolicies": "[\"Dev Team B\"]"
+                }
+              },
+              {
+                "name": "Dev Team B can read from topics that start with x_ on any cluster",
+                "type": "scope",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"Topic:x_*\"]",
+                  "scopes": "[\"Describe\",\"Read\"]",
+                  "applyPolicies": "[\"Dev Team B\"]"
+                }
+              },
+              {
+                "name": "Dev Team B can update consumer group offsets that start with x_ on any cluster",
+                "type": "scope",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"Group:x_*\"]",
+                  "scopes": "[\"Describe\",\"Read\"]",
+                  "applyPolicies": "[\"Dev Team B\"]"
+                }
+              },
+              {
+                "name": "Dev Team B can do IdempotentWrites on cluster my-cluster",
+                "type": "scope",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"kafka-cluster:my-cluster,Cluster:*\"]",
+                  "scopes": "[\"IdempotentWrite\"]",
+                  "applyPolicies": "[\"Dev Team B\"]"
+                }
+              },
+              {
+                "name": "Dev Team A can use consumer groups that start with a_ on any cluster",
+                "type": "resource",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"Group:a_*\"]",
+                  "applyPolicies": "[\"Dev Team A\"]"
+                }
+              },
+              {
+                "name": "ClusterManager of my-cluster Group has full access to topics on my-cluster",
+                "type": "resource",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"kafka-cluster:my-cluster,Topic:*\"]",
+                  "applyPolicies": "[\"ClusterManager of my-cluster Group\"]"
+                }
+              },
+              {
+                "name": "ClusterManager of my-cluster Group has full access to consumer groups on my-cluster",
+                "type": "resource",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"kafka-cluster:my-cluster,Group:*\"]",
+                  "applyPolicies": "[\"ClusterManager of my-cluster Group\"]"
+                }
+              },
+              {
+                "name": "ClusterManager of my-cluster Group has full access to cluster config on my-cluster",
+                "type": "resource",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"kafka-cluster:my-cluster,Cluster:*\"]",
+                  "applyPolicies": "[\"ClusterManager of my-cluster Group\"]"
+                }
+              },
+              {
+                "name": "ClusterManager Group has full access to manage and affect groups",
+                "type": "resource",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"Group:*\"]",
+                  "applyPolicies": "[\"ClusterManager Group\"]"
+                }
+              },
+              {
+                "name": "ClusterManager Group has full access to manage and affect topics",
+                "type": "resource",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"Topic:*\"]",
+                  "applyPolicies": "[\"ClusterManager Group\"]"
+                }
+              },
+              {
+                "name": "ClusterManager Group has full access to cluster config",
+                "type": "resource",
+                "logic": "POSITIVE",
+                "decisionStrategy": "UNANIMOUS",
+                "config": {
+                  "resources": "[\"Cluster:*\"]",
+                  "applyPolicies": "[\"ClusterManager Group\"]"
+                }
+              }
+            ],
+            "scopes": [
+              { "name": "Create" }, { "name": "Read" }, { "name": "Write" }, { "name": "Delete" }, { "name": "Alter" },
+              { "name": "Describe" }, { "name": "ClusterAction" }, { "name": "DescribeConfigs" }, { "name": "AlterConfigs" },
+              { "name": "IdempotentWrite" }
+            ],
+            "decisionStrategy": "AFFIRMATIVE"
+          }
+        },
+        {
+          "clientId": "kafka-cli",
+          "enabled": true,
+          "clientAuthenticatorType": "client-secret",
+          "secret": "kafka-cli-secret",
+          "bearerOnly": false,
+          "consentRequired": false,
+          "standardFlowEnabled": false,
+          "implicitFlowEnabled": false,
+          "directAccessGrantsEnabled": true,
+          "serviceAccountsEnabled": false,
+          "publicClient": true,
+          "fullScopeAllowed": true
+        }
+      ]
+    }
diff --git a/tutorials/secure-kafka/keycloak/keycloak-cert.yaml b/tutorials/secure-kafka/keycloak/keycloak-cert.yaml
new file mode 100644
index 0000000..616df27
--- /dev/null
+++ b/tutorials/secure-kafka/keycloak/keycloak-cert.yaml
@@ -0,0 +1,12 @@
+apiVersion: cert-manager.io/v1
+kind: Certificate
+metadata:
+  name: keycloak-cert
+spec:
+  secretName: keycloak-cert
+  issuerRef:
+    name: my-selfsigned-ca-issuer
+    kind: Issuer
+  dnsNames:
+    - keycloak.flink.svc.cluster.local
+    - keycloak.flink.svc
diff --git a/tutorials/secure-kafka/keycloak/keycloak-kafka-client-secret.yaml b/tutorials/secure-kafka/keycloak/keycloak-kafka-client-secret.yaml
new file mode 100644
index 0000000..ba3eefc
--- /dev/null
+++ b/tutorials/secure-kafka/keycloak/keycloak-kafka-client-secret.yaml
@@ -0,0 +1,8 @@
+apiVersion: v1
+kind: Secret
+metadata:
+  name: keycloak-kafka-client-secret
+type: Opaque
+stringData:
+  secret: "kafka-secret" # This is the "kafka" client secret from
+                         # Strimzi's keycloak example realm
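The `secret` value stored here is meant to line up with the `kafka` client's `"secret": "kafka-secret"` entry in the realm import above; it is presumably what the broker's OAuth listener configuration references when authenticating to Keycloak. A quick sanity check of the applied value, assuming the `flink` namespace:

```shell
# Should print: kafka-secret
kubectl -n flink get secret keycloak-kafka-client-secret -o jsonpath='{.data.secret}' | base64 -d; echo
```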
diff --git a/tutorials/secure-kafka/keycloak/keycloak.yaml b/tutorials/secure-kafka/keycloak/keycloak.yaml
new file mode 100644
index 0000000..2934782
--- /dev/null
+++ b/tutorials/secure-kafka/keycloak/keycloak.yaml
@@ -0,0 +1,116 @@
+# Modified version of https://github.com/keycloak/keycloak-quickstarts/blob/81e719fa0423206a93a087f31673a9db0728ab4f/kubernetes/keycloak.yaml
+# Some code copied from https://www.keycloak.org/operator/basic-deployment
+apiVersion: v1
+kind: Service
+metadata:
+  name: keycloak
+  labels:
+    app: keycloak
+spec:
+  ports:
+    - protocol: TCP
+      port: 8080
+      targetPort: http
+      name: http
+    - protocol: TCP
+      port: 8443
+      targetPort: https
+      name: https
+  selector:
+    app: keycloak
+  type: ClusterIP
+---
+apiVersion: v1
+kind: Secret
+metadata:
+  name: keycloak-db-secret
+type: Opaque
+stringData:
+  username: "keycloak"
+  password: "keycloak"
+---
+apiVersion: k8s.keycloak.org/v2alpha1
+kind: Keycloak
+metadata:
+  name: keycloak
+  labels:
+    app: keycloak
+spec:
+  instances: 1
+  db:
+    vendor: postgres
+    host: postgres
+    usernameSecret:
+      name: keycloak-db-secret
+      key: username
+    passwordSecret:
+      name: keycloak-db-secret
+      key: password
+  http:
+    tlsSecret: keycloak-cert
+  hostname:
+    hostname: keycloak.flink.svc
+  proxy:
+    headers: xforwarded # double check your reverse proxy sets and overwrites the X-Forwarded-* headers
+---
+# This is a deployment of PostgreSQL with ephemeral storage for testing: once the Pod stops, the data is lost.
+# For a production setup, replace it with a database setup that persists your data.
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  name: postgres
+  labels:
+    app: postgres
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: postgres
+  template:
+    metadata:
+      labels:
+        app: postgres
+    spec:
+      containers:
+        - name: postgres
+          image: mirror.gcr.io/postgres:17
+          env:
+            - name: 'POSTGRES_USER'
+              valueFrom:
+                secretKeyRef:
+                  name: keycloak-db-secret
+                  key: username
+            - name: 'POSTGRES_PASSWORD'
+              valueFrom:
+                secretKeyRef:
+                  name: keycloak-db-secret
+                  key: password
+            - name: POSTGRES_DB
+              value: "keycloak"
+            - name: POSTGRES_LOG_STATEMENT
+              value: "all"
+          ports:
+            - name: postgres
+              containerPort: 5432
+          volumeMounts:
+            # Using volume mount for PostgreSQL's data folder as it is otherwise not writable
+            - name: postgres-data
+              mountPath: /var/lib/postgresql
+      volumes:
+        - name: postgres-data
+          emptyDir: {}
+---
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    app: postgres
+  name: postgres
+spec:
+  selector:
+    app: postgres
+  ports:
+    - protocol: TCP
+      port: 5432
+      targetPort: 5432
+  type: ClusterIP
diff --git a/tutorials/secure-kafka/mTLS/kafka-user.yaml b/tutorials/secure-kafka/mTLS/kafka-user.yaml
new file mode 100644
index 0000000..e961238
--- /dev/null
+++ b/tutorials/secure-kafka/mTLS/kafka-user.yaml
@@ -0,0 +1,33 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: KafkaUser
+metadata:
+  name: my-user
+  labels:
+    strimzi.io/cluster: my-cluster
+spec:
+  authentication:
+    type: tls
+  # spec.authorization.acls is required for topic access
+  # if simple authorization was specified in the Kafka CR (spec.kafka.authorization)
+  authorization:
+    type: simple
+    acls:
+      - resource:
+          type: topic
+          name: flink.sales.records
+        operations:
+          - Describe
+          - Read
+      - resource:
+          type: topic
+          name: flink.mtls.auth.sales.records
+        operations:
+          - Describe
+          - Create
+          - Write
+          - Read
+      - resource:
+          type: group
+          name: sales-record-group
+        operations:
+          - Read
\ No newline at end of file
diff --git a/tutorials/secure-kafka/mTLS/kafka.yaml b/tutorials/secure-kafka/mTLS/kafka.yaml
new file mode 100644
index 0000000..9d9e667
--- /dev/null
+++ b/tutorials/secure-kafka/mTLS/kafka.yaml
@@ -0,0 +1,39 @@
+apiVersion: kafka.strimzi.io/v1beta2
+kind: Kafka
+metadata:
+  name: my-cluster
+  annotations:
+    strimzi.io/node-pools: enabled
+    strimzi.io/kraft: enabled
+spec:
+  kafka:
+    version: 4.0.0
+    metadataVersion: 4.0-IV3
+    listeners:
+      - name: tls
+        port: 9093
+        type: internal
+        tls: true
+        authentication:
+          type: tls
+      - name: mtls
+        port: 9094
+        type: internal
+        tls: true
+        authentication:
+          type: tls
+    # You can remove spec.kafka.authorization
+    # to grant all users unlimited access rights
+    authorization:
+      type: simple
+      superUsers:
+        - CN=recommendation-app-kafka-user
+    config:
+      offsets.topic.replication.factor: 1
+      transaction.state.log.replication.factor: 1
+      transaction.state.log.min.isr: 1
+      default.replication.factor: 1
+      min.insync.replicas: 1
+  entityOperator:
+    topicOperator: {}
+    userOperator: {}
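Once Strimzi reconciles this Kafka CR, the status of the resource lists the bootstrap address of each listener, which is an easy way to confirm the `mtls` listener on port 9094 is available before deploying the Flink job below. A possible check, assuming the cluster runs in the `flink` namespace:

```shell
# Wait for the cluster, then list the advertised listeners (tls on 9093, mtls on 9094)
kubectl -n flink wait kafka/my-cluster --for=condition=Ready --timeout=300s
kubectl -n flink get kafka my-cluster -o jsonpath='{.status.listeners}'
```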
diff --git a/tutorials/secure-kafka/mTLS/standalone-etl-secure-deployment.yaml b/tutorials/secure-kafka/mTLS/standalone-etl-secure-deployment.yaml
new file mode 100644
index 0000000..d03514b
--- /dev/null
+++ b/tutorials/secure-kafka/mTLS/standalone-etl-secure-deployment.yaml
@@ -0,0 +1,96 @@
+apiVersion: flink.apache.org/v1beta1
+kind: FlinkDeployment
+metadata:
+  name: standalone-etl-secure
+spec:
+  image: quay.io/streamshub/flink-sql-runner:0.3.0
+  flinkVersion: v2_0
+  flinkConfiguration:
+    taskmanager.numberOfTaskSlots: "1"
+  serviceAccount: flink
+  podTemplate:
+    kind: Pod
+    spec:
+      volumes:
+        - name: my-cluster-cluster-ca-cert
+          secret:
+            secretName: my-cluster-cluster-ca-cert
+            items:
+              - key: ca.crt
+                path: ca.crt
+      containers:
+        - name: flink-main-container
+          volumeMounts:
+            - name: my-cluster-cluster-ca-cert
+              mountPath: "/opt/my-cluster-cluster-ca-cert"
+              readOnly: true
+          env:
+            - name: SQL_STATEMENTS
+              value: |
+                CREATE TABLE SalesRecordTable (
+                  invoice_id STRING,
+                  user_id STRING,
+                  product_id STRING,
+                  quantity STRING,
+                  unit_cost STRING,
+                  `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp',
+                  WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND
+                ) WITH (
+                  'connector' = 'kafka',
+                  'topic' = 'flink.sales.records',
+                  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094',
+                  'properties.security.protocol' = 'SSL',
+                  'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt',
+                  'properties.ssl.truststore.type' = 'PEM',
+                  'properties.ssl.keystore.certificate.chain' = '{{secret:flink/my-user/user.crt}}',
+                  'properties.ssl.keystore.key' = '{{secret:flink/my-user/user.key}}',
+                  'properties.ssl.keystore.type' = 'PEM',
+                  'properties.group.id' = 'sales-record-group',
+                  'value.format' = 'avro-confluent',
+                  'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6',
+                  'scan.startup.mode' = 'latest-offset'
+                );
+
+                CREATE TABLE MTlsAuthSalesRecordTable (
+                  invoice_id STRING,
+                  user_id STRING,
+                  product_id STRING,
+                  quantity STRING,
+                  PRIMARY KEY (`user_id`) NOT ENFORCED
+                ) WITH (
+                  'connector' = 'upsert-kafka',
+                  'topic' = 'flink.mtls.auth.sales.records',
+                  'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094',
+                  'properties.security.protocol' = 'SSL',
+                  'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt',
+                  'properties.ssl.truststore.type' = 'PEM',
+                  'properties.ssl.keystore.certificate.chain' = '{{secret:flink/my-user/user.crt}}',
+                  'properties.ssl.keystore.key' = '{{secret:flink/my-user/user.key}}',
+                  'properties.ssl.keystore.type' = 'PEM',
+                  'properties.client.id' = 'sql-secure-client',
+                  'properties.transaction.timeout.ms' = '800000',
+                  'key.format' = 'csv',
+                  'value.format' = 'csv',
+                  'value.fields-include' = 'ALL'
+                );
+
+                INSERT INTO MTlsAuthSalesRecordTable
+                SELECT
+                  invoice_id,
+                  user_id,
+                  product_id,
+                  quantity
+                FROM SalesRecordTable
+                LIMIT 10;
+  jobManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  taskManager:
+    resource:
+      memory: "2048m"
+      cpu: 1
+  job:
+    jarURI: local:///opt/streamshub/flink-sql-runner.jar
+    parallelism: 1
+    upgradeMode: stateless
diff --git a/tutorials/secure-kafka/selfsigned-ca.yaml b/tutorials/secure-kafka/selfsigned-ca.yaml
new file mode 100644
index 0000000..4f26137
--- /dev/null
+++ b/tutorials/secure-kafka/selfsigned-ca.yaml
@@ -0,0 +1,33 @@
+# Taken from https://cert-manager.io/docs/configuration/selfsigned/
+apiVersion: cert-manager.io/v1
+kind: Issuer
+metadata:
+  name: selfsigned-issuer
+spec:
+  selfSigned: {}
+
+---
+apiVersion: cert-manager.io/v1
+kind: Certificate
+metadata:
+  name: my-selfsigned-ca
+spec:
+  isCA: true
+  commonName: my-selfsigned-ca
+  secretName: my-selfsigned-ca-secret
+  privateKey:
+    algorithm: ECDSA
+    size: 256
+  issuerRef:
+    name: selfsigned-issuer
+    kind: Issuer
+    group: cert-manager.io
+
+---
+apiVersion: cert-manager.io/v1
+kind: Issuer
+metadata:
+  name: my-selfsigned-ca-issuer
+spec:
+  ca:
+    secretName: my-selfsigned-ca-secret
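Both `my-user-custom-cert` and `keycloak-cert` above reference `my-selfsigned-ca-issuer`, so the self-signed CA chain needs to be Ready before either of those certificates can be issued. A minimal check, assuming everything is applied to the `flink` namespace:

```shell
# The CA certificate and the CA-backed issuer should both report Ready
kubectl -n flink get certificate my-selfsigned-ca
kubectl -n flink wait issuer/my-selfsigned-ca-issuer --for=condition=Ready --timeout=120s
```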