36 changes: 36 additions & 0 deletions demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml
@@ -0,0 +1,36 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: airflow-spark-clusterrole
rules:
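# The rules below let the demo create and track SparkApplications (used by the
# sparkapp_dag), wait on the AirflowDB initialization, and watch the Airflow
# statefulsets and their volume claims during setup.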
- apiGroups:
- spark.stackable.tech
resources:
- sparkapplications
verbs:
- create
- get
- list
- apiGroups:
- airflow.stackable.tech
resources:
- airflowdbs
verbs:
- create
- get
- list
- apiGroups:
- apps
resources:
- statefulsets
verbs:
- get
- watch
- list
- apiGroups:
- ""
resources:
- persistentvolumeclaims
verbs:
- list
13 changes: 13 additions & 0 deletions demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml
@@ -0,0 +1,13 @@
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: airflow-spark-clusterrole-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: airflow-spark-clusterrole
subjects:
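# system:serviceaccounts matches every service account in the cluster; that is
# convenient for a demo, but a production setup would bind a dedicated account.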
- apiGroup: rbac.authorization.k8s.io
kind: Group
name: system:serviceaccounts
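
A quick way to verify the binding is `kubectl auth can-i` with service-account impersonation (the `default` namespace and service account below are just placeholders):

[source,console]
----
$ kubectl auth can-i create sparkapplications.spark.stackable.tech \
    --as=system:serviceaccount:default:default
yes
----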
26 changes: 26 additions & 0 deletions demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml
@@ -0,0 +1,26 @@
---
apiVersion: batch/v1
kind: Job
metadata:
name: start-pyspark-job
spec:
template:
spec:
containers:
- name: start-pyspark-job
image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
# N.B. it is possible for the scheduler to report that a DAG exists, only for the worker task to fail if a pod is unexpectedly
# restarted. Additionally, the db-init job takes a few minutes to complete before the cluster is deployed. The wait/watch steps
# below are not "water-tight" but add a layer of stability by at least ensuring that the db is initialized and ready and that
# all pods are reachable (albeit independent of each other).
command: ["bash", "-c", "
kubectl wait airflowdb/airflow --for jsonpath='{.status.condition}'=Ready --timeout 600s
&& kubectl rollout status --watch statefulset/airflow-webserver-default
&& kubectl rollout status --watch statefulset/airflow-worker-default
&& kubectl rollout status --watch statefulset/airflow-scheduler-default
&& curl -i -s --user airflow:airflow http://airflow-webserver-default:8080/api/v1/dags/sparkapp_dag
&& curl -i -s --user airflow:airflow -H 'Content-Type:application/json' -XPATCH http://airflow-webserver-default:8080/api/v1/dags/sparkapp_dag -d '{\"is_paused\": false}'
&& curl -i -s --user airflow:airflow -H 'Content-Type:application/json' -XPOST http://airflow-webserver-default:8080/api/v1/dags/sparkapp_dag/dagRuns -d '{}'
"]
restartPolicy: OnFailure
backoffLimit: 20 # give some time for the Airflow cluster to be available
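
While this Job retries (up to the `backoffLimit` above), its progress can be followed with, for example:

[source,console]
----
$ kubectl logs -f job/start-pyspark-job
----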
26 changes: 26 additions & 0 deletions demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml
@@ -0,0 +1,26 @@
---
apiVersion: batch/v1
kind: Job
metadata:
name: start-date-job
spec:
template:
spec:
containers:
- name: start-date-job
image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0
# N.B. it is possible for the scheduler to report that a DAG exists, only for the worker task to fail if a pod is unexpectedly
# restarted. Additionally, the db-init job takes a few minutes to complete before the cluster is deployed. The wait/watch steps
# below are not "water-tight" but add a layer of stability by at least ensuring that the db is initialized and ready and that
# all pods are reachable (albeit independent of each other).
command: ["bash", "-c", "
kubectl wait airflowdb/airflow --for jsonpath='{.status.condition}'=Ready --timeout 600s
&& kubectl rollout status --watch statefulset/airflow-webserver-default
&& kubectl rollout status --watch statefulset/airflow-worker-default
&& kubectl rollout status --watch statefulset/airflow-scheduler-default
&& curl -i -s --user airflow:airflow http://airflow-webserver-default:8080/api/v1/dags/date_demo
&& curl -i -s --user airflow:airflow -H 'Content-Type:application/json' -XPATCH http://airflow-webserver-default:8080/api/v1/dags/date_demo -d '{\"is_paused\": false}'
&& curl -i -s --user airflow:airflow -H 'Content-Type:application/json' -XPOST http://airflow-webserver-default:8080/api/v1/dags/date_demo/dagRuns -d '{}'
"]
restartPolicy: OnFailure
backoffLimit: 20 # give some time for the Airflow cluster to be available
17 changes: 0 additions & 17 deletions demos/airflow-scheduled-job/enable-and-run-dag.yaml

This file was deleted.

5 changes: 4 additions & 1 deletion demos/demos-v1.yaml
@@ -7,7 +7,10 @@ demos:
- airflow
- job-scheduling
manifests:
- plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/enable-and-run-dag.yaml
- plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml
- plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml
- plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml
- plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml
hbase-hdfs-load-cycling-data:
description: Copy data from S3 bucket to an HBase table
stackableStack: hdfs-hbase
Binary file modified docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_7.png
Binary file modified docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_8.png
Binary file modified docs/modules/ROOT/images/demo-airflow-scheduled-job/overview.png
47 changes: 33 additions & 14 deletions docs/modules/ROOT/pages/demos/airflow-scheduled-job.adoc
@@ -2,9 +2,9 @@

[NOTE]
====
This guide assumes you already have the demo `airflow-cron-dag` installed.
This guide assumes you already have the demo `airflow-scheduled-job` installed.
If you don't have it installed, please follow the xref:commands/demo.adoc#_install_demo[documentation on how to install a demo].
To put it simply you have to run `stackablectl demo install airflow-cron-dag`.
To put it simply, you have to run `stackablectl demo install airflow-scheduled-job`.
====

This demo will
@@ -14,8 +14,8 @@ This demo will
** *PostgreSQL*: An open-source database used for Airflow cluster- and job-metadata.
** *Redis*: An in-memory data structure store used for queuing Airflow jobs.
** *Airflow*: An open-source workflow management platform for data engineering pipelines.
* Mount an Airflow job (referred to as a Directed Acyclic Graph, or DAG) for the cluster to use
* Enable and schedule the job
* Mount two Airflow jobs (referred to as Directed Acyclic Graphs, or DAGs) for the cluster to use
* Enable and schedule the jobs
* Verify the job status with the Airflow Webserver UI

You can see the deployed products as well as their relationship in the following diagram:
@@ -27,10 +27,11 @@ To list the installed Stackable services run the following command:

[source,console]
----
$ stackablectl services list --all-namespaces
$ stackablectl services list
PRODUCT NAME NAMESPACE ENDPOINTS EXTRA INFOS

airflow airflow default webserver-airflow http://172.18.0.4:32754 Admin user: airflow, password: airflow
airflow airflow default webserver-airflow http://172.18.0.2:31979 Admin user: airflow, password: airflow

----

[NOTE]
@@ -42,29 +43,29 @@ In case the product is not ready yet a warning might be shown.

== Airflow Webserver UI
The Airflow Webserver UI is used to monitor, trigger and inspect DAGs and their runs.
Open the `airflow` endpoint `webserver-airflow` in your browser (`http://172.18.0.4:32754` in this case).
Open the `airflow` endpoint `webserver-airflow` in your browser (`http://172.18.0.2:31979` in this case).

image::demo-airflow-scheduled-job/airflow_1.png[]

Log in with the username `airflow` and password `airflow`. The overview screen shows the DAG that has been mounted during the demo set-up (`date_demo`).
Log in with the username `airflow` and password `airflow`. The overview screen shows the DAGs that have been mounted during the demo set-up (`date_demo` and `sparkapp_dag`).

image::demo-airflow-scheduled-job/airflow_2.png[]

There are two things to notice here. The DAG has been enabled, as shown by the slider to the left of the DAG name (DAGs are initially all `paused` and can be activated either manually in the UI or via a REST call, as done in the setup for this demo):
There are two things to notice here. Both DAGs have been enabled, as shown by the slider to the left of the DAG name (DAGs are initially all `paused` and can be activated either manually in the UI or via a REST call, as done in the setup for this demo):

image::demo-airflow-scheduled-job/airflow_3.png[]
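
The REST call the demo's setup Job uses to un-pause a DAG looks like this (it must be run from inside the cluster, where `airflow-webserver-default` resolves):

[source,console]
----
$ curl -s --user airflow:airflow -H 'Content-Type:application/json' \
    -XPATCH http://airflow-webserver-default:8080/api/v1/dags/date_demo \
    -d '{"is_paused": false}'
----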

Secondly, the job has been busy, with several runs already logged!
Secondly, the `date_demo` job has been busy, with several runs already logged! The `sparkapp_dag` has only run once, because the two DAGs are defined with different schedules.

image::demo-airflow-scheduled-job/airflow_4.png[]

Clicking on this number will display the individual job runs:
Clicking on the number under `Runs` will display the individual job runs:

image::demo-airflow-scheduled-job/airflow_5.png[]

The job is running every minute. With Airflow, DAGs can be started manually or scheduled to run when certain conditions are fulfilled: in this case the DAG has been set up to run using a cron table, which is part of the DAG definition.
The `date_demo` job runs every minute. With Airflow, DAGs can be started manually or scheduled to run when certain conditions are fulfilled: in this case the DAG has been set up to run on a crontab schedule, which is part of the DAG definition.
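
The configured schedule can also be read back via the Airflow REST API; a quick check from inside the cluster (the response should contain a `schedule_interval` field):

[source,console]
----
$ curl -s --user airflow:airflow http://airflow-webserver-default:8080/api/v1/dags/date_demo
----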

=== View DAG details
=== `date_demo` DAG

Let's drill down a bit deeper into this DAG. Click on one of the individual job runs shown in the previous step to display the job details. The DAG is displayed in the form of a graph (this job is so simple that it only has one step, called `run_every_minute`).

@@ -86,8 +87,26 @@ To look at the actual DAG code click on `Code`. Here we can see the crontab info

image::demo-airflow-scheduled-job/airflow_10.png[]

=== `sparkapp_dag` DAG

Go back to the DAG overview screen. The `sparkapp_dag` job has a schedule entry of `None` and a last-execution time (`2022-09-19, 07:36:55`). This allows a DAG to be executed exactly once, with neither schedule-based runs nor any https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html?highlight=backfill#backfill[backfill]. The DAG can always be triggered manually again via REST or from within the Webserver UI.

image::demo-airflow-scheduled-job/airflow_11.png[]
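
Triggering the DAG again by hand re-uses the same REST call that the setup Job issued:

[source,console]
----
$ curl -s --user airflow:airflow -H 'Content-Type:application/json' \
    -XPOST http://airflow-webserver-default:8080/api/v1/dags/sparkapp_dag/dagRuns \
    -d '{}'
----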

By navigating to the graphical overview of the job we can see that the DAG has two steps, one to start the job - which runs asynchronously - and another to poll the running job to report on its status.

image::demo-airflow-scheduled-job/airflow_12.png[]

The logs for the first task - `spark-pi-submit` - indicate that it has been started, at which point the task exits without any further information:

image::demo-airflow-scheduled-job/airflow_13.png[]

The second task - `spark-pi-monitor` - polls this job and waits for a final result (in this case: `Success`). Here, the actual result of the job (an approximate value of `pi`) is simply logged by Spark in its driver pod, but a more sophisticated job would persist it in a sink (e.g. a Kafka topic or an HBase row) or use it to trigger subsequent actions.

image::demo-airflow-scheduled-job/airflow_14.png[]
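
To see the computed value itself, inspect the driver pod's log; a sketch, relying on the `spark-role=driver` label that Spark attaches to driver pods:

[source,console]
----
$ kubectl logs -l spark-role=driver | grep -i 'pi is roughly'
# expected output, approximately: Pi is roughly 3.14...
----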

== Summary

This demo showed how a DAG can be made available for Airflow, scheduled, run and then inspected with the Webserver UI.
This demo showed how DAGs can be made available for Airflow, scheduled, run and then inspected with the Webserver UI.

