# Ingest Files From Cloud Object Storage to Databases for Elasticsearch

This notebook ingests files from a bucket in a Cloud Object Storage instance into a Databases for Elasticsearch Platinum Plan instance using code. Supported file types in the bucket include `pdf`, `docx`, `pptx`, `html`, and `txt`.

**Prerequisites:**
1. Successfully set up a connection to your Cloud Object Storage instance in the project
2. Successfully set up a connection to your Databases for Elasticsearch instance in the project
3. Created an IBM Cloud API key at https://cloud.ibm.com/iam/apikeys and added it to your `Notebook_and_Deployment_Parameters` parameter set in the `ibm_cloud_apikey` field

**Before running any cells, connect the notebook to the rest of the project by inserting a project token.**

You can do this by clicking the three vertical dots next to the `Code Snippet` icon on the top right of the notebook UI and then clicking `Insert project token` to insert the token at the beginning of your notebook. Make sure to execute the new cell at the top of the notebook before running any other notebook cells.

## Configure Notebook

The cells below configure the notebook by importing the necessary packages and adding the values configured in your project's Parameter Sets to the namespace. The following values in the `Notebook_and_Deployment_Parameters` parameter set affect the output of this notebook:

| Parameter Set Variable Name | Description |
| --- | --- |
| `ingestion_chunk_overlap` | The number of tokens shared between consecutive documents (chunks). |
| `ingestion_chunk_size` | The maximum number of tokens each document (chunk) will contain. |
| `es_index_name` | The name of the Elasticsearch index where the ingested data will be stored. |
| `es_index_text_field` | The name of the field that will store your document text in the Elasticsearch index. |
| `es_model_name` | The name of the model in Elasticsearch that will be used for further processing or querying. |
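
To make the interaction between `ingestion_chunk_size` and `ingestion_chunk_overlap` concrete, here is a minimal sketch of fixed-size chunking with overlap over a list of tokens. It is a simplification: the notebook itself uses LlamaIndex's `SentenceSplitter`, which also tries to respect sentence boundaries. The function name `chunk_tokens` is hypothetical and not part of the notebook.

```python
from typing import List, Sequence


def chunk_tokens(
    tokens: Sequence[str], chunk_size: int, chunk_overlap: int
) -> List[List[str]]:
    """Split tokens into windows of at most chunk_size that share chunk_overlap tokens."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks: List[List[str]] = []
    for start in range(0, len(tokens), step):
        chunks.append(list(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            break  # this window already reached the end of the input
    return chunks


# Example: windows of 4 tokens, each sharing 2 tokens with the previous one.
words = "one two three four five six seven".split()
for chunk in chunk_tokens(words, chunk_size=4, chunk_overlap=2):
    print(chunk)
```

A larger overlap repeats more context across neighbouring chunks at the cost of producing more chunks for the same input.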

In [4]:
import json
import time
import warnings
import nest_asyncio
import nltk
import elasticsearch.exceptions
import elasticsearch.helpers
import elastic_transport

from llama_index.core import VectorStoreIndex, StorageContext, Settings
from llama_index.core.node_parser import SentenceSplitter
from llama_index.readers.file import (
    PDFReader,
    DocxReader,
    UnstructuredReader,
    FlatReader,
    HTMLTagReader,
)
from llama_index.vector_stores.elasticsearch import ElasticsearchStore
from ibm_watson_studio_lib import access_project_or_space
from typing import Optional, Tuple
from elasticsearch import Elasticsearch, AsyncElasticsearch

# Credential settings
# Here we are giving access to assets located in our watsonx.ai project
wslib = access_project_or_space({
    'token': '<YOUR WATSONX.AI PROJECT ACCESS TOKEN HERE>',
    'project_id': '<YOUR WATSONX.AI PROJECT ID HERE>'
})

# Download utils.py from the project first, then import it, so the import
# also works on a fresh runtime where the file is not yet present.
wslib.download_file("utils.py")
import utils

nest_asyncio.apply()
nltk.download("averaged_perceptron_tagger")
warnings.filterwarnings("ignore")

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /home/wsuser/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


In [5]:
wslib_extension = utils.WSLibExtension(wslib)
params = wslib_extension.get_all_parameter_set_values()
INGESTION_CHUNK_OVERLAP = params["ingestion_chunk_overlap"]
INGESTION_CHUNK_SIZE = params["ingestion_chunk_size"]
INDEX_NAME = params["es_index_name"]
INDEX_TEXT_FIELD = params["es_index_text_field"]
EMBEDDING_MODEL_NAME = params["es_model_name"]
DEFAULT_READERS = {
    ".pdf": PDFReader(),
    ".docx": DocxReader(),
    ".pptx": UnstructuredReader(),
    ".txt": FlatReader(),
    ".html": HTMLTagReader(),
}
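
The `DEFAULT_READERS` dictionary above maps each supported file extension to a reader. If a different reader is preferred for an extension, one hedged way to swap it in without mutating the defaults is to build a copy of the mapping. In the sketch below, `with_reader_override` is a hypothetical helper, and plain placeholder strings stand in for reader instances so the snippet runs without LlamaIndex installed; in the notebook the values would be reader objects such as `FlatReader()`.

```python
from typing import Any, Dict


def with_reader_override(
    readers: Dict[str, Any], extension: str, reader: Any
) -> Dict[str, Any]:
    """Return a copy of the extension-to-reader mapping with one entry replaced or added."""
    ext = extension.lower()
    if not ext.startswith("."):
        ext = "." + ext  # keys in the mapping include the leading dot
    updated = dict(readers)  # shallow copy so the original mapping stays untouched
    updated[ext] = reader
    return updated


# Placeholder values stand in for reader instances such as PDFReader().
defaults = {".pdf": "pdf-reader", ".txt": "flat-reader"}
custom = with_reader_override(defaults, "TXT", "my-text-reader")
print(custom)
```

The resulting dictionary could then be passed as `file_extractor` in place of `DEFAULT_READERS`.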

## Read and Prepare Files From Cloud Object Storage

The cells below connect to your Cloud Object Storage bucket, then read the files in the bucket, chunk them, and format them as JSON objects that Watsonx Discovery can ingest. The cells use the LlamaIndex framework and [LlamaIndex file readers](https://llamahub.ai/l/readers/llama-index-readers-file?from=all) to perform the file ingestion. To use a different file reader for a particular extension, import the desired reader and update the corresponding entry in the `DEFAULT_READERS` dictionary. The default schema for the ingested JSON objects is described below for reference. In addition to the fields in this schema, the notebook adds `url` and `file_name` to each document to ease integration with the Watsonx Assistant native search extension.

| Level 1 Key | Level 2 Key | Level 3 Key | Level 4 Key | Description |
| --- | --- | --- | --- | --- |
| `_id` | | | | The unique identifier of the document. |
| `_source` | | | | Contains the main data of the document. |
| `_source` | `hash` | | | The hash of the document. |
| `_source` | `metadata` | | | Contains metadata about the document. |
| `_source` | `metadata` | `file_name` | | The name of the file from which the document was created. |
| `_source` | `metadata` | `page_label` | | The label of the page from which the document was created. |
| `_source` | `relationships` | | | Contains relationships of the document with other documents. |
| `_source` | `relationships` | `NodeRelationship.NEXT` | | Contains information about the next document. |
| `_source` | `relationships` | `NodeRelationship.NEXT` | `hash` | The hash of the next document. |
| `_source` | `relationships` | `NodeRelationship.NEXT` | `metadata` | Metadata of the next document. |
| `_source` | `relationships` | `NodeRelationship.NEXT` | `node_id` | The unique identifier of the next document. |
| `_source` | `relationships` | `NodeRelationship.NEXT` | `node_type` | The type of the next document. |
| `_source` | `relationships` | `NodeRelationship.SOURCE` | | Contains information about the source document. |
| `_source` | `relationships` | `NodeRelationship.SOURCE` | `hash` | The hash of the source document. |
| `_source` | `relationships` | `NodeRelationship.SOURCE` | `metadata` | Metadata of the source document. |
| `_source` | `relationships` | `NodeRelationship.SOURCE` | `node_id` | The unique identifier of the source document. |
| `_source` | `relationships` | `NodeRelationship.SOURCE` | `node_type` | The type of the source document. |
| `_source` | `text_field` | | | The text content of the document. |
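
As an illustration of the schema above, the sketch below builds one document as a Python dictionary and reads nested values from it. All identifiers, hashes, and text are made-up placeholder values; only the key layout follows the table. The key `text_field` stands for whatever field name is configured in `es_index_text_field`, and `get_nested` is a hypothetical helper.

```python
from typing import Any, Dict, Optional, Sequence

# Made-up example values; only the key layout follows the schema table.
example_document: Dict[str, Any] = {
    "_id": "chunk-0001",
    "_source": {
        "hash": "example-hash-chunk",
        "metadata": {"file_name": "report.pdf", "page_label": "1"},
        "relationships": {
            "NodeRelationship.SOURCE": {
                "hash": "example-hash-source",
                "metadata": {"file_name": "report.pdf", "page_label": "1"},
                "node_id": "source-0001",
                "node_type": "example-type",
            },
            "NodeRelationship.NEXT": {
                "hash": "example-hash-next",
                "metadata": {"file_name": "report.pdf", "page_label": "1"},
                "node_id": "chunk-0002",
                "node_type": "example-type",
            },
        },
        "text_field": "Example chunk text.",
    },
}


def get_nested(document: Dict[str, Any], path: Sequence[str]) -> Optional[Any]:
    """Walk a sequence of keys into a nested dictionary; return None if any key is missing."""
    current: Any = document
    for key in path:
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


print(get_nested(example_document, ["_source", "metadata", "file_name"]))
print(get_nested(example_document, ["_source", "relationships", "NodeRelationship.NEXT", "node_id"]))
```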

### Connect to Cloud Object Storage

In [7]:
cos_connection_dict = wslib.get_connection("CloudObjectStorage")
cos_auth_dict = json.loads(cos_connection_dict["credentials"])

cos_reader = utils.CloudObjectStorageReader(
    bucket_name=cos_connection_dict["bucket"],
    credentials={
        "apikey": cos_auth_dict["apikey"],
        "service_instance_id": cos_auth_dict["resource_instance_id"],
    },
    hostname=f"https://{cos_connection_dict['url']}",
    file_extractor=DEFAULT_READERS,
)

### Count number of objects in the bucket

In [12]:
import ibm_boto3
from ibm_botocore.client import Config
import json

# Get connection details
cos_connection_dict = wslib.get_connection("CloudObjectStorage")
cos_auth_dict = json.loads(cos_connection_dict["credentials"])

# Initialize the COS client
cos_client = ibm_boto3.client(
    's3',
    ibm_api_key_id=cos_auth_dict["apikey"],
    ibm_service_instance_id=cos_auth_dict["resource_instance_id"],
    config=Config(signature_version='oauth'),
    endpoint_url=f"https://{cos_connection_dict['url']}"
)

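# List objects in the specified bucket. A single list_objects_v2 call returns
# at most 1,000 keys, so paginate to count larger buckets correctly.
paginator = cos_client.get_paginator("list_objects_v2")

# Count the number of objects in the bucket across all pages
num_objects = sum(
    len(page.get("Contents", []))
    for page in paginator.paginate(Bucket=cos_connection_dict["bucket"])
)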

print(f"Number of objects in the bucket: {num_objects}")

Number of objects in the bucket: 3
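
Beyond the raw count, it can help to see which object keys have one of the file extensions this notebook lists as supported. The sketch below assumes the object keys have already been collected into a list of strings; `split_by_supported_extension` is a hypothetical helper, and how the reader in `utils.py` treats other extensions is not shown in this notebook.

```python
import os
from typing import Iterable, List, Set, Tuple

# Extensions listed as supported at the top of this notebook.
SUPPORTED_EXTENSIONS: Set[str] = {".pdf", ".docx", ".pptx", ".html", ".txt"}


def split_by_supported_extension(
    keys: Iterable[str], supported: Set[str] = SUPPORTED_EXTENSIONS
) -> Tuple[List[str], List[str]]:
    """Partition object keys into (supported, other) by case-insensitive file extension."""
    supported_keys: List[str] = []
    other_keys: List[str] = []
    for key in keys:
        extension = os.path.splitext(key)[1].lower()
        if extension in supported:
            supported_keys.append(key)
        else:
            other_keys.append(key)
    return supported_keys, other_keys


# Example keys are made up for illustration.
matched, unmatched = split_by_supported_extension(
    ["reports/annual.PDF", "notes.txt", "logo.png", "archive/"]
)
print("Supported:", matched)
print("Other:", unmatched)
```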


### Read files from Cloud Object Storage and convert them into LlamaIndex document objects

In [8]:
documents = cos_reader.load_data(show_progress=True)
print(f"Total documents: {len(documents)}\nExample document:\n{documents[0]}")

Downloading files to temp dir: 100%|██████████| 3/3 [00:00<00:00,  4.24it/s]
Loading files: 100%|██████████| 3/3 [00:16<00:00,  5.63s/file]

Total documents: 400
Example document:
Doc ID: 976244db-b513-41eb-9475-ced6372c1a51
Text: 2021  Annual Report Let’s create





## Ingest chunked and formatted files into Watsonx Discovery 

Now that we have prepared the documents to be ingested into Watsonx Discovery, the next step is to actually ingest the files. To do this, we first need to connect to our Databases for Elasticsearch service using the connection in the project. Then, we perform the following steps to prepare the service:

1. Try to download and deploy the ELSER model in the Databases for Elasticsearch service
2. Create an index to store the documents
3. Create a pipeline to specify the use of the ELSER model to create vector embeddings on document ingestion

Once these steps are done, we ingest the documents above into the index using the pipeline.

### Connect to Watsonx Discovery

In [13]:
wxd_connection_dict = wslib.get_connection("WatsonxDiscovery")

es_client = Elasticsearch(
    wxd_connection_dict["url"],
    basic_auth=(wxd_connection_dict["username"], wxd_connection_dict["password"]),
    verify_certs=False,  # NOTE: disables TLS certificate verification
    request_timeout=3600,
)
async_es_client = AsyncElasticsearch(
    wxd_connection_dict["url"],
    basic_auth=(wxd_connection_dict["username"], wxd_connection_dict["password"]),
    verify_certs=False,  # NOTE: disables TLS certificate verification
    request_timeout=3600,
)
es_client.info()

ObjectApiResponse({'name': 'm-1.09b2ea51-6fd9-4f27-a28c-f8cae949fb22.32327196cebf47ceba64c395a7c51f5a.bn2a2uid0up8mv7mv2ig.databases.appdomain.cloud', 'cluster_name': '09b2ea51-6fd9-4f27-a28c-f8cae949fb22', 'cluster_uuid': 'm_Obcka7SUSY-_USnpOmiA', 'version': {'number': '8.15.0', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': '1a77947f34deddb41af25e6f0ddb8e830159c179', 'build_date': '2024-08-05T10:05:34.233336849Z', 'build_snapshot': False, 'lucene_version': '9.11.1', 'minimum_wire_compatibility_version': '7.17.0', 'minimum_index_compatibility_version': '7.0.0'}, 'tagline': 'You Know, for Search'})

### Set up embedding model, index, and ingestion pipeline

In [15]:
def download_model(
    client: Elasticsearch, model_id: str, model_text_field: str = "text"
) -> None:
    """
    Downloads a trained model into the Elasticsearch cluster if it doesn't already exist.

    Args:
        client (elasticsearch.Elasticsearch): The elasticsearch.Elasticsearch client.
        model_id (str): The ID of the trained model.
        model_text_field (str): The name of the field in which the model looks for text to embed by default.

    """
    try:
        client.ml.get_trained_models(
            model_id=model_id
        )  # Throws error if model_id does not exist
        print(f"{model_id} already downloaded...")
    except elasticsearch.exceptions.NotFoundError:
        model_schema = {"input": {"field_names": [model_text_field]}}
        print(f"Downloading {model_id}...")
        client.ml.put_trained_model(model_id=model_id, body=model_schema)
        time.sleep(90)  # Wait for the model to be downloaded


def deploy_model(
    client: Elasticsearch,
    model_id: str,
    deployment_id: Optional[str] = None,
) -> None:
    """
    Deploys a model through the Elasticsearch client.

    Args:
        client (elasticsearch.Elasticsearch): The elasticsearch.Elasticsearch client.
        model_id (str): The ID of the model to deploy.
        deployment_id (str, optional): The ID to use for the deployment. Defaults to the model_id.

    """
    if not deployment_id:
        deployment_id = model_id

    existing_deployments = (
        client.ml.get_trained_models_stats(model_id=model_id)
        .body["trained_model_stats"][0]
        .get("deployment_stats")
    )
    if (
        existing_deployments
        and existing_deployments.get("deployment_id") == deployment_id
    ):
        print(f"{model_id} model deployment with the same name already exists.")
    else:
        print(
            f"Creating {model_id} model deployment with deployment id {deployment_id}..."
        )
        client.ml.start_trained_model_deployment(
            model_id=model_id, deployment_id=deployment_id
        )


def get_model_text_field(client: Elasticsearch, model_id: str) -> str:
    """
    Retrieves the text field name from a trained model configuration.

    Args:
        client (elasticsearch.Elasticsearch): The elasticsearch.Elasticsearch client.
        model_id (str): The ID of the trained model.

    Returns:
        str: The name of the text field.
    """
    response = client.ml.get_trained_models(model_id=model_id)
    return response.body["trained_model_configs"][0]["input"]["field_names"][0]


def create_index_with_default_pipeline(
    client: Elasticsearch,
    index_name: str,
    index_config: dict,
    pipeline_config: dict,
    pipeline_name: str = "default_pipeline",
) -> Tuple[elastic_transport.ObjectApiResponse, elastic_transport.ObjectApiResponse]:
    """
    Creates an Elasticsearch index with a default ingestion pipeline, deleting any existing index of the same name first.

    Args:
        client (elasticsearch.Elasticsearch): The elasticsearch.Elasticsearch client.
        index_name (str): The name of the index to be created.
        index_config (dict): The configuration settings for the index.
        pipeline_config (dict): The configuration settings for the ingestion pipeline.
        pipeline_name (str, optional): The name of the ingestion pipeline. Defaults to "default_pipeline".

    Returns:
        tuple: A tuple containing the index creation response and the pipeline creation response.
    """
    pipeline_name = pipeline_name or "default_pipeline"

    if client.indices.exists(index=index_name):
        print(f"Deleting the existing index {index_name}...")
        client.indices.delete(index=index_name)

    print("Creating the ingestion pipeline...")
    pipeline_response = client.ingest.put_pipeline(
        id=pipeline_name, body=pipeline_config
    )

    print("Creating the index...")
    index_config["settings"] = {
        "index.default_pipeline": pipeline_name,
    }
    index_response = client.indices.create(index=index_name, body=index_config)
    return index_response, pipeline_response

In [16]:
download_model(es_client, EMBEDDING_MODEL_NAME, INDEX_TEXT_FIELD)
deploy_model(es_client, EMBEDDING_MODEL_NAME)
embedding_model_text_field = get_model_text_field(es_client, EMBEDDING_MODEL_NAME)

index_config = {
    "mappings": {
        "properties": {
            "ml.tokens": {"type": "rank_features"},
            INDEX_TEXT_FIELD: {"type": "text"},
        }
    }
}

pipeline_config = {
    "description": "Inference pipeline using ELSER embedding model",
    "processors": [
        # Calculate embeddings
        {
            "inference": {
                "field_map": {INDEX_TEXT_FIELD: embedding_model_text_field},
                "model_id": EMBEDDING_MODEL_NAME,
                "target_field": "ml",
                "inference_config": {"text_expansion": {"results_field": "tokens"}},
            }
        },
        # Give file_name and url their own fields for native Watsonx Assistant integration
        {"set": {"field": "file_name", "value": "{{metadata.file_name}}"}},
        {"set": {"field": "url", "value": "{{metadata.url}}"}},
    ],
    "version": 1,
}

.elser_model_2_linux-x86_64 already downloaded...
.elser_model_2_linux-x86_64 model deployment with the same name already exists.


In [17]:
create_index_with_default_pipeline(es_client, INDEX_NAME, index_config, pipeline_config)

Creating the ingestion pipeline...
Creating the index...


(ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'index-created-in-watson-studio'}),
 ObjectApiResponse({'acknowledged': True}))
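
To confirm that the new index routes incoming documents through the pipeline, the index settings can be read back. The sketch below assumes the settings response is keyed by index name and nests the value under `settings` → `index` → `default_pipeline`; `get_default_pipeline` is a hypothetical helper, and the stub client stands in for `es_client` so the snippet runs without a cluster.

```python
from types import SimpleNamespace
from typing import Optional


def get_default_pipeline(client, index_name: str) -> Optional[str]:
    """Return the default ingest pipeline configured on an index, or None if unset."""
    response = client.indices.get_settings(index=index_name)
    index_settings = response[index_name]["settings"]["index"]
    return index_settings.get("default_pipeline")


# Stub client mimicking only the call used above; the values are illustrative.
stub_client = SimpleNamespace(
    indices=SimpleNamespace(
        get_settings=lambda index: {
            index: {"settings": {"index": {"default_pipeline": "default_pipeline"}}}
        }
    )
)
print(get_default_pipeline(stub_client, "example-index"))
```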

### Ingest the formatted documents

In [18]:
Settings.embed_model = None
Settings.llm = None
Settings.node_parser = SentenceSplitter.from_defaults(
    chunk_size=INGESTION_CHUNK_SIZE, chunk_overlap=INGESTION_CHUNK_OVERLAP
)

vector_store = ElasticsearchStore(
    es_client=async_es_client, index_name=INDEX_NAME, text_field=INDEX_TEXT_FIELD
)
try:
    index = VectorStoreIndex.from_documents(
        documents,
        storage_context=StorageContext.from_defaults(vector_store=vector_store),
        show_progress=True,
    )
except elasticsearch.helpers.BulkIndexError as e:
    first_error = e.errors[0].get("index", {}).get("error", {})
    print(f"Bulk index error: {e}")
    print(
        f"Document ingestion to Elasticsearch failed with the following error: {first_error}"
    )

Embeddings have been explicitly disabled. Using MockEmbedding.
LLM is explicitly disabled. Using MockLLM.


Parsing nodes:   0%|          | 0/400 [00:00<?, ?it/s]

Generating embeddings:   0%|          | 0/2048 [00:00<?, ?it/s]

Generating embeddings:   0%|          | 0/1346 [00:00<?, ?it/s]
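
Once ingestion finishes, a quick sanity check is to count the documents in the index and compare the result with the number of chunks produced during ingestion. The sketch below refreshes the index first so that recent writes are visible to the count. `count_indexed_documents` is a hypothetical helper, and the stub client with its made-up count stands in for `es_client` so the snippet runs without a cluster.

```python
from types import SimpleNamespace


def count_indexed_documents(client, index_name: str) -> int:
    """Refresh the index so recent writes are searchable, then return its document count."""
    client.indices.refresh(index=index_name)
    return client.count(index=index_name)["count"]


# Stub client mimicking only the two calls used above; the count is a made-up value.
stub_client = SimpleNamespace(
    indices=SimpleNamespace(refresh=lambda index: {"_shards": {"failed": 0}}),
    count=lambda index: {"count": 42},
)
print(count_indexed_documents(stub_client, "example-index"))
```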

### Test the ingested documents with a simple query
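
The cell below runs a sparse-vector query through a helper from `utils.py`, whose implementation is not shown in this notebook. For orientation, the sketch below builds the kind of Elasticsearch `text_expansion` query body that can be used to search ELSER token fields. It assumes the tokens are stored in `ml.tokens`, as in the index mapping above. `build_text_expansion_query` is a hypothetical name and is not the helper from `utils.py`.

```python
from typing import Any, Dict


def build_text_expansion_query(
    model_id: str,
    query_text: str,
    tokens_field: str = "ml.tokens",
    size: int = 3,
) -> Dict[str, Any]:
    """Build a search body that expands query_text with the given model and matches tokens_field."""
    return {
        "size": size,  # number of hits to return
        "query": {
            "text_expansion": {
                tokens_field: {
                    "model_id": model_id,
                    "model_text": query_text,
                }
            }
        },
    }


# The model ID matches the one shown in this notebook's output; the question is illustrative.
body = build_text_expansion_query(
    ".elser_model_2_linux-x86_64", "What are some features of Watsonx.ai?"
)
print(body)
```

Against a live cluster, a body like this could be sent with the client's `search` method for the index created earlier.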

In [19]:
query_engine = index.as_query_engine(
    similarity_top_k=3,
    vector_store_query_mode="sparse",
    vector_store_kwargs={
        "custom_query": utils.create_sparse_vector_query_with_model(
            EMBEDDING_MODEL_NAME
        )
    },
)
response = query_engine.query("What are some features of Watsonx.ai?")
for source_node in response.source_nodes:
    print(f"{source_node.metadata['file_name']}:\n {source_node.text}\n\n")

IBM_Annual_Report_2023.pdf:
 IBM has built two powerful platforms 
to capitalize on the strong demand for both technologies: 
watsonx for AI, and Red Hat OpenShift for hybrid cloud.
Watsonx is our comprehensive AI and data platform, built to 
deliver AI models and give our clients the ability to manage 
the entire lifecycle of AI for business, including the training, 
tuning, deployment, and ongoing governance of those models. 
As clients shift from experimenting with generative AI to 
building and deploying it throughout their enterprises, we are 
focused on practical and urgent business use cases, including 
code modernization, customer service, and digital labor. 
Financial institutions like Citi, Bradesco, and NatWest 
are using watsonx to help increase productivity, improve 
code quality, and enhance customer experiences. Our 
enterprise-ready AI capabilities are being embedded into 
SAP solutions. EY launched EY.ai Workforce, a new solution 
that will use watsonx Orchestrate to a