From 503d4a2fab7c2369a4bb2f3433ab6fe62134fb49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 9 Sep 2025 17:54:57 +0100 Subject: [PATCH 1/5] Add internal Flink certificate --- .../secure-flink/flink-internal-cert.yaml | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 tutorials/secure-flink/flink-internal-cert.yaml diff --git a/tutorials/secure-flink/flink-internal-cert.yaml b/tutorials/secure-flink/flink-internal-cert.yaml new file mode 100644 index 0000000..a362142 --- /dev/null +++ b/tutorials/secure-flink/flink-internal-cert.yaml @@ -0,0 +1,27 @@ +apiVersion: v1 +kind: Secret +metadata: + name: flink-internal-cert-password +type: Opaque +stringData: + password: "j62gAn9ervq2" +--- +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: flink-internal-cert +spec: + secretName: flink-internal-cert + issuerRef: + name: my-selfsigned-ca-issuer + kind: Issuer + dnsNames: + - '*.flink.svc.cluster.local' + - '*.flink.svc' + commonName: FlinkDeployment + keystores: + pkcs12: + create: true + passwordSecretRef: + name: flink-internal-cert-password + key: password From 5d0e88908249ed0fd694fd68fa51e7f373e91429 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 9 Sep 2025 17:55:39 +0100 Subject: [PATCH 2/5] Add Flink TLS configuration --- .../standalone-etl-secure-deployment.yaml | 123 ++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 tutorials/secure-flink/tls-rest/standalone-etl-secure-deployment.yaml diff --git a/tutorials/secure-flink/tls-rest/standalone-etl-secure-deployment.yaml b/tutorials/secure-flink/tls-rest/standalone-etl-secure-deployment.yaml new file mode 100644 index 0000000..6adfc3c --- /dev/null +++ b/tutorials/secure-flink/tls-rest/standalone-etl-secure-deployment.yaml @@ -0,0 +1,123 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-secure +spec: + image: quay.io/streamshub/flink-sql-runner:main + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + # Internal mTLS + security.ssl.internal.enabled: 'true' + security.ssl.internal.keystore: /opt/flink/flink-internal-cert/keystore.p12 + security.ssl.internal.keystore-password: ${INTERNAL_PASSWORD} + security.ssl.internal.keystore-type: PKCS12 + security.ssl.internal.key-password: ${INTERNAL_PASSWORD} + security.ssl.internal.truststore: /opt/flink/flink-internal-cert/truststore.p12 + security.ssl.internal.truststore-password: ${INTERNAL_PASSWORD} + security.ssl.internal.truststore-type: PKCS12 + # REST API TLS (non-mutual auth) + security.ssl.rest.enabled: 'true' + security.ssl.rest.keystore: /opt/flink/flink-internal-cert/keystore.p12 + security.ssl.rest.keystore-password: ${INTERNAL_PASSWORD} + security.ssl.rest.keystore-type: PKCS12 + security.ssl.rest.key-password: ${INTERNAL_PASSWORD} + security.ssl.rest.truststore: /opt/flink/flink-internal-cert/truststore.p12 + security.ssl.rest.truststore-password: ${INTERNAL_PASSWORD} + security.ssl.rest.truststore-type: PKCS12 + # Secrets + kubernetes.secrets: 'flink-internal-cert:/opt/flink/flink-internal-cert' + serviceAccount: flink + podTemplate: + kind: Pod + spec: + volumes: + - name: my-cluster-cluster-ca-cert + secret: + secretName: my-cluster-cluster-ca-cert + items: + - key: ca.crt + path: ca.crt + - name: my-user + secret: + secretName: my-user + items: + - key: user.p12 + path: user.p12 + containers: + - name: flink-main-container + volumeMounts: + - name: my-cluster-cluster-ca-cert + mountPath: "/opt/my-cluster-cluster-ca-cert" + readOnly: true + - name: my-user + mountPath: "/opt/my-user" + readOnly: true + env: + - name: SQL_STATEMENTS + value: | + CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', + 'properties.security.protocol' = 'SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.ssl.keystore.location' = '/opt/my-user/user.p12', + 'properties.ssl.keystore.password' = '{{secret:flink/my-user/user.password}}', + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' + ); + + CREATE TABLE MTlsAuthSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + PRIMARY KEY (`user_id`) NOT ENFORCED + ) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.mtls.auth.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', + 'properties.security.protocol' = 'SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.ssl.keystore.location' = '/opt/my-user/user.p12', + 'properties.ssl.keystore.password' = '{{secret:flink/my-user/user.password}}', + 'properties.client.id' = 'sql-secure-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' + ); + + INSERT INTO MTlsAuthSalesRecordTable + SELECT + invoice_id, + user_id, + product_id, + quantity + FROM SalesRecordTable + LIMIT 10; + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/streamshub/flink-sql-runner.jar + parallelism: 1 + upgradeMode: stateless From ff8cfc3ebcbb8a4b6ccfdde78e3864a0097cf038 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 9 Sep 2025 17:56:43 +0100 Subject: [PATCH 3/5] Add Flink proxy configuration and certs --- .../flink-rest-proxy-client-cert.yaml | 27 +++++++++++++++++++ .../flink-rest-proxy-server-cert.yaml | 26 ++++++++++++++++++ ...andalone-etl-secure-rest-mtls-ingress.yaml | 24 +++++++++++++++++ 3 files changed, 77 insertions(+) create mode 100644 tutorials/secure-flink/tls-rest/mtls-proxy/flink-rest-proxy-client-cert.yaml create mode 100644 tutorials/secure-flink/tls-rest/mtls-proxy/flink-rest-proxy-server-cert.yaml create mode 100644 tutorials/secure-flink/tls-rest/mtls-proxy/standalone-etl-secure-rest-mtls-ingress.yaml diff --git a/tutorials/secure-flink/tls-rest/mtls-proxy/flink-rest-proxy-client-cert.yaml b/tutorials/secure-flink/tls-rest/mtls-proxy/flink-rest-proxy-client-cert.yaml new file mode 100644 index 0000000..25c2feb --- /dev/null +++ b/tutorials/secure-flink/tls-rest/mtls-proxy/flink-rest-proxy-client-cert.yaml @@ -0,0 +1,27 @@ +apiVersion: v1 +kind: Secret +metadata: + name: flink-rest-proxy-client-cert-password +type: Opaque +stringData: + password: "l83cJn9ervq4" +--- +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: flink-rest-proxy-client-cert +spec: + secretName: flink-rest-proxy-client-cert + issuerRef: + name: my-selfsigned-ca-issuer + kind: Issuer + dnsNames: + - 'standalone-etl-secure-rest-client.flink.svc.cluster.local' + - 'standalone-etl-secure-rest-client.flink.svc' + commonName: FlinkDeployment + keystores: + pkcs12: + create: true + passwordSecretRef: + name: flink-rest-proxy-client-cert-password + key: password diff --git a/tutorials/secure-flink/tls-rest/mtls-proxy/flink-rest-proxy-server-cert.yaml b/tutorials/secure-flink/tls-rest/mtls-proxy/flink-rest-proxy-server-cert.yaml new file mode 100644 index 0000000..4a04101 --- /dev/null +++ b/tutorials/secure-flink/tls-rest/mtls-proxy/flink-rest-proxy-server-cert.yaml @@ -0,0 +1,26 @@ +apiVersion: v1 +kind: Secret +metadata: + name: flink-rest-proxy-server-cert-password +type: Opaque +stringData: + password: "m38gNn9ehvj8" +--- +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: flink-rest-proxy-server-cert +spec: + secretName: flink-rest-proxy-server-cert + issuerRef: + name: my-selfsigned-ca-issuer + kind: Issuer + dnsNames: + - 'standalone-etl-secure-rest-mtls-ingress.example' + commonName: FlinkDeployment + keystores: + pkcs12: + create: true + passwordSecretRef: + name: flink-rest-proxy-server-cert-password + key: password diff --git a/tutorials/secure-flink/tls-rest/mtls-proxy/standalone-etl-secure-rest-mtls-ingress.yaml b/tutorials/secure-flink/tls-rest/mtls-proxy/standalone-etl-secure-rest-mtls-ingress.yaml new file mode 100644 index 0000000..f2b0fe5 --- /dev/null +++ b/tutorials/secure-flink/tls-rest/mtls-proxy/standalone-etl-secure-rest-mtls-ingress.yaml @@ -0,0 +1,24 @@ +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: standalone-etl-secure-rest-mtls-ingress + annotations: + nginx.ingress.kubernetes.io/backend-protocol: "HTTPS" + nginx.ingress.kubernetes.io/auth-tls-verify-client: "on" + nginx.ingress.kubernetes.io/auth-tls-secret: flink/flink-rest-proxy-client-cert +spec: + tls: + - hosts: + - standalone-etl-secure-rest-mtls-ingress.example + secretName: flink-rest-proxy-server-cert + rules: + - host: standalone-etl-secure-rest-mtls-ingress.example + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: standalone-etl-secure-rest + port: + number: 8081 From dba74179a316133956384c9a6a34d90b3b3d50a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 9 Sep 2025 17:57:14 +0100 Subject: [PATCH 4/5] Add Flink mTLS configuration --- .../mtls-rest/flink-rest-client-cert.yaml | 27 ++++ .../mtls-rest/flink-rest-server-cert.yaml | 27 ++++ .../standalone-etl-secure-deployment.yaml | 124 ++++++++++++++++++ .../standalone-etl-secure-rest-client.yaml | 33 +++++ 4 files changed, 211 insertions(+) create mode 100644 tutorials/secure-flink/mtls-rest/flink-rest-client-cert.yaml create mode 100644 tutorials/secure-flink/mtls-rest/flink-rest-server-cert.yaml create mode 100644 tutorials/secure-flink/mtls-rest/standalone-etl-secure-deployment.yaml create mode 100644 tutorials/secure-flink/mtls-rest/standalone-etl-secure-rest-client.yaml diff --git a/tutorials/secure-flink/mtls-rest/flink-rest-client-cert.yaml b/tutorials/secure-flink/mtls-rest/flink-rest-client-cert.yaml new file mode 100644 index 0000000..4a37109 --- /dev/null +++ b/tutorials/secure-flink/mtls-rest/flink-rest-client-cert.yaml @@ -0,0 +1,27 @@ +apiVersion: v1 +kind: Secret +metadata: + name: flink-rest-client-cert-password +type: Opaque +stringData: + password: "l83cJn9ervq4" +--- +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: flink-rest-client-cert +spec: + secretName: flink-rest-client-cert + issuerRef: + name: my-selfsigned-ca-issuer + kind: Issuer + dnsNames: + - 'standalone-etl-secure-rest-client.flink.svc.cluster.local' + - 'standalone-etl-secure-rest-client.flink.svc' + commonName: FlinkDeployment + keystores: + pkcs12: + create: true + passwordSecretRef: + name: flink-rest-client-cert-password + key: password diff --git a/tutorials/secure-flink/mtls-rest/flink-rest-server-cert.yaml b/tutorials/secure-flink/mtls-rest/flink-rest-server-cert.yaml new file mode 100644 index 0000000..546269b --- /dev/null +++ b/tutorials/secure-flink/mtls-rest/flink-rest-server-cert.yaml @@ -0,0 +1,27 @@ +apiVersion: v1 +kind: Secret +metadata: + name: flink-rest-server-cert-password +type: Opaque +stringData: + password: "m38gNn9ehvj8" +--- +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + name: flink-rest-server-cert +spec: + secretName: flink-rest-server-cert + issuerRef: + name: my-selfsigned-ca-issuer + kind: Issuer + dnsNames: + - 'standalone-etl-secure-rest.flink.svc.cluster.local' + - 'standalone-etl-secure-rest.flink.svc' + commonName: FlinkDeployment + keystores: + pkcs12: + create: true + passwordSecretRef: + name: flink-rest-server-cert-password + key: password diff --git a/tutorials/secure-flink/mtls-rest/standalone-etl-secure-deployment.yaml b/tutorials/secure-flink/mtls-rest/standalone-etl-secure-deployment.yaml new file mode 100644 index 0000000..3c45cdc --- /dev/null +++ b/tutorials/secure-flink/mtls-rest/standalone-etl-secure-deployment.yaml @@ -0,0 +1,124 @@ +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-secure +spec: + image: quay.io/streamshub/flink-sql-runner:main + flinkVersion: v2_0 + flinkConfiguration: + taskmanager.numberOfTaskSlots: "1" + # Internal mTLS + security.ssl.internal.enabled: 'true' + security.ssl.internal.keystore: /opt/flink/flink-internal-cert/keystore.p12 + security.ssl.internal.keystore-password: ${INTERNAL_PASSWORD} + security.ssl.internal.keystore-type: PKCS12 + security.ssl.internal.key-password: ${INTERNAL_PASSWORD} + security.ssl.internal.truststore: /opt/flink/flink-internal-cert/truststore.p12 + security.ssl.internal.truststore-password: ${INTERNAL_PASSWORD} + security.ssl.internal.truststore-type: PKCS12 + # REST API mTLS + security.ssl.rest.enabled: 'true' + security.ssl.rest.authentication-enabled: 'true' + security.ssl.rest.keystore: /opt/flink/flink-rest-server-cert/keystore.p12 + security.ssl.rest.keystore-password: ${REST_SERVER_KEYSTORE_PASSWORD} + security.ssl.rest.keystore-type: PKCS12 + security.ssl.rest.key-password: ${REST_SERVER_KEYSTORE_PASSWORD} + security.ssl.rest.truststore: /opt/flink/flink-rest-client-cert/truststore.p12 + security.ssl.rest.truststore-password: ${REST_CLIENT_TRUSTSTORE_PASSWORD} + security.ssl.rest.truststore-type: PKCS12 + # Secrets + kubernetes.secrets: 'flink-internal-cert:/opt/flink/flink-internal-cert,flink-rest-server-cert:/opt/flink/flink-rest-server-cert,flink-rest-client-cert:/opt/flink/flink-rest-client-cert' + serviceAccount: flink + podTemplate: + kind: Pod + spec: + volumes: + - name: my-cluster-cluster-ca-cert + secret: + secretName: my-cluster-cluster-ca-cert + items: + - key: ca.crt + path: ca.crt + - name: my-user + secret: + secretName: my-user + items: + - key: user.p12 + path: user.p12 + containers: + - name: flink-main-container + volumeMounts: + - name: my-cluster-cluster-ca-cert + mountPath: "/opt/my-cluster-cluster-ca-cert" + readOnly: true + - name: my-user + mountPath: "/opt/my-user" + readOnly: true + env: + - name: SQL_STATEMENTS + value: | + CREATE TABLE SalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + unit_cost STRING, + `purchase_time` TIMESTAMP(3) METADATA FROM 'timestamp', + WATERMARK FOR purchase_time AS purchase_time - INTERVAL '1' SECOND + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'flink.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', + 'properties.security.protocol' = 'SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.ssl.keystore.location' = '/opt/my-user/user.p12', + 'properties.ssl.keystore.password' = '{{secret:flink/my-user/user.password}}', + 'properties.group.id' = 'sales-record-group', + 'value.format' = 'avro-confluent', + 'value.avro-confluent.url' = 'http://apicurio-registry-service.flink.svc:8080/apis/ccompat/v6', + 'scan.startup.mode' = 'latest-offset' + ); + + CREATE TABLE MTlsAuthSalesRecordTable ( + invoice_id STRING, + user_id STRING, + product_id STRING, + quantity STRING, + PRIMARY KEY (`user_id`) NOT ENFORCED + ) WITH ( + 'connector' = 'upsert-kafka', + 'topic' = 'flink.mtls.auth.sales.records', + 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap.flink.svc:9094', + 'properties.security.protocol' = 'SSL', + 'properties.ssl.truststore.location' = '/opt/my-cluster-cluster-ca-cert/ca.crt', + 'properties.ssl.truststore.type' = 'PEM', + 'properties.ssl.keystore.location' = '/opt/my-user/user.p12', + 'properties.ssl.keystore.password' = '{{secret:flink/my-user/user.password}}', + 'properties.client.id' = 'sql-secure-client', + 'properties.transaction.timeout.ms' = '800000', + 'key.format' = 'csv', + 'value.format' = 'csv', + 'value.fields-include' = 'ALL' + ); + + INSERT INTO MTlsAuthSalesRecordTable + SELECT + invoice_id, + user_id, + product_id, + quantity + FROM SalesRecordTable + LIMIT 10; + jobManager: + resource: + memory: "2048m" + cpu: 1 + taskManager: + resource: + memory: "2048m" + cpu: 1 + job: + jarURI: local:///opt/streamshub/flink-sql-runner.jar + parallelism: 1 + upgradeMode: stateless diff --git a/tutorials/secure-flink/mtls-rest/standalone-etl-secure-rest-client.yaml b/tutorials/secure-flink/mtls-rest/standalone-etl-secure-rest-client.yaml new file mode 100644 index 0000000..a336d1a --- /dev/null +++ b/tutorials/secure-flink/mtls-rest/standalone-etl-secure-rest-client.yaml @@ -0,0 +1,33 @@ +apiVersion: v1 +kind: Pod +metadata: + name: standalone-etl-secure-rest-client +spec: + containers: + - name: curl + image: quay.io/curl/curl:8.15.0 + # Wait forever so a user can exec into the container and run their own commands + command: [ "/bin/sh", "-c", "--" ] + args: [ "while true; do sleep 5; done;" ] + volumeMounts: + - name: flink-rest-server-cert + mountPath: "/opt/flink-rest-server-cert" + readOnly: true + - name: flink-rest-client-cert + mountPath: "/opt/flink-rest-client-cert" + readOnly: true + volumes: + - name: flink-rest-server-cert + secret: + secretName: flink-rest-server-cert + items: + - key: ca.crt + path: ca.crt + - name: flink-rest-client-cert + secret: + secretName: flink-rest-client-cert + items: + - key: tls.crt + path: tls.crt + - key: tls.key + path: tls.key From 7fc5766c1a28ab1488b05f4ec2137ce839bf2991 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20P=C5=82aczek?= Date: Tue, 9 Sep 2025 17:57:28 +0100 Subject: [PATCH 5/5] Add Secure Flink docs --- docs/secure-flink/_index.md | 351 ++++++++++++++++++++++++++++++++++++ 1 file changed, 351 insertions(+) create mode 100644 docs/secure-flink/_index.md diff --git a/docs/secure-flink/_index.md b/docs/secure-flink/_index.md new file mode 100644 index 0000000..fa11951 --- /dev/null +++ b/docs/secure-flink/_index.md @@ -0,0 +1,351 @@ ++++ +title = 'Securing Flink components and its REST API' ++++ + +> Note: This tutorial is mainly focused on securing Flink components and its REST API. +> +> For detailed information on working with [Flink ETL Jobs](https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/learn-flink/etl/) and [Session Clusters](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#session-cluster-deployments), look at the [Interactive ETL example](../interactive-etl/_index.md). +> +> For detailed information on connecting to Apache Kafka securely using Flink SQL, look at the [Secure Kafka example](../secure-kafka/_index.md). + +[Flink SQL](https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/overview/) is a powerful tool for data exploration, manipulation and inter-connection. +It allows you to access the power of Flink's distributed stream processing abilities with a familiar interface. +In this tutorial, we go over how to secure the communications between internal Flink components, and also how to secure communications between the Flink [JobManager REST API](https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/ops/rest_api/) and clients. + +We will do this by enabling TLS authentication on both the internal Flink components and the Flink REST API. +For internal Flink components, only mutual authentication (mTLS) is available. However, for the REST API, both TLS and mTLS are available (TLS is recommended, more details below). + +The tutorial is based on the StreamsHub [Flink SQL Examples](https://github.com/streamshub/flink-sql-examples) repository and the code can be found under the [`tutorials/secure-flink`](https://github.com/streamshub/flink-sql-examples/tree/main/tutorials/secure-flink) directory. + +> Note: +> - All the commands below are meant to be run from the `tutorials` directory. +> - Only relevant lines are included in the code blocks. +> - The `tutorials/secure-flink` directory contains the complete files. +> - For greater detail on what is covered in this tutorial, you can read the following: +> - [“SSL Setup” section in the Apache Flink documentation](https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/deployment/security/security-ssl/) +> - ["Flink TLS Example" in the Apache Flink Kubernetes Operator repository](https://github.com/apache/flink-kubernetes-operator/tree/release-1.12/examples/flink-tls-example) + +## Internal mTLS, REST TLS + +This is the default when TLS is enabled on both the internal Flink components and the REST API. + +Set up the demo application: + +```shell +# Note: This sets up a Kafka cluster with an mTLS listener, Flink, +# recommendation-app (generates example data), etc. +# Note: This enables mTLS between Flink SQL and Kafka. This isn't +# related to internal Flink components and/or the REST API. +SECURE_KAFKA=mTLS ./scripts/data-gen-setup.sh + +# Note: This creates a self-signed CA, TLS certificate, and Secret with the truststore/keystore password +# (we use this for both the internal Flink components and the REST API) +kubectl -n flink apply -f secure-kafka/selfsigned-ca.yaml +kubectl -n flink apply -f secure-flink/flink-internal-cert.yaml + +# Get the password for the internal truststore and keystore +export INTERNAL_PASSWORD=` + kubectl -n flink get secret flink-internal-cert-password \ + -o jsonpath='{.data.password}' | base64 --decode + ` + +# Note: This creates a standalone Flink job that connects to the mTLS Kafka listener +# and copies 10 records from an existing topic to a newly created one +cat secure-flink/tls-rest/standalone-etl-secure-deployment.yaml | envsubst | kubectl -n flink apply -f - +``` + +`standalone-etl-secure-deployment.yaml` contains the following configuration: + +```yaml +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-secure +spec: + flinkConfiguration: + # Internal mTLS (mutual auth between Flink components) + security.ssl.internal.enabled: 'true' + + security.ssl.internal.keystore: /opt/flink/flink-internal-cert/keystore.p12 + security.ssl.internal.keystore-password: ${INTERNAL_PASSWORD} + security.ssl.internal.keystore-type: PKCS12 + + security.ssl.internal.key-password: ${INTERNAL_PASSWORD} + + security.ssl.internal.truststore: /opt/flink/flink-internal-cert/truststore.p12 + security.ssl.internal.truststore-password: ${INTERNAL_PASSWORD} + security.ssl.internal.truststore-type: PKCS12 + + # REST API TLS (non-mutual auth) + security.ssl.rest.enabled: 'true' + + security.ssl.rest.keystore: /opt/flink/flink-internal-cert/keystore.p12 + security.ssl.rest.keystore-password: ${INTERNAL_PASSWORD} + security.ssl.rest.keystore-type: PKCS12 + + security.ssl.rest.key-password: ${INTERNAL_PASSWORD} + + security.ssl.rest.truststore: /opt/flink/flink-internal-cert/truststore.p12 + security.ssl.rest.truststore-password: ${INTERNAL_PASSWORD} + security.ssl.rest.truststore-type: PKCS12 + + # Secrets + kubernetes.secrets: 'flink-internal-cert:/opt/flink/flink-internal-cert' +``` + +You can verify that the demo is working by doing the following: + +- Port forward the Flink Job Manager pod so we can access it: + + ```shell + kubectl -n flink port-forward 8081:8081 + ``` + + The job manager pod will have the name format `standalone-etl-secure-`, your `kubectl` should tab-complete the name. + If it doesn't then you can find the job manager name by running `kubectl -n flink get pods`. + +- Verify the TLS certificate is being sent: + + ```shell + # Display details about the certificate + openssl s_client -showcerts -connect localhost:8081 Note: We pass the `-k` flag to connect insecurely for now. In the other sections, we go over how to verify the CA when connecting. + +### REST mTLS proxy + +[Using a proxy with mTLS is recommended instead of enabling mTLS on the REST API](https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/deployment/security/security-ssl/#external--rest-connectivity), since proxies typically provide more authentication options and integrate better with existing infrastructure. + +We will follow this approach by exposing Flink's REST API `Service` (with TLS enabled) using an `Ingress` (with mTLS enabled). +[There are many ingress controllers available](https://kubernetes.io/docs/concepts/services-networking/ingress-controllers/); we will use [Ingress NGINX Controller](https://github.com/kubernetes/ingress-nginx) because of its popularity. + +[Install the ingress controller](https://kubernetes.github.io/ingress-nginx/deploy/): + +```shell +# If using a real cluster: +kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/controller-v1.13.2/deploy/static/provider/cloud/deploy.yaml + +# If using minikube: +minikube addons enable ingress +``` + +Create TLS certificates for the proxy server and client: + +```shell +kubectl -n flink apply -f secure-flink/tls-rest/mtls-proxy/flink-rest-proxy-server-cert.yaml +kubectl -n flink apply -f secure-flink/tls-rest/mtls-proxy/flink-rest-proxy-client-cert.yaml +``` + +Create the `Ingress` / proxy: + +```shell +kubectl -n flink apply -f secure-flink/tls-rest/mtls-proxy/standalone-etl-secure-rest-mtls-ingress.yaml +``` + +The `Ingress` is configured as follows: + +> Note: For greater detail on mTLS and Ingress NGINX Controller, read the following: +> - ["Client Certificate Authentication" section in the Ingress NGINX Controller documentation](https://kubernetes.github.io/ingress-nginx/user-guide/nginx-configuration/annotations/#client-certificate-authentication) +> - ["TLS/HTTPS" section in the Ingress NGINX Controller documentation](https://kubernetes.github.io/ingress-nginx/user-guide/tls/) + +```shell +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: standalone-etl-secure-rest-mtls-ingress + annotations: + # Use HTTPS between Ingress and REST API (since TLS is enabled for the REST API) + nginx.ingress.kubernetes.io/backend-protocol: "HTTPS" + # Enable client authentication + nginx.ingress.kubernetes.io/auth-tls-verify-client: "on" + # Specify Secret containing our client certificate + nginx.ingress.kubernetes.io/auth-tls-secret: flink/flink-rest-proxy-client-cert +spec: + # Enable HTTPS/TLS for our Ingress + tls: + - hosts: + - standalone-etl-secure-rest-mtls-ingress.example + # Specify Secret containing our proxy's TLS certificate + secretName: flink-rest-proxy-server-cert + rules: + # Point Ingress to Flink's REST API Service + - host: standalone-etl-secure-rest-mtls-ingress.example + http: + paths: + - path: / + pathType: Prefix + backend: + service: + name: standalone-etl-secure-rest + port: + number: 8081 +``` + +We can connect to the proxy by doing the following: + +```shell +# Generate tmp directories for the certs +mkdir /tmp/flink-rest-proxy-server-cert +mkdir /tmp/flink-rest-proxy-client-cert + +# Get the proxy's CA cert so we can authenticate the proxy +kubectl -n flink get secret flink-rest-proxy-server-cert \ + -o jsonpath='{.data.ca\.crt}' | base64 --decode \ + > /tmp/flink-rest-proxy-server-cert/ca.crt + +# Get the client's (our) cert so the proxy can authenticate us +kubectl -n flink get secret flink-rest-proxy-client-cert \ + -o jsonpath='{.data.tls\.crt}' | base64 --decode \ + > /tmp/flink-rest-proxy-client-cert/tls.crt + +# Get the client's (our) cert key +kubectl -n flink get secret flink-rest-proxy-client-cert \ + -o jsonpath='{.data.tls\.key}' | base64 --decode \ + > /tmp/flink-rest-proxy-client-cert/tls.key + +# (If using minikube) tunnel so we can reach the Ingress +minikube tunnel + +# Define cURL params for connecting to proxy securely +CURL_PROXY_PARAMS=( + # We don't care about the progress meter + --silent + # Pass our cert so the proxy can authenticate us + --cert /tmp/flink-rest-proxy-client-cert/tls.crt + --key /tmp/flink-rest-proxy-client-cert/tls.key + # Authenticate the proxy using the proxy's cert + --cacert /tmp/flink-rest-proxy-server-cert/ca.crt + # If not using minikube, remove this parameter (it tells cURL to resolve our domain to minikube's IP). + --resolve "standalone-etl-secure-rest-mtls-ingress.example:443:$( minikube ip )" +) + +# Get job ID +RUNNING_JOB_ID=$(curl "${CURL_PROXY_PARAMS[@]}" https://standalone-etl-secure-rest-mtls-ingress.example/jobs/ | \ + jq -r '.jobs[] | select(.status == "RUNNING") | .id') + +# Verify 10 records were written +curl "${CURL_PROXY_PARAMS[@]}" https://standalone-etl-secure-rest-mtls-ingress.example/jobs/$RUNNING_JOB_ID | \ + jq '.vertices[] | select(.name | contains("Writer")) | + { + "write-records": .metrics."write-records", + "write-records-complete": .metrics."write-records-complete" + }' +``` + +## Internal mTLS, REST mTLS + +> Note: As mentioned/performed above, using a proxy for mTLS is recommended instead of the following. + +Set up the demo application: + +```shell +SECURE_KAFKA=mTLS ./scripts/data-gen-setup.sh + +kubectl -n flink apply -f secure-kafka/selfsigned-ca.yaml +kubectl -n flink apply -f secure-flink/flink-internal-cert.yaml + +# Create TLS certificates for the REST API server and client +kubectl -n flink apply -f secure-flink/mtls-rest/flink-rest-server-cert.yaml +kubectl -n flink apply -f secure-flink/mtls-rest/flink-rest-client-cert.yaml + +# Get the password for the internal truststore and keystore +export INTERNAL_PASSWORD=` + kubectl -n flink get secret flink-internal-cert-password \ + -o jsonpath='{.data.password}' | base64 --decode + ` + +# Get the password for the REST API keystore +export REST_SERVER_KEYSTORE_PASSWORD=` + kubectl -n flink get secret flink-rest-server-cert-password \ + -o jsonpath='{.data.password}' | base64 --decode + ` + +# Get the password for the REST API truststore +export REST_CLIENT_TRUSTSTORE_PASSWORD=` + kubectl -n flink get secret flink-rest-client-cert-password \ + -o jsonpath='{.data.password}' | base64 --decode + ` + +cat secure-flink/mtls-rest/standalone-etl-secure-deployment.yaml | envsubst | kubectl -n flink apply -f - +``` + +`standalone-etl-secure-deployment.yaml` contains the following configuration: + +```yaml +apiVersion: flink.apache.org/v1beta1 +kind: FlinkDeployment +metadata: + name: standalone-etl-secure +spec: + flinkConfiguration: + # Internal mTLS + # ...same as TLS example above + + # REST API mTLS (mutual auth) + security.ssl.rest.enabled: 'true' + + security.ssl.rest.authentication-enabled: 'true' # Enable mutual authentication + + security.ssl.rest.keystore: /opt/flink/flink-rest-server-cert/keystore.p12 + security.ssl.rest.keystore-password: ${REST_SERVER_KEYSTORE_PASSWORD} + security.ssl.rest.keystore-type: PKCS12 + + security.ssl.rest.key-password: ${REST_SERVER_KEYSTORE_PASSWORD} + + security.ssl.rest.truststore: /opt/flink/flink-rest-client-cert/truststore.p12 + security.ssl.rest.truststore-password: ${REST_CLIENT_TRUSTSTORE_PASSWORD} + security.ssl.rest.truststore-type: PKCS12 + + # Secrets + kubernetes.secrets: 'flink-internal-cert:/opt/flink/flink-internal-cert,flink-rest-server-cert:/opt/flink/flink-rest-server-cert,flink-rest-client-cert:/opt/flink/flink-rest-client-cert' +``` + +We can connect to the REST API by doing the following: + +```shell +# Create a pod containing cURL and the relevant certificates mounted +kubectl -n flink apply -f secure-flink/mtls-rest/standalone-etl-secure-rest-client.yaml + +# Get a shell to the pod's container +kubectl -n flink exec -it standalone-etl-secure-rest-client -- /bin/sh + +# Define cURL params for connecting to REST API securely +CURL_MTLS_REST_PARAMS=( + # We don't care about the progress meter + --silent + # Pass our cert so the server can authenticate us + --cert /opt/flink-rest-client-cert/tls.crt + --key /opt/flink-rest-client-cert/tls.key + # Authenticate the server using the server's cert + --cacert /opt/flink-rest-server-cert/ca.crt +) + +# Get job ID +RUNNING_JOB_ID=$(curl "${CURL_MTLS_REST_PARAMS[@]}" https://standalone-etl-secure-rest.flink.svc:8081/jobs/ | \ + jq -r '.jobs[] | select(.status == "RUNNING") | .id') + +# Verify 10 records were written +curl "${CURL_MTLS_REST_PARAMS[@]}" https://standalone-etl-secure-rest.flink.svc:8081/jobs/$RUNNING_JOB_ID | \ + jq '.vertices[] | select(.name | contains("Writer")) | + { + "write-records": .metrics."write-records", + "write-records-complete": .metrics."write-records-complete" + }' +``` \ No newline at end of file