diff --git a/git_hg_sync/__main__.py b/git_hg_sync/__main__.py index 6cba536..272e46e 100644 --- a/git_hg_sync/__main__.py +++ b/git_hg_sync/__main__.py @@ -1,5 +1,4 @@ import argparse -import sys import logging from pathlib import Path @@ -73,7 +72,7 @@ def main() -> None: parser = get_parser() commandline.add_logging_group(parser) args = parser.parse_args() - logger = commandline.setup_logging("service", args, {"raw": sys.stdout}) + logger = commandline.setup_logging("service", args) config = Config.from_file(args.config) sentry_config = config.sentry diff --git a/git_hg_sync/application.py b/git_hg_sync/application.py index a4e8a6b..1e7b42d 100644 --- a/git_hg_sync/application.py +++ b/git_hg_sync/application.py @@ -1,3 +1,5 @@ +from dataclasses import asdict +import json import signal import sys from types import FrameType @@ -9,6 +11,7 @@ from git_hg_sync.mapping import Mapping, SyncOperation from git_hg_sync.pulse_worker import PulseWorker from git_hg_sync.repo_synchronizer import RepoSynchronizer +from git_hg_sync.retry import retry logger = get_proxy_logger(__name__) @@ -29,16 +32,18 @@ def __init__( def run(self) -> None: def signal_handler(sig: int, frame: Optional[FrameType]) -> None: if self._worker.should_stop: - logger.info("Process killed by user") + logger.error("Process killed by user") sys.exit(1) - self._worker.shoud_stop = True + self._worker.should_stop = True logger.info("Process exiting gracefully") signal.signal(signal.SIGINT, signal_handler) self._worker.run() def _handle_push_event(self, push_event: Push) -> None: - logger.info(f"Handling push event: {push_event.pushid}") + json_event = json.dumps(asdict(push_event)) + logger.info(f"Handling push event. {json_event}") + synchronizer = self._repo_synchronizers[push_event.repo_url] operations_by_destination: dict[str, list[SyncOperation]] = {} @@ -50,11 +55,29 @@ def _handle_push_event(self, push_event: Push) -> None: ).append(match.operation) for destination, operations in operations_by_destination.items(): - synchronizer.sync(destination, operations) + try: + retry( + lambda: synchronizer.sync(destination, operations), + tries=3, + action="executing sync operations", + delay=5, + ) + except Exception: + error_data = json.dumps( + { + "destination_url": destination, + "operations": [asdict(operation) for operation in operations], + } + ) + logger.error( + f"An error prevented completion of the following sync operations. {error_data}", + exc_info=True, + ) def _handle_event(self, event: Push | Tag) -> None: if event.repo_url not in self._repo_synchronizers: - logger.info("Ignoring event for untracked repository: %()s", event.repo_url) + ignored_event = json.dumps(asdict(event)) + logger.info(f"Ignoring event for untracked repository. {ignored_event}") return match event: case Push(): diff --git a/git_hg_sync/mapping.py b/git_hg_sync/mapping.py index 99e1dd6..d2b67b9 100644 --- a/git_hg_sync/mapping.py +++ b/git_hg_sync/mapping.py @@ -1,7 +1,7 @@ import re from dataclasses import dataclass from functools import cached_property -from typing import Sequence, TypeAlias +from typing import Sequence, TypeAlias, Literal import pydantic @@ -16,6 +16,8 @@ class SyncBranchOperation: # Destination (hg) destination_branch: str + type: Literal["SyncBranchOperation"] = "SyncBranchOperation" + @dataclass class SyncTagOperation: @@ -26,6 +28,8 @@ class SyncTagOperation: tag: str tags_destination_branch: str + type: Literal["SyncTagOperation"] = "SyncTagOperation" + SyncOperation: TypeAlias = SyncBranchOperation | SyncTagOperation diff --git a/git_hg_sync/pulse_worker.py b/git_hg_sync/pulse_worker.py index 0ec8ebb..badc40d 100644 --- a/git_hg_sync/pulse_worker.py +++ b/git_hg_sync/pulse_worker.py @@ -5,7 +5,7 @@ from git_hg_sync.events import Push, Tag -logger = get_proxy_logger("pluse_consumer") +logger = get_proxy_logger(__name__) class EventHandler(Protocol): diff --git a/git_hg_sync/repo_synchronizer.py b/git_hg_sync/repo_synchronizer.py index 2317b7f..f504a52 100644 --- a/git_hg_sync/repo_synchronizer.py +++ b/git_hg_sync/repo_synchronizer.py @@ -1,11 +1,15 @@ +import json +from dataclasses import asdict from pathlib import Path from git import Repo, exc from git_hg_sync.mapping import SyncOperation, SyncBranchOperation, SyncTagOperation +from git_hg_sync.retry import retry from mozlog import get_proxy_logger -logger = get_proxy_logger("sync_repo") + +logger = get_proxy_logger(__name__) class RepoSyncError(Exception): @@ -17,7 +21,6 @@ class MercurialMetadataNotFoundError(RepoSyncError): class RepoSynchronizer: - def __init__( self, clone_directory: Path, @@ -26,12 +29,21 @@ def __init__( self._clone_directory = clone_directory self._src_remote = url + def _repo_config_as_dict(self, repo: Repo): + with repo.config_reader() as reader: + return { + section: dict(reader.items(section)) for section in reader.sections() + } + def _get_clone_repo(self) -> Repo: """Get a GitPython Repo object pointing to a git clone of the source remote.""" + log_data = json.dumps({"clone_directory": str(self._clone_directory)}) if self._clone_directory.exists(): + logger.debug(f"Clone directory exists. Using it .{log_data}") repo = Repo(self._clone_directory) else: + logger.debug(f"Clone directory does not exist. Creating it. {log_data}") repo = Repo.init(self._clone_directory) # Ensure that the clone repository is well configured @@ -49,19 +61,52 @@ def _fetch_all_from_remote(self, repo: Repo, remote: str) -> None: repo.git.fetch([remote]) except exc.GitCommandError as e: # can't fetch if repo is empty - if "fatal: couldn't find remote ref HEAD" not in e.stderr: - raise e + if "fatal: couldn't find remote ref HEAD" in e.stderr: + return + raise + + def _log_data(self, **kwargs): + return json.dumps( + { + "clone_directory": str(self._clone_directory), + "source_remote": self._src_remote, + **kwargs, + } + ) def sync(self, destination_url: str, operations: list[SyncOperation]) -> None: - repo = self._get_clone_repo() destination_remote = f"hg::{destination_url}" + json_operations = self._log_data( + destination_remote=destination_remote, + operations=[asdict(op) for op in operations], + ) + logger.info(f"Synchronizing. {json_operations}") + + repo = self._get_clone_repo() + + logger.debug( + f"Git clone configuration. {self._log_data(configuration=self._repo_config_as_dict(repo))}" + ) + # Ensure we have all commits from destination repository - self._fetch_all_from_remote(repo, destination_remote) + logger.debug(f"Fetching all commits from destination. {self._log_data()}") + retry( + lambda: self._fetch_all_from_remote(repo, destination_remote), + tries=2, + action="fetching commits from destination", + ) # Get commits we want to send to destination repository commits_to_fetch = [operation.source_commit for operation in operations] - repo.git.fetch([self._src_remote, *commits_to_fetch]) + logger.debug( + f"Fetching source commits. {self._log_data(commits=commits_to_fetch)}" + ) + retry( + lambda: repo.git.fetch([self._src_remote, *commits_to_fetch]), + tries=2, + action="fetching source commits", + ) push_args = [destination_remote] @@ -79,8 +124,22 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None: # tagging can only be done on a commit that already have mercurial # metadata if branch_ops: - repo.git.execute( - ["git", "-c", "cinnabar.data=force", "push", "--dry-run", *push_args] + logger.debug( + f"Adding mercurial metadata to git commits. {self._log_data(args=push_args)}" + ) + retry( + lambda: repo.git.execute( + [ + "git", + "-c", + "cinnabar.data=force", + "push", + "--dry-run", + *push_args, + ] + ), + tries=2, + action="adding mercurial metadata to git commits", ) # Handle tag operations @@ -91,21 +150,32 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None: # Create tag branches locally tag_branches = set([op.tags_destination_branch for op in tag_ops]) for tag_branch in tag_branches: - repo.git.fetch( - [ - "-f", - destination_remote, - f"refs/heads/branches/{tag_branch}/tip:{tag_branch}", - ] + logger.debug( + f"Get tag branch from destination. {self._log_data(tag_branch=tag_branch)}."
+ ) + retry( + lambda: repo.git.fetch( + [ + "-f", + destination_remote, + f"refs/heads/branches/{tag_branch}/tip:{tag_branch}", + ] + ), + tries=2, + action="getting tag branch from destination", ) push_args.append(f"{tag_branch}:refs/heads/branches/{tag_branch}/tip") # Create tags for tag_operation in tag_ops: - if not self._commit_has_mercurial_metadata( + commit_has_metadata = self._commit_has_mercurial_metadata( repo, tag_operation.source_commit - ): + ) + if not commit_has_metadata: raise MercurialMetadataNotFoundError() + logger.debug( + f"Creating tag. {self._log_data(operation=asdict(tag_operation))}" ) repo.git.cinnabar( [ "tag", @@ -117,4 +187,13 @@ def sync(self, destination_url: str, operations: list[SyncOperation]) -> None: ) # Push commits, branches and tags to destination - repo.git.push(*push_args) + logged_command = ["git", "push", *push_args] + logger.debug( + f"Pushing branches and tags to destination. {self._log_data(command=logged_command)}" + ) + retry( + lambda: repo.git.push(*push_args), + tries=2, + delay=5, + action="pushing branch and tags to destination", + ) diff --git a/git_hg_sync/retry.py b/git_hg_sync/retry.py new file mode 100644 index 0000000..766a812 --- /dev/null +++ b/git_hg_sync/retry.py @@ -0,0 +1,47 @@ +from collections.abc import Callable +import time +from typing import TypeVar + +from mozlog import get_proxy_logger + +TResult = TypeVar("TResult") + +logger = get_proxy_logger(__name__) + + +def retry( + func: Callable[[], TResult], tries: int, action: str = "", delay: float = 0.0 +) -> TResult: + """ + Retry a function on failure. + Args: + func (Callable): The function to retry. + tries (int): The number of attempts to make before failing. + action (str): A description of the action being performed for better logging context (eg. "fetching commits") + delay (float): The delay in seconds between attempts. + + Returns: + TResult: The return value of the function if successful. 
+ + Raises: + Exception: The last exception raised if all attempts fail. + """ + for attempt in range(1, tries + 1): + try: + return func() + except Exception as exc: + action_text = f" while {action}" if action else "" + if attempt < tries: + logger.error( + f"Attempt {attempt}/{tries} failed{action_text} with error: {type(exc).__name__}: {exc}. Retrying...", + exc_info=True, + ) + if delay > 0: + time.sleep(delay) + else: + logger.error( + f"Attempt {attempt}/{tries} failed{action_text}. Aborting." + ) + raise + + assert False, "unreachable" diff --git a/tests/conftest.py b/tests/conftest.py index cb84835..e8cf94a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,17 @@ +import sys + import mozlog import pytest from git_hg_sync.config import PulseConfig -@pytest.fixture(autouse=True) +@pytest.fixture(autouse=True, scope="session") def mozlog_logging() -> None: logger = mozlog.structuredlog.StructuredLogger("tests") + logger.add_handler( + mozlog.handlers.StreamHandler(sys.stdout, mozlog.formatters.JSONFormatter()) + ) mozlog.structuredlog.set_default_logger(logger) diff --git a/tests/test_retry.py b/tests/test_retry.py new file mode 100644 index 0000000..4cc5880 --- /dev/null +++ b/tests/test_retry.py @@ -0,0 +1,70 @@ +import unittest.mock as mock +from typing import Literal, Never + +import pytest +from git_hg_sync.retry import retry, logger + + +def test_retry_does_not_log_error_on_immediate_success(): + def true_on_first_call(): + return True + + with mock.patch.object(logger, "error", wraps=logger.error) as error_spy: + assert retry(true_on_first_call, tries=2) is True + + error_spy.assert_not_called() + + +def test_retry_logs_error_on_first_failing_try(): + count = 0 + + def true_on_second_call() -> Literal[True]: + nonlocal count + if count == 0: + count += 1 + raise Exception("Error on first call") + return True + + with mock.patch.object(logger, "error", wraps=logger.error) as error_spy: + assert retry(true_on_second_call, tries=2) is True 
+ assert "Retrying" in error_spy.call_args.args[0] + + +def test_retry_abort_on_failing_last_try(): + def always_raising() -> Never: + raise Exception("Error on first call") + + with mock.patch.object(logger, "error", wraps=logger.error) as error_spy: + with pytest.raises(Exception): + retry(always_raising, tries=2) + + assert "Aborting" in error_spy.call_args.args[0] + + +def test_retry_raises_inner_exception_on_last_failure(): + count = 0 + + class MyCustomError(Exception): + pass + + def always_raising(): + nonlocal count + count += 1 + raise MyCustomError(f"Called {count} times") + + with pytest.raises(MyCustomError) as exc_info: + retry(always_raising, tries=3) + + assert exc_info.value.args[0] == "Called 3 times" + + +def test_retry_shows_action_on_failed_attempt(caplog): + + def always_raising(): + raise ValueError("Commits too old") + + with mock.patch.object(logger, "error", wraps=logger.error) as error_spy: + with pytest.raises(ValueError): + retry(always_raising, tries=2, action="fetching old commits") + + assert "while fetching old commits" in error_spy.call_args.args[0]