diff --git a/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml b/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml new file mode 100644 index 00000000..d63b6ec9 --- /dev/null +++ b/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml @@ -0,0 +1,36 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: airflow-spark-clusterrole +rules: +- apiGroups: + - spark.stackable.tech + resources: + - sparkapplications + verbs: + - create + - get + - list +- apiGroups: + - airflow.stackable.tech + resources: + - airflowdbs + verbs: + - create + - get + - list +- apiGroups: + - apps + resources: + - statefulsets + verbs: + - get + - watch + - list +- apiGroups: + - "" + resources: + - persistentvolumeclaims + verbs: + - list diff --git a/demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml b/demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml new file mode 100644 index 00000000..d3441a4c --- /dev/null +++ b/demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml @@ -0,0 +1,13 @@ +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: airflow-spark-clusterrole-binding +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: airflow-spark-clusterrole +subjects: +- apiGroup: rbac.authorization.k8s.io + kind: Group + name: system:serviceaccounts diff --git a/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml b/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml new file mode 100644 index 00000000..3a1076d0 --- /dev/null +++ b/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml @@ -0,0 +1,26 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: start-pyspark-job +spec: + template: + spec: + containers: + - name: start-pyspark-job + image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0 + # N.B. it is possible for the scheduler to report that a DAG exists, only for the worker task to fail if a pod is unexpectedly + # restarted. Additionally, the db-init job takes a few minutes to complete before the cluster is deployed. The wait/watch steps + # below are not "water-tight" but add a layer of stability by at least ensuring that the db is initialized and ready and that + # all pods are reachable (albeit independent of each other). 
+ command: ["bash", "-c", " + kubectl wait airflowdb/airflow --for jsonpath='{.status.condition}'=Ready --timeout 600s + && kubectl rollout status --watch statefulset/airflow-webserver-default + && kubectl rollout status --watch statefulset/airflow-worker-default + && kubectl rollout status --watch statefulset/airflow-scheduler-default + && curl -i -s --user airflow:airflow http://airflow-webserver-default:8080/api/v1/dags/sparkapp_dag + && curl -i -s --user airflow:airflow -H 'Content-Type:application/json' -XPATCH http://airflow-webserver-default:8080/api/v1/dags/sparkapp_dag -d '{\"is_paused\": false}' + && curl -i -s --user airflow:airflow -H 'Content-Type:application/json' -XPOST http://airflow-webserver-default:8080/api/v1/dags/sparkapp_dag/dagRuns -d '{}' + "] + restartPolicy: OnFailure + backoffLimit: 20 # give some time for the Airflow cluster to be available diff --git a/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml b/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml new file mode 100644 index 00000000..38da95c4 --- /dev/null +++ b/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml @@ -0,0 +1,26 @@ +--- +apiVersion: batch/v1 +kind: Job +metadata: + name: start-date-job +spec: + template: + spec: + containers: + - name: start-date-job + image: docker.stackable.tech/stackable/tools:0.2.0-stackable0.3.0 + # N.B. it is possible for the scheduler to report that a DAG exists, only for the worker task to fail if a pod is unexpectedly + # restarted. Additionally, the db-init job takes a few minutes to complete before the cluster is deployed. The wait/watch steps + # below are not "water-tight" but add a layer of stability by at least ensuring that the db is initialized and ready and that + # all pods are reachable (albeit independent of each other). 
+ command: ["bash", "-c", " + kubectl wait airflowdb/airflow --for jsonpath='{.status.condition}'=Ready --timeout 600s + && kubectl rollout status --watch statefulset/airflow-webserver-default + && kubectl rollout status --watch statefulset/airflow-worker-default + && kubectl rollout status --watch statefulset/airflow-scheduler-default + && curl -i -s --user airflow:airflow http://airflow-webserver-default:8080/api/v1/dags/date_demo + && curl -i -s --user airflow:airflow -H 'Content-Type:application/json' -XPATCH http://airflow-webserver-default:8080/api/v1/dags/date_demo -d '{\"is_paused\": false}' + && curl -i -s --user airflow:airflow -H 'Content-Type:application/json' -XPOST http://airflow-webserver-default:8080/api/v1/dags/date_demo/dagRuns -d '{}' + "] + restartPolicy: OnFailure + backoffLimit: 20 # give some time for the Airflow cluster to be available diff --git a/demos/airflow-scheduled-job/enable-and-run-dag.yaml b/demos/airflow-scheduled-job/enable-and-run-dag.yaml deleted file mode 100644 index 95669a8f..00000000 --- a/demos/airflow-scheduled-job/enable-and-run-dag.yaml +++ /dev/null @@ -1,17 +0,0 @@ ---- -apiVersion: batch/v1 -kind: Job -metadata: - name: start-dag-job -spec: - template: - spec: - containers: - - name: start-dag-job - image: docker.stackable.tech/stackable/testing-tools:0.1.0-stackable0.1.0 - command: ["bash", "-c", " - curl -s --user airflow:airflow -H 'Content-Type:application/json' -XPATCH http://airflow-webserver-default:8080/api/v1/dags/date_demo -d '{\"is_paused\": false}' - && curl -s --user airflow:airflow -H 'Content-Type:application/json' -XPOST http://airflow-webserver-default:8080/api/v1/dags/date_demo/dagRuns -d '{}' - "] - restartPolicy: OnFailure - backoffLimit: 20 # give some time for the Airflow cluster to be available diff --git a/demos/demos-v1.yaml b/demos/demos-v1.yaml index 16e8f3bc..15cfcaaa 100644 --- a/demos/demos-v1.yaml +++ b/demos/demos-v1.yaml @@ -7,7 +7,10 @@ demos: - airflow - job-scheduling manifests: - - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/enable-and-run-dag.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/01-airflow-spark-clusterrole.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/02-airflow-spark-clusterrolebinding.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/03-enable-and-run-spark-dag.yaml + - plainYaml: https://raw.githubusercontent.com/stackabletech/stackablectl/main/demos/airflow-scheduled-job/04-enable-and-run-date-dag.yaml hbase-hdfs-load-cycling-data: description: Copy data from S3 bucket to an HBase table stackableStack: hdfs-hbase diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_11.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_11.png new file mode 100644 index 00000000..6055c2b2 Binary files /dev/null and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_11.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_12.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_12.png new file mode 100644 index 00000000..73ee210c Binary files /dev/null and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_12.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_13.png 
b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_13.png new file mode 100644 index 00000000..59aafdb0 Binary files /dev/null and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_13.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_14.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_14.png new file mode 100644 index 00000000..953b4831 Binary files /dev/null and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_14.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_2.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_2.png index 97850925..692a7756 100644 Binary files a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_2.png and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_2.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_3.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_3.png index e38acde3..3f8faf90 100644 Binary files a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_3.png and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_3.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_4.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_4.png index 2ad28ab0..f08b86ed 100644 Binary files a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_4.png and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_4.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_5.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_5.png index 4e619488..267bac23 100644 Binary files a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_5.png and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_5.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_6.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_6.png index eeabc2bf..70abe859 100644 Binary files a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_6.png and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_6.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_7.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_7.png index 388e9bca..7e2a39c8 100644 Binary files a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_7.png and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_7.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_8.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_8.png index 01e061b2..3d5b1f11 100644 Binary files a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_8.png and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_8.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_9.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_9.png index edc9e516..76e70f59 100644 Binary files a/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_9.png and b/docs/modules/ROOT/images/demo-airflow-scheduled-job/airflow_9.png differ diff --git a/docs/modules/ROOT/images/demo-airflow-scheduled-job/overview.png b/docs/modules/ROOT/images/demo-airflow-scheduled-job/overview.png index 89bdf70b..4f2b9c70 100644 Binary files a/docs/modules/ROOT/images/demo-airflow-scheduled-job/overview.png and 
b/docs/modules/ROOT/images/demo-airflow-scheduled-job/overview.png differ diff --git a/docs/modules/ROOT/pages/demos/airflow-scheduled-job.adoc b/docs/modules/ROOT/pages/demos/airflow-scheduled-job.adoc index 245e83fd..f243d67a 100644 --- a/docs/modules/ROOT/pages/demos/airflow-scheduled-job.adoc +++ b/docs/modules/ROOT/pages/demos/airflow-scheduled-job.adoc @@ -2,9 +2,9 @@ [NOTE] ==== -This guide assumes you already have the demo `airflow-cron-dag` installed. +This guide assumes you already have the demo `airflow-scheduled-job` installed. If you don't have it installed please follow the xref:commands/demo.adoc#_install_demo[documentation on how to install a demo]. -To put it simply you have to run `stackablectl demo install airflow-cron-dag`. +To put it simply you have to run `stackablectl demo install airflow-scheduled-job`. ==== This demo will @@ -14,8 +14,8 @@ This demo will ** *Postgresql*: An open-source database used for Airflow cluster- and job-metadata. ** *Redis*: An in-memory data structure store used for queuing Airflow jobs ** *Airflow*: An open-source workflow management platform for data engineering pipelines. -* Mount an Airflow job (referred to as a Directed Acyclic Graph, or DAG) for the cluster to use -* Enable and schedule the job +* Mount two Airflow jobs (referred to as Directed Acyclic Graphs, or DAGs) for the cluster to use +* Enable and schedule the jobs * Verify the job status with the Airflow Webserver UI You can see the deployed products as well as their relationship in the following diagram: @@ -27,10 +27,11 @@ To list the installed Stackable services run the following command: [source,console] ---- -$ stackablectl services list --all-namespaces +$ stackablectl services list PRODUCT NAME NAMESPACE ENDPOINTS EXTRA INFOS - airflow airflow default webserver-airflow http://172.18.0.4:32754 Admin user: airflow, password: airflow + airflow airflow default webserver-airflow http://172.18.0.2:31979 Admin user: airflow, password: airflow + ---- [NOTE] @@ -42,29 +43,29 @@ In case the product is not ready yet a warning might be shown. == Airflow Webserver UI Superset gives the ability to execute SQL queries and build dashboards. -Open the `airflow` endpoint `webserver-airflow` in your browser (`http://172.18.0.4:32754` in this case). +Open the `airflow` endpoint `webserver-airflow` in your browser (`http://172.18.0.2:31979` in this case). image::demo-airflow-scheduled-job/airflow_1.png[] -Log in with the username `airflow` and password `airflow`. The overview screen shows the DAG that has been mounted during the demo set-up (`date_demo`). +Log in with the username `airflow` and password `airflow`. The overview screen shows the DAGs that have been mounted during the demo set-up (`date_demo`). image::demo-airflow-scheduled-job/airflow_2.png[] -There are two things to notice here. The DAG has been enabled, as shown by the slider to the left of the DAG name (DAGs are initially all `paused` and can be activated either manually in the UI or via a REST call, as done in the setup for this demo): +There are two things to notice here. Both DAGs have been enabled, as shown by the slider to the left of the DAG name (DAGs are initially all `paused` and can be activated either manually in the UI or via a REST call, as done in the setup for this demo): image::demo-airflow-scheduled-job/airflow_3.png[] -Secondly, the job has been busy, with several runs already logged! +Secondly, the `date_demo` job has been busy, with several runs already logged! 
The `sparkapp_dag` has only been run once. This is because the two DAGs have been defined with different schedules. image::demo-airflow-scheduled-job/airflow_4.png[] -Clicking on this number will display the individual job runs: +Clicking on the number under `Runs` will display the individual job runs: image::demo-airflow-scheduled-job/airflow_5.png[] -The job is running every minute. With Airflow, DAGs can be started manually or scheduled to run when certain conditions are fulfilled: in this case the DAG has been set up to run using a cron table, which is part of the DAG definition. +The `date_demo` job is running every minute. With Airflow, DAGs can be started manually or scheduled to run when certain conditions are fulfilled: in this case the DAG has been set up to run using a cron table, which is part of the DAG definition. -=== View DAG details +=== `date_demo` DAG Let's drill down a bit deeper into this DAG. Click on one of the individual job runs shown in the previous step to display the job details. The DAG is displayed in the form of a graph (this job is so simple that it only has one step, called `run_every_minute`). @@ -86,8 +87,26 @@ To look at the actual DAG code click on `Code`. Here we can see the crontab info image::demo-airflow-scheduled-job/airflow_10.png[] +=== `sparkapp_dag` DAG + +Go back to the DAG overview screen. The `sparkapp_dag` job has a schedule of `None` and a last-execution time (`2022-09-19, 07:36:55`). This allows a DAG to be executed exactly once, with neither schedule-based runs nor any https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html?highlight=backfill#backfill[backfill]. The DAG can always be triggered manually again via REST or from within the Webserver UI. + +image::demo-airflow-scheduled-job/airflow_11.png[] + +By navigating to the graphical overview of the job we can see that the DAG has two steps: one to start the job - which runs asynchronously - and another to poll the running job and report on its status. + +image::demo-airflow-scheduled-job/airflow_12.png[] + +The logs for the first task - `spark_pi_submit` - indicate that the Spark job has been started, at which point the task exits without any further information: + +image::demo-airflow-scheduled-job/airflow_13.png[] + +The second task - `spark_pi_monitor` - polls this job and waits for a final result (in this case: `Succeeded`). The actual result of the job (a value of `pi`) is logged by Spark in its driver pod, but more sophisticated jobs would persist this in a sink (e.g. a Kafka topic or HBase row) or use the result to trigger subsequent actions. + +image::demo-airflow-scheduled-job/airflow_14.png[] + == Summary -This demo showed how a DAG can be made available for Airflow, scheduled, run and then inspected with the Webserver UI. +This demo showed how DAGs can be made available for Airflow, scheduled, run and then inspected with the Webserver UI.
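For reference, this is roughly how the demo's setup Jobs enable and trigger a DAG through the Airflow REST API, and the same calls can be reused to re-trigger `sparkapp_dag` by hand. A hedged sketch: the `airflow-webserver-default:8080` address is the in-cluster service used by the Jobs; from outside the cluster you would substitute the endpoint shown by `stackablectl services list`.

[source,console]
----
# Unpause the DAG (DAGs start out paused)
$ curl -s --user airflow:airflow -H 'Content-Type:application/json' \
    -XPATCH http://airflow-webserver-default:8080/api/v1/dags/sparkapp_dag \
    -d '{"is_paused": false}'

# Trigger a new DAG run
$ curl -s --user airflow:airflow -H 'Content-Type:application/json' \
    -XPOST http://airflow-webserver-default:8080/api/v1/dags/sparkapp_dag/dagRuns \
    -d '{}'

# Inspect the runs created so far
$ curl -s --user airflow:airflow \
    http://airflow-webserver-default:8080/api/v1/dags/sparkapp_dag/dagRuns
----

The first two calls mirror the commands baked into `03-enable-and-run-spark-dag.yaml` and `04-enable-and-run-date-dag.yaml`; the last one is the standard Airflow 2 stable REST endpoint for listing DAG runs and is handy for checking state without opening the UI.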
diff --git a/stacks/airflow/airflow.yaml b/stacks/airflow/airflow.yaml index c0e97964..c08c8ad8 100644 --- a/stacks/airflow/airflow.yaml +++ b/stacks/airflow/airflow.yaml @@ -18,23 +18,32 @@ spec: - name: airflow-dags mountPath: /dags/date_demo.py subPath: date_demo.py + - name: airflow-dags + mountPath: /dags/pyspark_pi.py + subPath: pyspark_pi.py + - name: airflow-dags + mountPath: /dags/pyspark_pi.yaml + subPath: pyspark_pi.yaml webservers: roleGroups: default: envOverrides: AIRFLOW__CORE__DAGS_FOLDER: "/dags" + AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" replicas: 1 workers: roleGroups: default: envOverrides: AIRFLOW__CORE__DAGS_FOLDER: "/dags" + AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" replicas: 2 schedulers: roleGroups: default: envOverrides: AIRFLOW__CORE__DAGS_FOLDER: "/dags" + AIRFLOW_CONN_KUBERNETES_IN_CLUSTER: "kubernetes://?__extra__=%7B%22extra__kubernetes__in_cluster%22%3A+true%2C+%22extra__kubernetes__kube_config%22%3A+%22%22%2C+%22extra__kubernetes__kube_config_path%22%3A+%22%22%2C+%22extra__kubernetes__namespace%22%3A+%22%22%7D" replicas: 1 --- apiVersion: v1 @@ -63,6 +72,201 @@ data: task_id='run_every_minute', bash_command='date', ) + pyspark_pi.py: | + """Example DAG demonstrating how to apply a Kubernetes Resource from Airflow running in-cluster""" + from datetime import datetime, timedelta + from airflow import DAG + from typing import TYPE_CHECKING, Optional, Sequence, Dict + from kubernetes import client + from airflow.exceptions import AirflowException + from airflow.sensors.base import BaseSensorOperator + from airflow.models import BaseOperator + from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook + import yaml + from airflow.utils import yaml + import os + + if TYPE_CHECKING: + from airflow.utils.context import Context + + class SparkKubernetesOperator(BaseOperator): + template_fields: Sequence[str] = ('application_file', 'namespace') + template_ext: Sequence[str] = ('.yaml', '.yml', '.json') + ui_color = '#f4a460' + + def __init__( + self, + *, + application_file: str, + namespace: Optional[str] = None, + kubernetes_conn_id: str = 'kubernetes_in_cluster', + api_group: str = 'spark.stackable.tech', + api_version: str = 'v1alpha1', + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.application_file = application_file + self.namespace = namespace + self.kubernetes_conn_id = kubernetes_conn_id + self.api_group = api_group + self.api_version = api_version + self.plural = "sparkapplications" + + def execute(self, context: 'Context'): + hook = KubernetesHook(conn_id=self.kubernetes_conn_id) + self.log.info("Creating SparkApplication...") + response = hook.create_custom_object( + group=self.api_group, + version=self.api_version, + plural=self.plural, + body=self.application_file, + namespace=self.namespace, + ) + return response + + + class SparkKubernetesSensor(BaseSensorOperator): + template_fields = ("application_name", "namespace") + FAILURE_STATES = ("Failed", "Unknown") + SUCCESS_STATES = ("Succeeded") + + def __init__( + self, + *, + application_name: str, + attach_log: bool = False, + 
namespace: Optional[str] = None, + kubernetes_conn_id: str = 'kubernetes_in_cluster', + api_group: str = 'spark.stackable.tech', + api_version: str = 'v1alpha1', + poke_interval: float = 60, + **kwargs, + ) -> None: + super().__init__(**kwargs) + self.application_name = application_name + self.attach_log = attach_log + self.namespace = namespace + self.kubernetes_conn_id = kubernetes_conn_id + self.hook = KubernetesHook(conn_id=self.kubernetes_conn_id) + self.api_group = api_group + self.api_version = api_version + self.poke_interval = poke_interval + + def _log_driver(self, application_state: str, response: dict) -> None: + if not self.attach_log: + return + status_info = response["status"] + if "driverInfo" not in status_info: + return + driver_info = status_info["driverInfo"] + if "podName" not in driver_info: + return + driver_pod_name = driver_info["podName"] + namespace = response["metadata"]["namespace"] + log_method = self.log.error if application_state in self.FAILURE_STATES else self.log.info + try: + log = "" + for line in self.hook.get_pod_logs(driver_pod_name, namespace=namespace): + log += line.decode() + log_method(log) + except client.rest.ApiException as e: + self.log.warning( + "Could not read logs for pod %s. It may have been disposed.\n" + "Make sure timeToLiveSeconds is set on your SparkApplication spec.\n" + "underlying exception: %s", + driver_pod_name, + e, + ) + + def poke(self, context: Dict) -> bool: + self.log.info("Poking: %s", self.application_name) + response = self.hook.get_custom_object( + group=self.api_group, + version=self.api_version, + plural="sparkapplications", + name=self.application_name, + namespace=self.namespace, + ) + try: + application_state = response["status"]["phase"] + except KeyError: + self.log.debug(f"SparkApplication status could not be established: {response}") + return False + if self.attach_log and application_state in self.FAILURE_STATES + self.SUCCESS_STATES: + self._log_driver(application_state, response) + if application_state in self.FAILURE_STATES: + raise AirflowException(f"SparkApplication failed with state: {application_state}") + elif application_state in self.SUCCESS_STATES: + self.log.info("SparkApplication ended successfully") + return True + else: + self.log.info("SparkApplication is still in state: %s", application_state) + return False + + with DAG( + dag_id='sparkapp_dag', + schedule_interval=None, + start_date=datetime(2022, 1, 1), + catchup=False, + dagrun_timeout=timedelta(minutes=60), + tags=['example'], + params={"example_key": "example_value"}, + ) as dag: + + def load_body_to_dict(body): + try: + body_dict = yaml.safe_load(body) + except yaml.YAMLError as e: + raise AirflowException(f"Exception when loading resource definition: {e}\n") + return body_dict + + yaml_path = os.path.join(os.environ.get('AIRFLOW__CORE__DAGS_FOLDER'), 'pyspark_pi.yaml') + + with open(yaml_path, 'r') as file: + crd = file.read() + with open('/run/secrets/kubernetes.io/serviceaccount/namespace', 'r') as file: + ns = file.read() + + document=load_body_to_dict(crd) + application_name='pyspark-pi-'+datetime.utcnow().strftime('%Y%m%d%H%M%S') + document.update({'metadata': {'name': application_name, 'namespace': ns}}) + + t1 = SparkKubernetesOperator( + task_id='spark_pi_submit', + namespace=ns, + application_file=document, + do_xcom_push=True, + dag=dag, + ) + + t2 = SparkKubernetesSensor( + task_id='spark_pi_monitor', + namespace=ns, + application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}", + 
poke_interval=5, + dag=dag, + ) + + t1 >> t2 + pyspark_pi.yaml: | + --- + apiVersion: spark.stackable.tech/v1alpha1 + kind: SparkApplication + metadata: + name: pyspark-pi + spec: + version: "1.0" + sparkImage: docker.stackable.tech/stackable/pyspark-k8s:3.3.0-stackable0.1.0 + mode: cluster + mainApplicationFile: local:///stackable/spark/examples/src/main/python/pi.py + driver: + cores: 1 + coreLimit: "1200m" + memory: "512m" + executor: + cores: 1 + instances: 3 + memory: "512m" --- apiVersion: v1 kind: Secret
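The `pyspark_pi.yaml` template above is what `sparkapp_dag` submits on each run, with a generated `pyspark-pi-<timestamp>` name and the Job's namespace patched in, and the sensor task then polls the resource's `.status.phase`. A hedged sketch of following such a run from the command line; the resource and driver pod names below are illustrative placeholders, since they are generated at run time.

[source,console]
----
# List the SparkApplications created by sparkapp_dag runs
$ kubectl get sparkapplications

# Watch the phase the sensor task polls
$ kubectl get sparkapplications pyspark-pi-<timestamp> -o jsonpath='{.status.phase}'

# Read the computed value of pi from the driver pod logs (pod name will vary)
$ kubectl logs <driver-pod-of-pyspark-pi-run> | grep -i roughly
----

The `Succeeded`/`Failed` phases seen here are the same states `SparkKubernetesSensor.poke` keys on, so a run that looks healthy from `kubectl` should also be reported as successful in the Airflow UI.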