diff --git a/README.md b/README.md index 323a2e2c..1431a20f 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,58 @@ 馒头模拟登录需要添加 cookie,通过cookie访问接口,不排除禁用账户的可能 +### 插件 + +- 自定义识别词 + + ​ tmdb id获取:[tmdb](https://www.themoviedb.org/?language=zh-CN) 网站搜索关键词,打开相关电影复制url对应数字id, 如 https://www.themoviedb.org/movie/693134-dune-part-two?language=zh-CN tmdb id 为693134 + + + - 通用识别词维护: + + ​ 编辑 [通用识别词](https://pad.xcreal.cc/p/通用识别词) 添加关键词 + + ​ 格式如下: + + ​ 屏蔽:被替换词 + + ​ 替换:被替换词@@替换词 + + ​ 替换+集偏移:被替换词@@替换词@@前定位词@@后定位词@@集偏移 + + ​ 集偏移:前定位词@@后定位词@@集偏移 + + - 电影识别词维护: + + ​ 编辑 [电影识别词](https://pad.xcreal.cc/p/电影识别词) 添加关键词 + + ​ 格式如下: + + ​ 屏蔽:tmdb id@@被替换词 + + ​ 替换:tmdb id@@被替换词@@替换词 + + ​ 替换+集偏移:tmdb id@@被替换词@@替换词@@前定位词@@后定位词@@集偏移 + + ​ 集偏移:tmdb id@@前定位词@@后定位词@@集偏移 + + - 电视识别词维护: + + ​ 编辑 [电视识别词](https://pad.xcreal.cc/p/电视识别词) 添加关键词 + + 格式同电影识别词 + + - 动漫识别词维护: + + ​ 编辑 [动漫识别词](https://pad.xcreal.cc/p/动漫识别词) 添加关键词 + + 格式同电影识别词 + + + + **如果有好用的识别词,请共同维护** + + ### 开启公开站点 在 config.yaml 的 laboratory 添加 ```show_more_sites: true``` diff --git a/app/helper/ocr_helper.py b/app/helper/ocr_helper.py index 3624a6f6..22f70faa 100644 --- a/app/helper/ocr_helper.py +++ b/app/helper/ocr_helper.py @@ -1,11 +1,83 @@ -import base64 +import ddddocr +import cv2 +import numpy as np +from PIL import Image +import log -from app.utils import RequestUtils +from app.utils import RequestUtils, StringUtils class OcrHelper: - _ocr_b64_url = "https://nastool.cn/captcha/base64" + @staticmethod + def around_white(img): + """ + 四周置白色 + """ + w, h = img.shape + for _w in range(w): + for _h in range(h): + if (_w <= 5) or (_h <= 5) or (_w >= w-5) or (_h >= h-5): + img.itemset((_w, _h), 255) + return img + + @staticmethod + def noise_unsome_piexl(img): + ''' + 邻域非同色降噪 + 查找像素点上下左右相邻点的颜色,如果是非白色的非像素点颜色,则填充为白色 + ''' + w, h = img.shape + for _w in range(w): + for _h in range(h): + if _h != 0 and _w != 0 and _w < w - 1 and _h < h - 1:# 剔除顶点、底点 + center_color = img[_w, _h] # 当前坐标颜色 + top_color = img[_w, _h + 1] + bottom_color = img[_w, _h - 1] + left_color = img[_w - 1, _h] + right_color = img[_w + 1, _h] + cnt = 0 + if top_color.all() == center_color.all(): + cnt += 1 + if bottom_color.all() == center_color.all(): + cnt += 1 + if left_color.all() == center_color.all(): + cnt += 1 + if right_color.all() == center_color.all(): + cnt += 1 + if cnt < 1: + img.itemset((_w, _h), 255) + return img + + def image_pre_process(self, image): + """ + 图片预处理 + """ + gray_image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) + _, binary_image = cv2.threshold(gray_image, 127, 255, cv2.THRESH_BINARY) + denoised_image = cv2.fastNlMeansDenoising(binary_image, h=30, templateWindowSize=11, searchWindowSize=21) + noise_unsome = OcrHelper.noise_unsome_piexl(denoised_image) + op_image = OcrHelper.around_white(noise_unsome) + + return op_image + + def recognize_captcha(self, image_content): + """ + 识别验证码 + """ + res = "" + try: + ocr = ddddocr.DdddOcr(show_ad=False) + image = np.asarray(bytearray(image_content), dtype="uint8") + image = cv2.imdecode(image, cv2.IMREAD_COLOR) + op_image = self.image_pre_process(image) + pil_image = Image.fromarray(cv2.cvtColor(op_image, cv2.COLOR_BGR2RGB)) + res = ocr.classification(pil_image) + res = StringUtils.replace_strings(res, {'之': '2', '>': '7'}) + return res.upper() + except Exception as e: + log.error(f"{str(e)}:{res}") + return res def get_captcha_text(self, image_url=None, image_b64=None, cookie=None, ua=None): """ @@ -22,12 +94,5 @@ def get_captcha_text(self, image_url=None, image_b64=None, cookie=None, ua=None) image_bin = ret.content if not image_bin: return "" - image_b64 = base64.b64encode(image_bin).decode() - if not image_b64: - return "" - ret = RequestUtils(content_type="application/json").post_res( - url=self._ocr_b64_url, - json={"base64_img": image_b64}) - if ret: - return ret.json().get("result") + return self.recognize_captcha(image_bin) return "" diff --git a/app/plugins/modules/customwordimport.py b/app/plugins/modules/customwordimport.py index d9e1c1dc..8c6f9971 100644 --- a/app/plugins/modules/customwordimport.py +++ b/app/plugins/modules/customwordimport.py @@ -16,7 +16,6 @@ from app.helper import WordsHelper - class CustomWordImport(_IPluginModule): # 插件名称 module_name = "自定义识别词导入" @@ -47,12 +46,11 @@ class CustomWordImport(_IPluginModule): # 任务执行间隔 _cron = None _status = None - _github_path = None - _default_path = 'https://github.com/linyuan0213/MediaRecognitionRule' + _word_path = None + _default_path = 'https://pad.xcreal.cc' _onlyonce = False _notify = False - _file_list = ['common.yaml', 'tv.yaml', 'movie.yaml', 'anime.yaml'] - _github_raw_url = 'https://raw.githubusercontent.com' + _file_list = ['通用识别词', '电视识别词', '电影识别词', '动漫识别词'] # 退出事件 _event = Event() @@ -110,14 +108,14 @@ def get_fields(): ] }, { - 'title': 'github 地址', + 'title': '识别词导入 地址', 'required': "", - 'tooltip': 'github 地址(默认地址 https://github.com/linyuan0213/MediaRecognitionRule)', + 'tooltip': '地址(默认地址 https://pad.xcreal.cc)', 'type': 'text', 'content': [ { - 'id': 'github_path', - 'placeholder': 'https://github.com/linyuan0213/MediaRecognitionRule', + 'id': 'word_path', + 'placeholder': 'https://pad.xcreal.cc', } ] } @@ -148,7 +146,7 @@ def init_config(self, config=None): self._enabled = config.get("enabled") self._cron = config.get("cron") self._status = config.get("status") - self._github_path = config.get("github_path") + self._word_path = config.get("word_path") self._notify = config.get("notify") self._onlyonce = config.get("onlyonce") self._media = Media() @@ -173,7 +171,7 @@ def init_config(self, config=None): "enabled": self._enabled, "cron": self._cron, "status": self._status, - "github_path": self._github_path, + "word_path": self._word_path, "notify": self._notify, "onlyonce": self._onlyonce, }) @@ -196,15 +194,15 @@ def __custom_word_import(self): self.info(f"当前时间 {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))} 开始导入自定义识别词") ua = Config().get_config('app').get('user_agent') - github_path = self._github_path or self._default_path - - self.info(f"github url {github_path} ") + word_path = self._word_path or self._default_path + success_word_cnt = 0 - split_url = urlsplit(github_path) - url_path = split_url.path + split_url = urlsplit(word_path) + base_url = f"{split_url.scheme}://{split_url.netloc}" + self.info(f"识别词 url {base_url} ") for file_name in self._file_list: - download_url = f'{self._github_raw_url}{url_path}/master/{file_name}' + download_url = f'{base_url}/p/{file_name}/export/txt' self.info(f'开始下载规则:{download_url}') headers = { "user-agent": ua, @@ -213,25 +211,30 @@ def __custom_word_import(self): res = RequestUtils(headers=headers).get_res(download_url) if res.status_code != 200: return - yaml = ruamel.yaml.YAML() - custom_word_dict = yaml.load(res.text) - if not custom_word_dict: - continue + + word_list = [line for line in res.text.splitlines() if line.strip() and not line.strip().startswith('#')] media_type = None gtype = None group_id = -1 - if 'tv' in file_name: + if '电视' in file_name: media_type = MediaType.TV gtype = 2 - if 'movie' in file_name: + if '电影' in file_name: media_type = MediaType.MOVIE gtype = 1 - if 'anime' in file_name: + if '动漫' in file_name: media_type = MediaType.ANIME gtype = 2 - for tmdb_id, rules in custom_word_dict.items(): - self.info(f'开始导入:{tmdb_id}') + for word_line in word_list: + if '通用' not in file_name: + tmdb_id, *word = word_line.split("@@") + word = '@@'.join(word) + self.info(f'导入:{tmdb_id}') + else: + word = word_line + self.info('导入通用识别词') + import_word_info = self.__parse_rule(word) if media_type: tmdb_info = self._media.get_tmdb_info(media_type, tmdb_id) if not tmdb_info: @@ -257,46 +260,47 @@ def __custom_word_import(self): if custom_word_groups: group_id = custom_word_groups[0].ID - for import_word_info in rules: - replaced = import_word_info.get("replaced") - replace = import_word_info.get("replace") - front = import_word_info.get("front") - back = import_word_info.get("back") - offset = import_word_info.get("offset") - whelp = import_word_info.get("help") - wtype = int(import_word_info.get("type")) - season = import_word_info.get("season") - if gtype == 1: - season = -2 - regex = 1 - # 屏蔽, 替换, 替换+集偏移 - if wtype in [1, 2, 3]: - if self._wordshelper.is_custom_words_existed(replaced=replaced): - self.info(f"识别词已存在\n(被替换词:{replaced})") - continue - # 集偏移 - elif wtype == 4: - if self._wordshelper.is_custom_words_existed(front=front, back=back): - self.info(f"识别词已存在\n(前后定位词:{front}@{back}") - continue - self._wordshelper.insert_custom_word(replaced=replaced, - replace=replace, - front=front, - back=back, - offset=offset, - wtype=wtype, - gid=group_id, - season=season, - enabled=1 if self._status else 0, - regex=regex, - whelp=whelp if whelp else "") + replaced = import_word_info.get("replaced") + replace = import_word_info.get("replace") + front = import_word_info.get("front") + back = import_word_info.get("back") + offset = import_word_info.get("offset") + whelp = "" + wtype = int(import_word_info.get("type")) + season = -1 + if gtype == 1: + season = -2 + regex = 1 + # 屏蔽, 替换, 替换+集偏移 + if wtype in [1, 2, 3]: + if self._wordshelper.is_custom_words_existed(replaced=replaced): + self.info(f"识别词已存在\n(被替换词:{replaced})") + continue + # 集偏移 + elif wtype == 4: + if self._wordshelper.is_custom_words_existed(front=front, back=back): + self.info(f"识别词已存在\n(前后定位词:{front}@{back}") + continue + self._wordshelper.insert_custom_word(replaced=replaced, + replace=replace, + front=front, + back=back, + offset=offset, + wtype=wtype, + gid=group_id, + season=season, + enabled=1 if self._status else 0, + regex=regex, + whelp=whelp if whelp else "") + # 统计导入成功的识别词数 + success_word_cnt = success_word_cnt + 1 - self.info('自定义识别词导入任务完成') + self.info(f'自定义识别词导入任务完成,导入成功 {success_word_cnt} 个识别词') # 发送通知 if self._notify: next_run_time = self._scheduler.get_jobs()[0].next_run_time.strftime('%Y-%m-%d %H:%M:%S') self.send_message(title="【自定义识别词导入任务完成】", - text=f"自定义识别词导入{'成功' if True else '失败'}\n" + text=f"导入成功 {success_word_cnt} 个识别词\n" f"下次导入时间: {next_run_time}") def stop_service(self): @@ -316,3 +320,44 @@ def stop_service(self): def get_state(self): return self._enabled and self._cron + + def __parse_rule(self, rule): + """ + 解析自定义识别词规则 + """ + if not rule: + return None + rule_list = rule.split("@@") + + word_dict = { + "type": -1, + "replaced": "", + "replace": "", + "front": "", + "back": "", + "offset": "" + } + if len(rule_list) == 1: + word_dict['type'] = 1 + word_dict['replaced'] = rule_list[0] + + elif len(rule_list) == 2: + word_dict['type'] = 2 + word_dict['replaced'] = rule_list[0] + word_dict['replace'] = rule_list[1] + elif len(rule_list) == 5: + word_dict['type'] = 3 + word_dict['replaced'] = rule_list[0] + word_dict['replace'] = rule_list[1] + word_dict['front'] = rule_list[2] + word_dict['back'] = rule_list[3] + word_dict['offset'] = rule_list[4] + elif len(rule_list) == 3: + word_dict['type'] = 4 + word_dict['front'] = rule_list[0] + word_dict['back'] = rule_list[1] + word_dict['offset'] = rule_list[2] + else: + return None + + return word_dict diff --git a/app/utils/string_utils.py b/app/utils/string_utils.py index c1357231..9a61884a 100644 --- a/app/utils/string_utils.py +++ b/app/utils/string_utils.py @@ -565,3 +565,12 @@ def get_tid_by_url(url): sign_params = parse.parse_qs(sign_data) return sign_params.get('tid')[0] + + @staticmethod + def replace_strings(text, replacements): + """ + 替换多个字符串 + """ + for old, new in replacements.items(): + text = text.replace(old, new) + return text diff --git a/requirements.txt b/requirements.txt index 8f4bf529..9f49946b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -111,4 +111,7 @@ websockets==10.3 Werkzeug==2.3.8 wsproto==1.2.0 zhconv==1.4.3 -typing-extensions==4.10.0 \ No newline at end of file +typing-extensions==4.10.0 +ddddocr==1.4.11 +opencv-python==4.6.0.66 +numpy==1.25.2 diff --git a/version.py b/version.py index af778d33..923c6787 100644 --- a/version.py +++ b/version.py @@ -1 +1 @@ -APP_VERSION = 'v3.2.5' +APP_VERSION = 'v3.2.6' diff --git a/web/templates/site/brushtask.html b/web/templates/site/brushtask.html index 45109df7..92c4e882 100644 --- a/web/templates/site/brushtask.html +++ b/web/templates/site/brushtask.html @@ -713,7 +713,7 @@