From ef897740511580d002a7663d523c153230ad393b Mon Sep 17 00:00:00 2001 From: JingWen Fan <106414602+study8677@users.noreply.github.com> Date: Wed, 20 May 2026 17:56:40 +0800 Subject: [PATCH] fix: prevent admin privilege escalation via signup --- src/opencmo/storage/accounts.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/opencmo/storage/accounts.py b/src/opencmo/storage/accounts.py index a370cbe..3454466 100644 --- a/src/opencmo/storage/accounts.py +++ b/src/opencmo/storage/accounts.py @@ -177,6 +177,7 @@ async def get_user_account(user_id: int) -> dict | None: async def create_user_with_account(email: str, password: str, name: str = "") -> tuple[dict, dict]: normalized = (email or "").strip().lower() + admin_email = os.environ.get("OPENCMO_ADMIN_EMAIL", "hello@aidcmo.com").strip().lower() if not is_valid_email(normalized): raise ValueError("invalid_email") if len(password or "") < MIN_PASSWORD_LENGTH: @@ -189,6 +190,8 @@ async def create_user_with_account(email: str, password: str, name: str = "") -> try: existing = await db.execute("SELECT id, password_hash FROM users WHERE email = ?", (normalized,)) row = await existing.fetchone() + if row and row[1] == "!unusable" and normalized == admin_email: + raise ValueError("email_exists") if row and row[1] != "!unusable": raise ValueError("email_exists") @@ -206,7 +209,7 @@ async def create_user_with_account(email: str, password: str, name: str = "") -> normalized, hash_password(password), name.strip(), - "admin" if normalized == os.environ.get("OPENCMO_ADMIN_EMAIL", "hello@aidcmo.com").strip().lower() else "user", + "user", ), ) user_id = int(cursor.lastrowid)