/
filter_lists.py
273 lines (228 loc) · 10.7 KB
/
filter_lists.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
from typing import Optional
from discord import Colour, Embed
from discord.ext.commands import BadArgument, Cog, Context, IDConverter, group, has_any_role
from bot import constants
from bot.api import ResponseCodeError
from bot.bot import Bot
from bot.converters import ValidDiscordServerInvite, ValidFilterListType
from bot.log import get_logger
from bot.pagination import LinePaginator
from bot.utils import scheduling
log = get_logger(__name__)
class FilterLists(Cog):
"""Commands for blacklisting and whitelisting things."""
methods_with_filterlist_types = [
"allow_add",
"allow_delete",
"allow_get",
"deny_add",
"deny_delete",
"deny_get",
]
def __init__(self, bot: Bot) -> None:
self.bot = bot
scheduling.create_task(self._amend_docstrings(), event_loop=self.bot.loop)
async def _amend_docstrings(self) -> None:
"""Add the valid FilterList types to the docstrings, so they'll appear in !help invocations."""
await self.bot.wait_until_guild_available()
# Add valid filterlist types to the docstrings
valid_types = await ValidFilterListType.get_valid_types(self.bot)
valid_types = [f"`{type_.lower()}`" for type_ in valid_types]
for method_name in self.methods_with_filterlist_types:
command = getattr(self, method_name)
command.help = (
f"{command.help}\n\nValid **list_type** values are {', '.join(valid_types)}."
)
async def _add_data(
self,
ctx: Context,
allowed: bool,
list_type: ValidFilterListType,
content: str,
comment: Optional[str] = None,
) -> None:
"""Add an item to a filterlist."""
allow_type = "whitelist" if allowed else "blacklist"
# If this is a server invite, we gotta validate it.
if list_type == "GUILD_INVITE":
guild_data = await self._validate_guild_invite(ctx, content)
content = guild_data.get("id")
# Unless the user has specified another comment, let's
# use the server name as the comment so that the list
# of guild IDs will be more easily readable when we
# display it.
if not comment:
comment = guild_data.get("name")
# If it's a file format, let's make sure it has a leading dot.
elif list_type == "FILE_FORMAT" and not content.startswith("."):
content = f".{content}"
# Try to add the item to the database
log.trace(f"Trying to add the {content} item to the {list_type} {allow_type}")
payload = {
"allowed": allowed,
"type": list_type,
"content": content,
"comment": comment,
}
try:
item = await self.bot.api_client.post(
"bot/filter-lists",
json=payload
)
except ResponseCodeError as e:
if e.status == 400:
await ctx.message.add_reaction("❌")
log.debug(
f"{ctx.author} tried to add data to a {allow_type}, but the API returned 400, "
"probably because the request violated the UniqueConstraint."
)
raise BadArgument(
f"Unable to add the item to the {allow_type}. "
"The item probably already exists. Keep in mind that a "
"blacklist and a whitelist for the same item cannot co-exist, "
"and we do not permit any duplicates."
)
raise
# Insert the item into the cache
self.bot.insert_item_into_filter_list_cache(item)
await ctx.message.add_reaction("✅")
async def _delete_data(self, ctx: Context, allowed: bool, list_type: ValidFilterListType, content: str) -> None:
"""Remove an item from a filterlist."""
allow_type = "whitelist" if allowed else "blacklist"
# If this is a server invite, we need to convert it.
if list_type == "GUILD_INVITE" and not IDConverter()._get_id_match(content):
guild_data = await self._validate_guild_invite(ctx, content)
content = guild_data.get("id")
# If it's a file format, let's make sure it has a leading dot.
elif list_type == "FILE_FORMAT" and not content.startswith("."):
content = f".{content}"
# Find the content and delete it.
log.trace(f"Trying to delete the {content} item from the {list_type} {allow_type}")
item = self.bot.filter_list_cache[f"{list_type}.{allowed}"].get(content)
if item is not None:
try:
await self.bot.api_client.delete(
f"bot/filter-lists/{item['id']}"
)
del self.bot.filter_list_cache[f"{list_type}.{allowed}"][content]
await ctx.message.add_reaction("✅")
except ResponseCodeError as e:
log.debug(
f"{ctx.author} tried to delete an item with the id {item['id']}, but "
f"the API raised an unexpected error: {e}"
)
await ctx.message.add_reaction("❌")
else:
await ctx.message.add_reaction("❌")
async def _list_all_data(self, ctx: Context, allowed: bool, list_type: ValidFilterListType) -> None:
"""Paginate and display all items in a filterlist."""
allow_type = "whitelist" if allowed else "blacklist"
result = self.bot.filter_list_cache[f"{list_type}.{allowed}"]
# Build a list of lines we want to show in the paginator
lines = []
for content, metadata in result.items():
line = f"• `{content}`"
if comment := metadata.get("comment"):
line += f" - {comment}"
lines.append(line)
lines = sorted(lines)
# Build the embed
list_type_plural = list_type.lower().replace("_", " ").title() + "s"
embed = Embed(
title=f"{allow_type.title()}ed {list_type_plural} ({len(result)} total)",
colour=Colour.blue()
)
log.trace(f"Trying to list {len(result)} items from the {list_type.lower()} {allow_type}")
if result:
await LinePaginator.paginate(lines, ctx, embed, max_lines=15, empty=False)
else:
embed.description = "Hmmm, seems like there's nothing here yet."
await ctx.send(embed=embed)
await ctx.message.add_reaction("❌")
async def _sync_data(self, ctx: Context) -> None:
"""Syncs the filterlists with the API."""
try:
log.trace("Attempting to sync FilterList cache with data from the API.")
await self.bot.cache_filter_list_data()
await ctx.message.add_reaction("✅")
except ResponseCodeError as e:
log.debug(
f"{ctx.author} tried to sync FilterList cache data but "
f"the API raised an unexpected error: {e}"
)
await ctx.message.add_reaction("❌")
@staticmethod
async def _validate_guild_invite(ctx: Context, invite: str) -> dict:
"""
Validates a guild invite, and returns the guild info as a dict.
Will raise a BadArgument if the guild invite is invalid.
"""
log.trace(f"Attempting to validate whether or not {invite} is a guild invite.")
validator = ValidDiscordServerInvite()
guild_data = await validator.convert(ctx, invite)
# If we make it this far without raising a BadArgument, the invite is
# valid. Let's return a dict of guild information.
log.trace(f"{invite} validated as server invite. Converting to ID.")
return guild_data
@group(aliases=("allowlist", "allow", "al", "wl"))
async def whitelist(self, ctx: Context) -> None:
"""Group for whitelisting commands."""
if not ctx.invoked_subcommand:
await ctx.send_help(ctx.command)
@group(aliases=("denylist", "deny", "bl", "dl"))
async def blacklist(self, ctx: Context) -> None:
"""Group for blacklisting commands."""
if not ctx.invoked_subcommand:
await ctx.send_help(ctx.command)
@whitelist.command(name="add", aliases=("a", "set"))
async def allow_add(
self,
ctx: Context,
list_type: ValidFilterListType,
content: str,
*,
comment: Optional[str] = None,
) -> None:
"""Add an item to the specified allowlist."""
await self._add_data(ctx, True, list_type, content, comment)
@blacklist.command(name="add", aliases=("a", "set"))
async def deny_add(
self,
ctx: Context,
list_type: ValidFilterListType,
content: str,
*,
comment: Optional[str] = None,
) -> None:
"""Add an item to the specified denylist."""
await self._add_data(ctx, False, list_type, content, comment)
@whitelist.command(name="remove", aliases=("delete", "rm",))
async def allow_delete(self, ctx: Context, list_type: ValidFilterListType, content: str) -> None:
"""Remove an item from the specified allowlist."""
await self._delete_data(ctx, True, list_type, content)
@blacklist.command(name="remove", aliases=("delete", "rm",))
async def deny_delete(self, ctx: Context, list_type: ValidFilterListType, content: str) -> None:
"""Remove an item from the specified denylist."""
await self._delete_data(ctx, False, list_type, content)
@whitelist.command(name="get", aliases=("list", "ls", "fetch", "show"))
async def allow_get(self, ctx: Context, list_type: ValidFilterListType) -> None:
"""Get the contents of a specified allowlist."""
await self._list_all_data(ctx, True, list_type)
@blacklist.command(name="get", aliases=("list", "ls", "fetch", "show"))
async def deny_get(self, ctx: Context, list_type: ValidFilterListType) -> None:
"""Get the contents of a specified denylist."""
await self._list_all_data(ctx, False, list_type)
@whitelist.command(name="sync", aliases=("s",))
async def allow_sync(self, ctx: Context) -> None:
"""Syncs both allowlists and denylists with the API."""
await self._sync_data(ctx)
@blacklist.command(name="sync", aliases=("s",))
async def deny_sync(self, ctx: Context) -> None:
"""Syncs both allowlists and denylists with the API."""
await self._sync_data(ctx)
async def cog_check(self, ctx: Context) -> bool:
"""Only allow moderators to invoke the commands in this cog."""
return await has_any_role(*constants.MODERATION_ROLES).predicate(ctx)
def setup(bot: Bot) -> None:
"""Load the FilterLists cog."""
bot.add_cog(FilterLists(bot))