Airflow Upgrade with Cherry-Picks [only] from 1.10.4+twtr (apache#64)
* EWT-569 : Initial Commit for migrations

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  76fe7ac from 1.10.4

* CP Contains fb64f2e: [TWTR][AIRFLOW-XXX] Twitter Airflow Customizations + Fixup job scheduling without explicit_defaults_for_timestamp

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (apache#63)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
CP contains [EWT-16]: Airflow fix for manual trigger during version upgrade (apache#13)

* [EWT-16]: Airflow fix for manual trigger during version upgrade

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 91d2b00
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (apache#63)

CP of f757a54

* CP(55bb579) [AIRFLOW-5597] Linkify urls in task instance log (apache#16)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 94cdcf6
[CP] Contains [AIRFLOW-5597] Linkify urls in task instance log

CP of f757a54

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  4ce8d4c from 1.10.4
CP contains [TWTTR] Fix for rendering code on UI (apache#34)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  299b4d8 from 1.10.4
CP contains [TWTR] CP from 1.10+twtr (apache#35)

* 99ee040: CP from 1.10+twtr

* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)

* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (apache#21)

* CP 51b1aee: Relax version requirements (apache#24)

* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (apache#25)

* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (apache#26)

* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (apache#27)

* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (apache#31)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : f7050fb
CP Contains Experiment API path fix (apache#37)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  8a689af from 1.10.4
CP Contains Export scheduler env variable into worker pods. (apache#38)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  5875a15 from 1.10.4
CP contains [EWT-115][EWT-118] Initialise dag var to None and fix for DagModel.fileloc (missed in EWT-16) (apache#39)

* [EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick  a68e2b3 from 1.10.4
[CX-16591] Fix regex to work with impersonated clusters like airflow_scheduler_ddavydov (apache#42)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : e9642c2
[CP][EWT-128] Fetch task logs from worker pods (19ac45a) (apache#43)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d5d0a07
[CP][AIRFLOW-6561][EWT-290]: Adding priority class and default resource for worker pod. (apache#47)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 9b58c88
[CP][EWT-302]Patch Pool.DEFAULT_POOL_NAME in BaseOperator (apache#8587) (apache#49)

Open source commit id: b37ce29

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 7b52a71
[CP][AIRFLOW-3121] Define closed property on StreamLogWriter (apache#3955) (apache#52)

CP of 2d5b8a5

* [EWT-361] Fix broken regex pattern for extracting dataflow job id (apache#51)

Update the dataflow URL regex as per AIRFLOW-9323

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 4b5b977
EWT-370: Use python3 to launch the dataflow job. (apache#53)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 596e24f
[EWT-450] Fixing SLA miss triggering duplicate alerts every minute (apache#56)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : b3d7fb4
[CP] Handle IntegrityErrors for trigger dagruns & add Stacktrace when DagFileProcessorManager gets killed (apache#57)

CP of faaf179 - from master
CP of 2102122 - from 1.10.12

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : bac4acd
[TWTR][EWT-472] Add lifecycle support while launching worker pods (apache#59)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 6162402
[TWTTR] Don't enqueue tasks again if already queued for K8sExecutor (apache#60)

Essentially reverts commit 87fcc1c, making the change only in the CeleryExecutor class.

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 1991419
[CP][TWTR][EWT-377] Fix DagBag bug when a Dag has invalid schedule_interval (apache#61)

CP of 5605d10 & apache#11462

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : 48be0f9
[TWTR][EWT-350] Reverting the last commit partially (apache#62)

* [EWT-569] Airflow Upgrade to 1.10.14 [CP] from 1.10.4+twtr : d8c473e
[CP][EWT-548][AIRFLOW-6527] Make send_task_to_executor timeout configurable (apache#63)

CP of f757a54
ayushSethi22 committed Jan 8, 2021
1 parent c743b95 commit d85a9a3
Showing 32 changed files with 508 additions and 159 deletions.
6 changes: 6 additions & 0 deletions .arcconfig
@@ -0,0 +1,6 @@
{
"arc.feature.start.default": "origin/twtr_rb_1.10.0",
"arc.land.onto.default": "twtr_rb_1.10.0",
"base": "git:merge-base(origin/twtr_rb_1.10.0), arc:amended, arc:prompt",
"history.immutable": false
}
20 changes: 20 additions & 0 deletions README_TWITTER.md
@@ -0,0 +1,20 @@
# Developing locally

Here are some steps to develop this dependency locally and interact with source, interpreted from
https://confluence.twitter.biz/display/ENG/Overview%3A+Python+3rdparty+in+Source

1. Create a git branch for this change.
2. Edit `airflow/version.py` to change the version.
3. Edit `source/3rdparty/python/BUILD` with the corresponding version.
4. Run the command `python3.7 setup.py bdist_wheel` in the `airflow` directory to build the wheel.
It will be written to `airflow/dist`.
5. Clean out the pex cache: `rm -rf ~/.pex ~/.cache/pants`.
6. Run `ps aux | grep pantsd` to find the pid of the pantsd process.
7. Run `kill $pid` where `$pid` is the pid just observed.
8. From the `source` directory, run `./pants clean-all`.
9. Now here are the hacky parts. The `run-local.sh` and `run-aurora.sh` scripts run pants commands
without the `--python-repos-repos` option. You can either edit them to include this option,
or run a pants command that includes it, which will cache the local artifact you need, e.g.
`./pants test airflow:: --python-repos-repos="['file:///path/to/airflow/dist/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/wheels/', 'https://science-binaries.local.twitter.com/home/third_party/source/python/bootstrap/','https://science-binaries.local.twitter.com/home/third_party/source/python/sources/']"`
10. Now you can start up airflow instances as usual with the newly built wheel!
11. See the above link for `Adding Dependencies to science-libraries`.
14 changes: 14 additions & 0 deletions UPDATING.md
@@ -21,6 +21,7 @@
This file documents any backwards-incompatible changes in Airflow and
assists users migrating to a new version.


<!-- START doctoc generated TOC please keep comment here to allow auto update -->
<!-- DON'T EDIT THIS SECTION, INSTEAD RE-RUN doctoc TO UPDATE -->
**Table of contents**
@@ -196,6 +197,19 @@ Users can now offer a path to a yaml for the KubernetesPodOperator using the `po

Now use NULL as default value for dag.description in dag table

## CP

### Ability to patch Pool.DEFAULT_POOL_NAME in BaseOperator
It was not possible to patch the pool in BaseOperator, because the signature binds
Pool.DEFAULT_POOL_NAME as the default value of `pool` at definition time.
While using SubDagOperator in a unit test (without initializing the SQLite DB), it raised the
following error:
```
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: slot_pool.
```
The fix for this is in https://github.com/apache/airflow/pull/8587.
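
For illustration, here is a minimal, self-contained sketch (toy classes, not the real `BaseOperator` or `Pool`) of the underlying Python gotcha: a default written in the signature is evaluated once at definition time and ignores a later patch, while a default resolved inside `__init__` honours it.

```
from unittest import mock


class Pool:
    DEFAULT_POOL_NAME = "default_pool"


class EagerOperator:
    # Default bound when the class body is executed -- patching
    # Pool.DEFAULT_POOL_NAME afterwards has no effect here.
    def __init__(self, pool=Pool.DEFAULT_POOL_NAME):
        self.pool = pool


class LazyOperator:
    # Default resolved at instantiation time, so the patch is honoured.
    def __init__(self, pool=None):
        self.pool = pool or Pool.DEFAULT_POOL_NAME


with mock.patch.object(Pool, "DEFAULT_POOL_NAME", "test_pool"):
    assert EagerOperator().pool == "default_pool"  # patch not visible
    assert LazyOperator().pool == "test_pool"      # patch visible
```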


### Restrict editing DagRun State in the old UI (Flask-admin based UI)

Before 1.10.11 it was possible to edit DagRun State in the `/admin/dagrun/` page
3 changes: 2 additions & 1 deletion airflow/api/common/experimental/trigger_dag.py
@@ -123,9 +123,10 @@ def read_store_serialized_dags():
from airflow.configuration import conf
return conf.getboolean('core', 'store_serialized_dags')
dagbag = DagBag(
dag_folder=dag_model.fileloc,
dag_folder=dag_model.get_local_fileloc(),
store_serialized_dags=read_store_serialized_dags()
)

dag_run = DagRun()
triggers = _trigger_dag(
dag_id=dag_id,
2 changes: 1 addition & 1 deletion airflow/config_templates/default_airflow.cfg
@@ -1099,4 +1099,4 @@ fs_group =
# The Key-value pairs to be given to worker pods.
# The worker pods will be given these static labels, as well as some additional dynamic labels
# to identify the task.
# Should be supplied in the format: ``key = value``
# Should be supplied in the format: key = value
24 changes: 15 additions & 9 deletions airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -33,6 +33,11 @@
DEFAULT_DATAFLOW_LOCATION = 'us-central1'


JOB_ID_PATTERN = re.compile(
r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
)


class _DataflowJob(LoggingMixin):
def __init__(self, dataflow, project_number, name, location, poll_sleep=10,
job_id=None, num_retries=None):
@@ -128,24 +133,25 @@ def __init__(self, cmd):

def _line(self, fd):
if fd == self._proc.stderr.fileno():
line = b''.join(self._proc.stderr.readlines())
line = self._proc.stderr.readline().decode()
if line:
self.log.warning(line[:-1])
self.log.warning(line.rstrip("\n"))
return line
if fd == self._proc.stdout.fileno():
line = b''.join(self._proc.stdout.readlines())
line = self._proc.stdout.readline().decode()
if line:
self.log.info(line[:-1])
self.log.info(line.rstrip("\n"))
return line

raise Exception("No data in stderr or in stdout.")

@staticmethod
def _extract_job(line):
# Job id info: https://goo.gl/SE29y9.
job_id_pattern = re.compile(
br'.*console.cloud.google.com/dataflow.*/jobs/([a-z|0-9|A-Z|\-|\_]+).*')
matched_job = job_id_pattern.search(line or '')
# [EWT-361] : Fixes out of date regex to extract job id
matched_job = JOB_ID_PATTERN.search(line or '')
if matched_job:
return matched_job.group(1).decode()
return matched_job.group('job_id_java') or matched_job.group('job_id_python')

def wait_for_done(self):
reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
@@ -236,7 +242,7 @@ def start_python_dataflow(self, job_name, variables, dataflow, py_options,
def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]
self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
self._start_dataflow(variables, name, ["python3"] + py_options + [dataflow],
label_formatter)

@staticmethod
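
A quick sanity check of the new `JOB_ID_PATTERN` introduced above; the two log lines are made-up samples of the Java and Python Dataflow runner output the pattern is written against, not real job output.

```
import re

JOB_ID_PATTERN = re.compile(
    r'Submitted job: (?P<job_id_java>.*)|Created job with id: \[(?P<job_id_python>.*)\]'
)

for line in [
    "INFO: Submitted job: 2021-01-08_12_00_00-1234567890123456789",
    "INFO: Created job with id: [2021-01-08_12_00_00-9876543210987654321]",
]:
    # Mirrors _extract_job(): whichever named group matched carries the job id.
    matched_job = JOB_ID_PATTERN.search(line)
    if matched_job:
        print(matched_job.group('job_id_java') or matched_job.group('job_id_python'))
```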
7 changes: 6 additions & 1 deletion airflow/contrib/kubernetes/pod.py
@@ -98,7 +98,9 @@ def __init__(
security_context=None,
configmaps=None,
pod_runtime_info_envs=None,
dnspolicy=None
dnspolicy=None,
priority_class=None,
lifecycle=None
):
warnings.warn(
"Using `airflow.contrib.kubernetes.pod.Pod` is deprecated. Please use `k8s.V1Pod`.",
@@ -130,6 +132,9 @@ def __init__(
self.configmaps = configmaps or []
self.pod_runtime_info_envs = pod_runtime_info_envs or []
self.dnspolicy = dnspolicy
self.priority_class = priority_class
self.lifecycle = lifecycle or {}


def to_v1_kubernetes_pod(self):
"""
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
@@ -58,7 +58,7 @@ def queue_command(self, simple_task_instance, command, priority=1, queue=None):
self.log.info("Adding to queue: %s", command)
self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
else:
self.log.info("could not queue task %s", key)
self.log.error("could not queue task %s", key)

def queue_task_instance(
self,
8 changes: 8 additions & 0 deletions airflow/executors/celery_executor.py
@@ -166,6 +166,14 @@ def start(self):
self._sync_parallelism
)

def queue_command(self, simple_task_instance, command, priority=1, queue=None):
key = simple_task_instance.key
if key not in self.queued_tasks and key not in self.running:
self.log.info("Adding to queue: %s", command)
else:
self.log.info("Adding to queue even though already queued or running {}".format(command, key))
self.queued_tasks[key] = (command, priority, queue, simple_task_instance)

def _num_tasks_per_send_process(self, to_send_count):
"""
How many Celery tasks should each worker process send.
59 changes: 57 additions & 2 deletions airflow/jobs/scheduler_job.py
@@ -134,7 +134,10 @@ def _run_file_processor(result_channel,
stdout = StreamLogWriter(log, logging.INFO)
stderr = StreamLogWriter(log, logging.WARN)

log.info("Setting log context for file {}".format(file_path))
# log file created here
set_context(log, file_path)
log.info("Successfully set log context for file {}".format(file_path))
setproctitle("airflow scheduler - DagFileProcessor {}".format(file_path))

try:
@@ -154,6 +157,7 @@
log.info("Started process (PID=%s) to work on %s",
os.getpid(), file_path)
scheduler_job = SchedulerJob(dag_ids=dag_ids, log=log)
log.info("Processing file {}".format(file_path))
result = scheduler_job.process_file(file_path,
zombies,
pickle_dags)
@@ -1030,7 +1034,7 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):

if self.executor.has_task(task_instance):
self.log.debug(
"Not handling task %s as the executor reports it is running",
"Still handling task %s even though as the executor reports it is running",
task_instance.key
)
num_tasks_in_executor += 1
@@ -1157,6 +1161,11 @@ def _enqueue_task_instances_with_queued_state(self, simple_dag_bag,
# actually enqueue them
for simple_task_instance in simple_task_instances:
simple_dag = simple_dag_bag.get_dag(simple_task_instance.dag_id)

path = simple_dag.full_filepath
if path.startswith(settings.DAGS_FOLDER):
path = path.replace(settings.DAGS_FOLDER, "DAGS_FOLDER", 1)

command = TI.generate_command(
simple_task_instance.dag_id,
simple_task_instance.task_id,
@@ -1168,7 +1177,7 @@
ignore_task_deps=False,
ignore_ti_state=False,
pool=simple_task_instance.pool,
file_path=simple_dag.full_filepath,
file_path=path,
pickle_id=simple_dag.pickle_id)

priority = simple_task_instance.priority_weight
@@ -1449,6 +1458,50 @@ def _execute_helper(self):

# Send tasks for execution if available
simple_dag_bag = SimpleDagBag(simple_dags)
if len(simple_dags) > 0:
try:
simple_dag_bag = SimpleDagBag(simple_dags)

# Handle cases where a DAG run state is set (perhaps manually) to
# a non-running state. Handle task instances that belong to
# DAG runs in those states

# If a task instance is up for retry but the corresponding DAG run
# isn't running, mark the task instance as FAILED so we don't try
# to re-run it.
self._change_state_for_tis_without_dagrun(simple_dag_bag,
[State.UP_FOR_RETRY],
State.FAILED)
# If a task instance is scheduled or queued or up for reschedule,
# but the corresponding DAG run isn't running, set the state to
# NONE so we don't try to re-run it.
self._change_state_for_tis_without_dagrun(simple_dag_bag,
[State.QUEUED,
State.SCHEDULED,
State.UP_FOR_RESCHEDULE],
State.NONE)

scheduled_dag_ids = ", ".join(simple_dag_bag.dag_ids)
self.log.info('DAGs to be executed: {}'.format(scheduled_dag_ids))

# TODO(CX-17516): State.QUEUED has been added here which is a hack as the Celery
# Executor does not reliably enqueue tasks with the MySQL broker, and we have
# seen tasks hang after they get queued. The effect of this hack is queued tasks
# will constantly be requeued and resent to the executor (Celery).
# This should be removed when we switch away from the MySQL Celery backend.
self._execute_task_instances(simple_dag_bag,
(State.SCHEDULED, State.QUEUED))

except Exception as e:
self.log.error("Error queuing tasks")
self.log.exception(e)
continue

# Call heartbeats
self.log.debug("Heartbeating the executor")
self.executor.heartbeat()

self._change_state_for_tasks_failed_to_execute()

if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
continue
@@ -1485,7 +1538,9 @@ def _execute_helper(self):
sleep(sleep_length)

# Stop any processors
self.log.info("Terminating DAG processors")
self.processor_agent.terminate()
self.log.info("All DAG processors terminated")

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
5 changes: 5 additions & 0 deletions airflow/kubernetes/worker_configuration.py
@@ -200,6 +200,11 @@ def _get_environment(self):
self.kube_config.git_subpath # dags
)
env['AIRFLOW__CORE__DAGS_FOLDER'] = dag_volume_mount_path
# TODO This change can be submitted into the apache as well.
# Set the scheduler env into the worker pod.
os_env = os.environ
os_env.update(env)
env = os_env
return env

def _get_configmaps(self):
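
A side-effect-free sketch of what the environment merge above produces: the scheduler process's environment is exported into the worker pod, and the pod-specific Airflow variables win on key collisions. The values used here are illustrative.

```
import os

# Hypothetical pod-specific environment computed by _get_environment().
pod_env = {"AIRFLOW__CORE__DAGS_FOLDER": "/git/repo/dags"}

merged = dict(os.environ)  # everything the scheduler process has
merged.update(pod_env)     # pod-specific values take precedence

assert merged["AIRFLOW__CORE__DAGS_FOLDER"] == "/git/repo/dags"
assert set(os.environ).issubset(merged)
```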
