# Demo说明
父表(demo表)有一列是context，将其切分为多个chunking，写入子表（demo_chunking）的text字段，切分方式为 title: chunking。子表预先创建搜索索引，数据
导入完成后便可进行近似检索，将检索的结果与用户问题进行prompt提交给大模型（可以选择提交chunking后的文本，还是父表中的context字段），便可以实现私域数据知识问答。
# 前提条件
1. 使用Lindorm新版售卖，开通宽表、搜索、向量、LTS(宽表数据同步搜索使用)、AI（可以提工单，提供aliuid加白后便可开通）
2. 宽表引擎在控制台开通S3兼容地址（AI引擎依赖该功能）
3. 实例配置白名单，将要运行本文代码的客户端机器IP配置到实例白名单中
# 运行环境
1. 运行环境: python >= 3.10
2. 安装依赖: pip install -r requirements.txt
3. 在 env 文件里填写AI、宽表、搜索、dashscope的api key等相关信息
4. 本文使用开源数据集：https://github.com/ymcui/cmrc2018/tree/master/squad-style-data，下载 cmrc2018_train.json 完成后，放入 data目录下

In [91]:
from langchain_text_splitters import RecursiveCharacterTextSplitter
from textsplitter import ChineseTextSplitter
from ldconfig import Config
from opensearchpy import OpenSearch
from typing import List, Tuple
import json
import mysql.connector
import requests
from tqdm import tqdm
from dashscope import Generation
from http import HTTPStatus
from collections import OrderedDict
from IPython.display import display, clear_output, HTML, JSON
# 控制opensearch的日志输出级别，防止日志打爆
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.getLogger('opensearch').setLevel(logging.WARN)

In [103]:
class DataHandler:
    """Load the demo dataset and cut document text into chunks."""

    def __init__(self):
        # Size limit handed to both splitters below.
        self.chunking_size = 200
        self.data_path = Config.LOAD_FILE_PATH

    def get_all_data(self):
        """Parse the JSON file at ``self.data_path`` and return its ``"data"`` entry."""
        with open(self.data_path, encoding="utf-8") as source:
            payload = json.load(source)
        return payload["data"]

    def data_chinese_splite(self, context):
        """Split ``context`` with ``ChineseTextSplitter``.

        Per the original note, this strategy cuts on length as well as on
        Chinese punctuation such as commas.
        """
        splitter = ChineseTextSplitter(sentence_size=self.chunking_size)
        return splitter.split_text(text=context)

    def data_character_splite(self, context):
        """Split ``context`` with ``RecursiveCharacterTextSplitter`` (no overlap).

        Per the original note, this strategy cuts on length.
        """
        return RecursiveCharacterTextSplitter(
            chunk_size=self.chunking_size,
            chunk_overlap=0,
        ).split_text(text=context)

class Lindorm:
    def __init__(self):
        """Set the demo's table, model and index names, then build the engine clients."""
        # Wide-table names: the parent table and the child table holding its chunks.
        self.row_parent_table = "demo"
        self.row_child_table = "demo_chunking"

        # Model names registered in the AI engine (embedding / rerank).
        self.embedding_model_name = "bge_m3_model"
        self.reranker_model_name = "rerank_bge_v2_m3"

        # Child-table columns that carry the chunk text and its vector.
        self.text_field = "text"
        self.vector_field = "vector_field"

        # Name of the ingest pipeline created in the search engine.
        self.pipeline_name = "demo_chunking_embedding_pipeline"
        # Search index on the child table; `show index from demo_chunking` lists it.
        self.chunking_index_name = "sidx"

        # Column lists; the SQL helpers address some entries by position.
        self.demo_field = ["document_id", "title", "context", "status", "metadata"]
        self.demo_chunking_filed = [
            "document_id",
            "chunking_position",
            "title",
            self.text_field,
            "metadata",
            "chunking_number",
        ]

        # Built last: the clients read the names configured above from this object.
        self.lindormRow = self.LindormRow(self)
        self.lindormAI = self.LindormAI(self)
        self.lindormSearch = self.LindormSearch(self)
    
    def close(self):
        """Close the wide-table client held in ``lindormRow``."""
        row_client = self.lindormRow
        row_client.close()
    
    """
    使用 mysql.connector 连接宽表
    """
    class LindormRow:
        """Wide-table (SQL) access through ``mysql.connector``.

        The data-access methods print database errors instead of raising them,
        so callers must be prepared for empty/``None`` results.
        """

        def __init__(self, parent):
            """Connect using the ``Config`` settings.

            A connection failure is printed and leaves ``self.connection`` as ``None``.
            """
            self.parent = parent
            self.connection = None
            row_config = {
                "host": Config.ROW_HOST,
                "port": int(Config.ROW_PORT),
                "user": Config.LD_USER,
                "passwd": Config.LD_PASSWORD,
                "db": "default",
                "connection_timeout": 15
            }
            try:
                self.connection = mysql.connector.connect(**row_config)
            except Exception as e:
                print("Connection error: ", e)

        def common_create_table(self, sql):
            """Execute one DDL statement; errors are printed, not raised."""
            cursor = None
            try:
                cursor = self.connection.cursor(prepared=True)
                cursor.execute(sql)
            except Exception as e:
                print("Create error : ", e)
            finally:
                if cursor is not None:
                    cursor.close()

        def create_parent_table(self):
            """Create the parent table if it does not exist (adjust the DDL as needed)."""
            sql = """
                CREATE TABLE IF NOT EXISTS {} (
                    document_id  VARCHAR, 
                    title VARCHAR, 
                    context VARCHAR,
                    status   int, 
                    metadata JSON, 
                    PRIMARY KEY (document_id)
                )
            """.format(self.parent.row_parent_table)
            print("Create parent table sql: ", sql)
            self.common_create_table(sql)

        def create_child_table(self):
            """Create the child (chunk) table if it does not exist (adjust the DDL as needed)."""
            sql = """
                CREATE TABLE IF NOT EXISTS {} (
                    document_id VARCHAR,
                    chunking_position INT,
                    title  VARCHAR,
                    {}   VARCHAR,
                    {} VARCHAR,
                    metadata JSON,
                    chunking_number INT,
                    PRIMARY KEY (document_id, chunking_position)
                )
            """.format(self.parent.row_child_table,
                       self.parent.text_field,
                       self.parent.vector_field)
            print("Create child table sql: ", sql)
            self.common_create_table(sql)

        # Reference: https://help.aliyun.com/document_detail/260841.html
        def create_child_table_index(self):
            """Create the search index on the child table.

            The demo uses an hnsw index; per the original note, ivfpq is
            recommended for larger data sets:
            https://help.aliyun.com/document_detail/2773371.html
            """
            sql = """
            CREATE INDEX IF NOT EXISTS %s USING SEARCH ON %s (
                    document_id(indexed=false,columnStored=false),
                    chunking_position(indexed=false,columnStored=false),
                    title(type=text,analyzer=ik),
                    metadata(indexed=false,columnStored=false),
                    %s(type=text,analyzer=ik),
                    %s(mapping='{
                        "type": "knn_vector",
                        "dimension": 1024,
                        "data_type": "float",
                        "method": {
                            "engine": "lvector",
                            "name": "hnsw", 
                            "space_type": "cosinesimil",
                            "parameters": {
                                "ef_construction": 500,
                                "m": 24
                                }
                            }
                    }')) WITH (INDEX_SETTINGS='{
                    "index": {
                        "knn" : "true",
                        "knn.vector_empty_value_to_keep" : true,
                        "default_pipeline": "%s"
                    }}',
                        SOURCE_SETTINGS=
                        '{
                            "includes": ["document_id", "chunking_position","title", "text"]
                        }',
                        numShards=2
                    )
            """.strip() %  (self.parent.chunking_index_name,
                            self.parent.row_child_table,
                            self.parent.text_field,
                            self.parent.vector_field,
                            self.parent.pipeline_name)
            print("Create search index sql: \n ", sql)
            self.common_create_table(sql)

        def write_parent(self, data_json: List):
            """Upsert rows into the parent table, one statement per row.

            Each item must provide the keys ``document_id``, ``title``,
            ``context``, ``status`` and ``metadata``; ``metadata`` is stored as
            JSON text. Per the original note, ``status`` can be 0 on the first
            write, meaning the row has not been chunked into the child table yet.
            """
            write_fields = ",".join(self.parent.demo_field)
            insert_sql = """
                UPSERT INTO {}({}) VALUES (?, ?, ?, ?, ?) 
            """.format(self.parent.row_parent_table, write_fields)
            cursor = None
            try:
                cursor = self.connection.cursor(prepared=True)
                for data in data_json:
                    write_data = (data["document_id"],
                                  data["title"],
                                  data["context"],
                                  data["status"],
                                  json.dumps(data["metadata"], ensure_ascii=False))
                    cursor.execute(insert_sql, write_data)
            except Exception as e:
                print("Write error : ", e)
            finally:
                if cursor is not None:
                    cursor.close()

        def scan(self, cursor_str: str, offset: int, limit: int) -> Tuple[List, str, int]:
            """Read one page of the parent table using the ``_l_cursor_`` hint.

            Cursor scan reference: https://help.aliyun.com/document_detail/440825.html
            (per the original note it speeds up scans on large tables).

            Args:
                cursor_str: cursor returned by the previous page; empty/``None``
                    for the first page.
                offset: row offset placed in the ``limit`` clause.
                limit: page size.

            Returns:
                ``(rows, next_cursor, next_offset)`` where ``rows`` is a list of
                dicts keyed by the parent column names. If the query fails the
                error is printed and ``([], None, offset)`` is returned, so the
                result can always be unpacked.
            """
            sql_cursor = "/*+ _l_cursor_ */"
            if cursor_str:
                sql_cursor = "/*+ _l_cursor_('{}') */".format(cursor_str)
            sql_var = " " + ", ".join(self.parent.demo_field)
            sql = "SELECT {} {} from {} limit {}, {}".format(sql_cursor,
                                                             sql_var,
                                                             self.parent.row_parent_table,
                                                             offset,
                                                             limit)
            # NOTE(review): the next-page cursor is read from the column that
            # follows the selected ones; confirm against the cursor-scan docs.
            cursor_column = len(self.parent.demo_field)
            results = []
            cursor_return = None
            cursor = None
            try:
                cursor = self.connection.cursor(prepared=False)
                cursor.execute(sql)
                for row in cursor.fetchall():
                    results.append(dict(zip(self.parent.demo_field, row)))
                    cursor_return = row[cursor_column]
                return results, cursor_return, offset + limit
            except Exception as e:
                print("Scan error : ", e)
                return [], None, offset
            finally:
                if cursor is not None:
                    cursor.close()

        def write_child(self, data_json: List):
            """Upsert a batch of chunk rows into the child table in one statement.

            The generated SQL has one ``(?, ?, ?, ?, ?, ?)`` group per item, e.g.
            ``UPSERT INTO demo_chunking(<fields>) VALUES (?, ?, ?, ?, ?, ?), (?, ?, ?, ?, ?, ?)``.
            An empty batch is a no-op.
            """
            if not data_json:
                # An empty VALUES list would be invalid SQL.
                return
            write_fields = ",".join(self.parent.demo_chunking_filed)
            placeholder = " (?, ?, ?, ?, ?, ?) "
            placeholdes = ", ".join(placeholder for _ in data_json)
            sql = "UPSERT INTO {}({}) VALUES {}".format(self.parent.row_child_table, write_fields, placeholdes)
            params = []
            for data in data_json:
                params.extend(data[field] for field in self.parent.demo_chunking_filed)
            cursor = None
            try:
                cursor = self.connection.cursor(prepared=True)
                cursor.execute(sql, tuple(params))
            except Exception as e:
                print("Write error : ", e)
            finally:
                if cursor is not None:
                    cursor.close()

        def update_parent_status_sync_child(self, pk: str, status: int):
            """Upsert the ``status`` column of the parent row identified by ``pk``.

            Per the original note, the caller is responsible for chunking the
            row's context into the child table when moving the status from 0 to 1.
            """
            sql = "UPSERT INTO {}({},{}) VALUES (?, ?)".format(self.parent.row_parent_table,
                                                               self.parent.demo_field[0],
                                                               self.parent.demo_field[3])
            cursor = None
            try:
                cursor = self.connection.cursor(prepared=True)
                cursor.execute(sql, (pk, status))
            except Exception as e:
                print("Write error : ", e)
            finally:
                if cursor is not None:
                    cursor.close()

        def get_parent_context(self, pk: str):
            """Return the ``context`` column of parent row ``pk``.

            Returns ``None`` when the row does not exist or the query fails
            (a failure is printed).
            """
            sql = """
                SELECT {} from {} where {}=?
            """.format(self.parent.demo_field[2],  self.parent.row_parent_table, self.parent.demo_field[0])
            cursor = None
            try:
                cursor = self.connection.cursor(prepared=True)
                cursor.execute(sql, (pk,))
                row = cursor.fetchone()
                return row[0] if row is not None else None
            except Exception as e:
                print("Query context err: ", e)
            finally:
                if cursor is not None:
                    cursor.close()

        def close(self):
            """Close the connection if one was opened."""
            if self.connection is not None:
                self.connection.close()

    """
    使用Rest接口访问AI引擎
    """    
    class LindormAI():
        """REST client for the AI engine (model management and inference)."""

        def __init__(self, parent):
            """Keep the owning ``Lindorm`` object and prepare the auth headers."""
            self.parent = parent
            self.headers = {
                "Content-Type": "application/json; charset=utf-8",
                "x-ld-ak": Config.LD_USER,
                "x-ld-sk": Config.LD_PASSWORD
                }

        def _models_url(self, suffix):
            """Build ``http://<AI_HOST>:<AI_PORT>/v1/ai/models/<suffix>``."""
            return "http://{}:{}/v1/ai/models/{}".format(Config.AI_HOST, int(Config.AI_PORT), suffix)

        def _checked_json(self, response):
            """Decode ``response`` as JSON and raise ``Exception`` unless it reports success."""
            try:
                payload = response.json()
            except ValueError:
                # A non-JSON error body must not hide the real HTTP failure.
                raise Exception("http request failed, status code: {}, body is not JSON".format(
                    response.status_code))
            if response.status_code != 200 or payload.get("success") is False:
                raise Exception("http request failed, status code: {}, msg: {}".format(
                    response.status_code, payload.get("msg")))
            return payload

        def list_modes(self) -> list:
            """Return the list of models currently known to the AI engine.

            Raises:
                Exception: when the HTTP call fails or the engine reports an error.
            """
            response = requests.get(self._models_url("list"), headers=self.headers)
            return self._checked_json(response)["data"]["models"]

        def common_create_model(self, model_name, model_path, task, algorithm):
            """Ask the AI engine to create a model; a failure is printed, not raised."""
            data = {
                "model_name": model_name,
                "model_path": model_path,
                "task": task,
                "algorithm": algorithm,
                "settings": {"instance_count": "2"}
            }
            response = requests.post(self._models_url("create"), data=json.dumps(data), headers=self.headers)
            try:
                self._checked_json(response)
            except Exception as e:
                # Best effort: report the failure and let the caller continue.
                print(e)

        def check_model_exists(self, model_name) -> bool:
            """Return ``True`` when a model named ``model_name`` is listed by the engine."""
            return any(model['name'] == model_name for model in self.list_modes())

        def create_embedding_model(self):
            """Create the embedding model unless it already exists.

            The original note recommends the bge-m3 model.
            """
            if self.check_model_exists(self.parent.embedding_model_name):
                print("Model {} exists, skip create".format(self.parent.embedding_model_name))
                return

            self.common_create_model(self.parent.embedding_model_name,
                                    "huggingface://BAAI/bge-m3",
                                    "FEATURE_EXTRACTION",
                                    "BGE_M3")

        def create_reranker_model(self):
            """Create the reranker model unless it already exists.

            The original note recommends bge-reranker-v2-m3.
            """
            if self.check_model_exists(self.parent.reranker_model_name):
                print("Model {} exists, skip create".format(self.parent.reranker_model_name))
                return

            self.common_create_model(self.parent.reranker_model_name,
                                    "huggingface://BAAI/bge-reranker-v2-m3",
                                    "SEMANTIC_SIMILARITY",
                                    "BGE_RERANKER_V2_M3")

        def text_embedding(self, input_text:str):
            """Embed ``input_text`` with the configured embedding model.

            Returns:
                The first entry of the response's ``data`` list.

            Raises:
                Exception: when the HTTP call fails or the engine reports an error.
            """
            url = self._models_url("{}/infer".format(self.parent.embedding_model_name))
            data = {
                "input": [input_text]
            }
            response = requests.post(url, data=json.dumps(data), headers=self.headers)
            return self._checked_json(response)["data"][0]

        def reranker(self, input_text:str,  chunks: List[str]):
            """Re-rank candidate answers against a question.

            Args:
                input_text: the question.
                chunks: the candidate answer texts.

            Returns:
                The ``data`` field of the engine's response.

            Raises:
                Exception: when the HTTP call fails or the engine reports an error.
            """
            url = self._models_url("{}/infer".format(self.parent.reranker_model_name))
            data = {
                "input": {"query": input_text, "chunks": chunks}
            }
            response = requests.post(url, data=json.dumps(data), headers=self.headers)
            return self._checked_json(response)["data"]
            
    """
    使用opensearch客户端访问搜索引擎
    """
    class LindormSearch():
        """Search-engine access through the ``opensearchpy`` client."""

        def __init__(self, parent):
            """Derive the index name from the parent's settings and build the client.

            A client construction error is printed and leaves ``self.client`` as ``None``.
            """
            self.parent = parent
            self.top_k = 5
            self.index_name = "default.{}.{}".format(parent.row_child_table,
                                                     parent.chunking_index_name)
            self.client = None
            endpoint = {"host": Config.SEARCH_HOST, "port": Config.SEARCH_PORT}
            credentials = (Config.LD_USER, Config.LD_PASSWORD)
            try:
                self.client = OpenSearch(
                    hosts=[endpoint],
                    http_auth=credentials,
                    http_compress=False,
                    use_ssl=False,
                )
            except Exception as e:
                print("Connection search error", e)

        def _hits(self, query_body):
            """Run ``query_body`` against the demo index and return the hit list."""
            res = self.client.search(index=self.index_name, body=query_body)
            return res['hits']['hits']

        def check_pipeline_exists(self) -> bool:
            """Return ``True`` if fetching the pipeline succeeds, ``False`` on any error."""
            try:
                self.client.ingest.get_pipeline(id=self.parent.pipeline_name)
            except Exception:
                return False
            return True

        def create_pipeline(self):
            """Create the ingest pipeline unless it already exists.

            Per the original note, the pipeline makes the search engine call the
            AI engine to embed the text field into the vector field on write.
            """
            name = self.parent.pipeline_name
            if self.check_pipeline_exists():
                print("Pipeline {} exists".format(name))
                # Existing pipelines are skipped; remove this return to
                # re-create the pipeline with changed parameters.
                return

            # The pipeline is given the "-vpc" form of a "-pub" AI host name.
            inner_ai_host = Config.AI_HOST.replace("-pub", "-vpc")
            embedding_processor = {
                "inputFields": [self.parent.text_field],
                "outputFields": [self.parent.vector_field],
                "userName": Config.LD_USER,
                "password": Config.LD_PASSWORD,
                "url": "http://{}:{}".format(inner_ai_host, int(Config.AI_PORT)),
                "modeName": self.parent.embedding_model_name
            }
            pipeline = {
                "description": "demo_chunking pipeline",
                "processors": [{"text-embedding": embedding_processor}]
            }
            try:
                response = self.client.ingest.put_pipeline(id=name, body=pipeline)
                print("Create pipeline success", response)
            except Exception as e:
                print("Create pipeline errr ", e)

        def text_search(self, text_query, k = int(Config.SEARCH_TOP_K)):
            """Full-text search only: ``match`` query on the text field, ``k`` hits."""
            match_clause = {"match": {self.parent.text_field: text_query}}
            return self._hits({
                "size": k,
                "_source": ["document_id", "chunking_position",  "text"],
                "query": match_clause
            })

        def vector_search(self, text_query, k = int(Config.SEARCH_TOP_K)):
            """Vector search only: embed the query, then run a ``knn`` query for ``k`` hits."""
            vector = self.parent.lindormAI.text_embedding(text_query)
            knn_clause = {
                self.parent.vector_field: {
                    "vector": vector,
                    "k": k
                }
            }
            return self._hits({
                "size": k,
                "_source": ["document_id", "chunking_position",  "text"],
                "query": {"knn": knn_clause},
                "ext": {"lvector": {"ef_search": "200"}}
            })

        def rrf_search(self, text_query, k = int(Config.SEARCH_TOP_K)):
            """Hybrid full-text + vector search (``filter_rrf``) for ``k`` hits."""
            vector = self.parent.lindormAI.text_embedding(text_query)
            text_filter = {
                "match": {
                    self.parent.text_field: text_query,
                }
            }
            knn_clause = {
                self.parent.vector_field: {
                    "vector": vector,
                    "filter": text_filter,
                    "k": k
                }
            }
            lvector_options = {
                "hybrid_search_type": "filter_rrf",
                "rrf_rank_constant": "60",
                "ef_search": "200"
            }
            return self._hits({
                "size": k,
                "_source": ["document_id", "chunking_position",  "text"],
                "query": {"knn": knn_clause},
                "ext": {"lvector": lvector_options}
            })

"""
使用api_key访问通义前问的方式
"""
# reference: https://help.aliyun.com/zh/dashscope/developer-reference/qwen-api
class AliQwen():
    """Wrapper around DashScope ``Generation`` using an API key."""

    def __init__(self):
        """Read the API key from ``Config`` and set the model name and prompt template."""
        self.api_key = Config.DASHSCOPE_API_KEY
        self.model_name = "qwen-turbo"
        self.PROMPT_TEMPLATE = """已知信息：
{context} 
根据上述已知信息，专业的来回答用户的问题。如果无法从中得到答案，请说 “根据已知信息无法回答该问题” 或 “没有提供足够的相关信息”，不允许在答案中添加编造成分，答案请使用中文。 问题是：{question}"""

    def chat(self, prompt: str):
        """Send ``prompt`` in non-streaming mode and return the generated text.

        Raises:
            Exception: with the response message when the status is not OK.
        """
        response = Generation.call(model=self.model_name, prompt=prompt, stream=False, api_key=self.api_key)
        if response.status_code == HTTPStatus.OK:
            return response.output.text
        else:
            raise Exception(response.message)

    def chat_stream(self, prompt: str):
        """Send ``prompt`` in streaming mode and yield the text of each response.

        Raises:
            Exception: with the response message when a response's status is not OK.
        """
        responses = Generation.call(model=self.model_name, prompt=prompt, stream=True, api_key=self.api_key)
        for response in responses:
            if response.status_code == HTTPStatus.OK:
                yield response.output.text
            else:
                raise Exception(response.message)

    def gen_prompt(self, query: str, context: str):
        """Fill ``{question}`` and ``{context}`` in the prompt template.

        Both placeholders are substituted in a single pass over the template,
        so placeholder-like text inside ``query`` or ``context`` is left as is.
        """
        filled_parts = (
            part.replace("{question}", query)
            for part in self.PROMPT_TEMPLATE.split("{context}")
        )
        # Joining with the context inserts it at every {context} position
        # without re-scanning text that came from the query.
        return context.join(filled_parts)

"""
处理rerank之后的结果，如为每个json对象添加 rerank_score 字段标识为rerank之后的得分
"""
def handler_reranker(origin_result, reranker_result, topk):
    """Reorder search hits according to the reranker output.

    Each reranker item's ``index`` selects a hit from ``origin_result``; the
    hit is tagged in place with ``rerank_score`` and collected in reranker
    order. Items whose index is past the end of ``origin_result`` are skipped.
    Returns the first ``topk`` collected hits.
    """
    hit_count = len(origin_result)
    ranked_hits = []
    for entry in reranker_result:
        position = entry['index']
        if position >= hit_count:
            continue
        hit = origin_result[position]
        hit['rerank_score'] = entry['score']
        ranked_hits.append(hit)
    return ranked_hits[:topk]

"""
打印工具
"""
def wrap_text(text, width):
    """Hard-wrap every line of ``text`` to at most ``width`` characters.

    Lines are cut at fixed character positions with no word awareness.
    Blank lines in the input are kept as blank lines in the output.
    """
    wrapped_lines = []
    for line in text.splitlines():
        if not line:
            # range(0, 0, width) is empty, so a blank line needs its own entry.
            wrapped_lines.append("")
            continue
        wrapped_lines.extend(line[i:i + width] for i in range(0, len(line), width))
    return "\n".join(wrapped_lines)

注意：下方代码请严格按照顺序执行，也可以拆分后按顺序一个一个执行
- 宽表引擎: 通过 mysql -h替换宽表实例域名 -P33060 -p替换密码 -uroot 可以链接宽表引擎, show tables;  show index from demo_chunking命令可以查看相关命令执行情况
- AI引擎: 可以通过调用list_modes 查看 模型创建的情况, 只有  status状态为 READY 才可以提供服务
- 搜索引擎: 可以通过 check_pipeline_exists 查看pipeline的创建情况

In [93]:
def main_lindorm():
    """Provision the tables, AI models, ingest pipeline and search index for the demo.

    The steps run in the order given; the connection is closed even if a step raises.
    """
    lindorm = Lindorm()
    try:
        # Wide table: create the parent table.
        lindorm.lindormRow.create_parent_table()
        # Wide table: create the child table.
        lindorm.lindormRow.create_child_table()
        # AI: create the embedding model.
        lindorm.lindormAI.create_embedding_model()
        # AI: create the reranker model.
        lindorm.lindormAI.create_reranker_model()
        # AI: list the models that currently exist.
        models = lindorm.lindormAI.list_modes()
        print("models: ", json.dumps(models, indent=4, ensure_ascii=False))
        # Search: create the pipeline so that writes to the text field are
        # embedded into the vector_field by the search engine.
        lindorm.lindormSearch.create_pipeline()
        # Wide table: create the search index for demo_chunking.
        lindorm.lindormRow.create_child_table_index()
    finally:
        lindorm.close()

main_lindorm()

Create parent table sql:  
                CREATE TABLE IF NOT EXISTS demo (
                    document_id  VARCHAR, 
                    title VARCHAR, 
                    context VARCHAR,
                    status   int, 
                    metadata JSON, 
                    PRIMARY KEY (document_id)
                )
            
Create child table sql:  
                CREATE TABLE IF NOT EXISTS demo_chunking (
                    document_id VARCHAR,
                    chunking_position INT,
                    title  VARCHAR,
                    text   VARCHAR,
                    vector_field VARCHAR,
                    metadata JSON,
                    chunking_number INT,
                    PRIMARY KEY (document_id, chunking_position)
                )
            
Model bge_m3_model exists, skip create
Model rerank_bge_v2_m3 exists, skip create
models:  [
    {
        "name": "rerank_bge_v2_m3",
        "status": "READY",
        "sql_function": "ai_infer",
      

下方为写入父表（demo表）

In [99]:
def demo_write_parent_data():
    """Load the dataset and upsert it into the parent table in batches of 100.

    Every row is written with ``status`` 0; only the first paragraph's
    context of each dataset entry is stored. The connection is closed even
    if a step raises.
    """
    lindorm = Lindorm()
    try:
        datas = DataHandler().get_all_data()
        data_count = len(datas)
        batch_size = 100
        print("total data: ", data_count, " each processing batch: ", batch_size)
        for start in tqdm(range(0, data_count, batch_size), desc="Writing Data", unit="batch"):
            batch_data = [
                {
                    "document_id": data['id'],
                    "title": data['title'],
                    "context": data['paragraphs'][0]["context"],
                    "status": 0,
                    "metadata": {"source": Config.LOAD_FILE_PATH}
                }
                for data in datas[start:start + batch_size]
            ]
            lindorm.lindormRow.write_parent(batch_data)
    finally:
        lindorm.close()
# 确保仅执行一次即可, 有写入需求请打开下方注释
#demo_write_parent_data()

total data:  2403  each processing batch:  100


Writing Data: 100%|██████████| 25/25 [03:28<00:00,  8.35s/batch]


父表中的context进行切分写入子表。需要了解切分后的格式以便于业务选择合适的切分方式
- RecursiveCharacterTextSplitter 严格按照长度切分
- ChineseTextSplitter 按长度以及中文的逗号等标点切分
切分效果可以查看下方的输出

In [100]:
# 数据切分选择
def demo_data_chunking():
    """Show one sample document and how each of the two splitters chunks it."""
    data_handler = DataHandler()
    datas = data_handler.get_all_data()
    context = datas[3]['paragraphs'][0]["context"]
    # Display the wrapped form of the original document inside the <pre> block.
    wrapped_text = wrap_text(context, 120)
    display(HTML(f"<pre>原始文档: {wrapped_text}</pre>"))

    display(HTML("<pre style='color: red;'>RecursiveCharacterTextSplitter 切分方式： </pre>"))
    for chunking in data_handler.data_character_splite(context):
        print(chunking)

    display(HTML("<pre style='color: red;'>ChineseTextSplitter 切分方式： </pre>"))
    for chunking in data_handler.data_chinese_splite(context):
        print(chunking)
demo_data_chunking()

NGC 6231是一个位于天蝎座的疏散星团，天球座标为赤经16时54分，赤纬-41度48分，视觉观测大小约45角分，亮度约2.6视星等，距地球5900光年。NGC 6231年龄约为三百二十万年，是一个非常年轻的星团，星团内的最亮星是5等的天蝎座 ζ1星。用双筒望远镜或小型望远镜就能看到个别的行星。NGC 6231在1654年被意大利天文学家乔瓦尼·巴蒂斯特·霍迪尔纳（Giovanni
Battista Hodierna）以Luminosae的名字首次纪录在星表中，但是未见记载于夏尔·梅西耶的天体列表和威廉·赫歇尔的深空天体目录。这个天体在1678年被爱德蒙·哈雷（I.7）、1745年被夏西亚科斯（Jean-Phillippe Loys de Cheseaux）（9）、1751年被尼可拉·路易·拉卡伊（II.13）分别再次独立发现。


NGC 6231是一个位于天蝎座的疏散星团，天球座标为赤经16时54分，赤纬-41度48分，视觉观测大小约45角分，亮度约2.6视星等，距地球5900光年。
NGC 6231年龄约为三百二十万年，是一个非常年轻的星团，星团内的最亮星是5等的天蝎座 ζ1星。
用双筒望远镜或小型望远镜就能看到个别的行星。
NGC 6231在1654年被意大利天文学家乔瓦尼·巴蒂斯特·霍迪尔纳（Giovanni Battista Hodierna）以Luminosae的名字首次纪录在星表中，但是未见记载于夏尔·梅西耶的天体列表和威廉·赫歇尔的深空天体目录。
这个天体在1678年被爱德蒙·哈雷（I.7）、1745年被夏西亚科斯（Jean-Phillippe Loys de Cheseaux）（9）、1751年被尼可拉·路易·拉卡伊（II.13）分别再次独立发现。


下方演示对父表中的context字段进行切分写入子表的text字段，子表中的chunking_position可以记录切分位置，chunking_number记录总的切分数
- 使用游标scan宽表的方式扫描父表（demo表）
- 需要把父表中的status由0修改为1，代表已经切分子表
- 样例使用ChineseTextSplitter切分，有需求可以替换为 RecursiveCharacterTextSplitter 切分

In [101]:
def scan_update_parent_status_write_child():
    """Chunk every unprocessed parent row into the child table.

    The parent table is read page by page with the cursor scan. For each row
    whose ``status`` is 0 the context is split with ``ChineseTextSplitter``,
    the chunks are written to the child table (text is ``"<title>: <chunk>"``),
    and only then is the parent row's status set to 1. The connection is
    closed even if a step raises.
    """
    lindorm = Lindorm()
    data_handler = DataHandler()
    cursor = None
    offset = 0
    limit = 1000
    try:
        while True:
            page = lindorm.lindormRow.scan(cursor, offset, limit)
            if page is None:
                # scan() may yield None when it failed; stop instead of
                # crashing on the unpack below.
                break
            results, cursor, offset = page
            for result in tqdm(results, desc="Writing Chunking Data", unit="batch"):
                if int(result["status"]) != 0:
                    continue
                title = result["title"]
                document_id = result["document_id"]
                chunkings = data_handler.data_chinese_splite(result["context"])
                chunking_number = len(chunkings)
                batch_data = [
                    {
                        "document_id": document_id,
                        "chunking_position": position,
                        "title": title,
                        "text": "{}: {}".format( title, chunk),
                        "metadata": result["metadata"],
                        "chunking_number": chunking_number
                    }
                    for position, chunk in enumerate(chunkings)
                ]
                if batch_data:
                    lindorm.lindormRow.write_child(batch_data)
                # Flag the parent once, after its chunks have been handed to the child table.
                lindorm.lindormRow.update_parent_status_sync_child(document_id, 1)
            if not cursor or not results:
                break
    finally:
        lindorm.close()
# 确保仅执行一次即可，该操作会写入demo_chunking子表，并同步至搜索引擎
# scan_update_parent_status_write_child()

Writing Chunking Data: 100%|██████████| 1000/1000 [14:54<00:00,  1.12batch/s]
Writing Chunking Data: 100%|██████████| 1000/1000 [14:54<00:00,  1.12batch/s]
Writing Chunking Data: 100%|██████████| 403/403 [06:00<00:00,  1.12batch/s]


下方演示：全文、向量混合检索的调用方式

In [94]:
def demo_rrf_search():
    """Run one hybrid (full-text + vector) search and display the hits as JSON.

    The connection is closed even if the search raises.
    """
    lindorm = Lindorm()
    try:
        query="国际初中科学奥林匹克主要比赛科目"
        results = lindorm.lindormSearch.rrf_search(query)
        display(JSON(results, expanded=True, root="rrf_search_result"))
    finally:
        lindorm.close()
demo_rrf_search()

<IPython.core.display.JSON object>

下方演示：先进行全文、向量混合检索，再对检索结果进行rerank的使用方式

In [109]:
def demo_rerank():
    """Hybrid-search ``2 * topk`` candidates, rerank them, and display before/after.

    The connection is closed even if a step raises.
    """
    lindorm = Lindorm()
    try:
        query="国际初中科学奥林匹克主要比赛科目"
        topk=int(Config.SEARCH_TOP_K)
        origin_result = lindorm.lindormSearch.rrf_search(query,  topk * 2)
        display(JSON(origin_result[0:topk], expanded=True, root="Before rerank result"))
        texts = [item["_source"]["text"] for item in origin_result]
        reranker_result = lindorm.lindormAI.reranker(query, texts)
        reranked_origin_result = handler_reranker(origin_result, reranker_result, topk)
        display(JSON(reranked_origin_result, expanded=True, root="After rerank result"))
    finally:
        lindorm.close()
demo_rerank()

<IPython.core.display.JSON object>

<IPython.core.display.JSON object>

下方演示：先进行全文向量混合检索，将混合检索的结果进行rerank，然后将问题以及检索结果一起prompt提交大模型

In [106]:
def demo_chat_with_child_chunking():
    """Answer a question with the LLM using reranked child-table chunks as context.

    Flow: hybrid search for ``2 * topk`` candidates, rerank, join the ``chunk``
    text of the first ``topk`` reranker items into the prompt, then stream the
    answer and finally show the prompt. The connection is closed even if a
    step raises.
    """
    lindorm = Lindorm()
    try:
        topk=int(Config.SEARCH_TOP_K)
        query="国际初中科学奥林匹克主要比赛科目"
        search_result = lindorm.lindormSearch.rrf_search(query, topk * 2)
        texts = [item["_source"]["text"] for item in search_result]
        reranker_result = lindorm.lindormAI.reranker(query, texts)
        prompt_context = "\n".join(item['chunk'] for item in reranker_result[0:topk])
        ali_qwen = AliQwen()
        prompt = ali_qwen.gen_prompt(query, prompt_context)
        for part in ali_qwen.chat_stream(prompt):
            # Each streamed part replaces the previously displayed text.
            wrapped_text = wrap_text(part, 80)
            clear_output(wait=True)
            display(HTML(f"<pre style='color: red;'>{wrapped_text}</pre>"))

        wrapped_text = wrap_text(prompt, 120)
        display(HTML(f"<pre>提示模版为:\n{wrapped_text}</pre>"))
    finally:
        lindorm.close()
    
demo_chat_with_child_chunking()

下方演示：先进行全文向量混合检索，将混合检索的结果进行rerank，然后从父表（demo表）中查询context字段，与问题一起prompt提交大模型回答

In [97]:
def demo_chat_with_parent():
    """Answer a question with the LLM using full parent-table contexts.

    Flow: hybrid search for ``2 * topk`` candidates, rerank, collect the
    distinct ``document_id`` values of the top hits in rank order, load each
    parent ``context``, then stream the answer and finally show the prompt.
    Documents whose context could not be loaded are skipped. The connection
    is closed even if a step raises.
    """
    lindorm = Lindorm()
    try:
        topk=int(Config.SEARCH_TOP_K)
        query="国际初中科学奥林匹克主要比赛科目"
        search_result = lindorm.lindormSearch.rrf_search(query, topk * 2)
        texts = [item["_source"]["text"] for item in search_result]
        reranker_result = lindorm.lindormAI.reranker(query, texts)
        reranked_origin_result = handler_reranker(search_result, reranker_result, topk)
        # OrderedDict.fromkeys de-duplicates while keeping the rank order.
        unique_document_ids = list(OrderedDict.fromkeys(
            item['_source']['document_id'] for item in reranked_origin_result))
        contexts = []
        for document_id in unique_document_ids:
            parent_context = lindorm.lindormRow.get_parent_context(pk=document_id)
            # get_parent_context yields None on a failed lookup; joining None would raise.
            if parent_context is not None:
                contexts.append(parent_context)
        prompt_context = "\n".join(contexts)
        ali_qwen = AliQwen()
        prompt = ali_qwen.gen_prompt(query, prompt_context)
        # stream
        for part in ali_qwen.chat_stream(prompt):
            wrapped_text = wrap_text(part, 80)
            clear_output(wait=True)
            display(HTML(f"<pre style='color: red;'>{wrapped_text}</pre>"))

        wrapped_text = wrap_text(prompt, 120)
        display(HTML(f"<pre>提示模版为:\n{wrapped_text}</pre>"))
    finally:
        lindorm.close()

demo_chat_with_parent()