<a href="https://colab.research.google.com/github/jerryjliu/llama_index/blob/main/docs/examples/ingestion/parallel_execution_ingestion_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Parallelizing Ingestion Pipeline

In this notebook, we demonstrate how to execute ingestion pipelines using parallel processes. Both sync and async versions of batched parallel execution are possible with `IngestionPipeline`.

In [None]:
%pip install llama-index-embeddings-openai


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [None]:
import nest_asyncio

nest_asyncio.apply()

In [None]:
import cProfile, pstats
from pstats import SortKey

### Load data

For this notebook, we'll load the `PatronusAIFinanceBenchDataset` llama-dataset from [llamahub](https://llamahub.ai).

In [None]:
!llamaindex-cli download-llamadataset PatronusAIFinanceBenchDataset --download-dir ./data

100%|███████████████████████████████████████████| 32/32 [00:25<00:00,  1.26it/s]
Successfully downloaded PatronusAIFinanceBenchDataset to ./data


In [None]:
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader(input_dir="./data/source_files").load_data(
    num_workers=4
)

### Define our IngestionPipeline

In [None]:
from llama_index.core import Document
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.extractors import TitleExtractor
from llama_index.core.ingestion import IngestionPipeline

# create the pipeline with transformations
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=20),
        # TitleExtractor(),
        OpenAIEmbedding(),
    ]
)

# since we'll be measuring performance with timeit and cProfile,
# disable the cache so that every run recomputes all transformations
pipeline.disable_cache = True

### Parallel Execution

First, a single run. Setting `num_workers` to a value greater than 1 invokes parallel execution, with batches of documents processed in separate worker processes.
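A minimal sketch of what batched, process-level parallelism can look like using only the standard library. It assumes the inputs are split into one batch per worker and that a "run every transformation on this batch" function is mapped over a `multiprocessing.Pool` with `starmap` (the `pool.py:369(starmap)` entry in the profile below is consistent with this, but this is not LlamaIndex's actual code). `split_into_batches`, `apply_transforms`, `run_parallel` and the toy `strip`/`lowercase` transforms are hypothetical stand-ins for the documents, the `SentenceSplitter` and the embedding model.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
from typing import Callable, List, Sequence

# Hypothetical type alias: a transformation maps a batch of items to a new batch.
Transform = Callable[[List[str]], List[str]]


def split_into_batches(items: Sequence[str], num_batches: int) -> List[List[str]]:
    """Split items into at most `num_batches` contiguous batches of near-equal size."""
    size = max(1, -(-len(items) // num_batches))  # ceiling division
    return [list(items[i : i + size]) for i in range(0, len(items), size)]


def apply_transforms(batch: List[str], transforms: Sequence[Transform]) -> List[str]:
    """Run every transformation, in order, over one batch (the work each worker does)."""
    for transform in transforms:
        batch = transform(batch)
    return batch


def run_parallel(
    items: Sequence[str],
    transforms: Sequence[Transform],
    num_workers: int,
    pool_cls=Pool,
) -> List[str]:
    """Fan batches out to a worker pool with starmap, then flatten the results.

    `pool_cls` defaults to a process pool; ThreadPool exposes the same API and is
    handy for debugging because nothing has to be pickled.
    """
    batches = split_into_batches(items, num_workers)
    with pool_cls(processes=num_workers) as pool:
        # starmap returns results in batch order, so output order matches input order.
        results = pool.starmap(apply_transforms, [(b, transforms) for b in batches])
    return [item for batch in results for item in batch]


# Toy, module-level (hence picklable) transformations.
def strip(batch: List[str]) -> List[str]:
    return [text.strip() for text in batch]


def lowercase(batch: List[str]) -> List[str]:
    return [text.lower() for text in batch]


if __name__ == "__main__":
    docs = [f"  Document {i}  " for i in range(10)]
    print(run_parallel(docs, [strip, lowercase], num_workers=4))
```

The `if __name__ == "__main__":` guard matters on platforms that start workers with `spawn` (the default on macOS and Windows), because each worker re-imports the main module; transformations must also be picklable to cross process boundaries.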

In [None]:
nodes = pipeline.run(documents=documents, num_workers=4)

In [None]:
len(nodes)

5371

In [None]:
%timeit pipeline.run(documents=documents, num_workers=4)

24.9 s ± 732 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
cProfile.run(
    "pipeline.run(documents=documents, num_workers=4)",
    "newstats",
)
p = pstats.Stats("newstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

Mon Mar  4 22:58:56 2024    newstats

         2050 function calls in 25.435 seconds

   Ordered by: cumulative time
   List reduced from 215 to 15 due to restriction <15>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   25.435   25.435 {built-in method builtins.exec}
        1    0.049    0.049   25.434   25.434 <string>:1(<module>)
        1    0.000    0.000   25.386   25.386 pipeline.py:665(run)
       12    0.000    0.000   25.327    2.111 threading.py:589(wait)
       11    0.000    0.000   25.327    2.302 threading.py:288(wait)
       71   25.327    0.357   25.327    0.357 {method 'acquire' of '_thread.lock' objects}
        1    0.000    0.000   25.324   25.324 pool.py:369(starmap)
        1    0.000    0.000   25.324   25.324 pool.py:767(get)
        1    0.000    0.000   25.324   25.324 pool.py:764(wait)
        1    0.000    0.000    0.049    0.049 context.py:115(Pool)
        1    0.000    0.000    0.049    0.049 pool.py

<pstats.Stats at 0x2a360f9d0>
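The profiling cells all repeat the same `cProfile`/`pstats` pattern, which could be wrapped in a small helper. One caveat when reading these reports: `cProfile` only instruments the thread of the process it runs in, so for the parallel run nearly all of the time shows up as the main process waiting (`{method 'acquire' of '_thread.lock' objects}`), while the splitting and embedding happen in worker processes the profiler never sees. That is also why this run records only 2050 function calls. The `profile_call` helper below is hypothetical, not part of LlamaIndex; it uses `cProfile.Profile` as a context manager (Python 3.8+) and captures the report as a string.

```python
import cProfile
import io
import pstats
from pstats import SortKey
from typing import Any, Callable, Tuple


def profile_call(fn: Callable[..., Any], *args: Any, top: int = 15, **kwargs: Any) -> Tuple[Any, str]:
    """Run fn(*args, **kwargs) under cProfile and return (result, text report).

    Only the calling thread of the current process is profiled, so work done in
    worker processes (e.g. a multiprocessing pool) will not appear in the report.
    """
    with cProfile.Profile() as profiler:
        result = fn(*args, **kwargs)
    stream = io.StringIO()
    stats = pstats.Stats(profiler, stream=stream)
    stats.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(top)
    return result, stream.getvalue()


if __name__ == "__main__":
    # Hypothetical notebook usage:
    # nodes, report = profile_call(pipeline.run, documents=documents, num_workers=4)
    total, report = profile_call(sum, range(1_000_000))
    print(total)
    print(report)
```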

### Async Parallel Execution

Here the `ProcessPoolExecutor` from `concurrent.futures` is used to execute processes asynchronously. The tasks being processed are blocking, but they are also performed asynchronously within each individual process.
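A comparable sketch of the async variant, assuming each batch is handed to a `concurrent.futures.ProcessPoolExecutor` through `loop.run_in_executor` and that each worker drives its own event loop (`asyncio.run`) over async transformations, so the caller's event loop stays free while batches run. The helper names and the toy `fake_embed` coroutine are hypothetical; this illustrates the pattern rather than reproducing LlamaIndex's `arun`.

```python
import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor
from typing import Awaitable, Callable, List, Sequence

# Hypothetical async transformation: takes a batch, returns a new batch.
AsyncTransform = Callable[[List[str]], Awaitable[List[str]]]


def split_into_batches(items: Sequence[str], num_batches: int) -> List[List[str]]:
    """Split items into at most `num_batches` contiguous batches of near-equal size."""
    size = max(1, -(-len(items) // num_batches))  # ceiling division
    return [list(items[i : i + size]) for i in range(0, len(items), size)]


async def _apply_async(batch: List[str], transforms: Sequence[AsyncTransform]) -> List[str]:
    for transform in transforms:
        batch = await transform(batch)
    return batch


def run_batch_in_worker(batch: List[str], transforms: Sequence[AsyncTransform]) -> List[str]:
    """Blocking entry point for a worker: runs its own event loop over one batch."""
    return asyncio.run(_apply_async(batch, transforms))


async def arun_parallel(
    items: Sequence[str],
    transforms: Sequence[AsyncTransform],
    num_workers: int,
    executor_cls: Callable[..., Executor] = ProcessPoolExecutor,
) -> List[str]:
    """Submit one batch per worker and await them all without blocking the caller's loop."""
    loop = asyncio.get_running_loop()
    batches = split_into_batches(items, num_workers)
    with executor_cls(max_workers=num_workers) as executor:
        futures = [
            loop.run_in_executor(executor, run_batch_in_worker, batch, transforms)
            for batch in batches
        ]
        results = await asyncio.gather(*futures)  # gather preserves batch order
    return [item for batch in results for item in batch]


async def fake_embed(batch: List[str]) -> List[str]:
    """Toy stand-in for an async embedding call that simulates network latency."""
    await asyncio.sleep(0.01)
    return [f"emb({text})" for text in batch]


if __name__ == "__main__":
    docs = [f"doc{i}" for i in range(8)]
    print(asyncio.run(arun_parallel(docs, [fake_embed], num_workers=4)))
```

`executor_cls` exists only so the same code can be exercised with a `ThreadPoolExecutor`, which skips pickling and process start-up while debugging.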

In [None]:
nodes = await pipeline.arun(documents=documents, num_workers=4)

In [None]:
len(nodes)

5371

In [None]:
import asyncio

loop = asyncio.get_event_loop()
%timeit loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))

Retrying llama_index.embeddings.openai.base.aget_embeddings in 0.2501484448736715 seconds as it raised RateLimitError: Error code: 429 - {'error': {'message': 'Rate limit reached for text-embedding-ada-002 in organization org-1ZDAvajC6v2ZtAP9hLEIsXRz on tokens per min (TPM): Limit 10000000, Used 9942046, Requested 74706. Please try again in 100ms. Visit https://platform.openai.com/account/rate-limits to learn more.', 'type': 'tokens', 'param': None, 'code': 'rate_limit_exceeded'}}.
Retrying llama_index.embeddings.openai.base.aget_embeddings in 0.4651037530906399 seconds as it raised RateLimitError: Error code: 429 - {'error': {'message': 'Rate limit reached for text-embedding-ada-002 in organization org-1ZDAvajC6v2ZtAP9hLEIsXRz on tokens per min (TPM): Limit 10000000, Used 9952471, Requested 74238. Please try again in 160ms. Visit https://platform.openai.com/account/rate-limits to learn more.', 'type': 'tokens', 'param': None, 'code': 'rate_limit_exceeded'}}.
Retrying llama_index.embed

17.6 s ± 4.38 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
loop = asyncio.get_event_loop()
cProfile.run(
    "loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))",
    "async-newstats",
)
p = pstats.Stats("async-newstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

Mon Mar  4 23:02:02 2024    async-newstats

         2730 function calls in 13.905 seconds

   Ordered by: cumulative time
   List reduced from 286 to 15 due to restriction <15>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   13.905   13.905 {built-in method builtins.exec}
        1    0.051    0.051   13.905   13.905 <string>:1(<module>)
        1    0.000    0.000   13.853   13.853 nest_asyncio.py:86(run_until_complete)
       13    0.000    0.000   13.853    1.066 nest_asyncio.py:100(_run_once)
       13    0.000    0.000   13.561    1.043 selectors.py:554(select)
       13   13.561    1.043   13.561    1.043 {method 'control' of 'select.kqueue' objects}
       26    0.000    0.000    0.292    0.011 events.py:78(_run)
       26    0.000    0.000    0.292    0.011 {method 'run' of '_contextvars.Context' objects}
        2    0.000    0.000    0.291    0.145 tasks.py:215(__step)
        2    0.000    0.000    0.291    0.145 {metho

<pstats.Stats at 0x12e943eb0>

### Sequential Execution

By default, `num_workers` is `None`, which invokes sequential execution.

In [None]:
nodes = pipeline.run(documents=documents)

In [None]:
len(nodes)

5371

In [None]:
%timeit pipeline.run(documents=documents)

1min 11s ± 3.56 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
cProfile.run("pipeline.run(documents=documents)", "oldstats")
p = pstats.Stats("oldstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

Mon Mar  4 23:14:42 2024    oldstats

         5713181 function calls (5496694 primitive calls) in 68.806 seconds

   Ordered by: cumulative time
   List reduced from 714 to 15 due to restriction <15>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   68.810   68.810 {built-in method builtins.exec}
        1    0.053    0.053   68.810   68.810 <string>:1(<module>)
        1    0.000    0.000   68.757   68.757 pipeline.py:665(run)
        1    0.015    0.015   68.757   68.757 pipeline.py:97(run_transformations)
        1    5.145    5.145   63.002   63.002 base.py:333(__call__)
        1    0.010    0.010   57.827   57.827 base.py:233(get_text_embedding_batch)
       54    0.000    0.000   57.807    1.070 base.py:411(_get_text_embeddings)
       54    0.000    0.000   57.806    1.070 __init__.py:287(wrapped_f)
       54    0.003    0.000   57.806    1.070 __init__.py:369(__call__)
       54    0.001    0.000   57.799    1.070 base.py:1

<pstats.Stats at 0x10ea61cf0>

### Async on Main Process

As with the sync case, `num_workers` defaults to `None`, which leads to single-batch execution of async tasks on the main process.
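Embedding requests are network-bound, so a single event loop can keep many of them in flight at once; that is why async execution can be fast even without extra processes. A minimal sketch of the idea, assuming a hypothetical `fake_embed_batch` coroutine that only sleeps to simulate request latency, with an `asyncio.Semaphore` capping concurrent requests (a common way to stay under rate limits; LlamaIndex may bound concurrency differently):

```python
import asyncio
import time
from typing import List


async def fake_embed_batch(batch: List[str], latency: float = 0.05) -> List[List[float]]:
    """Hypothetical embedding call: sleeps to mimic network latency, returns dummy vectors."""
    await asyncio.sleep(latency)
    return [[float(len(text))] for text in batch]


async def embed_all(batches: List[List[str]], max_concurrency: int = 8) -> List[List[List[float]]]:
    """Run all batch requests concurrently on one event loop, at most `max_concurrency` at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(batch: List[str]) -> List[List[float]]:
        async with semaphore:
            return await fake_embed_batch(batch)

    # gather returns results in the same order as the input batches.
    return await asyncio.gather(*(guarded(b) for b in batches))


if __name__ == "__main__":
    batches = [[f"node {i}-{j}" for j in range(4)] for i in range(16)]
    start = time.perf_counter()
    results = asyncio.run(embed_all(batches))
    elapsed = time.perf_counter() - start
    # 16 requests x 0.05 s would take about 0.8 s one at a time; with 8 in flight, roughly 0.1 s.
    print(f"{len(results)} batches embedded in {elapsed:.2f}s")
```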

In [None]:
nodes = await pipeline.arun(documents=documents)

In [None]:
len(nodes)

5371

In [None]:
%timeit loop.run_until_complete(pipeline.arun(documents=documents))

16.7 s ± 3.45 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
cProfile.run(
    "loop.run_until_complete(pipeline.arun(documents=documents))",
    "async-oldstats",
)
p = pstats.Stats("async-oldstats")
p.strip_dirs().sort_stats(SortKey.CUMULATIVE).print_stats(15)

Mon Mar  4 23:18:06 2024    async-oldstats

         6901892 function calls (6678831 primitive calls) in 15.789 seconds

   Ordered by: cumulative time
   List reduced from 1015 to 15 due to restriction <15>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   15.799   15.799 {built-in method builtins.exec}
        1    0.043    0.043   15.799   15.799 <string>:1(<module>)
        1    0.002    0.002   15.756   15.756 nest_asyncio.py:86(run_until_complete)
     2078    0.035    0.000   15.753    0.008 nest_asyncio.py:100(_run_once)
     5995    0.004    0.000   12.996    0.002 events.py:78(_run)
     5995    0.003    0.000   12.992    0.002 {method 'run' of '_contextvars.Context' objects}
     3127    0.010    0.000   12.916    0.004 tasks.py:215(__step)
     3072    0.002    0.000   12.893    0.004 {method 'send' of 'coroutine' objects}
        2    0.000    0.000   11.321    5.660 pipeline.py:835(arun)
        2    0.000    0.000   11

<pstats.Stats at 0x2ba2e8190>

### In Summary

The results from the above experiments are re-shared below, with each strategy listed from fastest to slowest for this example dataset and pipeline (mean ± standard deviation reported by `%timeit`).

1. (Async, No Parallel Processing): 16.7 s ± 3.45 s
2. (Async, Parallel Processing): 17.6 s ± 4.38 s
3. (Sync, Parallel Processing): 24.9 s ± 732 ms
4. (Sync, No Parallel Processing): 1min 11s ± 3.56 s

Every strategy outperforms Sync, No Parallel Processing (i.e., `.run(num_workers=None)`): parallel processing cuts the sync run from about 71 s to about 25 s, and async execution brings it under 20 s. For async tasks, at least in this case, there is little to gain from parallel processing: the two async timings are within one standard deviation of each other, and the parallel async run also hit OpenAI rate limits and had to retry. Perhaps for larger workloads and IngestionPipelines, using Async with Parallel Processing can lead to larger gains.
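The relative speedups follow directly from the `%timeit` means reported in the cells above. A small sketch of that arithmetic, treating "1min 11s" as 71 s and ignoring the run-to-run spread:

```python
# Mean wall-clock times (seconds) reported by %timeit in the cells above.
timings = {
    "sync, no parallel processing": 71.0,  # "1min 11s"
    "sync, parallel processing": 24.9,
    "async, parallel processing": 17.6,
    "async, no parallel processing": 16.7,
}

# Speedup of each strategy relative to the sequential, synchronous baseline.
baseline = timings["sync, no parallel processing"]
for name, seconds in sorted(timings.items(), key=lambda kv: kv[1]):
    print(f"{name:32s} {seconds:6.1f}s  speedup x{baseline / seconds:.2f}")
```

This gives roughly a 2.85x speedup for sync with parallel processing and about 4x for either async strategy.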