<a href="https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/docs/examples/ingestion/parallel_execution_ingestion_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Parallelizing a LlamaIndex RAG Pipeline

## 0. Prerequisites


In [95]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In this notebook, we demonstrate how to execute ingestion pipelines using parallel processes. Both sync and async versions of batched parallel execution are possible with `IngestionPipeline`.

In [96]:
%pip install llama-index-cli
%pip install llama-index-embeddings-openai
%pip install llama-index-readers-file
%pip install llama-index-embeddings-huggingface



In [97]:
import nest_asyncio

nest_asyncio.apply()

In [98]:
import cProfile, pstats
from pstats import SortKey
import time
import asyncio

### Download data


For this notebook, we'll load the `PatronusAIFinanceBenchDataset` llama-dataset from [llamahub](https://llamahub.ai).

In [99]:
!llamaindex-cli download-llamadataset PatronusAIFinanceBenchDataset --download-dir ./data

100% 32/32 [00:11<00:00,  2.79it/s]
Successfully downloaded PatronusAIFinanceBenchDataset to ./data


## 1. Load data

### 1.0 Defining the Reader

**The PatronusAIFinanceBenchDataset data contains 32 PDFs of roughly a hundred pages each.**

In [100]:
from llama_index.core import SimpleDirectoryReader

# define our reader with the directory containing the 32 pdf files

reader = SimpleDirectoryReader(
    input_dir="./data/source_files",  # "./data/source_files" "/content/drive/MyDrive/test_data"
    #required_exts=[".pdf"],
    recursive=True,
    )

### 1.1 Sequential load

In [101]:
profiler = cProfile.Profile()

tic = time.time()
profiler.enable()
documents = reader.load_data(show_progress=True)
profiler.disable()
print(f"\nCreated {len(documents)} documents in {time.time()-tic}s.")

profiler.dump_stats('stats_sequential_load')
p = pstats.Stats("stats_sequential_load")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

Loading files: 100%|██████████| 32/32 [07:59<00:00, 15.00s/file]


Created 4306 documents in 479.93462562561035s.
Sat Feb  8 17:46:13 2025    stats_sequential_load

         2821751 function calls (2798882 primitive calls) in 479.934 seconds

   Ordered by: cumulative time
   List reduced from 377 to 15 due to restriction <15>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000  479.934  479.934 interactiveshell.py:3512(run_code)
        1    0.000    0.000  479.934  479.934 {built-in method builtins.exec}
        1    0.000    0.000  479.934  479.934 <ipython-input-101-89bd0949f457>:1(<cell line: 0>)
        1    0.000    0.000  479.934  479.934 base.py:664(load_data)
        1    0.000    0.000  479.932  479.932 base.py:493(load_file)
        1    0.000    0.000  479.930  479.930 __init__.py:328(wrapped_f)
        1    0.000    0.000  479.930  479.930 __init__.py:465(__call__)
        1    0.000    0.000  479.930  479.930 base.py:36(load_data)
        5    0.005    0.001  479.690   95.938 _page.p




<pstats.Stats at 0x7eb91cb2ead0>
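
The profiling cells in this notebook all follow the same pattern: start a timer, enable `cProfile`, run one call, dump the stats, and print the top entries by cumulative time. A minimal sketch of a helper that factors this out is shown here. `profile_call` and its parameters are hypothetical names introduced for illustration; they are not part of LlamaIndex, and the helper only covers synchronous calls.

```python
import cProfile
import pstats
import time
from pstats import SortKey


def profile_call(func, *args, stats_file=None, top=15, **kwargs):
    """Run func(*args, **kwargs) under cProfile and time it.

    Hypothetical helper: returns (result, elapsed_seconds) and prints the
    `top` entries sorted by cumulative time.
    """
    profiler = cProfile.Profile()
    tic = time.perf_counter()
    profiler.enable()
    try:
        result = func(*args, **kwargs)
    finally:
        # Always stop profiling, even if the call raises.
        profiler.disable()
    elapsed = time.perf_counter() - tic

    if stats_file is not None:
        # Optionally persist the raw stats for later inspection.
        profiler.dump_stats(stats_file)

    stats = pstats.Stats(profiler)
    stats.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(top)
    return result, elapsed
```

With such a helper, the sequential load could be written as `documents, elapsed = profile_call(reader.load_data, show_progress=True, stats_file="stats_sequential_load")`.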

In [None]:
print("Mean loader execution time over 7 runs:")
%timeit reader.load_data()

Mean loader execution time over 7 runs:


### 1.2 Parallel load

In [None]:
import multiprocessing

num_cpus = multiprocessing.cpu_count()
print(f"Number of CPUs: {num_cpus}")

In [None]:
profiler = cProfile.Profile()

tic = time.time()
profiler.enable()
documents = reader.load_data(num_workers=2, show_progress=True)
profiler.disable()
print(f"\nCreated {len(documents)} documents in {time.time()-tic}s.")

profiler.dump_stats('stats_parallel_load_worker2')
p = pstats.Stats("stats_parallel_load_worker2")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

In [None]:
print("Mean loader execution time over 7 runs:")
%timeit reader.load_data(num_workers=2)

### 1.3 Async Load

In [None]:
profiler = cProfile.Profile()

tic = time.time()
profiler.enable()
documents = await reader.aload_data(show_progress=True)
profiler.disable()
print(f"\nCreated {len(documents)} documents in {time.time()-tic}s.")

profiler.dump_stats('stats_async_load')
p = pstats.Stats("stats_async_load")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

In [None]:
loop = asyncio.get_event_loop()
print("Mean loader execution time over 7 runs:")
%timeit loop.run_until_complete(reader.aload_data())

### 1.4 Async Parallel Load

In [None]:
profiler = cProfile.Profile()

tic = time.time()
profiler.enable()
documents = await reader.aload_data(num_workers=2, show_progress=True)
profiler.disable()
print(f"\nCreated {len(documents)} documents in {time.time()-tic}s.")

profiler.dump_stats('stats_parallel_async_load_worker2')
p = pstats.Stats("stats_parallel_async_load_worker2")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

In [None]:
loop = asyncio.get_event_loop()
print("Mean loader execution time over 7 runs:")
%timeit loop.run_until_complete(reader.aload_data(num_workers=2))

### 1.5 TODO: Conclusion

## 2. IngestionPipeline

### 2.0 Defining the Pipeline

In [None]:
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

# create the pipeline with transformations
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=512, chunk_overlap=20),
        HuggingFaceEmbedding("BAAI/bge-small-en-v1.5"),
    ]
)

# since we'll be testing performance, using timeit and cProfile
# we're going to disable cache
pipeline.disable_cache = True

### 2.1 Sequential Execution

By default `num_workers` is set to `None` and this will invoke sequential execution.

In [None]:
profiler = cProfile.Profile()

tic = time.time()
profiler.enable()
nodes = pipeline.run(documents=documents, show_progress=True)
profiler.disable()
print(f"\nCreated {len(nodes)} nodes in {time.time()-tic}s.")

profiler.dump_stats('stats_sequential_ingestion')
p = pstats.Stats("stats_sequential_ingestion")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

In [None]:
print("Mean pipeline execution time over 7 runs:")
%timeit pipeline.run(documents=documents)

### 2.2 Parallel Execution

A single run. Setting `num_workers` to a value greater than 1 will invoke parallel execution.
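
Conceptually, parallel execution can be thought of as splitting the input into batches and handing each batch to a separate worker process. The following is a simplified standard-library sketch under that assumption, not LlamaIndex's actual implementation; `split_into_batches`, `transform_batch`, and `run_in_parallel` are hypothetical names.

```python
from concurrent.futures import ProcessPoolExecutor
from typing import List, Sequence


def split_into_batches(items: Sequence[str], num_batches: int) -> List[List[str]]:
    """Split items into at most num_batches contiguous, near-equal batches."""
    if num_batches < 1:
        raise ValueError("num_batches must be at least 1")
    batch_size, remainder = divmod(len(items), num_batches)
    batches, start = [], 0
    for i in range(num_batches):
        # The first `remainder` batches take one extra item.
        end = start + batch_size + (1 if i < remainder else 0)
        if end > start:  # skip empty batches when items are scarce
            batches.append(list(items[start:end]))
        start = end
    return batches


def transform_batch(batch: List[str]) -> List[str]:
    """Stand-in for a pipeline transformation (here: upper-casing text)."""
    return [text.upper() for text in batch]


def run_in_parallel(items: Sequence[str], num_workers: int) -> List[str]:
    """Process each batch in its own worker process and flatten the results."""
    batches = split_into_batches(items, num_workers)
    with ProcessPoolExecutor(max_workers=num_workers) as pool:
        results = pool.map(transform_batch, batches)  # preserves batch order
        return [out for batch in results for out in batch]
```

Starting worker processes and pickling each batch both add overhead, so this approach only pays off when each batch carries enough work.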

In [None]:
profiler = cProfile.Profile()

tic = time.time()
profiler.enable()
nodes = pipeline.run(documents=documents, num_workers=2, show_progress=True)
profiler.disable()
print(f"\nCreated {len(nodes)} nodes in {time.time()-tic}s.")

profiler.dump_stats('stats_parallel_ingestion_worker2')
p = pstats.Stats("stats_parallel_ingestion_worker2")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

In [None]:
print("Mean pipeline execution time over 7 runs:")
%timeit pipeline.run(documents=documents, num_workers=2)

### 2.3 Async on Main Processor

As in the sync case, `num_workers` defaults to `None`, which leads to single-batch execution of the async tasks.
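
As a rough illustration of why async execution can help even on a single process, the sketch below runs several simulated I/O-bound calls concurrently with `asyncio.gather`. It assumes the work is dominated by waiting (for example, on a remote embedding API), which may not hold for a locally computed embedding model. `fake_embed` is a hypothetical stand-in, not a LlamaIndex function.

```python
import asyncio
from typing import List


async def fake_embed(text: str, delay: float = 0.01) -> int:
    """Simulate an I/O-bound embedding call; returns the text length."""
    await asyncio.sleep(delay)  # stands in for network latency
    return len(text)


async def embed_sequentially(texts: List[str]) -> List[int]:
    """Await each call in turn: total time is roughly the sum of the delays."""
    return [await fake_embed(text) for text in texts]


async def embed_concurrently(texts: List[str]) -> List[int]:
    """Schedule all calls at once: total time is roughly the longest delay."""
    return await asyncio.gather(*(fake_embed(text) for text in texts))
```

In a notebook cell, `await embed_concurrently(["a", "bb"])` can be called directly. Both functions return results in input order.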

In [None]:
profiler = cProfile.Profile()

tic = time.time()
profiler.enable()
nodes = await pipeline.arun(documents=documents, show_progress=True)
profiler.disable()
print(f"\nCreated {len(nodes)} nodes in {time.time()-tic}s.")

profiler.dump_stats('stats_async_ingestion')
p = pstats.Stats("stats_async_ingestion")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

In [None]:
loop = asyncio.get_event_loop()
print("Mean pipeline execution time over 7 runs:")
%timeit loop.run_until_complete(pipeline.arun(documents=documents))

### 2.4 Async Parallel Execution

Here, the `ProcessPoolExecutor` from `concurrent.futures` is used to execute processes asynchronously. The tasks being processed are blocking, but they are performed asynchronously on the individual processes.
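
A minimal sketch of this pattern, using only the standard library: each batch is submitted to an executor with `loop.run_in_executor`, and the event loop awaits all batches together. This illustrates the general technique rather than LlamaIndex's exact code; `transform_batch`, `run_batches`, and `run_with_processes` are hypothetical names.

```python
import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import List


def transform_batch(batch: List[str]) -> List[str]:
    """Blocking stand-in for a pipeline transformation (upper-cases text)."""
    return [text.upper() for text in batch]


async def run_batches(batches: List[List[str]], executor: Executor) -> List[str]:
    """Run each batch on the executor and await them all concurrently."""
    loop = asyncio.get_running_loop()
    futures = [
        loop.run_in_executor(executor, transform_batch, batch) for batch in batches
    ]
    results = await asyncio.gather(*futures)  # keeps the order of `batches`
    return [out for batch in results for out in batch]


async def run_with_processes(batches: List[List[str]], num_workers: int) -> List[str]:
    """Same as run_batches, but on a pool of worker processes."""
    with ProcessPoolExecutor(max_workers=num_workers) as pool:
        return await run_batches(batches, pool)
```

Because `run_batches` accepts any `Executor`, a `ThreadPoolExecutor` can be passed instead for a quick check that does not spawn processes.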

In [None]:
profiler = cProfile.Profile()

tic = time.time()
profiler.enable()
nodes = await pipeline.arun(documents=documents, num_workers=2, show_progress=True)
profiler.disable()
print(f"\nCreated {len(nodes)} nodes in {time.time()-tic}s.")

profiler.dump_stats('stats_parallel_async_ingestion')
p = pstats.Stats("stats_parallel_async_ingestion")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

In [None]:
loop = asyncio.get_event_loop()
print("Mean pipeline execution time over 7 runs:")
%timeit loop.run_until_complete(pipeline.arun(documents=documents, num_workers=2))

### 2.5 TODO: Conclusion

The results of the experiments above are summarized below, with the strategies listed from fastest to slowest for this example dataset and pipeline.

1. (Async, Parallel Processing): 20.3s
2. (Async, No Parallel Processing): 20.5s
3. (Sync, Parallel Processing): 29s
4. (Sync, No Parallel Processing): 1min 11s

Both strategies that use parallel processing outperform sync execution without parallel processing (i.e., `.run(num_workers=None)`). For async tasks, at least in this case, parallel processing brings little additional gain (20.3s vs. 20.5s). For larger workloads and ingestion pipelines, combining async with parallel processing may lead to larger gains.
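
The relative speedups implied by these timings can be worked out directly. The short calculation below takes the four reported times (with 1min 11s converted to 71 seconds) and divides the sequential baseline by each of them.

```python
# Reported wall-clock times in seconds, copied from the list above.
timings = {
    "async, parallel": 20.3,
    "async, no parallel": 20.5,
    "sync, parallel": 29.0,
    "sync, no parallel": 71.0,  # 1min 11s
}

# Speedup of each strategy relative to sequential sync execution.
baseline = timings["sync, no parallel"]
speedups = {name: baseline / seconds for name, seconds in timings.items()}

for name, factor in speedups.items():
    print(f"{name}: {factor:.2f}x")
```

This gives roughly 3.5x for both async variants and about 2.45x for sync parallel execution.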