-
-
Notifications
You must be signed in to change notification settings - Fork 27
/
bot.py
220 lines (180 loc) · 7.17 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
"""OneBot v11 机器人定义。
FrontMatter:
sidebar_position: 3
description: onebot.v11.bot 模块
"""
import re
from typing import Any, Union, Callable
from nonebot.typing import overrides
from nonebot.message import handle_event
from nonebot.adapters import Bot as BaseBot
from .utils import log, escape
from .message import Message, MessageSegment
from .event import Event, Reply, MessageEvent
async def _check_reply(bot: "Bot", event: MessageEvent) -> None:
    """Detect a quoted reply in the message and strip it from the content.

    The first ``reply`` segment is resolved through ``bot.get_msg`` and the
    parsed result is stored in ``event.reply``. ``event.to_me`` is set when
    the sender of the quoted message is this bot. The reply segment, an
    ``at`` segment directly following it, and leading whitespace in the text
    segment that comes next are then removed from ``event.message``.

    Args:
        bot: Bot object used to fetch the quoted message.
        event: MessageEvent object; modified in place.
    """
    # position of the first reply segment, if the message has one
    reply_pos = next(
        (pos for pos, seg in enumerate(event.message) if seg.type == "reply"),
        None,
    )
    if reply_pos is None:
        return

    reply_seg = event.message[reply_pos]
    try:
        quoted = await bot.get_msg(message_id=reply_seg.data["id"])
        event.reply = Reply.parse_obj(quoted)
    except Exception as e:
        # best effort: on any failure the message is left untouched
        log("WARNING", f"Error when getting message reply info: {repr(e)}", e)
        return

    # compare both ids as strings
    if str(event.reply.sender.user_id) == str(event.self_id):
        event.to_me = True

    segments = event.message
    del segments[reply_pos]
    # after each deletion the following segment slides into `reply_pos`
    if reply_pos < len(segments) and segments[reply_pos].type == "at":
        del segments[reply_pos]
    if reply_pos < len(segments) and segments[reply_pos].type == "text":
        follower = segments[reply_pos]
        follower.data["text"] = follower.data["text"].lstrip()
        if not follower.data["text"]:
            del segments[reply_pos]

    # never leave the event with an empty message
    if not segments:
        segments.append(MessageSegment.text(""))
def _check_at_me(bot: "Bot", event: MessageEvent) -> None:
    """Detect an @-mention of the bot at the start or end of the message.

    A matching mention is removed from ``event.message`` and ``event.to_me``
    is set. Private messages are always treated as addressed to the bot.

    Args:
        bot: Bot object.
        event: MessageEvent object; modified in place.
    """
    if not isinstance(event, MessageEvent):
        return

    # pad an empty message so the index accesses below are valid
    if not event.message:
        event.message.append(MessageSegment.text(""))

    if event.message_type == "private":
        event.to_me = True
        return

    own_id = str(event.self_id)

    def _mentions_me(segment: MessageSegment):
        # ids are compared as strings
        return segment.type == "at" and str(segment.data.get("qq", "")) == own_id

    def _trim_head() -> None:
        # left-strip a leading text segment; drop it when nothing remains
        if event.message and event.message[0].type == "text":
            head = event.message[0]
            head.data["text"] = head.data["text"].lstrip()
            if not head.data["text"]:
                del event.message[0]

    # mention at the very beginning
    if _mentions_me(event.message[0]):
        event.to_me = True
        event.message.pop(0)
        _trim_head()
        # a second consecutive mention of the bot is removed as well
        if event.message and _mentions_me(event.message[0]):
            event.message.pop(0)
            _trim_head()

    if not event.to_me:
        # mention at the end, optionally followed by one blank text segment
        tail_index = -1
        tail = event.message[tail_index]
        if (
            tail.type == "text"
            and not tail.data["text"].strip()
            and len(event.message) >= 2
        ):
            tail_index -= 1
            tail = event.message[tail_index]

        if _mentions_me(tail):
            event.to_me = True
            # removes the mention and anything after it
            del event.message[tail_index:]

    # never leave the event with an empty message
    if not event.message:
        event.message.append(MessageSegment.text(""))
def _check_nickname(bot: "Bot", event: MessageEvent) -> None:
    """Detect a configured nickname at the start of the message.

    When the first segment is text and begins with one of the nicknames in
    ``bot.config.nickname`` (case-insensitive), ``event.to_me`` is set and the
    nickname, together with any whitespace or commas following it, is removed
    from that segment.

    Args:
        bot: Bot object; ``bot.config.nickname`` supplies the nicknames.
        event: MessageEvent object; modified in place.
    """
    # nothing to inspect; avoids an IndexError on an empty message
    if not event.message:
        return

    first_msg_seg = event.message[0]
    if first_msg_seg.type != "text":
        return

    # empty nicknames are dropped: an empty alternative matches everything
    nicknames = {name for name in bot.config.nickname if name}
    if not nicknames:
        return

    first_text = first_msg_seg.data["text"]

    # Nicknames are literal text, so regex metacharacters are escaped.
    # Longest first (ties broken alphabetically) keeps the match deterministic
    # and stops a short nickname from shadowing a longer one it prefixes.
    ordered = sorted(nicknames, key=lambda name: (-len(name), name))
    nickname_regex = "|".join(re.escape(name) for name in ordered)

    # separators: whitespace, ASCII comma and full-width comma (U+FF0C)
    m = re.search(
        rf"^({nickname_regex})([\s,\uff0c]*|$)", first_text, re.IGNORECASE
    )
    if m is None:
        return

    nickname = m.group(1)
    log("DEBUG", f"User is calling me {nickname}")
    event.to_me = True
    first_msg_seg.data["text"] = first_text[m.end():]
async def send(
    bot: "Bot",
    event: Event,
    message: Union[str, Message, MessageSegment],
    at_sender: bool = False,
    reply_message: bool = False,
    **params: Any,  # extra options passed to send_msg API
) -> Any:
    """Default handler for replying to an event.

    Builds the outgoing message and the ``send_msg`` parameters from the
    event, then calls ``bot.send_msg``.

    Args:
        bot: Bot object used to call the API.
        event: Event being replied to.
        message: Content to send.
        at_sender: Prepend an @-mention of the event's user. Ignored when the
            event has no ``user_id`` or the message type is ``"private"``.
        reply_message: Prepend a reply segment quoting the event's message.
            Ignored when the event has no ``message_id``.
        **params: Extra options passed to ``send_msg``; values given here take
            precedence over those derived from the event.

    Returns:
        Whatever ``bot.send_msg`` returns.

    Raises:
        ValueError: The message type is not given and neither a ``group_id``
            nor a ``user_id`` is available to guess it from.
    """
    event_dict = event.dict()

    if "message_id" not in event_dict:
        reply_message = False  # nothing to quote

    if "user_id" in event_dict:
        # explicit params win over the event's value
        params.setdefault("user_id", event_dict["user_id"])
    else:
        at_sender = False  # nobody to mention

    if "group_id" in event_dict:
        params.setdefault("group_id", event_dict["group_id"])

    if "message_type" not in params:
        # A key that is present but holds None must not count as a target,
        # otherwise an unset group_id would be classified as "group".
        if params.get("group_id") is not None:
            params["message_type"] = "group"
        elif params.get("user_id") is not None:
            params["message_type"] = "private"
        else:
            raise ValueError("Cannot guess message type to reply!")

    # assemble: [reply] [at + space] message
    full_message = Message()
    if reply_message:
        full_message += MessageSegment.reply(event_dict["message_id"])
    if at_sender and params["message_type"] != "private":
        full_message += MessageSegment.at(params["user_id"]) + " "
    full_message += message

    # a caller-supplied "message" param is kept as is
    params.setdefault("message", full_message)

    return await bot.send_msg(**params)
class Bot(BaseBot):
    """Bot adapter for the OneBot v11 protocol."""

    # Callable used by `send` to deliver a reply. It is looked up on the
    # class and defaults to the module-level `send` function.
    send_handler: Callable[
        ["Bot", Event, Union[str, Message, MessageSegment]], Any
    ] = send

    async def handle_event(self, event: Event) -> None:
        """Handle a received event.

        Message events first go through the reply, @-mention and nickname
        checks; every event is then passed on to ``handle_event``.
        """
        if isinstance(event, MessageEvent):
            await _check_reply(self, event)
            _check_at_me(self, event)
            _check_nickname(self, event)

        await handle_event(self, event)

    @overrides(BaseBot)
    async def send(
        self,
        event: Event,
        message: Union[str, Message, MessageSegment],
        **kwargs: Any,
    ) -> Any:
        """Reply to the subject that triggered `event`.

        Args:
            event: Event object.
            message: Message to send.
            at_sender (bool): Whether to @ the subject of the event.
            reply_message (bool): Whether to reply to the event's message.
            kwargs: Other arguments; can be used together with
                {ref}`nonebot.adapters.onebot.v11.adapter.Adapter.custom_send`.

        Returns:
            Data returned by the API call.

        Raises:
            ValueError: `user_id`, `group_id` missing.
            NetworkError: Network error.
            ActionFailed: API call failed.
        """
        # resolved on the class so the handler receives `self` explicitly
        handler = type(self).send_handler
        return await handler(self, event, message, **kwargs)