Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use new regex to properly match/parse channel topics. #3111

Merged
merged 3 commits into from
Nov 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cogs/modmail.py
Original file line number Diff line number Diff line change
Expand Up @@ -1848,7 +1848,7 @@ async def repair(self, ctx):
and message.embeds[0].color.value == self.bot.main_color
and message.embeds[0].footer.text
):
user_id = match_user_id(message.embeds[0].footer.text)
user_id = match_user_id(message.embeds[0].footer.text, any_string=True)
other_recipients = match_other_recipients(ctx.channel.topic)
for n, uid in enumerate(other_recipients):
other_recipients[n] = self.bot.get_user(uid) or await self.bot.fetch_user(uid)
Expand Down
71 changes: 49 additions & 22 deletions core/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
from core.utils import (
is_image_url,
days,
parse_channel_topic,
match_title,
match_user_id,
match_other_recipients,
truncate,
get_top_hoisted_role,
create_thread_channel,
Expand Down Expand Up @@ -119,17 +119,16 @@ def cancelled(self, flag: bool):

@classmethod
async def from_channel(cls, manager: "ThreadManager", channel: discord.TextChannel) -> "Thread":
recipient_id = match_user_id(
channel.topic
) # there is a chance it grabs from another recipient's main thread
# there is a chance it grabs from another recipient's main thread
_, recipient_id, other_ids = parse_channel_topic(channel.topic)

if recipient_id in manager.cache:
thread = manager.cache[recipient_id]
else:
recipient = manager.bot.get_user(recipient_id) or await manager.bot.fetch_user(recipient_id)

other_recipients = []
for uid in match_other_recipients(channel.topic):
for uid in other_ids:
try:
other_recipient = manager.bot.get_user(uid) or await manager.bot.fetch_user(uid)
except discord.NotFound:
Expand Down Expand Up @@ -1136,10 +1135,16 @@ def get_notifications(self) -> str:
return " ".join(set(mentions))

async def set_title(self, title: str) -> None:
topic = f"Title: {title}\n"

user_id = match_user_id(self.channel.topic)
ids = ",".join(i.id for i in self._other_recipients)
topic += f"User ID: {user_id}"

if self._other_recipients:
ids = ",".join(str(i.id) for i in self._other_recipients)
topic += f"\nOther Recipients: {ids}"

await self.channel.edit(topic=f"Title: {title}\nUser ID: {user_id}\nOther Recipients: {ids}")
await self.channel.edit(topic=topic)

async def _update_users_genesis(self):
genesis_message = await self.get_genesis_message()
Expand All @@ -1162,24 +1167,37 @@ async def _update_users_genesis(self):
await genesis_message.edit(embed=embed)

async def add_users(self, users: typing.List[typing.Union[discord.Member, discord.User]]) -> None:
title = match_title(self.channel.topic)
user_id = match_user_id(self.channel.topic)
self._other_recipients += users
topic = ""
title, user_id, _ = parse_channel_topic(self.channel.topic)
if title is not None:
topic += f"Title: {title}\n"

topic += f"User ID: {user_id}"

self._other_recipients += users
ids = ",".join(str(i.id) for i in self._other_recipients)
await self.channel.edit(topic=f"Title: {title}\nUser ID: {user_id}\nOther Recipients: {ids}")

topic += f"\nOther Recipients: {ids}"

await self.channel.edit(topic=topic)
await self._update_users_genesis()

async def remove_users(self, users: typing.List[typing.Union[discord.Member, discord.User]]) -> None:
title = match_title(self.channel.topic)
user_id = match_user_id(self.channel.topic)
topic = ""
title, user_id, _ = parse_channel_topic(self.channel.topic)
if title is not None:
topic += f"Title: {title}\n"

topic += f"User ID: {user_id}"

for u in users:
self._other_recipients.remove(u)

ids = ",".join(str(i.id) for i in self._other_recipients)
await self.channel.edit(topic=f"Title: {title}\nUser ID: {user_id}\nOther Recipients: {ids}")
if self._other_recipients:
ids = ",".join(str(i.id) for i in self._other_recipients)
topic += f"\nOther Recipients: {ids}"

await self.channel.edit(topic=topic)
await self._update_users_genesis()


Expand Down Expand Up @@ -1240,16 +1258,24 @@ async def find(
await thread.close(closer=self.bot.user, silent=True, delete_channel=False)
thread = None
else:

def check(topic):
_, user_id, other_ids = parse_channel_topic(topic)
return recipient_id == user_id or recipient_id in other_ids

channel = discord.utils.find(
lambda x: str(recipient_id) in x.topic if x.topic else False,
lambda x: (check(x.topic)) if x.topic else False,
self.bot.modmail_guild.text_channels,
)

if channel:
thread = await Thread.from_channel(self, channel)
if thread.recipient:
# only save if data is valid
self.cache[recipient_id] = thread
# only save if data is valid.
# also the recipient_id here could belong to other recipient,
# it would be wrong if we set it as the dict key,
# so we use the thread id instead
self.cache[thread.id] = thread
thread.ready = True

if thread and recipient_id not in [x.id for x in thread.recipients]:
Expand All @@ -1265,10 +1291,11 @@ async def _find_from_channel(self, channel):
searching channel history for genesis embed and
extracts user_id from that.
"""
user_id = -1

if channel.topic:
user_id = match_user_id(channel.topic)
if not channel.topic:
return None

_, user_id, other_ids = parse_channel_topic(channel.topic)

if user_id == -1:
return None
Expand All @@ -1282,7 +1309,7 @@ async def _find_from_channel(self, channel):
recipient = None

other_recipients = []
for uid in match_other_recipients(channel.topic):
for uid in other_ids:
try:
other_recipient = self.bot.get_user(uid) or await self.bot.fetch_user(uid)
except discord.NotFound:
Expand Down
71 changes: 56 additions & 15 deletions core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
"human_join",
"days",
"cleanup_code",
"parse_channel_topic",
"match_title",
"match_user_id",
"match_other_recipients",
"create_thread_channel",
"create_not_found_embed",
"parse_alias",
"normalize_alias",
Expand Down Expand Up @@ -218,9 +220,45 @@ def cleanup_code(content: str) -> str:
return content.strip("` \n")


TOPIC_OTHER_RECIPIENTS_REGEX = re.compile(r"Other Recipients:\s*((?:\d{17,21},*)+)", flags=re.IGNORECASE)
TOPIC_TITLE_REGEX = re.compile(r"\bTitle: (.*)\n(?:User ID: )\b", flags=re.IGNORECASE | re.DOTALL)
TOPIC_UID_REGEX = re.compile(r"\bUser ID:\s*(\d{17,21})\b", flags=re.IGNORECASE)
TOPIC_REGEX = re.compile(
r"(?:\bTitle:\s*(?P<title>.*)\n)?"
r"\bUser ID:\s*(?P<user_id>\d{17,21})\b"
r"(?:\nOther Recipients:\s*(?P<other_ids>\d{17,21}(?:(?:\s*,\s*)\d{17,21})*)\b)?",
flags=re.IGNORECASE | re.DOTALL,
)
UID_REGEX = re.compile(r"\bUser ID:\s*(\d{17,21})\b", flags=re.IGNORECASE)


def parse_channel_topic(text: str) -> typing.Tuple[typing.Optional[str], int, typing.List[int]]:
"""
A helper to parse channel topics and respectivefully returns all the required values
at once.

Parameters
----------
text : str
The text of channel topic.

Returns
-------
Tuple[Optional[str], int, List[int]]
A tuple of title, user ID, and other recipients IDs.
"""
title, user_id, other_ids = None, -1, []
match = TOPIC_REGEX.search(text)
if match is not None:
groupdict = match.groupdict()
title = groupdict["title"]

# user ID string is the required one in regex, so if match is found
# the value of this won't be None
user_id = int(groupdict["user_id"])

oth_ids = groupdict["other_ids"]
if oth_ids:
other_ids = list(map(int, oth_ids.split(",")))

return title, user_id, other_ids


def match_title(text: str) -> str:
Expand All @@ -237,29 +275,35 @@ def match_title(text: str) -> str:
Optional[str]
The title if found.
"""
match = TOPIC_TITLE_REGEX.search(text)
if match is not None:
return match.group(1)
return parse_channel_topic(text)[0]


def match_user_id(text: str) -> int:
def match_user_id(text: str, any_string: bool = False) -> int:
"""
Matches a user ID in the format of "User ID: 12345".

Parameters
----------
text : str
The text of the user ID.
any_string: bool
Whether to search any string that matches the UID_REGEX, e.g. not from channel topic.
Defaults to False.

Returns
-------
int
The user ID if found. Otherwise, -1.
"""
match = TOPIC_UID_REGEX.search(text)
if match is not None:
return int(match.group(1))
return -1
user_id = -1
if any_string:
match = UID_REGEX.search(text)
if match is not None:
user_id = int(match.group(1))
else:
user_id = parse_channel_topic(text)[1]

return user_id


def match_other_recipients(text: str) -> typing.List[int]:
Expand All @@ -276,10 +320,7 @@ def match_other_recipients(text: str) -> typing.List[int]:
List[int]
The list of other recipients IDs.
"""
match = TOPIC_OTHER_RECIPIENTS_REGEX.search(text)
if match is not None:
return list(map(int, match.group(1).split(",")))
return []
return parse_channel_topic(text)[2]


def create_not_found_embed(word, possibilities, name, n=2, cutoff=0.6) -> discord.Embed:
Expand Down