# Late Chunking for Chinese Text

In [3]:
import os
# Point Hugging Face downloads at the hf-mirror.com mirror.
# NOTE(review): presumably this has to run before `transformers` is imported to take effect — confirm.
os.environ["HF_ENDPOINT"] = "https://hf-mirror.com"

In [2]:
from transformers import AutoModel
from transformers import AutoTokenizer
# The tokenizer and the model are loaded from the same local checkpoint directory.
# NOTE(review): trust_remote_code=True lets custom code shipped with the checkpoint run;
# only use it with a checkpoint you trust.
MODEL_DIR = '../late_chunking/jina-v2-zh'
tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR, trust_remote_code=True)
model = AutoModel.from_pretrained(MODEL_DIR, trust_remote_code=True)

In [3]:
# Sample document: four short Chinese sentences, each terminated by the full stop "。".
input_text = "战士金的新书已经出版了。他的新书名字是大模型RAG实战。这本书由机械工业出版社出版。可以在京东上购买。"

def chunk_by_sentences(input_text: str, tokenizer: callable, split_token: str = "。"):
    """Split text into sentence chunks and find each chunk's token-index span.

    Args:
        input_text: Text to split.
        tokenizer: Callable tokenizer that accepts ``return_tensors`` and
            ``return_offsets_mapping`` keyword arguments, returns a mapping with
            ``'input_ids'`` and ``'offset_mapping'`` (batch of one), and exposes
            ``convert_tokens_to_ids``.
        split_token: Token that terminates a chunk.

    Returns:
        A ``(chunks, span_annotations)`` tuple.
        ``chunks`` are consecutive substrings of ``input_text``, each ending
        right after a split token. Text that follows the last split token (or
        the whole text when no split token occurs) is kept as a final chunk
        instead of being dropped.
        ``span_annotations`` holds one ``(start, end)`` pair of token indices
        per chunk. The first span starts at index 1 (index 0 is assumed to be
        a leading special token such as [CLS]); every span ends at the index of
        the chunk's split token, which is also where the next span starts.
    """
    print("input_text:", input_text)
    inputs = tokenizer(input_text, return_tensors='pt', return_offsets_mapping=True)
    punctuation_mark_id = tokenizer.convert_tokens_to_ids(split_token)
    token_offsets = inputs['offset_mapping'][0]
    token_ids = inputs['input_ids'][0]
    num_tokens = len(token_ids)
    if num_tokens == 0:
        return [], []

    # Character start offset of the final token. The boundary test below compares
    # the following token's start offset against this value so that a split token
    # sitting directly before the final token still closes a chunk.
    last_token_start = int(token_offsets[-1][0])

    # Each entry is (token index of the boundary, character offset where the chunk ends).
    chunk_positions = []
    for i, (token_id, (_start, end)) in enumerate(zip(token_ids, token_offsets)):
        if token_id != punctuation_mark_id:
            continue
        if i + 1 < num_tokens:
            next_start = int(token_offsets[i + 1][0])
            is_boundary = next_start - int(end) >= 0 or next_start == last_token_start
        else:
            # The split token is the very last token; there is no successor to inspect.
            is_boundary = True
        if is_boundary:
            # Use the token's own end offset so multi-character split tokens work too.
            chunk_positions.append((i, int(end)))

    # Keep the remainder after the last split token as a final chunk.
    # Trailing zero-width tokens (e.g. a closing special token) are skipped first.
    content_end = num_tokens
    while content_end > 0 and int(token_offsets[content_end - 1][1]) <= int(token_offsets[content_end - 1][0]):
        content_end -= 1
    last_boundary = chunk_positions[-1][0] if chunk_positions else 0
    if content_end - 1 > last_boundary:
        chunk_positions.append((content_end, int(token_offsets[content_end - 1][1])))

    # (1, 0): the first chunk starts at token index 1 and character offset 0.
    starts = [(1, 0)] + chunk_positions[:-1]
    chunks = [input_text[x[1]:y[1]] for x, y in zip(starts, chunk_positions)]
    span_annotations = [(x[0], y[0]) for x, y in zip(starts, chunk_positions)]
    return chunks, span_annotations


# Chunk the sample text on the Chinese full stop, then show the chunks and their token spans.
split_token = "。"
chunks, span_annotations = chunk_by_sentences(
    input_text,
    tokenizer,
    split_token=split_token,
)
joined_chunks = '"\n- "'.join(chunks)
print('Chunks:\n- "' + joined_chunks + '"')
print(span_annotations)

input_text: 战士金的新书已经出版了。他的新书名字是大模型RAG实战。这本书由机械工业出版社出版。可以在京东上购买。
Chunks:
- "战士金的新书已经出版了。"
- "他的新书名字是大模型RAG实战。"
- "这本书由机械工业出版社出版。"
- "可以在京东上购买。"
[(1, 7), (7, 16), (16, 22), (22, 27)]


In [4]:
# Traditional chunking baseline: embed the chunk texts directly with model.encode.
embeddings_traditional_chunking = model.encode(chunks)


def chunked_pooling(
    model_output, span_annotation: list, max_length=None
):
    """Mean-pool token embeddings over each chunk's token span.

    Args:
        model_output: Model output whose first element holds the token
            embeddings, one sequence per batch item.
        span_annotation: One list of ``(start, end)`` token spans per batch item.
        max_length: Optional cap; when given, spans starting at or beyond
            ``max_length - 1`` are dropped and span ends are clipped to
            ``max_length - 1``.

    Returns:
        A list with one entry per batch item; each entry is a list of NumPy
        vectors, one per span that covers at least one token.
    """
    batch_token_embeddings = model_output[0]
    pooled_batches = []
    for sequence_embeddings, spans in zip(batch_token_embeddings, span_annotation):
        if max_length is not None:
            # Discard or clip spans that reach past the model's maximum length.
            limit = max_length - 1
            spans = [(lo, min(hi, limit)) for (lo, hi) in spans if lo < limit]

        pooled_vectors = []
        for lo, hi in spans:
            width = hi - lo
            if width < 1:
                # Empty span: nothing to average.
                continue
            pooled_vectors.append(sequence_embeddings[lo:hi].sum(dim=0) / width)

        pooled_batches.append(
            [vector.detach().cpu().numpy() for vector in pooled_vectors]
        )

    return pooled_batches


# Late chunking: run the model once over the whole text, then pool token embeddings per chunk span.
inputs = tokenizer(input_text, return_tensors='pt')
model_output = model(**inputs)
# The single document is passed as a batch of one; [0] takes that document's chunk embeddings.
embeddings = chunked_pooling(model_output, [span_annotations])[0]

In [6]:
import numpy as np

def cos_sim(x, y):
    """Return the cosine similarity of two vectors."""
    return np.dot(x, y) / (np.linalg.norm(x) * np.linalg.norm(y))


query = '战士金的新书叫什么'
# Embedding of the query text.
berlin_embedding = model.encode(query)

# Compare the query against each chunk under both embedding strategies.
for chunk, new_embedding, trad_embeddings in zip(chunks, embeddings, embeddings_traditional_chunking):
    late_score = cos_sim(berlin_embedding, new_embedding)
    native_score = cos_sim(berlin_embedding, trad_embeddings)
    print(f'late chunk score("{query}", "{chunk}"):', late_score)
    print(f'native chunking score("{query}", "{chunk}"):', native_score)
    print("===")

native chunk score("战士金的新书叫什么", "战士金的新书已经出版了。"): 0.838993
late chunking score("战士金的新书叫什么", "战士金的新书已经出版了。"): 0.9393679
===
native chunk score("战士金的新书叫什么", "他的新书名字是大模型RAG实战。"): 0.7289395
late chunking score("战士金的新书叫什么", "他的新书名字是大模型RAG实战。"): 0.52649915
===
native chunk score("战士金的新书叫什么", "这本书由机械工业出版社出版。"): 0.71318245
late chunking score("战士金的新书叫什么", "这本书由机械工业出版社出版。"): 0.3055911
===
native chunk score("战士金的新书叫什么", "可以在京东上购买。"): 0.6868561
late chunking score("战士金的新书叫什么", "可以在京东上购买。"): 0.12445604
===
