diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index 4787b9f02168..a1442dab2465 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -57,9 +57,9 @@ def queue_command(self, simple_task_instance, command, priority=1, queue=None): key = simple_task_instance.key if key not in self.queued_tasks and key not in self.running: self.log.info("Adding to queue: %s", command) + self.queued_tasks[key] = (command, priority, queue, simple_task_instance) else: - self.log.info("Adding to queue even though already queued or running {}".format(command, key)) - self.queued_tasks[key] = (command, priority, queue, simple_task_instance) + self.log.error("could not queue task %s", key) def queue_task_instance( self, diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py index 2c108bc7c76c..4302253e6bd3 100644 --- a/airflow/executors/celery_executor.py +++ b/airflow/executors/celery_executor.py @@ -155,6 +155,14 @@ def start(self): self._sync_parallelism ) + def queue_command(self, simple_task_instance, command, priority=1, queue=None): + key = simple_task_instance.key + if key not in self.queued_tasks and key not in self.running: + self.log.info("Adding to queue: %s", command) + else: + self.log.info("Adding to queue even though already queued or running {}".format(command, key)) + self.queued_tasks[key] = (command, priority, queue, simple_task_instance) + def _num_tasks_per_send_process(self, to_send_count): """ How many Celery tasks should each worker process send. diff --git a/airflow/version.py b/airflow/version.py index b8a4b08d4928..39582d1ce56d 100644 --- a/airflow/version.py +++ b/airflow/version.py @@ -18,5 +18,5 @@ # under the License. # -version = '1.10.4+twtr19' +version = '1.10.4+twtr21'