/
builtin_functions.py
116 lines (98 loc) · 3.51 KB
/
builtin_functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import getpass
import os
import socket
import sys
from textwrap import dedent
from typing import Optional
import datetime as dt
from tgpy.hooks import Hook, HookType, delete_hook_file, get_sorted_hooks
from tgpy.message_design import get_code
from tgpy.run_code import variables
from tgpy.run_code.utils import Context, filename_prefix, save_function_to_variables
from tgpy.utils import run_cmd, get_version, installed_as_package, RunCmdException
# Create the module-level Context instance and register it in `variables`
# under the key 'ctx'.
ctx = Context()
variables['ctx'] = ctx
@save_function_to_variables
def ping():
    """Return a short multi-line status report.

    The text consists of ``Pong!``, the current user and host name in
    ``user@host`` form, and the version reported by ``get_version()``.
    """
    account = f'{getpass.getuser()}@{socket.gethostname()}'
    report_lines = (
        'Pong!',
        f'Running on {account}',
        f'Version: {get_version()}',
    )
    return '\n'.join(report_lines)
@save_function_to_variables
def restart(msg: Optional[str] = 'Restarted successfully'):
    """Restart TGPy in place and report the result in the triggering message.

    A hook with ``type=HookType.onstart`` and ``once=True`` is saved first.
    Its code re-fetches the message referenced by ``ctx.msg`` and edits it so
    that it shows ``msg``. The current process is then replaced through
    ``os.execl`` with ``<python> -m tgpy`` plus the original command-line
    arguments, so this function does not return on success.

    Args:
        msg: Text to show in the message after the restart. It is converted
            with ``str()``, so ``None`` is shown as ``'None'``.
    """
    # Embed the text as a Python string literal via repr(): quotes,
    # backslashes and newlines inside ``msg`` can then neither break the
    # generated hook source nor disturb the margin detection of ``dedent``.
    msg_literal = repr(str(msg))
    hook_code = dedent(f'''
        from tgpy.message_design import edit_message, get_code
        msg = await client.get_messages({ctx.msg.chat_id}, ids={ctx.msg.id})
        await edit_message(msg, get_code(msg), {msg_literal})
    ''')
    hook = Hook(
        name='__restart_message',
        type=HookType.onstart,
        once=True,
        save_locals=False,
        code=hook_code,
        origin=f'{filename_prefix}restart_message',
        # NOTE(review): the epoch timestamp presumably makes this hook sort
        # before user hooks -- confirm against get_sorted_hooks().
        datetime=dt.datetime.fromtimestamp(0),
    )
    hook.save()
    # Replace the running process image; nothing after this line executes.
    os.execl(sys.executable, sys.executable, '-m', 'tgpy', *sys.argv[1:])
@save_function_to_variables
def update():
    """Update TGPy and restart it when the version changed.

    When ``installed_as_package()`` is true, ``pip install -U tgpy`` is run
    with the current interpreter; if that raises ``RunCmdException`` the same
    command is retried with ``--user``. Otherwise ``git pull`` is run.

    Returns:
        ``'Already up to date'`` when ``get_version()`` reports the same
        version before and after the update. Otherwise ``restart()`` is
        called with a success message.
    """
    version_before = get_version()

    if not installed_as_package():
        run_cmd(['git', 'pull'])
    else:
        pip_cmd = [sys.executable, '-m', 'pip', 'install', '-U', 'tgpy']
        try:
            run_cmd(pip_cmd)
        except RunCmdException:
            # Retry as a per-user installation.
            run_cmd([*pip_cmd, '--user'])

    version_after = get_version()
    if version_before != version_after:
        restart(f'Updated successfully! Current version: {version_after}')
        return None
    return 'Already up to date'
class HooksObject:
    """Helper object for adding, removing and listing saved hooks."""

    async def add(self, name: str, code: Optional[str] = None) -> str:
        """Save a new ``onstart`` hook called ``name``.

        Args:
            name: Name given to the created ``Hook``.
            code: Source of the hook. When omitted, the code is extracted
                with ``get_code`` from the message that ``ctx.msg`` replies
                to.

        Returns:
            A confirmation text, or a usage hint when ``code`` is omitted
            and there is no replied-to message.
        """
        if code is not None:
            # Code was passed explicitly: the current message is the origin.
            source_msg = ctx.msg
        else:
            source_msg = await ctx.msg.get_reply_message()
            if source_msg is None:
                return 'Use this function in reply to a message'
            code = get_code(source_msg)

        origin = f'{filename_prefix}message/{source_msg.chat_id}/{source_msg.id}'
        hook_fields = dict(
            name=name,
            type=HookType.onstart,
            once=False,
            save_locals=True,
            code=code,
            origin=origin,
            datetime=dt.datetime.now(),
        )
        Hook(**hook_fields).save()

        confirmation = dedent(f'''
            Added hook {name!r}.
            The hook will be executed every time TGPy starts.
        ''')
        return confirmation

    def remove(self, name) -> str:
        """Delete the hook file for ``name`` and return a status text.

        A ``FileNotFoundError`` from ``delete_hook_file`` is reported as
        "no such hook" instead of being propagated.
        """
        try:
            delete_hook_file(name)
        except FileNotFoundError:
            outcome = f'No hook named {name!r}.'
        else:
            outcome = f'Removed hook {name!r}.'
        return outcome

    def __str__(self):
        """Return a numbered list of hook names, or a hint when there are none."""
        numbered = [
            f'{position}. {hook.name}'
            for position, hook in enumerate(get_sorted_hooks(), start=1)
        ]
        if numbered:
            template = dedent('''
                Your hooks:
                {}
                Change hooks with `hooks.add(name)` and `hooks.remove(name)`.
                Learn more at https://tgpy.tmat.me/hooks.
            ''')
            return template.format('\n'.join(numbered))
        return dedent('''
            You have no hooks.
            Learn about hooks at https://tgpy.tmat.me/hooks.
        ''')

    def __iter__(self):
        """Iterate over hook names in the order given by ``get_sorted_hooks()``."""
        return (entry.name for entry in get_sorted_hooks())
# Create the module-level HooksObject instance and register it in `variables`
# under the key 'hooks'.
hooks = HooksObject()
variables['hooks'] = hooks