Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No admin should be assigned if last user leaves the room #12

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions manage_last_admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ async def _on_room_leave(
event: The event to check.
state_events: The current state of the room.
"""
# Check if the last user is leaving the room.
is_last_member = _is_last_member(event, state_events)
if is_last_member:
return

# Check if the last admin is leaving the room.
pl_content = _get_power_levels_content_from_state(state_events)
if pl_content is None:
Expand Down Expand Up @@ -129,20 +134,23 @@ async def _on_room_leave(
# If not, we see the default power level as admin
logger.info("Make admin as default level in room %s", event.room_id)

current_power_levels = state_events.get((EventTypes.PowerLevels, ""))
await self._set_room_user_defaults_to_admin(event, state_events)
return

async def _set_room_user_defaults_to_admin(
self, event: EventBase, state_events: StateMap[EventBase]
) -> None:
current_power_levels = state_events.get((EventTypes.PowerLevels, ""))
# Make a deep copy of the content so we don't edit the "users" dict from
# the event that's currently in the room's state.
power_levels_content = (
{}
if current_power_levels is None
else copy.deepcopy(current_power_levels.content)
)

# Send a new power levels event with a similar content to the previous one
# except users_default is 100 to allow any user to be admin of the room.
power_levels_content["users_default"] = 100

# Just to be safe, also delete all users that don't have a power level of
# 100, in order to prevent anyone from being unable to be admin the room.
# Julien : I am not why it's needed
Expand All @@ -151,7 +159,6 @@ async def _on_room_leave(
if level == 100:
users[user] = level
power_levels_content["users"] = users

await self._api.create_and_send_event_into_room(
{
"room_id": event.room_id,
Expand All @@ -165,8 +172,6 @@ async def _on_room_leave(
}
)

return

async def _promote_to_admins(
self,
users_to_promote: Iterable[str],
Expand Down Expand Up @@ -213,6 +218,37 @@ def _maybe_get_event_id_dict_for_room_version(
return {"event_id": "!%s:%s" % (random_id, server_name)}


def check_if_member_in_room(item: Tuple[Tuple[str, str], EventBase]) -> bool:
(event_type, _), event = item
return EventTypes.Member == event_type and event.membership in [
Membership.JOIN,
Membership.INVITE,
]


def _is_last_member(
event: EventBase,
state_events: StateMap[EventBase],
) -> bool:
"""Checks if the last member is leaving the room.

Args:
state_events: The current state of the room, from which we can check the room's
member list.

Returns:
Whether this event is the last member leaving the room.
"""
# Get all joined/invited members defined in the room's state
members_state_event: StateMap[EventBase] = dict(
filter(check_if_member_in_room, state_events.items())
)
return (
len(members_state_event) == 1
and members_state_event.get((EventTypes.Member, event.sender)) is not None
)


def _is_last_admin_leaving(
event: EventBase,
power_level_content: Dict[str, Any],
Expand Down
134 changes: 132 additions & 2 deletions tests/test_manage_last_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, make_event_from_dict
from synapse.types import JsonDict
from synapse.types import JsonDict, MutableStateMap
from synapse.util.stringutils import random_string

from tests import create_module
Expand All @@ -39,8 +39,12 @@ def setUp(self) -> None:
self.user_id = "@alice:example.com"
self.left_user_id = "@nothere:example.com"
self.mod_user_id = "@mod:example.com"
self.regular_user_id = "@someuser:example.com"
self.room_id = "!someroom:example.com"
self.state = {
self.state = self.get_room_state_with_several_members()

def get_room_state_with_several_members(self) -> MutableStateMap[EventBase]:
return {
(EventTypes.PowerLevels, ""): self.create_event(
{
"sender": self.user_id,
Expand All @@ -67,6 +71,7 @@ def setUp(self) -> None:
self.user_id: 100,
self.left_user_id: 75,
self.mod_user_id: 50,
self.regular_user_id: 0,
},
"users_default": 0,
},
Expand All @@ -82,6 +87,15 @@ def setUp(self) -> None:
"room_id": self.room_id,
},
),
(EventTypes.Member, self.user_id): self.create_event(
{
"sender": self.user_id,
"type": EventTypes.Member,
"state_key": self.user_id,
"content": {"membership": Membership.JOIN},
"room_id": self.room_id,
},
),
(EventTypes.Member, self.mod_user_id): self.create_event(
{
"sender": self.mod_user_id,
Expand All @@ -91,6 +105,97 @@ def setUp(self) -> None:
"room_id": self.room_id,
},
),
(EventTypes.Member, self.regular_user_id): self.create_event(
{
"sender": self.regular_user_id,
"type": EventTypes.Member,
"state_key": self.regular_user_id,
"content": {"membership": Membership.JOIN},
"room_id": self.room_id,
},
),
(EventTypes.Member, self.left_user_id): self.create_event(
{
"sender": self.left_user_id,
"type": EventTypes.Member,
"state_key": self.left_user_id,
"content": {"membership": Membership.LEAVE},
"room_id": self.room_id,
},
),
}

def get_room_state_with_one_member(self) -> MutableStateMap[EventBase]:
return {
(EventTypes.PowerLevels, ""): self.create_event(
{
"sender": self.user_id,
"type": EventTypes.PowerLevels,
"state_key": "",
"content": {
"ban": 50,
"events": {
"m.room.avatar": 50,
"m.room.canonical_alias": 50,
"m.room.encryption": 100,
"m.room.history_visibility": 100,
"m.room.name": 50,
"m.room.power_levels": 100,
"m.room.server_acl": 100,
"m.room.tombstone": 100,
},
"events_default": 0,
"invite": 0,
"kick": 50,
"redact": 50,
"state_default": 50,
"users": {
self.user_id: 100,
self.left_user_id: 75,
self.mod_user_id: 50,
self.regular_user_id: 0,
},
"users_default": 0,
},
"room_id": self.room_id,
},
),
(EventTypes.JoinRules, ""): self.create_event(
{
"sender": self.user_id,
"type": EventTypes.JoinRules,
"state_key": "",
"content": {"join_rule": "public"},
"room_id": self.room_id,
},
),
(EventTypes.Member, self.user_id): self.create_event(
{
"sender": self.user_id,
"type": EventTypes.Member,
"state_key": self.user_id,
"content": {"membership": Membership.JOIN},
"room_id": self.room_id,
},
),
(EventTypes.Member, self.mod_user_id): self.create_event(
{
"sender": self.mod_user_id,
"type": EventTypes.Member,
"state_key": self.mod_user_id,
"content": {"membership": Membership.LEAVE},
"room_id": self.room_id,
},
),
(EventTypes.Member, self.regular_user_id): self.create_event(
{
"sender": self.regular_user_id,
"type": EventTypes.Member,
"state_key": self.regular_user_id,
"content": {"membership": Membership.LEAVE},
"room_id": self.room_id,
},
),
(EventTypes.Member, self.left_user_id): self.create_event(
{
"sender": self.left_user_id,
Expand Down Expand Up @@ -213,6 +318,31 @@ async def test_promote_when_last_admin_leaves(self) -> None:
for user, pl in pl_event_dict["content"]["users"].items():
self.assertEqual(pl, 100, user)

async def test_last_member_when_last_admin_leaves(self) -> None:
"""Tests that the module do not send any event when last member of a room leaves."""
module = create_module()

self.state = self.get_room_state_with_one_member()

leave_event = self.create_event(
{
"sender": self.user_id,
"type": EventTypes.Member,
"content": {"membership": Membership.LEAVE},
"room_id": self.room_id,
"state_key": self.user_id,
},
)

allowed, replacement = await module.check_event_allowed(
leave_event, self.state
)
self.assertTrue(allowed)
self.assertEqual(replacement, None)

# Test that no event is generated
self.assertFalse(module._api.create_and_send_event_into_room.called) # type: ignore[attr-defined]


class ManageLastAdminTestRoomV9(ManageLastAdminTestCases.BaseManageLastAdminTest):
def create_event(self, content: JsonDict) -> EventBase:
Expand Down