# 使用 Elasticsearch 构建 RAG 系统

## 1. 依赖关系和环境

In [1]:
! pip install elasticsearch sentence_transformers transformers eland==8.12.1
! pip install datasets==2.19.2
! pip install ipywidgets



## 2. 数据

In [2]:
# Download the movie dataset from the Hugging Face Hub
# (https://huggingface.co/datasets/MongoDB/embedded_movies).
from datasets import load_dataset

dataset = load_dataset(path="MongoDB/embedded_movies")

dataset

DatasetDict({
    train: Dataset({
        features: ['plot', 'genres', 'runtime', 'cast', 'num_mflix_comments', 'poster', 'title', 'fullplot', 'languages', 'directors', 'writers', 'awards', 'imdb', 'countries', 'type', 'plot_embedding', 'rated', 'metacritic'],
        num_rows: 1500
    })
})

In [3]:
# Data preprocessing

# Drop data points whose `fullplot` is missing or blank. `fullplot` is the text that
# gets embedded below; a blank string would get an empty embedding, which cannot be
# stored in the fixed-size `dense_vector` field of the index.
dataset = dataset.filter(
    lambda x: x["fullplot"] is not None and x["fullplot"].strip() != ""
)

# `DatasetDict.column_names` maps each split to its column list, so check every split.
if any("plot_embedding" in columns for columns in dataset.column_names.values()):
    # Remove the plot_embedding from each data point in the dataset as we are going to
    # create new embeddings with an open source embedding model from Hugging Face
    dataset = dataset.remove_columns("plot_embedding")

dataset["train"]

Dataset({
    features: ['plot', 'genres', 'runtime', 'cast', 'num_mflix_comments', 'poster', 'title', 'fullplot', 'languages', 'directors', 'writers', 'awards', 'imdb', 'countries', 'type', 'rated', 'metacritic'],
    num_rows: 1452
})

## 3. embedding 模型

In [4]:
# Embedding model configuration
EMBEDDING_MODEL_ID, EMBEDDING_DIMENSIONS = (
    "thenlper/gte-small",  # sentence-embedding model on the Hugging Face Hub
    384,                   # its output vector length; reused as the dense_vector `dims`
)

In [5]:
# Load the sentence-embedding model named by EMBEDDING_MODEL_ID.
from sentence_transformers import SentenceTransformer

embedding_model = SentenceTransformer(model_name_or_path=EMBEDDING_MODEL_ID)

def get_embedding(text: str) -> list[float]:
    """Embed ``text`` with the module-level ``embedding_model``.

    Args:
        text: Text to embed. ``None`` and blank / whitespace-only strings are
            treated as empty.

    Returns:
        The embedding as a plain Python list of floats (so it can be sent to
        Elasticsearch as JSON), or ``[]`` when ``text`` is empty.
    """
    # Guard against None as well as blank strings; `None.strip()` would raise.
    if text is None or not text.strip():
        print("Attempted to get embedding for empty text.")
        return []

    # encode() returns an array; convert it to a list for JSON serialization.
    embedding = embedding_model.encode(text)
    return embedding.tolist()

def add_fullplot_embedding(x):
    """Batched ``Dataset.map`` callback that embeds every ``fullplot`` in a batch.

    Args:
        x: A batch mapping whose "fullplot" entry is a list of texts
           (the map call below passes ``batched=True``).

    Returns:
        ``{"embedding": [...]}`` with one vector per row, in input order.
    """
    vectors = []
    for plot_text in x["fullplot"]:
        vectors.append(get_embedding(plot_text))
    return {"embedding": vectors}

# Add an "embedding" column to every split; batched=True hands the callback many rows at once.
dataset = dataset.map(add_fullplot_embedding, batched=True)
dataset["train"]

  return torch._C._cuda_getDeviceCount() > 0


Dataset({
    features: ['plot', 'genres', 'runtime', 'cast', 'num_mflix_comments', 'poster', 'title', 'fullplot', 'languages', 'directors', 'writers', 'awards', 'imdb', 'countries', 'type', 'rated', 'metacritic', 'embedding'],
    num_rows: 1452
})

## 4. 创建索引

In [None]:
import os

# Connection settings are read from the environment so secrets never need to be
# typed into (and saved with) the notebook. The placeholders are only fallbacks.
ELASTICSEARCH_HOST = os.environ.get("ELASTICSEARCH_HOST", "Your Elasticsearch Host")
ELASTICSEARCH_API_KEY = os.environ.get("ELASTICSEARCH_API_KEY", "Your API Key")

In [7]:
# Make sure your Elasticsearch deployment is reachable; you can also create one on Elastic Cloud.
# Model id with "/" replaced by "__"; not used in this cell (presumably kept for eland) — TODO confirm.
model_id = EMBEDDING_MODEL_ID.replace("/", "__")
from elasticsearch import Elasticsearch, helpers

client = Elasticsearch(ELASTICSEARCH_HOST, api_key=ELASTICSEARCH_API_KEY)

index_name = "movies"
index_mapping = {
    "properties": {
        "fullplot": {"type": "text"},
        "plot": {"type": "text"},
        "title": {"type": "text"},
        # Vector field for kNN search; `dims` must equal the embedding model's output size.
        "embedding": {
            "type": "dense_vector",
            "dims": EMBEDDING_DIMENSIONS,
            "index": True,
            "similarity": "cosine",
        },
    }
}

# Recreate the index from scratch: documents are indexed without explicit ids,
# so re-running the notebook against an existing index would duplicate them.
if client.indices.exists(index=index_name):
    print("Deleting existing %s" % index_name)
    # The per-request `ignore=` argument is deprecated in elasticsearch-py 8;
    # `.options(ignore_status=...)` is the supported way to tolerate these statuses.
    client.options(ignore_status=[400, 404]).indices.delete(index=index_name)

print("Creating index %s" % index_name)

client.options(ignore_status=[400, 404]).indices.create(
    index=index_name, mappings=index_mapping
)

Deleting existing movies


  client.indices.delete(index=index_name, ignore=[400, 404])


Creating index movies


ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'movies'})

## 5. 将数据上传到 Elasticsearch

In [8]:
from elasticsearch.helpers import BulkIndexError

def batch_to_bulk_actions(batch, index="movies"):
    """Yield one ``helpers.bulk`` index action per record in ``batch``.

    Args:
        batch: Iterable of record mappings with "title", "fullplot", "plot"
            and "embedding" keys (e.g. a slice of the Hugging Face dataset).
        index: Target index name. Defaults to "movies", the index this
            notebook creates.

    Yields:
        ``{"_index": index, "_source": {...}}`` dicts. Only the four listed
        fields are copied; other columns are not indexed. No ``_id`` is set,
        so Elasticsearch assigns one.
    """
    for record in batch:
        yield {
            "_index": index,
            "_source": {
                "title": record["title"],
                "fullplot": record["fullplot"],
                "plot": record["plot"],
                "embedding": record["embedding"],
            },
        }


def bulk_index(ds):
    """Index ``ds`` into Elasticsearch in slices of 100 rows, printing progress.

    Each slice is turned into bulk actions by ``batch_to_bulk_actions`` and sent
    with ``helpers.bulk``; an error raised for one slice propagates to the
    caller and the remaining slices are not sent.
    """
    batch_size = 100
    total = len(ds)
    batch_start = 0
    while batch_start < total:
        batch_end = min(batch_start + batch_size, total)
        print(f"batch: start [{batch_start}], end [{batch_end}]")
        rows = ds.select(range(batch_start, batch_end))
        helpers.bulk(client, batch_to_bulk_actions(rows))
        batch_start += batch_size

# Index the whole train split. On failure, print the per-document errors that
# Elasticsearch reported, and do not print a misleading success message.
try:
    bulk_index(dataset["train"])
except BulkIndexError as e:
    print(f"{e.errors}")
else:
    # Make the new documents searchable right away, so the query cells below
    # see the full index even when the notebook is run top to bottom.
    client.indices.refresh(index="movies")
    print("Data ingestion into Elasticsearch complete!")

batch: start [0], end [100]
batch: start [100], end [200]
batch: start [200], end [300]
batch: start [300], end [400]
batch: start [400], end [500]
batch: start [500], end [600]
batch: start [600], end [700]
batch: start [700], end [800]
batch: start [800], end [900]
batch: start [900], end [1000]
batch: start [1000], end [1100]
batch: start [1100], end [1200]
batch: start [1200], end [1300]
batch: start [1300], end [1400]
batch: start [1400], end [1452]
Data ingestion into Elasticsearch complete!


## 6. 对用户查询执行向量搜索

In [9]:
def vector_search(plot_query, k=10, num_candidates=150, size=5, index="movies"):
    """Run an approximate kNN search over the ``embedding`` field.

    Args:
        plot_query: Free-text query; embedded with ``get_embedding``.
        k: Number of nearest neighbours the kNN clause retrieves.
        num_candidates: Nearest-neighbour candidates considered per shard.
        size: Number of hits returned in the response.
        index: Index to search.

    Returns:
        A list of dicts with keys ``id``, ``_score``, ``title``, ``plot`` and
        ``fullplot``, in the order Elasticsearch ranked the hits. Empty when
        the query is blank, since no vector can be computed for it.
    """
    query_vector = get_embedding(plot_query)
    # A blank query yields an empty vector, which does not match the field's
    # `dims` and would be rejected by Elasticsearch.
    if not query_vector:
        return []

    knn = {
        "field": "embedding",
        "query_vector": query_vector,
        "k": k,
        "num_candidates": num_candidates,
    }
    response = client.search(index=index, knn=knn, size=size)

    return [
        {
            "id": hit["_id"],
            "_score": hit["_score"],
            "title": hit["_source"]["title"],
            "plot": hit["_source"]["plot"],
            "fullplot": hit["_source"]["fullplot"],
        }
        for hit in response["hits"]["hits"]
    ]

def pretty_search(query):
    """Format the hits of ``vector_search(query)`` as prompt context.

    Returns one ``Title: ..., Plot: ...`` line per hit (using the full plot),
    each ending in a newline; ``"N/A"`` stands in for a missing key.
    """
    lines = [
        f"Title: {hit.get('title', 'N/A')}, Plot: {hit.get('fullplot', 'N/A')}\n"
        for hit in vector_search(query)
    ]
    return "".join(lines)

In [10]:
# Combine the search results with the question
def combined_query(query):
    """Build an LLM prompt: the user question followed by the retrieved movie context."""
    context = pretty_search(query)
    return (
        f"Query: {query}\n"
        "Continue to answer the query by using these Search Results:\n"
        f"{context}."
    )


query = "What is the best romantic movie to watch and why?"
combined_results = combined_query(query)

# print() rather than a bare expression so the embedded newlines render.
print(combined_results)

Query: What is the best romantic movie to watch and why?
Continue to answer the query by using these Search Results:
Title: Shut Up and Kiss Me!, Plot: Ryan and Pete are 27-year old best friends in Miami, born on the same day and each searching for the perfect woman. Ryan is a rookie stockbroker living with his psychic Mom. Pete is a slick surfer dude yet to find commitment. Each meets the women of their dreams on the same day. Ryan knocks heads in an elevator with the gorgeous Jessica, passing out before getting her number. Pete falls for the insatiable Tiara, but Tiara's uncle is mob boss Vincent Bublione, charged with her protection. This high-energy romantic comedy asks to what extent will you go for true love?
Title: Titanic, Plot: The plot focuses on the romances of two couples upon the doomed ship's maiden voyage. Isabella Paradine (Catherine Zeta-Jones) is a wealthy woman mourning the loss of her aunt, who reignites a romance with former flame Wynn Park (Peter Gallagher). Meanw