diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8ea514a..d811601 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -32,6 +32,38 @@ jobs: echo "Pulled files:" { git lfs ls-files | grep -E '[a-f0-9]{10}\s\*'; } || true + - name: Install python dependencies + run: | + pip install -e . + + - name: ctf init + run: | + ctf init test-ctf + + - name: Copy CTF files + run: | + mkdir -p test-ctf/challenges + cp -r ./challenges test-ctf/ + ls -al test-ctf/ + ls -al test-ctf/challenges + + - name: ctf version + working-directory: test-ctf + run: | + ctf version + + - name: CTF stats + # Run this in the test-ctf directory + working-directory: test-ctf + run: | + ctf stats + + - name: CTF list + # Run this in the test-ctf directory + working-directory: test-ctf + run: | + ctf list + - name: Remove docker run: | sudo apt-get autopurge -y moby-containerd docker uidmap @@ -104,38 +136,11 @@ jobs: ./install-opentofu.sh --install-method deb rm -f install-opentofu.sh - - name: Install python dependencies - run: | - pip install -e . - - - name: ctf init - run: | - ctf init test-ctf - - - name: Copy CTF files - run: | - mkdir -p test-ctf/challenges - cp -r ./challenges test-ctf/ - ls -al test-ctf/ - ls -al test-ctf/challenges - - name: Validate CTF structure # Run this in the test-ctf directory working-directory: test-ctf run: | ctf validate - - - name: CTF stats - # Run this in the test-ctf directory - working-directory: test-ctf - run: | - ctf stats - - - name: CTF list - # Run this in the test-ctf directory - working-directory: test-ctf - run: | - ctf list - name: Deployment check working-directory: test-ctf diff --git a/ctf/__init__.py b/ctf/__init__.py index c9b61a7..afc637e 100644 --- a/ctf/__init__.py +++ b/ctf/__init__.py @@ -1,12 +1,16 @@ #!/usr/bin/env python3 import importlib.metadata import json -import logging import os import sys import urllib.request -import coloredlogs +from ctf.logger import LOG +from ctf.utils import ( + find_ctf_root_directory, + get_ctf_script_schemas_directory, + get_ctf_script_templates_directory, +) VERSION = importlib.metadata.version("ctf-script") @@ -14,13 +18,21 @@ print(VERSION) exit(code=0) + ENV = {} for k, v in os.environ.items(): ENV[k] = v -LOG = logging.getLogger() -LOG.setLevel(level=logging.DEBUG) -coloredlogs.install(level="DEBUG", logger=LOG) +match sys.argv[1] if len(sys.argv) > 1 else "": + case "init": + CTF_ROOT_DIRECTORY = os.path.join(os.getcwd(), ".") + case "version": + CTF_ROOT_DIRECTORY = "" + case _: + CTF_ROOT_DIRECTORY = find_ctf_root_directory() + +TEMPLATES_ROOT_DIRECTORY = get_ctf_script_templates_directory() +SCHEMAS_ROOT_DIRECTORY = get_ctf_script_schemas_directory() def check_tool_version() -> None: @@ -56,40 +68,10 @@ def check_tool_version() -> None: LOG.debug("Script is up to date.") case -1: LOG.warning( - "Script is outdated. Please update to the latest release before continuing." + f"Script is outdated (current: {VERSION}, upstream: {latest_version}). Please update to the latest release before continuing." ) if (input("Do you want to continue? 
[y/N] ").lower() or "n") == "n": exit(code=0) check_tool_version() - - -def find_ctf_root_directory() -> str: - path = os.path.join(os.getcwd(), ".") - - while path != (path := os.path.dirname(p=path)): - dir = os.listdir(path=path) - - if ".deploy" not in dir: - continue - if "challenges" not in dir: - continue - break - - if path == "/": - if "CTF_ROOT_DIR" not in os.environ: - LOG.critical( - msg='Could not automatically find the root directory nor the "CTF_ROOT_DIR" environment variable. To initialize a new root directory, use `ctf init [path]`' - ) - exit(1) - return os.environ.get("CTF_ROOT_DIR", default=".") - - LOG.debug(msg=f"Found root directory: {path}") - return path - - -if len(sys.argv) > 1 and sys.argv[1] == "init": - CTF_ROOT_DIRECTORY = os.path.join(os.getcwd(), ".") -else: - CTF_ROOT_DIRECTORY = find_ctf_root_directory() diff --git a/ctf/__main__.py b/ctf/__main__.py index d0a7d58..bbc4db4 100644 --- a/ctf/__main__.py +++ b/ctf/__main__.py @@ -1,1592 +1,42 @@ #!/usr/bin/env python3 -import csv -import io -import json -import logging import os -import re -import secrets -import shutil -import statistics -import subprocess -import textwrap -from datetime import datetime -from enum import StrEnum, unique -import jinja2 -import typer -import yaml -from tabulate import tabulate from typer import Typer -from typing_extensions import Annotated -from ctf import CTF_ROOT_DIRECTORY, ENV, LOG -from ctf.utils import ( - add_tracks_to_terraform_modules, - available_incus_remotes, - check_git_lfs, - create_terraform_modules_file, - get_all_available_tracks, - get_ctf_script_schemas_directory, - get_ctf_script_templates_directory, - get_terraform_tracks_from_modules, - parse_post_yamls, - parse_track_yaml, - remove_tracks_from_terraform_modules, - validate_track_can_be_deployed, -) -from ctf.validate_json_schemas import validate_with_json_schemas -from ctf.validators import ( - ValidationError, - validators_list, -) - -try: - import pybadges - - _has_pybadges = True -except ImportError: - _has_pybadges = False - -try: - import matplotlib.pyplot as plt - - _has_matplotlib = True -except ImportError: - _has_matplotlib = False - -TEMPLATES_ROOT_DIRECTORY = get_ctf_script_templates_directory() -SCHEMAS_ROOT_DIRECTORY = get_ctf_script_schemas_directory() -AVAILABLE_INCUS_REMOTES = available_incus_remotes() +from ctf import ( + CTF_ROOT_DIRECTORY, + LOG, +) +from ctf.check import app as check_app +from ctf.deploy import app as deploy_app +from ctf.destroy import app as destroy_app +from ctf.flags import app as flags_app +from ctf.generate import app as generate_app +from ctf.init import app as init_app +from ctf.list import app as list_app +from ctf.new import app as new_app +from ctf.redeploy import app as redeploy_app +from ctf.services import app as services_app +from ctf.stats import app as stats_app +from ctf.validate import app as validate_app +from ctf.version import app as version_app app = Typer( help="CLI tool to manage CTF challenges as code. Run from the root CTF repo directory or set the CTF_ROOT_DIR environment variable to run the tool." 
) - - -@unique -class Template(StrEnum): - APACHE_PHP = "apache-php" - PYTHON_SERVICE = "python-service" - FILES_ONLY = "files-only" - TRACK_YAML_ONLY = "track-yaml-only" - RUST_WEBSERVICE = "rust-webservice" - - -@unique -class OutputFormat(StrEnum): - JSON = "json" - CSV = "csv" - YAML = "yaml" - - -class ListOutputFormat(StrEnum): - PRETTY = "pretty" - - -def terraform_binary() -> str: - path = shutil.which(cmd="tofu") - if not path: - path = shutil.which(cmd="terraform") - - if not path: - raise Exception("Couldn't find Terraform or OpenTofu") - - return path - - -@app.command( - help="Initialize a directory with the default CTF structure. If the directory does not exist, it will be created." -) -def init( - path: Annotated[ - str, typer.Argument(help="Directory in which to initialize a CTF") - ] = CTF_ROOT_DIRECTORY, - force: Annotated[ - bool, - typer.Option( - "--force", help="Overwrite the directory if it's already initialized" - ), - ] = False, -) -> None: - created_directory = False - created_assets: list[str] = [] - try: - if not os.path.isdir(path): - os.mkdir(path) - LOG.info(f'Creating directory "{path}"') - created_directory = True - elif ( - os.path.isdir(os.path.join(path, "challenges")) - or os.path.isdir(os.path.join(path, ".deploy")) - ) and not force: - LOG.error( - f'Directory "{path}" is already initialized. Use --force to overwrite.' - ) - LOG.error(force) - exit(code=1) - - for asset in os.listdir(p := os.path.join(TEMPLATES_ROOT_DIRECTORY, "init")): - dst_asset = os.path.join(path, asset) - if os.path.isdir(src_asset := os.path.join(p, asset)): - shutil.copytree(src_asset, dst_asset, dirs_exist_ok=True) - LOG.info(f'Created "{dst_asset}" folder') - else: - shutil.copy(src_asset, dst_asset) - LOG.info(f'Created "{dst_asset}" file') - - created_assets.append(dst_asset) - - except Exception: - import traceback - - if created_directory: - shutil.rmtree(path) - LOG.info(f'Removed created "{path}" folder') - else: - for asset in created_assets: - if os.path.isdir(asset): - shutil.rmtree(asset) - LOG.info(f'Removed created "{asset}" folder') - else: - os.unlink(asset) - LOG.info(f'Removed created "{asset}" file') - - LOG.critical(traceback.format_exc()) - - -@app.command(help="Create a new CTF track with a given name") -def new( - name: Annotated[ - str, - typer.Option( - help="Track name. No space, use underscores if needed.", - prompt="Track name. No space, use underscores if needed.", - ), - ], - template: Annotated[ - Template, - typer.Option("--template", "-t", help="Template to use for the track."), - ] = Template.APACHE_PHP, - force: Annotated[ - bool, - typer.Option( - "--force", - help="If directory already exists, delete it and create it again.", - ), - ] = False, -) -> None: - LOG.info(msg=f"Creating a new track: {name}") - if not re.match(pattern=r"^[a-z][a-z0-9\-]{0,61}[a-z0-9]$", string=name): - LOG.critical( - msg="""The track name Valid instance names must fulfill the following requirements: -* The name must be between 1 and 63 characters long; -* The name must contain only letters, numbers and dashes from the ASCII table; -* The name must not start with a digit or a dash; -* The name must not end with a dash.""" - ) - exit(code=1) - - if os.path.exists( - path=( - new_challenge_directory := os.path.join( - CTF_ROOT_DIRECTORY, "challenges", name - ) - ) - ): - if force: - LOG.debug(msg=f"Deleting {new_challenge_directory}") - shutil.rmtree(new_challenge_directory) - else: - LOG.critical( - "Track already exists with that name. 
Use `--force` to overwrite the track." - ) - exit(code=1) - - os.mkdir(new_challenge_directory) - - LOG.debug(msg=f"Directory {new_challenge_directory} created.") - - env = jinja2.Environment( - loader=jinja2.FileSystemLoader( - searchpath=TEMPLATES_ROOT_DIRECTORY, encoding="utf-8" - ) - ) - - ipv6_subnet = f"9000:d37e:c40b:{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}" - - rb = [ - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - secrets.choice("0123456789abcdef"), - ] - hardware_address = f"00:16:3e:{rb[0]}{rb[1]}:{rb[2]}{rb[3]}:{rb[4]}{rb[5]}" - ipv6_address = f"216:3eff:fe{rb[0]}{rb[1]}:{rb[2]}{rb[3]}{rb[4]}{rb[5]}" - full_ipv6_address = f"{ipv6_subnet}:{ipv6_address}" - - track_template = env.get_template(name="track.yaml.j2") - render = track_template.render( - data={ - "name": name, - "full_ipv6_address": full_ipv6_address, - "ipv6_subnet": ipv6_subnet, - "template": template.value, - } - ) - with open( - file=(p := os.path.join(new_challenge_directory, "track.yaml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - posts_directory = os.path.join(new_challenge_directory, "posts") - - os.mkdir(path=posts_directory) - - LOG.debug(msg=f"Directory {posts_directory} created.") - - track_template = env.get_template(name="topic.yaml.j2") - render = track_template.render(data={"name": name}) - with open( - file=(p := os.path.join(posts_directory, f"{name}.yaml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - track_template = env.get_template(name="post.yaml.j2") - render = track_template.render(data={"name": name}) - with open( - file=(p := os.path.join(posts_directory, f"{name}_flag1.yaml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - if template == Template.TRACK_YAML_ONLY: - return - - files_directory = os.path.join(new_challenge_directory, "files") - - os.mkdir(path=files_directory) - - LOG.debug(msg=f"Directory {files_directory} created.") - - if template == Template.FILES_ONLY: - return - - terraform_directory = os.path.join(new_challenge_directory, "terraform") - - os.mkdir(path=terraform_directory) - - LOG.debug(msg=f"Directory {terraform_directory} created.") - - track_template = env.get_template(name="main.tf.j2") - - render = track_template.render( - data={ - "name": name, - "hardware_address": hardware_address, - "ipv6": ipv6_address, - "ipv6_subnet": ipv6_subnet, - "full_ipv6_address": full_ipv6_address, - } - ) - with open( - file=(p := os.path.join(terraform_directory, "main.tf")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - relpath = os.path.relpath( - os.path.join(CTF_ROOT_DIRECTORY, ".deploy", "common"), terraform_directory - ) - - os.symlink( - src=os.path.join(relpath, "variables.tf"), - dst=(p := os.path.join(terraform_directory, "variables.tf")), - ) - - LOG.debug(msg=f"Wrote {p}.") - - os.symlink( - src=os.path.join(relpath, "versions.tf"), - dst=(p := os.path.join(terraform_directory, "versions.tf")), - 
) - - LOG.debug(msg=f"Wrote {p}.") - - ansible_directory = os.path.join(new_challenge_directory, "ansible") - - os.mkdir(path=ansible_directory) - - LOG.debug(msg=f"Directory {ansible_directory} created.") - - track_template = env.get_template(name=f"deploy-{template}.yaml.j2") - render = track_template.render(data={"name": name}) - with open( - file=(p := os.path.join(ansible_directory, "deploy.yaml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - track_template = env.get_template(name="inventory.j2") - render = track_template.render(data={"name": name}) - with open( - file=(p := os.path.join(ansible_directory, "inventory")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - ansible_challenge_directory = os.path.join(ansible_directory, "challenge") - - os.mkdir(path=ansible_challenge_directory) - - LOG.debug(msg=f"Directory {ansible_challenge_directory} created.") - - if template == Template.APACHE_PHP: - track_template = env.get_template(name="index.php.j2") - render = track_template.render(data={"name": name}) - with open( - file=(p := os.path.join(ansible_challenge_directory, "index.php")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - if template == Template.PYTHON_SERVICE: - track_template = env.get_template(name="app.py.j2") - render = track_template.render(data={"name": name}) - with open( - file=(p := os.path.join(ansible_challenge_directory, "app.py")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - with open( - file=(p := os.path.join(ansible_challenge_directory, "flag-1.txt")), - mode="w", - encoding="utf-8", - ) as f: - f.write(f"{{{{ track_flags.{name}_flag_1 }}}} (1/2)\n") - - LOG.debug(msg=f"Wrote {p}.") - - if template == Template.RUST_WEBSERVICE: - # Copy the entire challenge template - shutil.copytree( - os.path.join(TEMPLATES_ROOT_DIRECTORY, "rust-webservice"), - ansible_challenge_directory, - dirs_exist_ok=True, - ) - LOG.debug(msg=f"Wrote files to {ansible_challenge_directory}") - - manifest_template = env.get_template(name="Cargo.toml.j2") - render = manifest_template.render(data={"name": name}) - with open( - file=(p := os.path.join(ansible_challenge_directory, "Cargo.toml")), - mode="w", - encoding="utf-8", - ) as f: - f.write(render) - - LOG.debug(msg=f"Wrote {p}.") - - -@app.command( - help="Destroy everything deployed by Terraform. This is a destructive operation." -) -def destroy( - tracks: Annotated[ - list[str], - typer.Option( - "--tracks", - "-t", - help="Only destroy the given tracks (use the directory name)", - ), - ] = [], - production: Annotated[ - bool, - typer.Option( - "--production", - help="Do a production deployment. 
Only use this if you know what you're doing.", - ), - ] = False, - remote: Annotated[ - str, typer.Option("--remote", help="Incus remote to deploy to") - ] = "local", - force: Annotated[ - bool, - typer.Option( - "--force", - help="If there are artefacts remaining, delete them without asking.", - ), - ] = False, -) -> None: - ENV["INCUS_REMOTE"] = remote - LOG.info(msg="tofu destroy...") - - if not os.path.exists( - path=os.path.join(CTF_ROOT_DIRECTORY, ".deploy", "modules.tf") - ): - LOG.critical(msg="Nothing to destroy.") - exit(code=1) - - terraform_tracks = get_terraform_tracks_from_modules() - - r = ( - subprocess.run( - args=["incus", "project", "get-current"], - check=True, - capture_output=True, - env=ENV, - ) - .stdout.decode() - .strip() - ) - - tmp_tracks = set(tracks) - if tmp_tracks and tmp_tracks != terraform_tracks: - terraform_tracks &= tmp_tracks - if not terraform_tracks: - LOG.warning("No track to destroy.") - return - - if r in terraform_tracks: - projects = { - project["name"] - for project in json.loads( - s=subprocess.run( - args=["incus", "project", "list", "--format=json"], - check=False, - capture_output=True, - env=ENV, - ).stdout.decode() - ) - } - - projects = list((projects - terraform_tracks)) - if len(projects) == 0: - LOG.critical( - msg="No project to switch to. This should never happen as the default should always exists." - ) - exit(code=1) - - cmd = [ - "incus", - "project", - "switch", - "default" if "default" in projects else projects[0], - ] - - LOG.info(msg=f"Running `{' '.join(cmd)}`") - subprocess.run(args=cmd, check=True, env=ENV) - - subprocess.run( - args=[ - terraform_binary(), - "destroy", - "-auto-approve", - *[f"-target=module.track-{track}" for track in terraform_tracks], - ], - cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), - check=False, - ) - - projects = [ - project["name"] - for project in json.loads( - s=subprocess.run( - args=["incus", "project", "list", "--format=json"], - check=False, - capture_output=True, - env=ENV, - ).stdout.decode() - ) - ] - - networks = [ - network["name"] - for network in json.loads( - s=subprocess.run( - args=["incus", "network", "list", "--format=json"], - check=False, - capture_output=True, - env=ENV, - ).stdout.decode() - ) - ] - - network_acls = [ - network_acl["name"] - for network_acl in json.loads( - s=subprocess.run( - args=["incus", "network", "acl", "list", "--format=json"], - check=False, - capture_output=True, - env=ENV, - ).stdout.decode() - ) - ] - - for module in terraform_tracks: - if module in projects: - LOG.warning(msg=f"The project {module} was not destroyed properly.") - if ( - force - or (input("Do you want to destroy it? [Y/n] ").lower() or "y") == "y" - ): - subprocess.run( - args=["incus", "project", "delete", module, "--force"], - check=False, - capture_output=True, - input=b"yes\n", - env=ENV, - ) - - if (tmp_module := module[0:15]) in networks: - LOG.warning(msg=f"The network {tmp_module} was not destroyed properly.") - if ( - force - or (input("Do you want to destroy it? [Y/n] ").lower() or "y") == "y" - ): - subprocess.run( - args=["incus", "network", "delete", tmp_module], - check=False, - capture_output=True, - env=ENV, - ) - - if (tmp_module := module) in network_acls or ( - tmp_module := f"{module}-default" - ) in network_acls: - LOG.warning(msg=f"The network ACL {tmp_module} was not destroyed properly.") - if ( - force - or (input("Do you want to destroy it? 
[Y/n] ").lower() or "y") == "y" - ): - subprocess.run( - args=["incus", "network", "acl", "delete", tmp_module], - check=False, - capture_output=True, - env=ENV, - ) - remove_tracks_from_terraform_modules( - tracks=terraform_tracks, - remote=remote, - production=production, - ) - LOG.info(msg="Successfully destroyed every track") - - -@app.command(help="Get flags from tracks") -def flags( - tracks: Annotated[ - list[str], - typer.Option( - "--tracks", - "-t", - help="Only flags from the given tracks (use the directory name)", - ), - ] = [], - format: Annotated[ - OutputFormat, - typer.Option("--format", help="Output format", prompt="Output format"), - ] = OutputFormat.JSON, -) -> None: - distinct_tracks: set[str] = set() - - for entry in os.listdir( - path=(challenges_directory := os.path.join(CTF_ROOT_DIRECTORY, "challenges")) - ): - if os.path.isdir( - s=(track_directory := os.path.join(challenges_directory, entry)) - ) and os.path.exists(path=os.path.join(track_directory, "track.yaml")): - if not tracks: - distinct_tracks.add(entry) - elif entry in tracks: - distinct_tracks.add(entry) - - flags = [] - for track in distinct_tracks: - LOG.debug(msg=f"Parsing track.yaml for track {track}") - track_yaml = parse_track_yaml(track_name=track) - - if len(track_yaml["flags"]) == 0: - LOG.debug(msg=f"No flag in track {track}. Skipping...") - continue - - flags.extend(track_yaml["flags"]) - - if not flags: - LOG.warning(msg="No flag found...") - return - - if format == OutputFormat.JSON: - print(json.dumps(obj=flags, indent=2)) - elif format == OutputFormat.CSV: - output = io.StringIO() - writer = csv.DictWriter(f=output, fieldnames=flags[0].keys()) - writer.writeheader() - writer.writerows(rowdicts=flags) - print(output.getvalue()) - elif format == OutputFormat.YAML: - print(yaml.safe_dump(data=flags)) - - -@app.command(help="Get services from tracks") -def services( - tracks: Annotated[ - list[str], - typer.Option( - "--tracks", - "-t", - help="Only services from the given tracks (use the directory name)", - ), - ] = [], -) -> None: - distinct_tracks: set[str] = set() - for entry in os.listdir( - path=(challenges_directory := os.path.join(CTF_ROOT_DIRECTORY, "challenges")) - ): - if os.path.isdir( - s=(track_directory := os.path.join(challenges_directory, entry)) - ) and os.path.exists(path=os.path.join(track_directory, "track.yaml")): - if not tracks: - distinct_tracks.add(entry) - elif entry in tracks: - distinct_tracks.add(entry) - - for track in distinct_tracks: - LOG.debug(msg=f"Parsing track.yaml for track {track}") - track_yaml = parse_track_yaml(track_name=track) - - if len(track_yaml["services"]) == 0: - LOG.debug(msg=f"No service in track {track}. Skipping...") - continue - - for service in track_yaml["services"]: - contact = ",".join(track_yaml["contacts"]["support"]) - name = service["name"] - instance = service["instance"] - address = service["address"] - check = service["check"] - port = service["port"] - - print(f"{track}/{instance}/{name} {contact} {address} {check} {port}") - - -@app.command( - help="Generate the deployment files using `terraform init` and `terraform validate`" -) -def generate( - tracks: Annotated[ - list[str], - typer.Option( - "--tracks", - "-t", - help="Only generate the given tracks (use the directory name)", - ), - ] = [], - production: Annotated[ - bool, - typer.Option( - "--production", - help="Do a production deployment. 
Only use this if you know what you're doing.", - ), - ] = False, - remote: Annotated[ - str, typer.Option("--remote", help="Incus remote to deploy to") - ] = "local", -) -> set[str]: - ENV["INCUS_REMOTE"] = remote - # Get the list of tracks. - distinct_tracks = set( - track - for track in get_all_available_tracks() - if validate_track_can_be_deployed(track=track) - and (not tracks or track in tracks) - ) - - if distinct_tracks: - LOG.debug(msg=f"Found {len(distinct_tracks)} tracks") - # Generate the Terraform modules file. - create_terraform_modules_file(remote=remote, production=production) - add_tracks_to_terraform_modules( - tracks=distinct_tracks, - remote=remote, - production=production, - ) - - for track in distinct_tracks: - relpath = os.path.relpath( - os.path.join(CTF_ROOT_DIRECTORY, ".deploy", "common"), - ( - terraform_directory := os.path.join( - CTF_ROOT_DIRECTORY, "challenges", track, "terraform" - ) - ), - ) - - # If the file exists and is a symlink, refresh it by deleting it first. - if os.path.exists( - path=(p := os.path.join(terraform_directory, "variables.tf")) - ) and os.path.islink(path=p): - os.unlink(path=p) - - LOG.debug(msg=f"Refreshing symlink {p}.") - - if not os.path.exists(path=p): - os.symlink( - src=os.path.join(relpath, "variables.tf"), - dst=p, - ) - - LOG.debug(msg=f"Created symlink {p}.") - - # If the file exists and is a symlink, refresh it by deleting it first. - if os.path.exists( - path=(p := os.path.join(terraform_directory, "versions.tf")) - ) and os.path.islink(path=p): - os.unlink(path=p) - - LOG.debug(msg=f"Refreshing symlink {p}.") - - if not os.path.exists(path=p): - os.symlink( - src=os.path.join(relpath, "versions.tf"), - dst=p, - ) - - LOG.debug(msg=f"Created symlink {p}.") - - subprocess.run( - args=[terraform_binary(), "init", "-upgrade"], - cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), - stdout=subprocess.DEVNULL, - check=True, - ) - subprocess.run( - args=[terraform_binary(), "validate"], - cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), - check=True, - ) - else: - LOG.critical("No track was found") - exit(code=1) - - return distinct_tracks - - -@app.command(help="Deploy and provision the tracks") -def deploy( - tracks: Annotated[ - list[str], - typer.Option( - "--tracks", - "-t", - help="Only deploy the given tracks (use the directory name)", - ), - ] = [], - production: Annotated[ - bool, - typer.Option( - "--production", - help="Do a production deployment. Only use this if you know what you're doing.", - ), - ] = False, - remote: Annotated[ - str, typer.Option("--remote", help="Incus remote to deploy to") - ] = "local", - redeploy: Annotated[ - bool, typer.Option("--redeploy", help="Do not use. Use `ctf redeploy` instead.") - ] = False, - force: Annotated[ - bool, - typer.Option("--force", help="Force the deployment even if there are errors."), - ] = False, -): - ENV["INCUS_REMOTE"] = remote - if redeploy: - distinct_tracks = set( - track - for track in get_all_available_tracks() - if validate_track_can_be_deployed(track=track) and track in tracks - ) - - add_tracks_to_terraform_modules( - tracks=distinct_tracks - get_terraform_tracks_from_modules(), - remote=remote, - production=production, - ) - else: - # Run generate first. - distinct_tracks = generate(tracks=tracks, production=production, remote=remote) - - # Check if Git LFS is installed on the system as it is required for deployment. - if not check_git_lfs(): - LOG.critical( - msg="Git LFS is missing from your system. Install it before deploying." 
- ) - exit(code=1) - - # Pull LFS files - LOG.debug("Pulling Git LFS files for specific tracks.") - subprocess.run( - args=[ - "git", - "lfs", - "pull", - f"--include={','.join([os.path.join('challenges', track, 'ansible', '*') for track in distinct_tracks])}", - ], - check=True, - ) - - try: - subprocess.run( - args=[terraform_binary(), "apply", "-auto-approve"], - cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), - check=True, - ) - except subprocess.CalledProcessError: - LOG.warning( - f"The project could not deploy due to instable state. It is often due to CTRL+C while deploying as {os.path.basename(terraform_binary())} was not able to save the state of each object created." - ) - - if (input("Do you want to clean and start over? [Y/n] ").lower() or "y") != "y": - exit(code=1) - - force = True - destroy(tracks=tracks, production=production, remote=remote, force=force) - - subprocess.run( - args=[terraform_binary(), "apply", "-auto-approve"], - cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), - check=True, - ) - except KeyboardInterrupt: - LOG.warning( - "CTRL+C was detected during Terraform deployment. Destroying everything..." - ) - force = True - destroy(tracks=tracks, production=production, remote=remote, force=force) - exit(code=0) - - for track in distinct_tracks: - if not os.path.exists( - path=( - path := os.path.join(CTF_ROOT_DIRECTORY, "challenges", track, "ansible") - ) - ): - continue - - run_ansible_playbook( - remote=remote, production=production, track=track, path=path - ) - - if not production: - incus_list = json.loads( - s=subprocess.run( - args=["incus", "list", f"--project={track}", "--format", "json"], - check=True, - capture_output=True, - env=ENV, - ).stdout.decode() - ) - ipv6_to_container_name = {} - for machine in incus_list: - addresses = machine["state"]["network"]["eth0"]["addresses"] - ipv6_address = list( - filter(lambda address: address["family"] == "inet6", addresses) - )[0]["address"] - ipv6_to_container_name[ipv6_address] = machine["name"] - - LOG.debug(msg=f"Mapping: {ipv6_to_container_name}") - - if remote == "local": - LOG.debug(msg=f"Parsing track.yaml for track {track}") - track_yaml = parse_track_yaml(track_name=track) - - for service in track_yaml["services"]: - if service.get("dev_port_mapping"): - LOG.debug( - f"Adding incus proxy for service {track}-{service['name']}-port-{service['port']}" - ) - machine_name = ipv6_to_container_name[ - service["address"] - .replace(":0", ":") - .replace(":0", ":") - .replace(":0", ":") - .replace(":0", ":") - ] - subprocess.run( - args=[ - "incus", - "config", - "device", - "add", - machine_name, - f"proxy-{track}-{service['dev_port_mapping']}-to-{service['port']}", - "proxy", - f"listen=tcp:0.0.0.0:{service['dev_port_mapping']}", - f"connect=tcp:127.0.0.1:{service['port']}", - "--project", - track, - ], - cwd=path, - check=True, - ) - - LOG.info(msg=f"Running `incus --project={track} list`") - subprocess.run( - args=["incus", f"--project={track}", "list"], check=True, env=ENV - ) - - if not production and distinct_tracks: - tracks_list = list(distinct_tracks) - track_index = input( - textwrap.dedent( - f"""\ - Do you want to `incus project switch` to any of the tracks mentioned in argument? - {chr(10).join([f"{list(tracks_list).index(t) + 1}) {t}" for t in tracks_list])} - - Which? 
""" - ) - ) - - if ( - track_index.isnumeric() - and (track_index := int(track_index)) - and 0 < track_index <= len(tracks_list) - ): - LOG.info( - msg=f"Running `incus project switch {tracks_list[track_index - 1]}`" - ) - subprocess.run( - args=["incus", "project", "switch", tracks_list[track_index - 1]], - check=True, - env=ENV, - ) - elif track_index: - LOG.warning( - msg=f"Could not switch project, unrecognized input: {track_index}." - ) - - -def run_ansible_playbook(remote: str, production: bool, track: str, path: str) -> None: - extra_args = [] - if remote: - extra_args += ["-e", f"ansible_incus_remote={remote}"] - - if production: - extra_args += ["-e", "nsec_production=true"] - - LOG.info(msg=f"Running common yaml with ansible for track {track}...") - ansible_args = [ - "ansible-playbook", - "../../../.deploy/common.yaml", - "-i", - "inventory", - ] + extra_args - subprocess.run( - args=ansible_args, - cwd=path, - check=True, - ) - - LOG.info(msg=f"Running deploy.yaml with ansible for track {track}...") - ansible_args = [ - "ansible-playbook", - "deploy.yaml", - "-i", - "inventory", - ] + extra_args - subprocess.run( - args=ansible_args, - cwd=path, - check=True, - ) - - artifacts_path = os.path.join(path, "artifacts") - if os.path.exists(path=artifacts_path): - shutil.rmtree(artifacts_path) - - -@app.command(help="Destroy and then deploy the given tracks") -def redeploy( - tracks: Annotated[ - list[str], - typer.Option( - "--tracks", - "-t", - help="Only redeploy the given tracks (use the directory name)", - ), - ] = [], - production: Annotated[ - bool, - typer.Option( - "--production", - help="Do a production deployment. Only use this if you know what you're doing.", - ), - ] = False, - remote: Annotated[ - str, typer.Option("--remote", help="Incus remote to deploy to") - ] = "local", - force: Annotated[ - bool, - typer.Option( - "--force", - help="If there are artefacts remaining, delete them without asking.", - ), - ] = False, -) -> None: - ENV["INCUS_REMOTE"] = remote - destroy(tracks=tracks, production=production, remote=remote, force=force) - deploy( - tracks=tracks, production=production, remote=remote, force=force, redeploy=True - ) - - -@app.command(help="Preview the changes") -def check( - tracks: Annotated[ - list[str], - typer.Option( - "--tracks", - "-t", - help="Only check the given tracks (use the directory name)", - ), - ] = [], - production: Annotated[ - bool, - typer.Option( - "--production", - help="Do a production deployment. Only use this if you know what you're doing.", - ), - ] = False, - remote: Annotated[ - str, typer.Option("--remote", help="Incus remote to deploy to") - ] = "local", -) -> None: - ENV["INCUS_REMOTE"] = remote - # Run generate first. - generate(tracks=tracks, production=production, remote=remote) - - # Then run terraform plan. - subprocess.run( - args=[terraform_binary(), "plan"], - cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), - check=True, - ) - - # Check if Git LFS is installed on the system as it will be required for deployment. - if not check_git_lfs(): - LOG.warning( - msg="Git LFS is missing from your system. Install it before deploying." - ) - - -@app.command( - help="Generate statistics (such as number of tracks, number of flags, total flag value, etc.) from all the `track.yaml files. Outputs as JSON." 
-) -def stats( - tracks: Annotated[ - list[str], - typer.Option( - "--tracks", - "-t", - help="Name of the tracks to count in statistics (if not specified, all tracks are counted).", - ), - ] = [], - generate_badges: Annotated[ - bool, - typer.Option( - "--generate-badges", - help="Generate SVG files of some statistics in the .badges directory.", - ), - ] = False, - charts: Annotated[ - bool, - typer.Option( - "--charts", - help="Generate PNG charts of some statistics in the .charts directory.", - ), - ] = False, - historical: Annotated[ - bool, - typer.Option( - "--historical", - help="Use in conjunction with --charts to generate historical data. ONLY USE THIS IF YOU KNOW WHAT YOU ARE DOING. THIS IS BAD CODE THAT WILL FUCK YOUR REPO IN UNEXPECTED WAYS.", - ), - ] = False, -) -> None: - LOG.debug(msg="Generating statistics...") - stats = {} - distinct_tracks: set[str] = set() - for entry in os.listdir( - (challenges_directory := os.path.join(CTF_ROOT_DIRECTORY, "challenges")) - ): - if os.path.isdir( - (track_directory := os.path.join(challenges_directory, entry)) - ) and os.path.isfile(os.path.join(track_directory, "track.yaml")): - if not tracks: - distinct_tracks.add(entry) - elif entry in tracks: - distinct_tracks.add(entry) - - stats["number_of_tracks"] = len(distinct_tracks) - stats["number_of_tracks_integrated_with_scenario"] = 0 - stats["number_of_flags"] = 0 - stats["highest_value_flag"] = 0 - stats["most_flags_in_a_track"] = 0 - stats["total_flags_value"] = 0 - stats["number_of_services"] = 0 - stats["number_of_files"] = 0 - stats["median_flag_value"] = 0 - stats["mean_flag_value"] = 0 - stats["number_of_services_per_port"] = {} - stats["flag_count_per_value"] = {} - stats["number_of_challenge_designers"] = 0 - stats["number_of_flags_per_track"] = {} - stats["number_of_points_per_track"] = {} - stats["not_integrated_with_scenario"] = [] - challenge_designers = set() - flags = [] - for track in distinct_tracks: - track_yaml = parse_track_yaml(track_name=track) - number_of_flags = len(track_yaml["flags"]) - stats["number_of_flags_per_track"][track] = number_of_flags - if track_yaml["integrated_with_scenario"]: - stats["number_of_tracks_integrated_with_scenario"] += 1 - else: - stats["not_integrated_with_scenario"].append(track) - if number_of_flags > stats["most_flags_in_a_track"]: - stats["most_flags_in_a_track"] = number_of_flags - stats["number_of_flags"] += number_of_flags - stats["number_of_services"] += len(track_yaml["services"]) - stats["number_of_points_per_track"][track] = 0 - for flag in track_yaml["flags"]: - flags.append(flag["value"]) - stats["number_of_points_per_track"][track] += flag["value"] - stats["total_flags_value"] += flag["value"] - if flag["value"] > stats["highest_value_flag"]: - stats["highest_value_flag"] = flag["value"] - if flag["value"] not in stats["flag_count_per_value"]: - stats["flag_count_per_value"][flag["value"]] = 0 - stats["flag_count_per_value"][flag["value"]] += 1 - for service in track_yaml["services"]: - if service["port"] not in stats["number_of_services_per_port"]: - stats["number_of_services_per_port"][service["port"]] = 0 - stats["number_of_services_per_port"][service["port"]] += 1 - for challenge_designer in track_yaml["contacts"]["dev"]: - challenge_designers.add(challenge_designer.lower()) - - if os.path.exists( - path=(files_directory := os.path.join(challenges_directory, track, "files")) - ): - for file in os.listdir(path=files_directory): - stats["number_of_files"] += 1 - stats["median_flag_value"] = statistics.median(flags) - 
stats["mean_flag_value"] = round(statistics.mean(flags), 2) - stats["number_of_challenge_designers"] = len(challenge_designers) - - # Sort dict keys - stats["flag_count_per_value"] = { - key: stats["flag_count_per_value"][key] - for key in sorted(stats["flag_count_per_value"].keys()) - } - stats["number_of_services_per_port"] = { - key: stats["number_of_services_per_port"][key] - for key in sorted(stats["number_of_services_per_port"].keys()) - } - - stats["challenge_designers"] = sorted(list(challenge_designers)) - stats["number_of_flags_per_track"] = dict( - sorted(stats["number_of_flags_per_track"].items(), key=lambda item: item[1]) - ) - stats["number_of_points_per_track"] = dict( - sorted(stats["number_of_points_per_track"].items(), key=lambda item: item[1]) - ) - - print(json.dumps(stats, indent=2, ensure_ascii=False)) - if generate_badges: - if not _has_pybadges: - LOG.critical(msg="Module pybadges was not found.") - exit(code=1) - LOG.info(msg="Generating badges...") - os.makedirs(name=".badges", exist_ok=True) - write_badge( - "flag", - pybadges.badge(left_text="Flags", right_text=str(stats["number_of_flags"])), # type: ignore - ) - write_badge( - "points", - pybadges.badge( # type: ignore - left_text="Points", right_text=str(stats["total_flags_value"]) - ), - ) - write_badge( - "tracks", - pybadges.badge( # type: ignore - left_text="Tracks", right_text=str(stats["number_of_tracks"]) - ), - ) - write_badge( - "services", - pybadges.badge( # type: ignore - left_text="Services", right_text=str(stats["number_of_services"]) - ), - ) - write_badge( - "designers", - pybadges.badge( # type: ignore - left_text="Challenge Designers", - right_text=str(stats["number_of_challenge_designers"]), - ), - ) - write_badge( - "files", - pybadges.badge( # type: ignore - left_text="Files", - right_text=str(stats["number_of_files"]), - ), - ) - write_badge( - "scenario", - pybadges.badge( # type: ignore - left_text="Integrated with scenario", - right_text=str(stats["number_of_tracks_integrated_with_scenario"]) - + "/" - + str(stats["number_of_tracks"]), - ), - ) - - if charts: - if not _has_matplotlib: - LOG.critical(msg="Module matplotlib was not found.") - exit(code=1) - LOG.info(msg="Generating charts...") - mpl_logger = logging.getLogger("matplotlib") - mpl_logger.setLevel(logging.INFO) - os.makedirs(name=".charts", exist_ok=True) - # Flag count per value barchart - plt.bar( - stats["flag_count_per_value"].keys(), stats["flag_count_per_value"].values() - ) - plt.xticks( - ticks=range(0, max(stats["flag_count_per_value"].keys()) + 1), rotation=45 - ) - plt.grid(True, linestyle="--", alpha=0.3) - plt.xlabel("Flag Value") - plt.ylabel("Number of Flags") - plt.title("Number of Flags per Value") - plt.savefig(os.path.join(".charts", "flags_per_value.png")) - plt.clf() - - # Number of flag per track barchart - plt.bar( - list(stats["number_of_flags_per_track"].keys()), - stats["number_of_flags_per_track"].values(), - ) - plt.xticks(ticks=list(stats["number_of_flags_per_track"].keys()), rotation=90) - plt.grid(True, linestyle="--", alpha=0.3) - plt.subplots_adjust(bottom=0.5) - plt.xlabel("Track") - plt.ylabel("Number of flags") - plt.title("Number of flags per track") - plt.savefig(os.path.join(".charts", "flags_per_track.png")) - plt.clf() - - # Number of points per track barchart - plt.bar( - list(stats["number_of_points_per_track"].keys()), - stats["number_of_points_per_track"].values(), - ) - plt.xticks(ticks=list(stats["number_of_points_per_track"].keys()), rotation=90) - plt.grid(True, 
linestyle="--", alpha=0.3) - plt.subplots_adjust(bottom=0.5) - plt.xlabel("Track") - plt.ylabel("Number of points") - plt.title("Number of points per track") - plt.savefig(os.path.join(".charts", "points_per_track.png")) - plt.clf() - - if historical: - # Number of points and flags over time - historical_data = {} - commit_list = ( - subprocess.check_output( - ["git", "log", "--pretty=format:%H %ad", "--date=iso"] - ) - .decode() - .splitlines()[::-1] - ) - commit_list_with_date = [] - for commit in commit_list: - hash, date = commit.split(" ", 1) - parsed_datetime = datetime.strptime(date, "%Y-%m-%d %H:%M:%S %z") - commit_list_with_date.append((parsed_datetime, hash)) - commit_list_with_date = sorted(commit_list_with_date, key=lambda x: x[0]) - subprocess.run(["git", "stash"], check=True) - for i, commit in list(enumerate(commit_list_with_date))[0:]: - parsed_datetime, hash = commit - # Check if the commit message has "Merge pull request" in it - commit_message = subprocess.run( - ["git", "show", "-s", "--pretty=%B", hash], - check=True, - capture_output=True, - ) - if "Merge pull request" in commit_message.stdout.decode(): - LOG.debug( - f"{i + 1}/{len(commit_list_with_date)} Checking out commit: {commit}" - ) - parsed_date = parsed_datetime.date() - subprocess.run( - ["git", "checkout", hash], check=True, capture_output=True - ) - - # Execute your command here (replace with what you need) - result = ( - subprocess.run( - ["python", "scripts/ctf.py", "stats"], - check=False, - capture_output=True, - text=True, - ), - ) - if result[0].returncode == 0: - stats = json.loads(result[0].stdout) - total_points = stats["total_flags_value"] - total_flags = stats["number_of_flags"] - print(total_flags) - historical_data[parsed_date] = { - "total_points": total_points, - "total_flags": total_flags, - } - subprocess.run(["git", "checkout", "main"], check=True, capture_output=True) - subprocess.run(["git", "stash", "pop"], check=True) - - plt.plot( - historical_data.keys(), - [data["total_points"] for data in historical_data.values()], - label="Total Points", - ) - # plt.plot(historical_data.keys(), [data["total_flags"] for data in historical_data.values()], label="Total Flags") - # plt.xticks(ticks=list(stats["number_of_points_per_track"].keys()), rotation=90) - plt.grid(True, linestyle="--", alpha=0.3) - plt.subplots_adjust(bottom=0.1) - plt.xlabel("Time") - plt.ylabel("Total points") - plt.title("Total points over time") - plt.xticks(rotation=90) - plt.subplots_adjust(bottom=0.2) - plt.subplot().set_ylim( - 0, max([data["total_points"] for data in historical_data.values()]) + 10 - ) - plt.savefig(os.path.join(".charts", "points_over_time.png")) - plt.clf() - - LOG.debug(msg="Done...") - - -@app.command("list", help="List tracks and their author(s).") -def list_tracks( - format: Annotated[ - ListOutputFormat, typer.Option("--format", "-f", help="Output format") - ] = ListOutputFormat.PRETTY, -) -> None: - tracks: set[str] = set() - for track in os.listdir(path=os.path.join(CTF_ROOT_DIRECTORY, "challenges")): - if os.path.isdir( - s=os.path.join(CTF_ROOT_DIRECTORY, "challenges", track) - ) and os.path.exists( - path=os.path.join(CTF_ROOT_DIRECTORY, "challenges", track, "track.yaml") - ): - tracks.add(track) - - parsed_tracks = [] - for track in tracks: - parsed_track = parse_track_yaml(track) - - # find the discourse topic name - posts = parse_post_yamls(track) - topic = None - for post in posts: - if post.get("type") == "topic": - topic = post["title"] - parsed_tracks.append( - [ - 
parsed_track["name"], - topic, - ", ".join(parsed_track["contacts"]["dev"]), - ", ".join(parsed_track["contacts"]["support"]), - ", ".join(parsed_track["contacts"]["qa"]), - ] - ) - - if format.value == "pretty": - LOG.info( - "\n" - + tabulate( - parsed_tracks, - headers=[ - "Internal track name", - "Discourse Topic Name", - "Dev", - "Support", - "QA", - ], - tablefmt="fancy_grid", - ) - ) - else: - raise ValueError(f"Invalid format: {format.value}") - - -@app.command( - help="Run many static validations to ensure coherence and quality in the tracks and repo as a whole." -) -def validate() -> None: - LOG.info(msg="Starting ctf validate...") - - LOG.info(msg=f"Found {len(validators_list)} Validators") - - validators = [validator_class() for validator_class in validators_list] - - tracks = [] - for track in os.listdir(path=os.path.join(CTF_ROOT_DIRECTORY, "challenges")): - if os.path.isdir( - s=os.path.join(CTF_ROOT_DIRECTORY, "challenges", track) - ) and os.path.exists( - path=os.path.join(CTF_ROOT_DIRECTORY, "challenges", track, "track.yaml") - ): - tracks.append(track) - - LOG.info(msg=f"Found {len(tracks)} tracks") - - errors: list[ValidationError] = [] - - LOG.info(msg="Validating track.yaml files against JSON Schema...") - validate_with_json_schemas( - schema=os.path.join(SCHEMAS_ROOT_DIRECTORY, "track.yaml.json"), - files_pattern=os.path.join(CTF_ROOT_DIRECTORY, "challenges", "*", "track.yaml"), - ) - LOG.info(msg="Validating discourse post YAML files against JSON Schema...") - validate_with_json_schemas( - schema=os.path.join(SCHEMAS_ROOT_DIRECTORY, "post.json"), - files_pattern=os.path.join( - CTF_ROOT_DIRECTORY, "challenges", "*", "posts", "*.yaml" - ), - ) - - LOG.info(msg="Validating terraform files format...") - r = subprocess.run( - args=["tofu", "fmt", "-no-color", "-check", "-recursive", CTF_ROOT_DIRECTORY], - capture_output=True, - ) - if r.returncode != 0: - errors.append( - ValidationError( - error_name="Tofu format", - error_description="Bad Terraform formatting. Please run `tofu fmt -recursive ./`", - details={ - "Files": "\n".join( - [ - *([out] if (out := r.stdout.decode().strip()) else []), - *re.findall( - pattern=r"(Failed to read file .+)$", - string=r.stderr.decode().strip(), - flags=re.MULTILINE, - ), - ] - ) - }, - ) - ) - - for validator in validators: - LOG.info(msg=f"Running {type(validator).__name__}") - for track in tracks: - errors += validator.validate(track_name=track) - - # Get the errors from finalize() - for validator in validators: - errors += validator.finalize() - - if not errors: - LOG.info(msg="No error found!") - else: - LOG.error(msg=f"{len(errors)} errors found.") - - errors_list = list( - map( - lambda error: [ - error.track_name, - error.error_name, - "\n".join(textwrap.wrap(error.error_description, 50)), - "\n".join( - [ - str(key) + ": " + str(value) - for key, value in error.details.items() - ] - ), - ], - errors, - ) - ) - - LOG.error( - "\n" - + tabulate( - errors_list, - headers=["Track", "Error", "Description", "Details"], - tablefmt="fancy_grid", - ) - ) - exit(code=1) - - -def write_badge(name: str, svg: str) -> None: - with open( - file=os.path.join(".badges", f"badge-{name}.svg"), mode="w", encoding="utf-8" - ) as f: - f.write(svg) - - -@app.command(help="Print the tool's version.") -def version(): - # Create an empty command as the version is printed before anything else in the __init__.py file. 
- pass +app.add_typer(validate_app) +app.add_typer(init_app) +app.add_typer(new_app) +app.add_typer(destroy_app) +app.add_typer(flags_app) +app.add_typer(services_app) +app.add_typer(generate_app) +app.add_typer(deploy_app) +app.add_typer(redeploy_app) +app.add_typer(check_app) +app.add_typer(stats_app) +app.add_typer(list_app) +app.add_typer(version_app) def main(): @@ -1594,10 +44,10 @@ def main(): if __name__ == "__main__": - if not os.path.isdir(s=(p := os.path.join(CTF_ROOT_DIRECTORY, "challenges"))): - import sys + import sys - if "init" not in sys.argv: + if "version" not in sys.argv and "init" not in sys.argv: + if not os.path.isdir(s=(p := os.path.join(CTF_ROOT_DIRECTORY, "challenges"))): LOG.error( msg=f"Directory `{p}` not found. Make sure this script is ran from the root directory OR set the CTF_ROOT_DIR environment variable to the root directory." ) diff --git a/ctf/check.py b/ctf/check.py new file mode 100644 index 0000000..3b915bc --- /dev/null +++ b/ctf/check.py @@ -0,0 +1,51 @@ +import os +import subprocess + +import typer +from typing_extensions import Annotated + +from ctf import CTF_ROOT_DIRECTORY, ENV +from ctf.generate import generate +from ctf.logger import LOG +from ctf.utils import check_git_lfs, terraform_binary + +app = typer.Typer() + + +@app.command(help="Preview the changes") +def check( + tracks: Annotated[ + list[str], + typer.Option( + "--tracks", + "-t", + help="Only check the given tracks (use the directory name)", + ), + ] = [], + production: Annotated[ + bool, + typer.Option( + "--production", + help="Do a production deployment. Only use this if you know what you're doing.", + ), + ] = False, + remote: Annotated[ + str, typer.Option("--remote", help="Incus remote to deploy to") + ] = "local", +) -> None: + ENV["INCUS_REMOTE"] = remote + # Run generate first. + generate(tracks=tracks, production=production, remote=remote) + + # Then run terraform plan. + subprocess.run( + args=[terraform_binary(), "plan"], + cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), + check=True, + ) + + # Check if Git LFS is installed on the system as it will be required for deployment. + if not check_git_lfs(): + LOG.warning( + msg="Git LFS is missing from your system. Install it before deploying." + ) diff --git a/ctf/deploy.py b/ctf/deploy.py new file mode 100644 index 0000000..5d32445 --- /dev/null +++ b/ctf/deploy.py @@ -0,0 +1,258 @@ +import json +import os +import shutil +import subprocess +import textwrap + +import typer +from typing_extensions import Annotated + +from ctf import CTF_ROOT_DIRECTORY, ENV +from ctf.destroy import destroy +from ctf.generate import generate +from ctf.logger import LOG +from ctf.utils import ( + add_tracks_to_terraform_modules, + check_git_lfs, + get_all_available_tracks, + get_terraform_tracks_from_modules, + parse_track_yaml, + terraform_binary, + validate_track_can_be_deployed, +) + +app = typer.Typer() + + +@app.command(help="Deploy and provision the tracks") +def deploy( + tracks: Annotated[ + list[str], + typer.Option( + "--tracks", + "-t", + help="Only deploy the given tracks (use the directory name)", + ), + ] = [], + production: Annotated[ + bool, + typer.Option( + "--production", + help="Do a production deployment. Only use this if you know what you're doing.", + ), + ] = False, + remote: Annotated[ + str, typer.Option("--remote", help="Incus remote to deploy to") + ] = "local", + redeploy: Annotated[ + bool, typer.Option("--redeploy", help="Do not use. 
Use `ctf redeploy` instead.") + ] = False, + force: Annotated[ + bool, + typer.Option("--force", help="Force the deployment even if there are errors."), + ] = False, +): + ENV["INCUS_REMOTE"] = remote + if redeploy: + distinct_tracks = set( + track + for track in get_all_available_tracks() + if validate_track_can_be_deployed(track=track) and track in tracks + ) + + add_tracks_to_terraform_modules( + tracks=distinct_tracks - get_terraform_tracks_from_modules(), + remote=remote, + production=production, + ) + else: + # Run generate first. + distinct_tracks = generate(tracks=tracks, production=production, remote=remote) + + # Check if Git LFS is installed on the system as it is required for deployment. + if not check_git_lfs(): + LOG.critical( + msg="Git LFS is missing from your system. Install it before deploying." + ) + exit(code=1) + + # Pull LFS files + LOG.debug("Pulling Git LFS files for specific tracks.") + subprocess.run( + args=[ + "git", + "lfs", + "pull", + f"--include={','.join([os.path.join('challenges', track, 'ansible', '*') for track in distinct_tracks])}", + ], + check=True, + ) + + try: + subprocess.run( + args=[terraform_binary(), "apply", "-auto-approve"], + cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), + check=True, + ) + except subprocess.CalledProcessError: + LOG.warning( + f"The project could not deploy due to instable state. It is often due to CTRL+C while deploying as {os.path.basename(terraform_binary())} was not able to save the state of each object created." + ) + + if (input("Do you want to clean and start over? [Y/n] ").lower() or "y") != "y": + exit(code=1) + + force = True + destroy(tracks=tracks, production=production, remote=remote, force=force) + + subprocess.run( + args=[terraform_binary(), "apply", "-auto-approve"], + cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), + check=True, + ) + except KeyboardInterrupt: + LOG.warning( + "CTRL+C was detected during Terraform deployment. Destroying everything..." 
+ ) + force = True + destroy(tracks=tracks, production=production, remote=remote, force=force) + exit(code=0) + + for track in distinct_tracks: + if not os.path.exists( + path=( + path := os.path.join(CTF_ROOT_DIRECTORY, "challenges", track, "ansible") + ) + ): + continue + + run_ansible_playbook( + remote=remote, production=production, track=track, path=path + ) + + if not production: + incus_list = json.loads( + s=subprocess.run( + args=["incus", "list", f"--project={track}", "--format", "json"], + check=True, + capture_output=True, + env=ENV, + ).stdout.decode() + ) + ipv6_to_container_name = {} + for machine in incus_list: + addresses = machine["state"]["network"]["eth0"]["addresses"] + ipv6_address = list( + filter(lambda address: address["family"] == "inet6", addresses) + )[0]["address"] + ipv6_to_container_name[ipv6_address] = machine["name"] + + LOG.debug(msg=f"Mapping: {ipv6_to_container_name}") + + if remote == "local": + LOG.debug(msg=f"Parsing track.yaml for track {track}") + track_yaml = parse_track_yaml(track_name=track) + + for service in track_yaml["services"]: + if service.get("dev_port_mapping"): + LOG.debug( + f"Adding incus proxy for service {track}-{service['name']}-port-{service['port']}" + ) + machine_name = ipv6_to_container_name[ + service["address"] + .replace(":0", ":") + .replace(":0", ":") + .replace(":0", ":") + .replace(":0", ":") + ] + subprocess.run( + args=[ + "incus", + "config", + "device", + "add", + machine_name, + f"proxy-{track}-{service['dev_port_mapping']}-to-{service['port']}", + "proxy", + f"listen=tcp:0.0.0.0:{service['dev_port_mapping']}", + f"connect=tcp:127.0.0.1:{service['port']}", + "--project", + track, + ], + cwd=path, + check=True, + ) + + LOG.info(msg=f"Running `incus --project={track} list`") + subprocess.run( + args=["incus", f"--project={track}", "list"], check=True, env=ENV + ) + + if not production and distinct_tracks: + tracks_list = list(distinct_tracks) + track_index = input( + textwrap.dedent( + f"""\ + Do you want to `incus project switch` to any of the tracks mentioned in argument? + {chr(10).join([f"{list(tracks_list).index(t) + 1}) {t}" for t in tracks_list])} + + Which? """ + ) + ) + + if ( + track_index.isnumeric() + and (track_index := int(track_index)) + and 0 < track_index <= len(tracks_list) + ): + LOG.info( + msg=f"Running `incus project switch {tracks_list[track_index - 1]}`" + ) + subprocess.run( + args=["incus", "project", "switch", tracks_list[track_index - 1]], + check=True, + env=ENV, + ) + elif track_index: + LOG.warning( + msg=f"Could not switch project, unrecognized input: {track_index}." 
+            )
+
+
+def run_ansible_playbook(remote: str, production: bool, track: str, path: str) -> None:
+    extra_args = []
+    if remote:
+        extra_args += ["-e", f"ansible_incus_remote={remote}"]
+
+    if production:
+        extra_args += ["-e", "nsec_production=true"]
+
+    LOG.info(msg=f"Running common.yaml with Ansible for track {track}...")
+    ansible_args = [
+        "ansible-playbook",
+        "../../../.deploy/common.yaml",
+        "-i",
+        "inventory",
+    ] + extra_args
+    subprocess.run(
+        args=ansible_args,
+        cwd=path,
+        check=True,
+    )
+
+    LOG.info(msg=f"Running deploy.yaml with Ansible for track {track}...")
+    ansible_args = [
+        "ansible-playbook",
+        "deploy.yaml",
+        "-i",
+        "inventory",
+    ] + extra_args
+    subprocess.run(
+        args=ansible_args,
+        cwd=path,
+        check=True,
+    )
+
+    artifacts_path = os.path.join(path, "artifacts")
+    if os.path.exists(path=artifacts_path):
+        shutil.rmtree(artifacts_path)
diff --git a/ctf/destroy.py b/ctf/destroy.py
new file mode 100644
index 0000000..52e004f
--- /dev/null
+++ b/ctf/destroy.py
@@ -0,0 +1,202 @@
+import json
+import os
+import subprocess
+
+import typer
+from typing_extensions import Annotated
+
+from ctf import CTF_ROOT_DIRECTORY, ENV
+from ctf.logger import LOG
+from ctf.utils import (
+    get_terraform_tracks_from_modules,
+    remove_tracks_from_terraform_modules,
+    terraform_binary,
+)
+
+app = typer.Typer()
+
+
+@app.command(
+    help="Destroy everything deployed by Terraform. This is a destructive operation."
+)
+def destroy(
+    tracks: Annotated[
+        list[str],
+        typer.Option(
+            "--tracks",
+            "-t",
+            help="Only destroy the given tracks (use the directory name)",
+        ),
+    ] = [],
+    production: Annotated[
+        bool,
+        typer.Option(
+            "--production",
+            help="Do a production deployment. Only use this if you know what you're doing.",
+        ),
+    ] = False,
+    remote: Annotated[
+        str, typer.Option("--remote", help="Incus remote to deploy to")
+    ] = "local",
+    force: Annotated[
+        bool,
+        typer.Option(
+            "--force",
+            help="If there are artefacts remaining, delete them without asking.",
+        ),
+    ] = False,
+) -> None:
+    ENV["INCUS_REMOTE"] = remote
+    LOG.info(msg="tofu destroy...")
+
+    if not os.path.exists(
+        path=os.path.join(CTF_ROOT_DIRECTORY, ".deploy", "modules.tf")
+    ):
+        LOG.critical(msg="Nothing to destroy.")
+        exit(code=1)
+
+    terraform_tracks = get_terraform_tracks_from_modules()
+
+    r = (
+        subprocess.run(
+            args=["incus", "project", "get-current"],
+            check=True,
+            capture_output=True,
+            env=ENV,
+        )
+        .stdout.decode()
+        .strip()
+    )
+
+    tmp_tracks = set(tracks)
+    if tmp_tracks and tmp_tracks != terraform_tracks:
+        terraform_tracks &= tmp_tracks
+        if not terraform_tracks:
+            LOG.warning("No track to destroy.")
+            return
+
+    if r in terraform_tracks:
+        projects = {
+            project["name"]
+            for project in json.loads(
+                s=subprocess.run(
+                    args=["incus", "project", "list", "--format=json"],
+                    check=False,
+                    capture_output=True,
+                    env=ENV,
+                ).stdout.decode()
+            )
+        }
+
+        projects = list((projects - terraform_tracks))
+        if len(projects) == 0:
+            LOG.critical(
+                msg="No project to switch to. This should never happen, as the default project should always exist."
+ ) + exit(code=1) + + cmd = [ + "incus", + "project", + "switch", + "default" if "default" in projects else projects[0], + ] + + LOG.info(msg=f"Running `{' '.join(cmd)}`") + subprocess.run(args=cmd, check=True, env=ENV) + + subprocess.run( + args=[ + terraform_binary(), + "destroy", + "-auto-approve", + *[f"-target=module.track-{track}" for track in terraform_tracks], + ], + cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), + check=False, + ) + + projects = [ + project["name"] + for project in json.loads( + s=subprocess.run( + args=["incus", "project", "list", "--format=json"], + check=False, + capture_output=True, + env=ENV, + ).stdout.decode() + ) + ] + + networks = [ + network["name"] + for network in json.loads( + s=subprocess.run( + args=["incus", "network", "list", "--format=json"], + check=False, + capture_output=True, + env=ENV, + ).stdout.decode() + ) + ] + + network_acls = [ + network_acl["name"] + for network_acl in json.loads( + s=subprocess.run( + args=["incus", "network", "acl", "list", "--format=json"], + check=False, + capture_output=True, + env=ENV, + ).stdout.decode() + ) + ] + + for module in terraform_tracks: + if module in projects: + LOG.warning(msg=f"The project {module} was not destroyed properly.") + if ( + force + or (input("Do you want to destroy it? [Y/n] ").lower() or "y") == "y" + ): + subprocess.run( + args=["incus", "project", "delete", module, "--force"], + check=False, + capture_output=True, + input=b"yes\n", + env=ENV, + ) + + if (tmp_module := module[0:15]) in networks: + LOG.warning(msg=f"The network {tmp_module} was not destroyed properly.") + if ( + force + or (input("Do you want to destroy it? [Y/n] ").lower() or "y") == "y" + ): + subprocess.run( + args=["incus", "network", "delete", tmp_module], + check=False, + capture_output=True, + env=ENV, + ) + + if (tmp_module := module) in network_acls or ( + tmp_module := f"{module}-default" + ) in network_acls: + LOG.warning(msg=f"The network ACL {tmp_module} was not destroyed properly.") + if ( + force + or (input("Do you want to destroy it? 
[Y/n] ").lower() or "y") == "y" + ): + subprocess.run( + args=["incus", "network", "acl", "delete", tmp_module], + check=False, + capture_output=True, + env=ENV, + ) + remove_tracks_from_terraform_modules( + tracks=terraform_tracks, + remote=remote, + production=production, + ) + LOG.info(msg="Successfully destroyed every track") diff --git a/ctf/flags.py b/ctf/flags.py new file mode 100644 index 0000000..743c4fe --- /dev/null +++ b/ctf/flags.py @@ -0,0 +1,77 @@ +import csv +import io +import json +import os +from enum import StrEnum, unique + +import typer +import yaml +from typing_extensions import Annotated + +from ctf import CTF_ROOT_DIRECTORY +from ctf.logger import LOG +from ctf.utils import parse_track_yaml + +app = typer.Typer() + + +@unique +class OutputFormat(StrEnum): + JSON = "json" + CSV = "csv" + YAML = "yaml" + + +@app.command(help="Get flags from tracks") +def flags( + tracks: Annotated[ + list[str], + typer.Option( + "--tracks", + "-t", + help="Only flags from the given tracks (use the directory name)", + ), + ] = [], + format: Annotated[ + OutputFormat, + typer.Option("--format", help="Output format", prompt="Output format"), + ] = OutputFormat.JSON, +) -> None: + distinct_tracks: set[str] = set() + + for entry in os.listdir( + path=(challenges_directory := os.path.join(CTF_ROOT_DIRECTORY, "challenges")) + ): + if os.path.isdir( + s=(track_directory := os.path.join(challenges_directory, entry)) + ) and os.path.exists(path=os.path.join(track_directory, "track.yaml")): + if not tracks: + distinct_tracks.add(entry) + elif entry in tracks: + distinct_tracks.add(entry) + + flags = [] + for track in distinct_tracks: + LOG.debug(msg=f"Parsing track.yaml for track {track}") + track_yaml = parse_track_yaml(track_name=track) + + if len(track_yaml["flags"]) == 0: + LOG.debug(msg=f"No flag in track {track}. Skipping...") + continue + + flags.extend(track_yaml["flags"]) + + if not flags: + LOG.warning(msg="No flag found...") + return + + if format == OutputFormat.JSON: + print(json.dumps(obj=flags, indent=2)) + elif format == OutputFormat.CSV: + output = io.StringIO() + writer = csv.DictWriter(f=output, fieldnames=flags[0].keys()) + writer.writeheader() + writer.writerows(rowdicts=flags) + print(output.getvalue()) + elif format == OutputFormat.YAML: + print(yaml.safe_dump(data=flags)) diff --git a/ctf/generate.py b/ctf/generate.py new file mode 100644 index 0000000..05d7f4d --- /dev/null +++ b/ctf/generate.py @@ -0,0 +1,119 @@ +import os +import subprocess + +import typer +from typing_extensions import Annotated + +from ctf import CTF_ROOT_DIRECTORY, ENV +from ctf.logger import LOG +from ctf.utils import ( + add_tracks_to_terraform_modules, + create_terraform_modules_file, + get_all_available_tracks, + terraform_binary, + validate_track_can_be_deployed, +) + +app = typer.Typer() + + +@app.command( + help="Generate the deployment files using `terraform init` and `terraform validate`" +) +def generate( + tracks: Annotated[ + list[str], + typer.Option( + "--tracks", + "-t", + help="Only generate the given tracks (use the directory name)", + ), + ] = [], + production: Annotated[ + bool, + typer.Option( + "--production", + help="Do a production deployment. Only use this if you know what you're doing.", + ), + ] = False, + remote: Annotated[ + str, typer.Option("--remote", help="Incus remote to deploy to") + ] = "local", +) -> set[str]: + ENV["INCUS_REMOTE"] = remote + # Get the list of tracks. 
+ distinct_tracks = set( + track + for track in get_all_available_tracks() + if validate_track_can_be_deployed(track=track) + and (not tracks or track in tracks) + ) + + if distinct_tracks: + LOG.debug(msg=f"Found {len(distinct_tracks)} tracks") + # Generate the Terraform modules file. + create_terraform_modules_file(remote=remote, production=production) + add_tracks_to_terraform_modules( + tracks=distinct_tracks, + remote=remote, + production=production, + ) + + for track in distinct_tracks: + relpath = os.path.relpath( + os.path.join(CTF_ROOT_DIRECTORY, ".deploy", "common"), + ( + terraform_directory := os.path.join( + CTF_ROOT_DIRECTORY, "challenges", track, "terraform" + ) + ), + ) + + # If the file exists and is a symlink, refresh it by deleting it first. + if os.path.exists( + path=(p := os.path.join(terraform_directory, "variables.tf")) + ) and os.path.islink(path=p): + os.unlink(path=p) + + LOG.debug(msg=f"Refreshing symlink {p}.") + + if not os.path.exists(path=p): + os.symlink( + src=os.path.join(relpath, "variables.tf"), + dst=p, + ) + + LOG.debug(msg=f"Created symlink {p}.") + + # If the file exists and is a symlink, refresh it by deleting it first. + if os.path.exists( + path=(p := os.path.join(terraform_directory, "versions.tf")) + ) and os.path.islink(path=p): + os.unlink(path=p) + + LOG.debug(msg=f"Refreshing symlink {p}.") + + if not os.path.exists(path=p): + os.symlink( + src=os.path.join(relpath, "versions.tf"), + dst=p, + ) + + LOG.debug(msg=f"Created symlink {p}.") + + subprocess.run( + args=[terraform_binary(), "init", "-upgrade"], + cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), + stdout=subprocess.DEVNULL, + check=True, + ) + subprocess.run( + args=[terraform_binary(), "validate"], + cwd=os.path.join(CTF_ROOT_DIRECTORY, ".deploy"), + check=True, + ) + else: + LOG.critical("No track was found") + exit(code=1) + + return distinct_tracks diff --git a/ctf/init.py b/ctf/init.py new file mode 100644 index 0000000..67da5b8 --- /dev/null +++ b/ctf/init.py @@ -0,0 +1,69 @@ +import os +import shutil + +import typer +from typing_extensions import Annotated + +from ctf import CTF_ROOT_DIRECTORY, TEMPLATES_ROOT_DIRECTORY +from ctf.logger import LOG + +app = typer.Typer() + + +@app.command( + help="Initialize a directory with the default CTF structure. If the directory does not exist, it will be created." +) +def init( + path: Annotated[ + str, typer.Argument(help="Directory in which to initialize a CTF") + ] = CTF_ROOT_DIRECTORY, + force: Annotated[ + bool, + typer.Option( + "--force", help="Overwrite the directory if it's already initialized" + ), + ] = False, +) -> None: + created_directory = False + created_assets: list[str] = [] + try: + if not os.path.isdir(path): + os.mkdir(path) + LOG.info(f'Creating directory "{path}"') + created_directory = True + elif ( + os.path.isdir(os.path.join(path, "challenges")) + or os.path.isdir(os.path.join(path, ".deploy")) + ) and not force: + LOG.error( + f'Directory "{path}" is already initialized. Use --force to overwrite.' 
+ ) + exit(code=1) + + for asset in os.listdir(p := os.path.join(TEMPLATES_ROOT_DIRECTORY, "init")): + dst_asset = os.path.join(path, asset) + if os.path.isdir(src_asset := os.path.join(p, asset)): + shutil.copytree(src_asset, dst_asset, dirs_exist_ok=True) + LOG.info(f'Created "{dst_asset}" folder') + else: + shutil.copy(src_asset, dst_asset) + LOG.info(f'Created "{dst_asset}" file') + + created_assets.append(dst_asset) + + except Exception: + import traceback + + if created_directory: + shutil.rmtree(path) + LOG.info(f'Removed created "{path}" folder') + else: + for asset in created_assets: + if os.path.isdir(asset): + shutil.rmtree(asset) + LOG.info(f'Removed created "{asset}" folder') + else: + os.unlink(asset) + LOG.info(f'Removed created "{asset}" file') + + LOG.critical(traceback.format_exc()) diff --git a/ctf/list.py b/ctf/list.py new file mode 100644 index 0000000..b60bcee --- /dev/null +++ b/ctf/list.py @@ -0,0 +1,70 @@ +import os +from enum import StrEnum + +import typer +from tabulate import tabulate +from typing_extensions import Annotated + +from ctf import CTF_ROOT_DIRECTORY +from ctf.logger import LOG +from ctf.utils import parse_post_yamls, parse_track_yaml + +app = typer.Typer() + + +class ListOutputFormat(StrEnum): + PRETTY = "pretty" + + +@app.command("list", help="List tracks and their author(s).") +def list_tracks( + format: Annotated[ + ListOutputFormat, typer.Option("--format", "-f", help="Output format") + ] = ListOutputFormat.PRETTY, +) -> None: + tracks: set[str] = set() + for track in os.listdir(path=os.path.join(CTF_ROOT_DIRECTORY, "challenges")): + if os.path.isdir( + s=os.path.join(CTF_ROOT_DIRECTORY, "challenges", track) + ) and os.path.exists( + path=os.path.join(CTF_ROOT_DIRECTORY, "challenges", track, "track.yaml") + ): + tracks.add(track) + + parsed_tracks = [] + for track in tracks: + parsed_track = parse_track_yaml(track) + + # find the discourse topic name + posts = parse_post_yamls(track) + topic = None + for post in posts: + if post.get("type") == "topic": + topic = post["title"] + parsed_tracks.append( + [ + parsed_track["name"], + topic, + ", ".join(parsed_track["contacts"]["dev"]), + ", ".join(parsed_track["contacts"]["support"]), + ", ".join(parsed_track["contacts"]["qa"]), + ] + ) + + if format.value == "pretty": + LOG.info( + "\n" + + tabulate( + parsed_tracks, + headers=[ + "Internal track name", + "Discourse Topic Name", + "Dev", + "Support", + "QA", + ], + tablefmt="fancy_grid", + ) + ) + else: + raise ValueError(f"Invalid format: {format.value}") diff --git a/ctf/logger.py b/ctf/logger.py new file mode 100644 index 0000000..0f652a8 --- /dev/null +++ b/ctf/logger.py @@ -0,0 +1,7 @@ +import logging + +import coloredlogs + +LOG = logging.getLogger() +LOG.setLevel(level=logging.DEBUG) +coloredlogs.install(level="DEBUG", logger=LOG) diff --git a/ctf/new.py b/ctf/new.py new file mode 100644 index 0000000..e4c0101 --- /dev/null +++ b/ctf/new.py @@ -0,0 +1,291 @@ +import os +import re +import secrets +import shutil +from enum import StrEnum, unique + +import jinja2 +import typer +from typing_extensions import Annotated + +from ctf import CTF_ROOT_DIRECTORY, TEMPLATES_ROOT_DIRECTORY +from ctf.logger import LOG + +app = typer.Typer() + + +@unique +class Template(StrEnum): + APACHE_PHP = "apache-php" + PYTHON_SERVICE = "python-service" + FILES_ONLY = "files-only" + TRACK_YAML_ONLY = "track-yaml-only" + RUST_WEBSERVICE = "rust-webservice" + + +@app.command(help="Create a new CTF track with a given name") +def new( + name: Annotated[ + str, + 
typer.Option(
+ help="Track name. No spaces; use dashes if needed.",
+ prompt="Track name. No spaces; use dashes if needed.",
+ ),
+ ],
+ template: Annotated[
+ Template,
+ typer.Option("--template", "-t", help="Template to use for the track."),
+ ] = Template.APACHE_PHP,
+ force: Annotated[
+ bool,
+ typer.Option(
+ "--force",
+ help="If directory already exists, delete it and create it again.",
+ ),
+ ] = False,
+) -> None:
+ LOG.info(msg=f"Creating a new track: {name}")
+ if not re.match(pattern=r"^[a-z][a-z0-9\-]{0,61}[a-z0-9]$", string=name):
+ LOG.critical(
+ msg="""The track name is invalid. Valid track names must fulfill the following requirements:
+* The name must be between 2 and 63 characters long;
+* The name must contain only lowercase letters, numbers and dashes from the ASCII table;
+* The name must not start with a digit or a dash;
+* The name must not end with a dash."""
+ )
+ exit(code=1)
+
+ if os.path.exists(
+ path=(
+ new_challenge_directory := os.path.join(
+ CTF_ROOT_DIRECTORY, "challenges", name
+ )
+ )
+ ):
+ if force:
+ LOG.debug(msg=f"Deleting {new_challenge_directory}")
+ shutil.rmtree(new_challenge_directory)
+ else:
+ LOG.critical(
+ "Track already exists with that name. Use `--force` to overwrite the track."
+ )
+ exit(code=1)
+
+ os.mkdir(new_challenge_directory)
+
+ LOG.debug(msg=f"Directory {new_challenge_directory} created.")
+
+ env = jinja2.Environment(
+ loader=jinja2.FileSystemLoader(
+ searchpath=TEMPLATES_ROOT_DIRECTORY, encoding="utf-8"
+ )
+ )
+
+ ipv6_subnet = f"9000:d37e:c40b:{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}{secrets.choice('0123456789abcdef')}"
+
+ rb = [
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ secrets.choice("0123456789abcdef"),
+ ]
+ # 00:16:3e is the Xen/incus MAC prefix; the matching EUI-64 IPv6 suffix
+ # flips the universal/local bit (00 -> 02) and inserts ff:fe in the middle.
+ hardware_address = f"00:16:3e:{rb[0]}{rb[1]}:{rb[2]}{rb[3]}:{rb[4]}{rb[5]}"
+ ipv6_address = f"216:3eff:fe{rb[0]}{rb[1]}:{rb[2]}{rb[3]}{rb[4]}{rb[5]}"
+ full_ipv6_address = f"{ipv6_subnet}:{ipv6_address}"
+
+ track_template = env.get_template(name="track.yaml.j2")
+ render = track_template.render(
+ data={
+ "name": name,
+ "full_ipv6_address": full_ipv6_address,
+ "ipv6_subnet": ipv6_subnet,
+ "template": template.value,
+ }
+ )
+ with open(
+ file=(p := os.path.join(new_challenge_directory, "track.yaml")),
+ mode="w",
+ encoding="utf-8",
+ ) as f:
+ f.write(render)
+
+ LOG.debug(msg=f"Wrote {p}.")
+
+ posts_directory = os.path.join(new_challenge_directory, "posts")
+
+ os.mkdir(path=posts_directory)
+
+ LOG.debug(msg=f"Directory {posts_directory} created.")
+
+ track_template = env.get_template(name="topic.yaml.j2")
+ render = track_template.render(data={"name": name})
+ with open(
+ file=(p := os.path.join(posts_directory, f"{name}.yaml")),
+ mode="w",
+ encoding="utf-8",
+ ) as f:
+ f.write(render)
+
+ LOG.debug(msg=f"Wrote {p}.")
+
+ track_template = env.get_template(name="post.yaml.j2")
+ render = track_template.render(data={"name": name})
+ with open(
+ file=(p := os.path.join(posts_directory, f"{name}_flag1.yaml")),
+ mode="w",
+ encoding="utf-8",
+ ) as f:
+ f.write(render)
+
+ LOG.debug(msg=f"Wrote {p}.")
+
+ if template == Template.TRACK_YAML_ONLY:
+ return
+
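+ # Each template tier builds on the previous one: track-yaml-only has
+ # already returned above, files-only stops after the files/ directory,
+ # and the remaining templates also get terraform/ and ansible/ scaffolding.
+ 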
files_directory = os.path.join(new_challenge_directory, "files") + + os.mkdir(path=files_directory) + + LOG.debug(msg=f"Directory {files_directory} created.") + + if template == Template.FILES_ONLY: + return + + terraform_directory = os.path.join(new_challenge_directory, "terraform") + + os.mkdir(path=terraform_directory) + + LOG.debug(msg=f"Directory {terraform_directory} created.") + + track_template = env.get_template(name="main.tf.j2") + + render = track_template.render( + data={ + "name": name, + "hardware_address": hardware_address, + "ipv6": ipv6_address, + "ipv6_subnet": ipv6_subnet, + "full_ipv6_address": full_ipv6_address, + } + ) + with open( + file=(p := os.path.join(terraform_directory, "main.tf")), + mode="w", + encoding="utf-8", + ) as f: + f.write(render) + + LOG.debug(msg=f"Wrote {p}.") + + relpath = os.path.relpath( + os.path.join(CTF_ROOT_DIRECTORY, ".deploy", "common"), terraform_directory + ) + + os.symlink( + src=os.path.join(relpath, "variables.tf"), + dst=(p := os.path.join(terraform_directory, "variables.tf")), + ) + + LOG.debug(msg=f"Wrote {p}.") + + os.symlink( + src=os.path.join(relpath, "versions.tf"), + dst=(p := os.path.join(terraform_directory, "versions.tf")), + ) + + LOG.debug(msg=f"Wrote {p}.") + + ansible_directory = os.path.join(new_challenge_directory, "ansible") + + os.mkdir(path=ansible_directory) + + LOG.debug(msg=f"Directory {ansible_directory} created.") + + track_template = env.get_template(name=f"deploy-{template}.yaml.j2") + render = track_template.render(data={"name": name}) + with open( + file=(p := os.path.join(ansible_directory, "deploy.yaml")), + mode="w", + encoding="utf-8", + ) as f: + f.write(render) + + LOG.debug(msg=f"Wrote {p}.") + + track_template = env.get_template(name="inventory.j2") + render = track_template.render(data={"name": name}) + with open( + file=(p := os.path.join(ansible_directory, "inventory")), + mode="w", + encoding="utf-8", + ) as f: + f.write(render) + + LOG.debug(msg=f"Wrote {p}.") + + ansible_challenge_directory = os.path.join(ansible_directory, "challenge") + + os.mkdir(path=ansible_challenge_directory) + + LOG.debug(msg=f"Directory {ansible_challenge_directory} created.") + + if template == Template.APACHE_PHP: + track_template = env.get_template(name="index.php.j2") + render = track_template.render(data={"name": name}) + with open( + file=(p := os.path.join(ansible_challenge_directory, "index.php")), + mode="w", + encoding="utf-8", + ) as f: + f.write(render) + + LOG.debug(msg=f"Wrote {p}.") + + if template == Template.PYTHON_SERVICE: + track_template = env.get_template(name="app.py.j2") + render = track_template.render(data={"name": name}) + with open( + file=(p := os.path.join(ansible_challenge_directory, "app.py")), + mode="w", + encoding="utf-8", + ) as f: + f.write(render) + + LOG.debug(msg=f"Wrote {p}.") + + with open( + file=(p := os.path.join(ansible_challenge_directory, "flag-1.txt")), + mode="w", + encoding="utf-8", + ) as f: + f.write(f"{{{{ track_flags.{name}_flag_1 }}}} (1/2)\n") + + LOG.debug(msg=f"Wrote {p}.") + + if template == Template.RUST_WEBSERVICE: + # Copy the entire challenge template + shutil.copytree( + os.path.join(TEMPLATES_ROOT_DIRECTORY, "rust-webservice"), + ansible_challenge_directory, + dirs_exist_ok=True, + ) + LOG.debug(msg=f"Wrote files to {ansible_challenge_directory}") + + manifest_template = env.get_template(name="Cargo.toml.j2") + render = manifest_template.render(data={"name": name}) + with open( + file=(p := os.path.join(ansible_challenge_directory, "Cargo.toml")), + 
mode="w", + encoding="utf-8", + ) as f: + f.write(render) + + LOG.debug(msg=f"Wrote {p}.") diff --git a/ctf/redeploy.py b/ctf/redeploy.py new file mode 100644 index 0000000..41cae65 --- /dev/null +++ b/ctf/redeploy.py @@ -0,0 +1,43 @@ +import typer +from typing_extensions import Annotated + +from ctf import ENV +from ctf.deploy import deploy +from ctf.destroy import destroy + +app = typer.Typer() + + +@app.command(help="Destroy and then deploy the given tracks") +def redeploy( + tracks: Annotated[ + list[str], + typer.Option( + "--tracks", + "-t", + help="Only redeploy the given tracks (use the directory name)", + ), + ] = [], + production: Annotated[ + bool, + typer.Option( + "--production", + help="Do a production deployment. Only use this if you know what you're doing.", + ), + ] = False, + remote: Annotated[ + str, typer.Option("--remote", help="Incus remote to deploy to") + ] = "local", + force: Annotated[ + bool, + typer.Option( + "--force", + help="If there are artefacts remaining, delete them without asking.", + ), + ] = False, +) -> None: + ENV["INCUS_REMOTE"] = remote + destroy(tracks=tracks, production=production, remote=remote, force=force) + deploy( + tracks=tracks, production=production, remote=remote, force=force, redeploy=True + ) diff --git a/ctf/services.py b/ctf/services.py new file mode 100644 index 0000000..77deafd --- /dev/null +++ b/ctf/services.py @@ -0,0 +1,52 @@ +import os + +import typer +from typing_extensions import Annotated + +from ctf import CTF_ROOT_DIRECTORY +from ctf.logger import LOG +from ctf.utils import parse_track_yaml + +app = typer.Typer() + + +@app.command(help="Get services from tracks") +def services( + tracks: Annotated[ + list[str], + typer.Option( + "--tracks", + "-t", + help="Only services from the given tracks (use the directory name)", + ), + ] = [], +) -> None: + distinct_tracks: set[str] = set() + for entry in os.listdir( + path=(challenges_directory := os.path.join(CTF_ROOT_DIRECTORY, "challenges")) + ): + if os.path.isdir( + s=(track_directory := os.path.join(challenges_directory, entry)) + ) and os.path.exists(path=os.path.join(track_directory, "track.yaml")): + if not tracks: + distinct_tracks.add(entry) + elif entry in tracks: + distinct_tracks.add(entry) + + for track in distinct_tracks: + LOG.debug(msg=f"Parsing track.yaml for track {track}") + track_yaml = parse_track_yaml(track_name=track) + + if len(track_yaml["services"]) == 0: + LOG.debug(msg=f"No service in track {track}. 
Skipping...") + continue + + for service in track_yaml["services"]: + contact = ",".join(track_yaml["contacts"]["support"]) + name = service["name"] + instance = service["instance"] + address = service["address"] + check = service["check"] + port = service["port"] + + print(f"{track}/{instance}/{name} {contact} {address} {check} {port}") diff --git a/ctf/stats.py b/ctf/stats.py new file mode 100644 index 0000000..75d9c7c --- /dev/null +++ b/ctf/stats.py @@ -0,0 +1,339 @@ +import json +import logging +import os +import statistics +import subprocess +from datetime import datetime + +import typer +from typing_extensions import Annotated + +from ctf import CTF_ROOT_DIRECTORY +from ctf.logger import LOG +from ctf.utils import parse_track_yaml + +try: + import pybadges + + _has_pybadges = True +except ImportError: + _has_pybadges = False + +try: + import matplotlib.pyplot as plt + + _has_matplotlib = True +except ImportError: + _has_matplotlib = False + +app = typer.Typer() + + +@app.command( + help="Generate statistics (such as number of tracks, number of flags, total flag value, etc.) from all the `track.yaml files. Outputs as JSON." +) +def stats( + tracks: Annotated[ + list[str], + typer.Option( + "--tracks", + "-t", + help="Name of the tracks to count in statistics (if not specified, all tracks are counted).", + ), + ] = [], + generate_badges: Annotated[ + bool, + typer.Option( + "--generate-badges", + help="Generate SVG files of some statistics in the .badges directory.", + ), + ] = False, + charts: Annotated[ + bool, + typer.Option( + "--charts", + help="Generate PNG charts of some statistics in the .charts directory.", + ), + ] = False, + historical: Annotated[ + bool, + typer.Option( + "--historical", + help="Use in conjunction with --charts to generate historical data. ONLY USE THIS IF YOU KNOW WHAT YOU ARE DOING. 
This command checks out old commits and stashes local changes, and can leave your repository in an unexpected state.",
+ ),
+ ] = False,
+) -> None:
+ LOG.debug(msg="Generating statistics...")
+ stats = {}
+ distinct_tracks: set[str] = set()
+ for entry in os.listdir(
+ (challenges_directory := os.path.join(CTF_ROOT_DIRECTORY, "challenges"))
+ ):
+ if os.path.isdir(
+ (track_directory := os.path.join(challenges_directory, entry))
+ ) and os.path.isfile(os.path.join(track_directory, "track.yaml")):
+ if not tracks:
+ distinct_tracks.add(entry)
+ elif entry in tracks:
+ distinct_tracks.add(entry)
+
+ stats["number_of_tracks"] = len(distinct_tracks)
+ stats["number_of_tracks_integrated_with_scenario"] = 0
+ stats["number_of_flags"] = 0
+ stats["highest_value_flag"] = 0
+ stats["most_flags_in_a_track"] = 0
+ stats["total_flags_value"] = 0
+ stats["number_of_services"] = 0
+ stats["number_of_files"] = 0
+ stats["median_flag_value"] = 0
+ stats["mean_flag_value"] = 0
+ stats["number_of_services_per_port"] = {}
+ stats["flag_count_per_value"] = {}
+ stats["number_of_challenge_designers"] = 0
+ stats["number_of_flags_per_track"] = {}
+ stats["number_of_points_per_track"] = {}
+ stats["not_integrated_with_scenario"] = []
+ challenge_designers = set()
+ flags = []
+ for track in distinct_tracks:
+ track_yaml = parse_track_yaml(track_name=track)
+ number_of_flags = len(track_yaml["flags"])
+ stats["number_of_flags_per_track"][track] = number_of_flags
+ if track_yaml["integrated_with_scenario"]:
+ stats["number_of_tracks_integrated_with_scenario"] += 1
+ else:
+ stats["not_integrated_with_scenario"].append(track)
+ if number_of_flags > stats["most_flags_in_a_track"]:
+ stats["most_flags_in_a_track"] = number_of_flags
+ stats["number_of_flags"] += number_of_flags
+ stats["number_of_services"] += len(track_yaml["services"])
+ stats["number_of_points_per_track"][track] = 0
+ for flag in track_yaml["flags"]:
+ flags.append(flag["value"])
+ stats["number_of_points_per_track"][track] += flag["value"]
+ stats["total_flags_value"] += flag["value"]
+ if flag["value"] > stats["highest_value_flag"]:
+ stats["highest_value_flag"] = flag["value"]
+ if flag["value"] not in stats["flag_count_per_value"]:
+ stats["flag_count_per_value"][flag["value"]] = 0
+ stats["flag_count_per_value"][flag["value"]] += 1
+ for service in track_yaml["services"]:
+ if service["port"] not in stats["number_of_services_per_port"]:
+ stats["number_of_services_per_port"][service["port"]] = 0
+ stats["number_of_services_per_port"][service["port"]] += 1
+ for challenge_designer in track_yaml["contacts"]["dev"]:
+ challenge_designers.add(challenge_designer.lower())
+
+ if os.path.exists(
+ path=(files_directory := os.path.join(challenges_directory, track, "files"))
+ ):
+ stats["number_of_files"] += len(os.listdir(path=files_directory))
+ stats["median_flag_value"] = statistics.median(flags)
+ stats["mean_flag_value"] = round(statistics.mean(flags), 2)
+ stats["number_of_challenge_designers"] = len(challenge_designers)
+
+ # Sort dict keys
+ stats["flag_count_per_value"] = {
+ key: stats["flag_count_per_value"][key]
+ for key in sorted(stats["flag_count_per_value"].keys())
+ }
+ stats["number_of_services_per_port"] = {
+ key: stats["number_of_services_per_port"][key]
+ for key in sorted(stats["number_of_services_per_port"].keys())
+ }
+
+ stats["challenge_designers"] = sorted(list(challenge_designers))
+ stats["number_of_flags_per_track"] = dict(
+ sorted(stats["number_of_flags_per_track"].items(), key=lambda item: item[1])
+ )
+ stats["number_of_points_per_track"] = dict(
+ 
sorted(stats["number_of_points_per_track"].items(), key=lambda item: item[1]) + ) + + print(json.dumps(stats, indent=2, ensure_ascii=False)) + if generate_badges: + if not _has_pybadges: + LOG.critical(msg="Module pybadges was not found.") + exit(code=1) + LOG.info(msg="Generating badges...") + os.makedirs(name=".badges", exist_ok=True) + write_badge( + "flag", + pybadges.badge(left_text="Flags", right_text=str(stats["number_of_flags"])), # type: ignore + ) + write_badge( + "points", + pybadges.badge( # type: ignore + left_text="Points", right_text=str(stats["total_flags_value"]) + ), + ) + write_badge( + "tracks", + pybadges.badge( # type: ignore + left_text="Tracks", right_text=str(stats["number_of_tracks"]) + ), + ) + write_badge( + "services", + pybadges.badge( # type: ignore + left_text="Services", right_text=str(stats["number_of_services"]) + ), + ) + write_badge( + "designers", + pybadges.badge( # type: ignore + left_text="Challenge Designers", + right_text=str(stats["number_of_challenge_designers"]), + ), + ) + write_badge( + "files", + pybadges.badge( # type: ignore + left_text="Files", + right_text=str(stats["number_of_files"]), + ), + ) + write_badge( + "scenario", + pybadges.badge( # type: ignore + left_text="Integrated with scenario", + right_text=str(stats["number_of_tracks_integrated_with_scenario"]) + + "/" + + str(stats["number_of_tracks"]), + ), + ) + + if charts: + if not _has_matplotlib: + LOG.critical(msg="Module matplotlib was not found.") + exit(code=1) + LOG.info(msg="Generating charts...") + mpl_logger = logging.getLogger("matplotlib") + mpl_logger.setLevel(logging.INFO) + os.makedirs(name=".charts", exist_ok=True) + # Flag count per value barchart + plt.bar( + stats["flag_count_per_value"].keys(), stats["flag_count_per_value"].values() + ) + plt.xticks( + ticks=range(0, max(stats["flag_count_per_value"].keys()) + 1), rotation=45 + ) + plt.grid(True, linestyle="--", alpha=0.3) + plt.xlabel("Flag Value") + plt.ylabel("Number of Flags") + plt.title("Number of Flags per Value") + plt.savefig(os.path.join(".charts", "flags_per_value.png")) + plt.clf() + + # Number of flag per track barchart + plt.bar( + list(stats["number_of_flags_per_track"].keys()), + stats["number_of_flags_per_track"].values(), + ) + plt.xticks(ticks=list(stats["number_of_flags_per_track"].keys()), rotation=90) + plt.grid(True, linestyle="--", alpha=0.3) + plt.subplots_adjust(bottom=0.5) + plt.xlabel("Track") + plt.ylabel("Number of flags") + plt.title("Number of flags per track") + plt.savefig(os.path.join(".charts", "flags_per_track.png")) + plt.clf() + + # Number of points per track barchart + plt.bar( + list(stats["number_of_points_per_track"].keys()), + stats["number_of_points_per_track"].values(), + ) + plt.xticks(ticks=list(stats["number_of_points_per_track"].keys()), rotation=90) + plt.grid(True, linestyle="--", alpha=0.3) + plt.subplots_adjust(bottom=0.5) + plt.xlabel("Track") + plt.ylabel("Number of points") + plt.title("Number of points per track") + plt.savefig(os.path.join(".charts", "points_per_track.png")) + plt.clf() + + if historical: + # Number of points and flags over time + historical_data = {} + commit_list = ( + subprocess.check_output( + ["git", "log", "--pretty=format:%H %ad", "--date=iso"] + ) + .decode() + .splitlines()[::-1] + ) + commit_list_with_date = [] + for commit in commit_list: + hash, date = commit.split(" ", 1) + parsed_datetime = datetime.strptime(date, "%Y-%m-%d %H:%M:%S %z") + commit_list_with_date.append((parsed_datetime, hash)) + commit_list_with_date = 
sorted(commit_list_with_date, key=lambda x: x[0])
+ subprocess.run(["git", "stash"], check=True)
+ for i, commit in enumerate(commit_list_with_date):
+ parsed_datetime, hash = commit
+ # Only sample merge commits so there is one data point per merged PR
+ commit_message = subprocess.run(
+ ["git", "show", "-s", "--pretty=%B", hash],
+ check=True,
+ capture_output=True,
+ )
+ if "Merge pull request" in commit_message.stdout.decode():
+ LOG.debug(
+ f"{i + 1}/{len(commit_list_with_date)} Checking out commit: {commit}"
+ )
+ parsed_date = parsed_datetime.date()
+ subprocess.run(
+ ["git", "checkout", hash], check=True, capture_output=True
+ )
+
+ # Run the stats command as it existed at that commit
+ result = subprocess.run(
+ ["python", "scripts/ctf.py", "stats"],
+ check=False,
+ capture_output=True,
+ text=True,
+ )
+ if result.returncode == 0:
+ stats = json.loads(result.stdout)
+ total_points = stats["total_flags_value"]
+ total_flags = stats["number_of_flags"]
+ historical_data[parsed_date] = {
+ "total_points": total_points,
+ "total_flags": total_flags,
+ }
+ subprocess.run(["git", "checkout", "main"], check=True, capture_output=True)
+ subprocess.run(["git", "stash", "pop"], check=True)
+
+ plt.plot(
+ historical_data.keys(),
+ [data["total_points"] for data in historical_data.values()],
+ label="Total Points",
+ )
+ # plt.plot(historical_data.keys(), [data["total_flags"] for data in historical_data.values()], label="Total Flags")
+ plt.grid(True, linestyle="--", alpha=0.3)
+ plt.xlabel("Time")
+ plt.ylabel("Total points")
+ plt.title("Total points over time")
+ plt.xticks(rotation=90)
+ plt.subplots_adjust(bottom=0.2)
+ plt.subplot().set_ylim(
+ 0, max([data["total_points"] for data in historical_data.values()]) + 10
+ )
+ plt.savefig(os.path.join(".charts", "points_over_time.png"))
+ plt.clf()
+
+ LOG.debug(msg="Done...")
+
+
+def write_badge(name: str, svg: str) -> None:
+ with open(
+ file=os.path.join(".badges", f"badge-{name}.svg"), mode="w", encoding="utf-8"
+ ) as f:
+ f.write(svg)
diff --git a/ctf/utils.py b/ctf/utils.py
index 40eb098..ca15f1c 100644
--- a/ctf/utils.py
+++ b/ctf/utils.py
@@ -1,5 +1,6 @@
import os
import re
+import shutil
import subprocess
import textwrap
from typing import Any, Generator
@@ -7,8 +8,9 @@
import jinja2
import yaml
-from ctf import CTF_ROOT_DIRECTORY
+from ctf.logger import LOG
+__CTF_ROOT_DIRECTORY = ""
def available_incus_remotes() -> list[str]:
try:
@@ -30,7 +32,7 @@
def get_all_available_tracks() -> set[str]:
tracks = set()
for entry in os.listdir(
- path=(challenges_directory := os.path.join(CTF_ROOT_DIRECTORY, "challenges"))
+ path=(challenges_directory := os.path.join(find_ctf_root_directory(), "challenges"))
):
if not os.path.isdir(s=os.path.join(challenges_directory, entry)):
continue
@@ -44,17 +46,17 @@
def validate_track_can_be_deployed(track: str) -> bool:
return (
os.path.exists(
path=os.path.join(
- CTF_ROOT_DIRECTORY, "challenges", track, "terraform", "main.tf"
+ find_ctf_root_directory(), "challenges", track, "terraform", "main.tf"
)
)
and os.path.exists(
path=os.path.join(
- CTF_ROOT_DIRECTORY, "challenges", track, "ansible", "deploy.yaml"
+ find_ctf_root_directory(), "challenges", track, "ansible", "deploy.yaml"
)
)
and os.path.exists(
path=os.path.join(
- CTF_ROOT_DIRECTORY, "challenges", track, "ansible", "inventory"
+ find_ctf_root_directory(), "challenges", track,
"ansible", "inventory" ) ) ) @@ -64,7 +66,7 @@ def add_tracks_to_terraform_modules( tracks: set[str], remote: str, production: bool = False ): with open( - file=os.path.join(CTF_ROOT_DIRECTORY, ".deploy", "modules.tf"), mode="a" + file=os.path.join(find_ctf_root_directory(), ".deploy", "modules.tf"), mode="a" ) as fd: template = jinja2.Environment().from_string( source=textwrap.dedent( @@ -96,7 +98,7 @@ def add_tracks_to_terraform_modules( def create_terraform_modules_file(remote: str, production: bool = False): with open( - file=os.path.join(CTF_ROOT_DIRECTORY, ".deploy", "modules.tf"), mode="w+" + file=os.path.join(find_ctf_root_directory(), ".deploy", "modules.tf"), mode="w+" ) as fd: template = jinja2.Environment().from_string( source=textwrap.dedent( @@ -118,7 +120,7 @@ def create_terraform_modules_file(remote: str, production: bool = False): def get_terraform_tracks_from_modules() -> set[str]: with open( - file=os.path.join(CTF_ROOT_DIRECTORY, ".deploy", "modules.tf"), mode="r" + file=os.path.join(find_ctf_root_directory(), ".deploy", "modules.tf"), mode="r" ) as f: modules_tf = f.read() @@ -164,7 +166,7 @@ def get_ctf_script_schemas_directory() -> str: def remove_ctf_script_root_directory_from_path(path: str) -> str: - return os.path.relpath(path=path, start=CTF_ROOT_DIRECTORY) + return os.path.relpath(path=path, start=find_ctf_root_directory()) def parse_track_yaml(track_name: str) -> dict[str, Any]: @@ -172,7 +174,7 @@ def parse_track_yaml(track_name: str) -> dict[str, Any]: stream=open( file=( p := os.path.join( - CTF_ROOT_DIRECTORY, "challenges", track_name, "track.yaml" + find_ctf_root_directory(), "challenges", track_name, "track.yaml" ) ), mode="r", @@ -190,7 +192,7 @@ def parse_post_yamls(track_name: str) -> list[dict]: for post in os.listdir( path=( posts_dir := os.path.join( - CTF_ROOT_DIRECTORY, "challenges", track_name, "posts" + find_ctf_root_directory(), "challenges", track_name, "posts" ) ) ): @@ -205,3 +207,42 @@ def parse_post_yamls(track_name: str) -> list[dict]: posts.append(post_data) return posts + + +def find_ctf_root_directory() -> str: + global __CTF_ROOT_DIRECTORY + if __CTF_ROOT_DIRECTORY: + return __CTF_ROOT_DIRECTORY + + path = os.path.join(os.getcwd(), ".") + + while path != (path := os.path.dirname(p=path)): + dir = os.listdir(path=path) + + if ".deploy" not in dir: + continue + if "challenges" not in dir: + continue + break + + if path == "/": + if "CTF_ROOT_DIR" not in os.environ: + LOG.critical( + msg='Could not automatically find the root directory nor the "CTF_ROOT_DIR" environment variable. 
To initialize a new root directory, use `ctf init [path]`' + ) + exit(1) + return (__CTF_ROOT_DIRECTORY := os.environ.get("CTF_ROOT_DIR", default=".")) + + LOG.debug(msg=f"Found root directory: {path}") + return (__CTF_ROOT_DIRECTORY := path) + + +def terraform_binary() -> str: + path = shutil.which(cmd="tofu") + if not path: + path = shutil.which(cmd="terraform") + + if not path: + raise Exception("Couldn't find Terraform or OpenTofu") + + return path diff --git a/ctf/validate.py b/ctf/validate.py new file mode 100644 index 0000000..ba33769 --- /dev/null +++ b/ctf/validate.py @@ -0,0 +1,117 @@ +import os +import re +import subprocess +import textwrap + +import typer +from tabulate import tabulate + +from ctf import CTF_ROOT_DIRECTORY, SCHEMAS_ROOT_DIRECTORY +from ctf.logger import LOG +from ctf.validate_json_schemas import validate_with_json_schemas +from ctf.validators import ValidationError, validators_list + +app = typer.Typer() + + +@app.command( + help="Run many static validations to ensure coherence and quality in the tracks and repo as a whole." +) +def validate() -> None: + LOG.info(msg="Starting ctf validate...") + + LOG.info(msg=f"Found {len(validators_list)} Validators") + + validators = [validator_class() for validator_class in validators_list] + + tracks = [] + for track in os.listdir(path=os.path.join(CTF_ROOT_DIRECTORY, "challenges")): + if os.path.isdir( + s=os.path.join(CTF_ROOT_DIRECTORY, "challenges", track) + ) and os.path.exists( + path=os.path.join(CTF_ROOT_DIRECTORY, "challenges", track, "track.yaml") + ): + tracks.append(track) + + LOG.info(msg=f"Found {len(tracks)} tracks") + + errors: list[ValidationError] = [] + + LOG.info(msg="Validating track.yaml files against JSON Schema...") + validate_with_json_schemas( + schema=os.path.join(SCHEMAS_ROOT_DIRECTORY, "track.yaml.json"), + files_pattern=os.path.join(CTF_ROOT_DIRECTORY, "challenges", "*", "track.yaml"), + ) + LOG.info(msg="Validating discourse post YAML files against JSON Schema...") + validate_with_json_schemas( + schema=os.path.join(SCHEMAS_ROOT_DIRECTORY, "post.json"), + files_pattern=os.path.join( + CTF_ROOT_DIRECTORY, "challenges", "*", "posts", "*.yaml" + ), + ) + + LOG.info(msg="Validating terraform files format...") + r = subprocess.run( + args=["tofu", "fmt", "-no-color", "-check", "-recursive", CTF_ROOT_DIRECTORY], + capture_output=True, + ) + if r.returncode != 0: + errors.append( + ValidationError( + error_name="Tofu format", + error_description="Bad Terraform formatting. 
Please run `tofu fmt -recursive ./`", + details={ + "Files": "\n".join( + [ + *([out] if (out := r.stdout.decode().strip()) else []), + *re.findall( + pattern=r"(Failed to read file .+)$", + string=r.stderr.decode().strip(), + flags=re.MULTILINE, + ), + ] + ) + }, + ) + ) + + for validator in validators: + LOG.info(msg=f"Running {type(validator).__name__}") + for track in tracks: + errors += validator.validate(track_name=track) + + # Get the errors from finalize() + for validator in validators: + errors += validator.finalize() + + if not errors: + LOG.info(msg="No error found!") + else: + LOG.error(msg=f"{len(errors)} errors found.") + + errors_list = list( + map( + lambda error: [ + error.track_name, + error.error_name, + "\n".join(textwrap.wrap(error.error_description, 50)), + "\n".join( + [ + str(key) + ": " + str(value) + for key, value in error.details.items() + ] + ), + ], + errors, + ) + ) + + LOG.error( + "\n" + + tabulate( + errors_list, + headers=["Track", "Error", "Description", "Details"], + tablefmt="fancy_grid", + ) + ) + exit(code=1) diff --git a/ctf/validate_json_schemas.py b/ctf/validate_json_schemas.py index 437359c..90a6e57 100644 --- a/ctf/validate_json_schemas.py +++ b/ctf/validate_json_schemas.py @@ -1,10 +1,11 @@ import argparse -import logging import glob import json -import yaml -import jsonschema +import logging + import coloredlogs +import jsonschema +import yaml LOG = logging.getLogger() diff --git a/ctf/version.py b/ctf/version.py new file mode 100644 index 0000000..c6ec126 --- /dev/null +++ b/ctf/version.py @@ -0,0 +1,11 @@ +import typer + +from ctf import VERSION + +app = typer.Typer() + + +@app.command(help="Print the tool's version.") +def version(): + print(VERSION) + exit(0) diff --git a/poetry.lock b/poetry.lock index dd976d4..a43994c 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.3 and should not be changed by hand. 
[[package]] name = "attrs" @@ -58,8 +58,6 @@ mypy-extensions = ">=0.4.3" packaging = ">=22.0" pathspec = ">=0.9.0" platformdirs = ">=2" -tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""} -typing-extensions = {version = ">=4.0.1", markers = "python_version < \"3.11\""} [package.extras] colorama = ["colorama (>=0.4.3)"] @@ -1434,49 +1432,6 @@ files = [ [package.extras] widechars = ["wcwidth"] -[[package]] -name = "tomli" -version = "2.2.1" -description = "A lil' TOML parser" -optional = false -python-versions = ">=3.8" -groups = ["main"] -markers = "python_version == \"3.10\"" -files = [ - {file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"}, - {file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"}, - {file = "tomli-2.2.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ece47d672db52ac607a3d9599a9d48dcb2f2f735c6c2d1f34130085bb12b112a"}, - {file = "tomli-2.2.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6972ca9c9cc9f0acaa56a8ca1ff51e7af152a9f87fb64623e31d5c83700080ee"}, - {file = "tomli-2.2.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c954d2250168d28797dd4e3ac5cf812a406cd5a92674ee4c8f123c889786aa8e"}, - {file = "tomli-2.2.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:8dd28b3e155b80f4d54beb40a441d366adcfe740969820caf156c019fb5c7ec4"}, - {file = "tomli-2.2.1-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:e59e304978767a54663af13c07b3d1af22ddee3bb2fb0618ca1593e4f593a106"}, - {file = "tomli-2.2.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:33580bccab0338d00994d7f16f4c4ec25b776af3ffaac1ed74e0b3fc95e885a8"}, - {file = "tomli-2.2.1-cp311-cp311-win32.whl", hash = "sha256:465af0e0875402f1d226519c9904f37254b3045fc5084697cefb9bdde1ff99ff"}, - {file = "tomli-2.2.1-cp311-cp311-win_amd64.whl", hash = "sha256:2d0f2fdd22b02c6d81637a3c95f8cd77f995846af7414c5c4b8d0545afa1bc4b"}, - {file = "tomli-2.2.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:4a8f6e44de52d5e6c657c9fe83b562f5f4256d8ebbfe4ff922c495620a7f6cea"}, - {file = "tomli-2.2.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:8d57ca8095a641b8237d5b079147646153d22552f1c637fd3ba7f4b0b29167a8"}, - {file = "tomli-2.2.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4e340144ad7ae1533cb897d406382b4b6fede8890a03738ff1683af800d54192"}, - {file = "tomli-2.2.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:db2b95f9de79181805df90bedc5a5ab4c165e6ec3fe99f970d0e302f384ad222"}, - {file = "tomli-2.2.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:40741994320b232529c802f8bc86da4e1aa9f413db394617b9a256ae0f9a7f77"}, - {file = "tomli-2.2.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:400e720fe168c0f8521520190686ef8ef033fb19fc493da09779e592861b78c6"}, - {file = "tomli-2.2.1-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:02abe224de6ae62c19f090f68da4e27b10af2b93213d36cf44e6e1c5abd19fdd"}, - {file = "tomli-2.2.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:b82ebccc8c8a36f2094e969560a1b836758481f3dc360ce9a3277c65f374285e"}, - {file = "tomli-2.2.1-cp312-cp312-win32.whl", hash = "sha256:889f80ef92701b9dbb224e49ec87c645ce5df3fa2cc548664eb8a25e03127a98"}, - {file = 
"tomli-2.2.1-cp312-cp312-win_amd64.whl", hash = "sha256:7fc04e92e1d624a4a63c76474610238576942d6b8950a2d7f908a340494e67e4"}, - {file = "tomli-2.2.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f4039b9cbc3048b2416cc57ab3bda989a6fcf9b36cf8937f01a6e731b64f80d7"}, - {file = "tomli-2.2.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:286f0ca2ffeeb5b9bd4fcc8d6c330534323ec51b2f52da063b11c502da16f30c"}, - {file = "tomli-2.2.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a92ef1a44547e894e2a17d24e7557a5e85a9e1d0048b0b5e7541f76c5032cb13"}, - {file = "tomli-2.2.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9316dc65bed1684c9a98ee68759ceaed29d229e985297003e494aa825ebb0281"}, - {file = "tomli-2.2.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e85e99945e688e32d5a35c1ff38ed0b3f41f43fad8df0bdf79f72b2ba7bc5272"}, - {file = "tomli-2.2.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ac065718db92ca818f8d6141b5f66369833d4a80a9d74435a268c52bdfa73140"}, - {file = "tomli-2.2.1-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:d920f33822747519673ee656a4b6ac33e382eca9d331c87770faa3eef562aeb2"}, - {file = "tomli-2.2.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a198f10c4d1b1375d7687bc25294306e551bf1abfa4eace6650070a5c1ae2744"}, - {file = "tomli-2.2.1-cp313-cp313-win32.whl", hash = "sha256:d3f5614314d758649ab2ab3a62d4f2004c825922f9e370b29416484086b264ec"}, - {file = "tomli-2.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:a38aa0308e754b0e3c67e344754dff64999ff9b513e691d0e786265c93583c69"}, - {file = "tomli-2.2.1-py3-none-any.whl", hash = "sha256:cb55c73c5f4408779d0cf3eef9f762b9c9f147a77de7b258bef0a5628adc85cc"}, - {file = "tomli-2.2.1.tar.gz", hash = "sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff"}, -] - [[package]] name = "typer" version = "0.16.0" @@ -1531,5 +1486,5 @@ workflow = ["matplotlib", "pybadges", "standard-imghdr"] [metadata] lock-version = "2.1" -python-versions = ">=3.10" -content-hash = "fc4325b8b3bbc432ae7b3106776d581d52e69815890691fdabcf5abf9d83f674" +python-versions = ">=3.11" +content-hash = "07718a8da6b7329184ea2e5f708c3246d3d7917815a5075de2bd874d2e30a56d" diff --git a/pyproject.toml b/pyproject.toml index fd2ad25..d3a8e24 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,7 @@ dependencies = [ "tabulate==0.9.0", "typer==0.16.0", ] -version = "2.0.0" +version = "2.0.1" classifiers = [ "Programming Language :: Python :: 3", "Operating System :: OS Independent",