Skip to content

Commit

Permalink
fix: 修复重连失败的问题
Browse files Browse the repository at this point in the history
  • Loading branch information
veehou committed May 28, 2022
1 parent 6156e21 commit 62d7a07
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 36 deletions.
23 changes: 13 additions & 10 deletions botpy/client.py
Expand Up @@ -148,7 +148,7 @@ async def _bot_login(self, token: Token) -> None:
max_async=self._ws_ap["session_start_limit"]["max_concurrency"],
connect=self.bot_connect,
dispatch=self.dispatch,
loop=asyncio.get_event_loop(),
loop=self.loop,
api=self.api,
)

Expand Down Expand Up @@ -191,15 +191,18 @@ def _loop_exception_handler(_loop, context):
loop = self._connection.loop
loop.set_exception_handler(_loop_exception_handler)

try:
# 返回协程对象,交由开发者自行调控
if self.ret_coro:
return self._connection.run(session_interval)
else:
await self._connection.run(session_interval)
except KeyboardInterrupt:
_log.info("[botpy]服务强行停止!")
# cancel all tasks lingering
while not self._closed:
_log.debug("session loop check")
try:
# 返回协程对象,交由开发者自行调控
coroutine = self._connection.run(session_interval)
if self.ret_coro:
return coroutine
else:
await coroutine
except KeyboardInterrupt:
_log.info("[botpy]服务强行停止!")
# cancel all tasks lingering

async def bot_connect(self, session):
"""
Expand Down
14 changes: 13 additions & 1 deletion botpy/connection.py
Expand Up @@ -19,7 +19,14 @@ class ConnectionSession:
这里通过设置session_id=""空则任务session需要重连
"""

def __init__(self, max_async, connect: Callable, dispatch: Callable, loop=None, api: BotAPI = None):
def __init__(
self,
max_async,
connect: Callable,
dispatch: Callable,
loop=None,
api: BotAPI = None,
):
self.dispatch = dispatch
self.state = ConnectionState(dispatch, api)
self.parser: Dict[str, Callable[[gateway.WsContext, Any], None]] = self.state.parsers
Expand All @@ -31,6 +38,8 @@ def __init__(self, max_async, connect: Callable, dispatch: Callable, loop=None,
self._session_list: List[session.Session] = []

async def run(self, session_interval=5):
if len(self._session_list) == 0:
return
# 根据并发数同时建立多个future
index = 0
session_list = self._session_list
Expand Down Expand Up @@ -163,3 +172,6 @@ def parse_at_message_create(self, ctx: gateway.WsContext, data: gateway.MessageP

def parse_ready(self, ctx: gateway.WsContext, data: gateway.ReadyEvent):
self._dispatch("ready")

def parse_resumed(self, ctx: gateway.WsContext, data: gateway.ReadyEvent):
self._dispatch("resumed")
55 changes: 30 additions & 25 deletions botpy/gateway.py
Expand Up @@ -51,7 +51,7 @@ async def on_error(self, exception: BaseException):
_log.error("on_error: websocket connection: %s, exception : %s" % (self._conn, exception))
traceback.print_exc()

async def on_close(self, close_status_code, close_msg):
async def on_closed(self, close_status_code, close_msg):
_log.info("[botpy]关闭, 返回码: %s" % close_status_code + ", 返回信息: %s" % close_msg)
# 这种不能重新链接
if close_status_code in self._INVALID_RECONNECT_CODE or not self._can_reconnect:
Expand All @@ -60,7 +60,6 @@ async def on_close(self, close_status_code, close_msg):
self._session["last_seq"] = 0
# 断连后启动一个新的链接并透传当前的session,不使用内部重连的方式,避免死循环
self._connection.add(self._session)
asyncio.ensure_future(self._connection.run())

async def on_message(self, ws, message):
_log.debug("[botpy]接收消息: %s" % message)
Expand All @@ -69,20 +68,29 @@ async def on_message(self, ws, message):
if await self._is_system_event(msg, ws):
return

if "t" in msg.keys() and msg["t"] == "READY":
_log.info("[botpy]鉴权成功")
event_seq = msg["s"]
if event_seq > 0:
self._session["last_seq"] = event_seq
event = msg.get("t")
opcode = msg.get("op")
event_seq = msg["s"]
if event_seq > 0:
self._session["last_seq"] = event_seq

if event == "READY":
# 心跳检查
self._connection.loop.create_task(self._send_heart(interval=30))
ready = await self._ready_handler(msg)
_log.info(f"[botpy]机器人「{ready['user']['username']}」 启动成功!")
_log.info(f"[botpy]机器人「{ready['user']['username']}」启动成功!")

if event == "RESUMED":
# 心跳检查
self._connection.loop.create_task(self._send_heart(interval=30))
_log.info("[botpy]机器人重连成功✅")

if "t" in msg.keys():
if event and opcode == self.WS_DISPATCH_EVENT:
event = msg["t"].lower()
try:
func = self._parser[event]
except KeyError:
_log.debug("_parser unknown event %s.", event)
_log.error("_parser unknown event %s.", event)
else:
ctx = gateway.WsContext(id=msg.get("id", ""))
func(ctx, msg.get("d"))
Expand All @@ -92,11 +100,9 @@ async def on_connected(self, ws: ClientWebSocketResponse):
if self._conn is None:
raise Exception("websocket connection failed ")
if self._session["session_id"] != "":
await self.resume()
await self.ws_resume()
else:
await self.identify()
# 心跳检查
asyncio.ensure_future(self._send_heart(interval=30))
await self.ws_identify()

async def ws_connect(self):
"""
Expand All @@ -118,12 +124,12 @@ async def ws_connect(self):
elif msg.type == aiohttp.WSMsgType.ERROR:
await self.on_error(ws_conn.exception())
elif msg.type == aiohttp.WSMsgType.CLOSED or msg.type == aiohttp.WSMsgType.CLOSE:
await self.on_close(ws_conn.close_code, msg.extra)
await self.on_closed(ws_conn.close_code, msg.extra)
if ws_conn.closed:
_log.debug("ws is closed, stop circle receive msg")
break

async def identify(self):
async def ws_identify(self):
"""websocket鉴权"""
if self._session["intent"] == 0:
self._session["intent"] = 1
Expand Down Expand Up @@ -157,7 +163,7 @@ async def send_msg(self, event_json):
else:
await self._conn.send_str(data=send_msg)

async def resume(self):
async def ws_resume(self):
"""
websocket重连
"""
Expand Down Expand Up @@ -209,7 +215,7 @@ async def _send_heart(self, interval):
心跳包
:param interval: 间隔时间
"""
_log.info("[botpy]心跳检测启动...")
_log.info("[botpy]心跳维持启动...")
while True:
payload = {
"op": self.WS_HEARTBEAT,
Expand All @@ -219,10 +225,9 @@ async def _send_heart(self, interval):
if self._conn is None:
_log.debug("[botpy]连接已关闭!")
return
else:
if self._conn.closed:
_log.debug("[botpy]ws连接已关闭, 心跳检测停止,ws对象: %s" % self._conn)
return
else:
await asyncio.sleep(interval)
await self.send_msg(JsonUtil.dict2json(payload))
if self._conn.closed:
_log.debug("[botpy]ws连接已关闭, 心跳检测停止,ws对象: %s" % self._conn)
return

await asyncio.sleep(interval)
await self.send_msg(JsonUtil.dict2json(payload))

0 comments on commit 62d7a07

Please sign in to comment.