From 3dda96cb6a36ac730ff003da215d36b3d784d482 Mon Sep 17 00:00:00 2001 From: Guilherme Souza Date: Mon, 12 May 2025 13:37:06 -0300 Subject: [PATCH] fix: validate uuid on admin methods and fix wrong factor in delete_factor method --- supabase_auth/_async/gotrue_admin_api.py | 20 +++++++++- supabase_auth/_sync/gotrue_admin_api.py | 20 +++++++++- supabase_auth/helpers.py | 9 +++++ tests/_async/test_gotrue_admin_api.py | 50 ++++++++++++++++++++++++ tests/_sync/test_gotrue_admin_api.py | 50 ++++++++++++++++++++++++ 5 files changed, 145 insertions(+), 4 deletions(-) diff --git a/supabase_auth/_async/gotrue_admin_api.py b/supabase_auth/_async/gotrue_admin_api.py index 54d48739..408f3da0 100644 --- a/supabase_auth/_async/gotrue_admin_api.py +++ b/supabase_auth/_async/gotrue_admin_api.py @@ -3,7 +3,12 @@ from functools import partial from typing import Dict, List, Optional -from ..helpers import model_validate, parse_link_response, parse_user_response +from ..helpers import ( + is_valid_uuid, + model_validate, + parse_link_response, + parse_user_response, +) from ..http_clients import AsyncClient from ..types import ( AdminUserAttributes, @@ -131,6 +136,8 @@ async def get_user_by_id(self, uid: str) -> UserResponse: This function should only be called on a server. Never expose your `service_role` key in the browser. """ + self._validate_uuid(uid) + return await self._request( "GET", f"admin/users/{uid}", @@ -148,6 +155,7 @@ async def update_user_by_id( This function should only be called on a server. Never expose your `service_role` key in the browser. """ + self._validate_uuid(uid) return await self._request( "PUT", f"admin/users/{uid}", @@ -162,6 +170,7 @@ async def delete_user(self, id: str, should_soft_delete: bool = False) -> None: This function should only be called on a server. Never expose your `service_role` key in the browser. """ + self._validate_uuid(id) body = {"should_soft_delete": should_soft_delete} return await self._request("DELETE", f"admin/users/{id}", body=body) @@ -169,6 +178,7 @@ async def _list_factors( self, params: AuthMFAAdminListFactorsParams, ) -> AuthMFAAdminListFactorsResponse: + self._validate_uuid(params.get("user_id")) return await self._request( "GET", f"admin/users/{params.get('user_id')}/factors", @@ -179,8 +189,14 @@ async def _delete_factor( self, params: AuthMFAAdminDeleteFactorParams, ) -> AuthMFAAdminDeleteFactorResponse: + self._validate_uuid(params.get("user_id")) + self._validate_uuid(params.get("id")) return await self._request( "DELETE", - f"admin/users/{params.get('user_id')}/factors/{params.get('factor_id')}", + f"admin/users/{params.get('user_id')}/factors/{params.get('id')}", xform=partial(model_validate, AuthMFAAdminDeleteFactorResponse), ) + + def _validate_uuid(self, id: str) -> None: + if not is_valid_uuid(id): + raise ValueError(f"Invalid id, '{id}' is not a valid uuid") diff --git a/supabase_auth/_sync/gotrue_admin_api.py b/supabase_auth/_sync/gotrue_admin_api.py index 3997c53d..afbb75e0 100644 --- a/supabase_auth/_sync/gotrue_admin_api.py +++ b/supabase_auth/_sync/gotrue_admin_api.py @@ -3,7 +3,12 @@ from functools import partial from typing import Dict, List, Optional -from ..helpers import model_validate, parse_link_response, parse_user_response +from ..helpers import ( + is_valid_uuid, + model_validate, + parse_link_response, + parse_user_response, +) from ..http_clients import SyncClient from ..types import ( AdminUserAttributes, @@ -131,6 +136,8 @@ def get_user_by_id(self, uid: str) -> UserResponse: This function should only be called on a server. Never expose your `service_role` key in the browser. """ + self._validate_uuid(uid) + return self._request( "GET", f"admin/users/{uid}", @@ -148,6 +155,7 @@ def update_user_by_id( This function should only be called on a server. Never expose your `service_role` key in the browser. """ + self._validate_uuid(uid) return self._request( "PUT", f"admin/users/{uid}", @@ -162,6 +170,7 @@ def delete_user(self, id: str, should_soft_delete: bool = False) -> None: This function should only be called on a server. Never expose your `service_role` key in the browser. """ + self._validate_uuid(id) body = {"should_soft_delete": should_soft_delete} return self._request("DELETE", f"admin/users/{id}", body=body) @@ -169,6 +178,7 @@ def _list_factors( self, params: AuthMFAAdminListFactorsParams, ) -> AuthMFAAdminListFactorsResponse: + self._validate_uuid(params.get("user_id")) return self._request( "GET", f"admin/users/{params.get('user_id')}/factors", @@ -179,8 +189,14 @@ def _delete_factor( self, params: AuthMFAAdminDeleteFactorParams, ) -> AuthMFAAdminDeleteFactorResponse: + self._validate_uuid(params.get("user_id")) + self._validate_uuid(params.get("id")) return self._request( "DELETE", - f"admin/users/{params.get('user_id')}/factors/{params.get('factor_id')}", + f"admin/users/{params.get('user_id')}/factors/{params.get('id')}", xform=partial(model_validate, AuthMFAAdminDeleteFactorResponse), ) + + def _validate_uuid(self, id: str) -> None: + if not is_valid_uuid(id): + raise ValueError(f"Invalid id, '{id}' is not a valid uuid") diff --git a/supabase_auth/helpers.py b/supabase_auth/helpers.py index 9407e630..81c7c465 100644 --- a/supabase_auth/helpers.py +++ b/supabase_auth/helpers.py @@ -5,6 +5,7 @@ import re import secrets import string +import uuid from base64 import urlsafe_b64decode from datetime import datetime from json import loads @@ -317,3 +318,11 @@ def validate_exp(exp: int) -> None: time_now = datetime.now().timestamp() if exp <= time_now: raise AuthInvalidJwtError("JWT has expired") + + +def is_valid_uuid(value: str) -> bool: + try: + uuid.UUID(value) + return True + except ValueError: + return False diff --git a/tests/_async/test_gotrue_admin_api.py b/tests/_async/test_gotrue_admin_api.py index dc96a145..df15abf8 100644 --- a/tests/_async/test_gotrue_admin_api.py +++ b/tests/_async/test_gotrue_admin_api.py @@ -1,3 +1,5 @@ +import uuid + import pytest from supabase_auth.errors import ( @@ -588,3 +590,51 @@ async def test_weak_phone_password_error(): ) except (AuthWeakPasswordError, AuthApiError) as e: assert e.to_dict() + + +async def test_get_user_by_id_invalid_id_raises_error(): + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + await service_role_api_client().get_user_by_id("invalid_id") + + +async def test_update_user_by_id_invalid_id_raises_error(): + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + await service_role_api_client().update_user_by_id( + "invalid_id", {"email": "test@test.com"} + ) + + +async def test_delete_user_invalid_id_raises_error(): + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + await service_role_api_client().delete_user("invalid_id") + + +async def test_list_factors_invalid_id_raises_error(): + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + await service_role_api_client()._list_factors({"user_id": "invalid_id"}) + + +async def test_delete_factor_invalid_id_raises_error(): + # invalid user id + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + await service_role_api_client()._delete_factor( + {"user_id": "invalid_id", "id": "invalid_id"} + ) + + # valid user id, invalid factor id + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + await service_role_api_client()._delete_factor( + {"user_id": str(uuid.uuid4()), "id": "invalid_id"} + ) diff --git a/tests/_sync/test_gotrue_admin_api.py b/tests/_sync/test_gotrue_admin_api.py index dd5fa523..a732b8b2 100644 --- a/tests/_sync/test_gotrue_admin_api.py +++ b/tests/_sync/test_gotrue_admin_api.py @@ -1,3 +1,5 @@ +import uuid + import pytest from supabase_auth.errors import ( @@ -590,3 +592,51 @@ def test_weak_phone_password_error(): ) except (AuthWeakPasswordError, AuthApiError) as e: assert e.to_dict() + + +def test_get_user_by_id_invalid_id_raises_error(): + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + service_role_api_client().get_user_by_id("invalid_id") + + +def test_update_user_by_id_invalid_id_raises_error(): + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + service_role_api_client().update_user_by_id( + "invalid_id", {"email": "test@test.com"} + ) + + +def test_delete_user_invalid_id_raises_error(): + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + service_role_api_client().delete_user("invalid_id") + + +def test_list_factors_invalid_id_raises_error(): + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + service_role_api_client()._list_factors({"user_id": "invalid_id"}) + + +def test_delete_factor_invalid_id_raises_error(): + # invalid user id + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + service_role_api_client()._delete_factor( + {"user_id": "invalid_id", "id": "invalid_id"} + ) + + # valid user id, invalid factor id + with pytest.raises( + ValueError, match=r"Invalid id, 'invalid_id' is not a valid uuid" + ): + service_role_api_client()._delete_factor( + {"user_id": str(uuid.uuid4()), "id": "invalid_id"} + )