In [None]:
%%capture
!pip install llama-index==0.10.37 qdrant-client==1.9.1 llama-index-vector-stores-qdrant==0.2.8 llama-index-llms-google-genai llama-index-embeddings-google-genai
# llama-index-embeddings-openai==0.1.9 llama-index-llms-cohere==0.2.0 

In [3]:
import os
from dotenv import load_dotenv
from getpass import getpass

import nest_asyncio

# Allow asyncio event loops to be nested; the notebook kernel already runs
# one, and async-based library calls would otherwise fail inside it.
nest_asyncio.apply()
# Load variables from a local .env file (if present) into os.environ so the
# credential cells below can read them.
load_dotenv()

True

In [4]:
# Read the Gemini API key from the environment, prompting interactively only
# when it is missing or empty. os.environ.get() is used because indexing
# os.environ[...] raises KeyError for an unset variable, which would prevent
# the getpass() fallback from ever running.
GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY') or getpass("Enter your Gemini API key: ")

In [5]:
# OPENAI_API_KEY = os.environ['OPENAI_API_KEY'] or getpass("Enter your OpenAI API key: ")

In [6]:
# Read the Qdrant endpoint URL from the environment, prompting only when it is
# missing or empty. os.environ.get() avoids the KeyError that os.environ[...]
# raises for an unset variable, so the prompt fallback can actually run.
QDRANT_URL = os.environ.get('QDRANT_URL') or getpass("Enter your Qdrant URL:")

In [7]:
# Read the Qdrant API key from the environment, prompting only when it is
# missing or empty. os.environ.get() avoids the KeyError that os.environ[...]
# raises for an unset variable, so the prompt fallback can actually run.
QDRANT_API_KEY = os.environ.get('QDRANT_API_KEY') or getpass("Enter your Qdrant API Key:")

In [8]:
# Read the Cohere API key from the environment, prompting only when it is
# missing or empty. os.environ.get() avoids the KeyError that os.environ[...]
# raises for an unset variable, so the prompt fallback can actually run.
# NOTE(review): the Cohere LLM is commented out in the visible cells, so this
# key may no longer be needed — confirm before requiring it.
CO_API_KEY = os.environ.get('CO_API_KEY') or getpass("Enter your Cohere API key: ")

In [None]:
from llama_index.core.settings import Settings
# from llama_index.llms.cohere import Cohere
# from llama_index.embeddings.openai import OpenAIEmbedding
# from llama_index.embeddings.cohere import CohereEmbedding
# The LLM class exported by llama_index.llms.google_genai is `GoogleGenAI`;
# importing `GoogleGenAILLM` fails with ImportError.
from llama_index.llms.google_genai import GoogleGenAI
from llama_index.embeddings.google_genai import GoogleGenAIEmbedding

# Register the default LLM and embedding model on the global Settings object.
# Later cells read Settings.embed_model when building the ingestion pipeline.

# Settings.llm = Cohere(model="command-r-plus", api_key=CO_API_KEY)
Settings.llm = GoogleGenAI(model="gemini-2.5-flash", api_key=GEMINI_API_KEY)

# Settings.embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")
Settings.embed_model = GoogleGenAIEmbedding(model_name="gemini-embedding-001", api_key=GEMINI_API_KEY)

# Ingestion Pipeline

- 🔄 **IngestionPipeline Overview**: Applies a sequence of `Transformations` to the input data, turning it into nodes that are either returned or inserted into a vector database.

- 💾 **Caching Mechanism**: Each node+transformation pair is cached, enhancing efficiency for identical subsequent operations by utilizing cached results.


### Using an `IngestionPipeline`

First, let's read in some data. 

In [10]:
from llama_index.core import SimpleDirectoryReader

# Load the single source text file; filename_as_id=True is passed so the file
# name is used for document ids.
reader = SimpleDirectoryReader(
    input_files=["../02_Fundamental_Concepts_in_LlamaIndex/data/pg10.txt"],
    filename_as_id=True,
)
documents = reader.load_data()

# Ingestion Pipeline with Document Management


 •  💾 **Caching in IngestionPipeline**: Hashes and stores each node + transformation combination to speed up future processes with identical data.

 •  📁 **Local Cache Management**: The input nodes list and transformation pair are cached in the pipeline. When we apply the same transformation to that list of nodes again, the output nodes are retrieved from the cache.

 •  📚 **Docstore Attachment**:  Enables document management in the ingestion pipeline, using `doc_id` or `node.ref_doc_id` for identification. Prevents running a transformation on the same document multiple times by using the document ID and the hash of the document content to manage duplicates.

 •  🗂️ **Duplicate Handling**:
  - Maintains a `doc_id` to `document_hash` map to identify duplicates.

  - Re-processes documents if the same `doc_id` is found with a changed hash.

  - Skips documents if the same `doc_id` is found but the hash remains unchanged.

 •  🚫 **Without Vector Store**:
  - Limited to checking and removing duplicate inputs.

 •  ✨ **With Vector Store**:
  - Enables handling of upserts for updated documents, offering advanced management capabilities.

In [11]:
from qdrant_client import QdrantClient

from llama_index.core import StorageContext, VectorStoreIndex
from llama_index.core.ingestion import IngestionCache, IngestionPipeline
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.vector_stores.qdrant import QdrantVectorStore

# The Qdrant collection and the ingestion cache collection share one name.
COLLECTION_NAME = "it_can_be_done"

# Connect to the Qdrant instance using the credentials read earlier.
client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)

# Vector store backed by that client, plus a cache for transformation results.
vector_store = QdrantVectorStore(client=client, collection_name=COLLECTION_NAME)
ingest_cache = IngestionCache(collection=COLLECTION_NAME)

# Transformations are listed in order: split into 256-token chunks with a
# 16-token overlap, then embed with the globally configured embedding model.
chunker = TokenTextSplitter(chunk_size=256, chunk_overlap=16)

# The docstore enables document management (duplicate detection); the vector
# store is where the pipeline inserts the resulting nodes.
pipeline = IngestionPipeline(
    transformations=[chunker, Settings.embed_model],
    docstore=SimpleDocumentStore(),
    vector_store=vector_store,
    cache=ingest_cache,
)

# Run the pipeline over the loaded documents and keep the produced nodes.
nodes = pipeline.run(documents=documents)

TooManyRequestsError: status_code: 429, body: data=None id='052b25e4-bf7f-48c6-82f5-d1a700ba7875' message='trial token rate limit exceeded, limit is 100000 tokens per minute'

In [None]:
# Show which attributes the first ingested node carries.
vars(nodes[0]).keys()

In [None]:
# Build an index on top of the vector store the pipeline was configured with.
index = VectorStoreIndex.from_vector_store(
    vector_store=vector_store,
)

In [None]:
# Options forwarded to the retriever: request the 7 most similar nodes.
# NOTE(review): `return_sources` may not be a recognised retriever option and
# could be silently ignored — confirm against the retriever's signature.
retriever_options = {
    "similarity_top_k": 7,
    "return_sources": True,
}
retriever = index.as_retriever(**retriever_options)

In [None]:
# Natural-language query used to exercise the retriever.
query = "Poems about starting where you stand, and not making dreams your master"
retrieved_nodes = retriever.retrieve(query)

In [None]:
# Display the full list of results returned by the retriever.
retrieved_nodes

In [None]:
# Print the text of the first retrieved result.
first_result = retrieved_nodes[0]
print(first_result.get_text())

In [None]:
# Print the score attached to the first retrieved result.
first_result = retrieved_nodes[0]
print(first_result.get_score())

The ingestion pipeline can save its cache and docstore to a folder (by default `./pipeline_storage`).

When the pipeline is run again, it reuses the cache and skips documents that are already in the docstore.



In [None]:
# Write the pipeline's cache and docstore to disk so a later run can reuse them.
PIPELINE_STORAGE_DIR = './pipeline_storage'
pipeline.persist(PIPELINE_STORAGE_DIR)