Skip to content

Commit

Permalink
[EWT-115][EWT-118] Initialise dag var to None and fix for DagModel.fi…
Browse files Browse the repository at this point in the history
…leloc (missed in EWT-16) (apache#39)

Co-authored-by: Vishesh Jain <visheshj@twitter.com>
Co-authored-by: aoen <aoen@users.noreply.github.com>
  • Loading branch information
3 people committed Apr 13, 2020
1 parent 711b4f7 commit 5875a15
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 8 deletions.
4 changes: 2 additions & 2 deletions airflow/api/common/experimental/delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ def delete_dag(dag_id, keep_records_in_log=True, session=None):
if dag is None:
raise DagNotFound("Dag id {} not found".format(dag_id))

if dag.fileloc and os.path.exists(dag.fileloc):
if dag.get_local_fileloc() and os.path.exists(dag.get_local_fileloc()):
raise DagFileExists("Dag id {} is still in DagBag. "
"Remove the DAG file first: {}".format(dag_id, dag.fileloc))
"Remove the DAG file first: {}".format(dag_id, dag.get_local_fileloc()))

count = 0

Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/get_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_code(dag_id): # type (str) -> str
dag = check_and_get_dag(dag_id=dag_id)

try:
with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as file:
with wwwutils.open_maybe_zipped(dag.get_local_fileloc(), 'r') as file:
code = file.read()
return code
except IOError as exception:
Expand Down
5 changes: 3 additions & 2 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def get_dag(self, dag_id):
"""
from airflow.models.dag import DagModel # Avoid circular import

dag = None
# If asking for a known subdag, we want to refresh the parent
root_dag_id = dag_id
if dag_id in self.dags:
Expand All @@ -135,7 +136,7 @@ def get_dag(self, dag_id):
):
# Reprocess source file
# TODO: remove the below hack to find relative dag location in webserver
filepath = dag.fileloc if dag else orm_dag.fileloc
filepath = dag.get_local_fileloc() if dag else orm_dag.get_local_fileloc()
found_dags = self.process_file(
filepath=correct_maybe_zipped(filepath), only_if_updated=False)

Expand Down Expand Up @@ -248,7 +249,7 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
if isinstance(dag, DAG):
if not dag.full_filepath:
dag.full_filepath = filepath
if dag.fileloc != filepath and not is_zipfile:
if dag.get_local_fileloc() != filepath and not is_zipfile:
dag.fileloc = filepath
try:
dag.is_subdag = False
Expand Down
4 changes: 2 additions & 2 deletions airflow/sensors/external_task_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ def poke(self, context, session=None):
raise AirflowException('The external DAG '
'{} does not exist.'.format(self.external_dag_id))
else:
if not os.path.exists(dag_to_wait.fileloc):
if not os.path.exists(dag_to_wait.get_local_fileloc()):
raise AirflowException('The external DAG '
'{} was deleted.'.format(self.external_dag_id))

if self.external_task_id:
refreshed_dag_info = DagBag(dag_to_wait.fileloc).get_dag(self.external_dag_id)
refreshed_dag_info = DagBag(dag_to_wait.get_local_fileloc()).get_dag(self.external_dag_id)
if not refreshed_dag_info.has_task(self.external_task_id):
raise AirflowException('The external task'
'{} in DAG {} does not exist.'.format(self.external_task_id,
Expand Down
2 changes: 1 addition & 1 deletion airflow/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@
# under the License.
#

version = '1.10.4+twtr5'
version = '1.10.4+twtr6'

0 comments on commit 5875a15

Please sign in to comment.