diff --git a/demos/airflow-scheduled-job/01-airflow-demo-clusterrole.yaml b/demos/airflow-scheduled-job/01-airflow-demo-clusterrole.yaml new file mode 100644 index 00000000..bf4c978e --- /dev/null +++ b/demos/airflow-scheduled-job/01-airflow-demo-clusterrole.yaml @@ -0,0 +1,42 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: airflow-demo-clusterrole +rules: +- apiGroups: + - spark.stackable.tech + resources: + - sparkapplications + verbs: + - create + - get + - list +- apiGroups: + - apps + resources: + - statefulsets + verbs: + - get + - watch + - list +- apiGroups: + - "" + resources: + - persistentvolumeclaims + verbs: + - list +- apiGroups: + - "" + resources: + - pods + verbs: + - get + - watch + - list +- apiGroups: + - "" + resources: + - pods/exec + verbs: + - create diff --git a/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml b/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml deleted file mode 100644 index d63b6ec9..00000000 --- a/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml +++ /dev/null @@ -1,36 +0,0 @@ ---- -apiVersion: rbac.authorization.k8s.io/v1 -kind: ClusterRole -metadata: - name: airflow-spark-clusterrole -rules: -- apiGroups: - - spark.stackable.tech - resources: - - sparkapplications - verbs: - - create - - get - - list -- apiGroups: - - airflow.stackable.tech - resources: - - airflowdbs - verbs: - - create - - get - - list -- apiGroups: - - apps - resources: - - statefulsets - verbs: - - get - - watch - - list -- apiGroups: - - "" - resources: - - persistentvolumeclaims - verbs: - - list diff --git a/demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml b/demos/airflow-scheduled-job/02-airflow-demo-clusterrolebinding.yaml similarity index 75% rename from demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml rename to demos/airflow-scheduled-job/02-airflow-demo-clusterrolebinding.yaml index d3441a4c..1eccf63e 100644 --- 
a/demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml +++ b/demos/airflow-scheduled-job/02-airflow-demo-clusterrolebinding.yaml @@ -2,11 +2,11 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding metadata: - name: airflow-spark-clusterrole-binding + name: airflow-demo-clusterrole-binding roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole - name: airflow-spark-clusterrole + name: airflow-demo-clusterrole subjects: - apiGroup: rbac.authorization.k8s.io kind: Group diff --git a/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml b/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml index ed7edd5b..124f4c66 100644 --- a/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml +++ b/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml @@ -9,10 +9,12 @@ spec: containers: - name: start-pyspark-job image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev - # N.B. it is possible for the scheduler to report that a DAG exists, only for the worker task to fail if a pod is unexpectedly - # restarted. Additionally, the db-init job takes a few minutes to complete before the cluster is deployed. The wait/watch steps - # below are not "water-tight" but add a layer of stability by at least ensuring that the db is initialized and ready and that - # all pods are reachable (albeit independent of each other). + # N.B. it is possible for the scheduler to report that a DAG exists, + # only for the worker task to fail if a pod is unexpectedly + # restarted. The wait/watch steps below are not "water-tight" but add + # a layer of stability by at least ensuring that the cluster is + # initialized and ready and that all pods are reachable (albeit + # independent of each other). 
command: - bash - -euo diff --git a/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml b/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml index 08eaccf3..78ec737d 100644 --- a/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml +++ b/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml @@ -9,10 +9,12 @@ spec: containers: - name: start-date-job image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev - # N.B. it is possible for the scheduler to report that a DAG exists, only for the worker task to fail if a pod is unexpectedly - # restarted. Additionally, the db-init job takes a few minutes to complete before the cluster is deployed. The wait/watch steps - # below are not "water-tight" but add a layer of stability by at least ensuring that the db is initialized and ready and that - # all pods are reachable (albeit independent of each other). + # N.B. it is possible for the scheduler to report that a DAG exists, + # only for the worker task to fail if a pod is unexpectedly + # restarted. The wait/watch steps below are not "water-tight" but add + # a layer of stability by at least ensuring that the cluster is + # initialized and ready and that all pods are reachable (albeit + # independent of each other). command: - bash - -euo diff --git a/demos/airflow-scheduled-job/05-enable-and-run-kafka-dag.yaml b/demos/airflow-scheduled-job/05-enable-and-run-kafka-dag.yaml new file mode 100644 index 00000000..2cbcb2b4 --- /dev/null +++ b/demos/airflow-scheduled-job/05-enable-and-run-kafka-dag.yaml @@ -0,0 +1,68 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: start-kafka-job +spec: + template: + spec: + containers: + - name: start-kafka-job + image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev + env: + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + # N.B. it is possible for the scheduler to report that a DAG exists, + # only for the worker task to fail if a pod is unexpectedly + # restarted. 
The wait/watch steps below are not "water-tight" but add + # a layer of stability by at least ensuring that the cluster is + # initialized and ready and that all pods are reachable (albeit + # independent of each other). + command: + - bash + - -euo + - pipefail + - -c + - | + # Kafka: wait for cluster + kubectl rollout status --watch statefulset/kafka-broker-default + kubectl rollout status --watch statefulset/kafka-controller-default + + # Kafka: create consumer offsets topics (required for group coordinator) + kubectl exec kafka-broker-default-0 -c kafka -- \ + /stackable/kafka/bin/kafka-topics.sh \ + --bootstrap-server kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093 \ + --create \ + --if-not-exists \ + --topic __consumer_offsets \ + --partitions 50 \ + --replication-factor 1 \ + --config cleanup.policy=compact \ + --command-config /stackable/config/client.properties + + # Airflow: wait for cluster + kubectl rollout status --watch statefulset/airflow-webserver-default + kubectl rollout status --watch statefulset/airflow-scheduler-default + + # Airflow: activate DAG + AIRFLOW_ADMIN_PASSWORD=$(cat /airflow-credentials/adminUser.password) + ACCESS_TOKEN=$(curl -XPOST http://airflow-webserver-default-headless:8080/auth/token -H 'Content-Type: application/json' -d '{"username": "admin", "password": "'$AIRFLOW_ADMIN_PASSWORD'"}' | jq -r .access_token) + curl -H "Authorization: Bearer $ACCESS_TOKEN" -H 'Content-Type: application/json' -XPATCH http://airflow-webserver-default-headless:8080/api/v2/dags/kafka_watcher -d '{"is_paused": false}' | jq + + # Kafka: produce a message to create the topic + kubectl exec kafka-broker-default-0 -c kafka -- bash -c \ + 'echo "Hello World at: $(date)" | /stackable/kafka/bin/kafka-console-producer.sh \ + --bootstrap-server kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093 \ + --producer.config /stackable/config/client.properties \ + --topic test-topic' + volumeMounts: + - name: 
airflow-credentials + mountPath: /airflow-credentials + volumes: + - name: airflow-credentials + secret: + secretName: airflow-credentials + restartPolicy: OnFailure + backoffLimit: 20 # give some time for the Airflow cluster to be available diff --git a/demos/airflow-scheduled-job/06-create-opa-users.yaml b/demos/airflow-scheduled-job/06-create-opa-users.yaml new file mode 100644 index 00000000..f9834cc9 --- /dev/null +++ b/demos/airflow-scheduled-job/06-create-opa-users.yaml @@ -0,0 +1,52 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: start-users-job +spec: + template: + spec: + containers: + - name: start-users-job + image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev + # N.B. it is possible for the scheduler to report that a DAG exists, + # only for the worker task to fail if a pod is unexpectedly + # restarted. The wait/watch steps below are not "water-tight" but add + # a layer of stability by at least ensuring that the cluster is + # initialized and ready and that all pods are reachable (albeit + # independent of each other). 
+ command: + - bash + - -euo + - pipefail + - -c + - | + # Airflow: wait for cluster + kubectl rollout status --watch statefulset/airflow-webserver-default + kubectl rollout status --watch statefulset/airflow-scheduler-default + + # Airflow: create users + kubectl exec airflow-webserver-default-0 -- airflow users create \ + --username "jane.doe" \ + --firstname "Jane" \ + --lastname "Doe" \ + --email "jane.doe@stackable.tech" \ + --password "jane.doe" \ + --role "User" + + kubectl exec airflow-webserver-default-0 -- airflow users create \ + --username "richard.roe" \ + --firstname "Richard" \ + --lastname "Roe" \ + --email "richard.roe@stackable.tech" \ + --password "richard.roe" \ + --role "User" + volumeMounts: + - name: airflow-credentials + mountPath: /airflow-credentials + volumes: + - name: airflow-credentials + secret: + secretName: airflow-credentials + restartPolicy: OnFailure + backoffLimit: 20 # give some time for the Airflow cluster to be available diff --git a/demos/demos-v2.yaml b/demos/demos-v2.yaml index edc7a790..9b1c6901 100644 --- a/demos/demos-v2.yaml +++ b/demos/demos-v2.yaml @@ -46,10 +46,12 @@ demos: - airflow - job-scheduling manifests: - - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml - - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/01-airflow-demo-clusterrole.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/02-airflow-demo-clusterrolebinding.yaml - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml 
+ - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/05-enable-and-run-kafka-dag.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/demos/airflow-scheduled-job/06-create-opa-users.yaml supportedNamespaces: [] resourceRequests: cpu: 2401m diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_12.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_12.png new file mode 100644 index 00000000..bca6b5ef Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/airflow_12.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_13.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_13.png new file mode 100644 index 00000000..4a83c168 Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/airflow_13.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_14.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_14.png new file mode 100644 index 00000000..51f3527a Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/airflow_14.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_15.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_15.png new file mode 100644 index 00000000..969b5fa8 Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/airflow_15.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_16.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_16.png new file mode 100644 index 00000000..c142b674 Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/airflow_16.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_17.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_17.png new file mode 100644 index 00000000..4ae7a134 Binary files /dev/null and 
b/docs/modules/demos/images/airflow-scheduled-job/airflow_17.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_2.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_2.png index d9ed51ba..c86204d7 100644 Binary files a/docs/modules/demos/images/airflow-scheduled-job/airflow_2.png and b/docs/modules/demos/images/airflow-scheduled-job/airflow_2.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_3.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_3.png index d4060b4e..771d3da7 100644 Binary files a/docs/modules/demos/images/airflow-scheduled-job/airflow_3.png and b/docs/modules/demos/images/airflow-scheduled-job/airflow_3.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/airflow_4.png b/docs/modules/demos/images/airflow-scheduled-job/airflow_4.png deleted file mode 100644 index 3dff1b3b..00000000 Binary files a/docs/modules/demos/images/airflow-scheduled-job/airflow_4.png and /dev/null differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/deferrable_01_running.png b/docs/modules/demos/images/airflow-scheduled-job/deferrable_01_running.png new file mode 100644 index 00000000..2306fcfb Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/deferrable_01_running.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/deferrable_02_queued.png b/docs/modules/demos/images/airflow-scheduled-job/deferrable_02_queued.png new file mode 100644 index 00000000..a59e36c1 Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/deferrable_02_queued.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/deferrable_03_deferred.png b/docs/modules/demos/images/airflow-scheduled-job/deferrable_03_deferred.png new file mode 100644 index 00000000..c3bbdfdd Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/deferrable_03_deferred.png differ diff --git 
a/docs/modules/demos/images/airflow-scheduled-job/deferrable_04_queued.png b/docs/modules/demos/images/airflow-scheduled-job/deferrable_04_queued.png new file mode 100644 index 00000000..e86a2ba2 Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/deferrable_04_queued.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/deferrable_05_running.png b/docs/modules/demos/images/airflow-scheduled-job/deferrable_05_running.png new file mode 100644 index 00000000..5fa40026 Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/deferrable_05_running.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/deferrable_06_success.png b/docs/modules/demos/images/airflow-scheduled-job/deferrable_06_success.png new file mode 100644 index 00000000..6cf21279 Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/deferrable_06_success.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/opa_01.png b/docs/modules/demos/images/airflow-scheduled-job/opa_01.png new file mode 100644 index 00000000..dd24c144 Binary files /dev/null and b/docs/modules/demos/images/airflow-scheduled-job/opa_01.png differ diff --git a/docs/modules/demos/images/airflow-scheduled-job/overview.png b/docs/modules/demos/images/airflow-scheduled-job/overview.png index 4f2b9c70..0329bd4b 100644 Binary files a/docs/modules/demos/images/airflow-scheduled-job/overview.png and b/docs/modules/demos/images/airflow-scheduled-job/overview.png differ diff --git a/docs/modules/demos/pages/airflow-scheduled-job.adoc b/docs/modules/demos/pages/airflow-scheduled-job.adoc index 54c38be0..c8bde619 100644 --- a/docs/modules/demos/pages/airflow-scheduled-job.adoc +++ b/docs/modules/demos/pages/airflow-scheduled-job.adoc @@ -1,5 +1,5 @@ = airflow-scheduled-job -:description: This demo installs Airflow with Postgres and Redis on Kubernetes, showcasing DAG scheduling, job runs, and status verification via the 
Airflow UI. +:description: This demo installs Airflow with Postgres, Kafka and OPA on Kubernetes, showcasing DAG scheduling, job runs, and status verification via the Airflow UI. Install this demo on an existing Kubernetes cluster: @@ -8,6 +8,13 @@ Install this demo on an existing Kubernetes cluster: $ stackablectl demo install airflow-scheduled-job ---- +You can also deploy it to a specific namespace (the namespace `airflow-demo` will be assumed in this guide): + +[source,console] +---- +$ stackablectl demo install airflow-scheduled-job -n airflow-demo +---- + [WARNING] ==== This demo should not be run alongside other demos. @@ -29,11 +36,14 @@ This demo will * Install the required Stackable operators * Spin up the following data products ** *Postgresql*: An open-source database used for Airflow cluster and job metadata. -** *Redis*: An in-memory data structure store used for queuing Airflow jobs ** *Airflow*: An open-source workflow management platform for data engineering pipelines. -* Mount two Airflow jobs (referred to as Directed Acyclic Graphs, or DAGs) for the cluster to use +** *Kafka*: An open-source messaging broker that will be used to trigger an Airflow DAG. +** *Open Policy Agent*: An open-source policy engine used for user authorization. 
+* Mount several Airflow jobs (referred to as Directed Acyclic Graphs, or DAGs) for the cluster to use * Enable and schedule the jobs * Verify the job status with the Airflow Webserver UI +* Illustrate DAG event-based scheduling +* Illustrate user authorization You can see the deployed products and their relationship in the following diagram: @@ -45,39 +55,44 @@ To list the installed Stackable services run the following command: [source,console] ---- -$ stackablectl stacklet list -┌─────────┬─────────┬───────────┬─────────────────────────────────────────┬─────────────────────────────────┐ -│ PRODUCT ┆ NAME ┆ NAMESPACE ┆ ENDPOINTS ┆ CONDITIONS │ -╞═════════╪═════════╪═══════════╪═════════════════════════════════════════╪═════════════════════════════════╡ -│ airflow ┆ airflow ┆ default ┆ webserver-http http://172.19.0.5:30913 ┆ Available, Reconciling, Running │ -└─────────┴─────────┴───────────┴─────────────────────────────────────────┴─────────────────────────────────┘ +$ stackablectl stacklet list -n airflow-demo + +┌─────────┬─────────────┬──────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────────────────────────────┐ +│ PRODUCT ┆ NAME ┆ NAMESPACE ┆ ENDPOINTS ┆ CONDITIONS │ +╞═════════╪═════════════╪══════════════╪════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╪═════════════════════════════════╡ +│ airflow ┆ airflow ┆ airflow-demo ┆ webserver-http http://172.19.0.6:32111 ┆ Available, Reconciling, Running │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ kafka ┆ kafka ┆ airflow-demo ┆ broker-default-0-listener-broker-kafka-tls kafka-broker-default-0-listener-broker.airflow-demo.svc.cluster.local:9093 ┆ Available, Reconciling, Running │ +│ ┆ ┆ ┆ 
broker-default-0-listener-broker-metrics kafka-broker-default-0-listener-broker.airflow-demo.svc.cluster.local:9606 ┆ │ +│ ┆ ┆ ┆ broker-default-bootstrap-kafka-tls kafka-broker-default-bootstrap.airflow-demo.svc.cluster.local:9093 ┆ │ +├╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ opa ┆ opa-airflow ┆ airflow-demo ┆ ┆ Available, Reconciling, Running │ +└─────────┴─────────────┴──────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┴─────────────────────────────────┘ ---- include::partial$instance-hint.adoc[] == Airflow Webserver UI -Open the `airflow` endpoint `webserver-airflow` in your browser (`http://172.19.0.5:30913` in this case). +Open the `airflow` endpoint `webserver-airflow` in your browser (`http://172.19.0.6:32111` in this case). image::airflow-scheduled-job/airflow_1.png[] Log in with the username `admin` and password `adminadmin`. -Click in 'Active DAGs' at the top and you will see an overview showing the DAGs mounted during the demo +Click on 'DAGs' in the left margin and you will see an overview showing the DAGs mounted during the demo setup (`date_demo` and `sparkapp_dag`). image::airflow-scheduled-job/airflow_2.png[] There are two things to notice here. -Both DAGs have been enabled, as shown by the slider on the far right of the screen for each DAG -(DAGs are all `paused` initially and can be activated manually in the UI or via a REST call, as done in the setup for this demo): +All DAGs but one (the one demonstrating a deferrable operator) have been enabled, as shown by the slider on the far right of the screen for each DAG +(DAGs are all `paused` initially and can be activated manually in the UI or via a REST call, as done in the setup for this demo). 
+Secondly, the `date_demo` job has been busy, with several runs already logged: image::airflow-scheduled-job/airflow_3.png[] -Secondly, the `date_demo` job has been busy, with several runs already logged. The `sparkapp_dag` has only been run once because they have been defined with different schedules. - -image::airflow-scheduled-job/airflow_4.png[] - +The `kafka_watcher` job has been activated and is awaiting a trigger action (in this case, a message arriving in a designated Kafka topic). Clicking on the DAG name and then on `Runs` will display the individual job runs: image::airflow-scheduled-job/airflow_5.png[] @@ -129,8 +144,77 @@ asynchronously - and another to poll the running job to report on its status. image::airflow-scheduled-job/airflow_11.png[] +=== `kafka_watcher` DAG + +This DAG is using the event-scheduling feature of Airflow. +Click on the DAG and then select the `Code` tab in the window on the right half of the screen. +When a message arrives in the topic named test-topic, the DAG will be triggered. +Note that the connection to Kafka (`kafka_conn`) is expected: this has been created as part of the airflow cluster: + +image::airflow-scheduled-job/airflow_12.png[] + +We can use the kafka-producer script bundled with Kafka to write to this topic (note the namespace we chose initially is used consistently in this demo): + +[source,bash] +---- +kubectl exec -n airflow-demo kafka-broker-default-0 -c kafka -- bash -c \ +'echo "Hello World at: $(date)" | /stackable/kafka/bin/kafka-console-producer.sh \ + --bootstrap-server $BOOTSTRAP_SERVER \ + --topic test-topic \ + --producer.config /stackable/config/client.properties' +---- + +The triggerer logs will show that this DAG was fired (logging out the message that we wrote to the topic above): + +image::airflow-scheduled-job/airflow_13.png[] + +The DAG view will show that run was successful. +Note the `kafka_queue_asset` under `Schedule`. 
+This is an Airflow object (also defined in the DAG code) that wraps the actual trigger/wait mechanism in a generic way for use in DAG code. + +image::airflow-scheduled-job/airflow_14.png[] + +Clicking on the asset will show the triggers that have been fired, called `Asset Events`: + +image::airflow-scheduled-job/airflow_15.png[] + +=== `core_deferrable_sleep_demo` DAG + +Now log out of the UI. +This next section will illustrate user authorization with the Open Policy Agent (OPA) and the OPA Authorizer that is included in Stackable's Airflow product images. +Two other users - other than `admin` - are supplied with this demo: `jane.doe` and `richard.roe` (the passwords are the same as the user names). +`jane.doe` has permission to view all DAGs but only has permission to activate and run `core_deferrable_sleep_demo`. +`richard.roe` does not have permission to view or action any DAGs. +Log in as `jane.doe`. +Select the DAGs view, making sure to set the filter from `Enabled` to `All`. +You will see the list of 4 DAGs, and can enable `core_deferrable_sleep_demo` (the only DAG not automatically enabled when the demo was deployed). +But if you try and run any other DAG, permission will be denied: + +image::airflow-scheduled-job/airflow_16.png[] + +Enable and run `core_deferrable_sleep_demo`: + +image::airflow-scheduled-job/airflow_17.png[] + +Click on the DAG, switching to the task view. +This DAG uses a deferrable operator which, in conjunction with the triggerer process, "offloads" the DAG from its worker for a specific period of time, before being picked up again and executed 
+You will see the task cycle through the following states: + +image::airflow-scheduled-job/deferrable_01_running.png[] +image::airflow-scheduled-job/deferrable_02_queued.png[] +image::airflow-scheduled-job/deferrable_03_deferred.png[] +image::airflow-scheduled-job/deferrable_04_queued.png[] +image::airflow-scheduled-job/deferrable_05_running.png[] +image::airflow-scheduled-job/deferrable_06_success.png[] + +Now log out and log in again as `richard.roe`. On the home screen no DAGs are visible, as expected by the authorization rules defined for this user: + +image::airflow-scheduled-job/opa_01.png[] + + == Patching Airflow to stress-test DAG parsing using relevant environment variables +Log back into the UI as `admin`. The demo also created a third DAG in the ConfigMap, called `dag_factory.py`, which was not mounted to the cluster and therefore does not appear in the UI. This DAG can be used to create a number of individual DAGs on-the-fly, thus allowing a certain degree of stress-testing of the DAG scan/register steps (the generated DAGs themselves are trivial and so this approach will not really increase the burden of DAG _parsing_). To include this in the list of DAGs (without removing the existing ones), an extra volumeMount is needed, as shown below. 
@@ -149,20 +233,14 @@ spec: - name: airflow-dags mountPath: /dags/dag_factory.py subPath: dag_factory.py - - name: airflow-dags - mountPath: /dags/date_demo.py - subPath: date_demo.py - - name: airflow-dags - mountPath: /dags/pyspark_pi.py - subPath: pyspark_pi.py - - name: airflow-dags - mountPath: /dags/pyspark_pi.yaml - subPath: pyspark_pi.yaml + - name: kafka-tls-pem + mountPath: /stackable/kafka-tls-pem webservers: roleGroups: default: envOverrides: &envOverrides AIRFLOW__CORE__DAGS_FOLDER: "/dags" + PYTHONPATH: "/stackable/app/log_config:/dags" AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60" AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60" AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60" @@ -180,7 +258,7 @@ THe patch can be applied like this: [source,console] ---- -kubectl patch airflowcluster airflow --type="merge" --patch-file stacks/airflow/patch_airflow.yaml +kubectl patch airflowcluster airflow --type="merge" --patch-file stacks/airflow/patch_airflow.yaml -n airflow-demo ---- [NOTE] diff --git a/stacks/airflow/airflow.yaml b/stacks/airflow/airflow.yaml index 334eb07f..7e4ca791 100644 --- a/stacks/airflow/airflow.yaml +++ b/stacks/airflow/airflow.yaml @@ -9,13 +9,22 @@ spec: productVersion: 3.0.6 pullPolicy: IfNotPresent clusterConfig: + authorization: + opa: + configMapName: opa-airflow + package: airflow + cache: + entryTimeToLive: 5s + maxEntries: 10 loadExamples: false - exposeConfig: false credentialsSecret: airflow-credentials volumes: - name: airflow-dags configMap: name: airflow-dags + - name: kafka-tls-pem + configMap: + name: truststore-pem volumeMounts: - name: airflow-dags mountPath: /dags/date_demo.py @@ -26,37 +35,71 @@ spec: - name: airflow-dags mountPath: /dags/pyspark_pi.yaml subPath: pyspark_pi.yaml + - name: airflow-dags + mountPath: /dags/kafka.py + subPath: kafka.py + - name: airflow-dags + mountPath: /dags/triggerer.py + subPath: triggerer.py + - name: kafka-tls-pem + mountPath: /stackable/kafka-tls-pem 
webservers: roleConfig: listenerClass: external-stable config: resources: cpu: - min: 400m - max: "1" + min: "2" + max: "3" memory: - limit: 2Gi - gracefulShutdownTimeout: 30s + limit: 3Gi + envOverrides: &envOverrides + AIRFLOW__CORE__DAGS_FOLDER: "/dags" + PYTHONPATH: "/stackable/app/log_config:/dags" + AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" + # Airflow 3: Disable decision caching for easy debugging + AIRFLOW__CORE__AUTH_OPA_CACHE_MAXSIZE: "0" + configOverrides: + webserver_config.py: + # Allow "POST /login/" without CSRF token + WTF_CSRF_ENABLED: "False" + podOverrides: &podOverrides + spec: + containers: + - name: airflow + image: oci.stackable.tech/sdp/airflow:3.0.6-stackable0.0.0-dev + imagePullPolicy: IfNotPresent + env: + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: AIRFLOW_CONN_KAFKA_CONN + value: "{\"conn_type\": \"kafka\", \"extra\": {\"bootstrap.servers\": \"kafka-broker-default-0-listener-broker.$(NAMESPACE).svc.cluster.local:9093\", \"security.protocol\": \"SSL\", \"ssl.ca.location\": \"/stackable/kafka-tls-pem/ca.crt\", \"group.id\": \"airflow_group\", \"auto.offset.reset\": \"latest\"}}" roleGroups: default: - envOverrides: &envOverrides - AIRFLOW__CORE__DAGS_FOLDER: "/dags" - AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" replicas: 1 kubernetesExecutors: + # do not apply the podOverrides here as we don't need and it will interfere + # with the pod template envOverrides: *envOverrides schedulers: - config: - gracefulShutdownTimeout: 30s - resources: - cpu: - min: 400m - max: 
"1" - memory: - limit: 1Gi + envOverrides: *envOverrides + podOverrides: *podOverrides + roleGroups: + default: + replicas: 1 + dagProcessors: + envOverrides: *envOverrides + podOverrides: *podOverrides + roleGroups: + default: + replicas: 1 + triggerers: + envOverrides: *envOverrides + podOverrides: *podOverrides roleGroups: default: - envOverrides: *envOverrides replicas: 1 --- apiVersion: v1 @@ -64,6 +107,97 @@ kind: ConfigMap metadata: name: airflow-dags data: + kafka.py: | + from airflow.providers.apache.kafka.triggers.msg_queue import KafkaMessageQueueTrigger + from airflow.providers.standard.operators.empty import EmptyOperator + from airflow.sdk import DAG, Asset, AssetWatcher + + import logging + logger = logging.getLogger(__name__) + + logger.info("✅ kafka.apply_function module imported") + + def apply_function(message): + try: + logger.info("apply_function called") + logger.info("message payload: %r", message.value()) + return True + except Exception: + logger.exception("apply_function failed") + return False + + # Define a trigger that listens to an Apache Kafka message queue + trigger = KafkaMessageQueueTrigger( + topics=["test-topic"], + apply_function="kafka.apply_function", + kafka_config_id="kafka_conn", + apply_function_args=None, + apply_function_kwargs=None, + poll_timeout=1, + poll_interval=5, + ) + + # Define an asset that watches for messages on the queue + asset = Asset("kafka_queue_asset", watchers=[AssetWatcher(name="kafka_watcher", trigger=trigger)]) + + with DAG(dag_id="kafka_watcher", schedule=[asset]) as dag: + EmptyOperator(task_id="task") + + triggerer.py: | + from datetime import datetime, timedelta + + from airflow import DAG + from airflow.models.baseoperator import BaseOperator + from airflow.triggers.temporal import TimeDeltaTrigger + from airflow.utils.context import Context + from airflow.operators.empty import EmptyOperator + + # ------------------------------------------------------ + # Custom deferrable operator - does a 
simple async sleep + # ------------------------------------------------------ + class CoreDeferrableSleepOperator(BaseOperator): + """ + Sleeps for ``duration`` seconds without occupying a worker. + The async hand-off happens via ``self.defer`` + ``TimeDeltaTrigger``. + """ + ui_color = "#ffefeb" + + def __init__(self, *, duration: int, **kwargs): + super().__init__(**kwargs) + self.duration = duration + + def execute(self, context: Context): + """Run on a worker, then hand control to the Triggerer.""" + # Build the trigger that will fire after `duration` seconds. + trigger = TimeDeltaTrigger(timedelta(seconds=self.duration)) + + # *** Asynchronous hand-off *** + # This tells the scheduler: “pause this task, let the Triggerer watch the timer”. + self.defer(trigger=trigger, method_name="execute_complete") + + def execute_complete(self, context: Context, event=None): + """Resumes here once the Triggerer fires.""" + self.log.info("Deferrable sleep of %s seconds finished.", self.duration) + return "DONE" + + default_args = {"owner": "stackable", "retries": 0} + + with DAG( + dag_id="core_deferrable_sleep_demo", + schedule=None, + # N.B. this be earlier than the current timestamp! 
+ start_date=datetime(2025, 8, 1), + catchup=False, + default_args=default_args, + tags=["example", "triggerer"], + ) as dag: + + sleep = CoreDeferrableSleepOperator( + task_id="deferrable_sleep", + duration=10, + ) + + sleep date_demo.py: | """Example DAG returning the current date""" from datetime import datetime, timedelta diff --git a/stacks/airflow/kafka.yaml b/stacks/airflow/kafka.yaml new file mode 100644 index 00000000..0816dccd --- /dev/null +++ b/stacks/airflow/kafka.yaml @@ -0,0 +1,41 @@ +--- +apiVersion: secrets.stackable.tech/v1alpha1 +kind: TrustStore +metadata: + name: truststore-pem +spec: + secretClassName: tls + format: tls-pem + targetKind: ConfigMap +--- +apiVersion: kafka.stackable.tech/v1alpha1 +kind: KafkaCluster +metadata: + name: kafka +spec: + image: + productVersion: 4.1.0 + clusterConfig: + tls: + serverSecretClass: tls + controllers: + roleGroups: + default: + replicas: 1 + brokers: + config: + bootstrapListenerClass: cluster-internal + brokerListenerClass: cluster-internal + podOverrides: + spec: + containers: + - name: kafka + env: + - name: BOOTSTRAP_SERVER + valueFrom: + configMapKeyRef: + name: kafka + key: KAFKA + roleGroups: + default: + replicas: 1 diff --git a/stacks/airflow/opa-rules.yaml b/stacks/airflow/opa-rules.yaml new file mode 100644 index 00000000..e9c45d8a --- /dev/null +++ b/stacks/airflow/opa-rules.yaml @@ -0,0 +1,96 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: airflow-rules + labels: + opa.stackable.tech/bundle: "true" +data: + airflow.rego: | + package airflow + + default is_authorized_configuration := false + default is_authorized_connection := false + default is_authorized_dag := false + default is_authorized_backfill := false + default is_authorized_asset := false + default is_authorized_asset_alias := false + + # Allow the user "admin" to create test users + # POST /auth/fab/v1/users + is_authorized_custom_view if { + input.method == "POST" + input.resource_name == "Users" + + input.user.name == 
"admin" + } + is_authorized_configuration if { + input.user.name == "admin" + } + is_authorized_configuration if { + input.user.name == "admin" + } + is_authorized_connection if { + input.user.name == "admin" + } + is_authorized_dag if { + input.user.name == "admin" + } + is_authorized_dataset if { + input.user.name == "admin" + } + is_authorized_pool if { + input.user.name == "admin" + } + is_authorized_variable if { + input.user.name == "admin" + } + is_authorized_view if { + input.user.name == "admin" + } + is_authorized_custom_view if { + input.user.name == "admin" + } + is_authorized_backfill if { + input.user.name == "admin" + } + is_authorized_asset if { + input.user.name == "admin" + } + is_authorized_asset_alias if { + input.user.name == "admin" + } + + # GET /home + is_authorized_view if { + input.access_view == "WEBSITE" + + input.user.name == "jane.doe" + } + + is_authorized_dag if { + input.method == "GET" + input.user.name == "jane.doe" + } + + is_authorized_dag if { + input.method == "GET" + input.access_entity == "RUN" + input.details.id == "core_deferrable_sleep_demo" + + input.user.name == "jane.doe" + } + + is_authorized_dag if { + input.method == "PUT" + input.details.id == "core_deferrable_sleep_demo" + + input.user.name == "jane.doe" + } + + is_authorized_dag if { + input.method == "POST" + input.details.id == "core_deferrable_sleep_demo" + + input.user.name == "jane.doe" + } diff --git a/stacks/airflow/opa.yaml b/stacks/airflow/opa.yaml new file mode 100644 index 00000000..783cdab4 --- /dev/null +++ b/stacks/airflow/opa.yaml @@ -0,0 +1,19 @@ +--- +apiVersion: opa.stackable.tech/v1alpha1 +kind: OpaCluster +metadata: + name: opa-airflow +spec: + image: + productVersion: 1.8.0 + pullPolicy: IfNotPresent + servers: + config: + logging: + containers: + opa: + loggers: + decision: + level: INFO + roleGroups: + default: {} diff --git a/stacks/airflow/patch_airflow.yaml b/stacks/airflow/patch_airflow.yaml index 60889187..90542256 100644 --- 
a/stacks/airflow/patch_airflow.yaml +++ b/stacks/airflow/patch_airflow.yaml @@ -9,20 +9,14 @@ spec: - name: airflow-dags mountPath: /dags/dag_factory.py subPath: dag_factory.py - - name: airflow-dags - mountPath: /dags/date_demo.py - subPath: date_demo.py - - name: airflow-dags - mountPath: /dags/pyspark_pi.py - subPath: pyspark_pi.py - - name: airflow-dags - mountPath: /dags/pyspark_pi.yaml - subPath: pyspark_pi.yaml + - name: kafka-tls-pem + mountPath: /stackable/kafka-tls-pem webservers: roleGroups: default: envOverrides: &envOverrides AIRFLOW__CORE__DAGS_FOLDER: "/dags" + PYTHONPATH: "/stackable/app/log_config:/dags" AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL: "60" AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL: "60" AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL: "60" diff --git a/stacks/stacks-v2.yaml b/stacks/stacks-v2.yaml index 53454ebb..bb28f3a9 100644 --- a/stacks/stacks-v2.yaml +++ b/stacks/stacks-v2.yaml @@ -139,12 +139,17 @@ stacks: - listener - secret - airflow - - spark-k8s # Some demo does schedule a Spark job + - spark-k8s + - kafka + - opa labels: - airflow manifests: - helmChart: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/_templates/postgresql-airflow.yaml - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/airflow.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/kafka.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/opa.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/demos/main/stacks/airflow/opa-rules.yaml supportedNamespaces: [] resourceRequests: cpu: 3400m