In [1]:
from pstats import SortKey
import nest_asyncio, asyncio
import cProfile, pstats, yaml, os
from llama_index import Document
from llama_index import SimpleDirectoryReader
from llama_index.extractors import TitleExtractor
from llama_index.embeddings import OpenAIEmbedding
from llama_index.ingestion import IngestionPipeline
from llama_index.text_splitter import SentenceSplitter
from llama_index.embeddings import HuggingFaceEmbedding

In [2]:
# Patch asyncio so that an event loop can be re-entered: the async benchmark
# cells further down call loop.run_until_complete() from inside the notebook.
nest_asyncio.apply()

In [3]:
# Point the reader at the local ./data directory and load its contents.
# `documents` is the input passed to every pipeline run in this notebook.
reader = SimpleDirectoryReader(input_dir="./data")
documents = reader.load_data()

In [4]:
# Create the pipeline with its transformations: split each document into
# sentence-aware chunks, then embed every chunk with a local HuggingFace model.
pipeline = IngestionPipeline(
    transformations=[
        SentenceSplitter(chunk_size=1024, chunk_overlap=20),
        # TitleExtractor(),
        HuggingFaceEmbedding(
            model_name="BAAI/bge-small-en-v1.5",
            # `device` is deliberately not hard-coded: when it is omitted,
            # HuggingFaceEmbedding infers the torch device itself (CUDA, then
            # Apple "mps", then CPU), so the notebook also runs on machines
            # without Apple Metal support.
        ),
    ]
)

# Since we'll be testing performance (using timeit and cProfile),
# we're going to disable the pipeline cache.
pipeline.disable_cache = True

### Parallel Execution

In [5]:
# Benchmark: synchronous pipeline.run() with num_workers=4.
%timeit pipeline.run(documents=documents, num_workers=4)
# Un-timed equivalent, for when the resulting nodes should be kept:
# nodes = pipeline.run(documents=documents, num_workers=4)

### Async Parallel Execution

In [None]:
# Benchmark: asynchronous pipeline.arun() with num_workers=4. The coroutine is
# driven to completion with run_until_complete() so %timeit can time it as an
# ordinary call. `loop` is reused by the later async benchmark cell.
loop = asyncio.get_event_loop()
%timeit loop.run_until_complete(pipeline.arun(documents=documents, num_workers=4))
# Un-timed equivalent using top-level await, for when the nodes should be kept:
# nodes = await pipeline.arun(documents=documents, num_workers=4)

### Sequential Execution

In [None]:
# Benchmark: synchronous pipeline.run() without num_workers (baseline).
%timeit pipeline.run(documents=documents)
# Un-timed equivalent, for when the resulting nodes should be kept:
# nodes = pipeline.run(documents=documents)

### Async on Main Processor

In [None]:
# Benchmark: asynchronous pipeline.arun() without num_workers.
# Relies on `loop` created in the "Async Parallel Execution" cell above.
%timeit loop.run_until_complete(pipeline.arun(documents=documents))
# Un-timed equivalent using top-level await, for when the nodes should be kept:
# nodes = await pipeline.arun(documents=documents)

# In Summary
The results from the above experiments are summarized below, with each strategy listed from fastest to slowest for this example dataset and pipeline.

1. (Async, Parallel Processing)    : 20.3s
2. (Async, No Parallel Processing) : 20.5s
3. (Sync, Parallel Processing)     : 29s
4. (Sync, No Parallel Processing)  : 1min 11s

We can see that both cases that use Parallel Processing outperform the Sync, No Parallel Processing case (i.e., `.run(num_workers=None)`). Also, at least in this case, Async tasks gain little from Parallel Processing. Perhaps for larger workloads and IngestionPipelines, using Async with Parallel Processing can lead to larger gains.