diff --git a/.github/workflows/build-deploy.yml b/.github/workflows/build-deploy.yml index f38686426d..929f016e35 100644 --- a/.github/workflows/build-deploy.yml +++ b/.github/workflows/build-deploy.yml @@ -8,74 +8,40 @@ on: required: true type: string - jobs: - build: - name: Build & Push - runs-on: ubuntu-latest - - steps: - - name: Checkout code - uses: actions/checkout@v3 - - # The current version (v2) of Docker's build-push action uses - # buildx, which comes with BuildKit features that help us speed - # up our builds using additional cache features. Buildx also - # has a lot of other features that are not as relevant to us. - # - # See https://github.com/docker/build-push-action - - - name: Set up Docker Buildx - uses: docker/setup-buildx-action@v2 - - - name: Login to Github Container Registry - uses: docker/login-action@v2 - with: - registry: ghcr.io - username: ${{ github.repository_owner }} - password: ${{ secrets.GITHUB_TOKEN }} - - # Build and push the container to the GitHub Container - # Repository. The container will be tagged as "latest" - # and with the short SHA of the commit. - - - name: Build and push - uses: docker/build-push-action@v4 - with: - context: . - file: ./Dockerfile - push: ${{ github.ref == 'refs/heads/main' }} - cache-from: type=registry,ref=ghcr.io/python-discord/bot:latest - cache-to: type=inline - tags: | - ghcr.io/python-discord/bot:latest - ghcr.io/python-discord/bot:${{ inputs.sha-tag }} - build-args: | - git_sha=${{ github.sha }} - deploy: name: Deploy - needs: build runs-on: ubuntu-latest if: ${{ github.ref == 'refs/heads/main' }} environment: production steps: - - name: Checkout Kubernetes repository - uses: actions/checkout@v3 - with: - repository: python-discord/kubernetes - - - uses: azure/setup-kubectl@v3 - - - name: Authenticate with Kubernetes - uses: azure/k8s-set-context@v3 - with: - method: kubeconfig - kubeconfig: ${{ secrets.KUBECONFIG }} - - - name: Deploy to Kubernetes - uses: azure/k8s-deploy@v4 - with: - manifests: | - namespaces/default/bot/deployment.yaml - images: 'ghcr.io/python-discord/bot:${{ inputs.sha-tag }}' + - name: Checkout code + uses: actions/checkout@v3 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements.txt + + - name: Run tests + run: pytest + + - name: Checkout Kubernetes repository + uses: actions/checkout@v3 + with: + repository: python-discord/kubernetes + + - uses: azure/setup-kubectl@v3 + + - name: Authenticate with Kubernetes + uses: azure/k8s-set-context@v3 + with: + method: kubeconfig + kubeconfig: ${{ secrets.KUBECONFIG }} + + - name: Deploy to Kubernetes + uses: azure/k8s-deploy@v4 + with: + manifests: | + namespaces/default/bot/deployment.yaml + images: 'ghcr.io/python-discord/bot:${{ inputs.sha-tag }}' diff --git a/bot/__main__.py b/bot/__main__.py index ddb6ed6082..3a559c89e2 100644 --- a/bot/__main__.py +++ b/bot/__main__.py @@ -36,13 +36,7 @@ async def _create_redis_session() -> RedisSession: async def main() -> None: """Entry async method for starting the bot.""" - statsd_url = constants.Stats.statsd_host - if constants.DEBUG_MODE: - # Since statsd is UDP, there are no errors for sending to a down port. - # For this reason, setting the statsd host to 127.0.0.1 for development - # will effectively disable stats. - statsd_url = LOCALHOST - + statsd_url = LOCALHOST if constants.DEBUG_MODE else constants.Stats.statsd_host allowed_roles = list({discord.Object(id_) for id_ in constants.MODERATION_ROLES}) intents = discord.Intents.all() intents.presences = False diff --git a/bot/converters.py b/bot/converters.py index 3c8ea44d7d..0321a9909f 100644 --- a/bot/converters.py +++ b/bot/converters.py @@ -52,8 +52,7 @@ class ValidDiscordServerInvite(Converter): async def convert(self, ctx: Context, server_invite: str) -> dict: """Check whether the string is a valid Discord server invite.""" - invite_code = DISCORD_INVITE.match(server_invite) - if invite_code: + if invite_code := DISCORD_INVITE.match(server_invite): response = await ctx.bot.http_session.get( f"{URLs.discord_invite_api}/{invite_code.group('invite')}" ) @@ -78,7 +77,7 @@ class Extension(Converter): async def convert(self, ctx: Context, argument: str) -> str: """Fully qualify the name of an extension and ensure it exists.""" # Special values to reload all extensions - if argument == "*" or argument == "**": + if argument in {"*", "**"}: return argument argument = argument.lower() @@ -89,11 +88,11 @@ async def convert(self, ctx: Context, argument: str) -> str: if (qualified_arg := f"{exts.__name__}.{argument}") in bot_instance.all_extensions: return qualified_arg - matches = [] - for ext in bot_instance.all_extensions: - if argument == unqualify(ext): - matches.append(ext) - + matches = [ + ext + for ext in bot_instance.all_extensions + if argument == unqualify(ext) + ] if len(matches) > 1: matches.sort() names = "\n".join(matches) @@ -229,12 +228,10 @@ async def convert(ctx: Context, argument: str) -> SourceType: if argument.lower() == "help": return ctx.bot.help_command - cog = ctx.bot.get_cog(argument) - if cog: + if cog := ctx.bot.get_cog(argument): return cog - cmd = ctx.bot.get_command(argument) - if cmd: + if cmd := ctx.bot.get_command(argument): return cmd tags_cog = ctx.bot.get_cog("Tags") @@ -386,11 +383,7 @@ async def convert(self, ctx: Context, datetime_string: str) -> datetime: except ValueError: raise BadArgument(f"`{datetime_string}` is not a valid ISO-8601 datetime string") - if dt.tzinfo: - dt = dt.astimezone(UTC) - else: # Without a timezone, assume it represents UTC. - dt = dt.replace(tzinfo=UTC) - + dt = dt.astimezone(UTC) if dt.tzinfo else dt.replace(tzinfo=UTC) return dt @@ -477,7 +470,7 @@ class Infraction(Converter): async def convert(self, ctx: Context, arg: str) -> dict | None: """Attempts to convert `arg` into an infraction `dict`.""" - if arg in ("l", "last", "recent"): + if arg in {"l", "last", "recent"}: params = { "actor__id": ctx.author.id, "ordering": "-inserted_at" diff --git a/bot/exts/backend/branding/_cog.py b/bot/exts/backend/branding/_cog.py index 5d194ec3ed..e0f5851d3e 100644 --- a/bot/exts/backend/branding/_cog.py +++ b/bot/exts/backend/branding/_cog.py @@ -69,10 +69,7 @@ def extract_event_duration(event: Event) -> str: start_date = event.meta.start_date.strftime(fmt) end_date = event.meta.end_date.strftime(fmt) - if start_date == end_date: - return start_date - - return f"{start_date} - {end_date}" + return start_date if start_date == end_date else f"{start_date} - {end_date}" def extract_event_name(event: Event) -> str: @@ -525,13 +522,14 @@ async def branding_sync_cmd(self, ctx: commands.Context) -> None: async with ctx.typing(): banner_success, icon_success = await self.synchronise() - failed_assets = ", ".join( + if failed_assets := ", ".join( name - for name, status in [("banner", banner_success), ("icon", icon_success)] + for name, status in [ + ("banner", banner_success), + ("icon", icon_success), + ] if status is False - ) - - if failed_assets: + ): resp = make_embed("Synchronisation unsuccessful", f"Failed to apply: {failed_assets}.", success=False) resp.set_footer(text="Check log for details.") else: diff --git a/bot/exts/backend/branding/_repository.py b/bot/exts/backend/branding/_repository.py index db2061faac..40c6613c75 100644 --- a/bot/exts/backend/branding/_repository.py +++ b/bot/exts/backend/branding/_repository.py @@ -43,8 +43,7 @@ class RemoteObject: def __init__(self, dictionary: dict[str, t.Any]) -> None: """Initialize by grabbing annotated attributes from `dictionary`.""" - missing_keys = self.__annotations__.keys() - dictionary.keys() - if missing_keys: + if missing_keys := self.__annotations__.keys() - dictionary.keys(): raise KeyError(f"Fetched object lacks expected keys: {missing_keys}") for annotation in self.__annotations__: setattr(self, annotation, dictionary[annotation]) @@ -163,9 +162,10 @@ async def construct_event(self, directory: RemoteObject) -> Event: """ contents = await self.fetch_directory(directory.path) - missing_assets = {"meta.md", "server_icons", "banners"} - contents.keys() - - if missing_assets: + if ( + missing_assets := {"meta.md", "server_icons", "banners"} + - contents.keys() + ): raise BrandingMisconfigurationError(f"Directory is missing following assets: {missing_assets}") server_icons = await self.fetch_directory(contents["server_icons"].path, types=("file",)) diff --git a/bot/exts/backend/config_verifier.py b/bot/exts/backend/config_verifier.py index 84ae5ca92f..62a9b8a3b4 100644 --- a/bot/exts/backend/config_verifier.py +++ b/bot/exts/backend/config_verifier.py @@ -23,12 +23,11 @@ async def cog_load(self) -> None: server = self.bot.get_guild(constants.Guild.id) server_channel_ids = {channel.id for channel in server.channels} - invalid_channels = [ - (channel_name, channel_id) for channel_name, channel_id in constants.Channels + if invalid_channels := [ + (channel_name, channel_id) + for channel_name, channel_id in constants.Channels if channel_id not in server_channel_ids - ] - - if invalid_channels: + ]: log.warning(f"Configured channels do not exist in server: {invalid_channels}.") diff --git a/bot/exts/backend/error_handler.py b/bot/exts/backend/error_handler.py index 0c89389184..a5f6dd2a80 100644 --- a/bot/exts/backend/error_handler.py +++ b/bot/exts/backend/error_handler.py @@ -186,7 +186,7 @@ async def try_get_tag(self, ctx: Context) -> None: if await tags_get_command(ctx, maybe_tag_name): return - if not any(role.id in MODERATION_ROLES for role in ctx.author.roles): + if all(role.id not in MODERATION_ROLES for role in ctx.author.roles): await self.send_command_suggestion(ctx, maybe_tag_name) except Exception as err: log.debug("Error while attempting to invoke tag fallback.") @@ -206,7 +206,7 @@ async def try_run_fixed_codeblock(self, ctx: Context) -> bool: msg = copy.copy(ctx.message) command, sep, end = msg.content.partition("```") - msg.content = command + " " + sep + end + msg.content = f"{command} {sep}{end}" new_ctx = await self.bot.get_context(msg) if new_ctx.command is None: diff --git a/bot/exts/backend/sync/_syncers.py b/bot/exts/backend/sync/_syncers.py index cd7f5040d2..ce84b60bfc 100644 --- a/bot/exts/backend/sync/_syncers.py +++ b/bot/exts/backend/sync/_syncers.py @@ -54,10 +54,7 @@ async def sync(cls, guild: Guild, ctx: Context | None = None) -> None: """ log.info(f"Starting {cls.name} syncer.") - if ctx: - message = await ctx.send(f"📊 Synchronising {cls.name}s.") - else: - message = None + message = await ctx.send(f"📊 Synchronising {cls.name}s.") if ctx else None diff = await cls._get_diff(guild) try: diff --git a/bot/exts/filtering/_filter_lists/antispam.py b/bot/exts/filtering/_filter_lists/antispam.py index f27412e1a4..791bd33687 100644 --- a/bot/exts/filtering/_filter_lists/antispam.py +++ b/bot/exts/filtering/_filter_lists/antispam.py @@ -42,7 +42,7 @@ class AntispamList(UniquesListBase): def __init__(self, filtering_cog: "Filtering"): super().__init__(filtering_cog) - self.message_deletion_queue: dict[Member, DeletionContext] = dict() + self.message_deletion_queue: dict[Member, DeletionContext] = {} def get_filter_type(self, content: str) -> type[UniqueFilter] | None: """Get a subclass of filter matching the filter list and the filter's content.""" diff --git a/bot/exts/filtering/_filter_lists/filter_list.py b/bot/exts/filtering/_filter_lists/filter_list.py index e5b6b2a65d..8a8d397383 100644 --- a/bot/exts/filtering/_filter_lists/filter_list.py +++ b/bot/exts/filtering/_filter_lists/filter_list.py @@ -95,19 +95,18 @@ async def _create_filter_list_result( relevant_filters = [] for filter_ in filters: - if not filter_.validations: - if default_answer and await filter_.triggered_on(ctx): - relevant_filters.append(filter_) - else: + if filter_.validations: passed, failed = filter_.validations.evaluate(ctx) if not failed and failed_by_default < passed: if await filter_.triggered_on(ctx): relevant_filters.append(filter_) + elif default_answer and await filter_.triggered_on(ctx): + relevant_filters.append(filter_) if ctx.event == Event.MESSAGE_EDIT and ctx.message and self.list_type == ListType.DENY: - previously_triggered = ctx.message_cache.get_message_metadata(ctx.message.id) - # The message might not be cached. - if previously_triggered: + if previously_triggered := ctx.message_cache.get_message_metadata( + ctx.message.id + ): ignore_filters = previously_triggered[self] # This updates the cache. Some filters are ignored, but they're necessary if there's another edit. previously_triggered[self] = relevant_filters @@ -120,8 +119,8 @@ def default(self, setting_name: str) -> Any: value = self.defaults.actions.get_setting(setting_name, missing) if value is missing: value = self.defaults.validations.get_setting(setting_name, missing) - if value is missing: - raise ValueError(f"Couldn't find a setting named {setting_name!r}.") + if value is missing: + raise ValueError(f"Couldn't find a setting named {setting_name!r}.") return value def merge_actions(self, filters: list[Filter]) -> ActionSettings | None: @@ -144,14 +143,12 @@ def merge_actions(self, filters: list[Filter]) -> ActionSettings | None: @staticmethod def format_messages(triggers: list[Filter], *, expand_single_filter: bool = True) -> list[str]: """Convert the filters into strings that can be added to the alert embed.""" - if len(triggers) == 1 and expand_single_filter: - message = f"#{triggers[0].id} (`{triggers[0].content}`)" - if triggers[0].description: - message += f" - {triggers[0].description}" - messages = [message] - else: - messages = [f"{filter_.id} (`{filter_.content}`)" for filter_ in triggers] - return messages + if len(triggers) != 1 or not expand_single_filter: + return [f"{filter_.id} (`{filter_.content}`)" for filter_ in triggers] + message = f"#{triggers[0].id} (`{triggers[0].content}`)" + if triggers[0].description: + message += f" - {triggers[0].description}" + return [message] def __hash__(self): return hash(id(self)) @@ -177,8 +174,7 @@ def add_list(self, list_data: dict) -> AtomicList: filters = {} for filter_data in list_data["filters"]: - new_filter = self._create_filter(filter_data, defaults) - if new_filter: + if new_filter := self._create_filter(filter_data, defaults): filters[filter_data["id"]] = new_filter self[list_type] = AtomicList( @@ -218,8 +214,7 @@ def _create_filter(self, filter_data: dict, defaults: Defaults) -> T | None: """Create a filter from the given data.""" try: content = filter_data["content"] - filter_type = self.get_filter_type(content) - if filter_type: + if filter_type := self.get_filter_type(content): return filter_type(filter_data, defaults) if content not in self._already_warned: log.warning(f"A filter named {content} was supplied, but no matching implementation found.") @@ -292,8 +287,7 @@ def add_list(self, list_data: dict) -> SubscribingAtomicList: filters = {} events = set() for filter_data in list_data["filters"]: - new_filter = self._create_filter(filter_data, defaults) - if new_filter: + if new_filter := self._create_filter(filter_data, defaults): new_list.subscribe(new_filter, *new_filter.events) filters[filter_data["id"]] = new_filter self.loaded_types[new_filter.name] = type(new_filter) diff --git a/bot/exts/filtering/_filter_lists/invite.py b/bot/exts/filtering/_filter_lists/invite.py index cfc22c56ee..b12159edef 100644 --- a/bot/exts/filtering/_filter_lists/invite.py +++ b/bot/exts/filtering/_filter_lists/invite.py @@ -85,8 +85,8 @@ async def actions_for( check_if_allowed = not failed # Sort the invites into two categories: - invites_for_inspection = dict() # Found guild invites requiring further inspection. - unknown_invites = dict() # Either don't resolve or group DMs. + invites_for_inspection = {} + unknown_invites = {} for invite_code in refined_invites.values(): try: invite = await bot.instance.fetch_invite(invite_code) @@ -131,9 +131,7 @@ async def actions_for( if not triggered and not unknown_invites: return None, [], all_triggers - actions = None - if unknown_invites: # There are invites which weren't allowed but aren't explicitly blocked. - actions = self[ListType.ALLOW].defaults.actions + actions = self[ListType.ALLOW].defaults.actions if unknown_invites else None # Blocked invites come second so that their actions have preference. if triggered: if actions: diff --git a/bot/exts/filtering/_filter_lists/token.py b/bot/exts/filtering/_filter_lists/token.py index 186715c9fc..5eb2d036d1 100644 --- a/bot/exts/filtering/_filter_lists/token.py +++ b/bot/exts/filtering/_filter_lists/token.py @@ -69,6 +69,4 @@ async def actions_for( def _expand_spoilers(text: str) -> str: """Return a string containing all interpretations of a spoilered message.""" split_text = SPOILER_RE.split(text) - return "".join( - split_text[0::2] + split_text[1::2] + split_text - ) + return "".join(split_text[::2] + split_text[1::2] + split_text) diff --git a/bot/exts/filtering/_filters/antispam/links.py b/bot/exts/filtering/_filters/antispam/links.py index 0a2a98fc84..e7c2580f12 100644 --- a/bot/exts/filtering/_filters/antispam/links.py +++ b/bot/exts/filtering/_filters/antispam/links.py @@ -40,8 +40,7 @@ async def triggered_on(self, ctx: FilterContext) -> bool: total_links = 0 messages_with_links = 0 for msg in detected_messages: - total_matches = len(LINK_RE.findall(msg.content)) - if total_matches: + if total_matches := len(LINK_RE.findall(msg.content)): messages_with_links += 1 total_links += total_matches diff --git a/bot/exts/filtering/_filters/domain.py b/bot/exts/filtering/_filters/domain.py index c3f7f28865..35019dd2df 100644 --- a/bot/exts/filtering/_filters/domain.py +++ b/bot/exts/filtering/_filters/domain.py @@ -41,8 +41,8 @@ async def triggered_on(self, ctx: FilterContext) -> bool: for found_url in ctx.content: extract = tldextract.extract(found_url) if self.content.lower() in found_url and extract.registered_domain == domain: - if self.extra_fields.only_subdomains: - if not extract.subdomain and not urlparse(f"https://{found_url}").path: + if not extract.subdomain and not urlparse(f"https://{found_url}").path: + if self.extra_fields.only_subdomains: return False ctx.matches.append(found_url) ctx.notification_domain = self.content diff --git a/bot/exts/filtering/_filters/filter.py b/bot/exts/filtering/_filters/filter.py index 3f201cfde4..5174c9f3e3 100644 --- a/bot/exts/filtering/_filters/filter.py +++ b/bot/exts/filtering/_filters/filter.py @@ -38,9 +38,7 @@ def __init__(self, filter_data: dict, defaults: Defaults | None = None): @property def overrides(self) -> tuple[dict[str, Any], dict[str, Any]]: """Return a tuple of setting overrides and filter setting overrides.""" - settings = {} - if self.actions: - settings = self.actions.overrides + settings = self.actions.overrides if self.actions else {} if self.validations: settings |= self.validations.overrides diff --git a/bot/exts/filtering/_filters/token.py b/bot/exts/filtering/_filters/token.py index 3cd9b909d1..5f5aa76b0c 100644 --- a/bot/exts/filtering/_filters/token.py +++ b/bot/exts/filtering/_filters/token.py @@ -15,8 +15,7 @@ async def triggered_on(self, ctx: FilterContext) -> bool: """Searches for a regex pattern within a given context.""" pattern = self.content - match = re.search(pattern, ctx.content, flags=re.IGNORECASE) - if match: + if match := re.search(pattern, ctx.content, flags=re.IGNORECASE): ctx.matches.append(match[0]) return True return False diff --git a/bot/exts/filtering/_filters/unique/discord_token.py b/bot/exts/filtering/_filters/unique/discord_token.py index 1745fa86cc..db4dd264c9 100644 --- a/bot/exts/filtering/_filters/unique/discord_token.py +++ b/bot/exts/filtering/_filters/unique/discord_token.py @@ -166,11 +166,7 @@ def extract_user_id(b64_content: str) -> int | None: try: decoded_bytes = base64.urlsafe_b64decode(b64_content) string = decoded_bytes.decode("utf-8") - if not (string.isascii() and string.isdigit()): - # This case triggers if there are fancy unicode digits in the base64 encoding, - # that means it's not a valid user id. - return None - return int(string) + return None if not string.isascii() or not string.isdigit() else int(string) except ValueError: return None diff --git a/bot/exts/filtering/_filters/unique/webhook.py b/bot/exts/filtering/_filters/unique/webhook.py index 4e1e2e44df..ee46244bc6 100644 --- a/bot/exts/filtering/_filters/unique/webhook.py +++ b/bot/exts/filtering/_filters/unique/webhook.py @@ -44,7 +44,7 @@ async def triggered_on(self, ctx: FilterContext) -> bool: # Queue the webhook for deletion. ctx.additional_actions.append(self._delete_webhook_wrapper(match[0], extra)) # Don't show the full webhook in places such as the mod alert. - ctx.content = ctx.content.replace(match[0], match[1] + "xxx") + ctx.content = ctx.content.replace(match[0], f"{match[1]}xxx") return True @@ -56,8 +56,8 @@ async def _delete_webhook(ctx: FilterContext) -> None: async with bot.instance.http_session.delete(webhook_url) as resp: # The Discord API Returns a 204 NO CONTENT response on success. if resp.status == 204: - ctx.action_descriptions.append("webhook deleted" + extra_message) + ctx.action_descriptions.append(f"webhook deleted{extra_message}") else: - ctx.action_descriptions.append("failed to delete webhook" + extra_message) + ctx.action_descriptions.append(f"failed to delete webhook{extra_message}") return _delete_webhook diff --git a/bot/exts/filtering/_settings.py b/bot/exts/filtering/_settings.py index 7005dd2d1b..4b77659cb6 100644 --- a/bot/exts/filtering/_settings.py +++ b/bot/exts/filtering/_settings.py @@ -89,10 +89,9 @@ def __init__(self, settings_data: dict, *, defaults: Settings | None = None, kee else: try: entry_defaults = None if defaults is None else defaults[entry_name] - new_entry = entry_cls.create( + if new_entry := entry_cls.create( entry_data, defaults=entry_defaults, keep_empty=keep_empty - ) - if new_entry: + ): self[entry_name] = new_entry except TypeError as e: raise TypeError( @@ -110,10 +109,14 @@ def copy(self: TSettings) -> TSettings: def get_setting(self, key: str, default: Any | None = None) -> Any: """Get the setting matching the key, or fall back to the default value if the key is missing.""" - for entry in self.values(): - if hasattr(entry, key): - return getattr(entry, key) - return default + return next( + ( + getattr(entry, key) + for entry in self.values() + if hasattr(entry, key) + ), + default, + ) @classmethod def create( @@ -128,10 +131,7 @@ def create( settings = cls(settings_data, defaults=defaults, keep_empty=keep_empty) # If an entry doesn't hold any values, its `create` method will return None. # If all entries are None, then the settings object holds no values. - if not keep_empty and not any(settings.values()): - return None - - return settings + return None if not keep_empty and not any(settings.values()) else settings class ValidationSettings(Settings[ValidationEntry]): @@ -177,13 +177,12 @@ def __init__(self, settings_data: dict, *, defaults: Settings | None = None, kee def union(self, other: Self) -> Self: """Combine the entries of two collections of settings into a new ActionsSettings.""" - actions = {} - # A settings object doesn't necessarily have all types of entries (e.g in the case of filter overrides). - for entry in self: - if entry in other: - actions[entry] = self[entry].union(other[entry]) - else: - actions[entry] = self[entry] + actions = { + entry: self[entry].union(other[entry]) + if entry in other + else self[entry] + for entry in self + } for entry in other: if entry not in actions: actions[entry] = other[entry] diff --git a/bot/exts/filtering/_settings_types/actions/infraction_and_notification.py b/bot/exts/filtering/_settings_types/actions/infraction_and_notification.py index 359aa7bc34..67b5d9a97a 100644 --- a/bot/exts/filtering/_settings_types/actions/infraction_and_notification.py +++ b/bot/exts/filtering/_settings_types/actions/infraction_and_notification.py @@ -50,7 +50,7 @@ def process_value(cls, v: str | relativedelta) -> relativedelta: if not (delta := parse_duration_string(v)): raise ValueError(f"`{v}` is not a valid duration string.") else: - delta = relativedelta(seconds=float(v)).normalized() + delta = relativedelta(seconds=v).normalized() return delta @@ -172,11 +172,14 @@ async def send_message(self, ctx: FilterContext) -> None: if dm_content or dm_embed: formatting = {"domain": ctx.notification_domain} dm_content = f"Hey {ctx.author.mention}!\n{dm_content.format(**formatting)}" - if dm_embed: - dm_embed = Embed(description=dm_embed.format(**formatting), colour=Colour.og_blurple()) - else: - dm_embed = None - + dm_embed = ( + Embed( + description=dm_embed.format(**formatting), + colour=Colour.og_blurple(), + ) + if dm_embed + else None + ) try: await ctx.author.send(dm_content, embed=dm_embed) ctx.action_descriptions.append("notified") diff --git a/bot/exts/filtering/_settings_types/actions/ping.py b/bot/exts/filtering/_settings_types/actions/ping.py index 4b38a19f34..078c5b6421 100644 --- a/bot/exts/filtering/_settings_types/actions/ping.py +++ b/bot/exts/filtering/_settings_types/actions/ping.py @@ -29,9 +29,7 @@ class Ping(ActionEntry): @classmethod def init_sequence_if_none(cls, pings: list[str] | None) -> list[str]: """Initialize an empty sequence if the value is None.""" - if pings is None: - return [] - return pings + return [] if pings is None else pings async def action(self, ctx: FilterContext) -> None: """Add the stored pings to the alert message content.""" diff --git a/bot/exts/filtering/_settings_types/actions/remove_context.py b/bot/exts/filtering/_settings_types/actions/remove_context.py index b833978fa8..18be5df13f 100644 --- a/bot/exts/filtering/_settings_types/actions/remove_context.py +++ b/bot/exts/filtering/_settings_types/actions/remove_context.py @@ -67,7 +67,7 @@ async def _handle_messages(ctx: FilterContext) -> None: channel_messages[message.channel].add(message) success = fail = 0 - deleted = list() + deleted = [] for channel, messages in channel_messages.items(): try: await channel.delete_messages(messages) diff --git a/bot/exts/filtering/_settings_types/validations/filter_dm.py b/bot/exts/filtering/_settings_types/validations/filter_dm.py index 9961984d69..5890a51fe2 100644 --- a/bot/exts/filtering/_settings_types/validations/filter_dm.py +++ b/bot/exts/filtering/_settings_types/validations/filter_dm.py @@ -14,7 +14,4 @@ class FilterDM(ValidationEntry): def triggers_on(self, ctx: FilterContext) -> bool: """Return whether the filter should be triggered even if it was triggered in DMs.""" - if not ctx.channel: # No channel - out of scope for this setting. - return True - - return ctx.channel.guild is not None or self.filter_dm + return ctx.channel.guild is not None or self.filter_dm if ctx.channel else True diff --git a/bot/exts/filtering/_ui/filter.py b/bot/exts/filtering/_ui/filter.py index d15a3cacb8..4248b0592e 100644 --- a/bot/exts/filtering/_ui/filter.py +++ b/bot/exts/filtering/_ui/filter.py @@ -288,7 +288,9 @@ async def update_embed( if description and description is not self._REMOVE: self.embed.description += f" - {description}" if len(self.embed.description) > MAX_EMBED_DESCRIPTION: - self.embed.description = self.embed.description[:MAX_EMBED_DESCRIPTION - 5] + "[...]" + self.embed.description = ( + f"{self.embed.description[:MAX_EMBED_DESCRIPTION - 5]}[...]" + ) if setting_name: # Find the right dictionary to update. @@ -300,15 +302,18 @@ async def update_embed( dict_to_edit = self.settings_overrides default_value = self.filter_list[self.list_type].default(setting_name) # Update the setting override value or remove it - if setting_value is not self._REMOVE: - if not repr_equals(setting_value, default_value): - dict_to_edit[setting_name] = setting_value - # If there's already an override, remove it, since the new value is the same as the default. - elif setting_name in dict_to_edit: - dict_to_edit.pop(setting_name) - elif setting_name in dict_to_edit: + if setting_value is not self._REMOVE and not repr_equals( + setting_value, default_value + ): + dict_to_edit[setting_name] = setting_value + elif ( + setting_value is not self._REMOVE + and repr_equals(setting_value, default_value) + and setting_name in dict_to_edit + or setting_value is self._REMOVE + and setting_name in dict_to_edit + ): dict_to_edit.pop(setting_name) - # This is inefficient, but otherwise the selects go insane if the user attempts to edit the same setting # multiple times, even when replacing the select with a new one. self.embed.clear_fields() @@ -396,11 +401,8 @@ def description_and_settings_converter( if not SINGLE_SETTING_PATTERN.match(parsed[0]): description, *parsed = parsed - settings = {setting: value for setting, value in [part.split("=", maxsplit=1) for part in parsed]} # noqa: C416 - template = None - if "--template" in settings: - template = settings.pop("--template") - + settings = dict([part.split("=", maxsplit=1) for part in parsed]) + template = settings.pop("--template") if "--template" in settings else None filter_settings = {} for setting, _ in list(settings.items()): if setting in loaded_settings: # It's a filter list setting diff --git a/bot/exts/filtering/_ui/filter_list.py b/bot/exts/filtering/_ui/filter_list.py index a06e8a71e9..e4c3677cfc 100644 --- a/bot/exts/filtering/_ui/filter_list.py +++ b/bot/exts/filtering/_ui/filter_list.py @@ -31,7 +31,7 @@ def settings_converter(loaded_settings: dict, input_data: str) -> dict[str, Any] return {} try: - settings = {setting: value for setting, value in [part.split("=", maxsplit=1) for part in parsed]} # noqa: C416 + settings = dict([part.split("=", maxsplit=1) for part in parsed]) except ValueError: raise BadArgument("The settings provided are not in the correct format.") diff --git a/bot/exts/filtering/_ui/search.py b/bot/exts/filtering/_ui/search.py index d26fd99295..078f78020b 100644 --- a/bot/exts/filtering/_ui/search.py +++ b/bot/exts/filtering/_ui/search.py @@ -39,14 +39,11 @@ def search_criteria_converter( return {}, {}, filter_type try: - settings = {setting: value for setting, value in [part.split("=", maxsplit=1) for part in parsed]} # noqa: C416 + settings = dict([part.split("=", maxsplit=1) for part in parsed]) except ValueError: raise BadArgument("The settings provided are not in the correct format.") - template = None - if "--template" in settings: - template = settings.pop("--template") - + template = settings.pop("--template") if "--template" in settings else None filter_settings = {} for setting, _ in list(settings.items()): if setting in loaded_settings: # It's a filter list setting diff --git a/bot/exts/filtering/_ui/ui.py b/bot/exts/filtering/_ui/ui.py index 7026c7d05c..d08699e6be 100644 --- a/bot/exts/filtering/_ui/ui.py +++ b/bot/exts/filtering/_ui/ui.py @@ -77,7 +77,7 @@ async def _build_alert_message_content(ctx: FilterContext, current_message_lengt log_site_msg = f"The full message can be found [here]({url})" # 7 because that's the length of "[...]\n\n" return alert_content[:remaining_chars - (7 + len(log_site_msg))] + "[...]\n\n" + log_site_msg - return alert_content[:remaining_chars - 5] + "[...]" + return f"{alert_content[:remaining_chars - 5]}[...]" return alert_content @@ -98,10 +98,11 @@ async def build_mod_alert(ctx: FilterContext, triggered_filters: dict[FilterList triggered_by += "\n" triggered_in = "" - filters = [] - for filter_list, list_message in triggered_filters.items(): - if list_message: - filters.append(f"**{filter_list.name.title()} Filters:** {', '.join(list_message)}") + filters = [ + f"**{filter_list.name.title()} Filters:** {', '.join(list_message)}" + for filter_list, list_message in triggered_filters.items() + if list_message + ] filters = "\n".join(filters) matches = "**Matches:** " + escape_markdown(", ".join(repr(match) for match in ctx.matches)) if ctx.matches else "" @@ -130,7 +131,7 @@ def populate_embed_from_dict(embed: Embed, data: dict) -> None: else: value = str(value) if value not in ("", None) else "-" if len(value) > MAX_FIELD_SIZE: - value = value[:MAX_FIELD_SIZE] + " [...]" + value = f"{value[:MAX_FIELD_SIZE]} [...]" embed.add_field(name=setting, value=value, inline=len(value) < MAX_INLINE_SIZE) @@ -145,10 +146,7 @@ def parse_value(value: str, type_: type[T]) -> T: return list(value.split(",")) if type_ is bool: return value.lower() == "true" or value == "1" - if isinstance(type_, EnumMeta): - return type_[value.upper()] - - return type_(value) + return type_[value.upper()] if isinstance(type_, EnumMeta) else type_(value) def format_response_error(e: ResponseCodeError) -> Embed: @@ -165,12 +163,13 @@ def format_response_error(e: ResponseCodeError) -> Embed: description = description.strip() if len(description) > MAX_EMBED_DESCRIPTION: - description = description[:MAX_EMBED_DESCRIPTION] + "[...]" + description = f"{description[:MAX_EMBED_DESCRIPTION]}[...]" if not description: description = "Something unexpected happened, check the logs." - embed = Embed(colour=discord.Colour.red(), title="Oops...", description=description) - return embed + return Embed( + colour=discord.Colour.red(), title="Oops...", description=description + ) class ArgumentCompletionSelect(discord.ui.Select): @@ -389,7 +388,9 @@ async def apply_addition(self, interaction: Interaction, item: str) -> None: async def apply_edit(self, interaction: Interaction, new_list: str) -> None: """Change the contents of the list.""" - self.stored_value = list(set(part.strip() for part in new_list.split(",") if part.strip())) + self.stored_value = list( + {part.strip() for part in new_list.split(",") if part.strip()} + ) await interaction.response.edit_message( content=f"Current list: [{', '.join(self.stored_value)}]", view=self.copy() ) diff --git a/bot/exts/filtering/_utils.py b/bot/exts/filtering/_utils.py index 944cf38370..b8d693c24b 100644 --- a/bot/exts/filtering/_utils.py +++ b/bot/exts/filtering/_utils.py @@ -62,10 +62,10 @@ def past_tense(word: str) -> str: if not word: return word if word.endswith("e"): - return word + "d" + return f"{word}d" if word.endswith("y") and len(word) > 1 and word[-2] not in "aeiou": - return word[:-1] + "ied" - return word + "ed" + return f"{word[:-1]}ied" + return f"{word}ed" def to_serializable(item: Any, *, ui_repr: bool = False) -> Serializable: @@ -95,7 +95,7 @@ def to_serializable(item: Any, *, ui_repr: bool = False) -> Serializable: def resolve_mention(mention: str) -> str: """Return the appropriate formatting for the mention, be it a literal, a user ID, or a role ID.""" guild = bot.instance.get_guild(Guild.id) - if mention in ("here", "everyone"): + if mention in {"here", "everyone"}: return f"@{mention}" try: mention = int(mention) # It's an ID. @@ -110,10 +110,10 @@ def resolve_mention(mention: str) -> str: for role in guild.roles: if role.name == mention: return role.mention - for member in guild.members: - if str(member) == mention: - return member.mention - return mention + return next( + (member.mention for member in guild.members if str(member) == mention), + mention, + ) def repr_equals(override: Any, default: Any) -> bool: @@ -139,11 +139,10 @@ def normalize_type(type_: type, *, prioritize_nonetype: bool = True) -> type: if type(None) in args: if prioritize_nonetype: return type(None) - args = tuple(set(args) - {type(None)}) + else: + args = tuple(set(args) - {type(None)}) type_ = args[0] # Pick one, doesn't matter - if origin := get_origin(type_): # In case of a parameterized List, Set, Dict etc. - return origin - return type_ + return origin if (origin := get_origin(type_)) else type_ def starting_value(type_: type[T]) -> T: @@ -264,10 +263,7 @@ def __get_pydantic_core_schema__( @classmethod def validate(cls, v: Any, _info: core_schema.ValidationInfo) -> Self: """Takes the given value and returns a class instance with that value.""" - if isinstance(v, CustomIOField): - return cls(v.value) - - return cls(v) + return cls(v.value) if isinstance(v, CustomIOField) else cls(v) def __eq__(self, other: CustomIOField): if not isinstance(other, CustomIOField): diff --git a/bot/exts/filtering/filtering.py b/bot/exts/filtering/filtering.py index 844f2942e6..75e6f37b9e 100644 --- a/bot/exts/filtering/filtering.py +++ b/bot/exts/filtering/filtering.py @@ -416,7 +416,9 @@ async def filter(self, ctx: Context, id_: int | None = None) -> None: embed.description = f"`{filter_.content}`" if filter_.description: embed.description += f" - {filter_.description}" - embed.set_author(name=f"Filter {id_} - " + f"{filter_list[list_type].label}".title()) + embed.set_author( + name=f'Filter {id_} - {f"{filter_list[list_type].label}".title()}' + ) embed.set_footer(text=( "Field names with an asterisk have values which override the defaults of the containing filter list. " f"To view all defaults of the list, " @@ -451,9 +453,9 @@ async def f_describe(self, ctx: Context, filter_name: str | None) -> None: filter_type = self.loaded_filters.get(filter_name) if not filter_type: filter_type = self.loaded_filters.get(filter_name[:-1]) # A plural form or a typo. - if not filter_type: - await ctx.send(f":x: There's no filter type named {filter_name!r}.") - return + if not filter_type: + await ctx.send(f":x: There's no filter type named {filter_name!r}.") + return # Use the class's docstring, and ignore single newlines. embed = Embed(description=re.sub(r"(? None: @@ -1054,8 +1050,8 @@ def _get_list_by_name(self, list_name: str) -> FilterList: if not filter_list: if list_name.endswith("s"): # The user may have attempted to use the plural form. filter_list = self.filter_lists.get(list_name[:-1]) - if not filter_list: - raise BadArgument(f"There's no filter list named {list_name!r}.") + if not filter_list: + raise BadArgument(f"There's no filter list named {list_name!r}.") log.trace(f"Found list named {filter_list.name}") return filter_list @@ -1181,8 +1177,7 @@ async def _maybe_alert_auto_infraction( return if infraction_type != Infraction.NONE: - filter_log = bot.instance.get_channel(Channels.filter_log) - if filter_log: + if filter_log := bot.instance.get_channel(Channels.filter_log): await filter_log.send( f":warning: Heads up! The new {filter_list[list_type].label} filter " f"({filter_}) will automatically {infraction_type.name.lower()} users." @@ -1218,7 +1213,7 @@ async def _post_new_filter( if new_filter: await self._maybe_alert_auto_infraction(filter_list, list_type, new_filter) extra_msg = Filtering._identical_filters_message(content, filter_list, list_type, new_filter) - await msg.reply(f"✅ Added filter: {new_filter}" + extra_msg) + await msg.reply(f"✅ Added filter: {new_filter}{extra_msg}") else: await msg.reply(":x: Could not create the filter. Are you sure it's implemented?") @@ -1269,7 +1264,7 @@ async def _patch_filter( log.info(f"Successfully patched filter {edited_filter}.") await self._maybe_alert_auto_infraction(filter_list, list_type, edited_filter, filter_) extra_msg = Filtering._identical_filters_message(content, filter_list, list_type, edited_filter) - await msg.reply(f"✅ Edited filter: {edited_filter}" + extra_msg) + await msg.reply(f"✅ Edited filter: {edited_filter}{extra_msg}") async def _post_filter_list(self, msg: Message, list_name: str, list_type: ListType, settings: dict) -> None: """POST the new data of the filter list to the site API.""" @@ -1345,8 +1340,9 @@ async def _search_filters( if filter_type and filter_type not in filter_list.filter_types: continue for atomic_list in filter_list.values(): - list_results = self._search_filter_list(atomic_list, filter_type, settings, filter_settings) - if list_results: + if list_results := self._search_filter_list( + atomic_list, filter_type, settings, filter_settings + ): lines.append(f"**{atomic_list.label.title()}**") lines.extend(map(str, list_results)) lines.append("") @@ -1360,8 +1356,7 @@ async def _search_filters( async def _delete_offensive_msg(self, msg: Mapping[str, int]) -> None: """Delete an offensive message, and then delete it from the DB.""" try: - channel = self.bot.get_channel(msg["channel_id"]) - if channel: + if channel := self.bot.get_channel(msg["channel_id"]): msg_obj = await channel.fetch_message(msg["id"]) await msg_obj.delete() except discord.NotFound: @@ -1451,9 +1446,13 @@ async def send_weekly_auto_infraction_report( # Nicely format the output so each filter list type is grouped lines = [f"**Auto-infraction filters added since {seven_days_ago.format('YYYY-MM-DD')}**"] - for list_label, filters in found_filters.items(): - lines.append("\n".join([f"**{list_label.title()}**"]+[f"{filter_} ({infr})" for filter_, infr in filters])) - + lines.extend( + "\n".join( + [f"**{list_label.title()}**"] + + [f"{filter_} ({infr})" for filter_, infr in filters] + ) + for list_label, filters in found_filters.items() + ) if len(lines) == 1: lines.append("Nothing to show") diff --git a/bot/exts/fun/off_topic_names.py b/bot/exts/fun/off_topic_names.py index 120e3b4a46..f59c58e38a 100644 --- a/bot/exts/fun/off_topic_names.py +++ b/bot/exts/fun/off_topic_names.py @@ -107,9 +107,9 @@ async def add_command(self, ctx: Context, *, name: OffTopicName) -> None: The name is not added if it is too similar to an existing name. """ existing_names = await self.bot.api_client.get("bot/off-topic-channel-names") - close_match = difflib.get_close_matches(name, existing_names, n=1, cutoff=0.8) - - if close_match: + if close_match := difflib.get_close_matches( + name, existing_names, n=1, cutoff=0.8 + ): match = close_match[0] log.info( f"{ctx.author} tried to add channel name '{name}' but it was too similar to '{match}'" diff --git a/bot/exts/help_channels/_message.py b/bot/exts/help_channels/_message.py index aa6e52340f..511bf92bc0 100644 --- a/bot/exts/help_channels/_message.py +++ b/bot/exts/help_channels/_message.py @@ -19,7 +19,7 @@ def _serialise_session_participants(participants: set[int]) -> str: def _deserialise_session_participants(s: str) -> set[int]: """Convert a comma separated string into a set.""" - return set(int(user_id) for user_id in s.split(",") if user_id != "") + return {int(user_id) for user_id in s.split(",") if user_id != ""} @lock.lock_arg(NAMESPACE, "message", attrgetter("channel.id")) diff --git a/bot/exts/info/code_snippets.py b/bot/exts/info/code_snippets.py index dc5c2258af..1a65a6da4f 100644 --- a/bot/exts/info/code_snippets.py +++ b/bot/exts/info/code_snippets.py @@ -62,9 +62,7 @@ async def _fetch_response(self, url: str, response_format: str, **kwargs) -> Any async with self.bot.http_session.get(url, raise_for_status=True, **kwargs) as response: if response_format == "text": return await response.text() - if response_format == "json": - return await response.json() - return None + return await response.json() if response_format == "json" else None def _find_ref(self, path: str, refs: tuple) -> tuple: """Loops through all branches and tags to find the required ref.""" @@ -113,7 +111,7 @@ async def _fetch_github_gist_snippet( ) -> str: """Fetches a snippet from a GitHub gist.""" gist_json = await self._fetch_response( - f'https://api.github.com/gists/{gist_id}{f"/{revision}" if len(revision) > 0 else ""}', + f'https://api.github.com/gists/{gist_id}{f"/{revision}" if revision != "" else ""}', "json", headers=GITHUB_HEADERS, ) diff --git a/bot/exts/info/codeblock/_cog.py b/bot/exts/info/codeblock/_cog.py index 6f095a2d00..42b9de66f5 100644 --- a/bot/exts/info/codeblock/_cog.py +++ b/bot/exts/info/codeblock/_cog.py @@ -150,8 +150,7 @@ async def on_message(self, msg: Message) -> None: log.trace(f"Skipping code block detection of {msg.id}: #{msg.channel} is on cooldown.") return - instructions = get_instructions(msg.content) - if instructions: + if instructions := get_instructions(msg.content): await self.send_instructions(msg, instructions) if msg.channel.id not in constants.CodeBlock.channel_whitelist: diff --git a/bot/exts/info/codeblock/_instructions.py b/bot/exts/info/codeblock/_instructions.py index 0a2bdf5861..8b502b575d 100644 --- a/bot/exts/info/codeblock/_instructions.py +++ b/bot/exts/info/codeblock/_instructions.py @@ -163,9 +163,10 @@ def get_instructions(content: str) -> str | None: instructions = _get_no_ticks_message(content) else: log.trace("Searching results for a code block with invalid ticks.") - block = next((block for block in blocks if block.tick != _parsing.BACKTICK), None) - - if block: + if block := next( + (block for block in blocks if block.tick != _parsing.BACKTICK), + None, + ): log.trace("A code block exists but has invalid ticks.") instructions = _get_bad_ticks_message(block) else: diff --git a/bot/exts/info/codeblock/_parsing.py b/bot/exts/info/codeblock/_parsing.py index abad09eef1..5b276ce360 100644 --- a/bot/exts/info/codeblock/_parsing.py +++ b/bot/exts/info/codeblock/_parsing.py @@ -170,16 +170,15 @@ def parse_bad_language(content: str) -> BadLanguage | None: """ log.trace("Parsing bad language.") - match = _RE_LANGUAGE.match(content) - if not match: + if match := _RE_LANGUAGE.match(content): + return BadLanguage( + language=match["lang"], + has_leading_spaces=match["spaces"] is not None, + has_terminal_newline=match["newline"] is not None, + ) + else: return None - return BadLanguage( - language=match["lang"], - has_leading_spaces=match["spaces"] is not None, - has_terminal_newline=match["newline"] is not None, - ) - def _get_leading_spaces(content: str) -> int: """Return the number of spaces at the start of the first line in `content`.""" @@ -227,7 +226,4 @@ def _fix_indentation(content: str) -> str: # All lines must be dedented at least by the same amount as the first line. first_indent = max(first_indent, second_indent) - # Dedent the rest of the lines and join them together with the first line. - content = first_line + "".join(line[first_indent:] for line in lines[1:]) - - return content + return first_line + "".join(line[first_indent:] for line in lines[1:]) diff --git a/bot/exts/info/doc/_cog.py b/bot/exts/info/doc/_cog.py index c82f40557f..24bc474266 100644 --- a/bot/exts/info/doc/_cog.py +++ b/bot/exts/info/doc/_cog.py @@ -260,8 +260,8 @@ async def get_symbol_markdown(self, doc_item: DocItem) -> str: log.exception(f"An unexpected error has occurred when requesting parsing of {doc_item}.") return "Unable to parse the requested symbol due to an error." - if markdown is None: - return "Unable to parse the requested symbol." + if markdown is None: + return "Unable to parse the requested symbol." return markdown async def create_symbol_embed(self, symbol_name: str) -> discord.Embed | None: @@ -289,7 +289,9 @@ async def create_symbol_embed(self, symbol_name: str) -> discord.Embed | None: # with a max of 200 chars. if symbol_name in self.renamed_symbols: renamed_symbols = ", ".join(self.renamed_symbols[symbol_name]) - footer_text = textwrap.shorten("Similar names: " + renamed_symbols, 200, placeholder=" ...") + footer_text = textwrap.shorten( + f"Similar names: {renamed_symbols}", 200, placeholder=" ..." + ) else: footer_text = "" @@ -433,10 +435,10 @@ async def refresh_command(self, ctx: commands.Context) -> None: new_inventories = set(self.base_urls) if added := ", ".join(new_inventories - old_inventories): - added = "+ " + added + added = f"+ {added}" if removed := ", ".join(old_inventories - new_inventories): - removed = "- " + removed + removed = f"- {removed}" embed = discord.Embed( title="Inventories refreshed", diff --git a/bot/exts/info/doc/_html.py b/bot/exts/info/doc/_html.py index 2ef1328f10..7b14307ca9 100644 --- a/bot/exts/info/doc/_html.py +++ b/bot/exts/info/doc/_html.py @@ -68,10 +68,12 @@ def _find_elements_until_tag( for element in func(start_element, name=Strainer(include_strings=include_strings), limit=limit): if isinstance(element, Tag): - if use_container_filter: - if element.name in end_tag_filter: - break - elif end_tag_filter(element): + if ( + use_container_filter + and element.name in end_tag_filter + or not use_container_filter + and end_tag_filter(element) + ): break elements.append(element) @@ -130,8 +132,7 @@ def get_signatures(start_signature: PageElement) -> list[str]: for tag in element.find_all(_filter_signature_links, recursive=False): tag.decompose() - signature = element.text - if signature: + if signature := element.text: signatures.append(signature) return signatures diff --git a/bot/exts/info/doc/_inventory_parser.py b/bot/exts/info/doc/_inventory_parser.py index 4bfd5c9190..807c5e86bd 100644 --- a/bot/exts/info/doc/_inventory_parser.py +++ b/bot/exts/info/doc/_inventory_parser.py @@ -56,10 +56,10 @@ async def _load_v1(stream: aiohttp.StreamReader) -> InventoryDict: # version 1 did not add anchors to the location if type_ == "mod": type_ = "py:module" - location += "#module-" + name + location += f"#module-{name}" else: - type_ = "py:" + type_ - location += "#" + name + type_ = f"py:{type_}" + location += f"#{name}" invdata[type_].append((name, location)) return invdata diff --git a/bot/exts/info/doc/_markdown.py b/bot/exts/info/doc/_markdown.py index f3d7690703..2579903aab 100644 --- a/bot/exts/info/doc/_markdown.py +++ b/bot/exts/info/doc/_markdown.py @@ -33,9 +33,7 @@ def convert_li(self, el: PageElement, text: str, convert_as_inline: bool) -> str def convert_hn(self, _n: int, el: PageElement, text: str, convert_as_inline: bool) -> str: """Convert h tags to bold text with ** instead of adding #.""" - if convert_as_inline: - return text - return f"**{text}**\n\n" + return text if convert_as_inline else f"**{text}**\n\n" def convert_code(self, el: PageElement, text: str, convert_as_inline: bool) -> str: """Undo `markdownify`s underscore escaping.""" diff --git a/bot/exts/info/doc/_parsing.py b/bot/exts/info/doc/_parsing.py index dd2d6496c9..d4cc20f4ae 100644 --- a/bot/exts/info/doc/_parsing.py +++ b/bot/exts/info/doc/_parsing.py @@ -38,7 +38,7 @@ _MAX_SIGNATURES_LENGTH = (_EMBED_CODE_BLOCK_LINE_LENGTH + 8) * MAX_SIGNATURE_AMOUNT # Maximum embed description length - signatures on top _MAX_DESCRIPTION_LENGTH = 4096 - _MAX_SIGNATURES_LENGTH -_TRUNCATE_STRIP_CHARACTERS = "!?:;." + string.whitespace +_TRUNCATE_STRIP_CHARACTERS = f"!?:;.{string.whitespace}" BracketPair = namedtuple("BracketPair", ["opening_bracket", "closing_bracket"]) _BRACKET_PAIRS = { @@ -81,14 +81,13 @@ def _split_parameters(parameters_string: str) -> Iterator[str]: yield parameters_string[last_split:index] last_split = index + 1 - else: - if character == current_search.opening_bracket: - depth += 1 + elif character == current_search.opening_bracket: + depth += 1 - elif character == current_search.closing_bracket: - depth -= 1 - if depth == 0: - current_search = None + elif character == current_search.closing_bracket: + depth -= 1 + if depth == 0: + current_search = None yield parameters_string[last_split:] @@ -157,21 +156,20 @@ def _get_truncated_description( is_tag = isinstance(element, Tag) element_length = len(element.text) if is_tag else len(element) - if rendered_length + element_length < max_length: - if is_tag: - element_markdown = markdown_converter.process_tag(element, convert_as_inline=False) - else: - element_markdown = markdown_converter.process_text(element) - - rendered_length += element_length - tag_end_index += len(element_markdown) - - if not element_markdown.isspace(): - markdown_element_ends.append(tag_end_index) - result += element_markdown - else: + if rendered_length + element_length >= max_length: break + element_markdown = ( + markdown_converter.process_tag(element, convert_as_inline=False) + if is_tag + else markdown_converter.process_text(element) + ) + rendered_length += element_length + tag_end_index += len(element_markdown) + + if not element_markdown.isspace(): + markdown_element_ends.append(tag_end_index) + result += element_markdown if not markdown_element_ends: return "" @@ -188,9 +186,14 @@ def _get_truncated_description( if truncate_index >= markdown_element_ends[-1]: return result - # Determine the actual truncation index. - possible_truncation_indices = [cut for cut in markdown_element_ends if cut < truncate_index] - if not possible_truncation_indices: + if possible_truncation_indices := [ + cut for cut in markdown_element_ends if cut < truncate_index + ]: + # Truncate at the last Markdown element that comes before the truncation index. + markdown_truncate_index = possible_truncation_indices[-1] + truncated_result = result[:markdown_truncate_index] + + else: # In case there is no Markdown element ending before the truncation index, try to find a good cutoff point. force_truncated = result[:truncate_index] # If there is an incomplete codeblock, cut it out. @@ -206,12 +209,7 @@ def _get_truncated_description( else: truncated_result = force_truncated - else: - # Truncate at the last Markdown element that comes before the truncation index. - markdown_truncate_index = possible_truncation_indices[-1] - truncated_result = result[:markdown_truncate_index] - - return truncated_result.strip(_TRUNCATE_STRIP_CHARACTERS) + "..." + return f"{truncated_result.strip(_TRUNCATE_STRIP_CHARACTERS)}..." def _create_markdown(signatures: list[str] | None, description: Iterable[Tag], url: str) -> str: diff --git a/bot/exts/info/doc/_redis_cache.py b/bot/exts/info/doc/_redis_cache.py index 58627fafb0..7e225262ed 100644 --- a/bot/exts/info/doc/_redis_cache.py +++ b/bot/exts/info/doc/_redis_cache.py @@ -74,10 +74,12 @@ async def delete(self, package: str) -> bool: """Remove all values for `package`; return True if at least one key was deleted, False otherwise.""" pattern = f"{self.namespace}:{package}:*" - package_keys = [ - package_key async for package_key in self.redis_session.client.scan_iter(match=pattern) - ] - if package_keys: + if package_keys := [ + package_key + async for package_key in self.redis_session.client.scan_iter( + match=pattern + ) + ]: await self.redis_session.client.delete(*package_keys) log.info(f"Deleted keys from redis: {package_keys}.") self._set_expires = { @@ -102,11 +104,12 @@ async def increment_for(self, item: DocItem) -> int: async def delete(self, package: str) -> bool: """Remove all values for `package`; return True if at least one key was deleted, False otherwise.""" - package_keys = [ + if package_keys := [ package_key - async for package_key in self.redis_session.client.scan_iter(match=f"{self.namespace}:{package}:*") - ] - if package_keys: + async for package_key in self.redis_session.client.scan_iter( + match=f"{self.namespace}:{package}:*" + ) + ]: await self.redis_session.client.delete(*package_keys) return True return False diff --git a/bot/exts/info/help.py b/bot/exts/info/help.py index c560d04899..1d7332a268 100644 --- a/bot/exts/info/help.py +++ b/bot/exts/info/help.py @@ -254,7 +254,7 @@ async def command_not_found(self, query: str) -> HelpQueryNotFoundError: # Trim query to avoid embed limits when sending the error. if len(query) >= 100: - query = query[:100] + "..." + query = f"{query[:100]}..." return HelpQueryNotFoundError(f'Query "{query}" not found.', {choice[0]: choice[1] for choice in result}) @@ -293,8 +293,7 @@ async def command_formatting(self, command: Command) -> tuple[Embed, CommandView # show command aliases aliases = [f"`{alias}`" if not parent else f"`{parent} {alias}`" for alias in command.aliases] aliases += [f"`{alias}`" for alias in getattr(command, "root_aliases", ())] - aliases = ", ".join(sorted(aliases)) - if aliases: + if aliases := ", ".join(sorted(aliases)): command_details += f"**Can also use:** {aliases}\n\n" # when command is disabled, show message about it, @@ -337,9 +336,7 @@ def get_commands_brief_details(commands_: list[Command], return_as_list: bool = details.append( f"\n**`{PREFIX}{command.qualified_name}{signature}`**\n{command.short_doc or 'No details provided'}" ) - if return_as_list: - return details - return "".join(details) + return details if return_as_list else "".join(details) async def format_group_help(self, group: Group) -> tuple[Embed, CommandView | None]: """Formats help for a group command.""" @@ -354,8 +351,7 @@ async def format_group_help(self, group: Group) -> tuple[Embed, CommandView | No embed, _ = await self.command_formatting(group) - command_details = self.get_commands_brief_details(commands_) - if command_details: + if command_details := self.get_commands_brief_details(commands_): embed.description += f"\n**Subcommands:**\n{command_details}" # If the help is invoked in the context of an error, don't show subcommand navigation. @@ -377,8 +373,7 @@ async def send_cog_help(self, cog: Cog) -> None: embed.set_author(name="Command Help", icon_url=constants.Icons.questionmark) embed.description = f"**{cog.qualified_name}**\n{cog.description}" - command_details = self.get_commands_brief_details(commands_) - if command_details: + if command_details := self.get_commands_brief_details(commands_): embed.description += f"\n\n**Commands:**\n{command_details}" message = await self.context.send(embed=embed) diff --git a/bot/exts/info/information.py b/bot/exts/info/information.py index c7ee9065cd..78ffaa636f 100644 --- a/bot/exts/info/information.py +++ b/bot/exts/info/information.py @@ -85,19 +85,16 @@ def get_member_counts(guild: Guild) -> dict[str, int]: async def get_extended_server_info(self, ctx: Context) -> str: """Return additional server info only visible in moderation channels.""" talentpool_info = "" - talentpool_cog: TalentPool | None = self.bot.get_cog("Talentpool") - if talentpool_cog: + if talentpool_cog := self.bot.get_cog("Talentpool"): num_nominated = len(await talentpool_cog.api.get_nominations(active=True)) talentpool_info = f"Nominated: {num_nominated}\n" bb_info = "" - bb_cog: BigBrother | None = self.bot.get_cog("Big Brother") - if bb_cog: + if bb_cog := self.bot.get_cog("Big Brother"): bb_info = f"BB-watched: {len(bb_cog.watched_users)}\n" defcon_info = "" - defcon_cog: Defcon | None = self.bot.get_cog("Defcon") - if defcon_cog: + if defcon_cog := self.bot.get_cog("Defcon"): threshold = time.humanize_delta(defcon_cog.threshold) if defcon_cog.threshold else "-" defcon_info = f"Defcon threshold: {threshold}\n" @@ -120,11 +117,7 @@ async def roles_info(self, ctx: Context) -> None: # Sort the roles alphabetically and remove the @everyone role roles = sorted(ctx.guild.roles[1:], key=lambda role: role.name) - # Build a list - role_list = [] - for role in roles: - role_list.append(f"`{role.id}` - {role.mention}") - + role_list = [f"`{role.id}` - {role.mention}" for role in roles] # Build an embed embed = Embed( title=f"Role information (Total {len(roles)} role{'s' * (len(role_list) > 1)})", @@ -556,7 +549,7 @@ async def rules(self, ctx: Context, *, args: str | None) -> set[int] | None: keywords, rule_numbers = [], [] full_rules = await self.bot.api_client.get("rules", params={"link_format": "md"}) - keyword_to_rule_number = dict() + keyword_to_rule_number = {} for rule_number, (_, rule_keywords) in enumerate(full_rules, start=1): for rule_keyword in rule_keywords: @@ -581,12 +574,16 @@ async def rules(self, ctx: Context, *, args: str | None) -> set[int] | None: # Remove duplicates and sort the rule indices rule_numbers = sorted(set(rule_numbers)) - invalid = ", ".join( - str(rule_number) for rule_number in rule_numbers - if rule_number < 1 or rule_number > len(full_rules)) - - if invalid: - await ctx.send(shorten(":x: Invalid rule indices: " + invalid, 75, placeholder=" ...")) + if invalid := ", ".join( + str(rule_number) + for rule_number in rule_numbers + if rule_number < 1 or rule_number > len(full_rules) + ): + await ctx.send( + shorten( + f":x: Invalid rule indices: {invalid}", 75, placeholder=" ..." + ) + ) return None final_rules = [] diff --git a/bot/exts/info/patreon.py b/bot/exts/info/patreon.py index 948568101b..cb5211aae6 100644 --- a/bot/exts/info/patreon.py +++ b/bot/exts/info/patreon.py @@ -37,10 +37,10 @@ def get_patreon_tier(member: discord.Member) -> int: A patreon tier of 0 indicates the user is not a patron. """ - for tier, role_id in PATREON_TIERS: - if member.get_role(role_id): - return tier - return 0 + return next( + (tier for tier, role_id in PATREON_TIERS if member.get_role(role_id)), + 0, + ) class Patreon(commands.Cog): diff --git a/bot/exts/info/python_news.py b/bot/exts/info/python_news.py index 8507a0fec0..2f159249fd 100644 --- a/bot/exts/info/python_news.py +++ b/bot/exts/info/python_news.py @@ -196,10 +196,12 @@ async def post_maillist_news(self) -> None: # Build an embed and send a message to the webhook embed = discord.Embed( title=self.escape_markdown(thread_information["subject"]), - description=content[:1000] + f"... [continue reading]({link})" if len(content) > 1000 else content, + description=f"{content[:1000]}... [continue reading]({link})" + if len(content) > 1000 + else content, timestamp=new_date, url=link, - colour=constants.Colours.soft_green + colour=constants.Colours.soft_green, ) embed.set_author( name=f"{email_information['sender_name']} ({email_information['sender']['address']})", diff --git a/bot/exts/info/tags.py b/bot/exts/info/tags.py index 42ac38be67..3f240c5e71 100644 --- a/bot/exts/info/tags.py +++ b/bot/exts/info/tags.py @@ -57,9 +57,7 @@ def get_fuzzy_score(self, fuzz_tag_identifier: TagIdentifier) -> float: return fuzzy_score def __str__(self) -> str: - if self.group is not None: - return f"{self.group} {self.name}" - return self.name + return f"{self.group} {self.name}" if self.group is not None else self.name @classmethod def from_string(cls, string: str) -> TagIdentifier: @@ -157,12 +155,11 @@ def initialize_tags(self) -> None: def _get_suggestions(self, tag_identifier: TagIdentifier) -> list[tuple[TagIdentifier, Tag]]: """Return a list of suggested tags for `tag_identifier`.""" for threshold in [100, 90, 80, 70, 60]: - suggestions = [ + if suggestions := [ (identifier, tag) for identifier, tag in self.tags.items() if identifier.get_fuzzy_score(tag_identifier) >= threshold - ] - if suggestions: + ]: return suggestions return [] diff --git a/bot/exts/moderation/clean.py b/bot/exts/moderation/clean.py index 1c73736d78..0fc2eeafe9 100644 --- a/bot/exts/moderation/clean.py +++ b/bot/exts/moderation/clean.py @@ -122,25 +122,28 @@ def _channels_set( ) -> set[TextChannel]: """Standardize the input `channels` argument to a usable set of text channels.""" # Default to using the invoking context's channel or the channel of the message limit(s). - if not channels: - # Input was validated - if first_limit is a message, second_limit won't point at a different channel. - if isinstance(first_limit, Message): - channels = {first_limit.channel} - elif isinstance(second_limit, Message): - channels = {second_limit.channel} - else: - channels = {ctx.channel} - else: - if channels == "*": - channels = { - channel for channel in itertools.chain(ctx.guild.channels, ctx.guild.threads) + if channels: + channels = ( + { + channel + for channel in itertools.chain( + ctx.guild.channels, ctx.guild.threads + ) if isinstance(channel, TextChannel | Thread) # Assume that non-public channels are not needed to optimize for speed. - and channel.permissions_for(ctx.guild.default_role).view_channel + and channel.permissions_for( + ctx.guild.default_role + ).view_channel } - else: - channels = set(channels) - + if channels == "*" + else set(channels) + ) + elif isinstance(first_limit, Message): + channels = {first_limit.channel} + elif isinstance(second_limit, Message): + channels = {second_limit.channel} + else: + channels = {ctx.channel} return channels @staticmethod @@ -325,7 +328,7 @@ async def _delete_found(self, message_mappings: dict[TextChannel, list[Message]] if not self.cleaning: return deleted - if len(to_delete) > 0: + if to_delete: # Deleting any leftover messages if there are any with suppress(NotFound): await channel.delete_messages(to_delete) @@ -448,13 +451,13 @@ async def _clean_messages( f"{Emojis.ok_hand} Deleted {len(deleted_messages)} messages. " f"A log of the deleted messages can be found here {log_url}." ) - if log_url and is_mod_channel(ctx.channel): - try: - await ctx.reply(success_message) - except errors.HTTPException: - await ctx.send(success_message) - elif log_url: - if mods := self.bot.get_channel(Channels.mods): + if log_url: + if is_mod_channel(ctx.channel): + try: + await ctx.reply(success_message) + except errors.HTTPException: + await ctx.send(success_message) + elif mods := self.bot.get_channel(Channels.mods): await mods.send(f"{ctx.author.mention} {success_message}") return log_url diff --git a/bot/exts/moderation/defcon.py b/bot/exts/moderation/defcon.py index 3c16f8e0ee..2d7ee6745e 100644 --- a/bot/exts/moderation/defcon.py +++ b/bot/exts/moderation/defcon.py @@ -109,37 +109,38 @@ async def _sync_settings(self) -> None: @Cog.listener() async def on_member_join(self, member: Member) -> None: """Check newly joining users to see if they meet the account age threshold.""" - if self.threshold: - now = arrow.utcnow() + if not self.threshold: + return + now = arrow.utcnow() - if now - member.created_at < time.relativedelta_to_timedelta(self.threshold): - log.info(f"Rejecting user {member}: Account is too new") + if now - member.created_at < time.relativedelta_to_timedelta(self.threshold): + log.info(f"Rejecting user {member}: Account is too new") - message_sent = False + message_sent = False - try: - await member.send(REJECTION_MESSAGE.format(user=member.mention)) - message_sent = True - except Forbidden: - log.debug(f"Cannot send DEFCON rejection DM to {member}: DMs disabled") - except Exception: - # Broadly catch exceptions because DM isn't critical, but it's imperative to kick them. - log.exception(f"Error sending DEFCON rejection message to {member}") + try: + await member.send(REJECTION_MESSAGE.format(user=member.mention)) + message_sent = True + except Forbidden: + log.debug(f"Cannot send DEFCON rejection DM to {member}: DMs disabled") + except Exception: + # Broadly catch exceptions because DM isn't critical, but it's imperative to kick them. + log.exception(f"Error sending DEFCON rejection message to {member}") - await member.kick(reason="DEFCON active, user is too new") - self.bot.stats.incr("defcon.leaves") + await member.kick(reason="DEFCON active, user is too new") + self.bot.stats.incr("defcon.leaves") - message = ( - f"{format_user(member)} was denied entry because their account is too new." - ) + message = ( + f"{format_user(member)} was denied entry because their account is too new." + ) - if not message_sent: - message = f"{message}\n\nUnable to send rejection message via DM; they probably have DMs disabled." + if not message_sent: + message = f"{message}\n\nUnable to send rejection message via DM; they probably have DMs disabled." - await (await self.get_mod_log()).send_log_message( - Icons.defcon_denied, Colours.soft_red, "Entry denied", - message, member.display_avatar.url - ) + await (await self.get_mod_log()).send_log_message( + Icons.defcon_denied, Colours.soft_red, "Entry denied", + message, member.display_avatar.url + ) @group(name="defcon", aliases=("dc",), invoke_without_command=True) @has_any_role(*MODERATION_ROLES) diff --git a/bot/exts/moderation/dm_relay.py b/bot/exts/moderation/dm_relay.py index 03b18e46a8..11e6055386 100644 --- a/bot/exts/moderation/dm_relay.py +++ b/bot/exts/moderation/dm_relay.py @@ -40,9 +40,7 @@ async def dmrelay(self, ctx: Context, user: discord.User, limit: int = 100) -> N if (embeds := len(msg.embeds)) > 0: output += f"<{embeds} embed{'s' if embeds > 1 else ''}>\n" - # Attachments - attachments = "\n".join(a.url for a in msg.attachments) - if attachments: + if attachments := "\n".join(a.url for a in msg.attachments): output += attachments + "\n" if not output: diff --git a/bot/exts/moderation/incidents.py b/bot/exts/moderation/incidents.py index 2aecf787a7..c9dc1787b0 100644 --- a/bot/exts/moderation/incidents.py +++ b/bot/exts/moderation/incidents.py @@ -103,9 +103,9 @@ async def make_embed(incident: discord.Message, outcome: Signal, actioned_by: di # If the description will be too long (>4096 total characters), truncate the incident content if len(incident.content) > (allowed_content_chars := 4096-len(reported_on_msg)-2): # -2 for the newlines - description = incident.content[:allowed_content_chars-3] + f"...\n\n{reported_on_msg}" + description = f"{incident.content[:allowed_content_chars - 3]}...\n\n{reported_on_msg}" else: - description = incident.content + f"\n\n{reported_on_msg}" + description = f"{incident.content}\n\n{reported_on_msg}" embed = discord.Embed( description=description, @@ -165,7 +165,7 @@ def shorten_text(text: str) -> str: text = "\n".join(text.split("\n", maxsplit=3)[:3]) # If it is a single word, then truncate it to 50 characters - if text.find(" ") == -1: + if " " not in text: text = text[:50] # Remove extra whitespaces from the `text` @@ -657,7 +657,7 @@ async def send_message_link_embeds( async def delete_msg_link_embed(self, message_id: int) -> None: """Delete the Discord message link message found in cache for `message_id`.""" log.trace("Deleting Discord message link's webhook message.") - webhook_msg_id = await self.message_link_embeds_cache.get(int(message_id)) + webhook_msg_id = await self.message_link_embeds_cache.get(message_id) if webhook_msg_id: try: diff --git a/bot/exts/moderation/infraction/_scheduler.py b/bot/exts/moderation/infraction/_scheduler.py index a079f775ef..08cb94653e 100644 --- a/bot/exts/moderation/infraction/_scheduler.py +++ b/bot/exts/moderation/infraction/_scheduler.py @@ -439,12 +439,10 @@ async def deactivate_infraction( f"Can't pardon {infraction['type']} for user {infraction['user']} because user left the guild." ) log_text["Failure"] = "User left the guild." - log_content = mod_role.mention else: log.exception(f"Failed to deactivate infraction #{id_} ({type_})") log_text["Failure"] = f"HTTPException with status {e.status} and code {e.code}." - log_content = mod_role.mention - + log_content = mod_role.mention # Check if the user is currently being watched by Big Brother. try: log.trace(f"Determining if user {user_id} is currently being watched by Big Brother.") @@ -473,7 +471,7 @@ async def deactivate_infraction( data["reason"] = "" # Append pardon reason to infraction in database. if (punish_reason := infraction["reason"]) is not None: - data["reason"] = punish_reason + " | " + data["reason"] = f"{punish_reason} | " data["reason"] += f"Pardoned: {pardon_reason}" diff --git a/bot/exts/moderation/infraction/_utils.py b/bot/exts/moderation/infraction/_utils.py index baeb971e4e..cb971704d4 100644 --- a/bot/exts/moderation/infraction/_utils.py +++ b/bot/exts/moderation/infraction/_utils.py @@ -132,8 +132,7 @@ async def post_infraction( # Try to apply the infraction. If it fails because the user doesn't exist, try to add it. for should_post_user in (True, False): try: - response = await ctx.bot.api_client.post("bot/infractions", json=payload) - return response + return await ctx.bot.api_client.post("bot/infractions", json=payload) except ResponseCodeError as e: if e.status == 400 and "user" in e.response_json: # Only one attempt to add the user to the database, not two: diff --git a/bot/exts/moderation/infraction/infractions.py b/bot/exts/moderation/infraction/infractions.py index 6af2571de7..143e1f18c1 100644 --- a/bot/exts/moderation/infraction/infractions.py +++ b/bot/exts/moderation/infraction/infractions.py @@ -31,8 +31,10 @@ # Comp ban DISCORD_ARTICLE_URL = "https://support.discord.com/hc/en-us/articles" -LINK_PASSWORD = DISCORD_ARTICLE_URL + "/218410947-I-forgot-my-Password-Where-can-I-set-a-new-one" -LINK_2FA = DISCORD_ARTICLE_URL + "/219576828-Setting-up-Two-Factor-Authentication" +LINK_PASSWORD = f"{DISCORD_ARTICLE_URL}/218410947-I-forgot-my-Password-Where-can-I-set-a-new-one" +LINK_2FA = ( + f"{DISCORD_ARTICLE_URL}/219576828-Setting-up-Two-Factor-Authentication" +) COMP_BAN_REASON = ( "Your account has been used to send links to a phishing website. You have been automatically banned. " "If you are not aware of sending them, that means your account has been compromised.\n\n" diff --git a/bot/exts/moderation/infraction/management.py b/bot/exts/moderation/infraction/management.py index 6af523bb0e..059ee2ed7a 100644 --- a/bot/exts/moderation/infraction/management.py +++ b/bot/exts/moderation/infraction/management.py @@ -297,12 +297,11 @@ async def search_user(self, ctx: Context, user: MemberOrUser | discord.Object) - if isinstance(user, discord.Member | discord.User): user_str = escape_markdown(str(user)) + elif infraction_list: + user = infraction_list[0]["user"] + user_str = escape_markdown(user["name"]) + f"#{user['discriminator']:04}" else: - if infraction_list: - user = infraction_list[0]["user"] - user_str = escape_markdown(user["name"]) + f"#{user['discriminator']:04}" - else: - user_str = str(user.id) + user_str = str(user.id) formatted_infraction_count = self.format_infraction_count(len(infraction_list)) embed = discord.Embed( @@ -332,7 +331,7 @@ async def search_reason(self, ctx: Context, reason: str) -> None: colour=discord.Colour.orange() ) if len(reason) > 500: - reason = reason[:500] + "..." + reason = f"{reason[:500]}..." await self.send_infraction_list(ctx, embed, infraction_list, reason) # endregion @@ -355,11 +354,7 @@ async def search_by_actor( if isinstance(actor, str): actor = ctx.author - if oldest_first: - ordering = "inserted_at" # oldest infractions first - else: - ordering = "-inserted_at" # newest infractions first - + ordering = "inserted_at" if oldest_first else "-inserted_at" infraction_list = await self.bot.api_client.get( "bot/infractions/expanded", params={ @@ -388,9 +383,7 @@ def format_infraction_count(infraction_count: int) -> str: API limits returned infractions to a maximum of 100, so if `infraction_count` is 100 then we return `"100+"`. Otherwise, return `str(infraction_count)`. """ - if infraction_count == 100: - return "100+" - return str(infraction_count) + return "100+" if infraction_count == 100 else str(infraction_count) async def send_infraction_list( self, @@ -445,7 +438,7 @@ def infraction_to_string(self, infraction: dict[str, t.Any], ignore_fields: tupl if "actor" not in ignore_fields: actor_str = f"By <@{infraction['actor']['id']}>" - issued = "Issued " + time.discord_timestamp(inserted_at) + issued = f"Issued {time.discord_timestamp(inserted_at)}" duration = "" if infraction["type"] not in NO_DURATION_INFRACTIONS: diff --git a/bot/exts/moderation/modlog.py b/bot/exts/moderation/modlog.py index b349f4d5d5..c919503956 100644 --- a/bot/exts/moderation/modlog.py +++ b/bot/exts/moderation/modlog.py @@ -66,7 +66,7 @@ async def send_log_message( await self.bot.wait_until_guild_available() # Truncate string directly here to avoid removing newlines embed = discord.Embed( - description=text[:4093] + "..." if len(text) > 4096 else text + description=f"{text[:4093]}..." if len(text) > 4096 else text ) if title and icon_url: @@ -89,7 +89,7 @@ async def send_log_message( # Truncate content to 2000 characters and append an ellipsis. if content and len(content) > 2000: - content = content[:2000 - 3] + "..." + content = f"{content[:2000 - 3]}..." channel = self.bot.get_channel(channel_id) log_message = await channel.send( @@ -201,11 +201,7 @@ async def on_guild_channel_update(self, before: GUILD_CHANNEL, after: GuildChann if not changes: return - message = "" - - for item in sorted(changes): - message += f"{Emojis.bullet} {item}\n" - + message = "".join(f"{Emojis.bullet} {item}\n" for item in sorted(changes)) if after.category: message = f"**{after.category}/#{after.name} (`{after.id}`)**\n{message}" else: @@ -279,11 +275,7 @@ async def on_guild_role_update(self, before: discord.Role, after: discord.Role) if not changes: return - message = "" - - for item in sorted(changes): - message += f"{Emojis.bullet} {item}\n" - + message = "".join(f"{Emojis.bullet} {item}\n" for item in sorted(changes)) message = f"**{after.name}** (`{after.id}`)\n{message}" await self.send_log_message( @@ -329,11 +321,7 @@ async def on_guild_update(self, before: discord.Guild, after: discord.Guild) -> if not changes: return - message = "" - - for item in sorted(changes): - message += f"{Emojis.bullet} {item}\n" - + message = "".join(f"{Emojis.bullet} {item}\n" for item in sorted(changes)) message = f"**{after.name}** (`{after.id}`)\n{message}" await self.send_log_message( @@ -417,16 +405,17 @@ async def on_member_unban(self, guild: discord.Guild, member: discord.User) -> N @staticmethod def get_role_diff(before: list[discord.Role], after: list[discord.Role]) -> list[str]: """Return a list of strings describing the roles added and removed.""" - changes = [] before_roles = set(before) after_roles = set(after) - for role in (before_roles - after_roles): - changes.append(f"**Role removed:** {role.name} (`{role.id}`)") - - for role in (after_roles - before_roles): - changes.append(f"**Role added:** {role.name} (`{role.id}`)") - + changes = [ + f"**Role removed:** {role.name} (`{role.id}`)" + for role in (before_roles - after_roles) + ] + changes.extend( + f"**Role added:** {role.name} (`{role.id}`)" + for role in (after_roles - before_roles) + ) return changes @Cog.listener() @@ -464,11 +453,7 @@ async def on_member_update(self, before: discord.Member, after: discord.Member) if not changes: return - message = "" - - for item in sorted(changes): - message += f"{Emojis.bullet} {item}\n" - + message = "".join(f"{Emojis.bullet} {item}\n" for item in sorted(changes)) message = f"{format_user(after)}\n{message}" await self.send_log_message( @@ -551,7 +536,7 @@ async def log_cached_deleted_message(self, message: discord.Message) -> None: if message.attachments: # Prepend the message metadata with the number of attachments - response = f"**Attachments:** {len(message.attachments)}\n" + response + response = f"**Attachments:** {len(message.attachments)}\n{response}" # Shorten the message content if necessary content = message.clean_content @@ -649,11 +634,7 @@ async def on_message_edit(self, msg_before: discord.Message, msg_after: discord. for index, (diff_type, words) in enumerate(diff_groups): sub = " ".join(words) - if diff_type == "-": - content_before.append(f"[{sub}](http://o.hi)") - elif diff_type == "+": - content_after.append(f"[{sub}](http://o.hi)") - elif diff_type == " ": + if diff_type == " ": if len(words) > 2: sub = ( f"{words[0] if index > 0 else ''}" @@ -663,6 +644,10 @@ async def on_message_edit(self, msg_before: discord.Message, msg_after: discord. content_before.append(sub) content_after.append(sub) + elif diff_type == "+": + content_after.append(f"[{sub}](http://o.hi)") + elif diff_type == "-": + content_before.append(f"[{sub}](http://o.hi)") response = ( f"**Author:** {format_user(msg_before.author)}\n" f"**Channel:** {channel_name} (`{channel.id}`)\n" diff --git a/bot/exts/moderation/silence.py b/bot/exts/moderation/silence.py index 2852598cad..9065de5f75 100644 --- a/bot/exts/moderation/silence.py +++ b/bot/exts/moderation/silence.py @@ -328,13 +328,12 @@ async def _unsilence(self, channel: TextOrVoiceChannel) -> bool: # Select the role based on channel type, and get current overwrites if isinstance(channel, TextChannel): role = self._everyone_role - overwrite = channel.overwrites_for(role) permissions = "`Send Messages` and `Add Reactions`" else: role = self._verified_voice_role - overwrite = channel.overwrites_for(role) permissions = "`Speak` and `Connect`" + overwrite = channel.overwrites_for(role) # Check if old overwrites were not stored if prev_overwrites is None: log.info(f"Missing previous overwrites for #{channel} ({channel.id}); defaulting to None.") diff --git a/bot/exts/moderation/stream.py b/bot/exts/moderation/stream.py index 6ffae1e6e6..75813fc186 100644 --- a/bot/exts/moderation/stream.py +++ b/bot/exts/moderation/stream.py @@ -203,7 +203,10 @@ async def liststream(self, ctx: commands.Context) -> None: non_staff_partners_community_members_with_stream = [ member for member in ctx.guild.get_role(Roles.video).members - if not any(role.id in STAFF_PARTNERS_COMMUNITY_ROLES for role in member.roles) + if all( + role.id not in STAFF_PARTNERS_COMMUNITY_ROLES + for role in member.roles + ) ] # List of tuples (UtcPosixTimestamp, str) diff --git a/bot/exts/moderation/voice_gate.py b/bot/exts/moderation/voice_gate.py index 50d3188bd5..2e279dcc74 100644 --- a/bot/exts/moderation/voice_gate.py +++ b/bot/exts/moderation/voice_gate.py @@ -233,7 +233,10 @@ async def on_message(self, message: discord.Message) -> None: return # Then check is member moderator+, because we don't want to delete their messages. - if any(role.id in MODERATION_ROLES for role in message.author.roles) and is_verify_command is False: + if ( + any(role.id in MODERATION_ROLES for role in message.author.roles) + and not is_verify_command + ): log.trace(f"Excluding moderator message {message.id} from deletion in #{message.channel}.") return diff --git a/bot/exts/moderation/watchchannels/_watchchannel.py b/bot/exts/moderation/watchchannels/_watchchannel.py index fb75d525d1..c355c91725 100644 --- a/bot/exts/moderation/watchchannels/_watchchannel.py +++ b/bot/exts/moderation/watchchannels/_watchchannel.py @@ -83,8 +83,7 @@ def consuming_messages(self) -> bool: return False if self._consume_task.done(): - exc = self._consume_task.exception() - if exc: + if exc := self._consume_task.exception(): self.log.exception( "The message queue consume task has failed with:", exc_info=exc @@ -339,17 +338,14 @@ async def prepare_watched_users_data( The dictionary additionally has an "updated" field which is true if a cache update was requested and it succeeded. """ - list_data = {} if update_cache: if not await self.fetch_user_cache(): update_cache = False - list_data["updated"] = update_cache - watched_iter = self.watched_users.items() if oldest_first: watched_iter = reversed(watched_iter) - list_data["info"] = {} + list_data = {"updated": update_cache, "info": {}} for user_id, user_data in watched_iter: member = await get_or_fetch_member(ctx.guild, user_id) line = f"• `{user_id}`" diff --git a/bot/exts/recruitment/talentpool/_api.py b/bot/exts/recruitment/talentpool/_api.py index f7b2432092..dc2f1f7cf5 100644 --- a/bot/exts/recruitment/talentpool/_api.py +++ b/bot/exts/recruitment/talentpool/_api.py @@ -50,14 +50,12 @@ async def get_nominations( params["user__id"] = str(user_id) data = await self.site_api.get("bot/nominations", params=params) - nominations = TypeAdapter(list[Nomination]).validate_python(data) - return nominations + return TypeAdapter(list[Nomination]).validate_python(data) async def get_nomination(self, nomination_id: int) -> Nomination: """Fetch a nomination by ID.""" data = await self.site_api.get(f"bot/nominations/{nomination_id}") - nomination = Nomination.model_validate(data) - return nomination + return Nomination.model_validate(data) async def edit_nomination( self, diff --git a/bot/exts/recruitment/talentpool/_cog.py b/bot/exts/recruitment/talentpool/_cog.py index 9c180b5abf..d2e6bde073 100644 --- a/bot/exts/recruitment/talentpool/_cog.py +++ b/bot/exts/recruitment/talentpool/_cog.py @@ -438,7 +438,7 @@ async def edit_reason_command( # If not specified, assume the invoker is editing their own nomination reason. nominator = nominator or ctx.author - if not any(role.id in MODERATION_ROLES for role in ctx.author.roles): + if all(role.id not in MODERATION_ROLES for role in ctx.author.roles): if ctx.channel.id != Channels.nominations: await ctx.send(f":x: Nomination edits must be run in the <#{Channels.nominations}> channel.") return diff --git a/bot/exts/recruitment/talentpool/_review.py b/bot/exts/recruitment/talentpool/_review.py index fc48809c0e..40e2ac8e3f 100644 --- a/bot/exts/recruitment/talentpool/_review.py +++ b/bot/exts/recruitment/talentpool/_review.py @@ -239,8 +239,7 @@ async def post_review(self, nomination: Nomination) -> None: await self.api.edit_nomination(nomination.id, reviewed=True, thread_id=thread.id) - bump_cog: ThreadBumper = self.bot.get_cog("ThreadBumper") - if bump_cog: + if bump_cog := self.bot.get_cog("ThreadBumper"): context = await self.bot.get_context(message) await bump_cog.add_thread_to_bump_list(context, thread) @@ -302,8 +301,9 @@ async def archive_vote(self, message: PartialMessage, passed: bool) -> None: parts = [] for message_ in messages[::-1]: - parts.append(message_.content) - parts.append("\n" if message_.content.endswith(".") else " ") + parts.extend( + (message_.content, "\n" if message_.content.endswith(".") else " ") + ) content = "".join(parts) # We assume that the first user mentioned is the user that we are voting on @@ -354,11 +354,13 @@ async def archive_vote(self, message: PartialMessage, passed: bool) -> None: for number, part in enumerate( textwrap.wrap(embed_content, width=MAX_EMBED_SIZE, replace_whitespace=False, placeholder="") ): - await channel.send(embed=Embed( - title=embed_title if number == 0 else None, - description="[...] " + part if number != 0 else part, - colour=colour - )) + await channel.send( + embed=Embed( + title=embed_title if number == 0 else None, + description=f"[...] {part}" if number != 0 else part, + colour=colour, + ) + ) for message_ in messages: with contextlib.suppress(NotFound): @@ -412,12 +414,7 @@ async def _activity_review(self, member: Member) -> str: channels += f", and {last_channel[1]} in {last_channel[0]}" joined_at_formatted = time.format_relative(member.joined_at) - review = ( - f"{member.name} joined the server **{joined_at_formatted}**" - f" and has **{messages} messages**{channels}." - ) - - return review + return f"{member.name} joined the server **{joined_at_formatted}** and has **{messages} messages**{channels}." async def _infractions_review(self, member: Member) -> str: """ @@ -516,22 +513,13 @@ async def _previous_nominations_review(self, member: Member) -> str | None: end_time = time.format_relative(history[0].ended_at) - review = ( - f"They were nominated **{nomination_times}** before" - f", but their nomination was called off **{rejection_times}**." - f"\nList of all of their nomination threads: {nomination_vote_threads}" - f"\nThe last one ended {end_time} with the reason: {history[0].end_reason}" - ) - - return review + return f"They were nominated **{nomination_times}** before, but their nomination was called off **{rejection_times}**.\nList of all of their nomination threads: {nomination_vote_threads}\nThe last one ended {end_time} with the reason: {history[0].end_reason}" @staticmethod def _random_ducky(guild: Guild) -> Emoji | str: """Picks a random ducky emoji. If no duckies found returns 👀.""" duckies = [emoji for emoji in guild.emojis if emoji.name.startswith("ducky")] - if not duckies: - return "\N{EYES}" - return random.choice(duckies) + return "\N{EYES}" if not duckies else random.choice(duckies) @staticmethod async def _bulk_send(channel: TextChannel, text: str) -> list[Message]: diff --git a/bot/exts/utils/extensions.py b/bot/exts/utils/extensions.py index 57c62bfee5..c8e44f4dec 100644 --- a/bot/exts/utils/extensions.py +++ b/bot/exts/utils/extensions.py @@ -68,9 +68,7 @@ async def unload_command(self, ctx: Context, *extensions: Extension) -> None: await ctx.send_help(ctx.command) return - blacklisted = "\n".join(UNLOAD_BLACKLIST & set(extensions)) - - if blacklisted: + if blacklisted := "\n".join(UNLOAD_BLACKLIST & set(extensions)): await ctx.send(f":x: The following extension(s) may not be unloaded:```\n{blacklisted}```") else: if "*" in extensions or "**" in extensions: diff --git a/bot/exts/utils/internal.py b/bot/exts/utils/internal.py index ea27a5f503..02cc13ed02 100644 --- a/bot/exts/utils/internal.py +++ b/bot/exts/utils/internal.py @@ -50,9 +50,7 @@ def _format(self, inp: str, out: Any) -> tuple[str, discord.Embed | None]: res = "" # Erase temp input we made - if inp.startswith("_ = "): - inp = inp[4:] - + inp = inp.removeprefix("_ = ") # Get all non-empty lines lines = [line for line in inp.split("\n") if line.strip()] if len(lines) != 1: @@ -60,29 +58,7 @@ def _format(self, inp: str, out: Any) -> tuple[str, discord.Embed | None]: # Create the input dialog for i, line in enumerate(lines): - if i == 0: - # Start dialog - start = f"In [{self.ln}]: " - - else: - # Indent the 3 dots correctly; - # Normally, it's something like - # In [X]: - # ...: - # - # But if it's - # In [XX]: - # ...: - # - # You can see it doesn't look right. - # This code simply indents the dots - # far enough to align them. - # we first `str()` the line number - # then we get the length - # and use `str.rjust()` - # to indent it. - start = "...: ".rjust(len(str(self.ln)) + 7) - + start = f"In [{self.ln}]: " if i == 0 else "...: ".rjust(len(str(self.ln)) + 7) if i == len(lines) - 2: if line.startswith("return"): line = line[6:].strip() @@ -238,7 +214,7 @@ async def eval(self, ctx: Context, *, code: str) -> None: r"^(return|import|for|while|def|class|" r"from|exit|[a-zA-Z0-9]+\s*=)", code, re.M) and len( code.split("\n")) == 1: - code = "_ = " + code + code = f"_ = {code}" await self._eval(ctx, code) diff --git a/bot/exts/utils/reminders.py b/bot/exts/utils/reminders.py index 07e014dd09..7603e531c3 100644 --- a/bot/exts/utils/reminders.py +++ b/bot/exts/utils/reminders.py @@ -192,12 +192,9 @@ async def _edit_reminder(self, reminder_id: int, payload: dict) -> dict: Returns the edited reminder. """ - # Send the request to update the reminder in the database - reminder = await self.bot.api_client.patch( - "bot/reminders/" + str(reminder_id), - json=payload + return await self.bot.api_client.patch( + f"bot/reminders/{reminder_id}", json=payload ) - return reminder async def _reschedule_reminder(self, reminder: dict) -> None: """Reschedule a reminder object.""" @@ -349,9 +346,9 @@ async def new_reminder( # If `content` isn't provided then we try to get message content of a replied message if not content: content = await self.try_get_content_from_reply(ctx) - if not content: - # Couldn't get content from reply - return + if not content: + # Couldn't get content from reply + return # Now we can attempt to actually set the reminder. reminder = await self.bot.api_client.post( @@ -470,9 +467,9 @@ async def edit_reminder_content(self, ctx: Context, id_: int, *, content: str | """ if not content: content = await self.try_get_content_from_reply(ctx) - if not content: - # Message doesn't have a reply to get content from - return + if not content: + # Message doesn't have a reply to get content from + return await self.edit_reminder(ctx, id_, {"content": content}) @edit_reminder_group.command(name="mentions", aliases=("pings",)) @@ -579,11 +576,7 @@ async def _can_modify(self, ctx: Context, reminder_id: str | int, send_on_denial if await has_any_role_check(ctx, Roles.admins): log.debug(f"{ctx.author} is an admin, asking for confirmation to modify someone else's.") - if ctx.command == self.delete_reminder: - modify_action = "delete" - else: - modify_action = "edit" - + modify_action = "delete" if ctx.command == self.delete_reminder else "edit" confirmation_view = ModifyReminderConfirmationView(ctx.author) confirmation_message = await ctx.reply( f"Are you sure you want to {modify_action} <@{owner_id}>'s reminder?", diff --git a/bot/exts/utils/snekbox/_cog.py b/bot/exts/utils/snekbox/_cog.py index 8429914404..a7691510f6 100644 --- a/bot/exts/utils/snekbox/_cog.py +++ b/bot/exts/utils/snekbox/_cog.py @@ -115,7 +115,7 @@ async def convert(cls, ctx: Context, code: str) -> list[str]: codeblocks = [block.group("code") for block in blocks] info = "several code blocks" else: - match = match[0] if len(blocks) == 0 else blocks[0] + match = match[0] if not blocks else blocks[0] code, block, lang, delim = match.group("code", "block", "lang", "delim") codeblocks = [dedent(code)] if block: @@ -181,11 +181,7 @@ def build_python_version_switcher_view( ) -> interactions.ViewWithUserAndRoleCheck: """Return a view that allows the user to change what version of Python their code is run on.""" alt_python_version: SupportedPythonVersions - if current_python_version == "3.10": - alt_python_version = "3.11" - else: - alt_python_version = "3.10" # noqa: F841 - + alt_python_version = "3.11" if current_python_version == "3.10" else "3.10" view = interactions.ViewWithUserAndRoleCheck( allowed_users=(ctx.author.id,), allowed_roles=MODERATION_ROLES, @@ -228,12 +224,14 @@ def prepare_timeit_input(codeblocks: list[str]) -> list[str]: If there are multiple codeblocks, insert the first one into the wrapped setup code. """ - args = ["-m", "timeit"] setup_code = codeblocks.pop(0) if len(codeblocks) > 1 else "" code = "\n".join(codeblocks) - args.extend(["-s", TIMEIT_SETUP_WRAPPER.format(setup=setup_code), code]) - return args + return [ + "-m", + "timeit", + *["-s", TIMEIT_SETUP_WRAPPER.format(setup=setup_code), code], + ] async def format_output( self, @@ -320,9 +318,7 @@ async def send_job(self, ctx: Context, job: EvalJob) -> Message: async with ctx.typing(): result = await self.post_job(job) msg = result.get_message(job) - error = result.error_message - - if error: + if error := result.error_message: output, paste_link = error, None else: log.trace("Formatting output...") @@ -395,7 +391,7 @@ async def send_job(self, ctx: Context, job: EvalJob) -> Message: blocked.extend(self._filter_files(ctx, failed_files, blocked_exts).blocked) # Add notice if any files were blocked if blocked: - blocked_sorted = sorted(set(f.suffix for f in blocked)) + blocked_sorted = sorted({f.suffix for f in blocked}) # Only no extension if len(blocked_sorted) == 1 and blocked_sorted[0] == "": blocked_msg = "Files with no extension can't be uploaded." @@ -486,12 +482,10 @@ async def get_code(self, message: Message, command: Command) -> str | None: if new_ctx.command is command: log.trace(f"Message {message.id} invokes {command} command.") split = message.content.split(maxsplit=1) - code = split[1] if len(split) > 1 else None + return split[1] if len(split) > 1 else None else: log.trace(f"Message {message.id} does not invoke {command} command.") - code = message.content - - return code + return message.content async def run_job( self, diff --git a/bot/exts/utils/snekbox/_eval.py b/bot/exts/utils/snekbox/_eval.py index 9eb8039cec..c29aa6b9b3 100644 --- a/bot/exts/utils/snekbox/_eval.py +++ b/bot/exts/utils/snekbox/_eval.py @@ -77,10 +77,7 @@ def status_emoji(self) -> str: """Return an emoji corresponding to the status code or lack of output in result.""" if not self.has_output: return ":warning:" - if self.returncode == 0: # No error - return ":white_check_mark:" - # Exception - return ":x:" + return ":white_check_mark:" if self.returncode == 0 else ":x:" @property def error_message(self) -> str: @@ -130,7 +127,7 @@ def get_failed_files_str(self, char_max: int = 85) -> str: break if len(file) > char_max: - names.append(file[:char_max] + "...") + names.append(f"{file[:char_max]}...") break char_max -= len(file) names.append(file) diff --git a/bot/exts/utils/utils.py b/bot/exts/utils/utils.py index dd9daef1dd..b17666a074 100644 --- a/bot/exts/utils/utils.py +++ b/bot/exts/utils/utils.py @@ -66,10 +66,7 @@ async def charinfo(self, ctx: Context, *, characters: str) -> None: def get_info(char: str) -> tuple[str, str]: digit = f"{ord(char):x}" - if len(digit) <= 4: - u_code = f"\\u{digit:>04}" - else: - u_code = f"\\U{digit:>08}" + u_code = f"\\u{digit:>04}" if len(digit) <= 4 else f"\\U{digit:>08}" url = f"https://www.compart.com/en/unicode/U+{digit:>04}" name = f"[{unicodedata.name(char, '')}]({url})" info = f"`{u_code.ljust(10)}`: {name} - {utils.escape_markdown(char)}" diff --git a/bot/log.py b/bot/log.py index 8b18df70a0..8e00549fba 100644 --- a/bot/log.py +++ b/bot/log.py @@ -15,10 +15,7 @@ TRACE_LEVEL = 5 -if TYPE_CHECKING: - LoggerClass = Logger -else: - LoggerClass = logging.getLoggerClass() +LoggerClass = Logger if TYPE_CHECKING else logging.getLoggerClass() class CustomLogger(LoggerClass): @@ -118,8 +115,7 @@ def _set_trace_loggers() -> None: Otherwise if the env var begins with a "*", the root logger is set to the trace level and other contents are ignored. """ - level_filter = constants.Bot.trace_loggers - if level_filter: + if level_filter := constants.Bot.trace_loggers: if level_filter.startswith("*"): get_logger().setLevel(TRACE_LEVEL) diff --git a/bot/pagination.py b/bot/pagination.py index fd9dd8af65..11e30d24b7 100644 --- a/bot/pagination.py +++ b/bot/pagination.py @@ -105,9 +105,9 @@ def add_line(self, line: str = "", *, empty: bool = False) -> None: if len(line) > (max_chars := self.max_size - len(self.prefix) - 2): if len(line) > self.scale_to_size: line, remaining_words = self._split_remaining_words(line, max_chars) - if len(line) > self.scale_to_size: - log.debug("Could not continue to next page, truncating line.") - line = line[:self.scale_to_size] + if len(line) > self.scale_to_size: + log.debug("Could not continue to next page, truncating line.") + line = line[:self.scale_to_size] # Check if we should start a new page or continue the line on the current one if self.max_lines is not None and self._linecount >= self.max_lines: @@ -167,19 +167,18 @@ def _split_remaining_words(self, line: str, max_chars: int) -> tuple[str, str | is_full = False for word in line.split(" "): - if not is_full: - if len(word) + reduced_char_count <= max_chars: - reduced_words.append(word) - reduced_char_count += len(word) + 1 - else: - # If reduced_words is empty, we were unable to split the words across pages - if not reduced_words: - return line, None - is_full = True - remaining_words.append(word) - else: + if is_full: remaining_words.append(word) + elif len(word) + reduced_char_count <= max_chars: + reduced_words.append(word) + reduced_char_count += len(word) + 1 + else: + # If reduced_words is empty, we were unable to split the words across pages + if not reduced_words: + return line, None + is_full = True + remaining_words.append(word) return ( " ".join(reduced_words) + "..." if remaining_words else "", continuation_header + " ".join(remaining_words) if remaining_words else None diff --git a/bot/utils/time.py b/bot/utils/time.py index 78b46c0c45..0e3e2d8609 100644 --- a/bot/utils/time.py +++ b/bot/utils/time.py @@ -69,9 +69,7 @@ def _stringify_time_unit(value: int, unit: str) -> str: return "0 seconds" if value == 1: return f"{value} {unit[:-1]}" - if value == 0: - return f"less than a {unit[:-1]}" - return f"{value} {unit}" + return f"less than a {unit[:-1]}" if value == 0 else f"{value} {unit}" def discord_timestamp(timestamp: Timestamp, format: TimestampFormats = TimestampFormats.DATE_TIME) -> str: @@ -192,7 +190,7 @@ def humanize_delta( if args and kwargs: raise ValueError("Unsupported combination of positional and keyword arguments.") - if len(args) == 0: + if not args: delta = relativedelta(**kwargs) elif len(args) == 1 and isinstance(args[0], relativedelta): delta = args[0] @@ -234,13 +232,11 @@ def humanize_delta( time_strings[-1] = f"{time_strings[-2]} and {time_strings[-1]}" del time_strings[-2] - # If nothing has been found, just make the value 0 precision, e.g. `0 days`. - if not time_strings: - humanized = _stringify_time_unit(0, precision) - else: - humanized = ", ".join(time_strings) - - return humanized + return ( + _stringify_time_unit(0, precision) + if not time_strings + else ", ".join(time_strings) + ) def parse_duration_string(duration: str) -> relativedelta | None: @@ -265,9 +261,7 @@ def parse_duration_string(duration: str) -> relativedelta | None: return None duration_dict = {unit: int(amount) for unit, amount in match.groupdict(default=0).items()} - delta = relativedelta(**duration_dict) - - return delta + return relativedelta(**duration_dict) def relativedelta_to_timedelta(delta: relativedelta) -> datetime.timedelta: @@ -327,10 +321,7 @@ def until_expiration(expiry: Timestamp | None) -> str: return "Permanent" expiry = arrow.get(expiry) - if expiry < arrow.utcnow(): - return "Expired" - - return format_relative(expiry) + return "Expired" if expiry < arrow.utcnow() else format_relative(expiry) def unpack_duration( diff --git a/botstrap.py b/botstrap.py index 7a9d94d8b4..b1569cf4ee 100644 --- a/botstrap.py +++ b/botstrap.py @@ -237,12 +237,11 @@ def create_webhook(self, name: str, channel_id_: int) -> str: config_str += "\n#Webhooks\n" for webhook_name, webhook_model in Webhooks: - webhook = discord_client.webhook_exists(webhook_model.id) - if not webhook: + if webhook := discord_client.webhook_exists(webhook_model.id): + webhook_id = webhook_model.id + else: webhook_channel_id = int(all_channels[webhook_name]) webhook_id = discord_client.create_webhook(webhook_name, webhook_channel_id) - else: - webhook_id = webhook_model.id config_str += f"webhooks_{webhook_name}__id={webhook_id}\n" config_str += f"webhooks_{webhook_name}__channel={all_channels[webhook_name]}\n" diff --git a/tests/base.py b/tests/base.py index cad187b6ab..bc9cba19e7 100644 --- a/tests/base.py +++ b/tests/base.py @@ -32,7 +32,7 @@ class LoggingTestsMixin: """ @contextmanager - def assertNotLogs(self, logger=None, level=None, msg=None): # noqa: N802 + def assertNotLogs(self, logger=None, level=None, msg=None): # noqa: N802 """ Asserts that no logs of `level` and higher were emitted by `logger`. @@ -45,11 +45,7 @@ def assertNotLogs(self, logger=None, level=None, msg=None): # noqa: N802 if not isinstance(logger, logging.Logger): logger = get_logger(logger) - if level: - level = logging._nameToLevel.get(level, level) - else: - level = logging.INFO - + level = logging._nameToLevel.get(level, level) if level else logging.INFO handler = _CaptureLogHandler() old_handlers = logger.handlers[:] old_level = logger.level diff --git a/tests/bot/exts/filtering/test_extension_filter.py b/tests/bot/exts/filtering/test_extension_filter.py index f71de1e1ba..56d2f5fcd1 100644 --- a/tests/bot/exts/filtering/test_extension_filter.py +++ b/tests/bot/exts/filtering/test_extension_filter.py @@ -20,13 +20,19 @@ def setUp(self): """Sets up fresh objects for each test.""" self.filter_list = ExtensionsList(MagicMock()) now = arrow.utcnow().timestamp() - filters = [] self.whitelist = [".first", ".second", ".third"] - for i, filter_content in enumerate(self.whitelist, start=1): - filters.append({ - "id": i, "content": filter_content, "description": None, "settings": {}, - "additional_settings": {}, "created_at": now, "updated_at": now - }) + filters = [ + { + "id": i, + "content": filter_content, + "description": None, + "settings": {}, + "additional_settings": {}, + "created_at": now, + "updated_at": now, + } + for i, filter_content in enumerate(self.whitelist, start=1) + ] self.filter_list.add_list({ "id": 1, "list_type": 1, diff --git a/tests/bot/exts/info/test_information.py b/tests/bot/exts/info/test_information.py index e90291f62b..d01dc70087 100644 --- a/tests/bot/exts/info/test_information.py +++ b/tests/bot/exts/info/test_information.py @@ -367,21 +367,25 @@ async def test_create_user_embed_expanded_information_in_moderation_channels( nomination_counts.assert_called_once_with(user) self.assertEqual( - textwrap.dedent(f""" - Created: {""} + textwrap.dedent( + f""" + Created: Profile: {user.mention} ID: {user.id} - """).strip(), - embed.fields[0].value + """ + ).strip(), + embed.fields[0].value, ) self.assertEqual( - textwrap.dedent(f""" - Joined: {""} - Verified: {"True"} + textwrap.dedent( + ' + Joined: + Verified: True Roles: &Moderators - """).strip(), - embed.fields[1].value + ' + ).strip(), + embed.fields[1].value, ) @unittest.mock.patch(f"{COG_PATH}.basic_user_infraction_counts", new_callable=unittest.mock.AsyncMock) @@ -406,20 +410,24 @@ async def test_create_user_embed_basic_information_outside_of_moderation_channel infraction_counts.assert_called_once_with(user) self.assertEqual( - textwrap.dedent(f""" - Created: {""} + textwrap.dedent( + f""" + Created: Profile: {user.mention} ID: {user.id} - """).strip(), - embed.fields[0].value + """ + ).strip(), + embed.fields[0].value, ) self.assertEqual( - textwrap.dedent(f""" - Joined: {""} + textwrap.dedent( + ' + Joined: Roles: &Moderators - """).strip(), - embed.fields[1].value + ' + ).strip(), + embed.fields[1].value, ) self.assertEqual( @@ -618,7 +626,14 @@ async def test_return_none_if_one_rule_number_is_invalid(self): self.assertEqual( self.ctx.send.call_args, - unittest.mock.call(shorten(":x: Invalid rule indices: " + invalid, 75, placeholder=" ..."))) + unittest.mock.call( + shorten( + f":x: Invalid rule indices: {invalid}", + 75, + placeholder=" ...", + ) + ), + ) self.assertEqual(None, final_rule_numbers) async def test_return_correct_rule_numbers(self): diff --git a/tests/bot/exts/test_cogs.py b/tests/bot/exts/test_cogs.py index 99bc87120a..0c117b3e66 100644 --- a/tests/bot/exts/test_cogs.py +++ b/tests/bot/exts/test_cogs.py @@ -62,8 +62,7 @@ def get_all_commands(self) -> t.Iterator[commands.Command]: """Yield all commands for all cogs in all extensions.""" for module in self.walk_modules(): for cog in self.walk_cogs(module): - for cmd in self.walk_commands(cog): - yield cmd + yield from self.walk_commands(cog) def test_names_dont_shadow(self): """Names and aliases of commands should be unique.""" diff --git a/tests/bot/test_constants.py b/tests/bot/test_constants.py index 3492021cea..22c10fed5e 100644 --- a/tests/bot/test_constants.py +++ b/tests/bot/test_constants.py @@ -47,7 +47,7 @@ def test_section_configuration_matches_type_specification(self): if origin is typing.Union: is_instance = is_any_instance(value, annotation_args) - self.assertTrue(is_instance, failure_msg) else: is_instance = is_annotation_instance(value, annotation) - self.assertTrue(is_instance, failure_msg) + + self.assertTrue(is_instance, failure_msg)