diff --git a/.github/actions/setup-repo-env/action.yml b/.github/actions/setup-repo-env/action.yml new file mode 100644 index 0000000..73eaf96 --- /dev/null +++ b/.github/actions/setup-repo-env/action.yml @@ -0,0 +1,15 @@ +name: 'Set up Repo Environment' +description: 'Sets up Python and its deps' +runs: + using: "composite" + steps: + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.11' + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + shell: bash + working-directory: ./email-rotation diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml new file mode 100644 index 0000000..d7fa5e3 --- /dev/null +++ b/.github/workflows/run-tests.yml @@ -0,0 +1,46 @@ +name: Run Tests + +on: + pull_request: + paths: + - '**/*.py' + - '**/*.yaml' + - '.github/**' + +jobs: + run_tests: + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up repo environment + uses: ./.github/actions/setup-repo-env + + - name: Run email script tests + run: python3 email_about_issues_test.py + working-directory: ./email-rotation + + - name: Run rotation extension tests + run: python3 extend_rotation_test.py + working-directory: ./email-rotation + + - name: Run yaml verification test + run: python3 verify_yaml_files_test.py + working-directory: ./email-rotation + + - name: Run pyright + run: pyright *.py + working-directory: ./email-rotation + + - name: Run mypy + run: mypy . --explicit-package-bases --strict + working-directory: ./email-rotation + + - name: Run isort + run: isort . --check-only + working-directory: ./email-rotation + + - name: Run black + run: black . --check + working-directory: ./email-rotation diff --git a/email-rotation/.gitignore b/email-rotation/.gitignore new file mode 100644 index 0000000..b2ccee3 --- /dev/null +++ b/email-rotation/.gitignore @@ -0,0 +1,4 @@ +**/__pycache__ +github-token +secrets +state.json diff --git a/email-rotation/Dockerfile b/email-rotation/Dockerfile new file mode 100644 index 0000000..0715fb0 --- /dev/null +++ b/email-rotation/Dockerfile @@ -0,0 +1,28 @@ +FROM debian:stable-slim +LABEL maintainer="George Burgess " + +# Grab the packages +RUN apt-get update +RUN apt-get install -y python3 python3-requests python3-yaml + +ENV LANG=C.UTF-8 + +# Rootn't +RUN \ + useradd email-bot && \ + mkdir /home/email-bot && \ + chown email-bot:email-bot /home/email-bot + +USER email-bot +WORKDIR /home/email-bot + +# Example build'n'run invocation: +# docker build -t llvm-security-group-emails . && docker run --rm -it -v $PWD:/home/email-bot/llvm-security-repo/email-rotation llvm-security-group-emails +# +# Example `secrets` file: +# export EMAIL_RECIPIENT=receiver@redacted.com +# export GITHUB_REPOSITORY=llvm/llvm-security-repo +# export GITHUB_TOKEN=[redacted] +# export GMAIL_PASSWORD=[redacted] +# export GMAIL_USERNAME=sender@redacted.com +CMD ["bash", "-c", "cd llvm-security-repo && . secrets && exec ./email_about_issues.py --state-file=state.json --debug"] diff --git a/email-rotation/README.md b/email-rotation/README.md new file mode 100644 index 0000000..b097fc5 --- /dev/null +++ b/email-rotation/README.md @@ -0,0 +1,20 @@ +This directory implements an oncall rotation for security issues, essentially. + +Relevant files (ignoring tests) are: + +- `rotation-members.yaml`, which is the set of all members currently on the + security group who are eligible for this rotation. + +- `rotation.yaml`, which specifies the rotation. This is generally extended by + `rotation-members.yaml`, though can be edited by humans (e.g., to remove + people from rotations, swap with others, etc.) + +- `email_about_issues.py` actually emails about the issues; it's run on + a machine through a Docker image produced by the `Dockerfile`. + The `docker run` invocation looks like: + ``` + docker run --rm -it -v $PWD:/home/email-bot/llvm-security-repo llvm-security-group-emails + ``` +- `extend_rotation.py` extends the `rotation.yaml` file automatically. This + script only appends to the rotation, and takes into account who's already been + in the rotation recently when creating new rotation instances. diff --git a/email-rotation/__init__.py b/email-rotation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/email-rotation/email_about_issues.py b/email-rotation/email_about_issues.py new file mode 100755 index 0000000..d560ebe --- /dev/null +++ b/email-rotation/email_about_issues.py @@ -0,0 +1,550 @@ +#!/usr/bin/env python3 + +""" +Emails the LLVM Security Team about any new draft security advisories, +mentioning folks who are oncall. +""" + +import argparse +import dataclasses +import datetime +import json +import logging +import os +import smtplib +import textwrap +import time +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from pathlib import Path +from typing import Any + +import requests + +import rotations + +GhsaId = str + +# How many seconds before the end of _all_ scheduled rotations to wait before +# emailing folks to extend the rotation list. +SECONDS_BEFORE_ROTATION_LIST_END_TO_NAG = 14 * 24 * 60 * 60 +SECONDS_BETWEEN_ROTATION_REFRESH_EMAILS = 24 * 60 * 60 + + +@dataclasses.dataclass(frozen=True) +class EmailCreds: + username: str + password: str + + +@dataclasses.dataclass(frozen=True, eq=True) +class ScriptState: + # Advisories seen on the last run of this script, so we don't send duplicate emails. + seen_advisories: list[GhsaId] + # If the rotation end is coming near, this tracks the last time we alerted about it. + # Don't want to alert more than once per day. + last_alert_about_rotation: float | None = None + + @classmethod + def from_json(cls, json_data: dict[str, Any]) -> "ScriptState": + return cls( + seen_advisories=json_data.get("seen_advisories", []), + last_alert_about_rotation=json_data.get("last_alert_about_rotation"), + ) + + def to_json(self) -> dict[str, Any]: + return dataclasses.asdict(self) + + @classmethod + def load_from_file(cls, state_file: Path) -> "ScriptState": + try: + with state_file.open(encoding="utf-8") as f: + return cls.from_json(json.load(f)) + except FileNotFoundError: + return cls(seen_advisories=[]) + + def save_to_file(self, state_file: Path) -> None: + tmp_file = state_file.with_suffix(".tmp") + with tmp_file.open("w", encoding="utf-8") as f: + json.dump(self.to_json(), f, indent=2, ensure_ascii=False) + tmp_file.rename(state_file) + + +@dataclasses.dataclass(frozen=True) +class ScriptEmailInfo: + creds: EmailCreds + recipient: str + + +# Flags passed to the script, +@dataclasses.dataclass(frozen=True) +class ScriptInvocation: + repo_name: str + github_token: str + now_timestamp: float + email_info: ScriptEmailInfo | None + + +@dataclasses.dataclass(frozen=True) +class SecurityAdvisory: + id: GhsaId + title: str + collaborators: list[str] + + +def extract_next_page_from_header(resp: requests.Response) -> str | None: + """Extracts the next page URL from the Link header of the response.""" + link_header = resp.headers.get("Link") + if not link_header: + return None + + for link in link_header.split(","): + split_link = link.split(";", 1) + if len(split_link) < 2: + logging.warning("Malformed Link: %s", link) + continue + url, meta = split_link + if 'rel="next"' in meta: + return url.strip("<> ") + return None + + +def requests_get_with_retry(url: str, headers: dict[str, Any]) -> requests.Response: + i = 0 + max_retries = 3 + while True: + resp = requests.get(url, headers=headers) + if resp.ok: + return resp + logging.warning("GETing %s failed: %d %s", url, resp.status_code, resp.text) + if i >= max_retries: + resp.raise_for_status() + i += 1 + time.sleep(i * 60) + + +def fetch_all_security_advisories_of_type( + repo_name: str, + github_token: str, + state: str, +) -> list[dict[str, Any]]: + """Iterates all security advisories for the given repo.""" + # Uses the API here: + # https://docs.github.com/en/rest/security-advisories/repository-advisories?apiVersion=2022-11-28#list-repository-security-advisories + url: str | None = ( + f"https://api.github.com/repos/{repo_name}/security-advisories?state={state}" + ) + request_headers = { + "Accept": "application/vnd.github+json", + "Authorization": f"Bearer {github_token}", + "GitHub-Api-Version": "2022-11-28", + } + results = [] + while url: + resp = requests_get_with_retry( + url, + headers=request_headers, + ) + + results += resp.json() + url = extract_next_page_from_header(resp) + + return results + + +def list_unpublished_security_advisories( + repo_name: str, github_token: str +) -> list[SecurityAdvisory]: + results = [] + total_security_advisories = 0 + advisories = fetch_all_security_advisories_of_type(repo_name, github_token, "draft") + advisories += fetch_all_security_advisories_of_type( + repo_name, github_token, "triage" + ) + for advisory in advisories: + logging.debug("Examining advisory %s", advisory) + + total_security_advisories += 1 + state = advisory["state"] + # This should be guaranteed by the + # 'fetch_all_security_advisories_of_type' function. + assert state in ("draft", "triage"), state + + collaborators = [x["login"] for x in advisory.get("collaborating_users", ())] + results.append( + SecurityAdvisory( + id=advisory["ghsa_id"], + title=advisory["summary"], + collaborators=collaborators, + ) + ) + + results.sort(key=lambda x: x.id) + logging.info("Total security advisories fetched: %d", total_security_advisories) + logging.info("%d draft security advisories found.", len(results)) + return results + + +@dataclasses.dataclass(frozen=True) +class RotationState: + all_members: set[str] + current_members: set[str] + final_rotation_start: float + + +def load_rotation_state(now_timestamp: float) -> RotationState | None: + rotation_members_file = rotations.RotationMembersFile.parse_file( + rotations.ROTATION_MEMBERS_FILE, + ) + rotation_file = rotations.RotationFile.parse_file( + rotations.ROTATION_FILE, + ) + + current_rotation = None + # Pick the most recent rotation with a timstamp <= now + for rotation in rotation_file.rotations: + if rotation.start_time.timestamp() > now_timestamp: + break + current_rotation = rotation + + if not current_rotation: + return None + + return RotationState( + all_members=set(rotation_members_file.members), + current_members=set(current_rotation.members), + final_rotation_start=rotation_file.rotations[-1].start_time.timestamp(), + ) + + +def try_email_llvm_security_team( + email_creds: EmailCreds, + email_recipient: str, + subject: str, + body: str, +) -> bool: + """Returns True if the email was sent successfully.""" + try: + # Create a multipart message + message = MIMEMultipart() + message["From"] = email_creds.username + message["To"] = email_recipient + message["Subject"] = subject + # Add body to email + message.attach(MIMEText(body, "plain")) + + with smtplib.SMTP("smtp.gmail.com", 587) as server: + server.ehlo() + server.starttls() + server.ehlo() + + server.login(email_creds.username, email_creds.password) + + server.sendmail(email_creds.username, email_recipient, message.as_string()) + + logging.info("Email sent successfully to %s", email_recipient) + return True + except Exception as e: + logging.exception("Failed to send email with subject '%s'", subject) + return False + + +def email_about_advisory( + email_creds: EmailCreds, + email_recipient: str, + repo_name: str, + advisory: SecurityAdvisory, + oncall_members: list[str], +) -> bool: + """Sends an email; returns True if successful.""" + return try_email_llvm_security_team( + email_creds=email_creds, + email_recipient=email_recipient, + subject=f"New security advisory for {repo_name}: {advisory.title}", + body=textwrap.dedent( + f"""\ + A new security advisory has been created for {repo_name}. + + Please take action within two days. The security group members + currently on the rotation are: {', '.join(oncall_members)}. + + Advisory URL: https://github.com/{repo_name}/security/advisories/{advisory.id} + """ + ), + ) + + +def maybe_email_about_rotation_end( + invocation: ScriptInvocation, + state: ScriptState, + rotation_state: RotationState | None, +) -> ScriptState: + if rotation_state: + time_to_last_start = ( + rotation_state.final_rotation_start - invocation.now_timestamp + ) + if time_to_last_start > SECONDS_BEFORE_ROTATION_LIST_END_TO_NAG: + logging.info( + "Not emailing about rotation end: %d seconds left.", + time_to_last_start, + ) + return state + + if state.last_alert_about_rotation: + time_since_last_alert = ( + invocation.now_timestamp - state.last_alert_about_rotation + ) + if time_since_last_alert < SECONDS_BETWEEN_ROTATION_REFRESH_EMAILS: + logging.info( + "Not emailing about rotation end: already alerted within the last %d seconds.", + SECONDS_BETWEEN_ROTATION_REFRESH_EMAILS, + ) + return state + + new_state = dataclasses.replace( + state, + last_alert_about_rotation=invocation.now_timestamp, + ) + + email_info = invocation.email_info + if not email_info: + logging.info( + "dry-run: would send email about rotation end for %s", + invocation.repo_name, + ) + return new_state + + if rotation_state: + pretty_last_rotation_start = time.strftime( + "%Y-%m-%d %H:%M:%S %Z", + time.localtime(rotation_state.final_rotation_start), + ) + issue = f"the last rotation starts at {pretty_last_rotation_start}" + else: + issue = "no rotation is currently scheduled" + + email_ok = try_email_llvm_security_team( + email_creds=email_info.creds, + email_recipient=email_info.recipient, + subject=f"Rotation schedule running short for {invocation.repo_name}", + body=textwrap.dedent( + f"""\ + The rotation schedule is running short; {issue}. + + Please extend it by running `./extend_rotation.py` in the + {invocation.repo_name} repo and committing the results. + + This nag email will be sent daily until the rotation is extended. + + Thank you! + """ + ), + ) + if not email_ok: + return state + + return new_state + + +def run_script( + invocation: ScriptInvocation, + script_state: ScriptState, + rotation_state: RotationState, +) -> ScriptState: + draft_security_advisories = list_unpublished_security_advisories( + invocation.repo_name, + invocation.github_token, + ) + + failed_alerts_for_advisories = set() + current_oncall = sorted(rotation_state.current_members) + for advisory in draft_security_advisories: + has_rotation_member = any( + member in rotation_state.current_members + for member in advisory.collaborators + ) + + if advisory.id in script_state.seen_advisories: + logging.info( + "Skipping advisory %s: already seen/alerted.", + advisory.id, + ) + continue + + if has_rotation_member: + logging.info( + "Skipping advisory %s: already has rotation member(s) as collaborator.", + advisory.id, + ) + continue + + email_info = invocation.email_info + if not email_info: + logging.info( + "dry-run: would send email about advisory %s, mentioning %s", + advisory.id, + current_oncall, + ) + continue + + email_success = email_about_advisory( + email_creds=email_info.creds, + email_recipient=email_info.recipient, + repo_name=invocation.repo_name, + advisory=advisory, + oncall_members=current_oncall, + ) + + if not email_success: + failed_alerts_for_advisories.add(advisory.id) + + return dataclasses.replace( + script_state, + seen_advisories=sorted( + x.id + # You can't unpublish advisories, so no need to keep ones + # exclusively in the old state around. + for x in draft_security_advisories + # Pretend we didn't see advisories we failed to alert about, so we + # try again next time. + if x.id not in failed_alerts_for_advisories + ), + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Respond to new issues in a GitHub repository." + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging.", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Don't actually modify issues.", + ) + parser.add_argument( + "--state-file", + type=Path, + required=True, + help="State file used for tracking issues we've seen.", + ) + parser.add_argument( + "--github-repo", + default=os.getenv("GITHUB_REPOSITORY"), + help="GitHub repository in the format 'owner/repo'. Defaults to GITHUB_REPOSITORY env var.", + ) + parser.add_argument( + "--github-token", + default=os.getenv("GITHUB_TOKEN"), + help="GitHub API token. Defaults to GITHUB_TOKEN env var.", + ) + parser.add_argument( + "--email-username", + default=os.getenv("GMAIL_USERNAME"), + help="Email (Gmail) username. Defaults to GMAIL_USER env var.", + ) + parser.add_argument( + "--email-password", + default=os.getenv("GMAIL_PASSWORD"), + help="Email (Gmail) password. Defaults to GMAIL_PASSWORD env var.", + ) + parser.add_argument( + "--email-recipient", + default=os.getenv("EMAIL_RECIPIENT"), + help="Recipient email address. Defaults to EMAIL_RECIPIENT env var.", + ) + + args = parser.parse_args() + + if not args.github_repo: + parser.error( + "GitHub repository must be specified either via --github-repo or GITHUB_REPOSITORY env var." + ) + if not args.github_token: + parser.error( + "GitHub token must be specified either via --github-token or GITHUB_TOKEN env var." + ) + + if not args.dry_run: + if not args.email_username: + parser.error( + "Email username must be specified either via --email-username or GMAIL_USER env var." + ) + if not args.email_password: + parser.error( + "Email password must be specified either via --email-password or GMAIL_PASSWORD env var." + ) + if not args.email_recipient: + parser.error( + "Recipient email must be specified either via --email-recipient or EMAIL_RECIPIENT env var." + ) + + return args + + +def main() -> None: + opts = parse_args() + + logging.basicConfig( + format=">> %(asctime)s: %(levelname)s: %(filename)s:%(lineno)d: %(message)s", + level=logging.DEBUG if opts.debug else logging.INFO, + ) + + now = time.time() + state_file: Path = opts.state_file + dry_run: bool = opts.dry_run + email_info = None + if not dry_run: + email_info = ScriptEmailInfo( + creds=EmailCreds( + username=opts.email_username, password=opts.email_password + ), + recipient=opts.email_recipient, + ) + + script_state = ScriptState.load_from_file(state_file) + rotation_state = load_rotation_state(now) + script_invocation = ScriptInvocation( + repo_name=opts.github_repo, + github_token=opts.github_token, + now_timestamp=now, + email_info=email_info, + ) + + if rotation_state: + new_script_state = run_script( + invocation=script_invocation, + script_state=script_state, + rotation_state=rotation_state, + ) + else: + new_script_state = script_state + logging.warning( + "No rotation state found; not sending any emails about security advisories." + ) + + if rotation_state: + new_script_state = maybe_email_about_rotation_end( + invocation=script_invocation, + state=new_script_state, + rotation_state=rotation_state, + ) + + if new_script_state == script_state: + return + + if dry_run: + write_to_file = state_file.with_suffix(".dry-run") + logging.info("dry-run: writing new state to file %s", write_to_file) + else: + logging.debug("Writing new state file...") + write_to_file = state_file + new_script_state.save_to_file(write_to_file) + + +if __name__ == "__main__": + main() diff --git a/email-rotation/email_about_issues_test.py b/email-rotation/email_about_issues_test.py new file mode 100755 index 0000000..fe9e6ca --- /dev/null +++ b/email-rotation/email_about_issues_test.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 + +import datetime +import unittest +from unittest import mock + +import email_about_issues as email +import rotations + +# Test constant for email configuration +TEST_EMAIL_INFO = email.ScriptEmailInfo( + creds=email.EmailCreds( + username="test_user@example.com", + password="fake_password123", + ), + recipient="foo@bar.com", +) + + +class TestEmailAboutIssues(unittest.TestCase): + @mock.patch.object(rotations.RotationMembersFile, "parse_file") + @mock.patch.object(rotations.RotationFile, "parse_file") + def test_load_rotation_state_returns_expected( + self, mock_rotationfile_parse: mock.Mock, mock_membersfile_parse: mock.Mock + ) -> None: + mock_membersfile_parse.return_value = rotations.RotationMembersFile( + members=["alice", "bob", "carol"] + ) + + mock_rotationfile_parse.return_value = rotations.RotationFile( + rotations=[ + rotations.Rotation( + start_time=datetime.datetime.fromtimestamp(1000.0), + members=["alice"], + ), + rotations.Rotation( + start_time=datetime.datetime.fromtimestamp(1500.0), + members=["bob"], + ), + rotations.Rotation( + start_time=datetime.datetime.fromtimestamp(1800.0), + members=["carol"], + ), + ] + ) + + result = email.load_rotation_state(1600) + assert result # use `assert` so pyright doesn't complain below + self.assertEqual(result.current_members, {"bob"}) + self.assertEqual(result.all_members, {"alice", "bob", "carol"}) + self.assertEqual(result.final_rotation_start, 1800.0) + + @mock.patch.object(email, "try_email_llvm_security_team") + def test_maybe_email_about_rotation_end_sends_email( + self, mock_send_email: mock.Mock + ) -> None: + invocation = email.ScriptInvocation( + repo_name="repo", + github_token="token", + now_timestamp=2000.0, + email_info=TEST_EMAIL_INFO, + ) + state = email.ScriptState(seen_advisories=[], last_alert_about_rotation=None) + rotation_state = email.RotationState( + all_members={"a", "b"}, + current_members={"a"}, + final_rotation_start=2000.0 + - email.SECONDS_BEFORE_ROTATION_LIST_END_TO_NAG + + 1, + ) + mock_send_email.return_value = True + new_state = email.maybe_email_about_rotation_end( + invocation, state, rotation_state + ) + mock_send_email.assert_called_once() + self.assertNotEqual(new_state, state) + self.assertEqual(new_state.last_alert_about_rotation, invocation.now_timestamp) + + @mock.patch.object(email, "try_email_llvm_security_team") + def test_maybe_email_about_rotation_end_no_email_if_too_early( + self, mock_send_email: mock.Mock + ) -> None: + invocation = email.ScriptInvocation( + repo_name="repo", + github_token="token", + now_timestamp=2000.0, + email_info=TEST_EMAIL_INFO, + ) + state = email.ScriptState(seen_advisories=[], last_alert_about_rotation=None) + rotation_state = email.RotationState( + all_members={"a", "b"}, + current_members={"a"}, + final_rotation_start=invocation.now_timestamp + + email.SECONDS_BEFORE_ROTATION_LIST_END_TO_NAG + + 1, + ) + mock_send_email.return_value = True + new_state = email.maybe_email_about_rotation_end( + invocation, state, rotation_state + ) + mock_send_email.assert_not_called() + self.assertEqual(new_state, state) + + @mock.patch.object(email, "try_email_llvm_security_team") + def test_maybe_email_about_rotation_end_respects_alert_interval( + self, mock_send_email: mock.Mock + ) -> None: + invocation = email.ScriptInvocation( + repo_name="repo", + github_token="token", + now_timestamp=2000.0, + email_info=TEST_EMAIL_INFO, + ) + state = email.ScriptState( + seen_advisories=[], last_alert_about_rotation=invocation.now_timestamp + ) + rotation_state = email.RotationState( + all_members={"a", "b"}, + current_members={"a"}, + final_rotation_start=2000.0 - email.SECONDS_BEFORE_ROTATION_LIST_END_TO_NAG, + ) + mock_send_email.return_value = True + new_state = email.maybe_email_about_rotation_end( + invocation, state, rotation_state + ) + mock_send_email.assert_not_called() + self.assertEqual(new_state, state) + + def test_scriptstate_json_roundtrip(self) -> None: + original = email.ScriptState( + seen_advisories=["a", "b"], last_alert_about_rotation=123.45 + ) + as_json = original.to_json() + reconstructed = email.ScriptState.from_json(as_json) + self.assertEqual(original, reconstructed) + + +if __name__ == "__main__": + unittest.main() diff --git a/email-rotation/extend_rotation.py b/email-rotation/extend_rotation.py new file mode 100755 index 0000000..70ec491 --- /dev/null +++ b/email-rotation/extend_rotation.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 + +import argparse +import collections +import dataclasses +import datetime +import logging +from pathlib import Path +from typing import Iterator + +import rotations + +MIN_TIME_UTC = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) + + +def find_most_recent_service_times( + rotations: list[rotations.Rotation], + members: list[str], +) -> dict[str, datetime.datetime]: + """Returns a dictionary mapping member names to their last service date.""" + # Give new people the minimum possible service time, so they're + # scheduled promptly. + last_service_times = {x: MIN_TIME_UTC for x in members} + for rotation in rotations: + service_time = rotation.start_time + for member in rotation.members: + # `rotations` is always in the order of oldest to newest, so we can + # just overwrite old values here. + last_service_times[member] = service_time + + return last_service_times + + +def generate_additional_rotations( + prior_rotations: list[rotations.Rotation], + members: list[str], + rotation_length_weeks: int, + people_per_rotation: int, + now: datetime.datetime, +) -> Iterator[rotations.Rotation]: + """Generates new rotations based on the given parameters.""" + # Super simple algorithm: the least recent person to serve on the rotation + # gets added first. If there's a tie, randomly choose between them. + # + # In the face of swaps, this will not necessarily be completely fair, but + # rotations are very infrequent anyway. If it's a problem, we can figure out + # something better. + last_service_times = find_most_recent_service_times(prior_rotations, members) + least_recent_assignees = collections.deque( + name + for name, _ in sorted( + last_service_times.items(), + key=lambda item: item[1], + ) + ) + + rotation_length = datetime.timedelta(weeks=rotation_length_weeks) + if prior_rotations: + next_rotation_start_time = prior_rotations[-1].start_time + rotation_length + else: + # Choose the most recent Sunday, at midnight UTC, as the start time for the first rotation. + midnight = now.replace(hour=0, minute=0, second=0, microsecond=0) + next_rotation_start_time = midnight - datetime.timedelta( + days=midnight.weekday() + 1 + ) + + while True: + people_on_this_rotation = [] + for _ in range(people_per_rotation): + people_on_this_rotation.append(least_recent_assignees.popleft()) + + yield rotations.Rotation( + start_time=next_rotation_start_time, + members=people_on_this_rotation, + ) + next_rotation_start_time += rotation_length + least_recent_assignees.extend(people_on_this_rotation) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Extend a rotation with additional members." + ) + parser.add_argument( + "--rotation-file", + type=Path, + default=rotations.ROTATION_FILE, + help="Path to the rotation YAML file.", + ) + parser.add_argument( + "--rotation-members-file", + type=Path, + default=rotations.ROTATION_MEMBERS_FILE, + help="Path to the rotation members YAML file.", + ) + parser.add_argument( + "--rotation-length-weeks", + type=int, + default=2, + help="Length of each rotation in weeks. Default is %(default)d.", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help=""" + If set, this script will print the new rotation file to stdout, rather + than overwriting the existing file. + """, + ) + parser.add_argument( + "--num-rotations", + type=int, + default=5, + help="Number of rotations to add. Default is %(default)d.", + ) + parser.add_argument( + "--people-per-rotation", + type=int, + default=2, + help="Number of people per rotation. Default is %(default)d.", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging.", + ) + return parser.parse_args() + + +def main() -> None: + opts = parse_args() + + logging.basicConfig( + format=">> %(asctime)s: %(levelname)s: %(filename)s:%(lineno)d: %(message)s", + level=logging.DEBUG if opts.debug else logging.INFO, + ) + + dry_run: bool = opts.dry_run + num_rotations: int = opts.num_rotations + people_per_rotation: int = opts.people_per_rotation + rotation_file_path: Path = opts.rotation_file + rotation_length_weeks: int = opts.rotation_length_weeks + rotation_members_file_path: Path = opts.rotation_members_file + + members_file = rotations.RotationMembersFile.parse_file(rotation_members_file_path) + current_rotation = rotations.RotationFile.parse_file(rotation_file_path) + + rotation_generator = generate_additional_rotations( + current_rotation.rotations, + members_file.members, + people_per_rotation, + rotation_length_weeks, + now=datetime.datetime.now(tz=datetime.timezone.utc), + ) + + extra_rotations = [x for x, _ in zip(rotation_generator, range(num_rotations))] + new_rotations = current_rotation.rotations + extra_rotations + new_rotations_file = dataclasses.replace(current_rotation, rotations=new_rotations) + + if dry_run: + logging.info("Dry-run mode enabled. Not writing to file; would've written:") + print(new_rotations_file.to_yaml_str()) + return + + logging.info("Writing new rotations to file: %s", rotation_file_path) + # On UNIX, ensure atomic writes. In any case, if to_yaml_str() raises, this + # won't wipe out the old file. + tmp_rotation_file_path = rotation_file_path.with_suffix(".tmp") + with tmp_rotation_file_path.open("w") as f: + f.write(new_rotations_file.to_yaml_str()) + tmp_rotation_file_path.rename(rotation_file_path) + logging.info("New rotations written successfully.") + + +if __name__ == "__main__": + main() diff --git a/email-rotation/extend_rotation_test.py b/email-rotation/extend_rotation_test.py new file mode 100755 index 0000000..1bd9858 --- /dev/null +++ b/email-rotation/extend_rotation_test.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 + +import datetime +import unittest +from unittest.mock import patch + +from extend_rotation import ( + MIN_TIME_UTC, + find_most_recent_service_times, + generate_additional_rotations, +) +from rotations import Rotation + +MOCKED_NOW_UTC = datetime.datetime(2025, 5, 30, 10, 0, 0, tzinfo=datetime.timezone.utc) +EXPECTED_FIRST_ROTATION_START_NO_PRIORS = datetime.datetime( + 2025, 5, 25, 0, 0, 0, tzinfo=datetime.timezone.utc +) + + +class TestFindMostRecentServiceTimes(unittest.TestCase): + def test_no_rotations(self) -> None: + members = ["alice", "bob"] + result = find_most_recent_service_times([], members) + expected = { + "alice": MIN_TIME_UTC, + "bob": MIN_TIME_UTC, + } + self.assertEqual(result, expected) + + def test_basic_rotations(self) -> None: + members = ["alice", "bob", "charlie"] + time1 = datetime.datetime(2023, 1, 1, tzinfo=datetime.timezone.utc) + time2 = datetime.datetime(2023, 1, 15, tzinfo=datetime.timezone.utc) + + prior_rotations = [ + Rotation(start_time=time1, members=["alice", "bob"]), + Rotation(start_time=time2, members=["charlie", "alice"]), + ] + result = find_most_recent_service_times(prior_rotations, members) + expected = { + "alice": time2, + "bob": time1, + "charlie": time2, + } + self.assertEqual(result, expected) + + def test_member_in_multiple_rotations_gets_latest(self) -> None: + members = ["alice", "bob", "charlie"] + time1 = datetime.datetime(2023, 1, 1, tzinfo=datetime.timezone.utc) + time2 = datetime.datetime(2023, 1, 15, tzinfo=datetime.timezone.utc) + time3 = datetime.datetime(2023, 2, 1, tzinfo=datetime.timezone.utc) + prior_rotations = [ + Rotation(start_time=time1, members=["alice", "bob"]), + Rotation(start_time=time2, members=["charlie", "alice"]), + Rotation(start_time=time3, members=["bob"]), + ] + result = find_most_recent_service_times(prior_rotations, members) + expected = { + "alice": time2, + "bob": time3, + "charlie": time2, + } + self.assertEqual(result, expected) + + +class TestGenerateAdditionalRotations(unittest.TestCase): + def test_generate_no_prior_rotations(self) -> None: + members = ["alice", "bob", "charlie"] + rotation_length_weeks = 1 + people_per_rotation = 2 + + # The function returns an Iterator[Rotation] + generator = generate_additional_rotations( + [], members, rotation_length_weeks, people_per_rotation, now=MOCKED_NOW_UTC + ) + generated_list = [next(generator) for _ in range(2)] + + expected_start_time1 = EXPECTED_FIRST_ROTATION_START_NO_PRIORS + self.assertEqual(generated_list[0].start_time, expected_start_time1) + self.assertEqual(generated_list[0].members, ["alice", "bob"]) + + expected_start_time2 = expected_start_time1 + datetime.timedelta(weeks=1) + self.assertEqual(generated_list[1].start_time, expected_start_time2) + self.assertEqual(generated_list[1].members, ["charlie", "alice"]) + + def test_generate_with_prior_rotations(self) -> None: + prior_time = datetime.datetime( + 2025, 5, 5, 0, 0, 0, tzinfo=datetime.timezone.utc + ) + prior_rotations = [ + Rotation(start_time=prior_time, members=["memberA", "memberB"]) + ] + members = ["memberC", "memberA", "memberB"] + + rotation_length_weeks = 2 + people_per_rotation = 1 + + generator = generate_additional_rotations( + prior_rotations, + members, + rotation_length_weeks, + people_per_rotation, + now=MOCKED_NOW_UTC, + ) + generated_list = [next(generator) for _ in range(3)] + + expected_start1 = prior_time + datetime.timedelta(weeks=2) + self.assertEqual(generated_list[0].start_time, expected_start1) + self.assertEqual(generated_list[0].members, ["memberC"]) + + expected_start2 = expected_start1 + datetime.timedelta(weeks=2) + self.assertEqual(generated_list[1].start_time, expected_start2) + self.assertEqual(generated_list[1].members, ["memberA"]) + + expected_start3 = expected_start2 + datetime.timedelta(weeks=2) + self.assertEqual(generated_list[2].start_time, expected_start3) + self.assertEqual(generated_list[2].members, ["memberB"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/email-rotation/pyproject.toml b/email-rotation/pyproject.toml new file mode 100644 index 0000000..5d7bf33 --- /dev/null +++ b/email-rotation/pyproject.toml @@ -0,0 +1,2 @@ +[tool.isort] +profile = "black" diff --git a/email-rotation/requirements.txt b/email-rotation/requirements.txt new file mode 100644 index 0000000..d40814b --- /dev/null +++ b/email-rotation/requirements.txt @@ -0,0 +1,8 @@ +black==24.4.2 +isort==5.13.2 +mypy==1.10.0 +types-requests==2.32.0.20240622 +types-PyYAML==6.0.12.20250516 +pyright==1.1.369 +PyYAML==6.0 +requests==2.32.3 diff --git a/email-rotation/rotation-members.yaml b/email-rotation/rotation-members.yaml new file mode 100644 index 0000000..81749d7 --- /dev/null +++ b/email-rotation/rotation-members.yaml @@ -0,0 +1,26 @@ +# A listing of members for the rotation. +# +# `./extend_rotation.py` can be used to extend the rotation.yaml +# +# TODO: There's no entry here for `Artur Pilipenko` or `Nikhil Gupta`, as they +# have no associated github user in the docs. +members: + - ahmedbougacha + - mrragava + - DimitryAndric + - emaste + - gburgessiv + - cuviper + - kbeyls + - mmdriley + - ormris + - ojhunt + - smithp35 + - pietroalbini + - serge-sans-paille + - offsake + - GreatKeeper + - tpenge + - tuliom + - wphuhn-intel + - yroux diff --git a/email-rotation/rotation.yaml b/email-rotation/rotation.yaml new file mode 100644 index 0000000..42c8a9d --- /dev/null +++ b/email-rotation/rotation.yaml @@ -0,0 +1,36 @@ +# Rotation members yaml file. +# +# **Autogenerated, please do not edit manually.** +# +# If you want to extend the rotation, please use `./extend_rotation.py`. +# If you want to swap with someone else on the rotation, those edits are OK. :) + +rotations: +- members: + - ahmedbougacha + - mrragava + start_time: '2025-08-31T00:00:00+00:00' +- members: + - DimitryAndric + - emaste + start_time: '2025-09-14T00:00:00+00:00' +- members: + - gburgessiv + - cuviper + start_time: '2025-09-28T00:00:00+00:00' +- members: + - kbeyls + - mmdriley + start_time: '2025-10-12T00:00:00+00:00' +- members: + - ormris + - ojhunt + start_time: '2025-10-26T00:00:00+00:00' +- members: + - smithp35 + - pietroalbini + start_time: '2025-11-09T00:00:00+00:00' +- members: + - serge-sans-paille + - offsake + start_time: '2025-11-23T00:00:00+00:00' diff --git a/email-rotation/rotations.py b/email-rotation/rotations.py new file mode 100644 index 0000000..444e62e --- /dev/null +++ b/email-rotation/rotations.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python3 + +import dataclasses +import datetime +import logging +from pathlib import Path +from typing import Any, Dict, List + +import yaml + +ROTATION_FILE = Path(__file__).resolve().parent / "rotation.yaml" +ROTATION_MEMBERS_FILE = Path(__file__).resolve().parent / "rotation-members.yaml" + + +@dataclasses.dataclass(frozen=True) +class RotationMembersFile: + """Represents the contents of the rotation-members.yaml file.""" + + members: List[str] + + @classmethod + def from_yaml(cls, data: Dict[str, Any]) -> "RotationMembersFile": + """Create an instance from a YAML dictionary.""" + return cls(members=data.get("members", [])) + + def to_yaml(self) -> str: + """Convert the instance to a YAML-compatible dictionary.""" + return yaml.safe_dump({"members": self.members}) + + @classmethod + def parse_file(cls, filepath: Path) -> "RotationMembersFile": + """Load the rotation members from a YAML file.""" + try: + with filepath.open(encoding="utf-8") as f: + data = yaml.safe_load(f) + return cls.from_yaml(data) + except FileNotFoundError: + logging.warning( + f"file '%s' not found. Using empty rotation members file.", filepath + ) + return cls(members=[]) + + +@dataclasses.dataclass(frozen=True, order=True) +class Rotation: + """Represents a single rotation entry.""" + + start_time: datetime.datetime + members: List[str] + + @classmethod + def from_yaml(cls, data: Dict[str, Any]) -> "Rotation": + """Create an instance from a YAML dictionary.""" + start_time = datetime.datetime.fromisoformat(data["start_time"]) + return cls(start_time=start_time, members=data.get("members", [])) + + def to_yaml(self) -> Dict[str, Any]: + """Convert the instance to a YAML-compatible dictionary.""" + as_dict = dataclasses.asdict(self) + as_dict["start_time"] = self.start_time.isoformat() + return as_dict + + +ROTATION_FILE_TOP_COMMENT = """\ +# Rotation members yaml file. +# +# **Autogenerated, please do not edit manually.** +# +# If you want to extend the rotation, please use `./extend_rotation.py`. +# If you want to swap with someone else on the rotation, those edits are OK. :) +""" + + +@dataclasses.dataclass(frozen=True) +class RotationFile: + """Represents the contents of the rotation.yaml file.""" + + rotations: List[Rotation] + + @classmethod + def from_yaml(cls, data: Dict[str, Any]) -> "RotationFile": + """Create an instance from a YAML dictionary.""" + # Subtle: sort this, so `rotations` is always in order from + # oldest -> newest. + rotations = sorted( + Rotation.from_yaml(rot) for rot in data.get("rotations") or [] + ) + return cls(rotations=rotations) + + def to_yaml_str(self) -> str: + """Convert the instance to a YAML-compatible dictionary.""" + yaml_str = yaml.safe_dump( + {"rotations": [rot.to_yaml() for rot in self.rotations]} + ) + return f"{ROTATION_FILE_TOP_COMMENT}\n{yaml_str}" + + @classmethod + def parse_file(cls, filepath: Path) -> "RotationFile": + """Load the rotation from a YAML file.""" + try: + with filepath.open(encoding="utf-8") as f: + data = yaml.safe_load(f) + return cls.from_yaml(data) + except FileNotFoundError: + logging.warning( + f"file '%s' not found. Using empty rotation file.", filepath + ) + return cls(rotations=[]) diff --git a/email-rotation/verify_yaml_files_test.py b/email-rotation/verify_yaml_files_test.py new file mode 100755 index 0000000..da06dd8 --- /dev/null +++ b/email-rotation/verify_yaml_files_test.py @@ -0,0 +1,23 @@ +#!/usr/bin/env python3 + +import unittest + +import rotations + + +class TestRotationYamlFilesParse(unittest.TestCase): + """Ensures our yaml files parse.""" + + def test_rotation_members_yaml_parses(self) -> None: + parsed = rotations.RotationMembersFile.parse_file( + rotations.ROTATION_MEMBERS_FILE + ) + self.assertTrue(parsed.members, "No rotation members could be parsed") + + def test_rotation_yaml_parses(self) -> None: + parsed = rotations.RotationFile.parse_file(rotations.ROTATION_FILE) + self.assertTrue(parsed.rotations, "No rotations could be parsed") + + +if __name__ == "__main__": + unittest.main()