Skip to content

Commit

Permalink
Do not evict a run from the cache which has active tasks.
Browse files Browse the repository at this point in the history
A dead task (a task which has not been updated for 360s) will only
be scavenged if it belongs to a run in the cache. However clean
runs are evicted from the cache after 300s. So if the run does
not get new tasks, for example because it has low priority, the
dead tasks remain unscavenged until somebody explicitly looks at
the run (this pulls the run back in the cache).

Obviously it is not nice that dead tasks are not scavenged as soon
as possible. Moreover the fact that the behaviour of the server
is changed after executing a GET request is also a violation of the HTTP
specs.
  • Loading branch information
vdbergh committed May 22, 2024
1 parent 7be6f51 commit febaaec
Showing 1 changed file with 24 additions and 27 deletions.
51 changes: 24 additions & 27 deletions server/fishtest/rundb.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,37 +444,34 @@ def flush_buffers(self):
self.run_cache_lock.acquire()
now = time.time()
old = now + 1
oldest = None
for r_id in list(self.run_cache):
if not self.run_cache[r_id]["dirty"]:
if not self.run_cache[r_id]["run"].get("finished", False) and (
"scavenge" not in self.run_cache[r_id]
or self.run_cache[r_id]["scavenge"] < now - 60
oldest_entry = None
# We make this a list to be able to change run_cache during iteration
for r_id, cache_entry in list(self.run_cache.items()):
run = cache_entry["run"]
if not cache_entry["dirty"]:
if not run["finished"] and (
"scavenge" not in cache_entry
or cache_entry["scavenge"] < now - 60
):
self.run_cache[r_id]["scavenge"] = now
if self.scavenge(self.run_cache[r_id]["run"]):
cache_entry["scavenge"] = now
if self.scavenge(run):
with self.run_cache_write_lock:
self.runs.replace_one(
{"_id": ObjectId(r_id)}, self.run_cache[r_id]["run"]
)
if self.run_cache[r_id]["rtime"] < now - 300:
self.runs.replace_one({"_id": run["_id"]}, run)
if run["cores"] <= 0 and cache_entry["rtime"] < now - 300:
del self.run_cache[r_id]
elif self.run_cache[r_id]["ftime"] < old:
old = self.run_cache[r_id]["ftime"]
oldest = r_id
# print(oldest)
if oldest is not None:
self.scavenge(self.run_cache[oldest]["run"])
self.run_cache[oldest]["scavenge"] = now
self.run_cache[oldest]["dirty"] = False
self.run_cache[oldest]["ftime"] = time.time()
# print("SYNC")
elif cache_entry["ftime"] < old:
old = cache_entry["ftime"]
oldest_entry = cache_entry
if oldest_entry is not None:
oldest_run = oldest_entry["run"]
self.scavenge(oldest_run)
oldest_entry["scavenge"] = now
oldest_entry["dirty"] = False
oldest_entry["ftime"] = time.time()
with self.run_cache_write_lock:
self.runs.replace_one(
{"_id": ObjectId(oldest)}, self.run_cache[oldest]["run"]
)
except:
print("Flush exception", flush=True)
self.runs.replace_one({"_id": oldest_run["_id"]}, oldest_run)
except Exception as e:
print(f"Flush exception: {str(e)}", flush=True)
finally:
# Restart timer:
self.run_cache_lock.release()
Expand Down

0 comments on commit febaaec

Please sign in to comment.