Skip to content
1 change: 1 addition & 0 deletions bot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ class Channels(metaclass=YAMLGetter):
incidents_archive: int
mod_alerts: int
mod_meta: int
mods: int
nominations: int
nomination_voting: int
organisation: int
Expand Down
7 changes: 3 additions & 4 deletions bot/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,16 +188,15 @@ def respect_role_hierarchy(member_arg: function.Argument) -> t.Callable:
"""
def decorator(func: types.FunctionType) -> types.FunctionType:
@command_wraps(func)
async def wrapper(*args, **kwargs) -> None:
async def wrapper(*args, **kwargs) -> t.Any:
log.trace(f"{func.__name__}: respect role hierarchy decorator called")

bound_args = function.get_bound_args(func, args, kwargs)
target = function.get_arg_value(member_arg, bound_args)

if not isinstance(target, Member):
log.trace("The target is not a discord.Member; skipping role hierarchy check.")
await func(*args, **kwargs)
return
return await func(*args, **kwargs)

ctx = function.get_arg_value(1, bound_args)
cmd = ctx.command.name
Expand All @@ -214,7 +213,7 @@ async def wrapper(*args, **kwargs) -> None:
)
else:
log.trace(f"{func.__name__}: {target.top_role=} < {actor.top_role=}; calling func")
await func(*args, **kwargs)
return await func(*args, **kwargs)
return wrapper
return decorator

Expand Down
42 changes: 28 additions & 14 deletions bot/exts/moderation/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,17 @@ async def _delete_found(self, message_mappings: dict[TextChannel, list[Message]]

return deleted

async def _modlog_cleaned_messages(self, messages: list[Message], channels: CleanChannels, ctx: Context) -> bool:
"""Log the deleted messages to the modlog. Return True if logging was successful."""
async def _modlog_cleaned_messages(
self,
messages: list[Message],
channels: CleanChannels,
ctx: Context
) -> Optional[str]:
"""Log the deleted messages to the modlog, returning the log url if logging was successful."""
if not messages:
# Can't build an embed, nothing to clean!
await self._send_expiring_message(ctx, ":x: No matching messages could be found.")
return False
return None

# Reverse the list to have reverse chronological order
log_messages = reversed(messages)
Expand All @@ -362,7 +367,7 @@ async def _modlog_cleaned_messages(self, messages: list[Message], channels: Clea
channel_id=Channels.mod_log,
)

return True
return log_url

# endregion

Expand All @@ -375,16 +380,17 @@ async def _clean_messages(
regex: Optional[re.Pattern] = None,
first_limit: Optional[CleanLimit] = None,
second_limit: Optional[CleanLimit] = None,
) -> None:
"""A helper function that does the actual message cleaning."""
attempt_delete_invocation: bool = True,
) -> Optional[str]:
"""A helper function that does the actual message cleaning, returns the log url if logging was successful."""
self._validate_input(channels, bots_only, users, first_limit, second_limit)

# Are we already performing a clean?
if self.cleaning:
await self._send_expiring_message(
ctx, ":x: Please wait for the currently ongoing clean operation to complete."
)
return
return None
self.cleaning = True

deletion_channels = self._channels_set(channels, ctx, first_limit, second_limit)
Expand All @@ -399,8 +405,9 @@ async def _clean_messages(
# Needs to be called after standardizing the input.
predicate = self._build_predicate(first_limit, second_limit, bots_only, users, regex)

# Delete the invocation first
await self._delete_invocation(ctx)
if attempt_delete_invocation:
# Delete the invocation first
await self._delete_invocation(ctx)

if self._use_cache(first_limit):
log.trace(f"Messages for cleaning by {ctx.author.id} will be searched in the cache.")
Expand All @@ -418,7 +425,7 @@ async def _clean_messages(

if not self.cleaning:
# Means that the cleaning was canceled
return
return None

# Now let's delete the actual messages with purge.
self.mod_log.ignore(Event.message_delete, *message_ids)
Expand All @@ -427,11 +434,18 @@ async def _clean_messages(

if not channels:
channels = deletion_channels
logged = await self._modlog_cleaned_messages(deleted_messages, channels, ctx)
log_url = await self._modlog_cleaned_messages(deleted_messages, channels, ctx)

if logged and is_mod_channel(ctx.channel):
with suppress(NotFound): # Can happen if the invoker deleted their own messages.
await ctx.message.add_reaction(Emojis.check_mark)
success_message = (
f"{Emojis.ok_hand} Deleted {len(deleted_messages)} messages. "
f"A log of the deleted messages can be found here {log_url}."
)
if log_url and is_mod_channel(ctx.channel):
await ctx.reply(success_message)
elif log_url:
if mods := self.bot.get_channel(Channels.mods):
await mods.send(f"{ctx.author.mention} {success_message}")
return log_url

# region: Commands

Expand Down
79 changes: 60 additions & 19 deletions bot/exts/moderation/infraction/infractions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from bot import constants
from bot.bot import Bot
from bot.constants import Event
from bot.converters import Duration, Expiry, MemberOrUser, UnambiguousMemberOrUser
from bot.converters import Age, Duration, Expiry, Infraction, MemberOrUser, UnambiguousMemberOrUser
from bot.decorators import respect_role_hierarchy
from bot.exts.moderation.infraction import _utils
from bot.exts.moderation.infraction._scheduler import InfractionScheduler
Expand All @@ -19,6 +19,11 @@

log = get_logger(__name__)

if t.TYPE_CHECKING:
from bot.exts.moderation.clean import Clean
from bot.exts.moderation.infraction.management import ModManagement
from bot.exts.moderation.watchchannels.bigbrother import BigBrother


class Infractions(InfractionScheduler, commands.Cog):
"""Apply and pardon infractions on users for moderation purposes."""
Expand Down Expand Up @@ -91,8 +96,8 @@ async def ban(
"""
await self.apply_ban(ctx, user, reason, expires_at=duration)

@command(aliases=('pban',))
async def purgeban(
@command(aliases=("cban", "purgeban", "pban"))
async def cleanban(
self,
ctx: Context,
user: UnambiguousMemberOrUser,
Expand All @@ -101,11 +106,49 @@ async def purgeban(
reason: t.Optional[str] = None
) -> None:
"""
Same as ban but removes all their messages of the last 24 hours.
Same as ban, but also cleans all their messages from the last hour.

If duration is specified, it temporarily bans that user for the given duration.
"""
await self.apply_ban(ctx, user, reason, 1, expires_at=duration)
clean_cog: t.Optional[Clean] = self.bot.get_cog("Clean")
if clean_cog is None:
# If we can't get the clean cog, fall back to native purgeban.
await self.apply_ban(ctx, user, reason, purge_days=1, expires_at=duration)
return

infraction = await self.apply_ban(ctx, user, reason, expires_at=duration)
if not infraction or not infraction.get("id"):
# Ban was unsuccessful, quit early.
await ctx.send(":x: Failed to apply ban.")
log.error("Failed to apply ban to user %d", user.id)
return
Comment thread
GDWR marked this conversation as resolved.

# Calling commands directly skips Discord.py's convertors, so we need to convert args manually.
clean_time = await Age().convert(ctx, "1h")
infraction = await Infraction().convert(ctx, infraction["id"])

log_url = await clean_cog._clean_messages(
ctx,
users=[user],
channels="*",
first_limit=clean_time,
attempt_delete_invocation=False,
)
if not log_url:
# Cleaning failed, or there were no messages to clean, exit early.
return

infr_manage_cog: t.Optional[ModManagement] = self.bot.get_cog("ModManagement")
if infr_manage_cog is None:
# If we can't get the mod management cog, don't bother appending the log.
return

# Overwrite the context's send function so infraction append
# doesn't output the update infraction confirmation message.
async def send(*args, **kwargs) -> None:
pass
ctx.send = send
await infr_manage_cog.infraction_append(ctx, infraction, None, reason=f"[Clean log]({log_url})")

@command(aliases=("vban",))
async def voiceban(self, ctx: Context) -> None:
Expand Down Expand Up @@ -368,15 +411,15 @@ async def apply_ban(
reason: t.Optional[str],
purge_days: t.Optional[int] = 0,
**kwargs
) -> None:
) -> t.Optional[dict]:
"""
Apply a ban infraction with kwargs passed to `post_infraction`.

Will also remove the banned user from the Big Brother watch list if applicable.
"""
if isinstance(user, Member) and user.top_role >= ctx.me.top_role:
await ctx.send(":x: I can't ban users above or equal to me in the role hierarchy.")
return
return None

# In the case of a permanent ban, we don't need get_active_infractions to tell us if one is active
is_temporary = kwargs.get("expires_at") is not None
Expand All @@ -385,19 +428,19 @@ async def apply_ban(
if active_infraction:
if is_temporary:
log.trace("Tempban ignored as it cannot overwrite an active ban.")
return
return None

if active_infraction.get('expires_at') is None:
log.trace("Permaban already exists, notify.")
await ctx.send(f":x: User is already permanently banned (#{active_infraction['id']}).")
return
return None

log.trace("Old tempban is being replaced by new permaban.")
await self.pardon_infraction(ctx, "ban", user, send_msg=is_temporary)

infraction = await _utils.post_infraction(ctx, user, "ban", reason, active=True, **kwargs)
if infraction is None:
return
return None

infraction["purge"] = "purge " if purge_days else ""

Expand All @@ -409,19 +452,17 @@ async def apply_ban(
action = ctx.guild.ban(user, reason=reason, delete_message_days=purge_days)
await self.apply_infraction(ctx, infraction, user, action)

bb_cog: t.Optional[BigBrother] = self.bot.get_cog("Big Brother")
if infraction.get('expires_at') is not None:
log.trace(f"Ban isn't permanent; user {user} won't be unwatched by Big Brother.")
return

bb_cog = self.bot.get_cog("Big Brother")
if not bb_cog:
elif not bb_cog:
log.error(f"Big Brother cog not loaded; perma-banned user {user} won't be unwatched.")
return

log.trace(f"Big Brother cog loaded; attempting to unwatch perma-banned user {user}.")
else:
log.trace(f"Big Brother cog loaded; attempting to unwatch perma-banned user {user}.")
bb_reason = "User has been permanently banned from the server. Automatically removed."
await bb_cog.apply_unwatch(ctx, user, bb_reason, send_message=False)

bb_reason = "User has been permanently banned from the server. Automatically removed."
await bb_cog.apply_unwatch(ctx, user, bb_reason, send_message=False)
return infraction

@respect_role_hierarchy(member_arg=2)
async def apply_voice_mute(self, ctx: Context, user: MemberOrUser, reason: t.Optional[str], **kwargs) -> None:
Expand Down
91 changes: 90 additions & 1 deletion tests/bot/exts/moderation/infraction/test_infractions.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import inspect
import textwrap
import unittest
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, patch
from unittest.mock import ANY, AsyncMock, DEFAULT, MagicMock, Mock, patch

from discord.errors import NotFound

from bot.constants import Event
from bot.exts.moderation.clean import Clean
from bot.exts.moderation.infraction import _utils
from bot.exts.moderation.infraction.infractions import Infractions
from bot.exts.moderation.infraction.management import ModManagement
from tests.helpers import MockBot, MockContext, MockGuild, MockMember, MockRole, MockUser, autospec


Expand Down Expand Up @@ -231,3 +233,90 @@ async def test_voice_unmute_dm_fail(self, format_user_mock, notify_pardon_mock):
"DM": "**Failed**"
})
notify_pardon_mock.assert_awaited_once()


class CleanBanTests(unittest.IsolatedAsyncioTestCase):
"""Tests for cleanban functionality."""

def setUp(self):
self.bot = MockBot()
self.mod = MockMember(roles=[MockRole(id=7890123, position=10)])
self.user = MockMember(roles=[MockRole(id=123456, position=1)])
self.guild = MockGuild()
self.ctx = MockContext(bot=self.bot, author=self.mod)
self.cog = Infractions(self.bot)
self.clean_cog = Clean(self.bot)
self.management_cog = ModManagement(self.bot)

self.cog.apply_ban = AsyncMock(return_value={"id": 42})
self.log_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"
self.clean_cog._clean_messages = AsyncMock(return_value=self.log_url)

def mock_get_cog(self, enable_clean, enable_manage):
Comment thread
GDWR marked this conversation as resolved.
"""Mock get cog factory that allows the user to specify whether clean and manage cogs are enabled."""
def inner(name):
if name == "ModManagement":
return self.management_cog if enable_manage else None
elif name == "Clean":
return self.clean_cog if enable_clean else None
else:
return DEFAULT
return inner

async def test_cleanban_falls_back_to_native_purge_without_clean_cog(self):
"""Should fallback to native purge if the Clean cog is not available."""
self.bot.get_cog.side_effect = self.mock_get_cog(False, False)

self.assertIsNone(await self.cog.cleanban(self.cog, self.ctx, self.user, None, reason="FooBar"))
self.cog.apply_ban.assert_awaited_once_with(
self.ctx,
self.user,
"FooBar",
purge_days=1,
expires_at=None,
)

async def test_cleanban_doesnt_purge_messages_if_clean_cog_available(self):
"""Cleanban command should use the native purge messages if the clean cog is available."""
self.bot.get_cog.side_effect = self.mock_get_cog(True, False)

self.assertIsNone(await self.cog.cleanban(self.cog, self.ctx, self.user, None, reason="FooBar"))
self.cog.apply_ban.assert_awaited_once_with(
self.ctx,
self.user,
"FooBar",
expires_at=None,
)

@patch("bot.exts.moderation.infraction.infractions.Age")
async def test_cleanban_uses_clean_cog_when_available(self, mocked_age_converter):
"""Test cleanban uses the clean cog to clean messages if it's available."""
self.bot.api_client.patch = AsyncMock()
self.bot.get_cog.side_effect = self.mock_get_cog(True, False)

mocked_age_converter.return_value.convert = AsyncMock(return_value="81M")
self.assertIsNone(await self.cog.cleanban(self.cog, self.ctx, self.user, None, reason="FooBar"))

self.clean_cog._clean_messages.assert_awaited_once_with(
self.ctx,
users=[self.user],
channels="*",
first_limit="81M",
attempt_delete_invocation=False,
)

@patch("bot.exts.moderation.infraction.infractions.Infraction")
async def test_cleanban_edits_infraction_reason(self, mocked_infraction_converter):
"""Ensure cleanban edits the ban reason with a link to the clean log."""
self.bot.get_cog.side_effect = self.mock_get_cog(True, True)

self.management_cog.infraction_append = AsyncMock()
mocked_infraction_converter.return_value.convert = AsyncMock(return_value=42)
self.assertIsNone(await self.cog.cleanban(self.cog, self.ctx, self.user, None, reason="FooBar"))

self.management_cog.infraction_append.assert_awaited_once_with(
self.ctx,
42,
None,
reason=f"[Clean log]({self.log_url})"
)
Loading