Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion utils/update_checkout/tests/test_clone.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ def test_clone_with_additional_scheme(self):
'--config', self.additional_config_path,
'--source-root', self.source_root,
'--clone',
'--scheme', 'extra'])
'--scheme', 'extra',
'--verbose'])

# Test that we're actually checking out the 'extra' scheme based on the output
self.assertIn(b"git checkout refs/heads/main", output)
Expand Down
38 changes: 29 additions & 9 deletions utils/update_checkout/tests/test_locked_repository.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
import unittest
from unittest.mock import patch

from update_checkout.update_checkout import _is_any_repository_locked
from update_checkout.update_checkout import UpdateArguments, _is_any_repository_locked


def _update_arguments_with_fake_path(repo_name: str, path: str) -> UpdateArguments:
return UpdateArguments(
repo_name=repo_name,
source_root=path,
config={},
scheme_name="",
scheme_map=None,
tag="",
timestamp=None,
reset_to_remote=False,
clean=False,
stash=False,
cross_repos_pr=False,
output_prefix="",
verbose=False,
)


class TestIsAnyRepositoryLocked(unittest.TestCase):
@patch("os.path.exists")
@patch("os.path.isdir")
@patch("os.listdir")
def test_repository_with_lock_file(self, mock_listdir, mock_isdir, mock_exists):
pool_args = [
("/fake_path", None, "repo1"),
("/fake_path", None, "repo2"),
_update_arguments_with_fake_path("repo1", "/fake_path"),
_update_arguments_with_fake_path("repo2", "/fake_path"),
]

def listdir_side_effect(path):
Expand All @@ -32,7 +51,7 @@ def listdir_side_effect(path):
@patch("os.listdir")
def test_repository_without_git_dir(self, mock_listdir, mock_isdir, mock_exists):
pool_args = [
("/fake_path", None, "repo1"),
_update_arguments_with_fake_path("repo1", "/fake_path"),
]

mock_exists.return_value = False
Expand All @@ -47,7 +66,7 @@ def test_repository_without_git_dir(self, mock_listdir, mock_isdir, mock_exists)
@patch("os.listdir")
def test_repository_with_git_file(self, mock_listdir, mock_isdir, mock_exists):
pool_args = [
("/fake_path", None, "repo1"),
_update_arguments_with_fake_path("repo1", "/fake_path"),
]

mock_exists.return_value = True
Expand All @@ -60,9 +79,11 @@ def test_repository_with_git_file(self, mock_listdir, mock_isdir, mock_exists):
@patch("os.path.exists")
@patch("os.path.isdir")
@patch("os.listdir")
def test_repository_with_multiple_lock_files(self, mock_listdir, mock_isdir, mock_exists):
def test_repository_with_multiple_lock_files(
self, mock_listdir, mock_isdir, mock_exists
):
pool_args = [
("/fake_path", None, "repo1"),
_update_arguments_with_fake_path("repo1", "/fake_path"),
]

mock_exists.return_value = True
Expand All @@ -77,7 +98,7 @@ def test_repository_with_multiple_lock_files(self, mock_listdir, mock_isdir, moc
@patch("os.listdir")
def test_repository_with_no_lock_files(self, mock_listdir, mock_isdir, mock_exists):
pool_args = [
("/fake_path", None, "repo1"),
_update_arguments_with_fake_path("repo1", "/fake_path"),
]

mock_exists.return_value = True
Expand All @@ -86,4 +107,3 @@ def test_repository_with_no_lock_files(self, mock_listdir, mock_isdir, mock_exis

result = _is_any_repository_locked(pool_args)
self.assertEqual(result, set())

1 change: 0 additions & 1 deletion utils/update_checkout/update_checkout/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

from .update_checkout import main

__all__ = ["main"]
142 changes: 142 additions & 0 deletions utils/update_checkout/update_checkout/parallel_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from multiprocessing.managers import ListProxy, ValueProxy
import sys
from multiprocessing import Pool, cpu_count, Manager
import time
from typing import Callable, List, Any, Union
from threading import Lock, Thread, Event
import shutil

from .runner_arguments import RunnerArguments, AdditionalSwiftSourcesArguments


class MonitoredFunction:
def __init__(
self,
fn: Callable,
running_tasks: ListProxy,
updated_repos: ValueProxy,
lock: Lock
):
self.fn = fn
self.running_tasks = running_tasks
self.updated_repos = updated_repos
self._lock = lock

def __call__(self, *args: Union[RunnerArguments, AdditionalSwiftSourcesArguments]):
task_name = args[0].repo_name
self.running_tasks.append(task_name)
result = None
try:
result = self.fn(*args)
except Exception as e:
print(e)
finally:
self._lock.acquire()
if task_name in self.running_tasks:
self.running_tasks.remove(task_name)
self.updated_repos.set(self.updated_repos.get() + 1)
self._lock.release()
return result


class ParallelRunner:
def __init__(
self,
fn: Callable,
pool_args: List[Union[RunnerArguments, AdditionalSwiftSourcesArguments]],
n_processes: int = 0,
):
self._monitor_polling_period = 0.1
if n_processes == 0:
n_processes = cpu_count() * 2
self._terminal_width = shutil.get_terminal_size().columns
self._n_processes = n_processes
self._pool_args = pool_args
manager = Manager()
self._lock = manager.Lock()
self._running_tasks = manager.list()
self._updated_repos = manager.Value("i", 0)
self._fn = fn
self._pool = Pool(processes=self._n_processes)
self._verbose = pool_args[0].verbose
self._output_prefix = pool_args[0].output_prefix
self._nb_repos = len(pool_args)
self._stop_event = Event()
self._monitored_fn = MonitoredFunction(
self._fn, self._running_tasks, self._updated_repos, self._lock
)

def run(self) -> List[Any]:
print(
"Running ``%s`` with up to %d processes."
% (self._fn.__name__, self._n_processes)
)
if self._verbose:
results = self._pool.map_async(
func=self._fn, iterable=self._pool_args
).get(timeout=1800)
self._pool.close()
self._pool.join()
else:
monitor_thread = Thread(target=self._monitor, daemon=True)
monitor_thread.start()
results = self._pool.map_async(
func=self._monitored_fn, iterable=self._pool_args
).get(timeout=1800)
self._pool.close()
self._pool.join()
self._stop_event.set()
monitor_thread.join()
return results

def _monitor(self):
last_output = ""
while not self._stop_event.is_set():
self._lock.acquire()
current = list(self._running_tasks)
current_line = ", ".join(current)
updated_repos = self._updated_repos.get()
self._lock.release()

if current_line != last_output:
truncated = (f"{self._output_prefix} [{updated_repos}/{self._nb_repos}] ({current_line})")
if len(truncated) > self._terminal_width:
ellipsis_marker = " ..."
truncated = (
truncated[: self._terminal_width - len(ellipsis_marker)]
+ ellipsis_marker
)
sys.stdout.write("\r" + truncated.ljust(self._terminal_width))
sys.stdout.flush()
last_output = current_line

time.sleep(self._monitor_polling_period)

sys.stdout.write("\r" + " " * len(last_output) + "\r\n")
sys.stdout.flush()

@staticmethod
def check_results(results, op) -> int:
"""Function used to check the results of ParallelRunner.

NOTE: This function was originally located in the shell module of
swift_build_support and should eventually be replaced with a better
parallel implementation.
"""

fail_count = 0
if results is None:
return 0
for r in results:
if r is not None:
if fail_count == 0:
print("======%s FAILURES======" % op)
fail_count += 1
if isinstance(r, str):
print(r)
continue
print("%s failed (ret=%d): %s" % (r.repo_path, r.ret, r))
if r.stderr:
print(r.stderr.decode())
return fail_count

33 changes: 33 additions & 0 deletions utils/update_checkout/update_checkout/runner_arguments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from dataclasses import dataclass
from typing import Any, Dict

@dataclass
class RunnerArguments:
repo_name: str
scheme_name: str
output_prefix: str
verbose: bool

@dataclass
class UpdateArguments(RunnerArguments):
source_root: str
config: Dict[str, Any]
scheme_map: Any
tag: str
timestamp: Any
reset_to_remote: bool
clean: bool
stash: bool
cross_repos_pr: bool

@dataclass
class AdditionalSwiftSourcesArguments(RunnerArguments):
args: RunnerArguments
repo_info: str
repo_branch: str
remote: str
with_ssh: bool
skip_history: bool
skip_tags: bool
skip_repository_list: bool
use_submodules: bool
Loading