Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions tests/test_user_management_list_sessions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from typing import Union

import pytest

from tests.utils.list_resource import list_response_of
from tests.utils.syncify import syncify
from tests.types.test_auto_pagination_function import TestAutoPaginationFunction
from workos.user_management import AsyncUserManagement, UserManagement


def _mock_session(id: str):
now = "2025-07-23T14:00:00.000Z"
return {
"object": "session",
"id": id,
"user_id": "user_123",
"organization_id": "org_123",
"status": "active",
"auth_method": "password",
"impersonator": None,
"ip_address": "192.168.1.1",
"user_agent": "Mozilla/5.0",
"expires_at": "2025-07-23T15:00:00.000Z",
"ended_at": None,
"created_at": now,
"updated_at": now,
}


@pytest.mark.sync_and_async(UserManagement, AsyncUserManagement)
class TestUserManagementListSessions:
@pytest.fixture(autouse=True)
def setup(self, module_instance: Union[UserManagement, AsyncUserManagement]):
self.http_client = module_instance._http_client
self.user_management = module_instance

def test_list_sessions_query_and_parsing(
self, capture_and_mock_http_client_request
):
sessions = [_mock_session("session_1"), _mock_session("session_2")]
response = list_response_of(data=sessions)
request_kwargs = capture_and_mock_http_client_request(
self.http_client, response, 200
)

result = syncify(
self.user_management.list_sessions(
user_id="user_123", limit=10, before="before_id", order="desc"
)
)

assert request_kwargs["url"].endswith("user_management/users/user_123/sessions")
assert request_kwargs["method"] == "get"
assert request_kwargs["params"]["limit"] == 10
assert request_kwargs["params"]["before"] == "before_id"
assert request_kwargs["params"]["order"] == "desc"
assert "after" not in request_kwargs["params"]
assert len(result.data) == 2
assert result.data[0].id == "session_1"
assert result.list_metadata.before is None
assert result.list_metadata.after is None

def test_list_sessions_auto_pagination(
self, test_auto_pagination: TestAutoPaginationFunction
):
data = [_mock_session(str(i)) for i in range(40)]
test_auto_pagination(
http_client=self.http_client,
list_function=self.user_management.list_sessions,
list_function_params={"user_id": "user_123"},
expected_all_page_data=data,
url_path_keys=["user_id"],
)
47 changes: 47 additions & 0 deletions tests/test_user_management_revoke_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from typing import Union

import pytest

from tests.utils.syncify import syncify
from workos.user_management import AsyncUserManagement, UserManagement


def _mock_session(id: str):
now = "2025-07-23T14:00:00.000Z"
return {
"object": "session",
"id": id,
"user_id": "user_123",
"organization_id": "org_123",
"status": "revoked",
"auth_method": "password",
"ip_address": "192.168.1.1",
"user_agent": "Mozilla/5.0",
"expires_at": "2025-07-23T15:00:00.000Z",
"ended_at": now,
"created_at": now,
"updated_at": now,
}


@pytest.mark.sync_and_async(UserManagement, AsyncUserManagement)
class TestUserManagementRevokeSession:
@pytest.fixture(autouse=True)
def setup(self, module_instance: Union[UserManagement, AsyncUserManagement]):
self.http_client = module_instance._http_client
self.user_management = module_instance

def test_revoke_session(self, capture_and_mock_http_client_request):
mock = _mock_session("session_abc")
request_kwargs = capture_and_mock_http_client_request(
self.http_client, mock, 200
)

response = syncify(
self.user_management.revoke_session(session_id="session_abc")
)

assert request_kwargs["url"].endswith("user_management/sessions/revoke")
assert request_kwargs["method"] == "post"
assert request_kwargs["json"] == {"session_id": "session_abc"}
assert response.id == "session_abc"
2 changes: 2 additions & 0 deletions workos/types/list_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from workos.types.organizations import Organization
from workos.types.sso import ConnectionWithDomains
from workos.types.user_management import Invitation, OrganizationMembership, User
from workos.types.user_management.session import Session as UserManagementSession
from workos.types.vault import ObjectDigest
from workos.types.workos_model import WorkOSModel
from workos.utils.request_helper import DEFAULT_LIST_RESPONSE_LIMIT
Expand All @@ -54,6 +55,7 @@
AuthorizationResource,
AuthorizationResourceType,
User,
UserManagementSession,
ObjectDigest,
Warrant,
WarrantQueryResult,
Expand Down
1 change: 1 addition & 0 deletions workos/types/user_management/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
from .password_reset import *
from .user_management_provider_type import *
from .user import *
from .session import *
4 changes: 4 additions & 0 deletions workos/types/user_management/list_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ class OrganizationMembershipsListFilters(ListArgs, total=False):

class AuthenticationFactorsListFilters(ListArgs, total=False):
user_id: str


class SessionsListFilters(ListArgs, total=False):
user_id: str
31 changes: 31 additions & 0 deletions workos/types/user_management/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,34 @@ class RefreshWithSessionCookieErrorResponse(WorkOSModel):
class SessionConfig(TypedDict, total=False):
seal_session: bool
cookie_password: str


AuthMethodType = Literal[
"external_auth",
"impersonation",
"magic_code",
"migrated_session",
"oauth",
"passkey",
"password",
"sso",
"unknown",
]


class Session(WorkOSModel):
"""Representation of a WorkOS User Management Session."""

object: Literal["session"]
id: str
user_id: str
organization_id: Optional[str] = None
status: str
auth_method: AuthMethodType
impersonator: Optional[Impersonator] = None
ip_address: Optional[str] = None
user_agent: Optional[str] = None
expires_at: str
ended_at: Optional[str] = None
created_at: str
updated_at: str
119 changes: 119 additions & 0 deletions workos/user_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from workos.types.user_management.password_hash_type import PasswordHashType
from workos.types.user_management.screen_hint import ScreenHintType
from workos.types.user_management.session import SessionConfig
from workos.types.user_management.session import Session as UserManagementSession
from workos.types.user_management.user_management_provider_type import (
UserManagementProviderType,
)
Expand Down Expand Up @@ -86,6 +87,8 @@
MAGIC_AUTH_PATH = "user_management/magic_auth"
USER_SEND_MAGIC_AUTH_PATH = "user_management/magic_auth/send"
USER_AUTH_FACTORS_PATH = "user_management/users/{0}/auth_factors"
USER_SESSIONS_PATH = "user_management/users/{0}/sessions"
SESSIONS_REVOKE_PATH = "user_management/sessions/revoke"
EMAIL_VERIFICATION_DETAIL_PATH = "user_management/email_verification/{0}"
INVITATION_PATH = "user_management/invitations"
INVITATION_DETAIL_PATH = "user_management/invitations/{0}"
Expand All @@ -109,6 +112,12 @@
Invitation, InvitationsListFilters, ListMetadata
]

from workos.types.user_management.list_filters import SessionsListFilters

SessionsListResource = WorkOSListResource[
UserManagementSession, SessionsListFilters, ListMetadata
]


class UserManagementModule(Protocol):
"""Offers methods for using the WorkOS User Management API."""
Expand Down Expand Up @@ -720,6 +729,20 @@ def verify_email(self, *, user_id: str, code: str) -> SyncOrAsync[User]:
"""
...

def list_sessions(
self,
*,
user_id: str,
limit: Optional[int] = None,
before: Optional[str] = None,
after: Optional[str] = None,
order: Optional[PaginationOrder] = "desc",
) -> SyncOrAsync["SessionsListResource"]: ...

def revoke_session(
self, *, session_id: str
) -> SyncOrAsync[UserManagementSession]: ...

def get_magic_auth(self, magic_auth_id: str) -> SyncOrAsync[MagicAuth]:
"""Get the details of a Magic Auth object.

Expand Down Expand Up @@ -1377,6 +1400,54 @@ def create_magic_auth(

return MagicAuth.model_validate(response)

def list_sessions(
self,
*,
user_id: str,
limit: Optional[int] = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: Optional[PaginationOrder] = "desc",
) -> "SessionsListResource":
limit_value: int = limit if limit is not None else DEFAULT_LIST_RESPONSE_LIMIT

params: ListArgs = {
"limit": limit_value,
"before": before,
"after": after,
"order": order,
}

response = self._http_client.request(
USER_SESSIONS_PATH.format(user_id),
method=REQUEST_METHOD_GET,
params=params,
)

list_args: SessionsListFilters = {
"limit": limit_value,
"before": before,
"after": after,
"user_id": user_id,
}
if order is not None:
list_args["order"] = order

return SessionsListResource(
list_method=self.list_sessions,
list_args=list_args,
**ListPage[UserManagementSession](**response).model_dump(),
)

def revoke_session(self, *, session_id: str) -> UserManagementSession:
json = {"session_id": session_id}

response = self._http_client.request(
SESSIONS_REVOKE_PATH, method=REQUEST_METHOD_POST, json=json
)

return UserManagementSession.model_validate(response)

def enroll_auth_factor(
self,
*,
Expand Down Expand Up @@ -2033,6 +2104,54 @@ async def create_magic_auth(

return MagicAuth.model_validate(response)

async def list_sessions(
self,
*,
user_id: str,
limit: Optional[int] = DEFAULT_LIST_RESPONSE_LIMIT,
before: Optional[str] = None,
after: Optional[str] = None,
order: Optional[PaginationOrder] = "desc",
) -> "SessionsListResource":
limit_value: int = limit if limit is not None else DEFAULT_LIST_RESPONSE_LIMIT

params: ListArgs = {
"limit": limit_value,
"before": before,
"after": after,
"order": order,
}

response = await self._http_client.request(
USER_SESSIONS_PATH.format(user_id),
method=REQUEST_METHOD_GET,
params=params,
)

list_args: SessionsListFilters = {
"limit": limit_value,
"before": before,
"after": after,
"user_id": user_id,
}
if order is not None:
list_args["order"] = order

return SessionsListResource(
list_method=self.list_sessions,
list_args=list_args,
**ListPage[UserManagementSession](**response).model_dump(),
)

async def revoke_session(self, *, session_id: str) -> UserManagementSession:
json = {"session_id": session_id}

response = await self._http_client.request(
SESSIONS_REVOKE_PATH, method=REQUEST_METHOD_POST, json=json
)

return UserManagementSession.model_validate(response)

async def enroll_auth_factor(
self,
*,
Expand Down