In [1]:
!pip install -r requirements.txt


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [14]:
import json
from getpass import getpass

from elasticsearch import Elasticsearch, helpers
from sentence_transformers import SentenceTransformer

In [3]:
# Name of the index that the cells below create and write to.
INDEX_NAME = "books"

# Prompt for the deployment credentials instead of hardcoding them.
cloud_id = getpass("Elastic deployment Cloud ID: ")
cloud_api_key = getpass("Elastic deployment API Key: ")

# Elasticsearch client shared by the cells below.
es = Elasticsearch(cloud_id=cloud_id, api_key=cloud_api_key)



Elastic deployment Cloud ID:  ········
Elastic deployment API Key:  ········


ObjectApiResponse({'acknowledged': True})

### 1. Create an Ingestion Pipeline
We will create an inference ingest pipeline so that Elasticsearch generates an embedding of the `book_description` field whenever a document is indexed. This frees our own hardware from having to compute the embeddings.

Note that if there is any kind of failure, the documents will be placed in a `failed-books` index and will include helpful error messages.

In [15]:
# Inference processor: reads `book_description` and writes the model output
# to `description_embedding`, using the model referenced by `model_id`.
embed_description = {
    "inference": {
        "model_id": "sentence-transformers__msmarco-minilm-l-12-v3",
        "input_output": [
            {
                "input_field": "book_description",
                "output_field": "description_embedding",
            }
        ],
    }
}

# Failure handler 1: rewrite the target index to "failed-<original index>".
reroute_to_failed_index = {
    "set": {
        "description": "Index document to 'failed-<index>'",
        "field": "_index",
        "value": "failed-{{{_index}}}",
    }
}

# Failure handler 2: store the failure message on the document itself.
record_failure_message = {
    "set": {
        "description": "Set error message",
        "field": "ingest.failure",
        "value": "{{_ingest.on_failure_message}}",
    }
}

# Register (or overwrite) the pipeline under the id "text-embedding".
resp = es.ingest.put_pipeline(
    id="text-embedding",
    description="converts book description text to a vector",
    processors=[embed_description],
    on_failure=[reroute_to_failed_index, record_failure_message],
)

print(resp)

{'acknowledged': True}


### 2. Create an index
Now let's create an index in Elasticsearch. We do not need to map the `description_embedding` vector field ourselves, because it is added by the ingest pipeline when documents are indexed.

In [17]:
# Explicit field mappings for the book documents. `description_embedding`
# is intentionally absent: it is not mapped here.
mappings = {
    "mappings": {
        "properties": {
            "book_title": {"type": "text"},
            "author_name": {"type": "text"},
            "rating_score": {"type": "float"},
            "rating_votes": {"type": "integer"},
            "review_number": {"type": "integer"},
            "book_description": {"type": "text"},
            "genres": {"type": "keyword"},
            "year_published": {"type": "integer"},
            "url": {"type": "text"},
        }
    }
}

# Delete any previous index. `ignore_unavailable=True` turns this into a
# no-op when the index does not exist yet, so the cell also succeeds on a
# fresh deployment instead of raising a "not found" error.
es.indices.delete(index=INDEX_NAME, ignore_unavailable=True)
es.indices.create(index=INDEX_NAME, body=mappings)
print(f"Index '{INDEX_NAME}' created.")

Index 'books' created.


### 3. Bulk Indexing many documents
Now that we have created an index in Elasticsearch, we can index our local book objects. The `helpers.bulk` call below sends the documents in batches, which makes indexing much faster than calling the index API once for each individual book.


In [29]:
file_path = "../data/books.json"

# Pin the encoding: without it, open() falls back to the platform's locale
# encoding, which can fail on or corrupt non-ASCII text in the JSON file.
with open(file_path, "r", encoding="utf-8") as file:
    books = json.load(file)

# Create an array of index actions, with each element holding one document.
# A missing "id" yields `_id: None`.
actions = [
    {"_index": INDEX_NAME, "_id": book.get("id"), "_source": book}
    for book in books
]

try:
    # helpers.bulk returns (number_of_successful_actions, errors); report the
    # count the server acknowledged rather than the number of actions we built.
    # NOTE(review): documents rerouted by the pipeline's failure handlers are
    # presumably still counted as successes here - confirm against the
    # failed-* index if the numbers matter.
    indexed_count, _ = helpers.bulk(
        es, actions, pipeline="text-embedding", chunk_size=1000
    )
    print(f"Successfully added {indexed_count} books into the '{INDEX_NAME}' index.")

except helpers.BulkIndexError as e:
    print(f"Error occurred while ingesting books: {e}")
    # Per-document error details supplied by the bulk helper.
    print(e.errors)

Successfully added 10908 books into the 'books' index.


### 4. Indexing one document.
Let's also add a single book, since this is the operation you would normally use when adding new books to your vector database.

In [28]:
file_path = "../data/one_book.json"

# Pin the encoding: without it, open() falls back to the platform's locale
# encoding, which can fail on or corrupt non-ASCII text in the JSON file.
with open(file_path, "r", encoding="utf-8") as file:
    book = json.load(file)

try:
    # Index the single document through the same "text-embedding" pipeline
    # used for the bulk load.
    resp = es.index(
        index=INDEX_NAME,
        id=book.get("id"),
        body=book,
        pipeline="text-embedding",
    )

    print(f"Successfully indexed book: {book.get('book_title', None)} - Result: {resp.get('result', None)}")

except Exception as e:
    # Best-effort demo cell: report the failure instead of stopping the notebook.
    print(f"Error occurred while indexing book: {e}")

Successfully indexed book: Shattered World - Result: created
