From d5d0a0735bcbea8967fab0dc24e915e0c70a4fb8 Mon Sep 17 00:00:00 2001 From: abhishek bafna Date: Wed, 6 May 2020 15:20:52 +0530 Subject: [PATCH] [CP][AIRFLOW-6561][EWT-290]: Adding priority class and default resource for worker pod. (#47) --- airflow/config_templates/default_airflow.cfg | 4 ++++ airflow/contrib/executors/kubernetes_executor.py | 1 + .../kubernetes_request_factory.py | 5 +++++ .../pod_request_factory.py | 2 ++ airflow/contrib/kubernetes/pod.py | 4 +++- .../contrib/kubernetes/worker_configuration.py | 16 ++++++++++++++-- airflow/version.py | 2 +- 7 files changed, 30 insertions(+), 4 deletions(-) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index ef286e6790e6a..78a596a623128 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -840,3 +840,7 @@ fs_group = # The worker pods will be given these static labels, as well as some additional dynamic labels # to identify the task. # Should be supplied in the format: key = value + +[kubernetes_worker_resources] +# EWT-290: This is added adhoc basis to configure the Airflow worker resources. +# This should be removed when the similar functionality is available with Airflow upgrade. diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 0968343c87b25..b95284ba9ac90 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -149,6 +149,7 @@ def __init__(self): ) self.kube_node_selectors = configuration_dict.get('kubernetes_node_selectors', {}) self.kube_annotations = configuration_dict.get('kubernetes_annotations', {}) + self.kube_worker_resources = configuration_dict.get('kubernetes_worker_resources', {}) self.kube_labels = configuration_dict.get('kubernetes_labels', {}) self.delete_worker_pods = conf.getboolean( self.kubernetes_section, 'delete_worker_pods') diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py index 7736dc5dc4a6c..14d63e0df1bfe 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/kubernetes_request_factory.py @@ -257,3 +257,8 @@ def _apply_env_from(pod, req): } } ) + + @staticmethod + def extract_priority_class(pod, req): + if pod.priority_class: + req['spec']['priorityClassName'] = pod.priority_class diff --git a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py index d286da930ce53..d8fed58f594ba 100644 --- a/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py +++ b/airflow/contrib/kubernetes/kubernetes_request_factory/pod_request_factory.py @@ -65,6 +65,7 @@ def create(self, pod): self.extract_tolerations(pod, req) self.extract_security_context(pod, req) self.extract_dnspolicy(pod, req) + self.extract_priority_class(pod, req) return req @@ -135,4 +136,5 @@ def create(self, pod): self.extract_tolerations(pod, req) self.extract_security_context(pod, req) self.extract_dnspolicy(pod, req) + self.extract_priority_class(pod, req) return req diff --git a/airflow/contrib/kubernetes/pod.py b/airflow/contrib/kubernetes/pod.py index 52e27b68bb633..b889562580651 100644 --- a/airflow/contrib/kubernetes/pod.py +++ b/airflow/contrib/kubernetes/pod.py @@ -113,7 +113,8 @@ def __init__( security_context=None, configmaps=None, pod_runtime_info_envs=None, - dnspolicy=None + dnspolicy=None, + priority_class=None ): self.image = image self.envs = envs or {} @@ -141,3 +142,4 @@ def __init__( self.configmaps = configmaps or [] self.pod_runtime_info_envs = pod_runtime_info_envs or [] self.dnspolicy = dnspolicy + self.priority_class = priority_class diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 684cf1be03d52..756a7aadc2e00 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -328,6 +328,16 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da limit_cpu=kube_executor_config.limit_cpu, limit_gpu=kube_executor_config.limit_gpu ) + # EWT-290 add priorityClassName and cpu and memory resource into pod spec definition. + # The priorityClassName is set as the env (AIRFLOW_POD_PRIORITY_CLASS). + # The cpu and memory resource are set into the airflow.cfg file under 'kube_worker_resources' + resources = Resources( + request_memory=self.kube_config.kube_worker_resources.get('request_memory'), + request_cpu=self.kube_config.kube_worker_resources.get('request_cpu'), + limit_memory=self.kube_config.kube_worker_resources.get('limit_memory'), + limit_cpu=self.kube_config.kube_worker_resources.get('limit_cpu'), + ) if resources.is_empty_resource_request() else resources + gcp_sa_key = kube_executor_config.gcp_service_account_key annotations = dict(kube_executor_config.annotations) or self.kube_config.kube_annotations if gcp_sa_key: @@ -339,6 +349,7 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da affinity = kube_executor_config.affinity or self.kube_config.kube_affinity tolerations = kube_executor_config.tolerations or self.kube_config.kube_tolerations + environment = self._get_environment() return Pod( namespace=namespace, name=pod_id, @@ -353,7 +364,7 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da 'execution_date': execution_date, 'try_number': str(try_number), }), - envs=self._get_environment(), + envs=environment, secrets=self._get_secrets(), service_account_name=self.kube_config.worker_service_account_name, image_pull_secrets=self.kube_config.image_pull_secrets, @@ -367,5 +378,6 @@ def make_pod(self, namespace, worker_uuid, pod_id, dag_id, task_id, execution_da affinity=affinity, tolerations=tolerations, security_context=self._get_security_context(), - configmaps=self._get_configmaps() + configmaps=self._get_configmaps(), + priority_class=environment.get('AIRFLOW_POD_PRIORITY_CLASS', None) ) diff --git a/airflow/version.py b/airflow/version.py index a92cfb911088f..4bef2d0d6390e 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,5 +18,5 @@ # under the License. # -version = '1.10.4+twtr9' +version = '1.10.4+twtr10'