diff --git a/ctf/__main__.py b/ctf/__main__.py index b0b2dc7..d662602 100644 --- a/ctf/__main__.py +++ b/ctf/__main__.py @@ -23,6 +23,7 @@ from ctf.init import app as init_app from ctf.list import app as list_app from ctf.logger import LOG +from ctf.monitor import app as monitor_app from ctf.new import app as new_app from ctf.redeploy import app as redeploy_app from ctf.services import app as services_app @@ -35,11 +36,6 @@ help="CLI tool to manage CTF challenges as code. Run from the root CTF repo directory or set the CTF_ROOT_DIR environment variable to run the tool.", no_args_is_help=True, ) -app.add_typer( - askgod_app, - name="askgod", - help="Commands for interacting with a live askgod server (github.com/nsec/askgod).", -) app.add_typer(check_app) app.add_typer(deploy_app) app.add_typer(destroy_app) @@ -54,6 +50,19 @@ app.add_typer(validate_app) app.add_typer(version_app) +app.add_typer( + askgod_app, + name="askgod", + help="Commands for interacting with a live askgod server (github.com/nsec/askgod).", + rich_help_panel="Subcommands", +) +app.add_typer( + monitor_app, + name="monitor", + help="Monitor tracks subcommands.", + rich_help_panel="Subcommands", +) + def check_tool_version() -> None: diff --git a/ctf/askgod/__init__.py b/ctf/askgod/__init__.py index 1f5dc95..9161503 100644 --- a/ctf/askgod/__init__.py +++ b/ctf/askgod/__init__.py @@ -1,6 +1,6 @@ import typer -from ctf.askgod.stats import app as stats_app +from .stats import app as stats_app app = typer.Typer(no_args_is_help=True) app.add_typer(stats_app) diff --git a/ctf/monitor/__init__.py b/ctf/monitor/__init__.py new file mode 100644 index 0000000..114b4b5 --- /dev/null +++ b/ctf/monitor/__init__.py @@ -0,0 +1,10 @@ +import typer + +from .flags import app as flags_app +from .solves import app as solves_app +from .stats import app as stats_app + +app = typer.Typer(no_args_is_help=True) +app.add_typer(flags_app) +app.add_typer(stats_app) +app.add_typer(solves_app) diff --git a/ctf/monitor/flags.py b/ctf/monitor/flags.py new file mode 100644 index 0000000..f4abe86 --- /dev/null +++ b/ctf/monitor/flags.py @@ -0,0 +1,53 @@ +import re +import subprocess +from typing import Annotated + +import typer + +app = typer.Typer() + + +@app.command(help="Monitor flags using IDs or track name.") +def flags( + tracks: Annotated[ + list[str], + typer.Option( + "--tracks", + "-t", + help="Monitor flags using track name.", + ), + ] = [], + ids: Annotated[ + list[int], typer.Option("-i", "--ids", help="Monitor flags using IDs.") + ] = [], +) -> None: + try: + while True: + r: subprocess.CompletedProcess[bytes] = subprocess.run( + ["askgod", "admin", "list-flags"], + capture_output=True, + check=True, + ) + for line in r.stdout.decode().strip().splitlines(): + for track in tracks: + if re.search(r"\|\s*.*" + track, line, re.IGNORECASE): + ids.append(int(line.split("|")[0])) + + if not ids: + if not tracks: + print("You must provide tracks or ids.") + else: + print(f"No flag found from given tracks: {', '.join(tracks)}") + raise typer.Exit(1) + + print(f"Monitoring IDs: {'|'.join([str(i) for i in ids])}") + print( + f"askgod admin monitor-flags | grep -E 'id=({'|'.join([str(i) for i in ids])})'" + ) + r: subprocess.CompletedProcess[bytes] = subprocess.run( + f"askgod admin monitor-flags | grep -E 'id=({'|'.join([str(i) for i in ids])})'", + check=True, + shell=True, + ) + except KeyboardInterrupt: + ... diff --git a/ctf/monitor/solves.py b/ctf/monitor/solves.py new file mode 100644 index 0000000..115e1ed --- /dev/null +++ b/ctf/monitor/solves.py @@ -0,0 +1,76 @@ +import os +import subprocess +import sys +import time +from typing import Annotated + +import typer +from rich.console import Console +from rich.table import Table + +app = typer.Typer() + + +@app.command(help="Monitor solves per track.") +def solves( + tracks: Annotated[ + list[str], + typer.Option( + "--tracks", + "-t", + help="Tracks to monitor.", + ), + ] = [], + minimum_solves: Annotated[ + int, + typer.Option( + "-m", + "--minimum-solves", + help="Displaying tracks that has a threshold equal or higher.", + ), + ] = 1, +) -> None: + try: + while True: + r: subprocess.CompletedProcess[bytes] = subprocess.run( + ["askgod", "admin", "history"], + capture_output=True, + check=True, + ) + + solves_per_track: dict[str, dict[tuple[str, str], int]] = { + track: {} for track in tracks + } + for line in r.stdout.decode().split("\n"): + for track in tracks: + if track in line: + splitted_line = line.split("|") + key: tuple[str, str] = ( + splitted_line[3].strip().replace("team", "").lstrip("0"), + splitted_line[4], + ) + if key not in solves_per_track[track]: + solves_per_track[track][key] = 0 + solves_per_track[track][key] += 1 + + os.system("cls" if sys.platform.startswith("win") else "clear") + table = Table(show_lines=True) + table.add_column("Track", style="bold cyan") + table.add_column("Team ID") + table.add_column("Team Name") + table.add_column("Solves") + for track, team_solves in sorted( + solves_per_track.items(), + key=lambda item: item[0], + ): + for team, solves in sorted( + team_solves.items(), reverse=True, key=lambda item: item[1] + ): + if solves > minimum_solves: + table.add_row(track, team[0], team[1], str(solves)) + + Console().print(table) + + time.sleep(5) + except KeyboardInterrupt: + ... diff --git a/ctf/monitor/stats.py b/ctf/monitor/stats.py new file mode 100644 index 0000000..dee0e55 --- /dev/null +++ b/ctf/monitor/stats.py @@ -0,0 +1,90 @@ +import re +import subprocess +from typing import Annotated + +import typer +from rich.console import Console +from rich.table import Table + +app = typer.Typer() + + +@app.command(help=".") +def stats( + tracks: Annotated[ + list[str], + typer.Option( + "--tracks", + "-t", + help="Tracks to monitor.", + ), + ] = [], + ids: Annotated[ + list[int], typer.Option("-i", "--ids", help="Monitor flags using IDs.") + ] = [], +) -> None: + r: subprocess.CompletedProcess[bytes] = subprocess.run( + ["askgod", "admin", "list-teams"], + capture_output=True, + check=True, + ) + + amount_of_teams = int( + sorted( + r.stdout.decode().strip().splitlines()[2:], + reverse=True, + key=lambda item: int(item.split("|")[0].strip()), + )[0] + .split("|")[0] + .strip() + ) + + r: subprocess.CompletedProcess[bytes] = subprocess.run( + ["askgod", "admin", "list-flags"], + capture_output=True, + check=True, + ) + for line in r.stdout.decode().strip().splitlines(): + for track in tracks: + if re.search(r"\|\s*.*" + track, line, re.IGNORECASE): + ids.append(int(line.split("|")[0])) + + if not ids: + if not tracks: + print("You must provide tracks or ids.") + else: + print(f"No flag found from given tracks: {', '.join(tracks)}") + raise typer.Exit(1) + + r: subprocess.CompletedProcess[bytes] = subprocess.run( + ["askgod", "admin", "list-scores"], + capture_output=True, + check=True, + ) + + solves_per_flag = {} + for line in r.stdout.decode().strip().splitlines(): + try: + _id = line.split("|")[2].strip() + except Exception: + continue + + if not _id.isnumeric(): + continue + + if re.search(f"({'|'.join([str(i) for i in ids])})", _id): + if _id not in solves_per_flag: + solves_per_flag[_id] = 0 + solves_per_flag[_id] += 1 + + table = Table(show_lines=True) + table.add_column("ID") + table.add_column("Amount") + table.add_column("Percent") + + for _id, solves in sorted( + solves_per_flag.items(), reverse=True, key=lambda item: item[1] + ): + table.add_row(_id, str(solves), str(int((solves / amount_of_teams) * 100))) + + Console().print(table) diff --git a/pyproject.toml b/pyproject.toml index 37be2ab..21e1e2b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,7 @@ dependencies = [ "typer==0.24.1", "pydantic", ] -version = "5.0.1" +version = "5.1.0" classifiers = [ "Programming Language :: Python :: 3", "Operating System :: OS Independent",