## セットアップ

In [None]:
!pip install -q transformers sentencepiece sentence-transformers \
    torch faiss-cpu datasets scipy scikit-learn numpy

In [None]:
# VRAMの環境変数設定
import os
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

## モデルの準備

In [None]:
import torch
from transformers import AutoModel, AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("pfnet/plamo-embedding-1b", trust_remote_code=True)
model = AutoModel.from_pretrained("pfnet/plamo-embedding-1b", trust_remote_code=True)

device = "cuda" if torch.cuda.is_available() else "cpu"
model = model.to(device)
model.eval()


## 埋め込み計算モジュールの定義

In [None]:
import numpy as np

def embed_texts(
    texts: list[str],
    batch_size: int = 32,
    is_query: bool = False,
) -> np.ndarray:
    """
    texts           : 埋め込み対象の文字列リスト
    batch_size      : 一度に投入する文の数（デフォルト：32）
    is_query        : True の場合は model.encode_query を使用
                                False の場合は model.encode_document を使用
    """
    all_embs = []

    with torch.inference_mode():
      # 自動混合精度演算
      with torch.amp.autocast("cuda"):
            for i in range(0, len(texts), batch_size):
                batch = texts[i : i + batch_size]
                if is_query:
                    embs = model.encode_query(batch, tokenizer)
                else:
                    embs = model.encode_document(batch, tokenizer)

                # embs は torch.Tensor
                embs_np = embs.to("cpu").numpy()  # NumPy に変換
                # VRAM を開放する
                del embs
                torch.cuda.empty_cache()
                embs_np = np.nan_to_num(embs_np)
                all_embs.append(embs_np)

    return np.vstack(all_embs)


## JSTSによるモデルの評価

In [None]:
from datasets import load_dataset

ds = load_dataset("sbintuitions/JMTEB", "jsts")
jsts = ds["test"]
print(jsts.column_names)  # ['sentence_pair_id', 'yjcaptions_id', 'sentence1', 'sentence2', 'label']

In [None]:
from sklearn.metrics.pairwise import cosine_similarity

# JSTS の全ペアを「文1／文2」と「スコア」に分解
sent1 = jsts["sentence1"]
sent2 = jsts["sentence2"]
gold_score = np.array(jsts["label"], dtype=float)  # 0〜5 の連続値

# 埋め込み
emb1 = embed_texts(sent1)
emb2 = embed_texts(sent2)

# コサイン類似度
# N×2048 の行列同士のコサイン類似度を一度に出す
sim_matrix = cosine_similarity(emb1, emb2)
# 対角成分だけを取り出せばペアごとの類似度が得られる
cos_sim = np.diag(sim_matrix)  # shape=(N,)

In [None]:
# スピアマンとピアソンの相関係数を計算
from scipy.stats import spearmanr, pearsonr

spearman_corr, _ = spearmanr(cos_sim, gold_score)
pearson_corr, _  = pearsonr(cos_sim, gold_score)

print(f"Spearman: {spearman_corr:.4f}")
print(f"Pearson : {pearson_corr:.4f}")

## インデックス用データ読み込み・前処理

In [None]:
from google.colab import drive
drive.mount('/content/drive')
# drive.mount("/content/drive", force_remount=True)

In [None]:
from datasets import load_dataset
# Livedoor ニュースコーパス (約7,300記事) を train/val/test に分割読み込み
ds = load_dataset(
    "shunk031/livedoor-news-corpus",
    train_ratio=0.8,
    val_ratio=0.1,
    test_ratio=0.1,
    shuffle=False,
)
train = ds["train"]
print(train)

In [None]:
# 検索対象：記事のタイトル＋本文
corpus_texts = [
    f"{row['title']}。{row['content']}"
    for row in train
]

# （例）最初の5件をクエリとして流用
query_texts = corpus_texts[:5]
# 正解ID (今回は自身の記事をトップ1に返す recall@1)
true_ids = list(range(len(query_texts))) # true_ids = [0, 1, 2, ..., 5]

In [None]:
# 記事のタイトルと本文(corpus_texts)をjsonl形式で保存

import json

with open('/content/drive/MyDrive/Colab Notebooks/data/text_embedding_search/corpus.jsonl', 'w', encoding='utf-8') as f:
    for text in corpus_texts:
        json.dump({'text': text}, f, ensure_ascii=False)
        f.write('\n')


In [None]:
# 文を埋め込み化する（時間のかかる処理）
xb = embed_texts(corpus_texts, batch_size=8, is_query=False)  # index 用
xq = embed_texts(query_texts,  batch_size=8, is_query=True)   # query 用

In [None]:
import faiss

d = xb.shape[1]  # 埋め込み次元
M = 32           # 各点の近傍リンク数
efC = 200        # 構築時の探索深さ

index = faiss.IndexHNSWFlat(d, M)
index.hnsw.efConstruction = efC

# 埋め込みベクトルを追加
index.add(xb)

In [None]:
index.hnsw.efSearch = 50  # 検索時の探索深さ
k = 5                    # top-k 件取得
D, I = index.search(xq, k) # 検索を実行

In [None]:
recall1 = np.mean([1 if true_ids[i] in I[i,:1] else 0 for i in range(len(true_ids))])
print(f"Recall@1: {recall1:.3f}")

In [None]:
# 埋め込みと Faiss インデックスを保存
np.save('/content/drive/MyDrive/Colab Notebooks/data/text_embedding_search/embeddings_xb.npy', xb)
faiss.write_index(index, '/content/drive/MyDrive/Colab Notebooks/data/text_embedding_search/hnsw_index.faiss')

In [None]:
for i in range(len(query_texts)):
    print(f"Query: {query_texts[i]}")
    for rank, idx in enumerate(I[i]):
        print(f"Top {rank+1}: {corpus_texts[idx]}")
    print("========")

## 保存したインデックスを読み込んで使用

In [None]:
!pip install faiss-cpu
import faiss
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# corpus_textsの読み込み
import json

corpus_texts = []
with open('/content/drive/MyDrive/Colab Notebooks/data/text_embedding_search/corpus.jsonl', 'r', encoding='utf-8') as f:
    for line in f:
        corpus_texts.append(json.loads(line)['text'])

In [None]:
xb = np.load('/content/drive/MyDrive/Colab Notebooks/data/text_embedding_search/embeddings_xb.npy')
index = faiss.read_index('/content/drive/MyDrive/Colab Notebooks/data/text_embedding_search/hnsw_index.faiss')

In [None]:
query_texts = ["渋谷の事件についてのニュース", "プロ野球の試合結果", "新作映画のレビュー"]
xq = embed_texts(query_texts, is_query=True)

In [None]:
index.hnsw.efSearch = 50  # 検索時の探索深さ
k = 5  # 上位k件を取得
D, I = index.search(xq, k)  # Dは距離、Iはインデックス

In [None]:
for i, q in enumerate(query_texts):
  print(f"\n[Query] {q}")
  for j in range(k):
    doc_id = I[i, j] # i番目のクエリにつき5つのID
    print(f" RANK {j+1}: {corpus_texts[doc_id][:50]}...")

## gradioで検索機能を実装

In [None]:
!pip install -q gradio

In [None]:
import pandas as pd

# 検索用関数の定義
def search(query: str, k: int = 5, length: int = 30):
    # 1) クエリ埋め込み
    q_emb = embed_texts([query], is_query=True)
    # 2) Faiss 検索
    D, I = index.search(q_emb, k)
    # 3) 結果整形
    results = []
    for score, idx in zip(D[0], I[0]):
        title, snippet = corpus_texts[idx].split("。", 1)  # タイトル・本文を分割
        results.append({"title": title, "snippet": snippet[:length]+"…", "score": float(score)})

    df = pd.DataFrame(results) # gradio用にDFに変換

    return df


In [None]:
search("パソコン", k=5)

In [None]:
import gradio as gr

iface = gr.Interface(
    fn=search,
    inputs=[
        gr.Textbox(lines=2, placeholder="検索クエリを入力"),
        gr.Slider(minimum=1, maximum=10, step=1, label="Top k 件数")
    ],
    outputs=gr.Dataframe(
        headers=["title", "snippet", "score"],
        row_count=5
    ),
    title="Livedoor ニュース検索デモ",
    description="pfnet/plamo-embedding-1b + Faiss HNSW による類似ニュース検索"
)

iface.launch(share=True)