diff --git a/.flake8 b/.flake8 index 3879e514..2c5c893c 100644 --- a/.flake8 +++ b/.flake8 @@ -1,4 +1,4 @@ [flake8] -ignore = DCO010, DCO023, DOC503, DOC602, DOC603, E203, E501, E712, F401, F403, F821, W503 +ignore = DCO010, DCO023, DOC503, DOC602, DOC603, MDA002, E203, E501, E712, F401, F403, F821, W503 style = google -skip-checking-short-docstrings = False \ No newline at end of file +skip-checking-short-docstrings = False diff --git a/techsupport_bot/bot.py b/techsupport_bot/bot.py index 8c69b719..4daf24d6 100644 --- a/techsupport_bot/bot.py +++ b/techsupport_bot/bot.py @@ -329,6 +329,9 @@ async def create_new_context_config(self: Self, guild_id: str) -> munch.Munch: config_.rate_limit.enabled = False config_.rate_limit.commands = 4 config_.rate_limit.time = 10 + config_.moderation = munch.DefaultMunch(None) + config_.moderation.max_warnings = 3 + config_.moderation.alert_channel = None config_.extensions = extensions_config diff --git a/techsupport_bot/commands/__init__.py b/techsupport_bot/commands/__init__.py index ac6cbd26..720c1952 100644 --- a/techsupport_bot/commands/__init__.py +++ b/techsupport_bot/commands/__init__.py @@ -3,6 +3,7 @@ Both app and prefix commands are in this module """ +from .application import * from .burn import * from .conch import * from .config import * @@ -16,6 +17,9 @@ from .linter import * from .listen import * from .mock import * +from .moderator import * +from .modlog import * +from .notes import * from .relay import * from .roll import * from .wyr import * diff --git a/techsupport_bot/commands/application.py b/techsupport_bot/commands/application.py index a56fd219..70530850 100644 --- a/techsupport_bot/commands/application.py +++ b/techsupport_bot/commands/application.py @@ -106,7 +106,7 @@ async def setup(bot: bot.TechSupportBot) -> None: description=( "The role IDs required to manage the applications (not required to apply)" ), - default=[""], + default=[], ) config.add( key="ping_role", @@ -150,6 +150,8 @@ async def command_permission_check(interaction: discord.Interaction) -> bool: # Gets permitted roles allowed_roles = [] for role_id in config.extensions.application.manage_roles.value: + if not role_id: + continue role = interaction.guild.get_role(int(role_id)) if not role: continue diff --git a/techsupport_bot/commands/duck.py b/techsupport_bot/commands/duck.py index d1562b3b..8820e2ea 100644 --- a/techsupport_bot/commands/duck.py +++ b/techsupport_bot/commands/duck.py @@ -13,7 +13,7 @@ import munch import ui from botlogging import LogContext, LogLevel -from core import auxiliary, cogs, extensionconfig +from core import auxiliary, cogs, extensionconfig, moderation from discord import Color as embed_colors from discord.ext import commands @@ -394,9 +394,12 @@ def message_check( and channel.guild.me.guild_permissions.moderate_members ): asyncio.create_task( - message.author.timeout( - timedelta(seconds=config.extensions.duck.cooldown.value), + moderation.mute_user( + user=message.author, reason="Missed a duck", + duration=timedelta( + seconds=config.extensions.duck.cooldown.value + ), ) ) diff --git a/techsupport_bot/commands/moderator.py b/techsupport_bot/commands/moderator.py new file mode 100644 index 00000000..79322df6 --- /dev/null +++ b/techsupport_bot/commands/moderator.py @@ -0,0 +1,882 @@ +"""Manual moderation commands and helper functions""" + +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import TYPE_CHECKING, Self + +import dateparser +import discord +import ui +from botlogging import LogContext, LogLevel +from commands import modlog +from core import auxiliary, cogs, extensionconfig, moderation +from discord import app_commands + +if TYPE_CHECKING: + import bot + + +async def setup(bot: bot.TechSupportBot) -> None: + """Adds the cog to the bot. Setups config + + Args: + bot (bot.TechSupportBot): The bot object to register the cog with + """ + config = extensionconfig.ExtensionConfig() + config.add( + key="immune_roles", + datatype="list", + title="Immune role names", + description="The list of role names that are immune to protect commands", + default=[], + ) + config.add( + key="ban_delete_duration", + datatype="int", + title="Ban delete duration (days)", + description=( + "The default amount of days to delete messages for a user after they are banned" + ), + default=7, + ) + await bot.add_cog(ProtectCommands(bot=bot, extension_name="moderator")) + bot.add_extension_config("moderator", config) + + +class ProtectCommands(cogs.BaseCog): + """The cog for all manual moderation activities + These are all slash commands + + Attributes: + warnings_group (app_commands.Group): The group for the /warning commands + """ + + warnings_group: app_commands.Group = app_commands.Group( + name="warning", description="...", extras={"module": "moderator"} + ) + + # Commands + + @app_commands.checks.has_permissions(ban_members=True) + @app_commands.checks.bot_has_permissions(ban_members=True) + @app_commands.command( + name="ban", + description="Bans a user from the guild", + extras={"module": "moderator"}, + ) + async def handle_ban_user( + self: Self, + interaction: discord.Interaction, + target: discord.User, + reason: str, + delete_days: app_commands.Range[int, 0, 7] = None, + ) -> None: + """The ban slash command. This checks that the permissions are correct + and that the user is not already banned + + Args: + interaction (discord.Interaction): The interaction that called this command + target (discord.User): The target to ban + reason (str): The reason the person is getting banned + delete_days (app_commands.Range[int, 0, 7], optional): How many days of + messages to delete. Defaults to None. + """ + # Ensure we can ban the person + permission_check = await self.permission_check( + invoker=interaction.user, target=target, action_name="ban" + ) + if permission_check: + embed = auxiliary.prepare_deny_embed(message=permission_check) + await interaction.response.send_message(embed=embed) + return + + if len(reason) > 500: + embed = auxiliary.prepare_deny_embed( + message="Ban reason must be under 500 characters" + ) + await interaction.response.send_message(embed=embed) + return + + is_banned = await moderation.check_if_user_banned(target, interaction.guild) + if is_banned: + embed = auxiliary.prepare_deny_embed(message="User is already banned.") + await interaction.response.send_message(embed=embed) + return + + if not delete_days: + config = self.bot.guild_configs[str(interaction.guild.id)] + delete_days = config.extensions.moderator.ban_delete_duration.value + + # Ban the user using the core moderation cog + result = await moderation.ban_user( + guild=interaction.guild, + user=target, + delete_seconds=delete_days * 86400, + reason=f"{reason} - banned by {interaction.user}", + ) + if not result: + embed = auxiliary.prepare_deny_embed( + message=f"Something went wrong when banning {target}" + ) + await interaction.response.send_message(embed=embed) + return + + await modlog.log_ban( + self.bot, target, interaction.user, interaction.guild, reason + ) + + await moderation.send_command_usage_alert( + bot_object=self.bot, + interaction=interaction, + command=( + f"/ban target: {target.display_name}, reason: {reason}, delete_days:" + f" {delete_days}" + ), + guild=interaction.guild, + target=target, + ) + embed = generate_response_embed(user=target, action="ban", reason=reason) + await interaction.response.send_message(content=target.mention, embed=embed) + + @app_commands.checks.has_permissions(ban_members=True) + @app_commands.checks.bot_has_permissions(ban_members=True) + @app_commands.command( + name="unban", + description="Unbans a user from the guild", + extras={"module": "moderator"}, + ) + async def handle_unban_user( + self: Self, interaction: discord.Interaction, target: discord.User, reason: str + ) -> None: + """The logic for the /unban command + + Args: + interaction (discord.Interaction): The interaction that triggered this command + target (discord.User): The target to be unbanned + reason (str): The reason for the user being unbanned + """ + permission_check = await self.permission_check( + invoker=interaction.user, target=target, action_name="unban" + ) + if permission_check: + embed = auxiliary.prepare_deny_embed(message=permission_check) + await interaction.response.send_message(embed=embed) + return + + if len(reason) > 500: + embed = auxiliary.prepare_deny_embed( + message="Unban reason must be under 500 characters" + ) + await interaction.response.send_message(embed=embed) + return + + is_banned = await moderation.check_if_user_banned(target, interaction.guild) + + if not is_banned: + embed = auxiliary.prepare_deny_embed(message=f"{target} is not banned") + await interaction.response.send_message(embed=embed) + return + + result = await moderation.unban_user( + guild=interaction.guild, + user=target, + reason=f"{reason} - unbanned by {interaction.user}", + ) + if not result: + embed = auxiliary.prepare_deny_embed( + message=f"Something went wrong when unbanning {target}" + ) + await interaction.response.send_message(embed=embed) + return + + await modlog.log_unban( + self.bot, target, interaction.user, interaction.guild, reason + ) + + await moderation.send_command_usage_alert( + bot_object=self.bot, + interaction=interaction, + command=f"/unban target: {target.display_name}, reason: {reason}", + guild=interaction.guild, + target=target, + ) + embed = generate_response_embed(user=target, action="unban", reason=reason) + await interaction.response.send_message(content=target.mention, embed=embed) + + @app_commands.checks.has_permissions(kick_members=True) + @app_commands.checks.bot_has_permissions(kick_members=True) + @app_commands.command( + name="kick", + description="Kicks a user from the guild", + extras={"module": "moderator"}, + ) + async def handle_kick_user( + self: Self, + interaction: discord.Interaction, + target: discord.Member, + reason: str, + ) -> None: + """The core logic for the /kick command + + Args: + interaction (discord.Interaction): The interaction that triggered the command + target (discord.Member): The target for being kicked + reason (str): The reason for the user being kicked + """ + permission_check = await self.permission_check( + invoker=interaction.user, target=target, action_name="kick" + ) + if permission_check: + embed = auxiliary.prepare_deny_embed(message=permission_check) + await interaction.response.send_message(embed=embed) + return + + if len(reason) > 500: + embed = auxiliary.prepare_deny_embed( + message="Kick reason must be under 500 characters" + ) + await interaction.response.send_message(embed=embed) + return + + result = await moderation.kick_user( + guild=interaction.guild, + user=target, + reason=f"{reason} - kicked by {interaction.user}", + ) + if not result: + embed = auxiliary.prepare_deny_embed( + message=f"Something went wrong when kicking {target}" + ) + await interaction.response.send_message(embed=embed) + return + + await moderation.send_command_usage_alert( + bot_object=self.bot, + interaction=interaction, + command=f"/kick target: {target.display_name}, reason: {reason}", + guild=interaction.guild, + target=target, + ) + embed = generate_response_embed(user=target, action="kick", reason=reason) + await interaction.response.send_message(content=target.mention, embed=embed) + + @app_commands.checks.has_permissions(moderate_members=True) + @app_commands.checks.bot_has_permissions(moderate_members=True) + @app_commands.command( + name="mute", description="Times out a user", extras={"module": "moderator"} + ) + async def handle_mute_user( + self: Self, + interaction: discord.Interaction, + target: discord.Member, + reason: str, + duration: str = None, + ) -> None: + """The core logic for the /mute command + + Args: + interaction (discord.Interaction): The interaction that triggered this command + target (discord.Member): The target for being muted + reason (str): The reason for being muted + duration (str, optional): The human readable duration to be muted for. Defaults to None. + """ + permission_check = await self.permission_check( + invoker=interaction.user, target=target, action_name="mute" + ) + if permission_check: + embed = auxiliary.prepare_deny_embed(message=permission_check) + await interaction.response.send_message(embed=embed) + return + + if len(reason) > 500: + embed = auxiliary.prepare_deny_embed( + message="Mute reason must be under 500 characters" + ) + await interaction.response.send_message(embed=embed) + return + + # The API prevents administrators from being timed out. Check it here + if target.guild_permissions.administrator: + embed = auxiliary.prepare_deny_embed( + message=( + "Someone with the `administrator` permissions cannot be timed out" + ) + ) + await interaction.response.send_message(embed=embed) + return + + delta_duration = None + + if duration: + # The date parser defaults to time in the past, so it is second + # This could be fixed by appending "in" to your query, but this is simpler + try: + delta_duration = datetime.now() - dateparser.parse(duration) + delta_duration = timedelta( + seconds=round(delta_duration.total_seconds()) + ) + except TypeError: + embed = auxiliary.prepare_deny_embed(message="Invalid duration") + await interaction.response.send_message(embed=embed) + return + if not delta_duration: + embed = auxiliary.prepare_deny_embed(message="Invalid duration") + await interaction.response.send_message(embed=embed) + return + else: + delta_duration = timedelta(hours=1) + + # Checks to ensure time is valid and within the scope of the API + if delta_duration > timedelta(days=28): + embed = auxiliary.prepare_deny_embed( + message="Timeout duration cannot be more than 28 days" + ) + await interaction.response.send_message(embed=embed) + return + if delta_duration < timedelta(seconds=1): + embed = auxiliary.prepare_deny_embed( + message="Timeout duration cannot be less than 1 second" + ) + await interaction.response.send_message(embed=embed) + return + + result = await moderation.mute_user( + user=target, + reason=f"{reason} - muted by {interaction.user}", + duration=delta_duration, + ) + if not result: + embed = auxiliary.prepare_deny_embed( + message=f"Something went wrong when muting {target}" + ) + await interaction.response.send_message(embed=embed) + return + + await moderation.send_command_usage_alert( + bot_object=self.bot, + interaction=interaction, + command=( + f"/mute target: {target.display_name}, reason: {reason}, duration:" + f" {duration}" + ), + guild=interaction.guild, + target=target, + ) + + muted_until_timestamp = ( + f"" + ) + + full_reason = f"{reason} (muted until {muted_until_timestamp})" + + embed = generate_response_embed(user=target, action="mute", reason=full_reason) + await interaction.response.send_message(content=target.mention, embed=embed) + + @app_commands.checks.has_permissions(moderate_members=True) + @app_commands.checks.bot_has_permissions(moderate_members=True) + @app_commands.command( + name="unmute", + description="Removes timeout from a user", + extras={"module": "moderator"}, + ) + async def handle_unmute_user( + self: Self, + interaction: discord.Interaction, + target: discord.Member, + reason: str, + ) -> None: + """The core logic for the /unmute command + + Args: + interaction (discord.Interaction): The interaction that triggered this command + target (discord.Member): The target for being unmuted + reason (str): The reason for the unmute + """ + permission_check = await self.permission_check( + invoker=interaction.user, target=target, action_name="unmute" + ) + if permission_check: + embed = auxiliary.prepare_deny_embed(message=permission_check) + await interaction.response.send_message(embed=embed) + return + + if len(reason) > 500: + embed = auxiliary.prepare_deny_embed( + message="Unmute reason must be under 500 characters" + ) + await interaction.response.send_message(embed=embed) + return + + if not target.timed_out_until: + embed = auxiliary.prepare_deny_embed( + message=(f"{target} is not currently muted") + ) + await interaction.response.send_message(embed=embed) + return + + result = await moderation.unmute_user( + user=target, + reason=f"{reason} - unmuted by {interaction.user}", + ) + if not result: + embed = auxiliary.prepare_deny_embed( + message=f"Something went wrong when unmuting {target}" + ) + await interaction.response.send_message(embed=embed) + return + + await moderation.send_command_usage_alert( + bot_object=self.bot, + interaction=interaction, + command=f"/unmute target: {target.display_name}, reason: {reason}", + guild=interaction.guild, + target=target, + ) + embed = generate_response_embed(user=target, action="unmute", reason=reason) + await interaction.response.send_message(content=target.mention, embed=embed) + + @app_commands.checks.has_permissions(kick_members=True) + @app_commands.checks.bot_has_permissions(kick_members=True) + @app_commands.command( + name="warn", description="Warns a user", extras={"module": "moderator"} + ) + async def handle_warn_user( + self: Self, + interaction: discord.Interaction, + target: discord.Member, + reason: str, + ) -> None: + """The core logic for the /warn command + + Args: + interaction (discord.Interaction): The interaction that triggered this command + target (discord.Member): The target for being warned + reason (str): The reason the user is being warned + """ + permission_check = await self.permission_check( + invoker=interaction.user, target=target, action_name="warn" + ) + if permission_check: + embed = auxiliary.prepare_deny_embed(message=permission_check) + await interaction.response.send_message(embed=embed) + return + + if len(reason) > 500: + embed = auxiliary.prepare_deny_embed( + message="Warn reason must be under 500 characters" + ) + await interaction.response.send_message(embed=embed) + return + + if target not in interaction.channel.members: + embed = auxiliary.prepare_deny_embed( + message=f"{target} cannot see this warning. No warning was added." + ) + await interaction.response.send_message(embed=embed) + return + + config = self.bot.guild_configs[str(interaction.guild.id)] + + new_count_of_warnings = ( + len(await moderation.get_all_warnings(self.bot, target, interaction.guild)) + + 1 + ) + + should_ban = False + if new_count_of_warnings >= config.moderation.max_warnings: + await interaction.response.defer(ephemeral=False) + view = ui.Confirm() + await view.send( + message="This user has exceeded the max warnings of " + + f"{config.moderation.max_warnings}. Would " + + "you like to ban them instead?", + channel=interaction.channel, + author=interaction.user, + interaction=interaction, + ) + await view.wait() + if view.value is ui.ConfirmResponse.CONFIRMED: + should_ban = True + + warn_result = await moderation.warn_user( + bot_object=self.bot, user=target, invoker=interaction.user, reason=reason + ) + + if should_ban: + ban_result = await moderation.ban_user( + guild=interaction.guild, + user=target, + delete_seconds=( + config.extensions.moderator.ban_delete_duration.value * 86400 + ), + reason=( + f"Over max warning count {new_count_of_warnings} out of" + f" {config.moderation.max_warnings} (final warning:" + f" {reason}) - banned by {interaction.user}" + ), + ) + if not ban_result: + embed = auxiliary.prepare_deny_embed( + message=f"Something went wrong when banning {target}" + ) + if interaction.response.is_done(): + await interaction.followup.send(embed=embed) + else: + await interaction.response.send_message(embed=embed) + return + await modlog.log_ban( + self.bot, target, interaction.user, interaction.guild, reason + ) + + if not warn_result: + embed = auxiliary.prepare_deny_embed( + message=f"Something went wrong when warning {target}" + ) + if interaction.response.is_done(): + await interaction.followup.send(embed=embed) + else: + await interaction.response.send_message(embed=embed) + return + + await moderation.send_command_usage_alert( + bot_object=self.bot, + interaction=interaction, + command=f"/warn target: {target.display_name}, reason: {reason}", + guild=interaction.guild, + target=target, + ) + + embed = generate_response_embed( + user=target, + action="warn", + reason=f"{reason} ({new_count_of_warnings} total warnings)", + ) + if interaction.response.is_done(): + await interaction.followup.send(content=target.mention, embed=embed) + else: + await interaction.response.send_message(content=target.mention, embed=embed) + + try: + await target.send(embed=embed) + except (discord.HTTPException, discord.Forbidden): + channel = config.get("logging_channel") + await self.bot.logger.send_log( + message=f"Failed to DM warning to {target}", + level=LogLevel.WARNING, + channel=channel, + context=LogContext( + guild=interaction.guild, channel=interaction.channel + ), + ) + + @app_commands.checks.has_permissions(kick_members=True) + @app_commands.checks.bot_has_permissions(kick_members=True) + @app_commands.command( + name="unwarn", description="Unwarns a user", extras={"module": "moderator"} + ) + async def handle_unwarn_user( + self: Self, + interaction: discord.Interaction, + target: discord.Member, + reason: str, + warning: str, + ) -> None: + """The core logic of the /unwarn command + + Args: + interaction (discord.Interaction): The interaction that triggered the command + target (discord.Member): The user being unwarned + reason (str): The reason for the unwarn + warning (str): The exact string of the warning + """ + permission_check = await self.permission_check( + invoker=interaction.user, target=target, action_name="unwarn" + ) + if permission_check: + embed = auxiliary.prepare_deny_embed(message=permission_check) + await interaction.response.send_message(embed=embed) + return + + if len(reason) > 500: + embed = auxiliary.prepare_deny_embed( + message="Unwarn reason must be under 500 characters" + ) + await interaction.response.send_message(embed=embed) + return + + database_warning = await self.get_warning(user=target, warning=warning) + + if not database_warning: + embed = auxiliary.prepare_deny_embed( + message=f"{warning} was not found on {target}" + ) + await interaction.response.send_message(embed=embed) + return + + result = await moderation.unwarn_user( + bot_object=self.bot, user=target, warning=warning + ) + if not result: + embed = auxiliary.prepare_deny_embed( + message=f"Something went wrong when unwarning {target}" + ) + await interaction.response.send_message(embed=embed) + return + + await moderation.send_command_usage_alert( + bot_object=self.bot, + interaction=interaction, + command=f"/unwarn target: {target.display_name}, reason: {reason}, warning: {warning}", + guild=interaction.guild, + target=target, + ) + embed = generate_response_embed(user=target, action="unwarn", reason=reason) + await interaction.response.send_message(content=target.mention, embed=embed) + + @app_commands.checks.has_permissions(kick_members=True) + @app_commands.checks.bot_has_permissions(kick_members=True) + @warnings_group.command( + name="clear", + description="clears all warnings from a user", + extras={"module": "moderator"}, + ) + async def handle_warning_clear( + self: Self, + interaction: discord.Interaction, + target: discord.Member, + reason: str, + ) -> None: + """The core logic of the /warnings clear command + + Args: + interaction (discord.Interaction): The interaction that triggered the command + target (discord.Member): The user having warnings cleared + reason (str): The reason for the warnings being cleared + """ + permission_check = await self.permission_check( + invoker=interaction.user, target=target, action_name="unwarn" + ) + if permission_check: + embed = auxiliary.prepare_deny_embed(message=permission_check) + await interaction.response.send_message(embed=embed) + return + + if len(reason) > 500: + embed = auxiliary.prepare_deny_embed( + message="Reason must be under 500 characters" + ) + await interaction.response.send_message(embed=embed) + return + + warnings = await moderation.get_all_warnings( + self.bot, target, interaction.guild + ) + + if not warnings: + embed = auxiliary.prepare_deny_embed(message=f"{target} has no warnings") + await interaction.response.send_message(embed=embed) + return + + for warning in warnings: + await moderation.unwarn_user(self.bot, target, warning.reason) + + await moderation.send_command_usage_alert( + bot_object=self.bot, + interaction=interaction, + command=f"/warnings clear target: {target.display_name}, reaason: {reason}", + guild=interaction.guild, + target=target, + ) + + embed = generate_response_embed( + user=target, action="warnings clear", reason=reason + ) + await interaction.response.send_message(content=target.mention, embed=embed) + + @app_commands.checks.has_permissions(kick_members=True) + @app_commands.checks.bot_has_permissions(kick_members=True) + @warnings_group.command( + name="all", + description="Shows all warnings to the invoker", + extras={"module": "moderator"}, + ) + async def handle_warning_all( + self: Self, + interaction: discord.Interaction, + target: discord.User, + ) -> None: + """The core logic of the /warnings all command + + Args: + interaction (discord.Interaction): The interaction that triggered the command + target (discord.User): The user to lookup warnings for + """ + warnings = await moderation.get_all_warnings( + self.bot, target, interaction.guild + ) + + embeds = build_warning_embeds(interaction.guild, target, warnings) + + await interaction.response.defer(ephemeral=True) + view = ui.PaginateView() + await view.send( + interaction.channel, interaction.user, embeds, interaction, True + ) + + # Helper functions + + async def permission_check( + self: Self, + invoker: discord.Member, + target: discord.User | discord.Member, + action_name: str, + ) -> str: + """Checks permissions to ensure the command should be executed. This checks: + If the target is the invoker + If the target is the bot + If the user is in the server + If the target has an immune role + If the target can be banned by the bot + If the invoker has a higher role than the target + + Args: + invoker (discord.Member): The invoker of the action. + Either will be the user who ran the command, or the bot itself + target (discord.User | discord.Member): The target of the command. + Can be a user or member. + action_name (str): The action name to be displayed in messages + + Returns: + str: The rejection string, if one exists. Otherwise, None is returned + """ + config = self.bot.guild_configs[str(invoker.guild.id)] + # Check to see if executed on author + if invoker == target: + return f"You cannot {action_name} yourself" + + # Check to see if executed on bot + if target == self.bot.user: + return f"It would be silly to {action_name} myself" + + # Check to see if User or Member + if isinstance(target, discord.User): + return None + + # Check to see if target has any immune roles + try: + for name in config.extensions.moderator.immune_roles.value: + role_check = discord.utils.get(target.guild.roles, name=name) + if role_check and role_check in getattr(target, "roles", []): + return ( + f"You cannot {action_name} {target} because they have" + f" `{role_check}` role" + ) + except AttributeError: + pass + + # Check to see if the Bot can execute on the target + if invoker.guild.get_member(int(self.bot.user.id)).top_role <= target.top_role: + return f"Bot does not have enough permissions to {action_name} `{target}`" + + # Check to see if author top role is higher than targets + if invoker.top_role <= target.top_role: + return f"You do not have enough permissions to {action_name} `{target}`" + + return None + + # Database functions + + async def get_warning( + self: Self, user: discord.Member, warning: str + ) -> bot.models.Warning: + """Gets a specific warning by string for a user + + Args: + user (discord.Member): The user to get the warning for + warning (str): The warning to look for + + Returns: + bot.models.Warning: If it exists, the warning object + """ + query = ( + self.bot.models.Warning.query.where( + self.bot.models.Warning.guild_id == str(user.guild.id) + ) + .where(self.bot.models.Warning.reason == warning) + .where(self.bot.models.Warning.user_id == str(user.id)) + ) + entry = await query.gino.first() + return entry + + +def generate_response_embed( + user: discord.Member, action: str, reason: str +) -> discord.Embed: + """This generates a simple embed to be displayed in the chat where the command was called. + + Args: + user (discord.Member): The user who was actioned against + action (str): The string representation of the action type + reason (str): The reason the action was taken + + Returns: + discord.Embed: The formatted embed ready to be sent + """ + embed = discord.Embed( + title="Chat Protection", description=f"{action.upper()} `{user}`" + ) + embed.add_field(name="Reason", value=reason) + embed.set_thumbnail(url=user.display_avatar.url) + embed.color = discord.Color.gold() + + return embed + + +def build_warning_embeds( + guild: discord.Guild, + member: discord.Member, + warnings: list[bot.models.UserNote], +) -> list[discord.Embed]: + """Makes a list of embeds with 6 warnings per page, for a given user + + Args: + guild (discord.Guild): The guild where the warnings occured + member (discord.Member): The member whose warnings are being looked for + warnings (list[bot.models.UserNote]): The list of warnings from the database + + Returns: + list[discord.Embed]: The list of well formatted embeds + """ + embed = auxiliary.generate_basic_embed( + f"Warnings for `{member.display_name}` (`{member.name}`)", + color=discord.Color.dark_blue(), + ) + embed.set_footer(text=f"{len(warnings)} total warns.") + + embeds = [] + + if not warnings: + embed.description = "No warnings" + return [embed] + + for index, warn in enumerate(warnings): + if index % 6 == 0 and index > 0: + embeds.append(embed) + embed = auxiliary.generate_basic_embed( + f"Warnings for `{member.display_name}` (`{member.name}`)", + color=discord.Color.dark_blue(), + ) + embed.set_footer(text=f"{len(warnings)} total warns.") + warn_author = "Unknown" + if warn.invoker_id: + warn_author = warn.invoker_id + author = guild.get_member(int(warn.invoker_id)) + if author: + warn_author = author.name + embed.add_field( + name=f"Warned by {warn_author}", + value=f"{warn.reason}\nWarned ", + ) + embeds.append(embed) + return embeds diff --git a/techsupport_bot/commands/modlog.py b/techsupport_bot/commands/modlog.py new file mode 100644 index 00000000..fa55633e --- /dev/null +++ b/techsupport_bot/commands/modlog.py @@ -0,0 +1,371 @@ +"""Commands and functions to log and interact with logs of bans and unbans""" + +from __future__ import annotations + +import datetime +from collections import Counter +from typing import TYPE_CHECKING, Self + +import discord +import munch +import ui +from core import auxiliary, cogs, extensionconfig +from discord import app_commands +from discord.ext import commands + +if TYPE_CHECKING: + import bot + + +async def setup(bot: bot.TechSupportBot) -> None: + """Adds the cog to the bot. Setups config + + Args: + bot (bot.TechSupportBot): The bot object to register the cog with + """ + config = extensionconfig.ExtensionConfig() + config.add( + key="alert_channel", + datatype="int", + title="Alert channel ID", + description="The ID of the channel to send auto-protect alerts to", + default=None, + ) + await bot.add_cog(BanLogger(bot=bot, extension_name="modlog")) + bot.add_extension_config("modlog", config) + + +class BanLogger(cogs.BaseCog): + """The class that holds the /modlog commands + + Attributes: + modlog_group (app_commands.Group): The group for the /modlog commands + """ + + modlog_group: app_commands.Group = app_commands.Group( + name="modlog", description="...", extras={"module": "modlog"} + ) + + @modlog_group.command( + name="highscores", + description="Shows the top 10 moderators based on ban count", + extras={"module": "modlog"}, + ) + async def high_score_command(self: Self, interaction: discord.Interaction) -> None: + """Gets the top 10 moderators based on banned user count + + Args: + interaction (discord.Interaction): The interaction that started this command + """ + all_bans = await self.bot.models.BanLog.query.where( + self.bot.models.BanLog.guild_id == str(interaction.guild.id) + ).gino.all() + ban_frequency_counter = Counter(ban.banning_moderator for ban in all_bans) + + sorted_ban_frequency = sorted( + ban_frequency_counter.items(), key=lambda x: x[1], reverse=True + ) + embed = discord.Embed(title="Most active moderators") + + final_string = "" + for index, (moderator_id, count) in enumerate(sorted_ban_frequency): + moderator = await interaction.guild.fetch_member(int(moderator_id)) + if moderator: + final_string += ( + f"{index+1}. {moderator.display_name} " + f"{moderator.mention} ({moderator.id}) - ({count})\n" + ) + else: + final_string += {f"{index+1}. Moderator left: {moderator_id}"} + + embed.description = final_string + embed.color = discord.Color.blue() + await interaction.response.send_message(embed=embed) + + @modlog_group.command( + name="lookup-user", + description="Looks up the 10 most recent bans for a given user", + extras={"module": "modlog"}, + ) + async def lookup_user_command( + self: Self, interaction: discord.Interaction, user: discord.User + ) -> None: + """This is the core of the /modlog lookup-user command + + Args: + interaction (discord.Interaction): The interaction that called the command + user (discord.User): The user to search for bans for + """ + + await interaction.response.defer(ephemeral=False) + + all_bans_by_user = ( + await self.bot.models.BanLog.query.where( + self.bot.models.BanLog.guild_id == str(interaction.guild.id) + ) + .where(self.bot.models.BanLog.banned_member == str(user.id)) + .order_by(self.bot.models.BanLog.ban_time.desc()) + .gino.all() + ) + + embeds = [] + for ban in all_bans_by_user[:10]: + temp_embed = await self.convert_ban_to_pretty_string( + ban, f"{user.name} bans" + ) + temp_embed.description += f"\n**Total bans:** {len(all_bans_by_user)}" + embeds.append(temp_embed) + + if len(embeds) == 0: + embed = auxiliary.prepare_deny_embed( + f"No bans for the user {user.name} could be found" + ) + await interaction.followup.send(embed=embed) + return + + view = ui.PaginateView() + await view.send(interaction.channel, interaction.user, embeds, interaction) + + @modlog_group.command( + name="lookup-moderator", + description="Looks up the 10 most recent bans by a given moderator", + extras={"module": "modlog"}, + ) + async def lookup_moderator_command( + self: Self, interaction: discord.Interaction, moderator: discord.Member + ) -> None: + """This is the core of the /modlog lookup-moderator command + + Args: + interaction (discord.Interaction): The interaction that called the command + moderator (discord.Member): The moderator to search for bans for + """ + await interaction.response.defer(ephemeral=False) + + all_bans_by_user = ( + await self.bot.models.BanLog.query.where( + self.bot.models.BanLog.guild_id == str(interaction.guild.id) + ) + .where(self.bot.models.BanLog.banning_moderator == str(moderator.id)) + .order_by(self.bot.models.BanLog.ban_time.desc()) + .gino.all() + ) + + embeds = [] + for ban in all_bans_by_user[:10]: + temp_embed = await self.convert_ban_to_pretty_string( + ban, f"Bans by {moderator.name}" + ) + temp_embed.description += f"\n**Total bans:** {len(all_bans_by_user)}" + embeds.append(temp_embed) + + if len(embeds) == 0: + embed = auxiliary.prepare_deny_embed( + f"No bans by the user {moderator.name} could be found" + ) + await interaction.followup.send(embed=embed) + return + + view = ui.PaginateView() + await view.send(interaction.channel, interaction.user, embeds, interaction) + + async def convert_ban_to_pretty_string( + self: Self, ban: munch.Munch, title: str + ) -> discord.Embed: + """This converts a database ban entry into a shiny embed + + Args: + ban (munch.Munch): The ban database entry + title (str): The title to set the embeds to + + Returns: + discord.Embed: The fancy embed + """ + member = await self.bot.fetch_user(int(ban.banned_member)) + moderator = await self.bot.fetch_user(int(ban.banning_moderator)) + embed = discord.Embed(title=title) + embed.description = ( + f"**Case:** {ban.pk}\n" + f"**Offender:** {member.name} {member.mention}\n" + f"**Reason:** {ban.reason}\n" + f"**Responsible moderator:** {moderator.name} {moderator.mention}" + ) + embed.timestamp = ban.ban_time + embed.color = discord.Color.red() + return embed + + @commands.Cog.listener() + async def on_member_ban( + self: Self, guild: discord.Guild, user: discord.User | discord.Member + ) -> None: + """See: https://discordpy.readthedocs.io/en/latest/api.html#discord.on_member_ban + + Args: + guild (discord.Guild): The guild the user got banned from + user (discord.User | discord.Member): The user that got banned. Can be either User + or Member depending if the user was in the guild or not at the time of removal. + """ + # Wait a short time to ensure the audit log has been updated + await discord.utils.sleep_until( + discord.utils.utcnow() + datetime.timedelta(seconds=2) + ) + + config = self.bot.guild_configs[str(guild.id)] + if not self.extension_enabled(config): + return + + entry = None + moderator = None + async for entry in guild.audit_logs( + limit=10, action=discord.AuditLogAction.ban + ): + if entry.target.id == user.id: + moderator = entry.user + break + + if not entry: + return + + if not moderator or moderator.bot: + return + + await log_ban(self.bot, user, moderator, guild, entry.reason) + + @commands.Cog.listener() + async def on_member_unban( + self: Self, guild: discord.Guild, user: discord.User + ) -> None: + """See: https://discordpy.readthedocs.io/en/latest/api.html#discord.on_member_unban + + Args: + guild (discord.Guild): The guild the user got unbanned from + user (discord.User): The user that got unbanned + """ + # Wait a short time to ensure the audit log has been updated + await discord.utils.sleep_until( + discord.utils.utcnow() + datetime.timedelta(seconds=2) + ) + + config = self.bot.guild_configs[str(guild.id)] + if not self.extension_enabled(config): + return + + entry = None + moderator = None + async for entry in guild.audit_logs( + limit=10, action=discord.AuditLogAction.unban + ): + if entry.target.id == user.id: + moderator = entry.user + if not entry: + return + + if not moderator or moderator.bot: + return + + await log_unban(self.bot, user, moderator, guild, entry.reason) + + +# Any bans initiated by TS will come through this +async def log_ban( + bot: bot.TechSupportBot, + banned_member: discord.User | discord.Member, + banning_moderator: discord.Member, + guild: discord.Guild, + reason: str, +) -> None: + """Logs a ban into the alert channel + + Args: + bot (bot.TechSupportBot): The bot object to use for the logging + banned_member (discord.User | discord.Member): The member who was banned + banning_moderator (discord.Member): The moderator who banned the member + guild (discord.Guild): The guild the member was banned from + reason (str): The reason for the ban + """ + config = bot.guild_configs[str(guild.id)] + if "modlog" not in config.get("enabled_extensions", []): + return + + if not reason: + reason = "No reason specified" + + ban = bot.models.BanLog( + guild_id=str(guild.id), + reason=reason, + banning_moderator=str(banning_moderator.id), + banned_member=str(banned_member.id), + ) + ban = await ban.create() + + embed = discord.Embed(title=f"ban | case {ban.pk}") + embed.description = ( + f"**Offender:** {banned_member.name} {banned_member.mention}\n" + f"**Reason:** {reason}\n" + f"**Responsible moderator:** {banning_moderator.name} {banning_moderator.mention}" + ) + embed.set_footer(text=f"ID: {banned_member.id}") + embed.timestamp = datetime.datetime.utcnow() + embed.color = discord.Color.red() + + config = bot.guild_configs[str(guild.id)] + + try: + alert_channel = guild.get_channel( + int(config.extensions.modlog.alert_channel.value) + ) + except TypeError: + alert_channel = None + + if not alert_channel: + return + + await alert_channel.send(embed=embed) + + +async def log_unban( + bot: bot.TechSupportBot, + unbanned_member: discord.User | discord.Member, + unbanning_moderator: discord.Member, + guild: discord.Guild, + reason: str, +) -> None: + """Logs an unban into the alert channel + + Args: + bot (bot.TechSupportBot): The bot object to use for the logging + unbanned_member (discord.User | discord.Member): The member who was unbanned + unbanning_moderator (discord.Member): The moderator who unbanned the member + guild (discord.Guild): The guild the member was unbanned from + reason (str): The reason for the unban + """ + config = bot.guild_configs[str(guild.id)] + if "modlog" not in config.get("enabled_extensions", []): + return + + if not reason: + reason = "No reason specified" + + embed = discord.Embed(title="unban") + embed.description = ( + f"**Offender:** {unbanned_member.name} {unbanned_member.mention}\n" + f"**Reason:** {reason}\n" + f"**Responsible moderator:** {unbanning_moderator.name} {unbanning_moderator.mention}" + ) + embed.set_footer(text=f"ID: {unbanned_member.id}") + embed.timestamp = datetime.datetime.utcnow() + embed.color = discord.Color.green() + + config = bot.guild_configs[str(guild.id)] + + try: + alert_channel = guild.get_channel( + int(config.extensions.modlog.alert_channel.value) + ) + except TypeError: + alert_channel = None + + if not alert_channel: + return + + await alert_channel.send(embed=embed) diff --git a/techsupport_bot/commands/notes.py b/techsupport_bot/commands/notes.py new file mode 100644 index 00000000..2afe3dd6 --- /dev/null +++ b/techsupport_bot/commands/notes.py @@ -0,0 +1,379 @@ +"""Module for the who extension for the discord bot.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Self + +import discord +import ui +from botlogging import LogContext, LogLevel +from core import auxiliary, cogs, extensionconfig, moderation +from discord import app_commands +from discord.ext import commands + +if TYPE_CHECKING: + import bot + + +async def setup(bot: bot.TechSupportBot) -> None: + """Loading the Who plugin into the bot + + Args: + bot (bot.TechSupportBot): The bot object to register the cogs to + """ + + config = extensionconfig.ExtensionConfig() + config.add( + key="note_role", + datatype="str", + title="Note role", + description="The name of the role to be added when a note is added to a user", + default=None, + ) + config.add( + key="note_bypass", + datatype="list", + title="Note bypass list", + description=( + "A list of roles that shouldn't have notes set or the note role assigned" + ), + default=["Moderator"], + ) + config.add( + key="note_readers", + datatype="list", + title="Note Reader Roles", + description="Users with roles in this list will be able to use whois", + default=[], + ) + config.add( + key="note_writers", + datatype="list", + title="Note Writer Roles", + description="Users with roles in this list will be able to create or delete notes", + default=[], + ) + + await bot.add_cog(Notes(bot=bot, extension_name="notes")) + bot.add_extension_config("notes", config) + + +async def is_reader(interaction: discord.Interaction) -> bool: + """Checks whether invoker can read notes. If at least one reader + role is not set, NO members can read notes + + Args: + interaction (discord.Interaction): The interaction in which the whois command occured + + Raises: + MissingAnyRole: Raised if the user is lacking any reader role, + but there are roles defined + AppCommandError: Raised if there are no note_readers set in the config + + Returns: + bool: True if the user can run, False if they cannot + """ + + config = interaction.client.guild_configs[str(interaction.guild.id)] + if reader_roles := config.extensions.notes.note_readers.value: + roles = ( + discord.utils.get(interaction.guild.roles, name=role) + for role in reader_roles + ) + status = any((role in interaction.user.roles for role in roles)) + if not status: + raise app_commands.MissingAnyRole(reader_roles) + return True + + # Reader_roles are empty (not set) + message = "There aren't any `note_readers` roles set in the config!" + + raise app_commands.AppCommandError(message) + + +async def is_writer(interaction: discord.Interaction) -> bool: + """Checks whether invoker can write notes. If at least one writer + role is not set, NO members can write notes + + Args: + interaction (discord.Interaction): The interaction in which the whois command occured + + Raises: + MissingAnyRole: Raised if the user is lacking any writer role, + but there are roles defined + AppCommandError: Raised if there are no note_writers set in the config + + Returns: + bool: True if the user can run, False if they cannot + """ + config = interaction.client.guild_configs[str(interaction.guild.id)] + if writer_roles := config.extensions.notes.note_writers.value: + roles = ( + discord.utils.get(interaction.guild.roles, name=role) + for role in writer_roles + ) + status = any((role in interaction.user.roles for role in roles)) + if not status: + raise app_commands.MissingAnyRole(writer_roles) + return True + + # Reader_roles are empty (not set) + message = "There aren't any `note_writers` roles set in the config!" + + raise app_commands.AppCommandError(message) + + +class Notes(cogs.BaseCog): + """Class to set up who for the extension. + + Attributes: + notes (app_commands.Group): The group for the /note commands + + """ + + notes: app_commands.Group = app_commands.Group( + name="notes", description="Command Group for the Notes Extension" + ) + + @app_commands.check(is_reader) + @app_commands.check(is_writer) + @notes.command( + name="set", + description="Adds a note to a given user.", + extras={ + "brief": "Sets a note for a user", + "usage": "@user [note]", + "module": "notes", + }, + ) + async def set_note( + self: Self, interaction: discord.Interaction, user: discord.Member, body: str + ) -> None: + """Adds a new note to a user + This is the entrance for the /note set command + + Args: + interaction (discord.Interaction): The interaction that called this command + user (discord.Member): The member to add the note to + body (str): The contents of the note being created + """ + if interaction.user.id == user.id: + embed = auxiliary.prepare_deny_embed( + message="You cannot add a note for yourself" + ) + await interaction.response.send_message(embed=embed, ephemeral=True) + return + + note = self.bot.models.UserNote( + user_id=str(user.id), + guild_id=str(interaction.guild.id), + author_id=str(interaction.user.id), + body=body, + ) + + config = self.bot.guild_configs[str(interaction.guild.id)] + + # Check to make sure notes are allowed to be assigned + for name in config.extensions.notes.note_bypass.value: + role_check = discord.utils.get(interaction.guild.roles, name=name) + if not role_check: + continue + if role_check in getattr(user, "roles", []): + embed = auxiliary.prepare_deny_embed( + message=f"You cannot assign notes to `{user}` because " + + f"they have `{role_check}` role", + ) + await interaction.response.send_message(embed=embed, ephemeral=True) + return + + await note.create() + + role = discord.utils.get( + interaction.guild.roles, name=config.extensions.notes.note_role.value + ) + + if not role: + embed = auxiliary.prepare_confirm_embed( + message=f"Note created for `{user}`, but no note " + + "role is configured so no role was added", + ) + await interaction.response.send_message(embed=embed, ephemeral=True) + return + + await user.add_roles(role, reason=f"First note was added by {interaction.user}") + + embed = auxiliary.prepare_confirm_embed(message=f"Note created for `{user}`") + await interaction.response.send_message(embed=embed, ephemeral=True) + + @app_commands.check(is_reader) + @app_commands.check(is_writer) + @notes.command( + name="clear", + description="Clears all existing notes for a user", + extras={ + "brief": "Clears all notes for a user", + "usage": "@user", + "module": "notes", + }, + ) + async def clear_notes( + self: Self, interaction: discord.Interaction, user: discord.Member + ) -> None: + """Clears all notes on a given user + This is the entrace for the /note clear command + + Args: + interaction (discord.Interaction): The interaction that called this command + user (discord.Member): The member to remove all notes from + """ + notes = await moderation.get_all_notes(self.bot, user, interaction.guild) + + if not notes: + embed = auxiliary.prepare_deny_embed( + message="There are no notes for that user" + ) + await interaction.response.send_message(embed=embed, ephemeral=True) + return + + await interaction.response.defer(ephemeral=True) + view = ui.Confirm() + + await view.send( + message=f"Are you sure you want to clear {len(notes)} notes?", + channel=interaction.channel, + author=interaction.user, + interaction=interaction, + ephemeral=True, + ) + + await view.wait() + if view.value is ui.ConfirmResponse.TIMEOUT: + return + if view.value is ui.ConfirmResponse.DENIED: + embed = auxiliary.prepare_deny_embed( + message=f"Notes for `{user}` were not cleared" + ) + await view.followup.send(embed=embed, ephemeral=True) + return + + for note in notes: + await note.delete() + + config = self.bot.guild_configs[str(interaction.guild.id)] + role = discord.utils.get( + interaction.guild.roles, name=config.extensions.notes.note_role.value + ) + if role: + await user.remove_roles( + role, reason=f"Notes were cleared by {interaction.user}" + ) + + embed = auxiliary.prepare_confirm_embed(message=f"Notes cleared for `{user}`") + await view.followup.send(embed=embed, ephemeral=True) + + @app_commands.check(is_reader) + @notes.command( + name="all", + description="Gets all notes for a user instead of just new ones", + extras={ + "brief": "Gets all notes for a user", + "usage": "@user", + "module": "notes", + }, + ) + async def all_notes( + self: Self, interaction: discord.Interaction, member: discord.Member + ) -> None: + """Gets a file containing every note on a user + This is the entrance for the /note all command + + Args: + interaction (discord.Interaction): The interaction that called this command + member (discord.Member): The member to get all notes for + """ + notes = await moderation.get_all_notes(self.bot, member, interaction.guild) + + embeds = build_note_embeds(interaction.guild, member, notes) + + await interaction.response.defer(ephemeral=True) + view = ui.PaginateView() + await view.send( + interaction.channel, interaction.user, embeds, interaction, True + ) + + # re-adds note role back to joining users + @commands.Cog.listener() + async def on_member_join(self: Self, member: discord.Member) -> None: + """Automatic listener to look at users when they join the guild. + This is to apply the note role back to joining users + + Args: + member (discord.Member): The member who has just joined + """ + config = self.bot.guild_configs[str(member.guild.id)] + if not self.extension_enabled(config): + return + + role = discord.utils.get( + member.guild.roles, name=config.extensions.notes.note_role.value + ) + if not role: + return + + user_notes = await moderation.get_all_notes(self.bot, member, member.guild) + if not user_notes: + return + + await member.add_roles(role, reason="Noted user has joined the guild") + + log_channel = config.get("logging_channel") + await self.bot.logger.send_log( + message=f"Found noted user with ID {member.id} joining - re-adding role", + level=LogLevel.INFO, + context=LogContext(guild=member.guild), + channel=log_channel, + ) + + +def build_note_embeds( + guild: discord.Guild, + member: discord.Member, + notes: list[bot.models.UserNote], +) -> list[discord.Embed]: + """Makes a list of embeds with 6 notes per page, for a given user + + Args: + guild (discord.Guild): The guild where the notes occured + member (discord.Member): The member whose notes are being looked for + notes (list[bot.models.UserNote]): The list of notes from the database + + Returns: + list[discord.Embed]: The list of well formatted embeds + """ + embed = auxiliary.generate_basic_embed( + f"Notes for `{member.display_name}` (`{member.name}`)", + color=discord.Color.dark_blue(), + ) + embed.set_footer(text=f"{len(notes)} total notes.") + + embeds = [] + + if not notes: + embed.description = "No notes" + return [embed] + + for index, note in enumerate(notes): + if index % 6 == 0 and index > 0: + embeds.append(embed) + embed = auxiliary.generate_basic_embed( + f"Notes for `{member.display_name}` (`{member.name}`)", + color=discord.Color.dark_blue(), + ) + embed.set_footer(text=f"{len(notes)} total notes.") + author = guild.get_member(int(note.author_id)) or note.author_id + embed.add_field( + name=f"Note by {author}", + value=f"{note.body}\nNote added ", + ) + embeds.append(embed) + return embeds diff --git a/techsupport_bot/commands/protect.py b/techsupport_bot/commands/protect.py deleted file mode 100644 index 2a495392..00000000 --- a/techsupport_bot/commands/protect.py +++ /dev/null @@ -1,1264 +0,0 @@ -"""Module for the protect extension of the discord bot.""" - -from __future__ import annotations - -import datetime -import io -import re -from datetime import timedelta -from typing import TYPE_CHECKING, Self - -import dateparser -import discord -import expiringdict -import munch -import ui -from botlogging import LogContext, LogLevel -from core import auxiliary, cogs, extensionconfig -from discord.ext import commands - -if TYPE_CHECKING: - import bot - - -async def setup(bot: bot.TechSupportBot) -> None: - """Loading the ChatGPT plugin into the bot - - Args: - bot (bot.TechSupportBot): The bot object to register the cogs to - """ - - config = extensionconfig.ExtensionConfig() - config.add( - key="channels", - datatype="list", - title="Protected channels", - description=( - "The list of channel ID's associated with the channels to auto-protect" - ), - default=[], - ) - config.add( - key="bypass_roles", - datatype="list", - title="Bypassed role names", - description=( - "The list of role names associated with bypassed roles by the auto-protect" - ), - default=[], - ) - config.add( - key="immune_roles", - datatype="list", - title="Immune role names", - description="The list of role names that are immune to protect commands", - default=[], - ) - config.add( - key="bypass_ids", - datatype="list", - title="Bypassed member ID's", - description=( - "The list of member ID's associated with bypassed members by the" - " auto-protect" - ), - default=[], - ) - config.add( - key="length_limit", - datatype="int", - title="Max length limit", - description=( - "The max char limit on messages before they trigger an action by" - " auto-protect" - ), - default=500, - ) - config.add( - key="string_map", - datatype="dict", - title="Keyword string map", - description=( - "Mapping of keyword strings to data defining the action taken by" - " auto-protect" - ), - default={}, - ) - config.add( - key="banned_file_extensions", - datatype="dict", - title="List of banned file types", - description=( - "A list of all file extensions to be blocked and have a auto warning issued" - ), - default=[], - ) - config.add( - key="alert_channel", - datatype="int", - title="Alert channel ID", - description="The ID of the channel to send auto-protect alerts to", - default=None, - ) - config.add( - key="max_mentions", - datatype="int", - title="Max message mentions", - description=( - "Max number of mentions allowed in a message before triggering auto-protect" - ), - default=3, - ) - config.add( - key="max_warnings", - datatype="int", - title="Max Warnings", - description="The amount of warnings a user should be banned on", - default=3, - ) - config.add( - key="ban_delete_duration", - datatype="int", - title="Ban delete duration (days)", - description=( - "The amount of days to delete messages for a user after they are banned" - ), - default=7, - ) - config.add( - key="max_purge_amount", - datatype="int", - title="Max Purge Amount", - description="The max amount of messages allowed to be purged in one command", - default=50, - ) - config.add( - key="paste_footer_message", - datatype="str", - title="The linx embed footer", - description="The message used on the footer of the large message paste URL", - default="Note: Long messages are automatically pasted", - ) - - await bot.add_cog(Protector(bot=bot, extension_name="protect")) - bot.add_extension_config("protect", config) - - -class Protector(cogs.MatchCog): - """Class for the protector command. - - Attributes: - ALERT_ICON_URL (str): The icon for the alert messages - CLIPBOARD_ICON_URL (str): The icon for the paste messages - CHARS_PER_NEWLINE (int): The arbitrary length of a line - - """ - - ALERT_ICON_URL: str = ( - "https://www.iconarchive.com/download/i76061/martz90/circle-addon2/warning.512.png" - ) - CLIPBOARD_ICON_URL: str = ( - "https://www.iconarchive.com/download/i107916/" - "google/noto-emoji-objects/62930-clipboard.512.png" - ) - CHARS_PER_NEWLINE: int = 80 - - async def preconfig(self: Self) -> None: - """Method to preconfig the protect.""" - self.string_alert_cache = expiringdict.ExpiringDict( - max_len=100, max_age_seconds=3600 - ) - - async def match( - self: Self, config: munch.Munch, ctx: commands.Context, content: str - ) -> bool: - """Checks if the message could be triggered by any protect rules - Checks for channel and that the user isn't exempt - - Args: - config (munch.Munch): The guild config where the message was sent - ctx (commands.Context): The context in which the command was run in - content (str): The string content of the message sent - - Returns: - bool: False if the message shouldn't be checked, True if it should - """ - # exit the match based on exclusion parameters - if not str(ctx.channel.id) in config.extensions.protect.channels.value: - await self.bot.logger.send_log( - message="Channel not in protected channels - ignoring protect check", - level=LogLevel.DEBUG, - context=LogContext(guild=ctx.guild, channel=ctx.channel), - ) - return False - - role_names = [role.name.lower() for role in getattr(ctx.author, "roles", [])] - - if any( - role_name.lower() in role_names - for role_name in config.extensions.protect.bypass_roles.value - ): - return False - - if ctx.author.id in config.extensions.protect.bypass_ids.value: - return False - - return True - - @commands.Cog.listener() - async def on_raw_message_edit( - self: Self, payload: discord.RawMessageUpdateEvent - ) -> None: - """This is called when any message is edited in any guild the bot is in. - There is no guarantee that the message exists or is used - - Args: - payload (discord.RawMessageUpdateEvent): The raw event that the edit generated - """ - guild = self.bot.get_guild(payload.guild_id) - if not guild: - return - - config = self.bot.guild_configs[str(guild.id)] - if not self.extension_enabled(config): - return - - channel = self.bot.get_channel(payload.channel_id) - if not channel: - return - - message = await channel.fetch_message(payload.message_id) - if not message: - return - - # Don't trigger if content hasn't changed - if payload.cached_message and payload.cached_message.content == message.content: - return - - ctx = await self.bot.get_context(message) - matched = await self.match(config, ctx, message.content) - if not matched: - return - - await self.response(config, ctx, message.content, None) - - def search_by_text_regex( - self: Self, config: munch.Munch, content: str - ) -> munch.Munch: - """Searches a given message for static text and regex rule violations - - Args: - config (munch.Munch): The guild config where the message was sent - content (str): The string contents of the message that might be filtered - - Returns: - munch.Munch: The most aggressive filter that is triggered - """ - triggered_config = None - for ( - keyword, - filter_config, - ) in config.extensions.protect.string_map.value.items(): - filter_config = munch.munchify(filter_config) - search_keyword = keyword - search_content = content - - regex = filter_config.get("regex") - if regex: - try: - match = re.search(regex, search_content) - except re.error: - match = None - if match: - filter_config["trigger"] = keyword - triggered_config = filter_config - if triggered_config.get("delete"): - return triggered_config - else: - if filter_config.get("sensitive"): - search_keyword = search_keyword.lower() - search_content = search_content.lower() - if search_keyword in search_content: - filter_config["trigger"] = keyword - triggered_config = filter_config - if triggered_config.get("delete"): - return triggered_config - return triggered_config - - async def response( - self: Self, config: munch.Munch, ctx: commands.Context, content: str, _: bool - ) -> None: - """Checks if a message does violate any set automod rules - - Args: - config (munch.Munch): The guild config where the message was sent - ctx (commands.Context): The context of the original message - content (str): The string content of the message sent - """ - # check mass mentions first - return after handling - if len(ctx.message.mentions) > config.extensions.protect.max_mentions.value: - await self.handle_mass_mention_alert(config, ctx, content) - return - - # search the message against keyword strings - triggered_config = self.search_by_text_regex(config, content) - - for attachment in ctx.message.attachments: - if ( - attachment.filename.split(".")[-1] - in config.extensions.protect.banned_file_extensions.value - ): - await self.handle_file_extension_alert(config, ctx, attachment.filename) - return - - if triggered_config: - await self.handle_string_alert(config, ctx, content, triggered_config) - if triggered_config.get("delete"): - # the message is deleted, no need to pastebin it - return - - # check length of content - if len(content) > config.extensions.protect.length_limit.value or content.count( - "\n" - ) > self.max_newlines(config.extensions.protect.length_limit.value): - await self.handle_length_alert(config, ctx, content) - - def max_newlines(self: Self, max_length: int) -> int: - """Gets a theoretical maximum number of new lines in a given message - - Args: - max_length (int): The max length of characters per theoretical line - - Returns: - int: The maximum number of new lines based on config - """ - return int(max_length / self.CHARS_PER_NEWLINE) + 1 - - async def handle_length_alert( - self: Self, config: munch.Munch, ctx: commands.Context, content: str - ) -> None: - """Moves message into a linx paste if it's too long - - Args: - config (munch.Munch): The guild config where the too long message was sent - ctx (commands.Context): The context where the original message was sent - content (str): The string content of the flagged message - """ - attachments: list[discord.File] = [] - if ctx.message.attachments: - total_attachment_size = 0 - for attch in ctx.message.attachments: - if ( - total_attachment_size := total_attachment_size + attch.size - ) <= ctx.filesize_limit: - attachments.append(await attch.to_file()) - if (lf := len(ctx.message.attachments) - len(attachments)) != 0: - log_channel = config.get("logging_channel") - await self.bot.logger.send_log( - message=( - f"Protect did not reupload {lf} file(s) due to file size limit." - ), - level=LogLevel.WARN, - channel=log_channel, - context=LogContext(guild=ctx.guild, channel=ctx.channel), - ) - await ctx.message.delete() - - reason = "message too long (too many newlines or characters)" - - if not self.bot.file_config.api.api_url.linx: - await self.send_default_delete_response(config, ctx, content, reason) - return - - linx_embed = await self.create_linx_embed(config, ctx, content) - if not linx_embed: - await self.send_default_delete_response(config, ctx, content, reason) - await self.send_alert(config, ctx, "Could not convert text to Linx paste") - return - - await ctx.send( - ctx.message.author.mention, embed=linx_embed, files=attachments[:10] - ) - - async def handle_mass_mention_alert( - self: Self, config: munch.Munch, ctx: commands.Context, content: str - ) -> None: - """Handles a mass mention alert from automod - - Args: - config (munch.Munch): The guild config where the message was sent - ctx (commands.Context): The context where the message was sent - content (str): The string content of the message - """ - await ctx.message.delete() - await self.handle_warn(ctx, ctx.author, "mass mention", bypass=True) - await self.send_alert(config, ctx, f"Mass mentions from {ctx.author}") - - async def handle_file_extension_alert( - self: Self, config: munch.Munch, ctx: commands.Context, filename: str - ) -> None: - """Handles a suspicous file extension flag from automod - - Args: - config (munch.Munch): The guild config from where the message was sent - ctx (commands.Context): The context where the message was sent - filename (str): The filename of the suspicious file that was uploaded - """ - await ctx.message.delete() - await self.handle_warn( - ctx, ctx.author, "Suspicious file extension", bypass=True - ) - await self.send_alert( - config, ctx, f"Suspicious file uploaded by {ctx.author}: {filename}" - ) - - async def handle_string_alert( - self: Self, - config: munch.Munch, - ctx: commands.Context, - content: str, - filter_config: munch.Munch, - ) -> None: - """Handles a static string alert. Is given a rule that was violated - - Args: - config (munch.Munch): The guild config where the message was sent - ctx (commands.Context): The context where the original message was sent - content (str): The string content of the message - filter_config (munch.Munch): The rule that was triggered by the message - """ - # If needed, delete the message - if filter_config.delete: - await ctx.message.delete() - - # Send only 1 response based on warn, deletion, or neither - if filter_config.warn: - await self.handle_warn(ctx, ctx.author, filter_config.message, bypass=True) - elif filter_config.delete: - await self.send_default_delete_response( - config, ctx, content, filter_config.message - ) - else: - # Ensure we don't trigger people more than once if the only trigger is a warning - cache_key = self.get_cache_key(ctx.guild, ctx.author, filter_config.trigger) - if self.string_alert_cache.get(cache_key): - return - - self.string_alert_cache[cache_key] = True - embed = auxiliary.generate_basic_embed( - title="Chat Protection", - description=filter_config.message, - color=discord.Color.gold(), - ) - await ctx.send(ctx.message.author.mention, embed=embed) - - await self.send_alert( - config, - ctx, - f"Message contained trigger: {filter_config.trigger}", - ) - - async def handle_warn( - self: Self, - ctx: commands.Context, - user: discord.Member, - reason: str, - bypass: bool = False, - ) -> None: - """Handles the logic of a warning - - Args: - ctx (commands.Context): The context that generated the warning - user (discord.Member): The member to warn - reason (str): The reason for warning - bypass (bool, optional): If this should bypass the confirmation check. - Defaults to False. - """ - if not bypass: - can_execute = await self.can_execute(ctx, user) - if not can_execute: - return - - warnings = await self.get_warnings(user, ctx.guild) - - new_count = len(warnings) + 1 - - config = self.bot.guild_configs[str(ctx.guild.id)] - - if new_count >= config.extensions.protect.max_warnings.value: - # Start by assuming we don't want to ban someone - should_ban = False - - # If there is no bypass, ask using ui.Confirm - # If there is a bypass, assume we want to ban - if not bypass: - view = ui.Confirm() - await view.send( - message="This user has exceeded the max warnings of " - + f"{config.extensions.protect.max_warnings.value}. Would " - + "you like to ban them instead?", - channel=ctx.channel, - author=ctx.author, - ) - await view.wait() - if view.value is ui.ConfirmResponse.CONFIRMED: - should_ban = True - else: - should_ban = True - - if should_ban: - await self.handle_ban( - ctx, - user, - f"Over max warning count {new_count} out " - + f"of {config.extensions.protect.max_warnings.value}" - + f" (final warning: {reason})", - bypass=True, - ) - await self.clear_warnings(user, ctx.guild) - return - - embed = await self.generate_user_modified_embed( - user, "warn", f"{reason} ({new_count} total warnings)" - ) - - # Attempt DM for manually initiated, non-banning warns - if ctx.command == self.bot.get_command("warn"): - # Cancel warns in channels invisible to user - if not ctx.channel.permissions_for(user).view_channel: - await auxiliary.send_deny_embed( - message=f"{user} cannot see this warning.", channel=ctx.channel - ) - return - - try: - await user.send(embed=embed) - - except (discord.HTTPException, discord.Forbidden): - channel = config.get("logging_channel") - await self.bot.logger.send_log( - message=f"Failed to DM warning to {user}", - level=LogLevel.WARNING, - channel=channel, - context=LogContext(guild=ctx.guild, channel=ctx.channel), - ) - - finally: - await ctx.send(content=user.mention, embed=embed) - - else: - await ctx.send(ctx.message.author.mention, embed=embed) - - await self.bot.models.Warning( - user_id=str(user.id), guild_id=str(ctx.guild.id), reason=reason - ).create() - - async def handle_unwarn( - self: Self, - ctx: commands.Context, - user: discord.Member, - reason: str, - bypass: bool = False, - ) -> None: - """Handles the logic of clearing all warnings - - Args: - ctx (commands.Context): The context that generated theis unwarn - user (discord.Member): The member to remove warnings from - reason (str): The reason for clearing warnings - bypass (bool, optional): If this should bypass the confirmation check. - Defaults to False. - """ - # Always allow admins to unwarn other admins - if not bypass and not ctx.message.author.guild_permissions.administrator: - can_execute = await self.can_execute(ctx, user) - if not can_execute: - return - - warnings = await self.get_warnings(user, ctx.guild) - if not warnings: - await auxiliary.send_deny_embed( - message="There are no warnings for that user", channel=ctx.channel - ) - return - - await self.clear_warnings(user, ctx.guild) - - embed = await self.generate_user_modified_embed(user, "unwarn", reason) - await ctx.send(embed=embed) - - async def handle_ban( - self: Self, - ctx: commands.Context, - user: discord.User | discord.Member, - reason: str, - bypass: bool = False, - ) -> None: - """Handles the logic of banning a user. Is not a discord command - - Args: - ctx (commands.Context): The context that generated the need for a bad - user (discord.User | discord.Member): The user or member to be banned - reason (str): The ban reason to be stored in discord - bypass (bool, optional): True will ignore permission chekcks. Defaults to False. - """ - if not bypass: - can_execute = await self.can_execute(ctx, user) - if not can_execute: - return - - async for ban in ctx.guild.bans(limit=None): - if user == ban.user: - await auxiliary.send_deny_embed( - message="User is already banned.", channel=ctx.channel - ) - return - - config = self.bot.guild_configs[str(ctx.guild.id)] - await ctx.guild.ban( - user, - reason=reason, - delete_message_days=config.extensions.protect.ban_delete_duration.value, - ) - - embed = await self.generate_user_modified_embed(user, "ban", reason) - - await ctx.send(embed=embed) - - async def handle_unban( - self: Self, - ctx: commands.Context, - user: discord.User, - reason: str, - bypass: bool = False, - ) -> None: - """Handles the logic of unbanning a user. Is not a discord command - - Args: - ctx (commands.Context): The context that generated the need for the unban - user (discord.User): The user to be unbanned - reason (str): The unban reason to be saved in the audit log - bypass (bool, optional): True will ignore permission chekcks. Defaults to False. - """ - if not bypass: - can_execute = await self.can_execute(ctx, user) - if not can_execute: - return - - try: - await ctx.guild.unban(user, reason=reason) - except discord.NotFound: - await auxiliary.send_deny_embed( - message="This user is not banned, or does not exist", - channel=ctx.channel, - ) - return - - embed = await self.generate_user_modified_embed(user, "unban", reason) - - await ctx.send(embed=embed) - - async def handle_kick( - self: Self, - ctx: commands.Context, - user: discord.Member, - reason: str, - bypass: bool = False, - ) -> None: - """Handles the logic of kicking a user. Is not a discord command - - Args: - ctx (commands.Context): The context that generated the need for the kick - user (discord.Member): The user to be kicked - reason (str): The kick reason to be saved in the audit log - bypass (bool, optional): True will ignore permission chekcks. Defaults to False. - """ - if not bypass: - can_execute = await self.can_execute(ctx, user) - if not can_execute: - return - - await ctx.guild.kick(user, reason=reason) - - embed = await self.generate_user_modified_embed(user, "kick", reason) - - await ctx.send(embed=embed) - - async def clear_warnings( - self: Self, user: discord.User | discord.Member, guild: discord.Guild - ) -> None: - """This clears all warnings for a given user - - Args: - user (discord.User | discord.Member): The user or member to wipe all warnings for - guild (discord.Guild): The guild to clear warning from - """ - await self.bot.models.Warning.delete.where( - self.bot.models.Warning.user_id == str(user.id) - ).where(self.bot.models.Warning.guild_id == str(guild.id)).gino.status() - - async def generate_user_modified_embed( - self: Self, user: discord.User | discord.Member, action: str, reason: str - ) -> discord.Embed: - """This generates an embed to be shown to the user on why their message was actioned - - Args: - user (discord.User | discord.Member): The user or member who was punished - action (str): The action that was taken against the person - reason (str): The reason for the action taken - - Returns: - discord.Embed: The prepared embed ready to be sent - """ - embed = discord.Embed( - title="Chat Protection", description=f"{action.upper()} `{user}`" - ) - embed.set_footer(text=f"Reason: {reason}") - embed.set_thumbnail(url=user.display_avatar.url) - embed.color = discord.Color.gold() - - return embed - - def get_cache_key( - self: Self, guild: discord.Guild, user: discord.Member, trigger: str - ) -> str: - """Gets the cache key for repeated automod triggers - - Args: - guild (discord.Guild): The guild where the trigger has occured - user (discord.Member): The member that triggered the automod - trigger (str): The string representation of the automod rule that triggered - - Returns: - str: The key to lookup the cache entry, if it exists - """ - return f"{guild.id}_{user.id}_{trigger}" - - async def can_execute( - self: Self, ctx: commands.Context, target: discord.User | discord.Member - ) -> bool: - """Checks permissions to determine if the protect command should execute. - This checks: - - If the executer is the same as the target - - If the target is a bot - - If the member is immune to protect - - If the bot doesn't have permissions - - If the user wouldn't have permissions based on their roles - - Args: - ctx (commands.Context): The context that required the need for moderative action - target (discord.User | discord.Member): The target of the moderative action - - Returns: - bool: True if the executer can execute this command, False if they can't - """ - action = ctx.command.name or "do that to" - config = self.bot.guild_configs[str(ctx.guild.id)] - - # Check to see if executed on author - if target == ctx.author: - await auxiliary.send_deny_embed( - message=f"You cannot {action} yourself", channel=ctx.channel - ) - return False - # Check to see if executed on bot - if target == self.bot.user: - await auxiliary.send_deny_embed( - message=f"It would be silly to {action} myself", channel=ctx.channel - ) - return False - # Check to see if target has a role. Will allow execution on Users outside of server - if not hasattr(target, "top_role"): - return True - # Check to see if target has any immune roles - for name in config.extensions.protect.immune_roles.value: - role_check = discord.utils.get(target.guild.roles, name=name) - if role_check and role_check in getattr(target, "roles", []): - await auxiliary.send_deny_embed( - message=( - f"You cannot {action} {target} because they have `{role_check}`" - " role" - ), - channel=ctx.channel, - ) - return False - # Check to see if the Bot can execute on the target - if ctx.guild.me.top_role <= target.top_role: - await auxiliary.send_deny_embed( - message=f"Bot does not have enough permissions to {action} `{target}`", - channel=ctx.channel, - ) - return False - # Check to see if author top role is higher than targets - if target.top_role >= ctx.author.top_role: - await auxiliary.send_deny_embed( - message=f"You do not have enough permissions to {action} `{target}`", - channel=ctx.channel, - ) - return False - return True - - async def send_alert( - self: Self, config: munch.Munch, ctx: commands.Context, message: str - ) -> None: - """Sends a protect alert to the protect events channel to alert the mods - - Args: - config (munch.Munch): The guild config in the guild where the event occured - ctx (commands.Context): The context that generated this alert - message (str): The message to send to the mods about the alert - """ - try: - alert_channel = ctx.guild.get_channel( - int(config.extensions.protect.alert_channel.value) - ) - except TypeError: - alert_channel = None - - if not alert_channel: - return - - embed = discord.Embed(title="Protect Alert", description=message) - - if len(ctx.message.content) >= 256: - message_content = ctx.message.content[0:256] - else: - message_content = ctx.message.content - - embed.add_field(name="Channel", value=f"#{ctx.channel.name}") - embed.add_field(name="User", value=ctx.author.mention) - embed.add_field(name="Message", value=message_content, inline=False) - embed.add_field(name="URL", value=ctx.message.jump_url, inline=False) - - embed.set_thumbnail(url=self.ALERT_ICON_URL) - embed.color = discord.Color.red() - - await alert_channel.send(embed=embed) - - async def send_default_delete_response( - self: Self, - config: munch.Munch, - ctx: commands.Context, - content: str, - reason: str, - ) -> None: - """Sends a DM to a user containing a message that was deleted - - Args: - config (munch.Munch): The config of the guild where the message was sent - ctx (commands.Context): The context of the deleted message - content (str): The context of the deleted message - reason (str): The reason the message was deleted - """ - embed = auxiliary.generate_basic_embed( - title="Chat Protection", - description=f"Message deleted. Reason: *{reason}*", - color=discord.Color.gold(), - ) - await ctx.send(ctx.message.author.mention, embed=embed) - await ctx.author.send(f"Deleted message: ```{content[:1994]}```") - - async def get_warnings( - self: Self, user: discord.Member | discord.User, guild: discord.Guild - ) -> list[bot.models.Warning]: - """Gets a list of every warning for a given user - - Args: - user (discord.Member | discord.User): The user or member object to lookup warnings for - guild (discord.Guild): The guild to get the warnings for - - Returns: - list[bot.models.Warning]: The list of all warnings that - user or member has in the given guild - """ - warnings = ( - await self.bot.models.Warning.query.where( - self.bot.models.Warning.user_id == str(user.id) - ) - .where(self.bot.models.Warning.guild_id == str(guild.id)) - .gino.all() - ) - return warnings - - async def create_linx_embed( - self: Self, config: munch.Munch, ctx: commands.Context, content: str - ) -> discord.Embed | None: - """This function sends a message to the linx url and puts the result in - an embed to be sent to the user - - Args: - config (munch.Munch): The guild config where the message was sent - ctx (commands.Context): The context that generated the need for a paste - content (str): The context of the message to be pasted - - Returns: - discord.Embed | None: The formatted embed, or None if there was an API error - """ - if not content: - return None - - headers = { - "Linx-Expiry": "1800", - "Linx-Randomize": "yes", - "Accept": "application/json", - } - file_to_paste = {"file": io.StringIO(content)} - response = await self.bot.http_functions.http_call( - "post", - self.bot.file_config.api.api_url.linx, - headers=headers, - data=file_to_paste, - ) - - url = response.get("url") - if not url: - return None - - embed = discord.Embed(description=url) - - embed.add_field(name="Paste Link", value=url) - embed.description = content[0:100].replace("\n", " ") - embed.set_author( - name=f"Paste by {ctx.author}", icon_url=ctx.author.display_avatar.url - ) - embed.set_footer(text=config.extensions.protect.paste_footer_message.value) - embed.color = discord.Color.blue() - - return embed - - @commands.has_permissions(ban_members=True) - @commands.bot_has_permissions(ban_members=True) - @commands.command( - name="ban", - brief="Bans a user", - description="Bans a user with a given reason", - usage="@user [reason]", - ) - async def ban_user( - self: Self, ctx: commands.Context, user: discord.User, *, reason: str = None - ) -> None: - """The ban discord command, starts the process of banning a user - - Args: - ctx (commands.Context): The context that called this command - user (discord.User): The user that is going to be banned - reason (str, optional): The reason for the ban. Defaults to None. - """ - - # Uses the discord.Member class to get the top role attribute if the - # user is a part of the target guild - if ctx.guild.get_member(user.id) is not None: - await self.handle_ban(ctx, ctx.guild.get_member(user.id), reason) - else: - await self.handle_ban(ctx, user, reason) - - config = self.bot.guild_configs[str(ctx.guild.id)] - await self.send_alert(config, ctx, "Ban command") - - @commands.has_permissions(ban_members=True) - @commands.bot_has_permissions(ban_members=True) - @commands.command( - name="unban", - brief="Unbans a user", - description="Unbans a user with a given reason", - usage="@user [reason]", - ) - async def unban_user( - self: Self, ctx: commands.Context, user: discord.User, *, reason: str = None - ) -> None: - """The unban discord command, starts the process of unbanning a user - - Args: - ctx (commands.Context): The context that called this command - user (discord.User): The user that is going to be unbanned - reason (str, optional): The reason for the unban. Defaults to None. - """ - - # Uses the discord.Member class to get the top role attribute if the - # user is a part of the target guild - if ctx.guild.get_member(user.id) is not None: - await self.handle_unban(ctx, ctx.guild.get_member(user.id), reason) - else: - await self.handle_unban(ctx, user, reason) - - @commands.has_permissions(kick_members=True) - @commands.bot_has_permissions(kick_members=True) - @commands.command( - name="kick", - brief="Kicks a user", - description="Kicks a user with a given reason", - usage="@user [reason]", - ) - async def kick_user( - self: Self, ctx: commands.Context, user: discord.Member, *, reason: str = None - ) -> None: - """The kick discord command, starts the process of kicking a user - - Args: - ctx (commands.Context): The context that called this command - user (discord.Member): The user that is going to be kicked - reason (str, optional): The reason for the kick. Defaults to None. - """ - await self.handle_kick(ctx, user, reason) - - config = self.bot.guild_configs[str(ctx.guild.id)] - await self.send_alert(config, ctx, "Kick command") - - @commands.has_permissions(kick_members=True) - @commands.bot_has_permissions(kick_members=True) - @commands.command( - name="warn", - brief="Warns a user", - description="Warn a user with a given reason", - usage="@user [reason]", - ) - async def warn_user( - self: Self, ctx: commands.Context, user: discord.Member, *, reason: str = None - ) -> None: - """The warn discord command, starts the process of warning a user - - Args: - ctx (commands.Context): The context that called this command - user (discord.Member): The user that is going to be warned - reason (str, optional): The reason for the warn. Defaults to None. - """ - await self.handle_warn(ctx, user, reason) - - config = self.bot.guild_configs[str(ctx.guild.id)] - await self.send_alert(config, ctx, "Warn command") - - @commands.has_permissions(kick_members=True) - @commands.bot_has_permissions(kick_members=True) - @commands.command( - name="unwarn", - brief="Unwarns a user", - description="Unwarns a user with a given reason", - usage="@user [reason]", - ) - async def unwarn_user( - self: Self, ctx: commands.Context, user: discord.Member, *, reason: str = None - ) -> None: - """The unwarn discord command, starts the process of unwarning a user - This clears ALL warnings from a member - - Args: - ctx (commands.Context): The context that called this command - user (discord.Member): The user that is going to be unwarned - reason (str, optional): The reason for the unwarn. Defaults to None. - """ - await self.handle_unwarn(ctx, user, reason) - - @commands.has_permissions(kick_members=True) - @commands.bot_has_permissions(kick_members=True) - @commands.command( - name="warnings", - brief="Gets warnings", - description="Gets warnings for a user", - usage="@user", - ) - async def get_warnings_command( - self: Self, ctx: commands.Context, user: discord.User - ) -> None: - """Displays all warnings that a given user has - - Args: - ctx (commands.Context): The context that called this command - user (discord.User): The user to get warnings for - """ - warnings = await self.get_warnings(user, ctx.guild) - if not warnings: - await auxiliary.send_deny_embed( - message="There are no warnings for that user", channel=ctx.channel - ) - return - - embed = discord.Embed(title=f"Warnings for {user}") - for warning in warnings: - embed.add_field(name=warning.time, value=warning.reason, inline=False) - - embed.set_thumbnail(url=user.display_avatar.url) - - embed.color = discord.Color.red() - - await ctx.send(embed=embed) - - @commands.has_permissions(moderate_members=True) - @commands.bot_has_permissions(moderate_members=True) - @commands.command( - name="mute", - brief="Mutes a user", - description="Times out a user for the specified duration", - usage="@user [time] [reason]", - aliases=["timeout"], - ) - async def mute( - self: Self, ctx: commands.Context, user: discord.Member, *, duration: str = None - ) -> None: - """Method to mute a user in discord using the native timeout. - This should be run via discord - - Args: - ctx (commands.Context): The context that was generated by running this command - user (discord.Member): The discord.Member to be timed out. - duration (str, optional): Max time is 28 days by discord API. Defaults to 1 hour - - Raises: - ValueError: Raised if the provided duration string cannot be converted into a time - """ - - can_execute = await self.can_execute(ctx, user) - if not can_execute: - return - - # The API prevents administrators from being timed out. Check it here - if user.guild_permissions.administrator: - await auxiliary.send_deny_embed( - message=( - "Someone with the `administrator` permissions cannot be timed out" - ), - channel=ctx.channel, - ) - return - - delta_duration = None - - if duration: - # The date parser defaults to time in the past, so it is second - # This could be fixed by appending "in" to your query, but this is simpler - try: - delta_duration = datetime.datetime.now() - dateparser.parse(duration) - delta_duration = timedelta( - seconds=round(delta_duration.total_seconds()) - ) - except TypeError as exc: - raise ValueError("Invalid duration") from exc - if not delta_duration: - raise ValueError("Invalid duration") - else: - delta_duration = timedelta(hours=1) - - # Checks to ensure time is valid and within the scope of the API - if delta_duration > timedelta(days=28): - raise ValueError("Timeout duration cannot be more than 28 days") - if delta_duration < timedelta(seconds=1): - raise ValueError("Timeout duration cannot be less than 1 second") - - # Timeout the user and send messages to both the invocation channel, and the protect log - await user.timeout(delta_duration) - - embed = await self.generate_user_modified_embed( - user, f"muted for {delta_duration}", reason=None - ) - - await ctx.send(embed=embed) - - config = self.bot.guild_configs[str(ctx.guild.id)] - await self.send_alert(config, ctx, "Mute command") - - @commands.has_permissions(moderate_members=True) - @commands.bot_has_permissions(moderate_members=True) - @commands.command( - name="unmute", - brief="Unutes a user", - description="Removes a timeout from the user", - usage="@user", - aliases=["untimeout"], - ) - async def unmute( - self: Self, ctx: commands.Context, user: discord.Member, reason: str = None - ) -> None: - """Method to mute a user in discord using the native timeout. - This should be run via discord - - Args: - ctx (commands.Context): The context that was generated by running this command - user (discord.Member): The discord.Member to have mute be cleared. - reason (str, optional): The reason for the unmute. Defaults to None. - """ - can_execute = await self.can_execute(ctx, user) - if not can_execute: - return - - if user.timed_out_until is None: - await auxiliary.send_deny_embed( - message="That user is not timed out", channel=ctx.channel - ) - return - - await user.timeout(None) - - embed = await self.generate_user_modified_embed(user, "unmuted", reason) - - await ctx.send(embed=embed) - - @commands.has_permissions(manage_messages=True) - @commands.bot_has_permissions(manage_messages=True) - @commands.group( - brief="Executes a purge command", - description="Executes a purge command", - ) - async def purge(self: Self, ctx: commands.Context) -> None: - """The bare .purge command. This does nothing but generate the help message - - Args: - ctx (commands.Context): The context in which the command was run in - """ - await auxiliary.extension_help(self, ctx, self.__module__[9:]) - - @purge.command( - name="amount", - aliases=["x"], - brief="Purges messages by amount", - description="Purges the current channel's messages based on amount", - usage="[amount]", - ) - async def purge_amount(self: Self, ctx: commands.Context, amount: int = 1) -> None: - """Purges the most recent amount+1 messages in the channel the command was run in - - Args: - ctx (commands.Context): The context that called the command - amount (int, optional): The amount of messages to purge. Defaults to 1. - """ - config = self.bot.guild_configs[str(ctx.guild.id)] - - if amount <= 0 or amount > config.extensions.protect.max_purge_amount.value: - amount = config.extensions.protect.max_purge_amount.value - - await ctx.channel.purge(limit=amount + 1) - - await self.send_alert(config, ctx, "Purge command") - - @purge.command( - name="duration", - aliases=["d"], - brief="Purges messages by duration", - description="Purges the current channel's messages up to a time", - usage="[duration (minutes)]", - ) - async def purge_duration( - self: Self, ctx: commands.Context, duration_minutes: int - ) -> None: - """Purges the most recent duration_minutes worth of messages in the - channel the command was run in - - Args: - ctx (commands.Context): The context that called the command - duration_minutes (int): The amount of minutes to purge away - """ - if duration_minutes < 0: - await auxiliary.send_deny_embed( - message="I can't use that input", channel=ctx.channel - ) - return - - timestamp = datetime.datetime.utcnow() - datetime.timedelta( - minutes=duration_minutes - ) - - config = self.bot.guild_configs[str(ctx.guild.id)] - - await ctx.channel.purge( - after=timestamp, limit=config.extensions.protect.max_purge_amount.value - ) - - await self.send_alert(config, ctx, "Purge command") diff --git a/techsupport_bot/commands/purge.py b/techsupport_bot/commands/purge.py new file mode 100644 index 00000000..0a25fb8c --- /dev/null +++ b/techsupport_bot/commands/purge.py @@ -0,0 +1,96 @@ +"""The file that holds the purge command""" + +from __future__ import annotations + +import datetime +from typing import TYPE_CHECKING, Self + +import discord +from core import auxiliary, cogs, extensionconfig, moderation +from discord import app_commands + +if TYPE_CHECKING: + import bot + + +async def setup(bot: bot.TechSupportBot) -> None: + """Adds the cog to the bot. Setups config + + Args: + bot (bot.TechSupportBot): The bot object to register the cog with + """ + config = extensionconfig.ExtensionConfig() + config.add( + key="max_purge_amount", + datatype="int", + title="Max Purge Amount", + description="The max amount of messages allowed to be purged in one command", + default=50, + ) + await bot.add_cog(Purger(bot=bot, extension_name="purge")) + bot.add_extension_config("purge", config) + + +class Purger(cogs.BaseCog): + """The class that holds the /purge command""" + + @app_commands.checks.has_permissions(manage_messages=True) + @app_commands.checks.bot_has_permissions(manage_messages=True) + @app_commands.command( + name="purge", + description="Purge by pure duration of messages", + extras={"module": "purge"}, + ) + async def purge_command( + self: Self, + interaction: discord.Interaction, + amount: int, + duration_minutes: int = None, + ) -> None: + """The core purge command that can purge by either amount or duration + + Args: + interaction (discord.Interaction): The interaction that called this command + amount (int): The max amount of messages to purge + duration_minutes (int, optional): The max age of a message to purge. Defaults to None. + """ + config = self.bot.guild_configs[str(interaction.guild.id)] + + if amount <= 0 or amount > config.extensions.purge.max_purge_amount.value: + embed = auxiliary.prepare_deny_embed( + message=( + "Messages to purge must be between 1 " + f"and {config.extensions.purge.max_purge_amount.value}" + ), + ) + await interaction.response.send_message(embed=embed, ephemeral=True) + return + + if duration_minutes and duration_minutes < 0: + embed = auxiliary.prepare_deny_embed( + message="Message age must be older than 0 minutes", + ) + await interaction.response.send_message(embed=embed, ephemeral=True) + return + + if duration_minutes: + timestamp = datetime.datetime.utcnow() - datetime.timedelta( + minutes=duration_minutes + ) + else: + timestamp = None + + await interaction.response.send_message("Purge Successful", ephemeral=True) + sent_message = await interaction.original_response() + deleted = await interaction.channel.purge(after=timestamp, limit=amount) + await interaction.followup.edit_message( + message_id=sent_message.id, + content=f"Purge Successful. Deleted {len(deleted)} messages.", + ) + + await moderation.send_command_usage_alert( + bot_object=self.bot, + interaction=interaction, + command=f"/purge amount: {amount}, duration: {duration_minutes}", + guild=interaction.guild, + ) diff --git a/techsupport_bot/commands/relay.py b/techsupport_bot/commands/relay.py index 36f21339..1360ff8b 100644 --- a/techsupport_bot/commands/relay.py +++ b/techsupport_bot/commands/relay.py @@ -11,6 +11,7 @@ from bidict import bidict from core import auxiliary, cogs from discord.ext import commands +from functions import automod if TYPE_CHECKING: import bot @@ -411,6 +412,37 @@ async def send_message_from_irc(self: Self, split_message: dict[str, str]) -> No mentions = self.get_mentions( message=split_message["content"], channel=discord_channel ) + + config = self.bot.guild_configs[str(discord_channel.guild.id)] + if "automod" in config.get("enabled_extensions", []): + automod_actions = automod.run_only_string_checks( + config, split_message["content"] + ) + automod_final = automod.process_automod_violations(automod_actions) + if automod_final and automod_final.delete_message: + embed = discord.Embed(title="IRC Automod") + embed.description = ( + f"**Blocked message:** {split_message['content']}\n" + f"**Reason: ** {automod_final.violation_string}\n" + f"**Message sent by:** {split_message['username']} " + f"({split_message['hostmask']})\n" + f"**In channel:** {split_message['channel']}" + ) + embed.color = discord.Color.red() + try: + alert_channel = discord_channel.guild.get_channel( + int(config.extensions.automod.alert_channel.value) + ) + except TypeError: + alert_channel = None + + if not alert_channel: + return + + await alert_channel.send(embed=embed) + + return + mentions_string = auxiliary.construct_mention_string(targets=mentions) embed = self.generate_sent_message_embed(split_message=split_message) diff --git a/techsupport_bot/commands/report.py b/techsupport_bot/commands/report.py new file mode 100644 index 00000000..f3c4730f --- /dev/null +++ b/techsupport_bot/commands/report.py @@ -0,0 +1,128 @@ +"""The report command""" + +from __future__ import annotations + +import datetime +import re +from typing import TYPE_CHECKING, Self + +import discord +from core import auxiliary, cogs, extensionconfig +from discord import app_commands + +if TYPE_CHECKING: + import bot + + +async def setup(bot: bot.TechSupportBot) -> None: + """Adds the cog to the bot. Setups config + + Args: + bot (bot.TechSupportBot): The bot object to register the cog with + """ + config = extensionconfig.ExtensionConfig() + config.add( + key="alert_channel", + datatype="int", + title="Alert channel ID", + description="The ID of the channel to send auto-protect alerts to", + default=None, + ) + await bot.add_cog(Report(bot=bot, extension_name="report")) + bot.add_extension_config("report", config) + + +class Report(cogs.BaseCog): + """The class that holds the report command and helper function""" + + @app_commands.command( + name="report", + description="Reports something to the moderators", + extras={"module": "report"}, + ) + async def report_command( + self: Self, interaction: discord.Interaction, report_str: str + ) -> None: + """This is the core of the /report command + Allows users to report potential moderation issues to staff + + Args: + interaction (discord.Interaction): The interaction that called this command + report_str (str): The report string that the user submitted + """ + if len(report_str) > 2000: + embed = auxiliary.prepare_deny_embed( + "Your report cannot be longer than 2000 characters." + ) + await interaction.response.send_message(embed=embed) + return + + embed = discord.Embed(title="New Report", description=report_str) + embed.color = discord.Color.red() + embed.set_author( + name=interaction.user.name, + icon_url=interaction.user.avatar.url or interaction.user.default_avatar.url, + ) + + embed.add_field( + name="User info", + value=( + f"**Name:** {interaction.user.name} ({interaction.user.mention})\n" + f"**Joined:** \n" + f"**Created:** \n" + f"**Sent from:** {interaction.channel.mention} [Jump to context]" + f"(https://discord.com/channels/{interaction.guild.id}/{interaction.channel.id}/" + f"{discord.utils.time_snowflake(datetime.datetime.utcnow())})" + ), + ) + + mention_pattern = re.compile(r"<@!?(\d+)>") + mentioned_user_ids = mention_pattern.findall(report_str) + + mentioned_users = [] + for user_id in mentioned_user_ids: + user = None + try: + user = await interaction.guild.fetch_member(int(user_id)) + except discord.NotFound: + user = None + if user: + mentioned_users.append(user) + mentioned_users: list[discord.Member] = set(mentioned_users) + + for index, user in enumerate(mentioned_users): + embed.add_field( + name=f"Mentioned user #{index+1}", + value=( + f"**Name:** {user.name} ({user.mention})\n" + f"**Joined:** \n" + f"**Created:** \n" + f"**ID:** {user.id}" + ), + ) + + embed.set_footer(text=f"Author ID: {interaction.user.id}") + embed.timestamp = datetime.datetime.utcnow() + + config = self.bot.guild_configs[str(interaction.guild.id)] + + try: + alert_channel = interaction.guild.get_channel( + int(config.extensions.report.alert_channel.value) + ) + except TypeError: + alert_channel = None + + if not alert_channel: + user_embed = auxiliary.prepare_deny_embed( + message="An error occurred while processing your report. It was not sent." + ) + await interaction.response.send_message(embed=user_embed, ephemeral=True) + return + + await alert_channel.send(embed=embed) + + user_embed = auxiliary.prepare_confirm_embed( + message="Your report was successfully sent" + ) + await interaction.response.send_message(embed=user_embed, ephemeral=True) diff --git a/techsupport_bot/commands/who.py b/techsupport_bot/commands/who.py deleted file mode 100644 index 36a7a14d..00000000 --- a/techsupport_bot/commands/who.py +++ /dev/null @@ -1,486 +0,0 @@ -"""Module for the who extension for the discord bot.""" - -from __future__ import annotations - -import datetime -import io -from typing import TYPE_CHECKING, Self - -import discord -import ui -import yaml -from botlogging import LogContext, LogLevel -from core import auxiliary, cogs, extensionconfig -from discord import app_commands -from discord.ext import commands - -if TYPE_CHECKING: - import bot - - -async def setup(bot: bot.TechSupportBot) -> None: - """Loading the Who plugin into the bot - - Args: - bot (bot.TechSupportBot): The bot object to register the cogs to - """ - - config = extensionconfig.ExtensionConfig() - config.add( - key="note_role", - datatype="str", - title="Note role", - description="The name of the role to be added when a note is added to a user", - default=None, - ) - config.add( - key="note_bypass", - datatype="list", - title="Note bypass list", - description=( - "A list of roles that shouldn't have notes set or the note role assigned" - ), - default=["Moderator"], - ) - config.add( - key="note_readers", - datatype="list", - title="Note Reader Roles", - description="Users with roles in this list will be able to use whois", - default=[], - ) - config.add( - key="note_writers", - datatype="list", - title="Note Writer Roles", - description="Users with roles in this list will be able to create or delete notes", - default=[], - ) - - await bot.add_cog(Who(bot=bot, extension_name="who")) - bot.add_extension_config("who", config) - - -class Who(cogs.BaseCog): - """Class to set up who for the extension. - - Attributes: - notes (app_commands.Group): The group for the /note commands - - """ - - notes: app_commands.Group = app_commands.Group( - name="note", description="Command Group for the Notes Extension" - ) - - @staticmethod - async def is_writer(interaction: discord.Interaction) -> bool: - """Checks whether invoker can write notes. If at least one writer - role is not set, NO members can write notes - - Args: - interaction (discord.Interaction): The interaction in which the whois command occured - - Raises: - MissingAnyRole: Raised if the user is lacking any writer role, - but there are roles defined - AppCommandError: Raised if there are no note_writers set in the config - - Returns: - bool: True if the user can run, False if they cannot - """ - config = interaction.client.guild_configs[str(interaction.guild.id)] - if reader_roles := config.extensions.who.note_writers.value: - roles = ( - discord.utils.get(interaction.guild.roles, name=role) - for role in reader_roles - ) - status = any((role in interaction.user.roles for role in roles)) - if not status: - raise app_commands.MissingAnyRole(reader_roles) - return True - - # Reader_roles are empty (not set) - message = "There aren't any `note_writers` roles set in the config!" - embed = auxiliary.prepare_deny_embed(message=message) - - await interaction.response.send_message(embed=embed, ephemeral=True) - - raise app_commands.AppCommandError(message) - - @staticmethod - async def is_reader(interaction: discord.Interaction) -> bool: - """Checks whether invoker can read notes. If at least one reader - role is not set, NO members can read notes - - Args: - interaction (discord.Interaction): The interaction in which the whois command occured - - Raises: - MissingAnyRole: Raised if the user is lacking any reader role, - but there are roles defined - AppCommandError: Raised if there are no note_readers set in the config - - Returns: - bool: True if the user can run, False if they cannot - """ - - config = interaction.client.guild_configs[str(interaction.guild.id)] - if reader_roles := config.extensions.who.note_readers.value: - roles = ( - discord.utils.get(interaction.guild.roles, name=role) - for role in reader_roles - ) - status = any((role in interaction.user.roles for role in roles)) - if not status: - raise app_commands.MissingAnyRole(reader_roles) - return True - - # Reader_roles are empty (not set) - message = "There aren't any `note_readers` roles set in the config!" - embed = auxiliary.prepare_deny_embed(message=message) - - await interaction.response.send_message(embed=embed, ephemeral=True) - - raise app_commands.AppCommandError(message) - - @app_commands.check(is_reader) - @app_commands.command( - name="whois", - description="Gets Discord user information", - extras={"brief": "Gets user data", "usage": "@user", "module": "who"}, - ) - async def get_note( - self: Self, interaction: discord.Interaction, user: discord.Member - ) -> None: - """This is the base of the /whois command - - Args: - interaction (discord.Interaction): The interaction that called this command - user (discord.Member): The member to lookup. Will not work on discord.User - """ - embed = discord.Embed( - title=f"User info for `{user}`", - description="**Note: this is a bot account!**" if user.bot else "", - ) - - embed.set_thumbnail(url=user.display_avatar.url) - - embed.add_field(name="Created at", value=user.created_at.replace(microsecond=0)) - embed.add_field(name="Joined at", value=user.joined_at.replace(microsecond=0)) - embed.add_field( - name="Status", value=interaction.guild.get_member(user.id).status - ) - embed.add_field(name="Nickname", value=user.display_name) - - role_string = ", ".join(role.name for role in user.roles[1:]) - embed.add_field(name="Roles", value=role_string or "No roles") - - # Adds special information only visible to mods - if interaction.permissions.kick_members: - embed = await self.modify_embed_for_mods(interaction, user, embed) - - user_notes = await self.get_notes(user, interaction.guild) - total_notes = 0 - if user_notes: - total_notes = len(user_notes) - user_notes = user_notes[:3] - embed.set_footer(text=f"{total_notes} total notes") - embed.color = discord.Color.dark_blue() - - for note in user_notes: - author = interaction.guild.get_member(int(note.author_id)) or note.author_id - embed.add_field( - name=f"Note from {author} ({note.updated.date()})", - value=f"*{note.body}*" or "*None*", - inline=False, - ) - - await interaction.response.send_message(embed=embed, ephemeral=True) - - async def modify_embed_for_mods( - self: Self, - interaction: discord.Interaction, - user: discord.Member, - embed: discord.Embed, - ) -> discord.Embed: - """Makes modifications to the whois embed to add mod only information - - Args: - interaction (discord.Interaction): The interaction where the /whois command was called - user (discord.Member): The user being looked up - embed (discord.Embed): The embed already filled with whois information - - Returns: - discord.Embed: The embed with mod only information added - """ - # If the user has warnings, add them - warnings = ( - await self.bot.models.Warning.query.where( - self.bot.models.Warning.user_id == str(user.id) - ) - .where(self.bot.models.Warning.guild_id == str(interaction.guild.id)) - .gino.all() - ) - warning_str = "" - for warning in warnings: - warning_str += f"{warning.reason} - {warning.time.date()}\n" - if warning_str: - embed.add_field( - name="**Warnings**", - value=warning_str, - inline=True, - ) - - # If the user has a pending application, show it - # If the user is banned from making applications, show it - application_cog = interaction.client.get_cog("ApplicationManager") - if application_cog: - has_application = await application_cog.search_for_pending_application(user) - is_banned = await application_cog.get_ban_entry(user) - embed.add_field( - name="Application information:", - value=( - f"Has pending application: {bool(has_application)}\nIs banned from" - f" making applications: {bool(is_banned)}" - ), - inline=True, - ) - return embed - - @app_commands.check(is_writer) - @notes.command( - name="set", - description="Sets a note for a user, which can be read later from their whois", - extras={ - "brief": "Sets a note for a user", - "usage": "@user [note]", - "module": "who", - }, - ) - async def set_note( - self: Self, interaction: discord.Interaction, user: discord.Member, body: str - ) -> None: - """Adds a new note to a user - This is the entrance for the /note set command - - Args: - interaction (discord.Interaction): The interaction that called this command - user (discord.Member): The member to add the note to - body (str): The contents of the note being created - """ - if interaction.user.id == user.id: - embed = auxiliary.prepare_deny_embed( - message="You cannot add a note for yourself" - ) - await interaction.response.send_message(embed=embed, ephemeral=True) - return - - note = self.bot.models.UserNote( - user_id=str(user.id), - guild_id=str(interaction.guild.id), - author_id=str(interaction.user.id), - body=body, - ) - - config = self.bot.guild_configs[str(interaction.guild.id)] - - # Check to make sure notes are allowed to be assigned - for name in config.extensions.who.note_bypass.value: - role_check = discord.utils.get(interaction.guild.roles, name=name) - if not role_check: - continue - if role_check in getattr(user, "roles", []): - embed = auxiliary.prepare_deny_embed( - message=f"You cannot assign notes to `{user}` because " - + f"they have `{role_check}` role", - ) - await interaction.response.send_message(embed=embed, ephemeral=True) - return - - await note.create() - - role = discord.utils.get( - interaction.guild.roles, name=config.extensions.who.note_role.value - ) - - if not role: - embed = auxiliary.prepare_confirm_embed( - message=f"Note created for `{user}`, but no note " - + "role is configured so no role was added", - ) - await interaction.response.send_message(embed=embed, ephemeral=True) - return - - await user.add_roles(role, reason=f"First note was added by {interaction.user}") - - embed = auxiliary.prepare_confirm_embed(message=f"Note created for `{user}`") - await interaction.response.send_message(embed=embed, ephemeral=True) - - @app_commands.check(is_writer) - @notes.command( - name="clear", - description="Clears all existing notes for a user", - extras={ - "brief": "Clears all notes for a user", - "usage": "@user", - "module": "who", - }, - ) - async def clear_notes( - self: Self, interaction: discord.Interaction, user: discord.Member - ) -> None: - """Clears all notes on a given user - This is the entrace for the /note clear command - - Args: - interaction (discord.Interaction): The interaction that called this command - user (discord.Member): The member to remove all notes from - """ - notes = await self.get_notes(user, interaction.guild) - - if not notes: - embed = auxiliary.prepare_deny_embed( - message="There are no notes for that user" - ) - await interaction.response.send_message(embed=embed, ephemeral=True) - return - - await interaction.response.defer(ephemeral=True) - view = ui.Confirm() - - await view.send( - message=f"Are you sure you want to clear {len(notes)} notes?", - channel=interaction.channel, - author=interaction.user, - interaction=interaction, - ephemeral=True, - ) - - await view.wait() - if view.value is ui.ConfirmResponse.TIMEOUT: - return - if view.value is ui.ConfirmResponse.DENIED: - embed = auxiliary.prepare_deny_embed( - message=f"Notes for `{user}` were not cleared" - ) - await view.followup.send(embed=embed, ephemeral=True) - return - - for note in notes: - await note.delete() - - config = self.bot.guild_configs[str(interaction.guild.id)] - role = discord.utils.get( - interaction.guild.roles, name=config.extensions.who.note_role.value - ) - if role: - await user.remove_roles( - role, reason=f"Notes were cleared by {interaction.user}" - ) - - embed = auxiliary.prepare_confirm_embed(message=f"Notes cleared for `{user}`") - await view.followup.send(embed=embed, ephemeral=True) - - @app_commands.check(is_reader) - @notes.command( - name="all", - description="Gets all notes for a user instead of just new ones", - extras={ - "brief": "Gets all notes for a user", - "usage": "@user", - "module": "who", - }, - ) - async def all_notes( - self: Self, interaction: discord.Interaction, user: discord.Member - ) -> None: - """Gets a file containing every note on a user - This is the entrance for the /note all command - - Args: - interaction (discord.Interaction): The interaction that called this command - user (discord.Member): The member to get all notes for - """ - notes = await self.get_notes(user, interaction.guild) - - if not notes: - embed = auxiliary.prepare_deny_embed( - message=f"There are no notes for `{user}`" - ) - await interaction.response.send_message(embed=embed, ephemeral=True) - return - - note_output_data = [] - for note in notes: - author = interaction.guild.get_member(int(note.author_id)) or note.author_id - data = { - "body": note.body, - "from": str(author), - "at": str(note.updated), - } - note_output_data.append(data) - - yaml_file = discord.File( - io.StringIO(yaml.dump({"notes": note_output_data})), - filename=f"notes-for-{user.id}-{datetime.datetime.utcnow()}.yaml", - ) - - await interaction.response.send_message(file=yaml_file, ephemeral=True) - - async def get_notes( - self: Self, user: discord.Member, guild: discord.Guild - ) -> list[bot.models.UserNote]: - """Calls to the database to get a list of note database entries for a given user and guild - - Args: - user (discord.Member): The member to look for notes for - guild (discord.Guild): The guild to fetch the notes from - - Returns: - list[bot.models.UserNote]: The list of notes on the member/guild combo. - Will be an empty list if there are no notes - """ - user_notes = ( - await self.bot.models.UserNote.query.where( - self.bot.models.UserNote.user_id == str(user.id) - ) - .where(self.bot.models.UserNote.guild_id == str(guild.id)) - .order_by(self.bot.models.UserNote.updated.desc()) - .gino.all() - ) - - return user_notes - - # re-adds note role back to joining users - @commands.Cog.listener() - async def on_member_join(self: Self, member: discord.Member) -> None: - """Automatic listener to look at users when they join the guild. - This is to apply the note role back to joining users - - Args: - member (discord.Member): The member who has just joined - """ - config = self.bot.guild_configs[str(member.guild.id)] - if not self.extension_enabled(config): - return - - role = discord.utils.get( - member.guild.roles, name=config.extensions.who.note_role.value - ) - if not role: - return - - user_notes = await self.get_notes(member, member.guild) - if not user_notes: - return - - await member.add_roles(role, reason="Noted user has joined the guild") - - log_channel = config.get("logging_channel") - await self.bot.logger.send_log( - message=f"Found noted user with ID {member.id} joining - re-adding role", - level=LogLevel.INFO, - context=LogContext(guild=member.guild), - channel=log_channel, - ) diff --git a/techsupport_bot/commands/whois.py b/techsupport_bot/commands/whois.py new file mode 100644 index 00000000..26f0008c --- /dev/null +++ b/techsupport_bot/commands/whois.py @@ -0,0 +1,171 @@ +"""Module for the who extension for the discord bot.""" + +from __future__ import annotations + +import datetime +from typing import TYPE_CHECKING, Self + +import discord +import ui +from commands import application, moderator, notes +from core import auxiliary, cogs, moderation +from discord import app_commands + +if TYPE_CHECKING: + import bot + + +async def setup(bot: bot.TechSupportBot) -> None: + """Loading the Who plugin into the bot + + Args: + bot (bot.TechSupportBot): The bot object to register the cogs to + """ + await bot.add_cog(Whois(bot=bot, extension_name="whois")) + + +class Whois(cogs.BaseCog): + """The class for the /whois command""" + + @app_commands.command( + name="whois", + description="Gets Discord user information", + extras={"brief": "Gets user data", "usage": "@user", "module": "whois"}, + ) + async def whois_command( + self: Self, interaction: discord.Interaction, member: discord.Member + ) -> None: + """This is the base of the /whois command + + Args: + interaction (discord.Interaction): The interaction that called this command + member (discord.Member): The member to lookup. Will not work on discord.User + """ + await interaction.response.defer(ephemeral=True) + + embed = auxiliary.generate_basic_embed( + title=f"User info for `{member.display_name}` (`{member.name}`)", + description="**Note: this is a bot account!**" if member.bot else "", + color=discord.Color.dark_blue(), + url=member.display_avatar.url, + ) + + embed.add_field( + name="Created", value=f"" + ) + embed.add_field(name="Joined", value=f"") + embed.add_field( + name="Status", value=interaction.guild.get_member(member.id).status + ) + embed.add_field(name="Nickname", value=member.display_name) + + role_string = ", ".join(role.name for role in member.roles[1:]) + embed.add_field(name="Roles", value=role_string or "No roles") + + config = self.bot.guild_configs[str(interaction.guild.id)] + + if "application" in config.enabled_extensions: + try: + await application.command_permission_check(interaction) + embed = await add_application_info_field(interaction, member, embed) + except (app_commands.MissingAnyRole, app_commands.AppCommandError): + pass + + if interaction.permissions.kick_members: + flags = [] + if member.flags.automod_quarantined_username: + flags.append("Quarantined by Automod") + if not member.flags.completed_onboarding: + flags.append("Has not completed onboarding") + if member.flags.did_rejoin: + flags.append("Has left and rejoined the server") + if member.flags.guest: + flags.append("Is a guest") + if member.public_flags.staff: + flags.append("Is discord staff") + if member.public_flags.spammer: + flags.append("Is a flagged spammer") + if ( + member.is_timed_out + and member.timed_out_until + and member.timed_out_until.astimezone(datetime.timezone.utc) + > datetime.datetime.now((datetime.timezone.utc)) + ): + flags.append( + f"Is timed out until " + ) + + flag_string = "\n - ".join(flag for flag in flags) + if flag_string: + embed.add_field(name="Flags", value=f"- {flag_string}", inline=False) + + embeds = [embed] + + if "notes" in config.enabled_extensions: + try: + await notes.is_reader(interaction) + all_notes = await moderation.get_all_notes( + self.bot, member, interaction.guild + ) + notes_embeds = notes.build_note_embeds( + interaction.guild, member, all_notes + ) + notes_embeds[0].description = ( + f"Showing {min(len(all_notes), 6)}/{len(all_notes)} notes" + ) + embeds.append(notes_embeds[0]) + except (app_commands.MissingAnyRole, app_commands.AppCommandError): + pass + + if ( + "moderator" in config.enabled_extensions + and interaction.permissions.kick_members + ): + all_warnings = await moderation.get_all_warnings( + self.bot, member, interaction.guild + ) + warning_embeds = moderator.build_warning_embeds( + interaction.guild, member, all_warnings + ) + warning_embeds[0].description = ( + f"Showing {min(len(all_warnings), 6)}/{len(all_warnings)} warnings" + ) + embeds.append(warning_embeds[0]) + + view = ui.PaginateView() + await view.send( + interaction.channel, interaction.user, embeds, interaction, True + ) + return + + +async def add_application_info_field( + interaction: discord.Interaction, + user: discord.Member, + embed: discord.Embed, +) -> discord.Embed: + """Makes modifications to the whois embed to add mod only information + + Args: + interaction (discord.Interaction): The interaction where the /whois command was called + user (discord.Member): The user being looked up + embed (discord.Embed): The embed already filled with whois information + + Returns: + discord.Embed: The embed with mod only information added + """ + # If the user has a pending application, show it + # If the user is banned from making applications, show it + application_cog = interaction.client.get_cog("ApplicationManager") + if application_cog: + has_application = await application_cog.search_for_pending_application(user) + is_banned = await application_cog.get_ban_entry(user) + embed.add_field( + name="Application information:", + value=( + f"Has pending application: {bool(has_application)}\nIs banned from" + f" making applications: {bool(is_banned)}" + ), + inline=True, + ) + return embed diff --git a/techsupport_bot/core/__init__.py b/techsupport_bot/core/__init__.py index c34c5c7c..22073028 100644 --- a/techsupport_bot/core/__init__.py +++ b/techsupport_bot/core/__init__.py @@ -5,3 +5,4 @@ from .custom_errors import * from .databases import * from .http import * +from .moderation import * diff --git a/techsupport_bot/core/databases.py b/techsupport_bot/core/databases.py index 9caf9564..7f5be522 100644 --- a/techsupport_bot/core/databases.py +++ b/techsupport_bot/core/databases.py @@ -63,6 +63,29 @@ class ApplicationBans(bot.db.Model): guild_id: str = bot.db.Column(bot.db.String) applicant_id: str = bot.db.Column(bot.db.String) + class BanLog(bot.db.Model): + """The postgres table for banlogs + Currently used in modlog.py + + Attributes: + __tablename__ (str): The name of the table in postgres + pk (int): The automatic primary key + guild_id (str): The string of the guild ID the user was banned in + reason (str): The reason of the ban + banning_moderator (str): The ID of the moderator who banned + banned_member (str): The ID of the user who was banned + ban_time (datetime): The date and time of the ban + """ + + __tablename__ = "banlog" + + pk = bot.db.Column(bot.db.Integer, primary_key=True, autoincrement=True) + guild_id = bot.db.Column(bot.db.String) + reason = bot.db.Column(bot.db.String) + banning_moderator = bot.db.Column(bot.db.String) + banned_member = bot.db.Column(bot.db.String) + ban_time = bot.db.Column(bot.db.DateTime, default=datetime.datetime.utcnow) + class DuckUser(bot.db.Model): """The postgres table for ducks Currently used in duck.py @@ -228,22 +251,22 @@ class Warning(bot.db.Model): Currently used in protect.py and who.py Attributes: + __tablename__ (str): The name of the table in postgres pk (int): The primary key for the database user_id (str): The user who got warned guild_id (str): The guild this warn occured in reason (str): The reason for the warn - time (datetime.datetime): The time the warning was given + time (datetime): The time the warning was given + invoker_id (str): The moderator who made the warning """ __tablename__ = "warnings" - - pk: int = bot.db.Column(bot.db.Integer, primary_key=True) - user_id: str = bot.db.Column(bot.db.String) - guild_id: str = bot.db.Column(bot.db.String) - reason: str = bot.db.Column(bot.db.String) - time: datetime.datetime = bot.db.Column( - bot.db.DateTime, default=datetime.datetime.utcnow - ) + pk = bot.db.Column(bot.db.Integer, primary_key=True) + user_id = bot.db.Column(bot.db.String) + guild_id = bot.db.Column(bot.db.String) + reason = bot.db.Column(bot.db.String) + time = bot.db.Column(bot.db.DateTime, default=datetime.datetime.utcnow) + invoker_id = bot.db.Column(bot.db.String) class Config(bot.db.Model): """The postgres table for guild config @@ -343,6 +366,7 @@ class Votes(bot.db.Model): bot.models.Applications = Applications bot.models.AppBans = ApplicationBans + bot.models.BanLog = BanLog bot.models.DuckUser = DuckUser bot.models.Factoid = Factoid bot.models.FactoidJob = FactoidJob diff --git a/techsupport_bot/core/moderation.py b/techsupport_bot/core/moderation.py new file mode 100644 index 00000000..c87859e7 --- /dev/null +++ b/techsupport_bot/core/moderation.py @@ -0,0 +1,281 @@ +"""This file will hold the core moderation functions. These functions will: +Do the proper moderative action and return true if successful, false if not.""" + +import datetime + +import discord +import munch + + +async def ban_user( + guild: discord.Guild, user: discord.User, delete_seconds: int, reason: str +) -> bool: + """A very simple function that bans a given user from the passed guild + + Args: + guild (discord.Guild): The guild to ban from + user (discord.User): The user who needs to be banned + delete_seconds (int): The numbers of seconds of past messages to delete + reason (str): The reason for banning + + Returns: + bool: True if ban was successful + """ + # Ban the user + await guild.ban( + user, + reason=reason, + delete_message_seconds=delete_seconds, + ) + return True + + +async def unban_user(guild: discord.Guild, user: discord.User, reason: str) -> bool: + """A very simple functon that unbans a given user from the passed guild + + Args: + guild (discord.Guild): The guild to unban from + user (discord.User): The user to unban + reason (str): The reason they are being unbanned + + Returns: + bool: True if unban was successful + """ + # Attempt to unban. If the user isn't found, return false + try: + await guild.unban(user, reason=reason) + return True + except discord.NotFound: + return False + + +async def kick_user(guild: discord.Guild, user: discord.Member, reason: str) -> bool: + """A very simple function that kicks a given user from the guild + + Args: + guild (discord.Guild): The guild to kick from + user (discord.Member): The member to kick from the guild + reason (str): The reason they are being kicked + + Returns: + bool: True if kick was successful + """ + await guild.kick(user, reason=reason) + return True + + +async def mute_user( + user: discord.Member, reason: str, duration: datetime.timedelta +) -> bool: + """Times out a given user + + Args: + user (discord.Member): The user to timeout + reason (str): The reason they are being timed out + duration (datetime.timedelta): How long to timeout the user for + + Returns: + bool: True if the timeout was successful + """ + try: + await user.timeout(duration, reason=reason) + except discord.Forbidden: + return False + return True + + +async def unmute_user(user: discord.Member, reason: str) -> bool: + """Untimes out a given user. + + Args: + user (discord.Member): The user to untimeout + reason (str): The reason they are being untimeout + + Returns: + bool: True if untimeout was successful + """ + if not user.timed_out_until: + return False + await user.timeout(None, reason=reason) + return True + + +async def warn_user( + bot_object: object, + user: discord.Member, + invoker: discord.Member, + reason: str, +) -> bool: + """Warns a user. Does NOT check config or how many warnings a user has + + Args: + bot_object (object): The bot object to use + user (discord.Member): The user to warn + invoker (discord.Member): The person who warned the user + reason (str): The reason for the warning + + Returns: + bool: True if warning was successful + """ + await bot_object.models.Warning( + user_id=str(user.id), + guild_id=str(invoker.guild.id), + reason=reason, + invoker_id=str(invoker.id), + ).create() + return True + + +async def unwarn_user(bot_object: object, user: discord.Member, warning: str) -> bool: + """Removes a specific warning from a user by string + + Args: + bot_object (object): The bot object to use + user (discord.Member): The member to remove a warning from + warning (str): The warning to remove + + Returns: + bool: True if unwarning was successful + """ + query = ( + bot_object.models.Warning.query.where( + bot_object.models.Warning.guild_id == str(user.guild.id) + ) + .where(bot_object.models.Warning.reason == warning) + .where(bot_object.models.Warning.user_id == str(user.id)) + ) + entry = await query.gino.first() + if not entry: + return False + await entry.delete() + return True + + +async def get_all_warnings( + bot_object: object, user: discord.User, guild: discord.Guild +) -> list[munch.Munch]: + """Gets a list of all warnings for a specific user in a specific guild + + Args: + bot_object (object): The bot object to use + user (discord.User): The user that we want warns from + guild (discord.Guild): The guild that we want warns from + + Returns: + list[munch.Munch]: The list of all warnings for the user/guild, if any exist + """ + warnings = ( + await bot_object.models.Warning.query.where( + bot_object.models.Warning.user_id == str(user.id) + ) + .where(bot_object.models.Warning.guild_id == str(guild.id)) + .order_by(bot_object.models.Warning.time.desc()) + .gino.all() + ) + return warnings + + +async def get_all_notes( + bot: object, user: discord.Member, guild: discord.Guild +) -> list[munch.Munch]: + """Calls to the database to get a list of note database entries for a given user and guild + + Args: + bot (object): The TS bot object to use for the database lookup + user (discord.Member): The member to look for notes for + guild (discord.Guild): The guild to fetch the notes from + + Returns: + list[munch.Munch]: The list of notes on the member/guild combo. + Will be an empty list if there are no notes + """ + user_notes = ( + await bot.models.UserNote.query.where( + bot.models.UserNote.user_id == str(user.id) + ) + .where(bot.models.UserNote.guild_id == str(guild.id)) + .order_by(bot.models.UserNote.updated.desc()) + .gino.all() + ) + + return user_notes + + +async def send_command_usage_alert( + bot_object: object, + interaction: discord.Interaction, + command: str, + guild: discord.Guild, + target: discord.Member = None, +) -> None: + """Sends a usage alert to the protect events channel, if configured + + Args: + bot_object (object): The bot object to use + interaction (discord.Interaction): The interaction that trigger the command + command (str): The string representation of the command that was run + guild (discord.Guild): The guild the command was run in + target (discord.Member): The target of the command + """ + + ALERT_ICON_URL: str = ( + "https://www.iconarchive.com/download/i76061/martz90/circle-addon2/warning.512.png" + ) + + config = bot_object.guild_configs[str(guild.id)] + + try: + alert_channel = guild.get_channel(int(config.moderation.alert_channel)) + except TypeError: + alert_channel = None + + if not alert_channel: + return + + embed = discord.Embed(title="Command Usage Alert") + + embed.description = f"**Command**\n`{command}`" + embed.add_field( + name="Channel", + value=f"{interaction.channel.name} ({interaction.channel.mention}) [Jump to context]" + f"(https://discord.com/channels/{interaction.guild.id}/{interaction.channel.id}/" + f"{discord.utils.time_snowflake(datetime.datetime.utcnow())})", + ) + + embed.add_field( + name="Invoking User", + value=( + f"{interaction.user.display_name} ({interaction.user.mention}, {interaction.user.id})" + ), + ) + + if target: + embed.add_field( + name="Target", + value=f"{target.display_name} ({target.mention}, {target.id})", + ) + + embed.set_thumbnail(url=ALERT_ICON_URL) + embed.color = discord.Color.red() + embed.timestamp = datetime.datetime.utcnow() + + await alert_channel.send(embed=embed) + + +async def check_if_user_banned(user: discord.User, guild: discord.Guild) -> bool: + """Queries the given guild to find if the given discord.User is banned or not + + Args: + user (discord.User): The user to search for being banned + guild (discord.Guild): The guild to search the bans for + + Returns: + bool: Whether the user is banned or not + """ + + try: + await guild.fetch_ban(user) + except discord.NotFound: + return False + + return True diff --git a/techsupport_bot/functions/__init__.py b/techsupport_bot/functions/__init__.py index 26928a16..df256aa0 100644 --- a/techsupport_bot/functions/__init__.py +++ b/techsupport_bot/functions/__init__.py @@ -1,3 +1,4 @@ """Functions are commandless cogs""" +from .automod import * from .nickname import * diff --git a/techsupport_bot/functions/automod.py b/techsupport_bot/functions/automod.py new file mode 100644 index 00000000..84c24370 --- /dev/null +++ b/techsupport_bot/functions/automod.py @@ -0,0 +1,616 @@ +"""Handles the automod checks""" + +from __future__ import annotations + +import datetime +import re +from dataclasses import dataclass +from typing import TYPE_CHECKING, Self + +import discord +import munch +from botlogging import LogContext, LogLevel +from commands import moderator, modlog +from core import cogs, extensionconfig, moderation +from discord.ext import commands + +if TYPE_CHECKING: + import bot + + +async def setup(bot: bot.TechSupportBot) -> None: + """Adds the cog to the bot. Setups config + + Args: + bot (bot.TechSupportBot): The bot object to register the cog with + """ + config = extensionconfig.ExtensionConfig() + config.add( + key="channels", + datatype="list", + title="Protected channels", + description=( + "The list of channel ID's associated with the channels to auto-protect" + ), + default=[], + ) + config.add( + key="bypass_roles", + datatype="list", + title="Bypassed role names", + description=( + "The list of role names associated with bypassed roles by the auto-protect" + ), + default=[], + ) + config.add( + key="string_map", + datatype="dict", + title="Keyword string map", + description=( + "Mapping of keyword strings to data defining the action taken by" + " auto-protect" + ), + default={}, + ) + config.add( + key="banned_file_extensions", + datatype="dict", + title="List of banned file types", + description=( + "A list of all file extensions to be blocked and have a auto warning issued" + ), + default=[], + ) + config.add( + key="alert_channel", + datatype="int", + title="Alert channel ID", + description="The ID of the channel to send auto-protect alerts to", + default=None, + ) + config.add( + key="max_mentions", + datatype="int", + title="Max message mentions", + description=( + "Max number of mentions allowed in a message before triggering auto-protect" + ), + default=3, + ) + await bot.add_cog(AutoMod(bot=bot, extension_name="automod")) + bot.add_extension_config("automod", config) + + +@dataclass +class AutoModPunishment: + """This is a base class holding the violation and recommended actions + Since automod is a framework, the actions can translate to different things + + Attributes: + violation_str (str): The string of the policy broken. Should be displayed to user + recommend_delete (bool): If the policy recommends deletion of the message + recommend_warn (bool): If the policy recommends warning the user + recommend_mute (int): If the policy recommends muting the user. + If so, the amount of seconds to mute for. + is_silent (bool, optional): If the punishment should be silent. Defaults to False + score (int): The weighted score for sorting punishments + + """ + + violation_str: str + recommend_delete: bool + recommend_warn: bool + recommend_mute: int + is_silent: bool = False + + @property + def score(self: Self) -> int: + """A score so that the AutoModPunishment object is sortable + This sorts based on actions recommended to be taken + + Returns: + int: The score + """ + score = 0 + if self.recommend_mute: + score += 4 + if self.recommend_warn: + score += 2 + if self.recommend_delete: + score += 1 + return score + + +@dataclass +class AutoModAction: + """The final summarized action for this automod violation + + Attributes: + warn (bool): Whether the user should be warned + delete_message (bool): Whether the message should be deleted + mute (bool): Whether the user should be muted + mute_duration (int): How many seconds to mute the user for + be_silent (bool): If the actions should be taken silently + action_string (str): The string of & separated actions taken + violation_string (str): The most severe punishment to be used as a reason + total_punishments (str): All the punishment reasons + violations_list (list[AutoModPunishment]): The list of original AutoModPunishment items + + """ + + warn: bool + delete_message: bool + mute: bool + mute_duration: int + be_silent: bool + action_string: str + violation_string: str + total_punishments: str + violations_list: list[AutoModPunishment] + + +class AutoMod(cogs.MatchCog): + """Holds all of the discord message specific automod functions + Most of the automod is a class function""" + + async def match( + self: Self, config: munch.Munch, ctx: commands.Context, content: str + ) -> bool: + """Checks to see if a message should be considered for automod violations + + Args: + config (munch.Munch): The config of the guild to check + ctx (commands.Context): The context of the original message + content (str): The string representation of the message + + Returns: + bool: Whether the message should be inspected for automod violations + """ + if not str(ctx.channel.id) in config.extensions.automod.channels.value: + await self.bot.logger.send_log( + message="Channel not in automod channels - ignoring automod check", + level=LogLevel.DEBUG, + context=LogContext(guild=ctx.guild, channel=ctx.channel), + ) + return False + + role_names = [role.name.lower() for role in getattr(ctx.author, "roles", [])] + + if any( + role_name.lower() in role_names + for role_name in config.extensions.automod.bypass_roles.value + ): + return False + + return True + + async def response( + self: Self, + config: munch.Munch, + ctx: commands.Context, + content: str, + result: bool, + ) -> None: + """Handles a discord automod violation + + Args: + config (munch.Munch): The config of the guild where the message was sent + ctx (commands.Context): The context the message was sent in + content (str): The string content of the message + result (bool): What the match() function returned + """ + + # If user outranks bot, do nothing + if ctx.message.author.top_role >= ctx.channel.guild.me.top_role: + return + + all_punishments = run_all_checks(config, ctx.message) + + if len(all_punishments) == 0: + return + + total_punishment = process_automod_violations(all_punishments=all_punishments) + + if total_punishment.mute > 0: + await moderation.mute_user( + user=ctx.author, + reason=total_punishment.violation_string, + duration=datetime.timedelta(seconds=total_punishment.mute_duration), + ) + + if total_punishment.delete_message: + await ctx.message.delete() + + if total_punishment.warn: + count_of_warnings = ( + len(await moderation.get_all_warnings(self.bot, ctx.author, ctx.guild)) + + 1 + ) + total_punishment.violation_string += ( + f" ({count_of_warnings} total warnings)" + ) + await moderation.warn_user( + self.bot, + ctx.author, + ctx.channel.guild.me, + total_punishment.violation_string, + ) + if count_of_warnings >= config.moderation.max_warnings: + ban_embed = moderator.generate_response_embed( + ctx.author, + "ban", + reason=( + f"Over max warning count {count_of_warnings} out of" + f" {config.moderation.max_warnings} (final warning:" + f" {total_punishment.violation_string}) - banned by automod" + ), + ) + if not total_punishment.be_silent: + await ctx.send(content=ctx.author.mention, embed=ban_embed) + try: + await ctx.author.send(embed=ban_embed) + except discord.Forbidden: + await self.bot.logger.send_log( + message=f"Could not DM {ctx.author} about being banned", + level=LogLevel.WARNING, + context=LogContext(guild=ctx.guild, channel=ctx.channel), + ) + + await moderation.ban_user( + ctx.guild, + ctx.author, + delete_seconds=( + config.extensions.moderator.ban_delete_duration.value * 86400 + ), + reason=total_punishment.violation_string, + ) + await modlog.log_ban( + self.bot, + ctx.author, + ctx.guild.me, + ctx.guild, + total_punishment.violation_string, + ) + + if total_punishment.be_silent: + return + + embed = moderator.generate_response_embed( + ctx.author, + total_punishment.action_string, + total_punishment.violation_string, + ) + + await ctx.send(content=ctx.author.mention, embed=embed) + try: + await ctx.author.send(embed=embed) + except discord.Forbidden: + await self.bot.logger.send_log( + message=f"Could not DM {ctx.author} about being automodded", + level=LogLevel.WARNING, + context=LogContext(guild=ctx.guild, channel=ctx.channel), + ) + + alert_channel_embed = generate_automod_alert_embed( + ctx, total_punishment.total_punishments, total_punishment.action_string + ) + + config = self.bot.guild_configs[str(ctx.guild.id)] + + try: + alert_channel = ctx.guild.get_channel( + int(config.extensions.automod.alert_channel.value) + ) + except TypeError: + alert_channel = None + + if not alert_channel: + return + + await alert_channel.send(embed=alert_channel_embed) + + @commands.Cog.listener() + async def on_raw_message_edit( + self: Self, payload: discord.RawMessageUpdateEvent + ) -> None: + """This is called when any message is edited in any guild the bot is in. + There is no guarantee that the message exists or is used + + Args: + payload (discord.RawMessageUpdateEvent): The raw event that the edit generated + """ + guild = self.bot.get_guild(payload.guild_id) + if not guild: + return + + config = self.bot.guild_configs[str(guild.id)] + if not self.extension_enabled(config): + return + + channel = self.bot.get_channel(payload.channel_id) + if not channel: + return + + message = await channel.fetch_message(payload.message_id) + if not message: + return + + # Don't trigger if content hasn't changed + if payload.cached_message and payload.cached_message.content == message.content: + return + + ctx = await self.bot.get_context(message) + matched = await self.match(config, ctx, message.content) + if not matched: + return + + await self.response(config, ctx, message.content, matched) + + +def process_automod_violations( + all_punishments: list[AutoModPunishment], +) -> AutoModAction: + """This processes a list of potentially many AutoModPunishments into a single + recommended action + + Args: + all_punishments (list[AutoModPunishment]): The list of punishments that should be taken + + Returns: + AutoModAction: The final summarized action that is recommended to be taken + """ + if len(all_punishments) == 0: + return None + + should_delete = False + should_warn = False + mute_duration = 0 + + silent = True + + sorted_punishments = sorted(all_punishments, key=lambda p: p.score, reverse=True) + for punishment in sorted_punishments: + should_delete = should_delete or punishment.recommend_delete + should_warn = should_warn or punishment.recommend_warn + mute_duration = max(mute_duration, punishment.recommend_mute) + + if not punishment.is_silent: + silent = False + + actions = [] + + reason_str = sorted_punishments[0].violation_str + + if mute_duration > 0: + actions.append("mute") + + if should_delete: + actions.append("delete") + + if should_warn: + actions.append("warn") + + if len(actions) == 0: + actions.append("notice") + + actions_str = " & ".join(actions) + + all_alerts_str = "\n".join(violation.violation_str for violation in all_punishments) + + final_action = AutoModAction( + warn=should_warn, + delete_message=should_delete, + mute=mute_duration > 0, + mute_duration=mute_duration, + be_silent=silent, + action_string=actions_str, + violation_string=reason_str, + total_punishments=all_alerts_str, + violations_list=all_punishments, + ) + + return final_action + + +def generate_automod_alert_embed( + ctx: commands.Context, violations: str, action_taken: str +) -> discord.Embed: + """Generates an alert embed for the automod rules that are broken + + Args: + ctx (commands.Context): The context of the message that violated the automod + violations (str): The string form of ALL automod violations the user triggered + action_taken (str): The text based action taken against the user + + Returns: + discord.Embed: The formatted embed ready to be sent to discord + """ + + ALERT_ICON_URL: str = ( + "https://www.iconarchive.com/download/i76061/martz90/circle-addon2/warning.512.png" + ) + + embed = discord.Embed( + title="Automod Violations", + description=violations, + ) + embed.add_field(name="Actions Taken", value=action_taken) + embed.add_field(name="Channel", value=f"{ctx.channel.mention} ({ctx.channel.name})") + embed.add_field( + name="User", value=f"{ctx.author.mention} ({ctx.author.name}, {ctx.author.id})" + ) + embed.add_field(name="Message", value=ctx.message.content[:1024], inline=False) + embed.add_field(name="URL", value=ctx.message.jump_url, inline=False) + + embed.set_thumbnail(url=ALERT_ICON_URL) + embed.color = discord.Color.red() + embed.timestamp = datetime.datetime.utcnow() + + return embed + + +# Automod will only ever be a framework to say something needs to be done +# Outside of running from the response function, NO ACTION will be taken +# All checks will return a list of AutoModPunishment, which may be nothing + + +def run_all_checks( + config: munch.Munch, message: discord.Message +) -> list[AutoModPunishment]: + """This runs all 4 checks on a given discord.Message + handle_file_extensions + handle_mentions + handle_exact_string + handle_regex_string + + Args: + config (munch.Munch): The guild config to check with + message (discord.Message): The message object to use to search + + Returns: + list[AutoModPunishment]: The automod violations that the given message violated + """ + all_violations = ( + run_only_string_checks(config, message.clean_content) + + handle_file_extensions(config, message.attachments) + + handle_mentions(config, message) + ) + return all_violations + + +def run_only_string_checks( + config: munch.Munch, content: str +) -> list[AutoModPunishment]: + """This runs the plaintext string texts and returns the combined list of violations + handle_exact_string + handle_regex_string + + Args: + config (munch.Munch): The guild config to check with + content (str): The content of the message to search + + Returns: + list[AutoModPunishment]: The automod violations that the given message violated + """ + all_violations = handle_exact_string(config, content) + handle_regex_string( + config, content + ) + return all_violations + + +def handle_file_extensions( + config: munch.Munch, attachments: list[discord.Attachment] +) -> list[AutoModPunishment]: + """This checks a list of attachments for attachments that violate the automod rules + + Args: + config (munch.Munch): The guild config to check with + attachments (list[discord.Attachment]): The list of attachments to search + + Returns: + list[AutoModPunishment]: The automod violations that the given message violated + """ + violations = [] + for attachment in attachments: + if ( + attachment.filename.split(".")[-1] + in config.extensions.automod.banned_file_extensions.value + ): + violations.append( + AutoModPunishment( + f"{attachment.filename} has a suspicious file extension", + recommend_delete=True, + recommend_warn=True, + recommend_mute=0, + ) + ) + return violations + + +def handle_mentions( + config: munch.Munch, message: discord.Message +) -> list[AutoModPunishment]: + """This checks a given discord message to make sure it doesn't violate the mentions maximum + + Args: + config (munch.Munch): The guild config to check with + message (discord.Message): The message to check for mentions with + + Returns: + list[AutoModPunishment]: The automod violations that the given message violated + """ + if len(message.mentions) > config.extensions.automod.max_mentions.value: + return [ + AutoModPunishment( + "Mass Mentions", + recommend_delete=True, + recommend_warn=True, + recommend_mute=0, + ) + ] + return [] + + +def handle_exact_string(config: munch.Munch, content: str) -> list[AutoModPunishment]: + """This checks the configued automod exact string blocks + If the content matches the string, it's added to a list + + Args: + config (munch.Munch): The guild config to check with + content (str): The content of the message to search + + Returns: + list[AutoModPunishment]: The automod violations that the given message violated + """ + violations = [] + for ( + keyword, + filter_config, + ) in config.extensions.automod.string_map.value.items(): + if keyword.lower() in content.lower(): + violations.append( + AutoModPunishment( + filter_config.message, + filter_config.delete, + filter_config.warn, + filter_config.mute, + filter_config.silent_punishment, + ) + ) + return violations + + +def handle_regex_string(config: munch.Munch, content: str) -> list[AutoModPunishment]: + """This checks the configued automod regex blocks + If the content matches the regex, it's added to a list + + Args: + config (munch.Munch): The guild config to check with + content (str): The content of the message to search + + Returns: + list[AutoModPunishment]: The automod violations that the given message violated + """ + violations = [] + for ( + _, + filter_config, + ) in config.extensions.automod.string_map.value.items(): + regex = filter_config.get("regex") + if regex: + try: + match = re.search(regex, content) + except re.error: + match = None + if match: + violations.append( + AutoModPunishment( + filter_config.message, + filter_config.delete, + filter_config.warn, + filter_config.mute, + filter_config.silent_punishment, + ) + ) + return violations diff --git a/techsupport_bot/functions/paste.py b/techsupport_bot/functions/paste.py new file mode 100644 index 00000000..8f9b4e94 --- /dev/null +++ b/techsupport_bot/functions/paste.py @@ -0,0 +1,279 @@ +"""The file that holds the paste function""" + +from __future__ import annotations + +import io +from typing import TYPE_CHECKING, Self + +import discord +import munch +from botlogging import LogContext, LogLevel +from core import cogs, extensionconfig +from discord.ext import commands +from functions import automod + +if TYPE_CHECKING: + import bot + + +async def setup(bot: bot.TechSupportBot) -> None: + """Adds the cog to the bot. Setups config + + Args: + bot (bot.TechSupportBot): The bot object to register the cog with + """ + config = extensionconfig.ExtensionConfig() + config.add( + key="channels", + datatype="list", + title="Protected channels", + description=( + "The list of channel ID's associated with the channels to auto-protect" + ), + default=[], + ) + config.add( + key="bypass_roles", + datatype="list", + title="Bypassed role names", + description=( + "The list of role names associated with bypassed roles by the auto-protect" + ), + default=[], + ) + config.add( + key="length_limit", + datatype="int", + title="Max length limit", + description=( + "The max char limit on messages before they trigger an action by" + " auto-protect" + ), + default=500, + ) + config.add( + key="paste_footer_message", + datatype="str", + title="The linx embed footer", + description="The message used on the footer of the large message paste URL", + default="Note: Long messages are automatically pasted", + ) + await bot.add_cog(Paster(bot=bot, extension_name="paste")) + bot.add_extension_config("paste", config) + + +class Paster(cogs.MatchCog): + """The pasting module""" + + async def match( + self: Self, config: munch.Munch, ctx: commands.Context, content: str + ) -> bool: + """Checks to see if a message should be considered for a paste + + Args: + config (munch.Munch): The config of the guild to check + ctx (commands.Context): The context of the original message + content (str): The string representation of the message + + Returns: + bool: Whether the message should be inspected for a paste + """ + # exit the match based on exclusion parameters + if not str(ctx.channel.id) in config.extensions.paste.channels.value: + await self.bot.logger.send_log( + message="Channel not in protected channels - ignoring protect check", + level=LogLevel.DEBUG, + context=LogContext(guild=ctx.guild, channel=ctx.channel), + ) + return False + + role_names = [role.name.lower() for role in getattr(ctx.author, "roles", [])] + + if any( + role_name.lower() in role_names + for role_name in config.extensions.paste.bypass_roles.value + ): + return False + + return True + + async def response( + self: Self, + config: munch.Munch, + ctx: commands.Context, + content: str, + result: bool, + ) -> None: + """Handles a paste check + + Args: + config (munch.Munch): The config of the guild where the message was sent + ctx (commands.Context): The context the message was sent in + content (str): The string content of the message + result (bool): What the match() function returned + """ + if len(content) > config.extensions.paste.length_limit.value or content.count( + "\n" + ) > self.max_newlines(config.extensions.paste.length_limit.value): + if "automod" in config.get("enabled_extensions", []): + automod_actions = automod.run_all_checks(config, ctx.message) + automod_final = automod.process_automod_violations(automod_actions) + if automod_final and automod_final.delete_message: + return + await self.paste_message(config, ctx, content) + + def max_newlines(self: Self, max_length: int) -> int: + """Gets a theoretical maximum number of new lines in a given message + + Args: + max_length (int): The max length of characters per theoretical line + + Returns: + int: The maximum number of new lines based on config + """ + return int(max_length / 80) + 1 + + @commands.Cog.listener() + async def on_raw_message_edit( + self: Self, payload: discord.RawMessageUpdateEvent + ) -> None: + """This is called when any message is edited in any guild the bot is in. + There is no guarantee that the message exists or is used + + Args: + payload (discord.RawMessageUpdateEvent): The raw event that the edit generated + """ + guild = self.bot.get_guild(payload.guild_id) + if not guild: + return + + config = self.bot.guild_configs[str(guild.id)] + if not self.extension_enabled(config): + return + + channel = self.bot.get_channel(payload.channel_id) + if not channel: + return + + message = await channel.fetch_message(payload.message_id) + if not message: + return + + # Don't trigger if content hasn't changed + if payload.cached_message and payload.cached_message.content == message.content: + return + + ctx = await self.bot.get_context(message) + matched = await self.match(config, ctx, message.content) + if not matched: + return + + await self.response(config, ctx, message.content, None) + + async def paste_message( + self: Self, config: munch.Munch, ctx: commands.Context, content: str + ) -> None: + """Moves message into a linx paste if it's too long + + Args: + config (munch.Munch): The guild config where the too long message was sent + ctx (commands.Context): The context where the original message was sent + content (str): The string content of the flagged message + """ + log_channel = config.get("logging_channel") + if not self.bot.file_config.api.api_url.linx: + await self.bot.logger.send_log( + message=( + f"Would have pasted message {ctx.message.id}" + " but no linx url has been configured." + ), + level=LogLevel.WARNING, + channel=log_channel, + context=LogContext(guild=ctx.guild, channel=ctx.channel), + ) + return + + linx_embed = await self.create_linx_embed(config, ctx, content) + + if not linx_embed: + await self.bot.logger.send_log( + message=( + f"Would have pasted message {ctx.message.id}" + " but uploading the file to linx failed." + ), + level=LogLevel.WARNING, + channel=log_channel, + context=LogContext(guild=ctx.guild, channel=ctx.channel), + ) + return + + attachments: list[discord.File] = [] + if ctx.message.attachments: + total_attachment_size = 0 + for attch in ctx.message.attachments: + if ( + total_attachment_size := total_attachment_size + attch.size + ) <= ctx.filesize_limit: + attachments.append(await attch.to_file()) + if (lf := len(ctx.message.attachments) - len(attachments)) != 0: + await self.bot.logger.send_log( + message=( + f"Protect did not reupload {lf} file(s) due to file size limit." + ), + level=LogLevel.WARNING, + channel=log_channel, + context=LogContext(guild=ctx.guild, channel=ctx.channel), + ) + + message = await ctx.send( + ctx.message.author.mention, embed=linx_embed, files=attachments[:10] + ) + + if message: + await ctx.message.delete() + + async def create_linx_embed( + self: Self, config: munch.Munch, ctx: commands.Context, content: str + ) -> discord.Embed | None: + """This function sends a message to the linx url and puts the result in + an embed to be sent to the user + + Args: + config (munch.Munch): The guild config where the message was sent + ctx (commands.Context): The context that generated the need for a paste + content (str): The context of the message to be pasted + + Returns: + discord.Embed | None: The formatted embed, or None if there was an API error + """ + if not content: + return None + + headers = { + "Linx-Expiry": "1800", + "Linx-Randomize": "yes", + "Accept": "application/json", + } + html_file = {"file": io.StringIO(content)} + response = await self.bot.http_functions.http_call( + "post", + self.bot.file_config.api.api_url.linx, + headers=headers, + data=html_file, + ) + + url = response.get("url") + if not url: + return None + + embed = discord.Embed(description=url) + + embed.add_field(name="Paste Link", value=url) + embed.description = content[0:100].replace("\n", " ") + embed.set_author( + name=f"Paste by {ctx.author}", icon_url=ctx.author.display_avatar.url + ) + embed.set_footer(text=config.extensions.paste.paste_footer_message.value) + embed.color = discord.Color.blue() + + return embed diff --git a/techsupport_bot/ui/pagination.py b/techsupport_bot/ui/pagination.py index 874290f7..7ad384db 100644 --- a/techsupport_bot/ui/pagination.py +++ b/techsupport_bot/ui/pagination.py @@ -38,6 +38,7 @@ async def send( author: discord.Member, data: list[str | discord.Embed], interaction: discord.Interaction | None = None, + ephemeral: bool = False, ) -> None: """Entry point for PaginateView @@ -48,6 +49,7 @@ async def send( with [0] being the first page interaction (discord.Interaction | None): The interaction this should followup with (Optional) + ephemeral (bool): Whether the response should be ephemeral (optional) """ self.author = author self.data = data @@ -57,7 +59,7 @@ async def send( if interaction: self.followup = interaction.followup - self.message = await self.followup.send(view=self) + self.message = await self.followup.send(view=self, ephemeral=ephemeral) else: self.message = await channel.send(view=self)