# Data Processing and Ingestion from Cloud Object Storage (COS) into Vector Databases

### Supported Cloud Object Storage (COS) Providers
* IBM Cloud Object Storage
* Amazon S3

### Supported Vector Databases
* Elasticsearch
* Milvus

## Introduction

<h4>Bulk Ingesting Data from AWS S3 to Elasticsearch Vector Database:</h4>

To efficiently ingest data from AWS S3 into an Elasticsearch vector database, consider the following approaches:
1. Using the Elastic S3 Connector:<br>
   For large-scale data ingestion from AWS S3 to Elasticsearch, utilize the Elastic S3 Connector. This method offers efficient indexing by directly integrating with Amazon S3, minimizing the need for intermediary processing steps. Detailed setup instructions are available in the `Readme - Watsonx Discovery S3 Connector.pdf` document.<br>
   **Note**: For substantial datasets, the Elastic S3 Connector is recommended to ensure optimal performance and efficiency during the indexing process.

2. Using this ingestion notebook:<br>
   Alternatively, you can use this notebook, which walks step by step through ingesting documents from AWS S3 into Elasticsearch. This method is more flexible and can be tailored to specific requirements.

<h4>Bulk Ingestion of Large Datasets from Cloud Object Storage (COS) into Vector Databases:</h4>

This notebook performs bulk ingestion of data from Cloud Object Storage (IBM COS and AWS S3) into vector databases, specifically Elasticsearch and Milvus. The indexed documents are then used in the next notebook to create a Q&A AI service function and deploy it on [watsonx.ai](https://www.ibm.com/products/watsonx-ai).

The ingestion process stores vector embeddings alongside each document chunk in Elasticsearch or Milvus, so that relevant content can later be retrieved by semantic similarity.

The process encompasses the following steps:

- *Establish COS Connection*:
  * Connect to the COS bucket containing the `.tar` files.
  * For `Milvus`: also connect to the COS bucket associated with the Milvus vector database, which stores the intermediate prepared data files.
- *Connect to Vector Database*: Establish a connection to the chosen vector database (Elasticsearch or Milvus).
- *Extract and Process Data*: Extract `.pdf` documents from `.tar` files and convert them into a structured document format.
- *Generate Document IDs and Chunk Data*: Assign unique IDs to documents and split them into manageable chunks for indexing.
- *Generate Embeddings*:
  * For `Milvus`: Create vector embeddings for each document chunk.
- *Prepare Data for Insertion*:
  * For `Milvus`: Prepare data files and store them in the associated COS bucket for bulk insertion into Milvus.
- *Bulk Insert Data*:
  * For `Elasticsearch`: Use async bulk indexing to index data into the Elasticsearch vector database.
  * For `Milvus`: Utilize Milvus's bulk insert capabilities to index data efficiently.
- *Configure Vector Index*:
  * For `Elasticsearch`: Set up the vector index with appropriate index type (**sparse** or **dense**) based on the deployed embedding model.
  * For `Milvus`: Set up the collection with appropriate embedding types (**dense** or **hybrid**) based on the search type.

**Note**: It is recommended to run this notebook in a Python environment on Cloud Pak for Data software with a GPU-enabled or high vCPU and RAM hardware configuration, as generating embeddings may require significant memory. Update the notebook runtime environment `qna_bulk_template` accordingly.

**Required Params**: 
- Make sure the required `RAG_PARAMETER_SET` parameters `cos_data_input_connection_asset` and `cos_milvus_connection_asset` are set to the names of your COS connection assets.
- Update the `cos_data_path` parameter in `RAG_ADVANCED_PARAMETER_SET` to the relative path of the directory that contains the input files in the `cos_data_input_connection_asset` bucket.

**Disclaimer**: 

If the input data is large, it is recommended to run this notebook as a batch job. Bulk ingestion may take several hours to complete depending on the input size, and running it interactively for that long can lead to kernel disconnections before it finishes. Follow the steps below to run it as a job.

- Click the `Jobs` dropdown icon next to the `Share` button at the top right of this notebook.
- Click `Create a job`.
- Under `Define details`, enter a name for the job and click `Next`.
- Under `Configure`, select the `latest` version and the `qna_bulk_template` environment, keep the remaining settings unchanged, and click `Next`.
- Under `Schedule`, select either `Run after job creation` or `Run on a schedule`, depending on your requirements, then click `Next`.
- Under `Notify`, click `Next`.
- Under `Review and create`, click `Create`.
- The job then appears in the `Jobs` section, where you can run it.

## Contents

This notebook contains the following parts:
- [Setup](#setup)
- [Import Dependencies](#import)
- [Process and Split Extracted Data](#split)
- [Connect to Vector Database](#connect)
- [Connect to Cloud Object Storage](#connect_cos)
- [Update parameter set in the project](#updateParameters)
- [Extract Data from Input files](#input)
- [Indexing Documents using Bulk Indexing](#bulk_insert_documents)


<a id="setup"></a>
### Prerequisite Libraries and Dependencies
The cell below installs the specific library versions required to run this notebook.

**Note**: Some of these library versions may produce warnings or dependency errors after installation. These versions are required for the accelerator to run, so you can ignore those messages and continue.

In [None]:
!pip install langchain_community | tail -n 1
!pip install ibm_watsonx_ai==1.3.26| tail -n 1
!pip install pypdf | tail -n 1
!pip install -U pymilvus[bulk_writer]==2.5.11 | tail -n 1
!pip install elasticsearch==8.18.1 | tail -n 1

If the next cell fails to import all the libraries, restart the kernel after the pip install and run it again.

In [None]:
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.foundation_models import Embeddings
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames as EmbedParams
from ibm_watsonx_ai.foundation_models.utils.enums import EmbeddingTypes
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames as GenParams
from ibm_watsonx_ai import APIClient

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyPDFLoader

import hashlib
import multiprocessing
import json
import os
import shutil 
import warnings
import time
import tempfile
import boto3
import tarfile
import datetime
import ibm_boto3
from io import BytesIO
from tqdm import tqdm
from ibm_botocore.client import Config, ClientError

from elasticsearch import helpers, AsyncElasticsearch
from pymilvus.bulk_writer import RemoteBulkWriter, BulkFileType
from pymilvus import(IndexType,Status,connections,FieldSchema,DataType,Collection,CollectionSchema,utility,BulkInsertState,Function,FunctionType)
warnings.filterwarnings("ignore")

In [None]:
project_id = os.environ['PROJECT_ID']
# Detect the environment from the host URL: IBM Cloud (SaaS) or on-prem (Cloud Pak for Data)
hostname = os.environ['RUNTIME_ENV_APSX_URL']

if hostname.endswith("cloud.ibm.com"):
    environment = "cloud"
    runtime_region = os.environ["RUNTIME_ENV_REGION"]
else:
    environment = "on-prem"
    from ibm_watson_studio_lib import access_project_or_space
    wslib = access_project_or_space()

<a id="import"></a>
### Import Parameter Sets, Credentials, and the Helper Functions Script

The cells below import the parameter set values, set the watsonx.ai credentials, and import the helper functions script.

In [None]:
try:
    filename = 'rag_helper_functions.py'
    wslib.download_file(filename)
    import rag_helper_functions
    print("rag_helper_functions imported from the project assets")
except NameError as e:
    print(str(e))
    print("If running watsonx.ai aaS on IBM Cloud, check that the first cell in the notebook contains a project token. If not, select the vertical ellipsis button from the notebook toolbar and `insert project token`. Also check that you have specified your ibm_api_key in the second code cell of the notebook")


In [None]:
parameter_sets = ["RAG_parameter_set","RAG_advanced_parameter_set"]

parameters=rag_helper_functions.get_parameter_sets(wslib, parameter_sets)

In [None]:
ibm_api_key = parameters['watsonx_ai_api_key']
if environment == "cloud":
    WML_SERVICE_URL = f"https://{runtime_region}.ml.cloud.ibm.com"
    wml_credentials = Credentials(api_key=ibm_api_key, url=WML_SERVICE_URL)
else:
    token = os.environ['USER_ACCESS_TOKEN']
    wml_credentials = Credentials(token=token, url=hostname, instance_id='openshift')

### Set the watsonx.ai Client
The cell below uses the Watson Machine Learning credentials to create an API client for interacting with the project and deployment space.

In [None]:
client = APIClient(wml_credentials)
client.set.default_project(project_id=project_id)

<a id="split"></a>
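### Load and Split the Documents

The function in the following cell prepares documents for insertion into a vector database. It splits each document with LangChain's `RecursiveCharacterTextSplitter` (sized in tiktoken tokens via `from_tiktoken_encoder`, using the `ingestion_chunk_size` and `ingestion_chunk_overlap` parameters), attaches the title, source, URL, and page number as metadata, prefixes each chunk with its document title, and removes duplicate chunks. The splitter is called "recursive" because it tries a list of separators in order (by default paragraph breaks, then line breaks, then spaces, then single characters) and re-splits with the next separator only those pieces that are still larger than the chunk size.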
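To make the recursive behaviour concrete, here is a minimal plain-Python sketch of the idea. It is a simplified stand-in, not LangChain's implementation: it measures length in characters rather than tiktoken tokens, ignores `chunk_overlap`, and `recursive_split` is a hypothetical helper name.

```python
def recursive_split(text, chunk_size, separators=("\n\n", "\n", " ", "")):
    """Split text into chunks of at most chunk_size characters.

    The coarsest separator is tried first; only pieces that are still too
    long are split again with the next, finer separator. The empty string
    is the last resort and cuts the text at fixed positions.
    """
    if len(text) <= chunk_size:
        return [text] if text else []
    sep, *finer = separators
    if sep == "":
        # No separator left: hard-cut into fixed-size pieces
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]
    chunks, current = [], ""
    for piece in text.split(sep):
        # Greedily merge neighbouring pieces while they fit in one chunk
        candidate = piece if not current else current + sep + piece
        if len(candidate) <= chunk_size:
            current = candidate
            continue
        if current:
            chunks.append(current)
        if len(piece) <= chunk_size:
            current = piece
        else:
            # This piece alone is too long: recurse with the finer separators
            chunks.extend(recursive_split(piece, chunk_size, tuple(finer) or ("",)))
            current = ""
    if current:
        chunks.append(current)
    return chunks


print(recursive_split("aaaa bbbb\n\ncccc dddd eeee", chunk_size=10))
# ['aaaa bbbb', 'cccc dddd', 'eeee']
```

The first paragraph fits as-is, while the second is too long and is re-split on spaces, which is the multi-level behaviour described above.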

In [None]:
def get_split_documents(documents, doc_name):
    content=[]
    metadata = []
    for doc in documents:
        
        document_url = ""
        # PyPDFLoader reads each PDF from a temporary file, so its 'source' metadata is a
        # temp path; use the PDF's name inside the tar archive (doc_name) instead
        file_stem = os.path.splitext(os.path.basename(doc_name))[0]
        document_title = doc.metadata.get("title") or file_stem

        metadata.append({
                "title": document_title,
                "source": doc_name,
                "document_url": document_url,
                "page_number": str(doc.metadata['page']) if 'page' in doc.metadata else ''
            })
        
        content.append(doc.page_content)
    
    text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
        chunk_size=parameters['ingestion_chunk_size'],
        chunk_overlap=parameters['ingestion_chunk_overlap'],
        disallowed_special=()
    )
    
    split_documents = text_splitter.create_documents(content, metadatas=metadata)
    print(f"In {doc_name}: {len(documents)} Documents are split into {len(split_documents)} documents with a chunk size {parameters['ingestion_chunk_size']}.")
    
    
    
    for chunk in split_documents:
        chunk.metadata["title"] = chunk.metadata.get("title", "Unknown Title")
        chunk.page_content = f"Document Title: {chunk.metadata['title']}\n Document Content: {chunk.page_content}"
    
    split_docs = rag_helper_functions.remove_duplicate_records(split_documents)
    print(f'In {doc_name}: After de-duplication, there are {len(split_docs)} documents present.')

    return split_docs

<a id="connect"></a>
### Connecting to a vector database

#### Connecting using Project Connection Asset (default)

By default, the notebook looks for the project connection asset named in the `connection_asset` parameter, such as `milvus_connect` or `elasticsearch_connect`. You can set this up by following the instructions in the project readme.
The code below checks whether the specified connection exists in the project. If it does, the code retrieves the connection details, identifies the connection type, and connects to the corresponding database. If the connection is not found, it raises an error.

If the connection type is **Elasticsearch**, the cell below also creates an `AsyncElasticsearch` client and checks the status of the configured Elasticsearch model (e.g. ELSER 2 or E5 multilingual). Make sure the model is downloaded and deployed in Elasticsearch; this can be done under **Machine Learning >> Trained Models**.

In [None]:
connection_name = parameters["connection_asset"]
if(next((conn for conn in wslib.list_connections() if conn['name'] == connection_name), None)):
    print(connection_name, "Connection found in the project")
    db_connection = wslib.get_connection(connection_name)
    
    connection_datatypesource_id=client.connections.get_details(db_connection['.']['asset_id'])['entity']['datasource_type']
    connection_type = client.connections.get_datasource_type_details_by_id(connection_datatypesource_id)['entity']['name']
    
    print("Successfully retrieved the connection details")
    print("Connection type is identified as:",connection_type)

    if connection_type=="elasticsearch":
        es_client= await rag_helper_functions.create_and_check_async_elastic_client(db_connection, parameters['elastic_search_model_id'])
    elif connection_type=="milvus" or connection_type=="milvuswxd":
        milvus_credentials = rag_helper_functions.connect_to_milvus_database(db_connection, parameters)
else:
    db_connection=""
    raise ValueError(f"No connection named {connection_name} found in the project.")

<a id="connect_cos"></a>
### Connecting to the Cloud Object Store

This code checks if a specified Cloud Object Store (`Amazon S3` or `IBM COS`) connection exists in the project. If found, it retrieves the connection details and identifies the connection type. Depending on the connection type, it establishes a connection to the COS object store. If the connection is not found, it raises an error indicating the absence of the specified connection in the project.


In [None]:
def create_and_get_cos_connection(connection_name):
    if(next((conn for conn in wslib.list_connections() if conn['name'] == connection_name), None)):
        print(connection_name, "Connection found in the project")
        cos_connection = wslib.get_connection(connection_name)
        
        connection_datatypesource_id=client.connections.get_details(cos_connection['.']['asset_id'])['entity']['datasource_type']
        connection_type = client.connections.get_datasource_type_details_by_id(connection_datatypesource_id)['entity']['name']
        
        print("Successfully retrieved the connection details")
        print("Connection type is identified as:",connection_type)
    
        if connection_type=="amazons3": # For Amazon S3
            conn = RemoteBulkWriter.ConnectParam(
                endpoint="s3.amazonaws.com",
                access_key=cos_connection['access_key'],
                secret_key=cos_connection['secret_key'],
                bucket_name=cos_connection['bucket'],
                region = cos_connection['region'],
                secure=True
            )
            print("Successfully created connection to the AWS S3 instance")
            
        elif connection_type=="bluemixcloudobjectstorage": # For IBM Cloud Object Storage
            conn = RemoteBulkWriter.ConnectParam(
                endpoint=cos_connection['url'],
                access_key=cos_connection['access_key'],
                secret_key=cos_connection['secret_key'],
                bucket_name=cos_connection['bucket'],
                secure=True
            )
            print("Successfully created connection to the IBM COS instance")
        else:
            raise ValueError(f"Unsupported COS connection type: {connection_type}")
    else:
        conn = ""
        raise ValueError(f"No connection named {connection_name} found in the project.")

    return cos_connection, connection_type, conn

<a id="input"></a>
### Input Files for Extraction, Processing, and Indexing into a Vector Database

This section connects to the specified Cloud Object Storage (COS) and retrieves all `.tar` files located under the specified data path. It performs the following key operations:
- Connects to the COS (either `Amazon S3` or `IBM COS`) client.
- Accesses the specified COS bucket.
- Lists the objects under `cos_data_path` in the bucket.
- Filters the listing to keep only files with the `.tar` extension for further processing.

This ensures that only the relevant tar files are fetched from COS.

The `cos_data_path` parameter can be updated in the **RAG advanced parameter set** as required.
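An S3 `list_objects_v2` call returns at most 1,000 keys, which is why the cell below uses a paginator. This sketch isolates the listing-and-filtering step so it can be tried without a real bucket. `list_tar_keys` is a hypothetical helper, and `FakeClient` is a stand-in that mimics only the `get_paginator(...).paginate(...)` calls of a boto3 or ibm_boto3 S3 client.

```python
def list_tar_keys(s3_client, bucket, prefix):
    """Return every .tar key under prefix, following all result pages."""
    keys = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page for an empty listing has no "Contents" entry
        for obj in page.get("Contents", []):
            if obj["Key"].endswith(".tar"):
                keys.append(obj["Key"])
    return keys


class FakeClient:
    """Stand-in for an S3 client that serves pre-built result pages."""

    def __init__(self, pages):
        self._pages = pages

    def get_paginator(self, operation_name):
        assert operation_name == "list_objects_v2"
        return self

    def paginate(self, **kwargs):
        return iter(self._pages)


pages = [
    {"Contents": [{"Key": "data/a.tar"}, {"Key": "data/readme.txt"}]},
    {"Contents": [{"Key": "data/b.tar"}]},
]
print(list_tar_keys(FakeClient(pages), "my-bucket", "data/"))
# ['data/a.tar', 'data/b.tar']
```

With a real client, you would pass `cos_client` and `cos_data_connection["bucket"]` in place of the fake client and bucket name.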

In [None]:
bucket_data_path = parameters['cos_data_path']

cos_data_connection, cos_connection_type, _ = create_and_get_cos_connection(parameters['cos_data_input_connection_asset'])

cos_client = None

if cos_connection_type=="amazons3":
    cos_client = boto3.client(
        "s3",
        aws_access_key_id=cos_data_connection["access_key"],
        aws_secret_access_key=cos_data_connection["secret_key"]
    )
elif cos_connection_type=="bluemixcloudobjectstorage":
    cos_endpoint_url = cos_data_connection["url"] if cos_data_connection["url"].startswith('https://') else 'https://'+cos_data_connection["url"]
    cos_client = ibm_boto3.client(
        "s3",
        ibm_api_key_id=cos_data_connection['api_key'],
        ibm_service_instance_id=cos_data_connection['resource_instance_id'],
        config=Config(signature_version="oauth"),
        endpoint_url=cos_endpoint_url
    )

def get_cos_objects(bucket_name, prefix):
    objects = []
    paginator = cos_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        if "Contents" in page:
            for obj in page["Contents"]:
                objects.append(obj["Key"])
    return objects

cos_objects = get_cos_objects(cos_data_connection["bucket"], bucket_data_path)
tar_files = [key for key in cos_objects if key.endswith(".tar")]
print(f"Number of tar files found: {len(tar_files)}")

<a id="bulk_insert_documents"></a>
### Inserting Documents using Bulk Indexing

This section describes how to ingest large document collections (terabytes or more) from a Cloud Object Storage (COS) bucket into a vector database for efficient and rapid indexing.

It contains the following key parts:
- Vector Index Creation:
  * For `Elasticsearch`: It initializes a vector index with either **sparse** or **dense** embeddings, depending on the selected Elasticsearch embedding model.
  * For `Milvus`:
     - Initializes a vector index with either **dense embeddings** or **hybrid search** (dense + BM25 sparse embeddings), based on the `milvus_hybrid_search` parameter. If hybrid search is enabled, it creates a new collection with **IVF_FLAT for dense** and **BM25 for sparse** indexing. Otherwise, it only adds **dense embeddings**.
     - Initializes the selected Milvus embedding model.
     - Initializes a RemoteBulkWriter object linked to the Milvus-associated Cloud Object Store bucket for bulk indexing.
- Document Processing:
  * Reads the input `.tar` files from the input COS bucket for document ingestion.
  * Extracts and processes the contents of all PDFs in the tar archives to create documents.
- Indexing Process:
  * For `Elasticsearch`: Bulk indexes the processed documents into the Elasticsearch vector index.
  * For `Milvus`:
    - Adds processed documents to the RemoteBulkWriter for storage in the Milvus-associated COS bucket.
    - Bulk indexes the prepared data files from the Milvus-associated COS bucket into the Milvus vector index. Progress can be tracked using the generated `task_id`.

**Note**: It is recommended to run this section in a Python environment on Cloud Pak for Data software with a GPU-enabled or higher vCPU and RAM hardware configuration, as generating embeddings and processing such large datasets requires significant memory.
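For Milvus, each chunk becomes one row whose keys match the collection schema defined in `create_collection`. The sketch mirrors the row built by `prepare_and_write_data`, using plain values instead of LangChain `Document` objects; `chunk_id` and `to_milvus_row` are hypothetical helper names. In a hybrid collection the `sparse` field is produced by the BM25 function from `text`, so rows carry only the dense vector.

```python
import hashlib


def chunk_id(text, title, url, page):
    # Same fields, order, and separators as the ID built in prepare_and_write_data
    key = f"{text}\nTitle: {title}\nUrl: {url}\nPage: {page}"
    return hashlib.sha256(key.encode()).hexdigest()


def to_milvus_row(text, title, source, url, page, vector, hybrid_search):
    row = {
        "id": chunk_id(text, title, url, page),
        "title": title,
        "source": source,
        "document_url": url,
        "page_number": page,
        "text": text,
    }
    # Dense-only collections name the vector field "vector"; hybrid ones use "dense"
    # (the "sparse" field is generated by Milvus from "text" via the BM25 function)
    row["dense" if hybrid_search else "vector"] = vector
    return row


row = to_milvus_row("Document Title: Guide\n Document Content: Install the agent first.",
                    "Guide", "guide.pdf", "", "3", [0.1, 0.2, 0.3], hybrid_search=True)
print(sorted(row))
# ['dense', 'document_url', 'id', 'page_number', 'source', 'text', 'title']
```

Because the ID is derived from the chunk's content and metadata, the same chunk always maps to the same primary key.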

In [None]:
def create_collection(index_name):
    # Initializes embedding model
    if environment=="cloud":
        credentials=Credentials(
            api_key = parameters['watsonx_ai_api_key'],
            url =WML_SERVICE_URL)
        embedding = Embeddings(
          model_id=parameters['embedding_model_id'],
          credentials=credentials,
          project_id=project_id,
          verify=True
        )
    elif environment=="on-prem":
        try:
            if client.foundation_models.EmbeddingModels.__members__:
                if client.foundation_models.EmbeddingModels(parameters["embedding_model_id"]).name:
                    embedding = Embeddings(
                      model_id=parameters['embedding_model_id'],
                      credentials=wml_credentials,
                      project_id=project_id,
                      verify=True
                    )
            else:
                print("local on prem embeddng models are not found, using models from IBM Cloud API")
                credentials=Credentials(
                    api_key = parameters['watsonx_ai_api_key'],
                    url =parameters['watsonx_ai_url'])
                embedding = Embeddings(
                  model_id=parameters['embedding_model_id'],
                  credentials=credentials,
                  space_id=parameters["wx_ai_inference_space_id"],
                  verify=True
                )
        except Exception as e:
            print("Exception in loading Embedding Models:" + str(e))

    embedding_dim = embedding.embed_documents(['a'])[0]

    # Creates/ retrieves collection
    
    
    if index_name not in utility.list_collections():
        dense_index_params = {"metric_type": "L2", "index_type": "IVF_FLAT","params": {"nlist": 1024},}
        sparse_index_params = {"metric_type": "BM25","index_type": "SPARSE_INVERTED_INDEX", "params": {"drop_ratio_build": 0.2}}

        hybrid_search = True if parameters['milvus_hybrid_search'].lower()=="true" else False
        if hybrid_search:
            fields = [
                FieldSchema("id", DataType.VARCHAR, is_primary=True, max_length=65535, auto_id=False),
                FieldSchema("dense", DataType.FLOAT_VECTOR, dim=len(embedding_dim)),
                FieldSchema("sparse", DataType.SPARSE_FLOAT_VECTOR),
                FieldSchema("title", DataType.VARCHAR, max_length=65535),
                FieldSchema("source", DataType.VARCHAR, max_length=65535),
                FieldSchema("document_url", DataType.VARCHAR, max_length=65535),
                FieldSchema("page_number", DataType.VARCHAR, max_length=65535),
                FieldSchema("text", DataType.VARCHAR, max_length=65535, enable_analyzer=True)
            ]
            bm25_func = Function(
                name=f"bm25_text",
                function_type=FunctionType.BM25,
                input_field_names=['text'],
                output_field_names=['sparse'],
                )
            coll_schema = CollectionSchema(fields)
            coll_schema.add_function(bm25_func)
            coll = Collection(name=index_name, schema=coll_schema)

            coll.create_index(field_name="dense", index_params=dense_index_params)
            coll.create_index(field_name="sparse", index_params=sparse_index_params)
        else:
            fields = [
                FieldSchema("id", DataType.VARCHAR, is_primary=True, max_length=65535, auto_id=False),
                FieldSchema("vector", DataType.FLOAT_VECTOR, dim=len(embedding_dim)),
                FieldSchema("title", DataType.VARCHAR, max_length=65535),
                FieldSchema("source", DataType.VARCHAR, max_length=65535),
                FieldSchema("document_url", DataType.VARCHAR, max_length=65535),
                FieldSchema("page_number", DataType.VARCHAR, max_length=65535),
                FieldSchema("text", DataType.VARCHAR, max_length=65535)
            ]
            coll_schema = CollectionSchema(fields)
            coll = Collection(name=index_name, schema=coll_schema)
        
            coll.create_index(field_name="vector", index_params=dense_index_params)
            
        print('Milvus collection is created!')
    else:
        coll = Collection(name=index_name)
        print('Milvus collection is retrieved!')

    return coll, embedding


def create_remote_writer(collection_obj):
    # Create connection object to the remote COS object store
    _, _, cos_conn = create_and_get_cos_connection(parameters['cos_milvus_connection_asset'])
    
    # Create writer object of the Remote object store for storing data
    bulk_writer_remote_data_path = parameters['bulk_writer_remote_data_path']
    writer = RemoteBulkWriter(
        schema=collection_obj.schema,
        remote_path=bulk_writer_remote_data_path,
        connect_param=cos_conn,
        file_type=BulkFileType.NUMPY
    )
    print("Bulk writer object to the Cloud object store is created!")
    
    return writer

The code below initializes the Milvus collection and the embedding model used to index the documents. It also creates the remote bulk writer that stores the prepared input data files for Milvus.

In [None]:
if connection_type=="milvus" or connection_type=="milvuswxd":
    coll, embedding = create_collection(parameters['vector_store_index_name'])
    writer = create_remote_writer(coll)

The code below creates an Elasticsearch index and configures an ingest pipeline that generates the vector embedding for each document as it is indexed.

In [None]:
if connection_type=="elasticsearch":
    try:
        # es_client is an AsyncElasticsearch client, so each API call must be awaited
        await es_client.options(ignore_status=400).indices.create(
                index=parameters['vector_store_index_name'],
                mappings={
                    'properties': {
                        'vector.tokens': {
                            'type': 'sparse_vector' if 'sparse' in parameters['elastic_search_vector_type'] else 'dense_vector',
                        },
                    }
                },
                settings={
                    'index': {
                        'default_pipeline': 'ingest-pipeline',
                        "mapping.total_fields.limit": 1000000
                    },
                    "number_of_shards": parameters["es_number_of_shards"],
                }
            )
        
        await es_client.ingest.put_pipeline(
                id='ingest-pipeline',
                processors=[
                    {
                        'inference': {
                            'model_id': parameters['elastic_search_model_id'],
                            'input_output': [
                                {
                                    'input_field': 'text',
                                    'output_field': 'vector.tokens',
                                }
                            ]
                        }
                    }
                ]
            )
        print(f'Elastic search index created with {parameters["elastic_search_model_id"]}!')
    except Exception as e:
        print(f'Error creating index: {e}')
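Because the index's `default_pipeline` fills `vector.tokens` from the `text` field, documents sent to this index only need their text and metadata. The sketch shows the action dictionaries that `elasticsearch.helpers.async_bulk` accepts; `build_bulk_actions` and the `(text, metadata)` input shape are assumptions for illustration, not the notebook's own indexing code.

```python
import hashlib


def build_bulk_actions(index_name, chunks):
    """Yield one bulk "index" action per (text, metadata) pair."""
    for text, metadata in chunks:
        yield {
            "_op_type": "index",
            "_index": index_name,
            # A content-derived _id makes re-indexing the same chunk overwrite
            # the existing document instead of adding a duplicate
            "_id": hashlib.sha256(text.encode()).hexdigest(),
            # No vector here: the index's default ingest pipeline adds vector.tokens
            "_source": {"text": text, **metadata},
        }


chunks = [("Document Title: Guide\n Document Content: Install the agent first.",
           {"title": "Guide", "source": "guide.pdf", "document_url": "", "page_number": "0"})]
actions = list(build_bulk_actions("my-index", chunks))
print(actions[0]["_source"]["title"])  # Guide
```

With the async client created earlier, such actions could be sent with `await helpers.async_bulk(es_client, build_bulk_actions(index_name, chunks))`.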

<h4>Dataset Preparation and Processing</h4>

This section extracts and prepares the dataset contents for ingestion into the vector database. The process includes the following steps:
- Extracts all PDF files from the retrieved TAR archives.
- Processes the content of each PDF to structure the data appropriately.
- For `Elasticsearch`:
    - Stores the processed data into a data structure for further processing.
- For `Milvus`:
    - Transforms the processed data into a format compatible with the Milvus bulk writer.
    - Writes the prepared data to a Milvus-associated COS bucket.

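**Optional**: You can uncomment the `pdf_count` check in `process_tar_file` to limit the number of PDF files processed, which is useful for testing or resource management. Note that `pdf_count` is not reset between tar files, so the limit applies to the total across all tar files.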
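The extraction step reads each tar archive fully into memory and pulls out its `.pdf` members. The self-contained sketch below reproduces that with the standard `tarfile` module; `extract_pdfs` and `make_tar` are hypothetical names, and the test archive is built in memory so nothing needs to come from COS. Unlike the notebook's global `pdf_count`, the optional `limit` here applies per archive.

```python
import io
import tarfile


def extract_pdfs(tar_bytes, limit=None):
    """Return {member_name: bytes} for the .pdf files in an uncompressed tar."""
    pdfs = {}
    # mode="r:" opens an uncompressed tar, as process_tar_file does
    with tarfile.open(fileobj=io.BytesIO(tar_bytes), mode="r:") as tar:
        for member in tar.getmembers():
            if not (member.isfile() and member.name.endswith(".pdf")):
                continue
            if limit is not None and len(pdfs) >= limit:
                break  # optional cap on the number of PDFs read
            fileobj = tar.extractfile(member)
            if fileobj is not None:
                pdfs[member.name] = fileobj.read()
    return pdfs


def make_tar(files):
    """Build an uncompressed tar archive in memory from {name: bytes}."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w") as tar:
        for name, data in files.items():
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


archive = make_tar({"docs/a.pdf": b"%PDF-a", "docs/notes.txt": b"skip", "docs/b.pdf": b"%PDF-b"})
print(sorted(extract_pdfs(archive)))
# ['docs/a.pdf', 'docs/b.pdf']
```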

In [None]:
def generate_hash(content):
    return hashlib.sha256(content.encode()).hexdigest()
    
def prepare_and_write_data(documents, hybrid_search, doc_list, pdf_name):
    try:
        global embedding
        # Generating batch embedding for the document list
        doc_embeddings = embedding.embed_documents([doc.page_content for doc in documents])
        for index in range(len(documents)):
            doc = documents[index]
            doc._id=generate_hash(doc.page_content+'\nTitle: '+doc.metadata['title']+'\nUrl: '+doc.metadata['document_url']+'\nPage: '+doc.metadata['page_number'])
            doc.id=doc._id
            doc = json.loads(doc.json())
            doc['title']=doc['metadata']['title']
            doc['document_url']=doc['metadata']['document_url']
            doc['page_number']=doc['metadata']['page_number']
            doc['source']=doc['metadata']['source']
            doc['text'] = doc['page_content']
            if hybrid_search:
                doc['dense'] = doc_embeddings[index]
            else:
                doc['vector'] = doc_embeddings[index]
            del doc['page_content']
            del doc['type']
            del doc['metadata']
            doc_list.append(doc)
    except Exception as e:
        print(f'Error in preparing and writing data for PDF : {pdf_name}. Error : {e}.')
        
def process_pdf(pdf, doc_list, all_docs_split):
    pdf_name, pdf_content = pdf
    try:
        print(f'Started Processing pdf content: {pdf_name}.')
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=True) as temp_pdf:
            temp_pdf.write(pdf_content)
            temp_pdf.flush()  # Ensure all data is written to the file
            loader = PyPDFLoader(temp_pdf.name)
            documents = loader.load()
            split_docs = get_split_documents(documents, pdf_name)
            
            if connection_type=="elasticsearch":
                all_docs_split.extend(split_docs)

            elif connection_type=="milvus" or connection_type=="milvuswxd":
                hybrid_search = True if parameters['milvus_hybrid_search'].lower()=="true" else False
                prepare_and_write_data(split_docs, hybrid_search, doc_list, pdf_name)
                
        
        print(f'Processing completed for PDF: {pdf_name}.')
            
    except Exception as e:
        print(f'Error processing PDF: {pdf_name}. Error : {e}')
        global processing_error
        processing_error.append(f'Error processing PDF: {pdf_name}. Error : {e}.')
            
def process_tar_file(bucket_name, tar_key):
    map_pdf_content = {}
    try:
        response = cos_client.get_object(Bucket=bucket_name, Key=tar_key)
        tar_bytes = BytesIO(response["Body"].read())
        if tar_bytes.getbuffer().nbytes > 1:
            print('--------Reading tar file---------')
            with tarfile.open(fileobj=tar_bytes, mode="r:") as tar:
                global pdf_count
                for member in tar.getmembers():
                    if member.isfile() and member.name.endswith(".pdf"):
                        print(f'Reading PDF content: {member.name}')
                        pdf_count += 1

                        # limit pdf count to be processed
                        # if pdf_count > 5: 
                        #     break

                        pdf_file = tar.extractfile(member)
                        if pdf_file:
                            pdf_content = pdf_file.read()
                            map_pdf_content.update({member.name: pdf_content})
    except Exception as e:
        print(f'Error processing tar files. Error : {e}.')
    return map_pdf_content

<h4>PDF Document Chunking and Writing to Compatible Format</h4>

This section iterates through all PDF documents extracted from the list of TAR files. It processes them in parallel with a `multiprocessing` pool of `num_workers` worker processes, and progress bars track the number of TAR files and PDFs processed.

It performs the following key operations:<br>
- Each PDF document is processed individually.
- The content of each PDF is divided into smaller, manageable document chunks optimized for vector indexing.
- For `Elasticsearch`:
  * The processed chunks are collected in a shared list for subsequent ingestion into the vector database.
- For `Milvus`:
  * The processed data is transformed into a format compatible with the Milvus bulk writer.
  * Employs the RemoteBulkWriter to write the processed data to a Milvus-associated Cloud Object Store (COS) bucket.

This approach ensures efficient handling and indexing of large PDF datasets.
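
The chunking itself happens inside `process_pdf`, defined earlier in the notebook. As an illustration only, the sketch below splits text into fixed-size character windows with overlap; the splitter `process_pdf` actually uses, and its chunk size and overlap values, may differ. `chunk_text` is a hypothetical helper.

```python
def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list:
    """Split text into fixed-size character windows that overlap by chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window has reached the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks


print(chunk_text("abcdefghij", chunk_size=4, chunk_overlap=1))  # ['abcd', 'defg', 'ghij']
```

The overlap keeps a sentence that straddles a chunk boundary retrievable from either neighbouring chunk, at the cost of storing some text twice.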

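**Optional**: To limit the number of TAR files processed during execution (useful for testing or resource management), uncomment the `tar_count` check inside the loop below. A similar `pdf_count` check in `process_tar_file` limits the number of PDFs read per TAR file.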

**Note**: The execution time of this operation can vary significantly and may take several hours, depending on the size and number of input documents. <br>
To optimize performance, adjust the `num_workers` parameter based on your system's CPU core count. More workers also increase memory consumption, so ensure your system can handle the load.
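
One starting point for `num_workers` is the number of CPUs available to the kernel, minus one or two to keep the notebook responsive. The sketch below is one way to compute that; `suggest_num_workers` is a hypothetical helper, and its `reserve` and `max_workers` defaults are assumptions to tune. Memory can be the tighter limit, since each task receives a full PDF's bytes, so lower `max_workers` if the kernel runs short of RAM.

```python
import os
from typing import Optional


def available_cpus() -> int:
    """CPUs this process may run on (Linux affinity mask if available, else total cores)."""
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1  # os.cpu_count() can return None


def suggest_num_workers(reserve: int = 1, max_workers: Optional[int] = None) -> int:
    """Leave `reserve` CPUs free, optionally cap the result, and never return less than 1."""
    workers = max(1, available_cpus() - reserve)
    if max_workers is not None:
        workers = max(1, min(workers, max_workers))
    return workers


print(suggest_num_workers())
```

For example, `parameters['num_workers'] = suggest_num_workers(reserve=2)` could be set before running the processing cell.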

In [None]:
tar_count = 0
pdf_count = 0
processing_error = []
data_batch_files = []

if connection_type=="elasticsearch":
    writer=None
    embedding=None

# Manager-backed lists that can be shared across worker processes
m = multiprocessing.Manager()
shared_data_list = m.list()
shared_all_split_docs = m.list()

num_workers = parameters['num_workers']

def update_progress(result):
    pbar.update(1)

# Loop through and process each tar file
print('Execution Started!')
start_time = datetime.datetime.now()
try:
    with tqdm(total=len(tar_files), desc="Processing Tar Files", unit="tars") as pbar_tar:
        for tar_file in tar_files:
            tar_count += 1
            
            # limit tar count to be processed
            # if tar_count > 5: 
            #     break
            
            print(f"Tar file: {tar_count}")    
            print(f"Processing tar file: {tar_file}\n------------------\n")
            
            start_tar_execution = datetime.datetime.now()
            map_pdf_content = process_tar_file(cos_data_connection["bucket"], tar_file)
            print(f"\n----------\nTotal PDF count: {len(map_pdf_content)}\n")
    
            with multiprocessing.Pool(processes=num_workers) as pool:
                with tqdm(total=len(map_pdf_content), desc=f"Tar: {tar_file}, Total PDFs: {len(map_pdf_content)}, Processing PDF", unit="pdf") as pbar:
                    # One task per PDF; update_progress advances pbar as each task completes
                    results = [pool.apply_async(process_pdf, (p, shared_data_list, shared_all_split_docs), callback=update_progress) for p in map_pdf_content.items()]
    
                    for r in results:
                        r.get()
                        # print('\t', r.get())
            
            end_tar_execution = datetime.datetime.now()
            
            if connection_type=="milvus" or connection_type=="milvuswxd":
                for doc in shared_data_list:
                    writer.append_row(doc)
                writer.commit()
                print('Successfully written documents to the COS Milvus object store!')
            
                # Data files path
                data_batch_files = writer.batch_files
                print(f'Data batch files created in COS: {data_batch_files}')
    
                shared_data_list[:] = []
        
            print(f"\n----------\nProcessing of tar {tar_file} completed. Total PDF count: {pdf_count}\n Time elapsed: {end_tar_execution-start_tar_execution}\n-----------\n")
            pbar_tar.update(1)
            pdf_count = 0
except Exception as e:
    print(f'Failed to process data files from cos bucket. Error: {e}.')

end_time = datetime.datetime.now()
print('Execution Complete! \nTotal execution time: ', (end_time - start_time))
print('start time: ', start_time)
print('end time: ', end_time)
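
The loop above submits one `apply_async` task per PDF and advances the progress bar from the `callback`, so the bar's `total` should equal the number of PDFs in the current tar. The sketch below isolates that pattern. It uses `multiprocessing.pool.ThreadPool`, which exposes the same `apply_async`/`callback` API as `multiprocessing.Pool`, so it runs without pickling concerns; for CPU-bound PDF parsing, a process `Pool` as in the cell above remains the better fit. `run_with_progress` is a hypothetical helper.

```python
from multiprocessing.pool import ThreadPool


def run_with_progress(items, worker, num_workers=4):
    """Run worker(item) for each item via apply_async, counting completions in a callback.

    Returns (results_in_submission_order, number_of_callbacks_fired).
    """
    completed = []

    def on_done(result):
        # Runs in the pool's result-handler thread; this is where pbar.update(1) would go
        completed.append(result)

    with ThreadPool(processes=num_workers) as pool:
        handles = [pool.apply_async(worker, (item,), callback=on_done) for item in items]
        # get() blocks until each task finishes and re-raises any exception from the worker
        results = [h.get() for h in handles]
    return results, len(completed)


results, done = run_with_progress(["a.pdf", "b.pdf", "c.pdf"], str.upper, num_workers=2)
print(results, done)  # ['A.PDF', 'B.PDF', 'C.PDF'] 3
```

Calling `get()` on every handle matters: without it, an exception raised inside a worker is silently dropped and the corresponding PDF never reaches the shared lists.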


<h4>Bulk Ingestion of Generated Data Files into the Milvus Vector Store</h4>

Batch files containing sets of documents can be ingested directly into the Milvus vector store.<br>
This asynchronous process returns a unique `task_id` for each batch file, which can be used to track progress.
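
The cell below stops submitting at the first exception, so one rejected batch prevents the remaining batches from being queued. If you prefer to submit every batch and review failures afterwards, a minimal sketch follows; `submit_batches` is a hypothetical helper, and `submit` is injected so the logic can be tested without a Milvus connection.

```python
import posixpath
from typing import Callable, Dict, List, Sequence, Tuple


def submit_batches(
    batches: Sequence[Sequence[str]],
    submit: Callable[[Sequence[str]], int],
) -> Tuple[List[int], Dict[str, str]]:
    """Submit each batch independently; collect task IDs and per-batch failures."""
    task_ids: List[int] = []
    failures: Dict[str, str] = {}
    for files in batches:
        # Same result as '/'.join(path.split('/')[:-1]) for ordinary paths
        batch_dir = posixpath.dirname(files[0])
        try:
            task_ids.append(submit(files))
        except Exception as exc:  # keep going; record which batch failed
            failures[batch_dir] = str(exc)
    return task_ids, failures
```

With pymilvus, `batches` would be `data_batch_files` and `submit` could be `lambda files: utility.do_bulk_insert(collection_name=parameters['vector_store_index_name'], files=list(files))`.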

In [None]:
if connection_type=="milvus" or connection_type=="milvuswxd":
    task_id_list = []
    try:
        for batch in data_batch_files:
            batch_data_path = '/'.join(batch[0].split('/')[:-1])
            print('Batch:',batch_data_path)
            task_id = utility.do_bulk_insert(
                collection_name=parameters['vector_store_index_name'],
                files=batch)
            task_id_list.append(task_id)
            print(f'task_id: {task_id}')
        print('Bulk indexing of the documents has started')
    except Exception as e:
        print('Bulk Insert failed. Error: ', e)

The document ingestion progress can be tracked using the task IDs generated for each batch file in the previous step.
Each `task_id` represents an asynchronous bulk-insert operation whose state can be checked to determine the current status of the ingestion. The possible task states are:
* Pending
* Started / In Progress
* Persisted
* Completed
* Failed
  
**Note**: Bulk ingestion is resource-intensive and may take some time to complete. Monitor the progress periodically to confirm that it completes successfully.
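
Rather than re-running the status cell by hand, you can poll until every task reaches a terminal state. The following is a minimal sketch; `wait_for_tasks` is a hypothetical helper, the polling interval and timeout are assumptions, and `get_state` and `sleep` are injected so the function can be tested without a Milvus server.

```python
import time
from typing import Callable, Collection, Dict, Hashable, Iterable


def wait_for_tasks(
    task_ids: Iterable[Hashable],
    get_state: Callable[[Hashable], int],
    terminal_states: Collection[int],
    poll_seconds: float = 30.0,
    timeout_seconds: float = 6 * 3600,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[Hashable, int]:
    """Poll every task until each reaches a terminal state; return {task_id: final_state}."""
    pending = set(task_ids)
    final_states: Dict[Hashable, int] = {}
    deadline = time.monotonic() + timeout_seconds
    while pending:
        for task_id in list(pending):
            state = get_state(task_id)
            if state in terminal_states:
                final_states[task_id] = state
                pending.discard(task_id)
        if not pending:
            break
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{len(pending)} task(s) still running: {list(pending)}")
        sleep(poll_seconds)
    return final_states
```

With pymilvus, `get_state` could be `lambda t: utility.get_bulk_insert_state(task_id=t).state` and `terminal_states` could be `{BulkInsertState.ImportCompleted, BulkInsertState.ImportFailed}`; check your pymilvus version for any additional failure states it defines.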

In [None]:
if connection_type=="milvus" or connection_type=="milvuswxd":
    print('Bulk indexing Status:')
    for task_id in task_id_list:
        print(f'\n----------\ntask_id: {task_id}')
        task = utility.get_bulk_insert_state(task_id=task_id)
        # Copy the name map instead of mutating it in place, then relabel state 2
        state_2_name = {**task.state_2_name, 2: 'In Progress'}
        
        print("UTC Start time:", task.create_time_str)
        print("Ingestion Status:", state_2_name[task.state])
        print("Ingestion Progress: {}%".format(task.progress))

        if task.state == BulkInsertState.ImportCompleted:
            print("Imported row count:", task.row_count)
            print('Document Ingestion Completed!')
            
        if task.state == BulkInsertState.ImportFailed:
            print('Document Ingestion Failed!')
            print('-------------------------')
            print("Failed reason:", task.failed_reason)


After the bulk ingestion completes, run the following cell to load the Milvus collection into memory. A collection must be loaded before it can be searched or queried.

Note that this cell will fail if any of the previous bulk insertion steps encountered errors.

In [None]:
if connection_type=="milvus" or connection_type=="milvuswxd":
    try:
        coll.load()
        print('Milvus collection is loaded with documents and ready for querying!')
    except Exception as e:
        print('Failed to load the Milvus collection. Check the bulk ingestion status. Error: ', e)

<h4>Bulk Ingestion of Documents into the Elasticsearch Vector Store</h4>

- Converts each processed document chunk into a dictionary compatible with the Elasticsearch bulk helpers, renaming the `page_content` field to `text`.
- Performs asynchronous bulk insertion into the Elasticsearch index.
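
The next cell serializes each chunk with `doc.json()` and renames `page_content` to `text`. Assuming the serialized form is a JSON object containing a `page_content` key (as LangChain `Document` objects produce), the transformation reduces to the sketch below; `to_es_source` is a hypothetical helper. The `text` field name should match whatever field your index mapping and retriever expect.

```python
import json


def to_es_source(doc: dict) -> dict:
    """Copy a serialized document, renaming 'page_content' to 'text' for the Elasticsearch index."""
    source = dict(doc)  # shallow copy; leave the input untouched
    source["text"] = source.pop("page_content")
    return source


serialized = json.dumps({"page_content": "Chunk text", "metadata": {"source": "a.pdf"}})
print(to_es_source(json.loads(serialized)))  # {'metadata': {'source': 'a.pdf'}, 'text': 'Chunk text'}
```

Returning a new dictionary instead of deleting keys in place means a failed or repeated run does not leave half-converted documents behind.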

In [None]:
if connection_type=="elasticsearch":
    all_docs_split_upd=[]
    try:
        for doc in shared_all_split_docs:
            doc = json.loads(doc.json())
            doc['text'] = doc['page_content']
            del doc['page_content']
            all_docs_split_upd.append(doc)
        print('Documents are ready for ingestion in Elasticsearch!')
    except Exception as e:
        print(f'Error: {e}')

The following cell asynchronously indexes the documents into the Elasticsearch index, sending them in chunks of 500 and reporting the progress of the insertion.

**Note**: Executing this operation may take some time if the dataset contains a large number of documents.
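
Printing one line per document can overwhelm the notebook output for large datasets. The sketch below consumes any asynchronous stream of `(ok, info)` pairs and reports progress only every `report_every` items; `count_bulk_results` and `fake_stream` are hypothetical, and `fake_stream` merely stands in for `helpers.async_streaming_bulk(...)`.

```python
import asyncio
from typing import Any, AsyncIterable, Tuple


async def count_bulk_results(
    results: AsyncIterable[Tuple[bool, Any]],
    report_every: int = 500,
) -> Tuple[int, int]:
    """Consume (ok, info) pairs from a bulk stream, printing progress every report_every items."""
    total = successful = 0
    async for ok, _info in results:
        total += 1
        successful += int(ok)
        if total % report_every == 0:
            print(f"Indexed {total} documents: {successful} successful")
    print(f"Done: {total} documents, {successful} successful")
    return total, successful


async def fake_stream():
    # Stand-in for helpers.async_streaming_bulk(...): yields (ok, info) tuples
    for i in range(5):
        yield (i != 3), {"index": {"_id": str(i)}}


print(asyncio.run(count_bulk_results(fake_stream(), report_every=2)))
```

In the notebook, the stream would be `helpers.async_streaming_bulk(es_client, all_docs_split_upd, index=parameters['vector_store_index_name'], chunk_size=500, raise_on_error=False)`, and the coroutine should be awaited directly (`await count_bulk_results(...)`) because Jupyter already runs an event loop. With `raise_on_error=False`, failed documents are yielded with `ok=False` instead of raising `BulkIndexError`.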

In [None]:
if connection_type=="elasticsearch":
    start_time = datetime.datetime.now()
    try:
        total_docs = 0
        successful_docs = 0
        print('Starting Bulk indexing of the documents.')
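        async for success, info in helpers.async_streaming_bulk(es_client, all_docs_split_upd, index=parameters['vector_store_index_name'], chunk_size=500, raise_on_error=True):
            total_docs += 1
            if success:
                successful_docs += 1
            # Report progress once per 500 documents instead of once per document
            if total_docs % 500 == 0:
                print(f"Indexed {total_docs} documents: {successful_docs} successful")
        print(f"Indexed {total_docs} documents: {successful_docs} successful")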
    except helpers.BulkIndexError as e:
        print('Bulk Insert failed. Errors: ')
        for error in e.errors:
            print(error)
    end_time = datetime.datetime.now()
    print(f"Indexing completed.\n Time elapsed: {end_time-start_time}")

Depending on the size of the dataset, the cell above may take a significant amount of time to complete.

Once ingestion is complete, proceed to the **`Create and Deploy QnA AI Service`** notebook to create and deploy the RAG AI service.<br>




**Sample Materials, provided under license. <br>
Licensed Materials - Property of IBM. <br>
© Copyright IBM Corp. 2025. All Rights Reserved. <br>
US Government Users Restricted Rights - Use, duplication or disclosure restricted by GSA ADP Schedule Contract with IBM Corp. <br>**
