/
client.py
1072 lines (877 loc) · 40.1 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import sys
from asyncio import ensure_future, get_event_loop, iscoroutinefunction
from functools import wraps
from importlib import import_module
from importlib.util import resolve_name
from inspect import getmembers
from logging import Logger
from types import ModuleType
from typing import Any, Callable, Coroutine, Dict, List, Optional, Union
from .api.cache import Cache
from .api.cache import Item as Build
from .api.error import InteractionException, JSONException
from .api.gateway import WebSocket
from .api.http import HTTPClient
from .api.models.flags import Intents
from .api.models.guild import Guild
from .api.models.misc import MISSING, Snowflake
from .api.models.team import Application
from .base import get_logger
from .decor import command
from .decor import component as _component
from .enums import ApplicationCommandType, OptionType
from .models.command import ApplicationCommand, Option
from .models.component import Button, Modal, SelectMenu
log: Logger = get_logger("client")
_token: str = "" # noqa
_cache: Optional[Cache] = None
class Client:
"""
A class representing the client connection to Discord's gateway and API via. WebSocket and HTTP.
:ivar AbstractEventLoop _loop: The asynchronous event loop of the client.
:ivar HTTPClient _http: The user-facing HTTP connection to the Web API, as its own separate client.
:ivar WebSocket _websocket: An object-orientation of a websocket server connection to the Gateway.
:ivar Intents _intents: The Gateway intents of the application. Defaults to ``Intents.DEFAULT``.
:ivar Optional[List[Tuple[int]]] _shard: The list of bucketed shards for the application's connection.
:ivar Optional[Presence] _presence: The RPC-like presence shown on an application once connected.
:ivar str _token: The token of the application used for authentication when connecting.
:ivar Optional[Dict[str, ModuleType]] _extensions: The "extensions" or cog equivalence registered to the main client.
:ivar Application me: The application representation of the client.
"""
def __init__(
self,
token: str,
**kwargs,
) -> None:
r"""
Establishes a client connection to the Web API and Gateway.
:param token: The token of the application for authentication and connection.
:type token: str
:param \**kwargs: Multiple key-word arguments able to be passed through.
:type \**kwargs: dict
"""
# Arguments
# ~~~~~~~~~
# token : str
# The token of the application for authentication and connection.
# intents? : Optional[Intents]
# Allows specific control of permissions the application has when connected.
# In order to use multiple intents, the | operator is recommended.
# Defaults to ``Intents.DEFAULT``.
# shards? : Optional[List[Tuple[int]]]
# Dictates and controls the shards that the application connects under.
# presence? : Optional[Presence]
# Sets an RPC-like presence on the application when connected to the Gateway.
# disable_sync? : Optional[bool]
# Controls whether synchronization in the user-facing API should be automatic or not.
self._loop = get_event_loop()
self._http = HTTPClient(token=token)
self._intents = kwargs.get("intents", Intents.DEFAULT)
self._websocket = WebSocket(intents=self._intents)
self._shard = kwargs.get("shards", [])
self._presence = kwargs.get("presence")
self._token = token
self._extensions = {}
self.me = None
_token = self._token # noqa: F841
_cache = self._http.cache # noqa: F841
if kwargs.get("disable_sync"):
self._automate_sync = False
log.warning(
"Automatic synchronization has been disabled. Interactions may need to be manually synchronized."
)
else:
self._automate_sync = True
data = self._loop.run_until_complete(self._http.get_current_bot_information())
self.me = Application(**data)
def start(self) -> None:
"""Starts the client session."""
self._loop.run_until_complete(self._ready())
def __register_events(self) -> None:
"""Registers all raw gateway events to the known events."""
self._websocket.dispatch.register(self.__raw_socket_create)
self._websocket.dispatch.register(self.__raw_channel_create, "on_channel_create")
self._websocket.dispatch.register(self.__raw_message_create, "on_message_create")
self._websocket.dispatch.register(self.__raw_guild_create, "on_guild_create")
async def __compare_sync(self, data: dict, pool: List[dict]) -> bool:
"""
Compares an application command during the synchronization process.
:param data: The application command to compare.
:type data: dict
:param pool: The "pool" or list of commands to compare from.
:type pool: List[dict]
:return: Whether the command has changed or not.
:rtype: bool
"""
attrs: List[str] = ["type", "name", "description", "options", "guild_id"]
log.info(f"Current attributes to compare: {', '.join(attrs)}.")
clean: bool = True
for command in pool:
if command["name"] == data["name"]:
for attr in attrs:
if hasattr(data, attr) and command.get(attr) == data.get(attr):
continue
else:
clean = False
return clean
async def __create_sync(self, data: dict) -> None:
"""
Creates an application command during the synchronization process.
:param data: The application command to create.
:type data: dict
"""
log.info(f"Creating command {data['name']}.")
command: ApplicationCommand = ApplicationCommand(
**(
await self._http.create_application_command(
application_id=self.me.id, data=data, guild_id=data.get("guild_id")
)
)
)
self._http.cache.interactions.add(Build(id=command.name, value=command))
async def __bulk_update_sync(self, data: List[dict], delete: Optional[bool] = False) -> None:
"""
Bulk updates a list of application commands during the synchronization process.
The theory behind this is that instead of sending individual ``PATCH``
requests to the Web API, we collect the commands needed and do a bulk
overwrite instead. This is to mitigate the amount of calls, and hopefully,
chances of hitting rate limits during the readying state.
:param data: The application commands to update.
:type data: List[dict]
:param delete?: Whether these commands are being deleted or not.
:type delete: Optional[bool]
"""
guild_commands: dict = {}
global_commands: List[dict] = []
for command in data:
if command.get("guild_id"):
if guild_commands.get(command["guild_id"]):
guild_commands[command["guild_id"]].append(command)
else:
guild_commands[command["guild_id"]] = [command]
else:
global_commands.append(command)
self._http.cache.interactions.add(
Build(id=command["name"], value=ApplicationCommand(**command))
)
for guild, commands in guild_commands.items():
log.info(
f"Guild commands {', '.join(command['name'] for command in commands)} under ID {guild} have been {'deleted' if delete else 'synced'}."
)
await self._http.overwrite_application_command(
application_id=self.me.id,
data=[] if delete else commands,
guild_id=guild,
)
if global_commands:
log.info(
f"Global commands {', '.join(command['name'] for command in global_commands)} have been {'deleted' if delete else 'synced'}."
)
await self._http.overwrite_application_command(
application_id=self.me.id, data=[] if delete else global_commands
)
async def _synchronize(self, payload: Optional[dict] = None) -> None:
"""
Synchronizes a command from the client-facing API to the Web API.
:ivar payload?: The application command to synchronize. Defaults to ``None`` where a global synchronization process begins.
:type payload: Optional[dict]
"""
cache: Optional[List[dict]] = self._http.cache.interactions.view
if cache:
log.info("A command cache was detected, using for synchronization instead.")
commands: List[dict] = cache
else:
log.info("No command cache was found present, retrieving from Web API instead.")
commands: Optional[Union[dict, List[dict]]] = await self._http.get_application_commands(
application_id=self.me.id, guild_id=payload.get("guild_id") if payload else None
)
# TODO: redo error handling.
if isinstance(commands, dict):
if commands.get("code"): # Error exists.
raise JSONException(commands["code"], message=commands["message"] + " |")
# TODO: redo error handling.
elif isinstance(commands, list):
for command in commands:
if command.get("code"):
# Error exists.
raise JSONException(command["code"], message=command["message"] + " |")
names: List[str] = (
[command["name"] for command in commands if command.get("name")] if commands else []
)
to_sync: list = []
to_delete: list = []
if payload:
log.info(f"Checking command {payload['name']}.")
if payload["name"] in names:
if not await self.__compare_sync(payload, commands):
to_sync.append(payload)
else:
await self.__create_sync(payload)
else:
for command in commands:
if command not in cache:
to_delete.append(command)
await self.__bulk_update_sync(to_sync)
await self.__bulk_update_sync(to_delete, delete=True)
async def _ready(self) -> None:
"""
Prepares the client with an internal "ready" check to ensure
that all conditions have been met in a chronological order:
.. code-block::
CLIENT START
|___ GATEWAY
| |___ READY
| |___ DISPATCH
|___ SYNCHRONIZE
| |___ CACHE
|___ DETECT DECORATOR
| |___ BUILD MODEL
| |___ SYNCHRONIZE
| |___ CALLBACK
LOOP
"""
ready: bool = False
try:
if self.me.flags is not None:
# This can be None.
if self._intents.GUILD_PRESENCES in self._intents and not (
self.me.flags.GATEWAY_PRESENCE in self.me.flags
or self.me.flags.GATEWAY_PRESENCE_LIMITED in self.me.flags
):
raise RuntimeError("Client not authorised for the GUILD_PRESENCES intent.")
if self._intents.GUILD_MEMBERS in self._intents and not (
self.me.flags.GATEWAY_GUILD_MEMBERS in self.me.flags
or self.me.flags.GATEWAY_GUILD_MEMBERS_LIMITED in self.me.flags
):
raise RuntimeError("Client not authorised for the GUILD_MEMBERS intent.")
if self._intents.GUILD_MESSAGES in self._intents and not (
self.me.flags.GATEWAY_MESSAGE_CONTENT in self.me.flags
or self.me.flags.GATEWAY_MESSAGE_CONTENT_LIMITED in self.me.flags
):
log.critical("Client not authorised for the MESSAGE_CONTENT intent.")
else:
# This is when a bot has no intents period.
if self._intents.value != Intents.DEFAULT.value:
raise RuntimeError("Client not authorised for any privileged intents.")
self.__register_events()
if self._automate_sync:
await self._synchronize()
ready = True
except Exception as error:
log.critical(f"Could not prepare the client: {error}")
finally:
if ready:
log.debug("Client is now ready.")
await self._login()
async def _login(self) -> None:
"""Makes a login with the Discord API."""
while not self._websocket.closed:
await self._websocket.connect(self._token, self._shard, self._presence)
def event(self, coro: Coroutine, name: Optional[str] = MISSING) -> Callable[..., Any]:
"""
A decorator for listening to events dispatched from the
Gateway.
:param coro: The coroutine of the event.
:type coro: Coroutine
:param name(?): The name of the event. If not given, this defaults to the coroutine's name.
:type name: Optional[str]
:return: A callable response.
:rtype: Callable[..., Any]
"""
self._websocket.dispatch.register(coro, name if name is not MISSING else coro.__name__)
return coro
def command(
self,
*,
type: Optional[Union[int, ApplicationCommandType]] = ApplicationCommandType.CHAT_INPUT,
name: Optional[str] = MISSING,
description: Optional[str] = MISSING,
scope: Optional[Union[int, Guild, List[int], List[Guild]]] = MISSING,
options: Optional[
Union[Dict[str, Any], List[Dict[str, Any]], Option, List[Option]]
] = MISSING,
default_permission: Optional[bool] = MISSING,
) -> Callable[..., Any]:
"""
A decorator for registering an application command to the Discord API,
as well as being able to listen for ``INTERACTION_CREATE`` dispatched
gateway events.
The structure of a chat-input command:
.. code-block:: python
@command(name="command-name", description="this is a command.")
async def command_name(ctx):
...
You are also able to establish it as a message or user command by simply passing
the ``type`` kwarg field into the decorator:
.. code-block:: python
@command(type=interactions.ApplicationCommandType.MESSAGE, name="Message Command")
async def message_command(ctx):
...
The ``scope`` kwarg field may also be used to designate the command in question
applicable to a guild or set of guilds.
:param type?: The type of application command. Defaults to :meth:`interactions.enums.ApplicationCommandType.CHAT_INPUT` or ``1``.
:type type: Optional[Union[str, int, ApplicationCommandType]]
:param name: The name of the application command. This *is* required but kept optional to follow kwarg rules.
:type name: Optional[str]
:param description?: The description of the application command. This should be left blank if you are not using ``CHAT_INPUT``.
:type description: Optional[str]
:param scope?: The "scope"/applicable guilds the application command applies to.
:type scope: Optional[Union[int, Guild, List[int], List[Guild]]]
:param options?: The "arguments"/options of an application command. This should be left blank if you are not using ``CHAT_INPUT``.
:type options: Optional[Union[Dict[str, Any], List[Dict[str, Any]], Option, List[Option]]]
:param default_permission?: The default permission of accessibility for the application command. Defaults to ``True``.
:type default_permission: Optional[bool]
:return: A callable response.
:rtype: Callable[..., Any]
"""
def decorator(coro: Coroutine) -> Callable[..., Any]:
if name is MISSING:
raise InteractionException(11, message="Your command must have a name.")
elif len(name) > 32:
raise InteractionException(
11, message="Command name must not be more than 32 characters."
)
elif len(description) > 100:
raise InteractionException(
11, message="Command description must be less than 100 characters"
)
for _ in name:
if _.isupper():
raise InteractionException(
11,
message="Your command name must not contain uppercase characters (Discord limitation)",
)
if type == ApplicationCommandType.CHAT_INPUT and description is MISSING:
raise InteractionException(
11, message="Chat-input commands must have a description."
)
if not len(coro.__code__.co_varnames):
raise InteractionException(
11, message="Your command needs at least one argument to return context."
)
if options is not MISSING:
if len(coro.__code__.co_varnames) + 1 < len(options):
raise InteractionException(
11,
message="You must have the same amount of arguments as the options of the command.",
)
if isinstance(options, List) and len(options) > 25:
raise InteractionException(
11, message="You cannot have more than 25 options in a command"
)
_option: Option
for _option in options:
if _option.type not in (
OptionType.SUB_COMMAND,
OptionType.SUB_COMMAND_GROUP,
):
if getattr(_option, "autocomplete", False) and getattr(
_option, "choices", False
):
raise InteractionException(
11,
message="Autocomplete may not be set to true if choices are present",
)
if not getattr(_option, "description", False):
raise InteractionException(
11,
message="A description is required for Options that are not sub-commands",
)
if len(_option.description) > 100:
raise InteractionException(
11,
message="Command option descriptions must be less than 100 characters",
)
if len(_option.name) > 32:
raise InteractionException(
11, message="Command option name must be less than 32 characters"
)
commands: List[ApplicationCommand] = command(
type=type,
name=name,
description=description,
scope=scope,
options=options,
default_permission=default_permission,
)
if self._automate_sync:
if self._loop.is_running():
[self._loop.create_task(self._synchronize(command)) for command in commands]
else:
[
self._loop.run_until_complete(self._synchronize(command))
for command in commands
]
return self.event(coro, name=f"command_{name}")
return decorator
def message_command(
self,
*,
name: str,
scope: Optional[Union[int, Guild, List[int], List[Guild]]] = MISSING,
default_permission: Optional[bool] = MISSING,
) -> Callable[..., Any]:
"""
A decorator for registering a message context menu to the Discord API,
as well as being able to listen for ``INTERACTION_CREATE`` dispatched
gateway events.
The structure of a message context menu:
.. code-block:: python
@message_command(name="Context menu name")
async def context_menu_name(ctx):
...
The ``scope`` kwarg field may also be used to designate the command in question
applicable to a guild or set of guilds.
:param name: The name of the application command.
:type name: Optional[str]
:param scope?: The "scope"/applicable guilds the application command applies to. Defaults to ``None``.
:type scope: Optional[Union[int, Guild, List[int], List[Guild]]]
:param default_permission?: The default permission of accessibility for the application command. Defaults to ``True``.
:type default_permission: Optional[bool]
:return: A callable response.
:rtype: Callable[..., Any]
"""
def decorator(coro: Coroutine) -> Callable[..., Any]:
if not len(coro.__code__.co_varnames):
raise InteractionException(
11,
message="Your command needs at least one argument to return context.",
)
commands: List[ApplicationCommand] = command(
type=ApplicationCommandType.MESSAGE,
name=name,
scope=scope,
default_permission=default_permission,
)
if self._automate_sync:
if self._loop.is_running():
[self._loop.create_task(self._synchronize(command)) for command in commands]
else:
[
self._loop.run_until_complete(self._synchronize(command))
for command in commands
]
return self.event(coro, name=f"command_{name}")
return decorator
def user_command(
self,
*,
name: str,
scope: Optional[Union[int, Guild, List[int], List[Guild]]] = MISSING,
default_permission: Optional[bool] = MISSING,
) -> Callable[..., Any]:
"""
A decorator for registering a user context menu to the Discord API,
as well as being able to listen for ``INTERACTION_CREATE`` dispatched
gateway events.
The structure of a user context menu:
.. code-block:: python
@user_command(name="Context menu name")
async def context_menu_name(ctx):
...
The ``scope`` kwarg field may also be used to designate the command in question
applicable to a guild or set of guilds.
:param name: The name of the application command.
:type name: Optional[str]
:param scope?: The "scope"/applicable guilds the application command applies to. Defaults to ``None``.
:type scope: Optional[Union[int, Guild, List[int], List[Guild]]]
:param default_permission?: The default permission of accessibility for the application command. Defaults to ``True``.
:type default_permission: Optional[bool]
:return: A callable response.
:rtype: Callable[..., Any]
"""
def decorator(coro: Coroutine) -> Callable[..., Any]:
if not len(coro.__code__.co_varnames):
raise InteractionException(
11,
message="Your command needs at least one argument to return context.",
)
commands: List[ApplicationCommand] = command(
type=ApplicationCommandType.USER,
name=name,
scope=scope,
default_permission=default_permission,
)
if self._automate_sync:
if self._loop.is_running():
[self._loop.create_task(self._synchronize(command)) for command in commands]
else:
[
self._loop.run_until_complete(self._synchronize(command))
for command in commands
]
return self.event(coro, name=f"command_{name}")
return decorator
def component(self, component: Union[str, Button, SelectMenu]) -> Callable[..., Any]:
"""
A decorator for listening to ``INTERACTION_CREATE`` dispatched gateway
events involving components.
The structure for a component callback:
.. code-block:: python
# Method 1
@component(interactions.Button(
style=interactions.ButtonStyle.PRIMARY,
label="click me!",
custom_id="click_me_button",
))
async def button_response(ctx):
...
# Method 2
@component("custom_id")
async def button_response(ctx):
...
The context of the component callback decorator inherits the same
as of the command decorator.
:param component: The component you wish to callback for.
:type component: Union[str, Button, SelectMenu]
:return: A callable response.
:rtype: Callable[..., Any]
"""
def decorator(coro: Coroutine) -> Any:
payload: str = (
_component(component).custom_id
if isinstance(component, (Button, SelectMenu))
else component
)
return self.event(coro, name=f"component_{payload}")
return decorator
def autocomplete(
self, name: str, command: Union[ApplicationCommand, int, str, Snowflake]
) -> Callable[..., Any]:
"""
A decorator for listening to ``INTERACTION_CREATE`` dispatched gateway
events involving autocompletion fields.
The structure for an autocomplete callback:
.. code-block:: python
@autocomplete("option_name")
async def autocomplete_choice_list(ctx, user_input: str = ""):
await ctx.populate([...])
:param name: The name of the option to autocomplete.
:type name: str
:param command: The command, command ID, or command name with the option.
:type command: Union[ApplicationCommand, int, str, Snowflake]
:return: A callable response.
:rtype: Callable[..., Any]
"""
if isinstance(command, ApplicationCommand):
_command: Union[Snowflake, int] = command.id
elif isinstance(command, str):
_command_obj = self.http.cache.interactions.get(command)
if not _command_obj:
_sync_task = ensure_future(self.synchronize(), loop=self.loop)
while not _sync_task.done():
pass # wait for sync to finish
_command_obj = self.http.cache.interactions.get(command)
if not _command_obj:
raise InteractionException(6, message="The command does not exist")
_command: Union[Snowflake, int] = int(_command_obj.id)
elif isinstance(command, int) or isinstance(command, Snowflake):
_command: Union[Snowflake, int] = int(command)
else:
raise ValueError(
"You can only insert strings, integers and ApplicationCommands here!"
) # TODO: move to custom error formatter
def decorator(coro: Coroutine) -> Any:
return self.event(coro, name=f"autocomplete_{_command}_{name}")
return decorator
def modal(self, modal: Modal) -> Callable[..., Any]:
"""
A decorator for listening to ``INTERACTION_CREATE`` dispatched gateway
events involving modals.
.. error::
This feature is currently under experimental/**beta access**
to those whitelisted for testing. Currently using this will
present you with an error with the modal not working.
The structure for a modal callback:
.. code-block:: python
@modal(interactions.Modal(
interactions.TextInput(
style=interactions.TextStyleType.PARAGRAPH,
custom_id="how_was_your_day_field",
label="How has your day been?",
placeholder="Well, so far...",
),
))
async def modal_response(ctx):
...
The context of the modal callback decorator inherits the same
as of the component decorator.
:param modal: The modal you wish to callback for.
:type modal: Modal
:return: A callable response.
:rtype: Callable[..., Any]
"""
def decorator(coro: Coroutine) -> Any:
return self.event(coro, name=f"modal_{modal.custom_id}")
return decorator
def load(
self, name: str, package: Optional[str] = None, *args, **kwargs
) -> Optional["Extension"]:
r"""
"Loads" an extension off of the current client by adding a new class
which is imported from the library.
:param name: The name of the extension.
:type name: str
:param package?: The package of the extension.
:type package: Optional[str]
:param \*args?: Optional arguments to pass to the extension
:type \**args: tuple
:param \**kwargs?: Optional keyword-only arguments to pass to the extension.
:type \**kwargs: dict
:return: The loaded extension.
:rtype: Optional[Extension]
"""
_name: str = resolve_name(name, package)
if _name in self._extensions:
log.error(f"Extension {name} has already been loaded. Skipping.")
return
module = import_module(
name, package
) # should be a module, because Extensions just need to be __init__-ed
try:
setup = getattr(module, "setup")
extension = setup(self, *args, **kwargs)
except Exception as error:
del sys.modules[name]
log.error(f"Could not load {name}: {error}. Skipping.")
raise error
else:
log.debug(f"Loaded extension {name}.")
self._extensions[_name] = module
return extension
def remove(self, name: str, package: Optional[str] = None) -> None:
"""
Removes an extension out of the current client from an import resolve.
:param name: The name of the extension.
:type name: str
:param package?: The package of the extension.
:type package: Optional[str]
"""
try:
_name: str = resolve_name(name, package)
except AttributeError:
_name = name
extension = self._extensions.get(_name)
if _name not in self._extensions:
log.error(f"Extension {name} has not been loaded before. Skipping.")
return
try:
extension.teardown() # made for Extension, usable by others
except AttributeError:
pass
if isinstance(extension, ModuleType): # loaded as a module
for ext_name, ext in getmembers(
extension, lambda x: isinstance(x, type) and issubclass(x, Extension)
):
self.remove(ext_name)
del sys.modules[_name]
del self._extensions[_name]
log.debug(f"Removed extension {name}.")
def reload(
self, name: str, package: Optional[str] = None, *args, **kwargs
) -> Optional["Extension"]:
r"""
"Reloads" an extension off of current client from an import resolve.
:param name: The name of the extension.
:type name: str
:param package?: The package of the extension.
:type package: Optional[str]
:param \*args?: Optional arguments to pass to the extension
:type \**args: tuple
:param \**kwargs?: Optional keyword-only arguments to pass to the extension.
:type \**kwargs: dict
:return: The reloaded extension.
:rtype: Optional[Extension]
"""
_name: str = resolve_name(name, package)
extension = self._extensions.get(_name)
if extension is None:
log.warning(f"Extension {name} could not be reloaded because it was never loaded.")
self.load(name, package)
return
self.remove(name, package)
return self.load(name, package, *args, **kwargs)
async def __raw_socket_create(self, data: Dict[Any, Any]) -> Dict[Any, Any]:
"""
This is an internal function that takes any gateway socket event
and then returns the data purely based off of what it does in
the client instantiation class.
:param data: The data that is returned
:type data: Dict[Any, Any]
:return: A dictionary of raw data.
:rtype: Dict[Any, Any]
"""
return data
async def __raw_channel_create(self, channel) -> dict:
"""
This is an internal function that caches the channel creates when dispatched.
:param channel: The channel object data in question.
:type channel: Channel
:return: The channel as a dictionary of raw data.
:rtype: dict
"""
self._http.cache.channels.add(Build(id=channel.id, value=channel))
return channel._json
async def __raw_message_create(self, message) -> dict:
"""
This is an internal function that caches the message creates when dispatched.
:param message: The message object data in question.
:type message: Message
:return: The message as a dictionary of raw data.
:rtype: dict
"""
self._http.cache.messages.add(Build(id=message.id, value=message))
return message._json
async def __raw_guild_create(self, guild) -> dict:
"""
This is an internal function that caches the guild creates on ready.
:param guild: The guild object data in question.
:type guild: Guild
:return: The guild as a dictionary of raw data.
:rtype: dict
"""
self._http.cache.self_guilds.add(Build(id=str(guild.id), value=guild))
return guild._json
# TODO: Implement the rest of cog behaviour when possible.
class Extension:
"""
A class that allows you to represent "extensions" of your code, or
essentially cogs that can be ran independent of the root file in
an object-oriented structure.
The structure of an extension:
.. code-block:: python
class CoolCode(interactions.Extension):
def __init__(self, client):
self.client = client
@command(
type=interactions.ApplicationCommandType.USER,
name="User command in cog",
)
async def cog_user_cmd(self, ctx):
...
def setup(client):
CoolCode(client)
"""
client: Client
def __new__(cls, client: Client, *args, **kwargs) -> "Extension":
self = super().__new__(cls)
self.client = client
self._commands = {}
self._listeners = {}
# This gets every coroutine in a way that we can easily change them
# cls
for name, func in getmembers(self, predicate=iscoroutinefunction):
# TODO we can make these all share the same list, might make it easier to load/unload
if hasattr(func, "__listener_name__"): # set by extension_listener
func = client.event(
func, name=func.__listener_name__
) # capture the return value for friendlier ext-ing
listeners = self._listeners.get(func.__listener_name__, [])
listeners.append(func)
self._listeners[func.__listener_name__] = listeners
if hasattr(func, "__command_data__"): # Set by extension_command
args, kwargs = func.__command_data__
func = client.command(*args, **kwargs)(func)
cmd_name = f"command_{kwargs.get('name') or func.__name__}"
commands = self._commands.get(cmd_name, [])
commands.append(func)
self._commands[cmd_name] = commands
if hasattr(func, "__component_data__"):
args, kwargs = func.__component_data__
func = client.component(*args, **kwargs)(func)
component = kwargs.get("component") or args[0]
comp_name = (
_component(component).custom_id
if isinstance(component, (Button, SelectMenu))
else component
)
comp_name = f"component_{comp_name}"
listeners = self._listeners.get(comp_name, [])
listeners.append(func)
self._listeners[comp_name] = listeners
if hasattr(func, "__autocomplete_data__"):
args, kwargs = func.__autocomplete_data__
func = client.autocomplete(*args, **kwargs)(func)
name = kwargs.get("name") or args[0]
_command = kwargs.get("command") or args[1]
_command: Union[Snowflake, int] = (
_command.id if isinstance(_command, ApplicationCommand) else _command
)
auto_name = f"autocomplete_{_command}_{name}"
listeners = self._listeners.get(auto_name, [])
listeners.append(func)
self._listeners[auto_name] = listeners
if hasattr(func, "__modal_data__"):
args, kwargs = func.__modal_data__
func = client.modal(*args, **kwargs)(func)
modal = kwargs.get("modal") or args[0]
modal_name = f"modal_{modal.custom_id}"
listeners = self._listeners.get(modal_name, [])
listeners.append(func)
self._listeners[modal_name] = listeners
client._extensions[cls.__name__] = self
return self
def teardown(self):
for event, funcs in self._listeners.items():
for func in funcs:
self.client._websocket.dispatch.events[event].remove(func)
for cmd, funcs in self._commands.items():
for func in funcs:
self.client._websocket.dispatch.events[cmd].remove(func)
clean_cmd_names = [cmd[7:] for cmd in self._commands.keys()]
cmds = filter(
lambda cmd_data: cmd_data["name"] in clean_cmd_names,
self.client._http.cache.interactions.view,
)
if self.client._automate_sync:
[