From 091086bd15bfabb7963510d2996c44f9b9cd9f2e Mon Sep 17 00:00:00 2001 From: Kevin Pullin Date: Sun, 6 Jan 2019 12:37:15 -0800 Subject: [PATCH] Port AIRFLOW-3402 (PR #4247) to 1.10-test --- airflow/config_templates/default_airflow.cfg | 10 +++ .../example_kubernetes_executor.py | 32 ++++++- .../contrib/executors/kubernetes_executor.py | 23 ++++- airflow/contrib/kubernetes/pod.py | 4 + .../kubernetes/worker_configuration.py | 8 +- .../operators/kubernetes_pod_operator.py | 6 +- scripts/ci/kubernetes/kube/configmaps.yaml | 4 + .../executors/test_kubernetes_executor.py | 88 ++++++++++++++++++- 8 files changed, 162 insertions(+), 13 deletions(-) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index c8aa4061e7271..a72604a536c01 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -630,6 +630,16 @@ gcp_service_account_keys = # It will raise an exception if called from a process not running in a kubernetes environment. in_cluster = True +# Affinity configuration as a single line formatted JSON object. +# See the affinity model for top-level key names (e.g. `nodeAffinity`, etc.): +# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#affinity-v1-core +affinity = + +# A list of toleration objects as a single line formatted JSON array +# See: +# https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.12/#toleration-v1-core +tolerations = + [kubernetes_node_selectors] # The Key-value pairs to be given to worker pods. # The worker pods will be scheduled to the nodes of the specified key-value pairs. diff --git a/airflow/contrib/example_dags/example_kubernetes_executor.py b/airflow/contrib/example_dags/example_kubernetes_executor.py index 1d9bb7304318b..d03e255ab3287 100644 --- a/airflow/contrib/example_dags/example_kubernetes_executor.py +++ b/airflow/contrib/example_dags/example_kubernetes_executor.py @@ -32,6 +32,31 @@ schedule_interval=None ) +affinity = { + 'podAntiAffinity': { + 'requiredDuringSchedulingIgnoredDuringExecution': [ + { + 'topologyKey': 'kubernetes.io/hostname', + 'labelSelector': { + 'matchExpressions': [ + { + 'key': 'app', + 'operator': 'In', + 'values': ['airflow'] + } + ] + } + } + ] + } +} + +tolerations = [{ + 'key': 'dedicated', + 'operator': 'Equal', + 'value': 'airflow' +}] + def print_stuff(): print("stuff!") @@ -59,11 +84,14 @@ def use_zip_binary(): executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}} ) -# Limit resources on this operator/task +# Limit resources on this operator/task with node affinity & tolerations three_task = PythonOperator( task_id="three_task", python_callable=print_stuff, dag=dag, executor_config={ - "KubernetesExecutor": {"request_memory": "128Mi", "limit_memory": "128Mi"}} + "KubernetesExecutor": {"request_memory": "128Mi", + "limit_memory": "128Mi", + "tolerations": tolerations, + "affinity": affinity}} ) start_task.set_downstream([one_task, two_task, three_task]) diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index dd9cd3ec53b15..e06a5f47e1ed0 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -16,6 +16,7 @@ # under the License. import base64 +import json import multiprocessing from queue import Queue from dateutil import parser @@ -40,7 +41,7 @@ class KubernetesExecutorConfig: def __init__(self, image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, gcp_service_account_key=None, node_selectors=None, affinity=None, - annotations=None, volumes=None, volume_mounts=None): + annotations=None, volumes=None, volume_mounts=None, tolerations=None): self.image = image self.image_pull_policy = image_pull_policy self.request_memory = request_memory @@ -53,16 +54,18 @@ def __init__(self, image=None, image_pull_policy=None, request_memory=None, self.annotations = annotations self.volumes = volumes self.volume_mounts = volume_mounts + self.tolerations = tolerations def __repr__(self): return "{}(image={}, image_pull_policy={}, request_memory={}, request_cpu={}, " \ "limit_memory={}, limit_cpu={}, gcp_service_account_key={}, " \ "node_selectors={}, affinity={}, annotations={}, volumes={}, " \ - "volume_mounts={})" \ + "volume_mounts={}, tolerations={})" \ .format(KubernetesExecutorConfig.__name__, self.image, self.image_pull_policy, self.request_memory, self.request_cpu, self.limit_memory, self.limit_cpu, self.gcp_service_account_key, self.node_selectors, - self.affinity, self.annotations, self.volumes, self.volume_mounts) + self.affinity, self.annotations, self.volumes, self.volume_mounts, + self.tolerations) @staticmethod def from_dict(obj): @@ -88,6 +91,7 @@ def from_dict(obj): annotations=namespaced.get('annotations', {}), volumes=namespaced.get('volumes', []), volume_mounts=namespaced.get('volume_mounts', []), + tolerations=namespaced.get('tolerations', None), ) def as_dict(self): @@ -104,6 +108,7 @@ def as_dict(self): 'annotations': self.annotations, 'volumes': self.volumes, 'volume_mounts': self.volume_mounts, + 'tolerations': self.tolerations, } @@ -205,6 +210,18 @@ def __init__(self): # configmap self.airflow_configmap = conf.get(self.kubernetes_section, 'airflow_configmap') + affinity_json = conf.get(self.kubernetes_section, 'affinity') + if affinity_json: + self.kube_affinity = json.loads(affinity_json) + else: + self.kube_affinity = None + + tolerations_json = conf.get(self.kubernetes_section, 'tolerations') + if tolerations_json: + self.kube_tolerations = json.loads(tolerations_json) + else: + self.kube_tolerations = None + self._validate() def _validate(self): diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index bad5caa738e1b..6d2977592598a 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -60,6 +60,10 @@ class Pod: :type image_pull_secrets: str :param affinity: A dict containing a group of affinity scheduling rules :type affinity: dict + :param hostnetwork: If True enable host networking on the pod + :type hostnetwork: bool + :param tolerations: A list of kubernetes tolerations + :type tolerations: list """ def __init__( self, diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index d83be81c1f704..c2e7768baae8f 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -205,10 +205,13 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da limit_cpu=kube_executor_config.limit_cpu ) gcp_sa_key = kube_executor_config.gcp_service_account_key - annotations = kube_executor_config.annotations.copy() + annotations = dict(kube_executor_config.annotations) if gcp_sa_key: annotations['iam.cloud.google.com/service-account'] = gcp_sa_key + affinity = kube_executor_config.affinity or self.kube_config.kube_affinity + tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations + return Pod( namespace=namespace, name=pod_id, @@ -234,5 +237,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da annotations=annotations, node_selectors=(kube_executor_config.node_selectors or self.kube_config.kube_node_selectors), - affinity=kube_executor_config.affinity + affinity=affinity, + tolerations=tolerations ) diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py b/airflow/contrib/operators/kubernetes_pod_operator.py index a29b61998d181..4494754f970f8 100644 --- a/airflow/contrib/operators/kubernetes_pod_operator.py +++ b/airflow/contrib/operators/kubernetes_pod_operator.py @@ -84,8 +84,10 @@ class KubernetesPodOperator(BaseOperator): /airflow/xcom/return.json in the container will also be pushed to an XCom when the container completes. :type xcom_push: bool - :param tolerations: Kubernetes tolerations - :type list of tolerations + :param hostnetwork: If True enable host networking on the pod + :type hostnetwork: bool + :param tolerations: A list of kubernetes tolerations + :type tolerations: list tolerations """ template_fields = ('cmds', 'arguments', 'env_vars', 'config_file') diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml index c8e6b19076d1d..b5fa3e5f639ee 100644 --- a/scripts/ci/kubernetes/kube/configmaps.yaml +++ b/scripts/ci/kubernetes/kube/configmaps.yaml @@ -192,6 +192,10 @@ data: namespace = default gcp_service_account_keys = + # Example affinity and toleration definitions. + affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}} + tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }] + # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync git_sync_container_repository = gcr.io/google-containers/git-sync-amd64 git_sync_container_tag = v2.0.5 diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index f93a9d81e142d..1b5c4c015d963 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -14,6 +14,7 @@ # import unittest +import uuid import mock import re import string @@ -22,6 +23,7 @@ try: from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler + from airflow.contrib.executors.kubernetes_executor import KubernetesExecutorConfig from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration except ImportError: AirflowKubernetesScheduler = None @@ -81,13 +83,42 @@ class TestKubernetesWorkerConfiguration(unittest.TestCase): Tests that if dags_volume_subpath/logs_volume_subpath configuration options are passed to worker pod config """ + + affinity_config = { + 'podAntiAffinity': { + 'requiredDuringSchedulingIgnoredDuringExecution': [ + { + 'topologyKey': 'kubernetes.io/hostname', + 'labelSelector': { + 'matchExpressions': [ + { + 'key': 'app', + 'operator': 'In', + 'values': ['airflow'] + } + ] + } + } + ] + } + } + + tolerations_config = [ + { + 'key': 'dedicated', + 'operator': 'Equal', + 'value': 'airflow' + }, + { + 'key': 'prod', + 'operator': 'Exists' + } + ] + def setUp(self): if AirflowKubernetesScheduler is None: self.skipTest("kubernetes python package is not installed") - self.pod = mock.patch( - 'airflow.contrib.kubernetes.worker_configuration.Pod' - ) self.resources = mock.patch( 'airflow.contrib.kubernetes.worker_configuration.Resources' ) @@ -95,7 +126,7 @@ def setUp(self): 'airflow.contrib.kubernetes.worker_configuration.Secret' ) - for patcher in [self.pod, self.resources, self.secret]: + for patcher in [self.resources, self.secret]: self.mock_foo = patcher.start() self.addCleanup(patcher.stop) @@ -152,6 +183,55 @@ def test_worker_environment_when_dags_folder_specified(self): self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER']) + def test_make_pod_with_empty_executor_config(self): + self.kube_config.kube_affinity = self.affinity_config + self.kube_config.kube_tolerations = self.tolerations_config + + worker_config = WorkerConfiguration(self.kube_config) + kube_executor_config = KubernetesExecutorConfig(annotations=[], + volumes=[], + volume_mounts=[] + ) + + pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id", + "test_task_id", str(datetime.utcnow()), "bash -c 'ls /'", + kube_executor_config) + + self.assertTrue(pod.affinity['podAntiAffinity'] is not None) + self.assertEqual('app', + pod.affinity['podAntiAffinity'] + ['requiredDuringSchedulingIgnoredDuringExecution'][0] + ['labelSelector'] + ['matchExpressions'][0] + ['key']) + + self.assertEqual(2, len(pod.tolerations)) + self.assertEqual('prod', pod.tolerations[1]['key']) + + def test_make_pod_with_executor_config(self): + worker_config = WorkerConfiguration(self.kube_config) + kube_executor_config = KubernetesExecutorConfig(affinity=self.affinity_config, + tolerations=self.tolerations_config, + annotations=[], + volumes=[], + volume_mounts=[] + ) + + pod = worker_config.make_pod("default", str(uuid.uuid4()), "test_pod_id", "test_dag_id", + "test_task_id", str(datetime.utcnow()), "bash -c 'ls /'", + kube_executor_config) + + self.assertTrue(pod.affinity['podAntiAffinity'] is not None) + self.assertEqual('app', + pod.affinity['podAntiAffinity'] + ['requiredDuringSchedulingIgnoredDuringExecution'][0] + ['labelSelector'] + ['matchExpressions'][0] + ['key']) + + self.assertEqual(2, len(pod.tolerations)) + self.assertEqual('prod', pod.tolerations[1]['key']) + def test_worker_pvc_dags(self): # Tests persistence volume config created when `dags_volume_claim` is set self.kube_config.dags_volume_claim = 'airflow-dags'