diff --git a/tests/test_user_management_list_sessions.py b/tests/test_user_management_list_sessions.py new file mode 100644 index 00000000..c9d4065c --- /dev/null +++ b/tests/test_user_management_list_sessions.py @@ -0,0 +1,73 @@ +from typing import Union + +import pytest + +from tests.utils.list_resource import list_response_of +from tests.utils.syncify import syncify +from tests.types.test_auto_pagination_function import TestAutoPaginationFunction +from workos.user_management import AsyncUserManagement, UserManagement + + +def _mock_session(id: str): + now = "2025-07-23T14:00:00.000Z" + return { + "object": "session", + "id": id, + "user_id": "user_123", + "organization_id": "org_123", + "status": "active", + "auth_method": "password", + "impersonator": None, + "ip_address": "192.168.1.1", + "user_agent": "Mozilla/5.0", + "expires_at": "2025-07-23T15:00:00.000Z", + "ended_at": None, + "created_at": now, + "updated_at": now, + } + + +@pytest.mark.sync_and_async(UserManagement, AsyncUserManagement) +class TestUserManagementListSessions: + @pytest.fixture(autouse=True) + def setup(self, module_instance: Union[UserManagement, AsyncUserManagement]): + self.http_client = module_instance._http_client + self.user_management = module_instance + + def test_list_sessions_query_and_parsing( + self, capture_and_mock_http_client_request + ): + sessions = [_mock_session("session_1"), _mock_session("session_2")] + response = list_response_of(data=sessions) + request_kwargs = capture_and_mock_http_client_request( + self.http_client, response, 200 + ) + + result = syncify( + self.user_management.list_sessions( + user_id="user_123", limit=10, before="before_id", order="desc" + ) + ) + + assert request_kwargs["url"].endswith("user_management/users/user_123/sessions") + assert request_kwargs["method"] == "get" + assert request_kwargs["params"]["limit"] == 10 + assert request_kwargs["params"]["before"] == "before_id" + assert request_kwargs["params"]["order"] == "desc" + assert "after" not in request_kwargs["params"] + assert len(result.data) == 2 + assert result.data[0].id == "session_1" + assert result.list_metadata.before is None + assert result.list_metadata.after is None + + def test_list_sessions_auto_pagination( + self, test_auto_pagination: TestAutoPaginationFunction + ): + data = [_mock_session(str(i)) for i in range(40)] + test_auto_pagination( + http_client=self.http_client, + list_function=self.user_management.list_sessions, + list_function_params={"user_id": "user_123"}, + expected_all_page_data=data, + url_path_keys=["user_id"], + ) diff --git a/tests/test_user_management_revoke_session.py b/tests/test_user_management_revoke_session.py new file mode 100644 index 00000000..7efedc65 --- /dev/null +++ b/tests/test_user_management_revoke_session.py @@ -0,0 +1,47 @@ +from typing import Union + +import pytest + +from tests.utils.syncify import syncify +from workos.user_management import AsyncUserManagement, UserManagement + + +def _mock_session(id: str): + now = "2025-07-23T14:00:00.000Z" + return { + "object": "session", + "id": id, + "user_id": "user_123", + "organization_id": "org_123", + "status": "revoked", + "auth_method": "password", + "ip_address": "192.168.1.1", + "user_agent": "Mozilla/5.0", + "expires_at": "2025-07-23T15:00:00.000Z", + "ended_at": now, + "created_at": now, + "updated_at": now, + } + + +@pytest.mark.sync_and_async(UserManagement, AsyncUserManagement) +class TestUserManagementRevokeSession: + @pytest.fixture(autouse=True) + def setup(self, module_instance: Union[UserManagement, AsyncUserManagement]): + self.http_client = module_instance._http_client + self.user_management = module_instance + + def test_revoke_session(self, capture_and_mock_http_client_request): + mock = _mock_session("session_abc") + request_kwargs = capture_and_mock_http_client_request( + self.http_client, mock, 200 + ) + + response = syncify( + self.user_management.revoke_session(session_id="session_abc") + ) + + assert request_kwargs["url"].endswith("user_management/sessions/revoke") + assert request_kwargs["method"] == "post" + assert request_kwargs["json"] == {"session_id": "session_abc"} + assert response.id == "session_abc" diff --git a/workos/types/list_resource.py b/workos/types/list_resource.py index ce5bdeb8..e2ece480 100644 --- a/workos/types/list_resource.py +++ b/workos/types/list_resource.py @@ -34,6 +34,7 @@ from workos.types.organizations import Organization from workos.types.sso import ConnectionWithDomains from workos.types.user_management import Invitation, OrganizationMembership, User +from workos.types.user_management.session import Session as UserManagementSession from workos.types.vault import ObjectDigest from workos.types.workos_model import WorkOSModel from workos.utils.request_helper import DEFAULT_LIST_RESPONSE_LIMIT @@ -54,6 +55,7 @@ AuthorizationResource, AuthorizationResourceType, User, + UserManagementSession, ObjectDigest, Warrant, WarrantQueryResult, diff --git a/workos/types/user_management/__init__.py b/workos/types/user_management/__init__.py index cd3c9d8a..29a1890e 100644 --- a/workos/types/user_management/__init__.py +++ b/workos/types/user_management/__init__.py @@ -9,3 +9,4 @@ from .password_reset import * from .user_management_provider_type import * from .user import * +from .session import * diff --git a/workos/types/user_management/list_filters.py b/workos/types/user_management/list_filters.py index 785699a2..a3be45ce 100644 --- a/workos/types/user_management/list_filters.py +++ b/workos/types/user_management/list_filters.py @@ -23,3 +23,7 @@ class OrganizationMembershipsListFilters(ListArgs, total=False): class AuthenticationFactorsListFilters(ListArgs, total=False): user_id: str + + +class SessionsListFilters(ListArgs, total=False): + user_id: str diff --git a/workos/types/user_management/session.py b/workos/types/user_management/session.py index 06869a97..546e6b2f 100644 --- a/workos/types/user_management/session.py +++ b/workos/types/user_management/session.py @@ -46,3 +46,34 @@ class RefreshWithSessionCookieErrorResponse(WorkOSModel): class SessionConfig(TypedDict, total=False): seal_session: bool cookie_password: str + + +AuthMethodType = Literal[ + "external_auth", + "impersonation", + "magic_code", + "migrated_session", + "oauth", + "passkey", + "password", + "sso", + "unknown", +] + + +class Session(WorkOSModel): + """Representation of a WorkOS User Management Session.""" + + object: Literal["session"] + id: str + user_id: str + organization_id: Optional[str] = None + status: str + auth_method: AuthMethodType + impersonator: Optional[Impersonator] = None + ip_address: Optional[str] = None + user_agent: Optional[str] = None + expires_at: str + ended_at: Optional[str] = None + created_at: str + updated_at: str diff --git a/workos/user_management.py b/workos/user_management.py index 4a093b78..9e72bb14 100644 --- a/workos/user_management.py +++ b/workos/user_management.py @@ -48,6 +48,7 @@ from workos.types.user_management.password_hash_type import PasswordHashType from workos.types.user_management.screen_hint import ScreenHintType from workos.types.user_management.session import SessionConfig +from workos.types.user_management.session import Session as UserManagementSession from workos.types.user_management.user_management_provider_type import ( UserManagementProviderType, ) @@ -86,6 +87,8 @@ MAGIC_AUTH_PATH = "user_management/magic_auth" USER_SEND_MAGIC_AUTH_PATH = "user_management/magic_auth/send" USER_AUTH_FACTORS_PATH = "user_management/users/{0}/auth_factors" +USER_SESSIONS_PATH = "user_management/users/{0}/sessions" +SESSIONS_REVOKE_PATH = "user_management/sessions/revoke" EMAIL_VERIFICATION_DETAIL_PATH = "user_management/email_verification/{0}" INVITATION_PATH = "user_management/invitations" INVITATION_DETAIL_PATH = "user_management/invitations/{0}" @@ -109,6 +112,12 @@ Invitation, InvitationsListFilters, ListMetadata ] +from workos.types.user_management.list_filters import SessionsListFilters + +SessionsListResource = WorkOSListResource[ + UserManagementSession, SessionsListFilters, ListMetadata +] + class UserManagementModule(Protocol): """Offers methods for using the WorkOS User Management API.""" @@ -720,6 +729,20 @@ def verify_email(self, *, user_id: str, code: str) -> SyncOrAsync[User]: """ ... + def list_sessions( + self, + *, + user_id: str, + limit: Optional[int] = None, + before: Optional[str] = None, + after: Optional[str] = None, + order: Optional[PaginationOrder] = "desc", + ) -> SyncOrAsync["SessionsListResource"]: ... + + def revoke_session( + self, *, session_id: str + ) -> SyncOrAsync[UserManagementSession]: ... + def get_magic_auth(self, magic_auth_id: str) -> SyncOrAsync[MagicAuth]: """Get the details of a Magic Auth object. @@ -1377,6 +1400,54 @@ def create_magic_auth( return MagicAuth.model_validate(response) + def list_sessions( + self, + *, + user_id: str, + limit: Optional[int] = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: Optional[PaginationOrder] = "desc", + ) -> "SessionsListResource": + limit_value: int = limit if limit is not None else DEFAULT_LIST_RESPONSE_LIMIT + + params: ListArgs = { + "limit": limit_value, + "before": before, + "after": after, + "order": order, + } + + response = self._http_client.request( + USER_SESSIONS_PATH.format(user_id), + method=REQUEST_METHOD_GET, + params=params, + ) + + list_args: SessionsListFilters = { + "limit": limit_value, + "before": before, + "after": after, + "user_id": user_id, + } + if order is not None: + list_args["order"] = order + + return SessionsListResource( + list_method=self.list_sessions, + list_args=list_args, + **ListPage[UserManagementSession](**response).model_dump(), + ) + + def revoke_session(self, *, session_id: str) -> UserManagementSession: + json = {"session_id": session_id} + + response = self._http_client.request( + SESSIONS_REVOKE_PATH, method=REQUEST_METHOD_POST, json=json + ) + + return UserManagementSession.model_validate(response) + def enroll_auth_factor( self, *, @@ -2033,6 +2104,54 @@ async def create_magic_auth( return MagicAuth.model_validate(response) + async def list_sessions( + self, + *, + user_id: str, + limit: Optional[int] = DEFAULT_LIST_RESPONSE_LIMIT, + before: Optional[str] = None, + after: Optional[str] = None, + order: Optional[PaginationOrder] = "desc", + ) -> "SessionsListResource": + limit_value: int = limit if limit is not None else DEFAULT_LIST_RESPONSE_LIMIT + + params: ListArgs = { + "limit": limit_value, + "before": before, + "after": after, + "order": order, + } + + response = await self._http_client.request( + USER_SESSIONS_PATH.format(user_id), + method=REQUEST_METHOD_GET, + params=params, + ) + + list_args: SessionsListFilters = { + "limit": limit_value, + "before": before, + "after": after, + "user_id": user_id, + } + if order is not None: + list_args["order"] = order + + return SessionsListResource( + list_method=self.list_sessions, + list_args=list_args, + **ListPage[UserManagementSession](**response).model_dump(), + ) + + async def revoke_session(self, *, session_id: str) -> UserManagementSession: + json = {"session_id": session_id} + + response = await self._http_client.request( + SESSIONS_REVOKE_PATH, method=REQUEST_METHOD_POST, json=json + ) + + return UserManagementSession.model_validate(response) + async def enroll_auth_factor( self, *,