Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions integration/test_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ def test_get_user_roles_db(client_factory: ClientFactory) -> None:
with client_factory(ports=RBAC_PORTS, auth_credentials=RBAC_AUTH_CREDS) as client:
if client._connection._weaviate_version.is_lower_than(1, 30, 0):
pytest.skip("This test requires Weaviate 1.30.0 or higher")
roles_base = client.users.db.get_assigned_roles("admin-user")
roles_base = client.users.db.get_assigned_roles(user_id="admin-user")
names = list(roles_base.keys())
assert len(roles_base) > 0
assert isinstance(roles_base[names[0]], RoleBase)

roles = client.users.db.get_assigned_roles("admin-user", include_permissions=True)
roles = client.users.db.get_assigned_roles(user_id="admin-user", include_permissions=True)
assert len(roles) > 0
assert isinstance(roles[names[0]], Role)

Expand All @@ -48,12 +48,12 @@ def test_get_user_roles_oidc(client_factory: ClientFactory) -> None:
with client_factory(ports=RBAC_PORTS, auth_credentials=RBAC_AUTH_CREDS) as client:
if client._connection._weaviate_version.is_lower_than(1, 30, 0):
pytest.skip("This test requires Weaviate 1.30.0 or higher")
roles_base = client.users.oidc.get_assigned_roles("admin-user")
roles_base = client.users.oidc.get_assigned_roles(user_id="admin-user")
names = list(roles_base.keys())
assert len(roles_base) > 0
assert isinstance(roles_base[names[0]], RoleBase)

roles = client.users.oidc.get_assigned_roles("admin-user", include_permissions=True)
roles = client.users.oidc.get_assigned_roles(user_id="admin-user", include_permissions=True)
assert len(roles) > 0
assert isinstance(roles[names[0]], Role)

Expand Down Expand Up @@ -154,6 +154,42 @@ def test_de_activate(client_factory: ClientFactory) -> None:
client.users.db.delete(user_id=randomUserName)


def test_deactivate_and_revoke(client_factory: ClientFactory) -> None:
with client_factory(ports=RBAC_PORTS, auth_credentials=Auth.api_key("admin-key")) as client:
if client._connection._weaviate_version.is_lower_than(1, 30, 0):
pytest.skip("This test requires Weaviate 1.30.0 or higher")

randomUserName = "new-user" + str(random.randint(1, 1000))
apiKeyOld = client.users.db.create(user_id=randomUserName)
assert client.users.db.deactivate(user_id=randomUserName, revoke_key=True)

with pytest.raises(weaviate.exceptions.UnexpectedStatusCodeError):
weaviate.connect_to_local(
port=RBAC_PORTS[0],
grpc_port=RBAC_PORTS[1],
auth_credentials=Auth.api_key(apiKeyOld),
)

# re-activating is not enough
assert client.users.db.activate(user_id=randomUserName)
with pytest.raises(weaviate.exceptions.UnexpectedStatusCodeError):
weaviate.connect_to_local(
port=RBAC_PORTS[0],
grpc_port=RBAC_PORTS[1],
auth_credentials=Auth.api_key(apiKeyOld),
)

apiKeyNew = client.users.db.rotate_key(user_id=randomUserName)

with weaviate.connect_to_local(
port=RBAC_PORTS[0], grpc_port=RBAC_PORTS[1], auth_credentials=Auth.api_key(apiKeyNew)
) as client2:
user = client2.users.get_my_user()
assert user.user_id == randomUserName

client.users.db.delete(user_id=randomUserName)


def test_deprecated_syntax(client_factory: ClientFactory) -> None:
with client_factory(ports=RBAC_PORTS, auth_credentials=Auth.api_key("admin-key")) as client:
if client._connection._weaviate_version.is_lower_than(1, 30, 0):
Expand Down
14 changes: 7 additions & 7 deletions weaviate/users/sync.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,38 @@ from typing_extensions import deprecated
class _UsersOIDC(_UsersInit):
@overload
def get_assigned_roles(
self, user_id: str, include_permissions: Literal[False] = False
self, *, user_id: str, include_permissions: Literal[False] = False
) -> Dict[str, RoleBase]: ...
@overload
def get_assigned_roles(
self, user_id: str, include_permissions: Literal[True]
self, *, user_id: str, include_permissions: Literal[True]
) -> Dict[str, Role]: ...
@overload
def get_assigned_roles(
self, user_id: str, include_permissions: bool = False
self, *, user_id: str, include_permissions: bool = False
) -> Union[Dict[str, Role], Dict[str, RoleBase]]: ...
def assign_roles(self, *, user_id: str, role_names: Union[str, List[str]]) -> None: ...
def revoke_roles(self, *, user_id: str, role_names: Union[str, List[str]]) -> None: ...

class _UsersDB(_UsersInit):
@overload
def get_assigned_roles(
self, user_id: str, include_permissions: Literal[False] = False
self, *, user_id: str, include_permissions: Literal[False] = False
) -> Dict[str, RoleBase]: ...
@overload
def get_assigned_roles(
self, user_id: str, include_permissions: Literal[True]
self, *, user_id: str, include_permissions: Literal[True]
) -> Dict[str, Role]: ...
@overload
def get_assigned_roles(
self, user_id: str, include_permissions: bool = False
self, *, user_id: str, include_permissions: bool = False
) -> Union[Dict[str, Role], Dict[str, RoleBase]]: ...
def assign_roles(self, *, user_id: str, role_names: Union[str, List[str]]) -> None: ...
def revoke_roles(self, *, user_id: str, role_names: Union[str, List[str]]) -> None: ...
def create(self, *, user_id: str) -> str: ...
def delete(self, *, user_id: str) -> bool: ...
def rotate_key(self, *, user_id: str) -> str: ...
def deactivate(self, *, user_id: str) -> bool: ...
def deactivate(self, *, user_id: str, revoke_key: bool = False) -> bool: ...
def activate(self, *, user_id: str) -> bool: ...
def get(self, *, user_id: str) -> UserDB: ...
def list_all(self) -> List[UserDB]: ...
Expand Down
21 changes: 11 additions & 10 deletions weaviate/users/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,11 @@ async def _rotate_key(self, user_id: str) -> str:
assert resp_typed is not None
return str(resp_typed["apikey"])

async def _deactivate(self, user_id: str) -> bool:
async def _deactivate(self, user_id: str, revoke_key: bool) -> bool:
path = f"/users/db/{user_id}/deactivate"
resp = await self._connection.post(
path,
weaviate_object={},
weaviate_object={"revoke_key": revoke_key},
error_msg=f"Could not deactivate user '{user_id}'",
status_codes=_ExpectedStatusCodes(ok_in=[200, 409], error="deactivate key"),
)
Expand Down Expand Up @@ -173,16 +173,16 @@ async def _list_all_users(self) -> List[WeaviateDBUserRoleNames]:
class _UserDBAsync(_UsersBase):
@overload
async def get_assigned_roles(
self, user_id: str, include_permissions: Literal[False] = ...
self, *, user_id: str, include_permissions: Literal[False] = ...
) -> Dict[str, RoleBase]: ...

@overload
async def get_assigned_roles(
self, user_id: str, include_permissions: Literal[True] = ...
self, *, user_id: str, include_permissions: Literal[True] = ...
) -> Dict[str, Role]: ...

async def get_assigned_roles(
self, user_id: str, include_permissions: bool = False
self, *, user_id: str, include_permissions: bool = False
) -> Union[Dict[str, Role], Dict[str, RoleBase]]:
"""Get the roles assigned to a user.

Expand Down Expand Up @@ -251,13 +251,14 @@ async def activate(self, *, user_id: str) -> bool:
"""
return await self._activate(user_id)

async def deactivate(self, *, user_id: str) -> bool:
async def deactivate(self, *, user_id: str, revoke_key: bool = False) -> bool:
"""Deactivate an active user.

Args:
user_id: The id of the user.
revoke_key: If True, the old key will be revoked and needs to be rotated.
"""
return await self._deactivate(user_id)
return await self._deactivate(user_id, revoke_key)

async def get(self, *, user_id: str) -> UserDB:
"""Get all information about an user.
Expand Down Expand Up @@ -292,16 +293,16 @@ async def list_all(self) -> List[UserDB]:
class _UserOIDCAsync(_UsersBase):
@overload
async def get_assigned_roles(
self, user_id: str, include_permissions: Literal[False] = ...
self, *, user_id: str, include_permissions: Literal[False] = ...
) -> Dict[str, RoleBase]: ...

@overload
async def get_assigned_roles(
self, user_id: str, include_permissions: Literal[True] = ...
self, *, user_id: str, include_permissions: Literal[True] = ...
) -> Dict[str, Role]: ...

async def get_assigned_roles(
self, user_id: str, include_permissions: bool = False
self, *, user_id: str, include_permissions: bool = False
) -> Union[Dict[str, Role], Dict[str, RoleBase]]:
"""Get the roles assigned to a user.

Expand Down
Loading