Every three minutes validate a random run from the cache.
The reason for this PR is that, before introducing more dynamic
updates, we would like to verify that the dynamic updates of
run["workers"] and run["cores"] introduced in
#2010
are bug-free.

In order to facilitate the use of periodic timers in Fishtest
we have introduced a new scheduler in util.py which may be
interesting in its own right.

Currently the timers defined are the cache flush timer and the
timer introduced in this PR.
vdbergh committed May 22, 2024
1 parent 7be6f51 commit d97d913
Showing 4 changed files with 229 additions and 23 deletions.
2 changes: 1 addition & 1 deletion server/fishtest/__init__.py
@@ -100,7 +100,7 @@ def init_rundb(event):
if rundb.is_primary_instance():
signal.signal(signal.SIGINT, rundb.exit_run)
signal.signal(signal.SIGTERM, rundb.exit_run)
rundb.start_timer()
rundb.schedule_tasks()
rundb.update_workers_cores()

config.add_subscriber(add_rundb, NewRequest)
Expand Down
66 changes: 47 additions & 19 deletions server/fishtest/rundb.py
@@ -17,11 +17,18 @@
from bson.errors import InvalidId
from bson.objectid import ObjectId
from fishtest.actiondb import ActionDb
from fishtest.schemas import RUN_VERSION, nn_schema, pgns_schema, runs_schema
from fishtest.schemas import (
RUN_VERSION,
nn_schema,
pgns_schema,
runs_schema,
valid_aggregated_data,
)
from fishtest.stats.stat_util import SPRT_elo
from fishtest.userdb import UserDb
from fishtest.util import (
GeneratorAsFileReader,
Scheduler,
crash_or_time,
estimate_game_duration,
format_bounds,
@@ -73,8 +80,43 @@ def __init__(self, db_name="fishtest_new", port=-1, is_primary_instance=True):
self.__is_primary_instance = is_primary_instance

self.request_task_lock = threading.Lock()
self.timer = None
self.timer_active = True
self.scheduler = None

def schedule_tasks(self):
if self.scheduler is None:
self.scheduler = Scheduler(jitter=0.05)
self.scheduler.add_task(1.0, self.flush_buffers)
self.scheduler.add_task(180.0, self.validate_random_run)

def validate_random_run(self):
run_list = list(self.run_cache.values())
if len(run_list) == 0:
print(
"Validate_random_run: cache empty. No runs to validate...", flush=True
)
return
        run = random.choice(run_list)["run"]
run_id = str(run["_id"])
try:
# Make sure that the run object does not change while we are
# validating it
with self.active_run_lock(run_id):
# We verify only the aggregated data since the other
# data is not synchronized and may be in a transient
# inconsistent state
validate(valid_aggregated_data, run, "run")
print(
f"Validate_random_run: validated aggregated data in cache run {run_id}...",
flush=True,
)
except ValidationError as e:
message = f"The run object {run_id} does not validate: {str(e)}"
print(message, flush=True)
if "version" in run and run["version"] >= RUN_VERSION:
self.actiondb.log_message(
username="fishtest.system",
message=message,
)

def set_inactive_run(self, run):
run_id = run["_id"]
@@ -348,7 +390,8 @@ def get_nns(self, user="", network_name="", master_only=False, limit=0, skip=0):

# handle termination
def exit_run(self, signum, frame):
self.stop_timer()
if self.scheduler is not None:
self.scheduler.stop()
self.flush_all()
if self.port >= 0:
self.actiondb.system_event(message=f"stop fishtest@{self.port}")
@@ -378,17 +421,6 @@ def get_run(self, r_id):
else:
return self.runs.find_one({"_id": r_id_obj})

def stop_timer(self):
if self.timer is not None:
self.timer.cancel()
self.timer = None
self.timer_active = False

def start_timer(self):
if self.timer_active:
self.timer = threading.Timer(1.0, self.flush_buffers)
self.timer.start()

def buffer(self, run, flush):
if not self.is_primary_instance():
print(
@@ -438,8 +470,6 @@ def flush_all(self):
print("done", flush=True)

def flush_buffers(self):
if self.timer is None:
return
try:
self.run_cache_lock.acquire()
now = time.time()
@@ -476,9 +506,7 @@
except:
print("Flush exception", flush=True)
finally:
# Restart timer:
self.run_cache_lock.release()
self.start_timer()

def scavenge(self, run):
if datetime.now(timezone.utc) < boot_time + timedelta(seconds=300):
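
For contrast, the pattern removed above can be reduced to the following
sketch (a simplification of the deleted stop_timer/start_timer/flush_buffers
code; the class name is hypothetical): each flush had to re-arm its own
one-shot threading.Timer, so the work and its scheduling were coupled, and
any code path that skipped the restart would silently stop the flushing loop.

import threading

# Simplified sketch of the removed pattern: a one-shot Timer that the
# callback itself must re-arm at the end of every run.
class OldTimerPattern:
    def __init__(self):
        self.timer = None
        self.timer_active = True

    def start_timer(self):
        if self.timer_active:
            self.timer = threading.Timer(1.0, self.flush_buffers)
            self.timer.start()

    def stop_timer(self):
        if self.timer is not None:
            self.timer.cancel()
            self.timer = None
            self.timer_active = False

    def flush_buffers(self):
        try:
            pass  # the actual flushing work goes here
        finally:
            self.start_timer()  # forgetting this restart kills the loop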
40 changes: 37 additions & 3 deletions server/fishtest/schemas.py
@@ -486,9 +486,43 @@ def final_results_must_match(run):
raise Exception(
f"The final results {run['results']} do not match the computed results {rr}"
)
else:
return True

return True


def cores_must_match(run):
cores = 0
for t in run["tasks"]:
if t["active"]:
cores += t["worker_info"]["concurrency"]
if cores != run["cores"]:
raise Exception(
f"Cores mismatch. Cores from tasks: {cores}. Cores from "
f"run: {run['cores']}"
)

return True


def workers_must_match(run):
workers = 0
for t in run["tasks"]:
if t["active"]:
workers += 1
if workers != run["workers"]:
raise Exception(
f"Workers mismatch. Workers from tasks: {workers}. Workers from "
f"run: {run['workers']}"
)

return True


valid_aggregated_data = intersect(
final_results_must_match,
cores_must_match,
workers_must_match,
)

# The following schema only matches new runs. The old runs
# are not compatible with it. For documentation purposes
@@ -655,5 +689,5 @@ def final_results_must_match(run):
lax(ifthen({"deleted": True}, {"finished": True})),
lax(ifthen({"finished": True}, {"workers": 0, "cores": 0})),
lax(ifthen({"finished": True}, {"tasks": [{"active": False}, ...]})),
final_results_must_match,
valid_aggregated_data,
)
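
Since cores_must_match and workers_must_match are plain functions that
either raise or return True, the new invariant can be exercised directly,
without going through vtjson's intersect. A minimal sketch with
hypothetical values:

# Toy run, reduced to the fields the validators read (all values are
# hypothetical, for illustration only).
run = {
    "workers": 2,
    "cores": 12,
    "tasks": [
        {"active": True, "worker_info": {"concurrency": 8}},
        {"active": True, "worker_info": {"concurrency": 4}},
        {"active": False, "worker_info": {"concurrency": 16}},  # not counted
    ],
}

assert cores_must_match(run)    # 8 + 4 == run["cores"]
assert workers_must_match(run)  # two active tasks == run["workers"]

run["cores"] = 13
try:
    cores_must_match(run)
except Exception as e:
    print(e)  # Cores mismatch. Cores from tasks: 12. Cores from run: 13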
144 changes: 144 additions & 0 deletions server/fishtest/util.py
@@ -3,9 +3,11 @@
import math
import re
import smtplib
import threading
from datetime import datetime, timedelta, timezone
from email.mime.text import MIMEText
from functools import cache
from random import uniform

import fishtest.stats.stat_util
import numpy
@@ -545,3 +547,145 @@ def strip_run(run):
stripped[key] = str(run[key])

return stripped


"""
The following scheduling code should be thread safe.
- First and foremost, all tasks are executed in a single main thread.
So they are atomic. In particular, between its scheduling and descheduling,
a task will be executed exactly once at each scheduling point.
- The main thread maintains a list of scheduled tasks. To safely manipulate
this list outside the main thread we rely on the atomicity of in-place
list operations in Python.
- To signal the main thread that the task list has changed, which should
be acted upon as soon as possible as it might affect the next task to
be executed, we use a threading.Event.
Example:

    s = Scheduler()
    s.add_task(3, task1)
    s.add_task(2, task2)

When the second task is scheduled, the scheduler will interrupt the
3s wait for the first task and replace it with a 2s wait for the second task.
"""


class Task:
def __init__(
self,
period,
worker,
initial_delay=None,
one_shot=False,
jitter=0.0,
args=(),
kwargs={},
):
self.period = timedelta(seconds=period)
self.worker = worker
if initial_delay is None:
initial_delay = self.period
else:
initial_delay = timedelta(seconds=initial_delay)
self.__rel_jitter = jitter * self.period
self.__next_schedule = (
datetime.now(timezone.utc)
+ initial_delay
+ uniform(-self.__rel_jitter, self.__rel_jitter)
)
self.one_shot = one_shot
self.__expired = False
self.args = args
self.kwargs = kwargs

def do_work(self):
if self.__expired:
return
try:
            self.worker(*self.args, **self.kwargs)
except Exception as e:
print(f"Exception while executing task: {str(e)}", flush=True)
if not self.one_shot:
self.__next_schedule += self.period + uniform(
-self.__rel_jitter, self.__rel_jitter
)
else:
self.__expired = True

def next_schedule(self):
return self.__next_schedule

def expired(self):
return self.__expired
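
# A detail worth noting in the jitter handling above: random.uniform(a, b)
# is computed as a + (b - a) * random(), so in practice it also accepts
# timedelta arguments and then returns a timedelta. The jittered offsets
# added to __next_schedule are therefore timedeltas, as required. E.g.:
#
#     rel_jitter = 0.05 * timedelta(seconds=180)  # 9 seconds, as a timedelta
#     offset = uniform(-rel_jitter, rel_jitter)
#     assert isinstance(offset, timedelta)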


class Scheduler:
def __init__(self, jitter=0.0):
self.__tasks = []
self.__timer_stopped = False
self.__event = threading.Event()
self.jitter = jitter
# dummy task to avoid special casing an empty task list
self.add_task(3600.0, lambda: None, jitter=0.1)
self.__worker_thread = threading.Thread(target=self.__next_schedule)
self.__worker_thread.start()

def add_task(
self,
period,
worker,
initial_delay=None,
one_shot=False,
jitter=None,
args=(),
kwargs={},
):
if jitter is None:
jitter = self.jitter
task = Task(
period,
worker,
initial_delay=initial_delay,
one_shot=one_shot,
jitter=jitter,
args=args,
kwargs=kwargs,
)
self.__tasks.append(task)
self.__event.set()
return task

def del_task(self, task):
self.__del_task(task)
self.__event.set()

def stop(self):
self.__timer_stopped = True
self.__event.set()

def __del_task(self, task):
try:
self.__tasks.remove(task)
except Exception:
pass

def __next_schedule(self):
while not self.__timer_stopped:
next_schedule = None
for task in copy.copy(self.__tasks):
if task.expired():
self.__del_task(task)
else:
if next_schedule is None or task.next_schedule() < next_schedule:
next_task = task
next_schedule = task.next_schedule()
delay = (next_schedule - datetime.now(timezone.utc)).total_seconds()
self.__event.wait(delay)
if not self.__event.is_set():
next_task.do_work()
self.__event.clear()
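
A minimal usage sketch of the new Scheduler (periods and task bodies are
illustrative only):

import time

s = Scheduler(jitter=0.05)
s.add_task(2.0, lambda: print("heartbeat", flush=True))
s.add_task(
    5.0,
    lambda: print("once", flush=True),
    initial_delay=1.0,
    one_shot=True,
)
time.sleep(10)  # "heartbeat" roughly every 2s; "once" after about 1s
s.stop()  # wakes the worker thread so its while loop can exit

Note that Task.do_work catches exceptions raised by the worker, so a
failing task (such as validate_random_run tripping over a malformed run)
is reported and rescheduled instead of killing the scheduler thread.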
