17 changes: 17 additions & 0 deletions demos/airflow-scheduled-job/enable-and-run-dag.yaml
@@ -0,0 +1,17 @@
---
apiVersion: batch/v1
kind: Job
metadata:
  name: start-dag-job
spec:
  template:
    spec:
      containers:
        - name: start-dag-job
          image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0
          command: ["bash", "-c", "
            curl -s --user airflow:airflow -H 'Content-Type:application/json' -XPATCH http://airflow-webserver-default:8080/api/v1/dags/date_demo -d '{\"is_paused\": false}'
            && curl -s --user airflow:airflow -H 'Content-Type:application/json' -XPOST http://airflow-webserver-default:8080/api/v1/dags/date_demo/dagRuns -d '{}'
            "]
      restartPolicy: OnFailure
  backoffLimit: 20 # give some time for the Airflow cluster to be available
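
For local testing, this Job can also be applied on its own — a minimal sketch, assuming a `kubectl` context pointing at the cluster where the `airflow` stack is running (the demo normally applies this manifest automatically):

[source,console]
----
$ kubectl apply -f demos/airflow-scheduled-job/enable-and-run-dag.yaml
$ kubectl wait --for=condition=complete job/start-dag-job --timeout=600s
----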
8 changes: 8 additions & 0 deletions demos/demos-v1.yaml
@@ -55,3 +55,11 @@ demos:
    manifests:
      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/hbase-hdfs-load-cycling-data/01-distcp-cycling-data.yaml
      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/hbase-hdfs-load-cycling-data/02-create-hfile-and-import-to-hbase.yaml
  airflow-scheduled-job:
    description: Activate a simple Airflow DAG to run continuously at a set interval
    stackableStack: airflow
    labels:
      - airflow
      - job-scheduling
    manifests:
      - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/enable-and-run-dag.yaml
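
With this catalog entry in place, the demo is installable by name, matching the command documented in the new docs page:

[source,console]
----
$ stackablectl demo install airflow-scheduled-job
----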
5 changes: 3 additions & 2 deletions docs/modules/ROOT/nav.adoc
@@ -7,8 +7,9 @@
 ** xref:commands/services.adoc[]
 ** xref:commands/stack.adoc[]
 * xref:demos/index.adoc[]
-** xref:demos/trino-taxi-data.adoc[]
-** xref:demos/nifi-kafka-druid-earthquake-data.adoc[]
+** xref:demos/airflow-scheduled-job.adoc[]
 ** xref:demos/kafka-druid-water-level-data.adoc[]
+** xref:demos/nifi-kafka-druid-earthquake-data.adoc[]
+** xref:demos/trino-taxi-data.adoc[]
 * xref:customization.adoc[]
 * xref:troubleshooting.adoc[]
93 changes: 93 additions & 0 deletions docs/modules/ROOT/pages/demos/airflow-scheduled-job.adoc
@@ -0,0 +1,93 @@
= airflow-scheduled-job

[NOTE]
====
This guide assumes you already have the demo `airflow-scheduled-job` installed.
If you don't have it installed, please follow the xref:commands/demo.adoc#_install_demo[documentation on how to install a demo].
In short, you have to run `stackablectl demo install airflow-scheduled-job`.
====

This demo will

* Install the required Stackable operators
* Spin up the following data products
** *PostgreSQL*: An open-source database used to store Airflow cluster and job metadata.
** *Redis*: An in-memory data structure store used for queuing Airflow jobs.
** *Airflow*: An open-source workflow management platform for data engineering pipelines.
* Mount an Airflow job (referred to as a Directed Acyclic Graph, or DAG) for the cluster to use
* Enable and schedule the job
* Verify the job status with the Airflow Webserver UI

You can see the deployed products as well as their relationship in the following diagram:

image::demo-airflow-scheduled-job/overview.png[]

== List deployed Stackable services
To list the installed Stackable services run the following command:

[source,console]
----
$ stackablectl services list --all-namespaces
PRODUCT  NAME     NAMESPACE  ENDPOINTS                                    EXTRA INFOS

airflow  airflow  default    webserver-airflow  http://172.18.0.4:32754   Admin user: airflow, password: airflow
----

[NOTE]
====
When a product instance has not finished starting yet, the service will have no endpoint.
Depending on your internet connectivity, starting all the product instances might take considerable time.
If a product is not ready yet, a warning might be shown.
====

== Airflow Webserver UI
The Airflow Webserver UI is used to inspect, trigger and monitor DAGs and their runs.
Open the `airflow` endpoint `webserver-airflow` in your browser (`http://172.18.0.4:32754` in this case).

image::demo-airflow-scheduled-job/airflow_1.png[]

Log in with the username `airflow` and password `airflow`. The overview screen shows the DAG that has been mounted during the demo set-up (`date_demo`).

image::demo-airflow-scheduled-job/airflow_2.png[]

There are two things to notice here. First, the DAG has been enabled, as shown by the slider to the left of the DAG name (all DAGs are initially `paused` and can be activated either manually in the UI or via a REST call, as is done in the setup for this demo and shown below):

image::demo-airflow-scheduled-job/airflow_3.png[]
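
For reference, the demo's setup Job performs this activation against the Airflow REST API — these are the same calls and credentials as in `enable-and-run-dag.yaml`, issued from within the cluster where the `airflow-webserver-default` service is resolvable:

[source,console]
----
# Unpause the DAG ...
$ curl -s --user airflow:airflow -H 'Content-Type:application/json' \
    -XPATCH http://airflow-webserver-default:8080/api/v1/dags/date_demo \
    -d '{"is_paused": false}'
# ... and trigger a first run immediately
$ curl -s --user airflow:airflow -H 'Content-Type:application/json' \
    -XPOST http://airflow-webserver-default:8080/api/v1/dags/date_demo/dagRuns \
    -d '{}'
----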

Secondly, the job has been busy, with several runs already logged!

image::demo-airflow-scheduled-job/airflow_4.png[]

Clicking on this number will display the individual job runs:

image::demo-airflow-scheduled-job/airflow_5.png[]
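
The same run information is also available from the REST API — a sketch, assuming in-cluster access and the demo's default credentials:

[source,console]
----
$ curl -s --user airflow:airflow http://airflow-webserver-default:8080/api/v1/dags/date_demo/dagRuns
----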

The job is running every minute. With Airflow, DAGs can be started manually or scheduled to run when certain conditions are fulfilled: in this case the DAG has been set up to run on a cron schedule, which is part of the DAG definition.

=== View DAG details

Let's drill down a bit deeper into this DAG. Click on one of the individual job runs shown in the previous step to display the job details. The DAG is displayed in the form of a graph (this job is so simple that it only has one step, called `run_every_minute`).

image::demo-airflow-scheduled-job/airflow_6.png[]

In the top right-hand corner there is some scheduling information, which tells us that this job will run every minute continuously:

image::demo-airflow-scheduled-job/airflow_7.png[]

Click on the `run_every_minute` box in the centre of the page and then select `Log`:

image::demo-airflow-scheduled-job/airflow_8.png[]

This will navigate to the worker where this job was run (with multiple workers, the jobs are queued and distributed to the next free worker) and display the log. In this case the output is a simple printout of the timestamp:

image::demo-airflow-scheduled-job/airflow_9.png[]
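
Since the task's `bash_command` is simply `date`, the relevant log line is just the current timestamp — an illustrative sample, as the exact format depends on the container's locale:

[source,console]
----
Tue Jul 12 14:01:00 UTC 2022
----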

To look at the actual DAG code, click on `Code`. Here we can see the crontab information used to schedule the job as well as the `bash` command that provides the output:

image::demo-airflow-scheduled-job/airflow_10.png[]
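
For reference, this is the DAG as shipped in the `airflow` stack's ConfigMap (`stacks/airflow/airflow.yaml`); the `schedule_interval` cron expression `0-59 * * * *` fires on every minute of every hour, i.e. the same cadence as `* * * * *`:

[source,python]
----
"""Example DAG returning the current date"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='date_demo',
    schedule_interval='0-59 * * * *',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    dagrun_timeout=timedelta(minutes=5),
    tags=['example'],
    params={},
) as dag:

    run_this = BashOperator(
        task_id='run_every_minute',
        bash_command='date',
    )
----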

== Summary

This demo showed how a DAG can be made available for Airflow, scheduled, run and then inspected with the Webserver UI.


43 changes: 42 additions & 1 deletion stacks/airflow/airflow.yaml
@@ -7,23 +7,64 @@ spec:
   version: 2.2.5-python39-stackable0.3.0
   statsdExporterVersion: v0.22.4
   executor: CeleryExecutor
-  loadExamples: true
+  loadExamples: false
   exposeConfig: false
   credentialsSecret: airflow-credentials
+  volumes:
+    - name: airflow-dags
+      configMap:
+        name: airflow-dags
+  volumeMounts:
+    - name: airflow-dags
+      mountPath: /dags/date_demo.py
+      subPath: date_demo.py
   webservers:
     roleGroups:
       default:
+        envOverrides:
+          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
         replicas: 1
   workers:
     roleGroups:
       default:
+        envOverrides:
+          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
         replicas: 2
   schedulers:
     roleGroups:
       default:
+        envOverrides:
+          AIRFLOW__CORE__DAGS_FOLDER: "/dags"
         replicas: 1
+---
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: airflow-dags
+data:
+  date_demo.py: |
+    """Example DAG returning the current date"""
+    from datetime import datetime, timedelta
+
+    from airflow import DAG
+    from airflow.operators.bash import BashOperator
+
+    with DAG(
+        dag_id='date_demo',
+        schedule_interval='0-59 * * * *',
+        start_date=datetime(2021, 1, 1),
+        catchup=False,
+        dagrun_timeout=timedelta(minutes=5),
+        tags=['example'],
+        params={},
+    ) as dag:
+
+        run_this = BashOperator(
+            task_id='run_every_minute',
+            bash_command='date',
+        )
 ---
 apiVersion: v1
 kind: Secret
 metadata:
   name: airflow-credentials