In [1]:
from elasticsearch import Elasticsearch
from transformers import BertJapaneseTokenizer, BertModel
import torch 

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import transformers
print(torch.__version__)
print(transformers.__version__)

2.1.0+cpu
4.35.1


In [3]:
model_name = "tohoku-nlp/bert-base-japanese-v3"
tokenizer = BertJapaneseTokenizer.from_pretrained(model_name)
model = BertModel.from_pretrained(model_name)

In [4]:
es = Elasticsearch(
    "https://localhost:9200",
    ca_certs="./backend/api/ca.crt",
    basic_auth=("elastic", "elastic"),
)

In [5]:
index_name = "vector-test-index-01"

In [6]:
import requests
import json

In [7]:
# パイプラインの設定データ
# data = {
#     "description": "Text embedding pipeline",
#     "processors": [
#         {
#             "inference": {
#                 "model_id": "cl-tohoku__bert-base-japanese-v2",
#                 "target_field": "text_embedding",
#                 "field_map": {
#                     "title": "text_field"
#                 }
#             }
#         }
#     ]
# }

data = {
    "description": "Text embedding pipeline",
    "processors": [
        {
            "inference": {
                "model_id": "cl-tohoku__bert-base-japanese-v2",
                "input_output": [
                    {
                        "input_field": "title",
                        "output_field": "title_embedding"
                    },
                    {
                        "input_field": "text",
                        "output_field": "text_embedding"
                    }
                ],
            }
        }
    ]
}

# https://elasticsearch-py.readthedocs.io/en/v8.14.0/api/ingest-pipelines.html
from elasticsearch.client import IngestClient
IngestClient(es).put_pipeline(id="japanese-text-embeddings", body=data)

  from elasticsearch.client import IngestClient


ObjectApiResponse({'acknowledged': True})

In [8]:
# indexの作成
# body = {
#   "mappings": {
#     "properties": {
#       "text_embedding.predicted_value": {
#         "type": "dense_vector",
#         "dims": 768,
#         "index": True,
#         "similarity": "cosine"
#       }
#     }
#   }
# }

body = {
    "mappings": {
        "properties": {
            "title_embedding": {
                "type": "dense_vector",
                "dims": 768,
                "index": True,
                "similarity": "cosine"
            },
            "text_embedding": {
                "type": "dense_vector",
                "dims": 768,
                "index": True,
                "similarity": "cosine"
            },
            "title": {
                "type": "text"
            },
            "text": {
                "type": "text"
            }
        }
    }
}

# すでにindexが存在する場合は削除
if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)
es.indices.create(index=index_name, body=body)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'vector-test-index-01'})

In [9]:
# es_a.close()

In [10]:
# # サンプルドキュメントの定義
# sample_doc = {
#     "title": "日本経済の現状",
#     "text": "日本経済は現在、回復基調にありますが、依然として世界的な不確実性に直面しています。"
# }
# # ドキュメントのインデックス登録
# response = es.index(
#     index=index_name,  # 事前に作成したインデックス
#     body=sample_doc,
#     pipeline="japanese-text-embeddings"  # 事前に設定したパイプライン
# )

# # 結果の確認
# print(response)

In [19]:
es.close()

In [20]:
import gzip
from elasticsearch.helpers import bulk

doc_name = "jawikinews-20240805-cirrussearch-content.json.gz"

def bulk_insert(docs):
    for doc in docs:
        yield {
            "_op_type": "index",
            "_index": index_name,
            "_source": {
                "title": doc["title"],
                "text": doc["text"]
            },
            "pipeline": "japanese-text-embeddings"
        }
        
docs = []
with gzip.open(f"./backend/api/data/{doc_name}") as f:
    for line in f:
        json_line = json.loads(line)
        if "index" not in json_line:
            doc = json_line
            docs.append(doc)


# バルクインサートの処理
def execute_bulk_insert(docs):
    with Elasticsearch(
        "https://localhost:9200",
        ca_certs="./backend/api/ca.crt",
        basic_auth=("elastic", "elastic"),
    ) as es:
        bulk(es, bulk_insert(docs), chunk_size=1000, request_timeout=180)
        print("Bulk insert completed.")

# print("bulk insert start.")
# bulk(es, bulk_insert(docs), request_timeout=180)
# print("bulk insert end.")

execute_bulk_insert(docs)

index_count = es.count(index=index_name)
print(f"Indexed {index_count['count']} documents.")

  bulk(es, bulk_insert(docs), chunk_size=1000, request_timeout=180)


BulkIndexError: 571 document(s) failed to index.

In [21]:
len(docs)

3940

In [23]:
# import gzip
# from elasticsearch.helpers import bulk

# import asyncio
# from elasticsearch import AsyncElasticsearch
# from elasticsearch.helpers import async_streaming_bulk, BulkIndexError
# import nest_asyncio

# # nest_asyncioを適用
# nest_asyncio.apply()

# doc_name = "jawikinews-20240805-cirrussearch-content.json.gz"

# es_a = AsyncElasticsearch(
#     "https://localhost:9200",
#     ca_certs="./backend/api/ca.crt",
#     basic_auth=("elastic", "elastic"),
# )

# async def gendata(docs):
#     for doc in docs:
#         yield {
#             "_op_type": "index",
#             "_index": index_name,
#             "_source": {
#                 "title": doc["title"],
#                 "text": doc["text"]
#             },
#             "pipeline": "japanese-text-embeddings"
#         }
        
# docs = []
# with gzip.open(f"./backend/api/data/{doc_name}") as f:
#     for line in f:
#         json_line = json.loads(line)
#         if "index" not in json_line:
#             doc = json_line
#             docs.append(doc)

# async def main():
#     try:
#         async for ok, result in async_streaming_bulk(client=es_a,
#                                                     actions=gendata(docs),
#                                                     chunk_size=50,  # 一度に扱うドキュメント数
#                                                     max_chunk_bytes=5428800  # 一度に扱うバイト数
#                                                     ):
#             # 各チャンクごとの実行結果を取得
#             action, result = result.popitem()
#             # バルクインサートに失敗した場合
#             if not ok:
#                 print(f"failed to {result} document {action}")


#     # 例外処理
#     except BulkIndexError as bulk_error:
#         # エラーはリスト形式
#         print(bulk_error.errors)

#     # セッションのクローズ
#     await es_a.close()

# # イベントループを取得
# loop = asyncio.get_event_loop()
# # 並列に実行して終るまで待つ
# loop.run_until_complete(main())

# index_count = es.count(index=index_name)
# print(f"Indexed {index_count['count']} documents.")

In [61]:
WORD = "野球"

# knn_query = {
#   "query": {
#     "match": {
#       "title": {
#         "query": WORD,
#         "boost": 0.9
#       }
#     }
#   },
#   "knn": [ {
#     "field": "title_embedding",
#     "query_vector_builder": {
#         "text_embedding": {
#             "model_id": "cl-tohoku__bert-base-japanese-v2",
#             "model_text": WORD,
#         }
#     },
#     "k": 5,
#     "num_candidates": 50,
#     "boost": 0.1
#   },
#   {
#     "field": "text_embedding",
#     "query_vector_builder": {
#         "text_embedding": {
#             "model_id": "cl-tohoku__bert-base-japanese-v2",
#             "model_text": WORD,
#         }
#     },
#     "k": 10,
#     "num_candidates": 10,
#     "boost": 0.5
#   }],
#   "size": 10
# }

with open("./backend/api/query/bert_vector_search_01.json", "r") as f:
  query_data = f.read()

replace_dict = {
    "{WORD}": WORD
}
for key, value in replace_dict.items():
    query_data = query_data.replace(key, value)

query_data = json.loads(query_data)

# KNN検索の実行
response = es.search(
    index=index_name,
    body=query_data
)

# 結果の表示
for hit in response['hits']['hits']:
    print(f"Score: {hit['_score']}, Document: {hit['_source']['title'], hit['_source']['text']}")

Score: 6.59474, Document: ('野球で延長タイブレーク制導入へ 国際野球連盟', '【2008年7月27日】 毎日新聞によるとIBAF国際野球連盟のハービー・シラー会長は、野球の延長戦の試合時間短縮を目的としてタイブレーク制度を、8月に予定されている北京五輪から採用することを発表した。 IBAFによると、タイブレーク制度はソフトボールで行っている方式と同じもので、延長戦10回は通常通りのルールで試合をするが、11回の攻撃は任意の打順からノーアウト1・2塁のランナーの段階で試合を行い、12回以後は前のイニングスの続きの打順から同じくノーアウト1・2塁の段階で試合をするというものであるが、このルール変更は日本野球連盟への事前連絡がなく、突然7月26日（日本時間\u3000UTC+9）になってメールで報告が行われた。 しかし、読売新聞によると日本代表・星野仙一監督らはこれに反対する意見を出し、日本のプロ・アマの野球界全体で抗議する事を決めた。星野氏は「五輪の2週間前になってルールを変えるだなんておかしいにも程がある。我々は親善試合ではなく世界一を決める真剣勝負をするんだ。どこにも相談無しにIBAFが決めたのも納得できない」として抗議する姿勢を強く打ち出した。また、全日本アマチュア野球連盟もこの決定を確認した上で抗議を行う方向であるという。 また毎日新聞によると、シラー氏は7月30日に来日し、日本野球連盟・松田昌士会長やプロ野球・加藤良三コミッショナーと会談。その際に日本側から今回の問題についての説明を求めるものと見られている。 毎日jp 『国際野球連盟：タイブレーク採用\u3000日本関係者から困惑の声』 —\xa0毎日新聞, 2008年7月27日 YOMIURI ONLINE（北京五輪） 『五輪野球のタイブレーク制導入、日本はプロ・アマ挙げて抗議』 —\xa0読売新聞, 2008年7月26日')
Score: 5.6271515, Document: ('プロ野球・パ・リーグ 参加球団のウェブサイトを一括管理へ', '【2008年1月25日】 日本経済新聞と時事通信によると、プロ野球パ・リーグの事業会社である「パシフィック・リーグ・マーケティング（PLM社）」は1月24日、参加6チームがこれまで個別に運営していたパソコンと携帯電話用の公式ウェブサ

In [53]:
json.loads(knn_query)

TypeError: the JSON object must be str, bytes or bytearray, not dict