diff --git a/cogs/modmail.py b/cogs/modmail.py index 4a974b26cc..e3c19d8182 100644 --- a/cogs/modmail.py +++ b/cogs/modmail.py @@ -1848,7 +1848,7 @@ async def repair(self, ctx): and message.embeds[0].color.value == self.bot.main_color and message.embeds[0].footer.text ): - user_id = match_user_id(message.embeds[0].footer.text) + user_id = match_user_id(message.embeds[0].footer.text, any_string=True) other_recipients = match_other_recipients(ctx.channel.topic) for n, uid in enumerate(other_recipients): other_recipients[n] = self.bot.get_user(uid) or await self.bot.fetch_user(uid) diff --git a/core/thread.py b/core/thread.py index 6bc7adc6cb..2a2a243ba1 100644 --- a/core/thread.py +++ b/core/thread.py @@ -17,9 +17,9 @@ from core.utils import ( is_image_url, days, + parse_channel_topic, match_title, match_user_id, - match_other_recipients, truncate, get_top_hoisted_role, create_thread_channel, @@ -119,9 +119,8 @@ def cancelled(self, flag: bool): @classmethod async def from_channel(cls, manager: "ThreadManager", channel: discord.TextChannel) -> "Thread": - recipient_id = match_user_id( - channel.topic - ) # there is a chance it grabs from another recipient's main thread + # there is a chance it grabs from another recipient's main thread + _, recipient_id, other_ids = parse_channel_topic(channel.topic) if recipient_id in manager.cache: thread = manager.cache[recipient_id] @@ -129,7 +128,7 @@ async def from_channel(cls, manager: "ThreadManager", channel: discord.TextChann recipient = manager.bot.get_user(recipient_id) or await manager.bot.fetch_user(recipient_id) other_recipients = [] - for uid in match_other_recipients(channel.topic): + for uid in other_ids: try: other_recipient = manager.bot.get_user(uid) or await manager.bot.fetch_user(uid) except discord.NotFound: @@ -1136,10 +1135,16 @@ def get_notifications(self) -> str: return " ".join(set(mentions)) async def set_title(self, title: str) -> None: + topic = f"Title: {title}\n" + user_id = match_user_id(self.channel.topic) - ids = ",".join(i.id for i in self._other_recipients) + topic += f"User ID: {user_id}" + + if self._other_recipients: + ids = ",".join(str(i.id) for i in self._other_recipients) + topic += f"\nOther Recipients: {ids}" - await self.channel.edit(topic=f"Title: {title}\nUser ID: {user_id}\nOther Recipients: {ids}") + await self.channel.edit(topic=topic) async def _update_users_genesis(self): genesis_message = await self.get_genesis_message() @@ -1162,24 +1167,37 @@ async def _update_users_genesis(self): await genesis_message.edit(embed=embed) async def add_users(self, users: typing.List[typing.Union[discord.Member, discord.User]]) -> None: - title = match_title(self.channel.topic) - user_id = match_user_id(self.channel.topic) - self._other_recipients += users + topic = "" + title, user_id, _ = parse_channel_topic(self.channel.topic) + if title is not None: + topic += f"Title: {title}\n" + topic += f"User ID: {user_id}" + + self._other_recipients += users ids = ",".join(str(i.id) for i in self._other_recipients) - await self.channel.edit(topic=f"Title: {title}\nUser ID: {user_id}\nOther Recipients: {ids}") + topic += f"\nOther Recipients: {ids}" + + await self.channel.edit(topic=topic) await self._update_users_genesis() async def remove_users(self, users: typing.List[typing.Union[discord.Member, discord.User]]) -> None: - title = match_title(self.channel.topic) - user_id = match_user_id(self.channel.topic) + topic = "" + title, user_id, _ = parse_channel_topic(self.channel.topic) + if title is not None: + topic += f"Title: {title}\n" + + topic += f"User ID: {user_id}" + for u in users: self._other_recipients.remove(u) - ids = ",".join(str(i.id) for i in self._other_recipients) - await self.channel.edit(topic=f"Title: {title}\nUser ID: {user_id}\nOther Recipients: {ids}") + if self._other_recipients: + ids = ",".join(str(i.id) for i in self._other_recipients) + topic += f"\nOther Recipients: {ids}" + await self.channel.edit(topic=topic) await self._update_users_genesis() @@ -1240,16 +1258,24 @@ async def find( await thread.close(closer=self.bot.user, silent=True, delete_channel=False) thread = None else: + + def check(topic): + _, user_id, other_ids = parse_channel_topic(topic) + return recipient_id == user_id or recipient_id in other_ids + channel = discord.utils.find( - lambda x: str(recipient_id) in x.topic if x.topic else False, + lambda x: (check(x.topic)) if x.topic else False, self.bot.modmail_guild.text_channels, ) if channel: thread = await Thread.from_channel(self, channel) if thread.recipient: - # only save if data is valid - self.cache[recipient_id] = thread + # only save if data is valid. + # also the recipient_id here could belong to other recipient, + # it would be wrong if we set it as the dict key, + # so we use the thread id instead + self.cache[thread.id] = thread thread.ready = True if thread and recipient_id not in [x.id for x in thread.recipients]: @@ -1265,10 +1291,11 @@ async def _find_from_channel(self, channel): searching channel history for genesis embed and extracts user_id from that. """ - user_id = -1 - if channel.topic: - user_id = match_user_id(channel.topic) + if not channel.topic: + return None + + _, user_id, other_ids = parse_channel_topic(channel.topic) if user_id == -1: return None @@ -1282,7 +1309,7 @@ async def _find_from_channel(self, channel): recipient = None other_recipients = [] - for uid in match_other_recipients(channel.topic): + for uid in other_ids: try: other_recipient = self.bot.get_user(uid) or await self.bot.fetch_user(uid) except discord.NotFound: diff --git a/core/utils.py b/core/utils.py index 0fa74e457a..9f99333cca 100644 --- a/core/utils.py +++ b/core/utils.py @@ -20,9 +20,11 @@ "human_join", "days", "cleanup_code", + "parse_channel_topic", "match_title", "match_user_id", "match_other_recipients", + "create_thread_channel", "create_not_found_embed", "parse_alias", "normalize_alias", @@ -218,9 +220,45 @@ def cleanup_code(content: str) -> str: return content.strip("` \n") -TOPIC_OTHER_RECIPIENTS_REGEX = re.compile(r"Other Recipients:\s*((?:\d{17,21},*)+)", flags=re.IGNORECASE) -TOPIC_TITLE_REGEX = re.compile(r"\bTitle: (.*)\n(?:User ID: )\b", flags=re.IGNORECASE | re.DOTALL) -TOPIC_UID_REGEX = re.compile(r"\bUser ID:\s*(\d{17,21})\b", flags=re.IGNORECASE) +TOPIC_REGEX = re.compile( + r"(?:\bTitle:\s*(?P.*)\n)?" + r"\bUser ID:\s*(?P<user_id>\d{17,21})\b" + r"(?:\nOther Recipients:\s*(?P<other_ids>\d{17,21}(?:(?:\s*,\s*)\d{17,21})*)\b)?", + flags=re.IGNORECASE | re.DOTALL, +) +UID_REGEX = re.compile(r"\bUser ID:\s*(\d{17,21})\b", flags=re.IGNORECASE) + + +def parse_channel_topic(text: str) -> typing.Tuple[typing.Optional[str], int, typing.List[int]]: + """ + A helper to parse channel topics and respectivefully returns all the required values + at once. + + Parameters + ---------- + text : str + The text of channel topic. + + Returns + ------- + Tuple[Optional[str], int, List[int]] + A tuple of title, user ID, and other recipients IDs. + """ + title, user_id, other_ids = None, -1, [] + match = TOPIC_REGEX.search(text) + if match is not None: + groupdict = match.groupdict() + title = groupdict["title"] + + # user ID string is the required one in regex, so if match is found + # the value of this won't be None + user_id = int(groupdict["user_id"]) + + oth_ids = groupdict["other_ids"] + if oth_ids: + other_ids = list(map(int, oth_ids.split(","))) + + return title, user_id, other_ids def match_title(text: str) -> str: @@ -237,12 +275,10 @@ def match_title(text: str) -> str: Optional[str] The title if found. """ - match = TOPIC_TITLE_REGEX.search(text) - if match is not None: - return match.group(1) + return parse_channel_topic(text)[0] -def match_user_id(text: str) -> int: +def match_user_id(text: str, any_string: bool = False) -> int: """ Matches a user ID in the format of "User ID: 12345". @@ -250,16 +286,24 @@ def match_user_id(text: str) -> int: ---------- text : str The text of the user ID. + any_string: bool + Whether to search any string that matches the UID_REGEX, e.g. not from channel topic. + Defaults to False. Returns ------- int The user ID if found. Otherwise, -1. """ - match = TOPIC_UID_REGEX.search(text) - if match is not None: - return int(match.group(1)) - return -1 + user_id = -1 + if any_string: + match = UID_REGEX.search(text) + if match is not None: + user_id = int(match.group(1)) + else: + user_id = parse_channel_topic(text)[1] + + return user_id def match_other_recipients(text: str) -> typing.List[int]: @@ -276,10 +320,7 @@ def match_other_recipients(text: str) -> typing.List[int]: List[int] The list of other recipients IDs. """ - match = TOPIC_OTHER_RECIPIENTS_REGEX.search(text) - if match is not None: - return list(map(int, match.group(1).split(","))) - return [] + return parse_channel_topic(text)[2] def create_not_found_embed(word, possibilities, name, n=2, cutoff=0.6) -> discord.Embed: