Commit 3c3436d

Merge pull request #131 from mkoura/parallel_chores
Parallel execution cleanups and refactoring
mkoura committed Sep 21, 2020
2 parents e4999de + d90d3f4 commit 3c3436d
Showing 1 changed file with 27 additions and 10 deletions.
37 changes: 27 additions & 10 deletions cardano_node_tests/utils/parallel_run.py
@@ -21,8 +21,8 @@
LOCK_LOG_FILE = ".lock.log"
SESSION_RUNNING_FILE = ".session_running"
TEST_SINGLETON_FILE = ".test_singleton"
-RESOURCE_LOCKED = ".resource_locked"
-RESOURCE_IN_USE = ".resource_in_use"
+RESOURCE_LOCKED_GLOB = ".resource_locked"
+RESOURCE_IN_USE_GLOB = ".resource_in_use"
RESTART_NEEDED_GLOB = ".needs_restart"
RESTART_IN_PROGRESS_GLOB = ".restart_in_progress"
RESTART_AFTER_MARK_GLOB = ".restart_after_mark"
@@ -74,12 +74,14 @@ def cache(self) -> ClusterManagerCache:
return self.get_cache()

def _log(self, msg: str) -> None:
"""Log message - needs to be called while having lock."""
if not self.lock_log:
return
with open(self.lock_log, "a") as logfile:
logfile.write(f"{datetime.datetime.now()} on {self.worker_id}: {msg}\n")

def _locked_log(self, msg: str) -> None:
"""Log message - will obtain lock first."""
if not self.lock_log:
return
with helpers.FileLockIfXdist(self.cluster_lock):
@@ -187,14 +189,14 @@ def on_test_stop(self) -> None:

# remove resource locking files created by the worker
resource_locking_files = list(
-self.lock_dir.glob(f"{RESOURCE_LOCKED}_*_{self.worker_id}")
+self.lock_dir.glob(f"{RESOURCE_LOCKED_GLOB}_*_{self.worker_id}")
)
for f in resource_locking_files:
os.remove(f)

# remove "resource in use" files created by the worker
resource_in_use_files = list(
-self.lock_dir.glob(f"{RESOURCE_IN_USE}_*_{self.worker_id}")
+self.lock_dir.glob(f"{RESOURCE_IN_USE_GLOB}_*_{self.worker_id}")
)
for f in resource_in_use_files:
os.remove(f)
@@ -216,6 +218,11 @@ def get( # noqa: C901
use_resources: UnpackableSequence = (),
cleanup: bool = False,
) -> clusterlib.ClusterLib:
"""Return the `clusterlib.ClusterLib` instance once we can start the test.
It checks current conditions and waits if the conditions don't allow to start the test
right away.
"""
# pylint: disable=too-many-statements,too-many-branches
restart_here = False
mark_start_here = False
@@ -225,6 +232,7 @@
test_running_file = self.lock_dir / f"{TEST_RUNNING_GLOB}_{self.worker_id}"
cluster_obj = self.cache.cluster_obj

+# iterate until it is possible to start the test
while True:
if not first_iteration:
helpers.xdist_sleep(random.random() * sleep_delay)
@@ -352,11 +360,11 @@ def get( # noqa: C901
if lock_resources:
res_usable = False
for res in lock_resources:
-res_locked = list(self.lock_dir.glob(f"{RESOURCE_LOCKED}_{res}_*"))
+res_locked = list(self.lock_dir.glob(f"{RESOURCE_LOCKED_GLOB}_{res}_*"))
if res_locked:
self._log(f"resource '{res}' locked, cannot start")
break
-res_used = list(self.lock_dir.glob(f"{RESOURCE_IN_USE}_{res}_*"))
+res_used = list(self.lock_dir.glob(f"{RESOURCE_IN_USE_GLOB}_{res}_*"))
if res_used:
self._log(f"resource '{res}' in use, cannot lock and start")
break
@@ -372,19 +380,23 @@
"starting and locking"
)

+# create status file for each locked resource
_ = [
-open(self.lock_dir / f"{RESOURCE_LOCKED}_{r}_{self.worker_id}", "a").close()
+open(
+    self.lock_dir / f"{RESOURCE_LOCKED_GLOB}_{r}_{self.worker_id}", "a"
+).close()
for r in lock_resources
]

# filter out `lock_resources` from the list of `use_resources`
-use_resources = list(set(use_resources) - set(lock_resources))
+if use_resources and lock_resources:
+    use_resources = list(set(use_resources) - set(lock_resources))

# this test wants to use some resources, check if these are not locked
if use_resources:
res_locked = []
for res in use_resources:
-res_locked = list(self.lock_dir.glob(f"{RESOURCE_LOCKED}_{res}_*"))
+res_locked = list(self.lock_dir.glob(f"{RESOURCE_LOCKED_GLOB}_{res}_*"))
if res_locked:
self._log(f"resource '{res}' locked, cannot start")
break
@@ -393,8 +405,12 @@ def get( # noqa: C901
no_tests_iteration = 0
continue
self._log("none of the resources in '{use_resources}' locked, starting")

# create status file for each in-use resource
_ = [
-open(self.lock_dir / f"{RESOURCE_IN_USE}_{r}_{self.worker_id}", "a").close()
+open(
+    self.lock_dir / f"{RESOURCE_IN_USE_GLOB}_{r}_{self.worker_id}", "a"
+).close()
for r in use_resources
]

@@ -430,6 +446,7 @@ def get( # noqa: C901
if not cluster_obj:
cluster_obj = clusterlib.ClusterLib(state_dir)

+# `cluster_obj` is ready, we can start the test
break

return cluster_obj
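
For context on the renamed constants, RESOURCE_LOCKED_GLOB and RESOURCE_IN_USE_GLOB are prefixes for marker files that pytest-xdist workers create in a shared lock directory: a file named "<prefix>_<resource>_<worker_id>" records that a worker locked or is using a resource, and a glob for "<prefix>_<resource>_*" tells other workers the resource is busy. The sketch below illustrates that pattern in isolation. It is a minimal approximation, not code from parallel_run.py: the helper names, the explicit lock_dir argument, and the plain time.sleep are assumptions (the real code serializes these checks with helpers.FileLockIfXdist and sleeps via helpers.xdist_sleep).

from pathlib import Path
import random
import time

RESOURCE_LOCKED_GLOB = ".resource_locked"
RESOURCE_IN_USE_GLOB = ".resource_in_use"


def resource_is_free(lock_dir: Path, resource: str) -> bool:
    # A resource can be locked only when no worker has locked it or marked it in use.
    locked = list(lock_dir.glob(f"{RESOURCE_LOCKED_GLOB}_{resource}_*"))
    in_use = list(lock_dir.glob(f"{RESOURCE_IN_USE_GLOB}_{resource}_*"))
    return not (locked or in_use)


def lock_resource(lock_dir: Path, resource: str, worker_id: str) -> Path:
    # Record the lock by creating an empty marker file for this worker.
    marker = lock_dir / f"{RESOURCE_LOCKED_GLOB}_{resource}_{worker_id}"
    marker.touch()
    return marker


def wait_and_lock(lock_dir: Path, resources: list, worker_id: str) -> list:
    # Roughly the shape of the `while True:` loop in `get()`: sleep, re-check,
    # and proceed only once every requested resource is free.
    while True:
        if all(resource_is_free(lock_dir, res) for res in resources):
            return [lock_resource(lock_dir, res, worker_id) for res in resources]
        time.sleep(random.random())


def release_worker_resources(lock_dir: Path, worker_id: str) -> None:
    # Cleanup mirroring `on_test_stop`: remove every marker this worker created.
    for prefix in (RESOURCE_LOCKED_GLOB, RESOURCE_IN_USE_GLOB):
        for marker in lock_dir.glob(f"{prefix}_*_{worker_id}"):
            marker.unlink()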
