Skip to content

Commit

Permalink
fix: 修改重连过程收到消息无法处理的问题 (#51)
Browse files Browse the repository at this point in the history
* fix: 修改重连过程收到消息无法处理的问题

* fix: 修改重连过程收到消息无法处理的问题

* fix: 修改重连过程收到消息无法处理的问题

Co-authored-by: veehou <veehou@tencent.com>
  • Loading branch information
traveler-vee and veehou committed Apr 24, 2022
1 parent 8ccbd9d commit 37ed810
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 42 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ repos:
rev: 21.12b0
hooks:
- id: black
args: [-l 120]
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v1.2.3
hooks:
Expand Down
19 changes: 5 additions & 14 deletions qqbot/core/network/ws_async/ws_async_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,13 @@ def start(self, websocket_ap, token=Token, intent=Intents):
if _check_session_limit(websocket_ap):
raise Exception("session limit exceeded")
# 根据session限制建立链接
session_interval = _cal_interval(
websocket_ap["session_start_limit"]["max_concurrency"]
)
session_interval = _cal_interval(websocket_ap["session_start_limit"]["max_concurrency"])
shards_count = websocket_ap["shards"]
logger.debug(
"session_interval: %s, shards: %s" % (session_interval, shards_count)
)
logger.debug("session_interval: %s, shards: %s" % (session_interval, shards_count))
# 根据限制建立分片的并发链接数
return self.init_session_pool(
intent, shards_count, token, websocket_ap, session_interval
)
return self.init_session_pool(intent, shards_count, token, websocket_ap, session_interval)

def init_session_pool(
self, intent, shards_count, token, websocket_ap, session_interval
):
def init_session_pool(self, intent, shards_count, token, websocket_ap, session_interval):

# 实例一个session_pool
self.session_pool = SessionPool(
Expand Down Expand Up @@ -106,7 +98,7 @@ def start_session(self, session_interval=5):
logger.info("[连接管理]服务强行停止!")
# cancel all tasks lingering

async def new_connect(self, session, time_interval):
async def new_connect(self, session):
"""
newConnect 启动一个新的连接,如果连接在监听过程中报错了,或者被远端关闭了链接,需要识别关闭的原因,能否继续 resume
如果能够 resume,则往 sessionChan 中放入带有 sessionID 的 session
Expand All @@ -116,7 +108,6 @@ async def new_connect(self, session, time_interval):
param session: session对象
"""
logger.info("[连接管理]新会话启动中...")
await asyncio.sleep(time_interval)

client = Client(session, self, _on_connected)
try:
Expand Down
19 changes: 6 additions & 13 deletions qqbot/core/network/ws_async/ws_async_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,14 @@ class SessionPool:
def __init__(self, max_async, session_manager, loop=None):
self.max_async = max_async
self.session_manager = session_manager
self.loop: asyncio.AbstractEventLoop = (
asyncio.get_event_loop() if loop is None else loop
)
self.loop: asyncio.AbstractEventLoop = asyncio.get_event_loop() if loop is None else loop
# session链接同时最大并发数
self.session_list = []

async def run(self, session_interval=5):
loop = self.loop

# 根据并发数同时建立多个future
# 后台有频率限制,根据间隔时间发起链接请求
index = 0
session_list = self.session_list
# 需要执行的链接列表,通过time_interval控制启动时间
Expand All @@ -34,23 +31,19 @@ async def run(self, session_interval=5):
while len(session_list) > 0:
logger.debug("session list circle run")
time_interval = session_interval * (index + 1)
logger.info(
"[连接池]最大并发连接数: %s, 启动会话数: %s" % (self.max_async, len(session_list))
)
logger.info("[连接池]最大并发连接数: %s, 启动会话数: %s" % (self.max_async, len(session_list)))
for i in range(self.max_async):
if len(session_list) == 0:
break
tasks.append(
asyncio.ensure_future(
self._runner(session_list.pop(i), time_interval), loop=loop
)
)
tasks.append(asyncio.ensure_future(self._runner(session_list.pop(i), time_interval), loop=loop))
index += self.max_async

await asyncio.wait(tasks)

async def _runner(self, session, time_interval):
await self.session_manager.new_connect(session, time_interval)
await self.session_manager.new_connect(session)
# 后台有频率限制,根据间隔时间发起链接请求
await asyncio.sleep(time_interval)

def add(self, session):
self.session_list.append(session)
Expand Down
20 changes: 5 additions & 15 deletions qqbot/core/network/ws_sync/ws_session_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,13 @@ def start(self, websocket_ap, token=Token, intent=Intents):
if _check_session_limit(websocket_ap):
raise Exception("session limit exceeded")
# 根据session限制建立链接
session_interval = _cal_interval(
websocket_ap["session_start_limit"]["max_concurrency"]
)
session_interval = _cal_interval(websocket_ap["session_start_limit"]["max_concurrency"])
shards_count = websocket_ap["shards"]
logger.debug(
"session_interval: %s" % session_interval, "shards: %s" % shards_count
)
logger.debug("session_interval: %s" % session_interval, "shards: %s" % shards_count)
# 根据限制建立分片的并发链接数
self.init_session_pool(
intent, shards_count, token, websocket_ap, session_interval
)
self.init_session_pool(intent, shards_count, token, websocket_ap, session_interval)

def init_session_pool(
self, intent, shards_count, token, websocket_ap, session_interval
):
def init_session_pool(self, intent, shards_count, token, websocket_ap, session_interval):

# 实例一个session_pool
self.session_pool = SessionPool(
Expand All @@ -89,9 +81,7 @@ def init_session_pool(

def start_session(self, session_interval=5):
self.session_pool.loop.set_exception_handler(_loop_exception_handler)
self.session_pool.loop.run_until_complete(
self.session_pool.run(session_interval)
)
self.session_pool.loop.run_until_complete(self.session_pool.run(session_interval))

def new_connect(self, session):
"""
Expand Down

0 comments on commit 37ed810

Please sign in to comment.