diff --git a/config.py b/config.py index 378bfaa..a8cd9a9 100644 --- a/config.py +++ b/config.py @@ -36,3 +36,6 @@ ### 是否使用夜神模拟器 use_monitor = config["use_monitor"] + + ### 是否图片缩放 + enable_scale = config["enable_scale"] \ No newline at end of file diff --git a/config.yaml b/config.yaml index 083d1c0..e674ea8 100644 --- a/config.yaml +++ b/config.yaml @@ -51,7 +51,10 @@ prefer: - ocrspace ### enable chrome -enable_chrome: true +enable_chrome: false ### 是否使用夜神模拟器 use_monitor: false + +### 是否图片缩放 +enable_scale: true \ No newline at end of file diff --git a/core/android.py b/core/android.py index 7af353e..d4ecc3b 100644 --- a/core/android.py +++ b/core/android.py @@ -17,6 +17,8 @@ from PIL import Image +from config import enable_scale + # SCREENSHOT_WAY 是截图方法, # 经过 check_screenshot 后,会自动递 # 不需手动修改 @@ -59,13 +61,12 @@ def check_screenshot(filename, directory): check_screenshot(filename=filename, directory=directory) -def analyze_current_screen_text(crop_area, directory=".", compress_level=1, use_monitor=False): +def analyze_current_screen_text(crop_area, directory=".", compress_level=1): """ capture the android screen now :return: """ - print("capture time: ", datetime.now().strftime("%H:%M:%S")) screenshot_filename = "screenshot.png" save_text_area = os.path.join(directory, "text_area.png") capture_screen_v2(screenshot_filename, directory) @@ -74,19 +75,6 @@ def analyze_current_screen_text(crop_area, directory=".", compress_level=1, use_ return get_area_data(save_text_area) -def analyze_stored_screen_text(screenshot_filename="screenshot.png", directory=".", compress_level=1): - """ - reload screen from stored picture to store - :param directory: - :param compress_level: - :return: - """ - save_text_area = os.path.join(directory, "text_area.png") - parse_answer_area(os.path.join( - directory, screenshot_filename), save_text_area, compress_level) - return get_area_data(save_text_area) - - def capture_screen_v2(filename="screenshot.png", directory="."): """ can't use general fast way @@ -152,10 +140,9 @@ def parse_answer_area(source_file, text_area_file, compress_level, crop_area): image = image.convert("1") width, height = image.size[0], image.size[1] - print("screen width: {0}, screen height: {1}".format(width, height)) - - region = image.crop( - (width * crop_area[0], height * crop_area[1], width * crop_area[2], height * crop_area[3])) + region = image.crop((width * crop_area[0], height * crop_area[1], width * crop_area[2], height * crop_area[3])) + if enable_scale: + region = region.resize((int(1080 / 3), int(1920 / 5)), Image.BILINEAR) region.save(text_area_file) diff --git a/core/crawler/baiduzhidao.py b/core/crawler/baiduzhidao.py index 0b4d966..a5bc24c 100644 --- a/core/crawler/baiduzhidao.py +++ b/core/crawler/baiduzhidao.py @@ -5,12 +5,14 @@ Baidu zhidao searcher """ +import logging import operator import random import requests from core.crawler import text_process as T +from utils import stdout_template Agents = ( "Mozilla/5.0 (X11; Fedora; Linux x86_64; rv:57.0) Gecko/20100101 Firefox/57.0", @@ -18,6 +20,8 @@ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.71 Safari/537.36" ) +logger = logging.getLogger("assistant") + def count_key_words(text, keywords): total_count = 0 @@ -49,15 +53,15 @@ def baidu_count(keyword, answers, timeout=5): "Connection": "keep-alive", "Accept-Language": "zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7", "Upgrade-Insecure-Requests": "1", - # "User-Agent": random.choice(Agents) + "User-Agent": random.choice(Agents) } params = { "wd": keyword, "ie": "utf-8" } - resp = requests.get("http://www.baidu.com/s", params=params, headers=headers, timeout=timeout) + resp = requests.get("http://www.baidu.com/s", params=params, headers=headers) if not resp.ok: - print("baidu search error") + logger.error("Get BaiDu HTTP ERROR") return { ans: 0 for ans in answers @@ -93,7 +97,6 @@ def baidu_count_daemon(exchage_queue, outputqueue, timeout=5): :return: """ - while True: question, answers, true_flag = exchage_queue.get() try: @@ -107,13 +110,9 @@ def baidu_count_daemon(exchage_queue, outputqueue, timeout=5): recommend = "{0}\n{1}".format( "肯定回答( ): {0}".format(summary_li[0][0]), "否定回答(**): {0}".format(summary_li[-1][0])) - outputqueue.put({ - "type": 1, - "data": "{0}\n{1}".format( - "\n".join(map(lambda item: "{0}: {1}".format(item[0], item[1]), summary_li)), - recommend - ) - }) - except: - import traceback - traceback.print_exc() + outputqueue.put(stdout_template.BAIDU_TPL.format( + "\n".join(map(lambda item: "{0}: {1}".format(item[0], item[1]), summary_li)), + recommend + )) + except Exception as e: + logger.error(str(e), exc_info=True) diff --git a/core/crawler/crawl.py b/core/crawler/crawl.py index 98a84c6..52824a4 100644 --- a/core/crawler/crawl.py +++ b/core/crawler/crawl.py @@ -1,4 +1,5 @@ # coding:utf8 +import logging import multiprocessing import operator import platform @@ -9,6 +10,7 @@ from core.crawler import html_tools as To from core.crawler import text_process as T +from utils import stdout_template def jieba_initialize(): @@ -20,7 +22,7 @@ def jieba_initialize(): def kwquery(query): ''' - 对百度、Bing 的搜索摘要进行答案的检索 + 对百度、 sougou Bing 的搜索摘要进行答案的检索 (需要加问句分类接口) ''' # 分词 去停用词 抽取关键词 @@ -28,11 +30,8 @@ def kwquery(query): words = T.postag(query) for k in words: # 只保留名词 - if k.flag.__contains__("n") or k.flag.__contains__("v"): - # print(k.flag - # print(k.word + if k.flag.__contains__("n") or k.flag.__contains__("a") or k.flag.__contains__("t"): keywords.append(k.word) - answer = [] text = '' # 找到答案就置1 @@ -42,24 +41,20 @@ def kwquery(query): soup_baidu = To.get_html_baidu('https://www.baidu.com/s?wd=' + quote(query)) for i in range(1, 10): - if soup_baidu == None: + + if not soup_baidu: break - results = soup_baidu.find(id=i) - if results == None: - # print("百度摘要找不到答案") + results = soup_baidu.find(id=i) + if not results: break # 判断是否有mu,如果第一个是百度知识图谱的 就直接命中答案 if 'mu' in results.attrs and i == 1: - # print(results.attrs["mu"] r = results.find(class_='op_exactqa_s_answer') - if r == None: - # print("百度知识图谱找不到答案") + if not r: pass else: - # print(r.get_text() - # print("百度知识图谱找到答案") answer.append(r.get_text().strip()) flag = 1 break @@ -67,12 +62,9 @@ def kwquery(query): # 古诗词判断 if "mu" in results.attrs and i == 1: r = results.find(class_="op_exactqa_detail_s_answer") - if r == None: - # print("百度诗词找不到答案") + if not r: pass else: - # print(r.get_text() - # print("百度诗词找到答案") answer.append(r.get_text().strip()) flag = 1 break @@ -81,12 +73,9 @@ def kwquery(query): if "mu" in results.attrs and i == 1 and results.attrs['mu'].__contains__( 'http://open.baidu.com/calendar'): r = results.find(class_="op-calendar-content") - if r == None: - # print("百度万年历找不到答案") + if not r: pass else: - # print(r.get_text() - # print("百度万年历找到答案") answer.append(r.get_text().strip().replace("\n", "").replace(" ", "")) flag = 1 break @@ -95,13 +84,9 @@ def kwquery(query): r = results.attrs['fk'].replace("6018_", "") print(r) - if r == None: - # print("百度万年历新版找不到答案") + if not r: pass - # continue else: - # print(r.get_text() - # print("百度万年历新版找到答案") answer.append(r) flag = 1 break @@ -110,13 +95,9 @@ def kwquery(query): if "mu" in results.attrs and i == 1 and results.attrs['mu'].__contains__( 'http://open.baidu.com/static/calculator/calculator.html'): r = results.find('div').find_all('td')[1].find_all('div')[1] - if r == None: - # print("计算器找不到答案") + if not r: pass - # continue else: - # print(r.get_text() - # print("计算器找到答案") answer.append(r.get_text().strip()) flag = 1 break @@ -125,10 +106,8 @@ def kwquery(query): if "mu" in results.attrs and i == 1: r = results.find(class_='op_best_answer_question_link') if r == None: - # print("百度知道图谱找不到答案") pass else: - # print("百度知道图谱找到答案") url = r['href'] zhidao_soup = To.get_html_zhidao(url) r = zhidao_soup.find(class_='bd answer').find('pre') @@ -139,23 +118,20 @@ def kwquery(query): flag = 1 break - if results.find("h3") != None: + if results.find("h3"): # 百度知道 if results.find("h3").find("a").get_text().__contains__(u"百度知道") and (i == 1 or i == 2): url = results.find("h3").find("a")['href'] - if url == None: - # print("百度知道图谱找不到答案") + if not url: continue else: - # print("百度知道图谱找到答案") zhidao_soup = To.get_html_zhidao(url) - r = zhidao_soup.find(class_='bd answer') - if r == None: + if not r: continue else: r = r.find('pre') - if r == None: + if not r: r = zhidao_soup.find(class_='bd answer').find(class_='line content') answer.append(r.get_text().strip()) flag = 1 @@ -165,14 +141,11 @@ def kwquery(query): if results.find("h3").find("a").get_text().__contains__(u"百度百科") and (i == 1 or i == 2): url = results.find("h3").find("a")['href'] if url == None: - # print("百度百科找不到答案") continue else: - # print("百度百科找到答案") baike_soup = To.get_html_baike(url) - r = baike_soup.find(class_='lemma-summary') - if r == None: + if not r: continue else: r = r.get_text().replace("\n", "").strip() @@ -184,36 +157,30 @@ def kwquery(query): if flag == 1: return answer + # 获取搜狗的答案 + soup_sougou= To.get + # 获取bing的摘要 soup_bing = To.get_html_bing('https://www.bing.com/search?q=' + quote(query)) # 判断是否在Bing的知识图谱中 - # bingbaike = soup_bing.find(class_="b_xlText b_emphText") bingbaike = soup_bing.find(class_="bm_box") - if bingbaike != None: - if bingbaike.find_all(class_="b_vList")[1] != None: - if bingbaike.find_all(class_="b_vList")[1].find("li") != None: - # print("Bing知识图谱找到答案") - flag = 1 - answer.append(bingbaike.get_text()) - # print("=====" - # print(answer - # print("=====" - return answer + if bingbaike and \ + bingbaike.find_all(class_="b_vList")[1] and \ + bingbaike.find_all(class_="b_vList")[1].find("li"): + flag = 1 + answer.append(bingbaike.get_text()) + return answer else: - # print("Bing知识图谱找不到答案") results = soup_bing.find(id="b_results") bing_list = results.find_all('li') for bl in bing_list: temp = bl.get_text() if temp.__contains__(u" - 必应网典"): - # print("查找Bing网典") url = bl.find("h2").find("a")['href'] if url == None: - # print("Bing网典找不到答案") continue else: - # print("Bing网典找到答案") bingwd_soup = To.get_html_bingwd(url) r = bingwd_soup.find(class_='bk_card_desc').find("p") @@ -230,8 +197,6 @@ def kwquery(query): text += results.get_text() - # print(text - # 如果再两家搜索引擎的知识图谱中都没找到答案,那么就分析摘要 if flag == 0: # 分句 @@ -243,7 +208,6 @@ def kwquery(query): if temp == '': continue else: - # print(temp sentences.append(temp) temp = '' else: @@ -261,11 +225,8 @@ def kwquery(query): # 识别人名 target_list = {} for ks in key_sentences: - # print(ks words = T.postag(ks) for w in words: - # print("=====" - # print(w.word if w.flag == ("nr"): if w.word in target_list: target_list[w.word] += 1 @@ -274,12 +235,8 @@ def kwquery(query): # 找出最大词频 sorted_lists = sorted(target_list.items(), key=operator.itemgetter(1), reverse=True) - # print(len(target_list) - # 去除问句中的关键词 sorted_lists2 = [] - # 候选队列 for i, st in enumerate(sorted_lists): - # print(st[0] if st[0] in keywords: continue else: @@ -288,13 +245,8 @@ def kwquery(query): # print("返回前n个词频") answer = [] for i, st in enumerate(sorted_lists2): - # print(st[0] - # print(st[1] if i < 3: - # print(st[0] - # print(st[1] answer.append(st[0]) - # print(answer return answer @@ -307,19 +259,14 @@ def crawler_daemon(keyword_queue, outputqueue): :param reader: :return: """ + logger = logging.getLogger("assistant") while True: question = keyword_queue.get() try: ans = kwquery(question) - outputqueue.put( - { - "type": 2, - "data": "{0}".format(wrap(" ".join(ans), 45)) - } - ) - except: - import traceback - traceback.print_exc() + outputqueue.put(stdout_template.KNOWLEDGE_TPL.format("\n".join(wrap("\n".join(ans), 45)))) + except Exception as e: + logger.error(str(e), exc_info=True) if __name__ == '__main__': diff --git a/core/crawler/html_tools.py b/core/crawler/html_tools.py index 0d2b102..4345ba1 100644 --- a/core/crawler/html_tools.py +++ b/core/crawler/html_tools.py @@ -1,8 +1,5 @@ # coding:utf8 -import re -import urllib - import requests from bs4 import BeautifulSoup @@ -51,11 +48,6 @@ def get_html_bingwd(url): return soup_bingwd -''' -获取百度搜索的结果 -''' - - def get_html_baidu(url): headers = {'User-Agent': 'Mozilla/5.0 (X11; U; Linux i686)Gecko/20071127 Firefox/2.0.0.11'} soup_baidu = BeautifulSoup(requests.get(url=url, headers=headers).content.decode('utf-8'), "lxml") @@ -66,11 +58,6 @@ def get_html_baidu(url): return soup_baidu -''' -获取Bing搜索的结果 -''' - - def get_html_bing(url): # url = 'http://global.bing.com/search?q='+word headers = {'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:22.0) Gecko/20100101 Firefox/22.0'} @@ -81,27 +68,8 @@ def get_html_bing(url): return soup_bing -''' -print answer -''' - - -def ptranswer(ans, ifhtml): - result = '' - # print ans - for answer in ans: - if ifhtml: - print(answer) - else: - if answer == u'\n': - continue - p = re.compile('<[^>]+>') - result += p.sub("", answer.string).encode('utf8') - return result - - -def ltptools(args): - url_get_base = "http://api.ltp-cloud.com/analysis/" - result = urllib.urlopen(url_get_base, urllib.urlencode(args)) # POST method - content = result.read().strip() - return content +def get_html_sougo(url): + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.71 Safari/537.36"} + soup_sougou = BeautifulSoup(requests.get(url=url, headers=headers).text, "lxml") + return soup_sougou diff --git a/dynamic_table.py b/dynamic_table.py deleted file mode 100644 index 385dbee..0000000 --- a/dynamic_table.py +++ /dev/null @@ -1,70 +0,0 @@ -# -*- coding: utf-8 -*- - - -""" - -dynamic command line output - -""" -import os -import sys -import platform - -from terminaltables import AsciiTable - -BAIDU_SEARCH = 1 -KNOWLEDGE_MAP = 2 -QUESTION = 0 - -MAX_TEXT_WIDTH = 45 - - -def clear_screen(): - system_version = platform.system().upper() - if system_version.startswith("LINUX"): - os.system("clear") - if system_version.startswith("WINDOWS"): - os.system("cls") - if system_version.startswith("DARWIN"): - os.system("clear") - - -def print_terminal(readpipe): - """ - pipe item struct - - type - - data - - :param readpipe: - :return: - """ - baidu_search = "" - knowledge_map = "" - question = "" - time_duration = "" - - while True: - item = readpipe.get() - if item["type"] == 0: - question = item["data"] - if item["type"] == 1: - baidu_search = item["data"] - elif item["type"] == 2: - knowledge_map = item["data"] - elif item["type"] == 3: - time_duration = item["data"] - elif item["type"] == 4: - print(item["data"]) - sys.stdout.flush() - continue - data = [ - ["问题", question], - ["百度决策", baidu_search], - ["知识图谱", knowledge_map], - ["耗时", time_duration] - ] - table = AsciiTable(table_data=data) - table.inner_row_border = True - clear_screen() - print(table.table) - sys.stdout.flush() diff --git a/main.py b/main.py index 1864664..9e4772c 100644 --- a/main.py +++ b/main.py @@ -6,15 +6,15 @@ Xi Gua video Million Heroes """ +import logging.handlers import multiprocessing -import operator import os +import threading import time from argparse import ArgumentParser from datetime import datetime from functools import partial -from multiprocessing import Event, Pipe -from textwrap import wrap +from multiprocessing import Event, Pipe, Queue from config import api_key, enable_chrome, use_monitor, image_compress_level, crop_areas from config import api_version @@ -26,13 +26,21 @@ from core.android import save_screen, check_screenshot, get_adb_tool, analyze_current_screen_text from core.check_words import parse_false from core.chrome_search import run_browser -from core.crawler.baiduzhidao import baidu_count -from core.crawler.crawl import jieba_initialize, kwquery +from core.crawler.baiduzhidao import baidu_count_daemon +from core.crawler.crawl import jieba_initialize, crawler_daemon from core.ocr.baiduocr import get_text_from_image as bai_get_text from core.ocr.spaceocr import get_text_from_image as ocrspace_get_text -## jieba init -from dynamic_table import clear_screen +from utils import stdout_template +from utils.backup import save_question_answers_to_file +from utils.process_stdout import ProcessStdout + +logger = logging.getLogger("assistant") +handler = logging.handlers.WatchedFileHandler("assistant.log") +formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) +## jieba init jieba_initialize() if prefer[0] == "baidu": @@ -94,6 +102,28 @@ def pre_process_question(keyword): return keyword +def prompt_message(): + global game_type + print(""" +请选择答题节目: + 1. 百万英雄 + 2. 冲顶大会 + 3. 芝士超人 + 4. UC答题 +""") + game_type = input("输入节目序号: ") + if game_type == "1": + game_type = '百万英雄' + elif game_type == "2": + game_type = '冲顶大会' + elif game_type == "3": + game_type = "芝士超人" + elif game_type == "4": + game_type = "UC答题" + else: + game_type = '百万英雄' + + def main(): args = parse_args() timeout = args.timeout @@ -104,25 +134,25 @@ def main(): check_screenshot(filename="screenshot.png", directory=data_directory) - # stdout_queue = Queue(10) - # ## spaw baidu count - # baidu_queue = Queue(5) - # baidu_search_job = multiprocessing.Process(target=baidu_count_daemon, - # args=(baidu_queue, stdout_queue, timeout)) - # baidu_search_job.daemon = True - # baidu_search_job.start() - # - # ## spaw crawler - # knowledge_queue = Queue(5) - # knowledge_craw_job = multiprocessing.Process(target=crawler_daemon, - # args=(knowledge_queue, stdout_queue)) - # knowledge_craw_job.daemon = True - # knowledge_craw_job.start() - # - # ## output threading - # output_job = threading.Thread(target=print_terminal, args=(stdout_queue,)) - # output_job.daemon = True - # output_job.start() + std_pipe = ProcessStdout() + ## spaw baidu count + baidu_queue = Queue(5) + baidu_search_job = multiprocessing.Process(target=baidu_count_daemon, + args=(baidu_queue, std_pipe.queue, timeout)) + baidu_search_job.daemon = True + baidu_search_job.start() + + ## spaw crawler + knowledge_queue = Queue(5) + knowledge_craw_job = multiprocessing.Process(target=crawler_daemon, + args=(knowledge_queue, std_pipe.queue)) + knowledge_craw_job.daemon = True + knowledge_craw_job.start() + + ## output threading + output_job = threading.Thread(target=std_pipe.run_forever) + output_job.daemon = True + output_job.start() if enable_chrome: closer = Event() @@ -136,14 +166,13 @@ def main(): def __inner_job(): start = time.time() - text_binary = analyze_current_screen_text( + image_binary = analyze_current_screen_text( directory=data_directory, compress_level=image_compress_level[0], - crop_area=crop_areas[game_type], - use_monitor=use_monitor + crop_area=crop_areas[game_type] ) keywords = get_text_from_image( - image_data=text_binary, + image_data=image_binary, timeout=timeout ) if not keywords: @@ -155,88 +184,33 @@ def __inner_job(): if game_type == "UC答题": answers = map(lambda a: a.rsplit(":")[-1], answers) - print("~" * 60) - print("{0}\n{1}".format(real_question, "\n".join(answers))) - print("~" * 60) - - # ### refresh question - # stdout_queue.put({ - # "type": 0, - # "data": "{0}\n{1}".format(question, "\n".join(answers)) - # }) - # - # # notice baidu and craw - # baidu_queue.put(( - # question, answers, true_flag - # )) - # knowledge_queue.put(question) + std_pipe.write(stdout_template.QUESTION_TPL.format(real_question, "\n".join(answers))) + # notice baidu and craw + baidu_queue.put(( + question, answers, true_flag + )) + knowledge_queue.put(question) if enable_chrome: writer.send(question) noticer.set() - summary = baidu_count(question, answers, timeout=timeout) - summary_li = sorted(summary.items(), key=operator.itemgetter(1), reverse=True) - if true_flag: - recommend = "{0}\n{1}".format( - "肯定回答(**): {0}".format(summary_li[0][0]), - "否定回答( ): {0}".format(summary_li[-1][0])) - else: - recommend = "{0}\n{1}".format( - "肯定回答( ): {0}".format(summary_li[0][0]), - "否定回答(**): {0}".format(summary_li[-1][0])) - print("*" * 60) - print("\n".join(map(lambda item: "{0}: {1}".format(item[0], item[1]), summary_li))) - print(recommend) - print("*" * 60) - - ans = kwquery(real_question) - print("-" * 60) - print(wrap(" ".join(ans), 60)) - print("-" * 60) - end = time.time() - # stdout_queue.put({ - # "type": 3, - # "data": "use {0} 秒".format(end - start) - # }) - print("use {0} 秒".format(end - start)) - save_screen( - directory=data_directory - ) - time.sleep(1) - - print(""" - 请选择答题节目: - 1. 百万英雄 - 2. 冲顶大会 - 3. 芝士超人 - 4. UC答题 - """) - game_type = input("输入节目序号: ") - if game_type == "1": - game_type = '百万英雄' - elif game_type == "2": - game_type = '冲顶大会' - elif game_type == "3": - game_type = "芝士超人" - elif game_type == "4": - game_type = "UC答题" - else: - game_type = '百万英雄' + std_pipe.write(stdout_template.TIME_CONSUME_TPL.format(end - start)) + save_screen(directory=data_directory) + save_question_answers_to_file(real_question, answers, directory=data_directory) + prompt_message() while True: - enter = input("按Enter键开始,按ESC键退出...") + enter = input("按Enter键开始,切换游戏请输入s,按ESC键退出...\n") if enter == chr(27): break + if enter == 's': + prompt_message() try: - clear_screen() __inner_job() except Exception as e: - import traceback - - traceback.print_exc() - print(str(e)) + logger.error(str(e), exc_info=True) print("欢迎下次使用") if enable_chrome: diff --git a/requirements.txt b/requirements.txt index d7c6e82..9e0d172 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,4 @@ Pillow==5.0.0 PyInstaller==3.3.1 PyYAML==3.12 requests==2.18.4 -selenium==3.8.1 -terminaltables==3.1.0 -urllib3==1.22 +selenium==3.8.1 \ No newline at end of file diff --git a/screenshots/screenshot.png b/screenshots/screenshot.png index 6a6e8b7..a6ede22 100644 Binary files a/screenshots/screenshot.png and b/screenshots/screenshot.png differ diff --git a/screenshots/text_area.png b/screenshots/text_area.png index 76639c8..a9f8c69 100644 Binary files a/screenshots/text_area.png and b/screenshots/text_area.png differ diff --git a/test.py b/test.py index b55f060..48c2481 100644 --- a/test.py +++ b/test.py @@ -9,93 +9,108 @@ class OcrTestCase(TestCase): """unittest""" - # def test_baidu_ocr(self): - # """ - # test baidu ocr - # - # :return: - # """ - # from core.ocr.baiduocr import get_text_from_image - # - # print("test baidu ocr") - # app_id = "10712738" - # app_key = "98QhwoCzoZxKoZbX5XWNPld4" - # app_secret = "MGYLv1BeHjWOGFc9IjZPfzuhlPlaBEWA" - # - # with open("screenshots/text_area.png", "rb") as fp: - # message = get_text_from_image(fp.read(), app_id, app_key, app_secret, 0, 10) - # print(message) - # - # def test_detect_direction(self): - # """ - # Test baidu api direction - # - # :return: - # """ - # from core.ocr.baiduocr import get_text_from_image - # - # print("test baidu ocr direction") - # app_id = "10712738" - # app_key = "98QhwoCzoZxKoZbX5XWNPld4" - # app_secret = "MGYLv1BeHjWOGFc9IjZPfzuhlPlaBEWA " - # - # with open("screenshots/screenshot.png", "rb") as fp: - # import time - # start = int(time.time()) - # message = get_text_from_image(fp.read(), app_id, app_key, app_secret, 10) - # print("time spend: ", int(time.time()) - start) - # print(message) - # - # def test_image_count(self): - # """ - # - # :return: - # """ - # from PIL import Image - # count_white = 0 - # with Image.open("screenshots/screenshot.png") as img: - # w, h = img.size - # for pix in img.getdata(): - # if all([i >= 240 for i in pix[:3]]): - # count_white += 1 - # - # print(count_white / (w * h)) - # - # def test_crawler(self): - # """ - # Test baidu crawler - # - # :return: - # """ - # from core.crawler.crawl import kwquery - # from core.crawler.crawl import jieba_initialize - # jieba_initialize() - # query = "回锅肉属于什么菜系" - # query = "北京奥运会是什么时候" - # ans = kwquery(query) - # print("~~~~~~~") - # for a in ans: - # print(a) - # print("~~~~~~~") - # - # def test_preparse_question(self): - # """ - # Test pre parse question - # - # :return: - # """ - # question = "我国什么时候开始改革开放" - # print(pre_process_question(question)) - # - # question = "今天是什么日子" - # print(pre_process_question(question)) - # - # question = "这个月有多少天" - # print(pre_process_question(question)) + def test_baidu_ocr(self): + """ + test baidu ocr + + :return: + """ + from core.ocr.baiduocr import get_text_from_image + + print("test baidu ocr") + app_id = "10712738" + app_key = "98QhwoCzoZxKoZbX5XWNPld4" + app_secret = "MGYLv1BeHjWOGFc9IjZPfzuhlPlaBEWA" + + with open("screenshots/text_area.png", "rb") as fp: + message = get_text_from_image(fp.read(), app_id, app_key, app_secret, 0, 10) + print(message) + + def test_detect_direction(self): + """ + Test baidu api direction + + :return: + """ + from core.ocr.baiduocr import get_text_from_image + + print("test baidu ocr direction") + app_id = "10712738" + app_key = "98QhwoCzoZxKoZbX5XWNPld4" + app_secret = "MGYLv1BeHjWOGFc9IjZPfzuhlPlaBEWA " + + with open("screenshots/screenshot.png", "rb") as fp: + import time + start = int(time.time()) + message = get_text_from_image(fp.read(), app_id, app_key, app_secret, 10) + print("time spend: ", int(time.time()) - start) + print(message) + + def test_image_count(self): + """ + + :return: + """ + from PIL import Image + count_white = 0 + with Image.open("screenshots/screenshot.png") as img: + w, h = img.size + for pix in img.getdata(): + if all([i >= 240 for i in pix[:3]]): + count_white += 1 + + print(count_white / (w * h)) + + def test_crawler(self): + """ + Test baidu crawler + + :return: + """ + from core.crawler.crawl import kwquery + from core.crawler.crawl import jieba_initialize + jieba_initialize() + query = "回锅肉属于什么菜系" + query = "北京奥运会是什么时候" + ans = kwquery(query) + print("~~~~~~~") + for a in ans: + print(a) + print("~~~~~~~") + + def test_preparse_question(self): + """ + Test pre parse question + + :return: + """ + question = "我国什么时候开始改革开放" + print(pre_process_question(question)) + + question = "今天是什么日子" + print(pre_process_question(question)) + + question = "这个月有多少天" + print(pre_process_question(question)) def test_baidu_word_count(self): a = baidu_count("全世界第一部公映的有声动画片是?", ["威利号汽船", "小熊维尼", "猫和老鼠"]) print(a) + def test_sougou_bs4(self): + """ + 测试搜狗接口 + 1. vrwrap, str-text-info / str-green-skin + 2. vrwrap, vr-box-border keep span + :return: + """ + import requests + from bs4 import BeautifulSoup + resp = requests.get("https://www.sogou.com/web?query=36摄氏度等于多少华氏度") + soup = BeautifulSoup(resp.content, "lxml") + import pdb + pdb.set_trace() + + if __name__ == "__main__": unittest.main() diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/backup.py b/utils/backup.py new file mode 100644 index 0000000..57ff9a3 --- /dev/null +++ b/utils/backup.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- + +import os + + +def save_question_answers_to_file(question, answers, directory=".", filename="QA.txt"): + """ + bake the question and answers + :param directory: + :param filename: + :return: + """ + with open(os.path.join(directory, filename), "at") as baker: + baker.write(";".join([question, ",".join(answers)]) + "\n") diff --git a/utils/process_stdout.py b/utils/process_stdout.py new file mode 100644 index 0000000..457a205 --- /dev/null +++ b/utils/process_stdout.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- + +""" + + multiprocess stdout + +""" + +from multiprocessing import Queue + + +class ProcessStdout(object): + def __init__(self): + self.__message_queue = Queue(100) + + @property + def queue(self): + return self.__message_queue + + def write(self, message): + self.__message_queue.put(message) + + def read(self): + message = self.__message_queue.get() + return message + + def run_forever(self): + while True: + print(self.read()) diff --git a/utils/stdout_template.py b/utils/stdout_template.py new file mode 100644 index 0000000..9e134e6 --- /dev/null +++ b/utils/stdout_template.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- + + +QUESTION_TPL = """ +问题: +======================================= +{0} +{1} +""" + +TIME_CONSUME_TPL = """ +耗时: +======================================= +{0:3f} +""" + +KNOWLEDGE_TPL = """ +知识图谱: +======================================= +{0} +""" + +BAIDU_TPL = """ +百度决策: +======================================= +{0} +{1} +"""