In [1]:
import os
import re
import json
import logging

#logging.basicConfig(filename='booksearch.log', encoding='utf-8', level=logging.DEBUG)
# Console logging with a timestamp and the record's level name.
# The level placeholder needs its leading '%'; without it the literal text
# "(levelname)s" is printed instead of WARNING/ERROR/...
# force=True drops handlers already attached to the root logger, so re-running
# this cell reconfigures logging instead of being silently ignored.
logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(levelname)s: %(message)s', datefmt='%Y/%m/%d %I:%M:%S %p', force=True)

In [2]:
# 对带小括号的字符串解码
# reference from: stackoverflow.com/questions/14952113/how-can-i-match-nested-brackets-using-regex
def _parse_to_list(s, i=0):
    result = []
    while i < len(s):
        if s[i] == '(':
            i, r = _parse_to_list(s, i+1)
            result.append(r)
        elif s[i] == ')':
            return i+1, result
        else:
            result.append(s[i])
            i += 1
    return i, result

In [3]:
# 对带小括号的字符串解码为嵌套的list
def _parse_nestedbrackets_to_list(s, i = 0, level = 0):
    result = []
    content = ''
    while i < len(s):
        if s[i] == '(':
            if len(content):
                logging.debug(f"L{level}, {content}")
                result.append(content)
                content = ''
                
            i, r, level = _parse_nestedbrackets_to_list(s, i + 1, level + 1)
            result.append(r)
            
        elif s[i] == ')':
            if len(content):
                logging.debug(f"L{level}, {content}")
                result.append(content)
            return i+1, result, level - 1
        else:
            content += s[i]
            i += 1
            logging.debug(f"L{level}, {content}")

    if len(content):
        logging.debug(f"L{level}, {content}")
        result.append(content)
        content = ''
        
    return i, result, level

In [4]:
# Try out the nested-bracket decoder.
# NOTE: the first assignment is overwritten right away; only the second text is parsed.
text = "(大人 or 小人) and not 君子"
text = "h or (a and (b or c and not (f or g))) and (not e) or k or j and (i and m)"
#text = "a and and b or c"
# Last expression of the cell: displays the (next_index, nested list, level) tuple.
_parse_nestedbrackets_to_list(text)

(74,
 ['h or ',
  ['a and ', ['b or c and not ', ['f or g']]],
  ' and ',
  ['not e'],
  ' or k or j and ',
  ['i and m']],
 0)

In [5]:
def _judge_condition(conditiona, conditionb, operator):
    """
    给出两个条件的结果
    """
    assert conditiona is not None or conditionb is not None
    if conditiona is None and conditionb is not None:
        return conditionb
    if conditionb is None and conditiona is not None:
        return conditiona
    
    assert operator == 'AND' or operator == 'OR'
    if operator == 'AND':
        return conditiona and conditionb
    elif operator == 'OR':
        return conditiona or conditionb
    else:
        logging.error(f"Error in A: {conditiona}, B: {conditionb}, operator: {operator}.")

In [6]:
def _excute_signle_query(query, cobject, last_condition=None, operator=None, not_operator=False, level = 0):
    """
    执行不嵌套的查询语句，这个查询语句的前一个条件，如有，则为last
    operator = AND | OR | NOT | AND + NOT | OR + NOT
    query one = condition + operator + [query one]
    query two = operator
    query three = operator + condition + [query two | query three]
    NOTE: 应该处理当条件已经变成FALSE后，就不应该在往下处理。
    """
    operator = operator
    not_operator = not_operator
    last_condition = last_condition
    result = last_condition
    condition = None
    for index, word in enumerate(query):
        if len(word) == 0:
            continue
        upper_word = word.upper()
        if upper_word == 'AND' or upper_word == 'OR':
            operator = upper_word
        elif upper_word == 'NOT':
            not_operator = True
        else:
            match = [m.start() for m in re.finditer(word, cobject['content'])]
            condition = True if len(match) else False

            if not_operator:
                condition = not condition
                not_operator = False
            if last_condition is not None:
                result = _judge_condition(last_condition, condition, operator)
                operator = None
                last_condition = result
            else:
                last_condition = condition
                condition = None
        logging.debug(f"{'': <{level*2}}{index}: {word}, A: {last_condition}, B: {condition}, operator: {operator}, not: {not_operator}.")
    return last_condition, operator, not_operator

In [7]:
def _excute_query(querylist, cobject, last_condition = None, operator = None, not_operator = False, level = 0):
    """
    执行嵌套的查询语句。
    querylist为parse_nestedbrackets_to_list获得的结果。
    """
    last_condition = last_condition
    operator = operator
    not_operator = not_operator
    level = level
    for q in querylist:
        if isinstance(q, list):
            logging.debug(f"{'': <{level*2}}L{level}, sub-query begin: {q}")
            sub_last_condition, sub_operator, sub_not_operator, _ = _excute_query(q, cobject, last_condition = None, operator = None, not_operator = False, level = level + 1)

            if not_operator:
                sub_last_condition = not sub_last_condition
            not_operator = sub_not_operator
            if last_condition is not None:
                result = _judge_condition(last_condition, sub_last_condition, operator)
                operator = sub_operator
                last_condition = result
            else:
                last_condition = sub_last_condition
            logging.debug(f"{'': <{level*2}}L{level}, sub-query end: L: {last_condition}, operator: {operator}, not: {not_operator}.")
        else:
            result = q.split(" ")  # 以空格为分隔符分割字符串 "apple banana cherry"
            logging.debug(f"{'': <{level*2}}L{level}, query begin: " + "/".join(result)) # 输出结果为 ['apple', 'banana', 'cherry']
            last_condition, operator, not_operator = _excute_signle_query(result, cobject, last_condition = last_condition, operator = operator, not_operator = not_operator, level = level)
            logging.debug(f"{'': <{level*2}}L{level}, query end: L: {last_condition}, operator: {operator}, not: {not_operator}.")
            
    return last_condition, operator, not_operator, level

In [8]:
# query: "(content:大人 or content:小人) and not content:君子"
# cobject: {
#     'book_title': book_title,
#     'volume_title': volume_title,
#     'chapter_title': chapter_title,
#     'content': content
# }
def search(query, cobject):
    """Parse the boolean ``query`` string and evaluate it against ``cobject``.

    Returns the first element of the evaluator's result tuple, i.e. the
    overall match result for this content object.
    """
    parsed = _parse_nestedbrackets_to_list(query)[1]
    return _excute_query(parsed, cobject)[0]

In [9]:
# Try out search() on a single hand-written content object.
cobject = {
    'book_title': '',
    'volume_title': '',
    'chapter_title': '',
    'content': "天之爱人也，薄于圣人之爱人也；其利人也，厚于圣人之利人也。大人之爱小人也，薄于小人之爱大人也；其利小人也，厚于小人之利大人也。以臧为其亲也，而爱之，非爱其亲也；以臧为其亲也，而利之，非利其亲也。以乐为爱其子，而为其子欲之，爱其子也。以乐为利其子，而为其子求之，非利其子也。"
    #'content': "密云不雨，君子"
    #'content': "太公曰：“臣闻君子乐得其志，小人乐得其事。今吾渔甚有似也，殆非乐之也。”"
}

# NOTE: only the last uncommented assignment to `query` takes effect; the
# first one below is overwritten before search() is called.
query = "((大人 or 小人) and (not 君子)) or 密云不雨"
#query = "(not 君子)"
#query = "大人"
#query = "not (大人 or 小人) and (君子)"
query = " 君子 and not (大人 or 小人)"

result = search(query, cobject)
#result = search(["大人","and","中人"], cobject)
# Last expression of the cell: displays the match result.
result

False

In [10]:
def querybook(filename, querylist, booklist, include):
    """Search every paragraph of one book file for the parsed query.

    Args:
        filename: Path of a UTF-8 JSON book file. The file must provide
            ``title`` and ``volumes``; each volume provides ``title`` and
            ``chapters``; each chapter provides ``title`` and optionally
            ``paragraphs`` (a list of objects with a ``content`` string).
        querylist: Nested query list from ``_parse_nestedbrackets_to_list``.
        booklist: Book titles used for filtering, or None for no filtering.
        include: True to search only titles in ``booklist``, False to skip
            titles in ``booklist``, None to ignore ``booklist``.

    Returns:
        A tuple ``(pieces, searched)``. ``pieces`` lists one dict per matching
        paragraph (book/volume/chapter titles plus content). ``searched`` is
        False when the book was skipped by the filter, True otherwise.
    """
    # Load first and close the file right away; the search itself does not
    # need the handle.
    with open(filename, 'r', encoding='utf-8') as file:
        book = json.load(file)

    book_title = book["title"]
    if booklist is not None and include is not None:
        listed = book_title in booklist
        # Skip when an include-list misses the title or an exclude-list has it.
        if listed != include:
            logging.info(f"pass《{book_title}》...")
            return [], False

    logging.info(f"search in《{book_title}》...")
    pieces = []
    for volume in book["volumes"]:
        volume_title = volume["title"]
        for chapter in volume["chapters"]:
            chapter_title = chapter["title"]
            # A chapter without paragraphs (missing or empty) has nothing to search.
            for paragraph in chapter.get("paragraphs") or []:
                cobject = {
                    'book_title': book_title,
                    'volume_title': volume_title,
                    'chapter_title': chapter_title,
                    'content': paragraph["content"]
                }
                result, _, _, _ = _excute_query(querylist, cobject)
                if result:
                    pieces.append(cobject)
    return pieces, True

In [11]:
import concurrent.futures
def querybooks(bookspath, query, books, limit = 10):
    """Run a boolean query over every ``*.json`` book file in a directory.

    Args:
        bookspath: Directory containing the JSON book files.
        query: Query string, e.g. ``"(大人 or 小人) and not 君子"``.
        books: Optional book filter, or None / '' for no filter. A leading
            '+' means "search only these titles", a leading '-' means "skip
            these titles"; the titles follow as a comma-separated list, e.g.
            ``"+诗经,周易"``. Without a prefix no filtering takes place.
        limit: Maximum number of matching pieces to return; None returns all.

    Returns:
        A tuple ``(pieces, total_count, searched_book_count)``: the matching
        pieces (truncated to ``limit``), the number of matches before
        truncation, and the number of books that were actually searched.
        Pieces are ordered by file name, so repeated runs return the same
        (truncated) result.
    """
    filenames = sorted(
        os.path.join(bookspath, name)
        for name in os.listdir(bookspath)
        if name.endswith(".json")
    )

    _, querylist, _ = _parse_nestedbrackets_to_list(query)

    include = None
    booklist = None
    if books:  # also false for None, which has no len()
        names = books
        if books[0] == '+':
            include = True
            names = books[1:]
        elif books[0] == '-':
            include = False
            names = books[1:]
        # Only strip the first character when it really is a +/- prefix.
        booklist = names.split(",")

    total_pieces = []
    search_book_count = 0
    # One task per book; the pool only runs querybook, the merging below
    # happens in the calling thread.
    with concurrent.futures.ThreadPoolExecutor(max_workers = 10, thread_name_prefix='s_thread') as executor:
        futures = [
            executor.submit(querybook, filename, querylist, booklist, include)
            for filename in filenames
        ]
        # Collect in submission (file name) order instead of completion order
        # so the result does not depend on thread timing.
        for future in futures:
            pieces, searched = future.result()
            total_pieces.extend(pieces)
            if searched:
                search_book_count += 1

    total_count = len(total_pieces)
    if limit is not None:
        # Honour the caller's limit; a negative limit is treated as 0.
        return total_pieces[:max(limit, 0)], total_count, search_book_count
    return total_pieces, total_count, search_book_count

In [12]:
# Directory holding the JSON book files. The default is a machine-specific
# path; set the BOOKSEARCH_BOOKS_PATH environment variable to run the
# notebook elsewhere without editing this cell.
bookspath = os.environ.get("BOOKSEARCH_BOOKS_PATH", "/Users/sunyafu/zebra/BookMan/BookServer/books")

In [13]:
def output(results):
    """Print each result as a numbered location line followed by its text.

    The location line shows the book title and then either
    "volume·chapter" or, when the volume title is empty, the chapter alone.
    Numbering starts at 0.
    """
    for number, hit in enumerate(results):
        volume = hit['volume_title']
        heading = f"{number}. {hit['book_title']} "
        if len(volume):
            heading += f"{volume}·{hit['chapter_title']}"
        else:
            heading += f"{hit['chapter_title']}"
        print(heading)
        #print(f"... {result.highlights('content')} ...\n")
        print(f"... {hit['content']} ...\n")

In [14]:
def output_markdown(results):
    """Print the results as a Markdown table (number, book, chapter, paragraph).

    Nothing is printed for an empty result list. The chapter column shows
    "volume·chapter", or the chapter alone when the volume title is empty.
    Numbering starts at 0.
    """
    if len(results) == 0:
        return

    def cell(value):
        # A raw '|' would start a new column and a line break would end the
        # row, so escape the former and turn the latter into <br>.
        text = str(value).replace("|", "\\|")
        return text.replace("\r\n", "<br>").replace("\n", "<br>")

    print("|NO|所在书籍|所在章节|所在段落|")
    print("|--|--|--|--|")

    for index, result in enumerate(results):
        if len(result['volume_title']):
            chapter = f"{result['volume_title']}·{result['chapter_title']}"
        else:
            chapter = result['chapter_title']
        print(f"|{index}|{cell(result['book_title'])}|{cell(chapter)}|{cell(result['content'])}|")

In [20]:
# Interactive search cell. Only the LAST uncommented assignment to `query`
# (and to `books`) is used; the earlier ones are leftovers from previous runs.
#query = "not (大人 or 小人) and (君子)"
#query = "大人 and 小人"
#query = "君子 not (大人 or 小人)"
#query = "君子 and not (大人 or 小人)"
query = "拯马"
query = "左股"
query = "乘马"
query = "右骖" # presumably other candidate terms: 骈、骖、驷、騑、服
query = "大人"
#books = "+诗经" # format parsed by querybooks: '+' (include) or '-' (exclude), then comma-separated titles
books = "+诗经,周礼,尚书,逸周书,左传,公羊传,谷梁传,国语,战国策,竹书纪年,穆天子传,周易,易传"
query = "布衣"
query = "大人 and 小人 and 君子 and 圣人" # presumably other candidate terms: 愚人 哲人 庶民 庶人 仕

# limit = None asks for all matching pieces rather than a truncated list.
results, piececount, bookcount = querybooks(bookspath, query, books, limit = None)
print(f"搜索{bookcount}本书籍，一共发现{piececount}个结果。\n")

#output(results)
output_markdown(results)

搜索13本书籍，一共发现0个结果。



In [16]:
def checkQuery(querylist):
    """Print the structure of a parsed query list and return True.

    Strings are split on single spaces and printed as "query: w1/w2/...";
    nested lists are announced with "sub-query:" and printed recursively.
    No validation is performed: the function always returns True.
    """
    for part in querylist:
        if not isinstance(part, list):
            words = part.split(" ")  # split on single spaces only
            print("query: " + "/".join(words))  # e.g. "query: apple/banana/cherry"
            continue
        print("sub-query:")
        checkQuery(part)
    return True

def queryParser(query):
    """Parse a query string and return its nested list form.

    The string is decoded with ``_parse_nestedbrackets_to_list`` and passed
    through ``checkQuery``.

    Returns:
        The parsed nested list when the check passes; an empty list (after
        printing an error) when the check reports failure.
    """
    _, querylist, _ = _parse_nestedbrackets_to_list(query)

    #querylist = querylist[1]
    r = checkQuery(querylist)
    if r is False:
        # Failed check: report it and hand back nothing usable.
        print(f"query: '{query}' is not a correctly query.")
        print(f"error message: {r}.")
        return []
    # The branches used to be swapped, so a valid query always produced [].
    return querylist

In [17]:
import re

# Scratch: experiment with a signed "+name-name" book filter syntax.
books = "-易经+周易+诗经-油车还扯"
# Raw string: "\+" and "\w" are invalid escape sequences in a normal literal.
# NOTE(review): the character class also contains a full-width '｜' — it looks
# like an attempted alternation; confirm whether it is wanted.
booklist = re.findall(r"[\+｜-]\w+", books, re.I)
# NOTE(review): booklist holds signed names such as '-易经', so a bare '-' is
# never a member and this branch does not run — confirm the intended check.
if '-' in booklist:
    print("易经 在booklist。")

# The format actually used: one leading '+' (include) or '-' (exclude),
# followed by comma-separated titles.
books = "+易经,周易,诗经,孙子兵法"
#books = None
include = None
if books is not None and books[0] == '+':
    include = True
elif books is not None and books[0] == '-':
    include = False

book = "周易"
# NOTE: this line would fail if the `books = None` line above were enabled.
booklist = books[1:].split(",")
if include == True:
    if book in booklist:
        print(f"inlcude, book: {book} in booklist: {booklist}")
elif include == False:
    if book in booklist:
        print(f"exlcude, book: {book} in booklist: {booklist}")

inlcude, book: 周易 in booklist: ['易经', '周易', '诗经', '孙子兵法']
