diff --git a/snakemake/persistence.py b/snakemake/persistence.py
index ceb9e1cb3..e8b83c67e 100755
--- a/snakemake/persistence.py
+++ b/snakemake/persistence.py
@@ -9,6 +9,7 @@
 import marshal
 import pickle
 import json
+import tempfile
 import time
 from base64 import urlsafe_b64encode, b64encode
 from functools import lru_cache, partial
@@ -434,9 +435,33 @@ def _output(self, job):
 
     def _record(self, subject, json_value, id):
         recpath = self._record_path(subject, id)
-        os.makedirs(os.path.dirname(recpath), exist_ok=True)
-        with open(recpath, "w") as f:
-            json.dump(json_value, f)
+        recdir = os.path.dirname(recpath)
+        os.makedirs(recdir, exist_ok=True)
+        # Write content to temporary file and rename it to the final file.
+        # This avoids race-conditions while writing (e.g. on NFS when the main job
+        # and the cluster node job propagate their content and the system has some
+        # latency including non-atomic propagation processes).
+        tmpfile = tempfile.NamedTemporaryFile(
+            mode="w",
+            dir=recdir,
+            delete=False,
+            # Keep only a short piece of the final name for debugging: the full
+            # name plus the random temp prefix may exceed the filename limit.
+            suffix=f".{os.path.basename(recpath)[:8]}",
+        )
+        try:
+            with tmpfile:
+                json.dump(json_value, tmpfile)
+            # os.replace overwrites an existing record on every platform,
+            # whereas os.rename fails on Windows if the target exists.
+            os.replace(tmpfile.name, recpath)
+        except BaseException:
+            # Do not leave a stray temporary file behind on failure.
+            try:
+                os.remove(tmpfile.name)
+            except OSError:
+                pass
+            raise
 
     def _delete_record(self, subject, id):
         try:
@@ -462,7 +487,15 @@ def _read_record_uncached(self, subject, id):
         if not self._exists_record(subject, id):
             return dict()
         with open(self._record_path(subject, id), "r") as f:
-            return json.load(f)
+            try:
+                return json.load(f)
+            except json.JSONDecodeError:
+                # Fall through to the cleanup below once the file is closed.
+                pass
+        # case: file is corrupted, delete it
+        logger.warning("Deleting corrupted metadata record.")
+        self._delete_record(subject, id)
+        return dict()
 
     def _exists_record(self, subject, id):
         return os.path.exists(self._record_path(subject, id))