diff --git a/channel/chat_channel.py b/channel/chat_channel.py index 89dfe145a..1a7fa30ee 100644 --- a/channel/chat_channel.py +++ b/channel/chat_channel.py @@ -144,14 +144,14 @@ def _compose_context(self, ctype: ContextType, content, **kwargs): context.type = ContextType.TEXT context.content = content.strip() if ( - "desire_rtype" not in context + context["desire_rtype"] == None and conf().get("always_reply_voice") and ReplyType.VOICE not in self.NOT_SUPPORT_REPLYTYPE ): context["desire_rtype"] = ReplyType.VOICE elif context.type == ContextType.VOICE: if ( - "desire_rtype" not in context + context["desire_rtype"] == None and conf().get("voice_reply_voice") and ReplyType.VOICE not in self.NOT_SUPPORT_REPLYTYPE ): diff --git a/channel/wechatmp/active_reply.py b/channel/wechatmp/active_reply.py index 2df3f234a..c700ca3a4 100644 --- a/channel/wechatmp/active_reply.py +++ b/channel/wechatmp/active_reply.py @@ -5,6 +5,7 @@ from channel.wechatmp.wechatmp_message import parse_xml from channel.wechatmp.passive_reply_message import TextMsg from bridge.context import * +from bridge.reply import ReplyType from channel.wechatmp.common import * from channel.wechatmp.wechatmp_channel import WechatMPChannel from common.log import logger @@ -29,7 +30,7 @@ def POST(self): # or wechatmp_msg.msg_type == "image" ): from_user = wechatmp_msg.from_user_id - message = wechatmp_msg.content.decode("utf-8") + message = wechatmp_msg.content message_id = wechatmp_msg.msg_id logger.info( @@ -41,8 +42,9 @@ def POST(self): message, ) ) + rtype = ReplyType.VOICE if wechatmp_msg.msg_type == "voice" else None context = channel._compose_context( - ContextType.TEXT, message, isgroup=False, msg=wechatmp_msg + ContextType.TEXT, message, isgroup=False, desire_rtype=rtype, msg=wechatmp_msg ) if context: # set private openai_api_key diff --git a/channel/wechatmp/passive_reply.py b/channel/wechatmp/passive_reply.py index c5decf489..f489270c4 100644 --- a/channel/wechatmp/passive_reply.py +++ b/channel/wechatmp/passive_reply.py @@ -1,10 +1,12 @@ import time +import asyncio import web from channel.wechatmp.wechatmp_message import parse_xml -from channel.wechatmp.passive_reply_message import TextMsg +from channel.wechatmp.passive_reply_message import TextMsg, VoiceMsg, ImageMsg from bridge.context import * +from bridge.reply import ReplyType from channel.wechatmp.common import * from channel.wechatmp.wechatmp_channel import WechatMPChannel from common.log import logger @@ -26,7 +28,7 @@ def POST(self): if wechatmp_msg.msg_type == "text" or wechatmp_msg.msg_type == "voice": from_user = wechatmp_msg.from_user_id to_user = wechatmp_msg.to_user_id - message = wechatmp_msg.content.decode("utf-8") + message = wechatmp_msg.content message_id = wechatmp_msg.msg_id supported = True @@ -41,8 +43,9 @@ def POST(self): and message_id not in channel.request_cnt # insert the godcmd ): # The first query begin + rtype = ReplyType.VOICE if wechatmp_msg.msg_type == "voice" else None context = channel._compose_context( - ContextType.TEXT, message, isgroup=False, msg=wechatmp_msg + ContextType.TEXT, message, isgroup=False, desire_rtype=rtype, msg=wechatmp_msg ) logger.debug( "[wechatmp] context: {} {}".format(context, wechatmp_msg) @@ -115,10 +118,10 @@ def POST(self): else: # request_cnt == 3: # return timeout message reply_text = "【正在思考中,回复任意文字尝试获取回复】" - # replyPost = reply.TextMsg(from_user, to_user, reply_text).send() - # return replyPost + replyPost = TextMsg(from_user, to_user, reply_text).send() + return replyPost - # reply or reply_text is ready + # reply is ready channel.request_cnt.pop(message_id) # no return because of bandwords or other reasons @@ -128,14 +131,13 @@ def POST(self): ): return "success" - # reply is ready - if from_user in channel.cache_dict: - # Only one message thread can access to the cached data - try: - content = channel.cache_dict.pop(from_user) - except KeyError: - return "success" + # Only one request can access to the cached data + try: + (reply_type, content) = channel.cache_dict.pop(from_user) + except KeyError: + return "success" + if (reply_type == "text"): if len(content.encode("utf8")) <= MAX_UTF8_LEN: reply_text = content else: @@ -146,19 +148,31 @@ def POST(self): max_split=1, ) reply_text = splits[0] + continue_text - channel.cache_dict[from_user] = splits[1] - - logger.info( - "[wechatmp] Request {} do send to {} {}: {}\n{}".format( - request_cnt, - from_user, - message_id, - message, - reply_text, + channel.cache_dict[from_user] = ("text", splits[1]) + + logger.info( + "[wechatmp] Request {} do send to {} {}: {}\n{}".format( + request_cnt, + from_user, + message_id, + message, + reply_text, + ) ) - ) - replyPost = TextMsg(from_user, to_user, reply_text).send() - return replyPost + replyPost = TextMsg(from_user, to_user, reply_text).send() + return replyPost + + elif (reply_type == "voice"): + media_id = content + asyncio.run_coroutine_threadsafe(channel.delete_media(media_id), channel.delete_media_loop) + replyPost = VoiceMsg(from_user, to_user, media_id).send() + return replyPost + + elif (reply_type == "image"): + media_id = content + asyncio.run_coroutine_threadsafe(channel.delete_media(media_id), channel.delete_media_loop) + replyPost = ImageMsg(from_user, to_user, media_id).send() + return replyPost elif wechatmp_msg.msg_type == "event": logger.info( diff --git a/channel/wechatmp/wechatmp_channel.py b/channel/wechatmp/wechatmp_channel.py index 74e6e25a0..1c83149a6 100644 --- a/channel/wechatmp/wechatmp_channel.py +++ b/channel/wechatmp/wechatmp_channel.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- import io +import os +import time import imghdr import requests from bridge.context import * @@ -11,6 +13,9 @@ from common.singleton import singleton from config import conf +import asyncio +from threading import Thread + import web # If using SSL, uncomment the following lines, and modify the certificate path. # from cheroot.server import HTTPServer @@ -25,19 +30,20 @@ class WechatMPChannel(ChatChannel): def __init__(self, passive_reply=True): super().__init__() self.passive_reply = passive_reply - self.flag = 0 - + self.NOT_SUPPORT_REPLYTYPE = [] + self.client = WechatMPClient() if self.passive_reply: - self.NOT_SUPPORT_REPLYTYPE = [ReplyType.IMAGE, ReplyType.VOICE] # Cache the reply to the user's first message self.cache_dict = dict() # Record whether the current message is being processed self.running = set() # Count the request from wechat official server by message_id self.request_cnt = dict() - else: - self.NOT_SUPPORT_REPLYTYPE = [] - self.client = WechatMPClient() + # The permanent media need to be deleted to avoid media number limit + self.delete_media_loop = asyncio.new_event_loop() + t = Thread(target=self.start_loop, args=(self.delete_media_loop,)) + t.setDaemon(True) + t.start() def startup(self): @@ -49,18 +55,63 @@ def startup(self): port = conf().get("wechatmp_port", 8080) web.httpserver.runsimple(app.wsgifunc(), ("0.0.0.0", port)) + def start_loop(self, loop): + asyncio.set_event_loop(loop) + loop.run_forever() + + async def delete_media(self, media_id): + logger.info("[wechatmp] media {} will be deleted in 10s".format(media_id)) + await asyncio.sleep(10) + self.client.delete_permanent_media(media_id) + logger.info("[wechatmp] media {} has been deleted".format(media_id)) def send(self, reply: Reply, context: Context): receiver = context["receiver"] if self.passive_reply: - logger.info("[wechatmp] reply to {} cached:\n{}".format(receiver, reply)) - self.cache_dict[receiver] = reply.content + if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR: + reply_text = reply.content + logger.info("[wechatmp] reply to {} cached:\n{}".format(receiver, reply_text)) + self.cache_dict[receiver] = ("text", reply_text) + elif reply.type == ReplyType.VOICE: + voice_file_path = reply.content + logger.info("[wechatmp] voice file path {}".format(voice_file_path)) + with open(voice_file_path, 'rb') as f: + filename = receiver + "-" + context["msg"].msg_id + ".mp3" + media_id = self.client.upload_permanent_media("voice", (filename, f, "audio/mpeg")) + # 根据文件大小估计一个微信自动审核的时间,审核结束前返回将会导致语音无法播放,这个估计有待验证 + f_size = os.fstat(f.fileno()).st_size + print(f_size) + time.sleep(1.0 + 2 * f_size / 1024 / 1024) + logger.info("[wechatmp] voice reply to {} uploaded: {}".format(receiver, media_id)) + self.cache_dict[receiver] = ("voice", media_id) + elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片 + img_url = reply.content + pic_res = requests.get(img_url, stream=True) + print(pic_res.headers) + image_storage = io.BytesIO() + for block in pic_res.iter_content(1024): + image_storage.write(block) + image_storage.seek(0) + image_type = imghdr.what(image_storage) + filename = receiver + "-" + context["msg"].msg_id + "." + image_type + content_type = "image/" + image_type + media_id = self.client.upload_permanent_media("image", (filename, image_storage, content_type)) + logger.info("[wechatmp] image reply to {} uploaded: {}".format(receiver, media_id)) + self.cache_dict[receiver] = ("image", media_id) + elif reply.type == ReplyType.IMAGE: # 从文件读取图片 + image_storage = reply.content + image_storage.seek(0) + image_type = imghdr.what(image_storage) + filename = receiver + "-" + context["msg"].msg_id + "." + image_type + content_type = "image/" + image_type + media_id = self.client.upload_permanent_media("image", (filename, image_storage, content_type)) + logger.info("[wechatmp] image reply to {} uploaded: {}".format(receiver, media_id)) + self.cache_dict[receiver] = ("image", media_id) else: if reply.type == ReplyType.TEXT or reply.type == ReplyType.INFO or reply.type == ReplyType.ERROR: reply_text = reply.content self.client.send_text(receiver, reply_text) logger.info("[wechatmp] Do send to {}: {}".format(receiver, reply_text)) - elif reply.type == ReplyType.VOICE: voice_file_path = reply.content logger.info("[wechatmp] voice file path {}".format(voice_file_path)) @@ -69,7 +120,6 @@ def send(self, reply: Reply, context: Context): media_id = self.client.upload_media("voice", (filename, f, "audio/mpeg")) self.client.send_voice(receiver, media_id) logger.info("[wechatmp] Do send voice to {}".format(receiver)) - elif reply.type == ReplyType.IMAGE_URL: # 从网络下载图片 img_url = reply.content pic_res = requests.get(img_url, stream=True) @@ -85,7 +135,6 @@ def send(self, reply: Reply, context: Context): media_id = self.client.upload_media("image", (filename, image_storage, content_type)) self.client.send_image(receiver, media_id) logger.info("[wechatmp] sendImage url={}, receiver={}".format(img_url, receiver)) - elif reply.type == ReplyType.IMAGE: # 从文件读取图片 image_storage = reply.content image_storage.seek(0) @@ -95,7 +144,6 @@ def send(self, reply: Reply, context: Context): media_id = self.client.upload_media("image", (filename, image_storage, content_type)) self.client.send_image(receiver, media_id) logger.info("[wechatmp] sendImage, receiver={}".format(receiver)) - return def _success_callback(self, session_id, context, **kwargs): # 线程异常结束时的回调函数 diff --git a/channel/wechatmp/wechatmp_client.py b/channel/wechatmp/wechatmp_client.py index 7a2cd022c..321726d22 100644 --- a/channel/wechatmp/wechatmp_client.py +++ b/channel/wechatmp/wechatmp_client.py @@ -23,6 +23,8 @@ def wechatmp_request(self, method, url, **kwargs): r.encoding = "utf-8" ret = r.json() if "errcode" in ret and ret["errcode"] != 0: + if ret["errcode"] == 45009: + self.clear_quota_v2() raise WeChatAPIException("{}".format(ret)) return ret @@ -123,3 +125,54 @@ def upload_media(self, media_type, media_file): files=files ) return ret["media_id"] + + + def upload_permanent_media(self, media_type, media_file): + url="https://api.weixin.qq.com/cgi-bin/material/add_material" + params={ + "access_token": self.get_access_token(), + "type": media_type + } + files={"media": media_file} + logger.info("[wechatmp] media {} uploaded".format(media_file)) + ret = self.wechatmp_request( + method="post", + url=url, + params=params, + files=files + ) + return ret["media_id"] + + + def delete_permanent_media(self, media_id): + url="https://api.weixin.qq.com/cgi-bin/material/del_material" + params={ + "access_token": self.get_access_token() + } + logger.info("[wechatmp] media {} deleted".format(media_id)) + self.wechatmp_request( + method="post", + url=url, + params=params, + data={"media_id": media_id} + ) + + def clear_quota(self): + url="https://api.weixin.qq.com/cgi-bin/clear_quota" + params = { + "access_token": self.get_access_token() + } + self.wechatmp_request( + method="post", + url=url, + params=params, + data={"appid": self.app_id} + ) + + def clear_quota_v2(self): + url="https://api.weixin.qq.com/cgi-bin/clear_quota/v2" + self.wechatmp_request( + method="post", + url=url, + data={"appid": self.app_id, "appsecret": self.app_secret} + ) diff --git a/channel/wechatmp/wechatmp_message.py b/channel/wechatmp/wechatmp_message.py index 1285fd155..d385897c3 100644 --- a/channel/wechatmp/wechatmp_message.py +++ b/channel/wechatmp/wechatmp_message.py @@ -32,12 +32,15 @@ def __init__(self, xmlData): if self.msg_type == "text": self.ctype = ContextType.TEXT - self.content = xmlData.find("Content").text.encode("utf-8") + self.content = xmlData.find("Content").text elif self.msg_type == "voice": self.ctype = ContextType.TEXT - self.content = xmlData.find("Recognition").text.encode("utf-8") # 接收语音识别结果 + self.content = xmlData.find("Recognition").text # 接收语音识别结果 + # other voice_to_text method not implemented yet + if self.content == None: + self.content = "你好" elif self.msg_type == "image": - # not implemented + # not implemented yet self.pic_url = xmlData.find("PicUrl").text self.media_id = xmlData.find("MediaId").text elif self.msg_type == "event":