# LLMs and RAG with DataChain

In LLM applications nowadays, the emerging standard pattern for most use-cases is to employ a pre-trained model with an API from a 3rd party provider and to augment it with a RAG context. On one hand, this means there is not much actual machine learning going on on the user's end. On the other hand naive application of "latest and greatest" models with no prompt engineering, testing and evaluation of RAG context can lead to needlessly expensive operational costs at best and dissapointingly poor performance at worst.

Therefore, even if there is no machine learning involved, there is still a lot of fine tuning we need to do and a lot of that involves large datasets (such as histories of chatbot conversations or large collections of company documents). Just like with ML training, we need to version all that data as we finetune our applications to be able to correctly evaluate the effect of any changes we apply to our models. We can experiment with the LLM choice, prompt engineering, the way we process data for our RAG context (pre-processing, embedding, ...) and so on.

In this example, we will see how we can use DataChain to create such a controlled development environment and how it can help us when we evaluate any fine-tuning of our LLM applications.

We will see how to use DataChain to version our RAG context datasets to preserve reproducibility of our fine-tuning experiments as the RAG context changes. We will also see how to use DataChain in the evaluation of fine-tuning by comparing two different text embedding models and saving (and versioning) the results with additional context.

## Processing a large collection of documents

Let's say that we have a collection of relevant documents which we want to use as context in LLM queries in our chatbot application. We will be using DataChain to create, store and version vector embeddings of our documents.

In this example we will be using papers from the [Neural Information Processing Systems](https://papers.neurips.cc/paper/) conference. 

We will proceed in the following steps:
1. [Data ingestion with DataChain](#data-ingestion) - we will use DataChain to ingest the data, taking advantage of its lazy evaluation feature to only ingest the data we need
1. [Data processing with the Unstructured Python library](#processing-the-documents-individually)
1. [Scaling the data processing with DataChain](#processing-the-documents-at-scale-using-datachain-udfs)
1. [Using Datachain to evaluate different embedding models](#evaluation)
1. [Adding extra context by combining datasets](#adding-more-context---merging-datasets)

In [30]:
from typing import Optional
from collections.abc import Iterator

from datachain.lib.dc import DataChain, C
from datachain.lib.data_model import DataModel
from datachain.lib.file import File
from datachain.sql.functions.array import cosine_distance, euclidean_distance

from unstructured.partition.pdf import partition_pdf
from unstructured.chunking.title import chunk_by_title

from unstructured.embed.huggingface import HuggingFaceEmbeddingConfig, HuggingFaceEmbeddingEncoder

### Data ingestion

We will first ingest the dataset. The data are saved on a cloud storage, so we use the `.from_storage` DataChain method. We will also use the `.filter` method to restrict ourselves only to `.pdf` files (the storage contains many other data which we do not need).

Notice that:

1. Since DataChain employs lazy evaluation, no data are actually loaded just yet (until we invoke an action such as showing or saving our DataChain)
1. The previous point also means that when we filter out all non-pdf files, DataChain doesn't actually waste time loading their content only to throw them away later. This makes DataChain a lot more scalable than tools with eager evaluation.
1. The `.from_storage` method of DataChain operates on the level of the entire bucket. This means that even if the files are stored using a complicated directory structure and potentially uploaded irregularly into this structure, we can retrieve or update our DataChain of articles with just a simple one-line command

In [31]:
dc_papers = (
    DataChain.from_storage("gs://datachain-demo/neurips")
    .filter(C.name.glob("*.pdf"))
    )

In [32]:
dc_papers.show(3)

Listing gs://datachain-demo: 269955 objects [01:10, 3844.31 objects/s]
Processed: 738 rows [00:00, 7876.43 rows/s]


Unnamed: 0_level_0,source,parent,name,version,etag,size,vtype,location,file,file,file,file,file,file,file,file,file,file
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,source,parent,name,size,version,etag,is_latest,last_modified,location,vtype
0,gs://datachain-demo,neurips/1987/file,02e74f10e0327ad868d138f2b4fdd6f0-Paper.pdf,1721047139405563,CPudi5uIqYcDEAE=,2291566,,,gs://datachain-demo,neurips/1987/file,02e74f10e0327ad868d138f2b4fdd6f0-Paper.pdf,2291566,1721047139405563,CPudi5uIqYcDEAE=,1,1970-01-01 00:00:00+00:00,,
1,gs://datachain-demo,neurips/1987/file,03afdbd66e7929b125f8597834fa83a4-Paper.pdf,1721047138865046,CJaf6pqIqYcDEAE=,1322648,,,gs://datachain-demo,neurips/1987/file,03afdbd66e7929b125f8597834fa83a4-Paper.pdf,1322648,1721047138865046,CJaf6pqIqYcDEAE=,1,1970-01-01 00:00:00+00:00,,
2,gs://datachain-demo,neurips/1987/file,072b030ba126b2f4b2374f342be9ed44-Paper.pdf,1721046993295769,CJmztdWHqYcDEAE=,1220711,,,gs://datachain-demo,neurips/1987/file,072b030ba126b2f4b2374f342be9ed44-Paper.pdf,1220711,1721046993295769,CJmztdWHqYcDEAE=,1,1970-01-01 00:00:00+00:00,,



[Limited by 3 rows]


DataChain created a record for each `pdf` file in the `neurips` directory, generating a `file` signal for each file. The file signal contains subsignals with metadata about each file, like `file.name` and `file.size`. Aggregate signals like `file` that contain multiple subsignals are called features.

You can use the `file` feature to not only get metadata about each file, but also to open and read the file as needed.

### Processing the documents individually

We now want to ingest the content of the pdf files as text, divide it into chunks and vectorize those for our RAG application. We are interested in comparing two different models for embeddings. Normally, we would also do some pre-processing and cleaning of the text before vectorization, but we will skip it here for brevity.

We will first do all this with an example of a single pdf using the `unstructured` Python library and then we will see how we can scale this up to the entire bucket with the help of DataChain.

First, we ingest and partition the pdf file and chunk it.


In [33]:
chunks = chunk_by_title(partition_pdf(filename="sample.pdf"))

Next, we vectorize each chunk using HuggingFace embedding encoders

In [34]:
# Define embedding encoder

embedding_encoder = HuggingFaceEmbeddingEncoder(
     config=HuggingFaceEmbeddingConfig()
)

embedding_encoder_alt = HuggingFaceEmbeddingEncoder(
     config=HuggingFaceEmbeddingConfig(model_name='intfloat/e5-small-v2')
)

chunks_embedded = embedding_encoder.embed_documents(chunks)
chunks_embedded_alt = embedding_encoder_alt.embed_documents(chunks)

We now have our chunks vectorized and ready for comparison (e.g. with cosine similarity). However, we are missing a few ingredients:

1. ***Scaling*** - we only processed a single pdf file and we had to manually specify its path. We need to find a way to process all our documents at scale instead and to save the results.
2. ***Saving and Versioning*** - even if we only had a single or a few PDF files we would like to use in our RAG, it is a good practice to version the outputs so that we can keep track of and fine-tune our RAG application. If we simply save the current results to a bucket and overwrite it each time the source is updated, we lose this. We could version the results manually, e.g. by adding a timestamp to the blob name, but that is not very reliable and will lead to unnecessary copies of files.

### Processing the documents at scale, using DataChain UDFs

We will now use DataChain to solve the scaling and versioning issues we outline above. We will create a DataChain user-defined function (UDF) to process all our PDF files the way we did above with a single file (without us having to manually provide file paths) and save the outputs in a Datachain.

The DataChain UDF functionality will allow us to generate additonal columns in our DataChain, iterating over each of the files listed in it.

We first need to define a DataModel class, which will define the types of our outputs. Inputs and outputs need to be specified like this when we use custom functinos in Datachain.

In [35]:
# Define the output as a Feature class
class Chunk(DataModel):
    key: str
    text: str
    embeddings: list[float]
    embeddings_alt: list[float]

In the above we define `Chunk` by specifying the names and types of new columns on the output.

We then define our processing function `pdf_chuks`:

In [36]:
# Use signatures to define input/output types (these can be Feature or regular Python types)
def pdf_chunks(file: File) -> Iterator[Chunk]:
    # Ingest the file
    with file.open() as f:
        chunks = chunk_by_title(partition_pdf(file=f))

    chunks_embedded = embedding_encoder.embed_documents(chunks)
    chunks_embedded_alt = embedding_encoder_alt.embed_documents(chunks)

    # Add new rows to DataChain
    for chunk, chunk_alt in zip(chunks_embedded, chunks_embedded_alt):
        record = {}
        record["key"] = file.name.removesuffix("-Paper.pdf")
        record["text"] = chunk.text
        record["embeddings"] = chunk.embeddings
        record["embeddings_alt"] = chunk_alt.embeddings

        yield Chunk(**record)

Here, the syntax is the same as with any other Python function, except that we specify the input and output types using type hints

```
def pdf_chunks(file: File) -> Iterator[Chunk]:
```
Here, `file` specifies that we pass all `file` columns of the original dataset on the input and `Iterator[Chunk]` specifies that we get a bunch of `Chunk` rows on the output (from a single row of the original datachain representing a single paper we will get a new dataset with multiple rows per paper, each representing a single chunk).

We then specify what each row should contain by defining the `record` dictionary and then we use `yield Chunk(**record)` to create the new rows for each input row.

In [37]:
dc_chunks_embeddings = (
    dc_papers
    .limit(20) # we limit ourselves to 20 papers here, to speed up the demo
    .gen(document=pdf_chunks)
)

dc_chunks_embeddings.save("embeddings")

Processed: 738 rows [00:00, 9790.32 rows/s]
Processed: 0 rows [00:00, ? rows/s]
Processed: 2 rows [00:06,  3.33s/ rows]
Processed: 3 rows [00:12,  4.36s/ rows]
Processed: 4 rows [00:17,  4.76s/ rows]
Processed: 5 rows [00:26,  5.95s/ rows]
Processed: 6 rows [00:32,  6.01s/ rows]
Processed: 7 rows [00:37,  5.81s/ rows]
Processed: 8 rows [00:44,  6.08s/ rows]
Processed: 9 rows [00:52,  6.61s/ rows]
Processed: 10 rows [00:58,  6.65s/ rows]
Processed: 11 rows [01:09,  7.89s/ rows]
Processed: 12 rows [01:16,  7.70s/ rows]
Processed: 13 rows [01:22,  6.98s/ rows]
Processed: 14 rows [01:28,  6.70s/ rows]
Processed: 15 rows [01:32,  6.05s/ rows]
Processed: 16 rows [01:39,  6.27s/ rows]
Processed: 17 rows [01:44,  5.91s/ rows]
Processed: 18 rows [01:49,  5.56s/ rows]
Processed: 19 rows [01:57,  6.24s/ rows]
Processed: 20 rows [02:04,  6.63s/ rows]
Download: 51.2MB [02:10, 413kB/s]
Processed: 20 rows [02:10,  6.53s/ rows]
Generated: 1346 rows [02:03, 10.86 rows/s]


<datachain.lib.dc.DataChain at 0x7f7a239eacf0>

In the cell above we apply our new `pdf_chunks` function to the DataChain `dc_papers`. We do that by using the `gen` method of DataChain with `pdf_chunks`as its parameter. 

`DataChain.gen` is used when we have a function that creates multiple rows per single row of the original datachain (like in our examples, where each paper is split into multiple chunks)

We also presisted the result by the `.save` method. This will permanently save and version the datachain as a dataset with the name `embeddings`. Whenever we call `.save("embeddings")` again, a new version of this dataset will be saved automatically, so we can recall previous versions and track changes of the dataset over time.

### Evaluation

We will now use DataChain to calculate the similarity between the two alternative embeddings of each chunk and for further evaluation we will save dataset containing the chunks that differ the most between the two embeddings.



Since we saved our dataset `embeddings`, we can now load its content to datachain by the `from_dataset` method:

In [58]:
(
    DataChain.from_dataset("embeddings")
    .mutate(
        my_new_column = C.document.embeddings * 2
    )
    .to_pandas()
    .head()
)

Unnamed: 0_level_0,document,document,document,document
Unnamed: 0_level_1,key,text,embeddings,embeddings_alt
0,02e74f10e0327ad868d138f2b4fdd6f0,573\n\nBIT - SERIAL NEURAL NETWORKS\n\nAlan F....,"[-0.071039117872715, 0.018864696845412254, 0.0...","[-0.071039117872715, 0.018864696845412254, 0.0..."
1,02e74f10e0327ad868d138f2b4fdd6f0,A bit - serial VLSI neural network is describe...,"[-0.0730624571442604, 0.03725254908204079, 0.0...","[-0.0730624571442604, 0.03725254908204079, 0.0..."
2,02e74f10e0327ad868d138f2b4fdd6f0,nique is extended to a 256 (2562 synapses) net...,"[-0.05953378230333328, 0.013371129520237446, 0...","[-0.05953378230333328, 0.013371129520237446, 0..."
3,02e74f10e0327ad868d138f2b4fdd6f0,1. INTRODUCTION The functions a synthetic neur...,"[-0.08661094307899475, 0.006553625222295523, 0...","[-0.08661094307899475, 0.006553625222295523, 0..."
4,02e74f10e0327ad868d138f2b4fdd6f0,"yield, where the network degradation is approx...","[-0.07803991436958313, 0.011682862415909767, 0...","[-0.07803991436958313, 0.011682862415909767, 0..."


In [54]:
# Calculate similarity

(
    DataChain.from_dataset("embeddings")
    .mutate(
        test = C.document.embeddings * 2,
        this_should_fail=cosine_distance([2,3,3,4,5], C("document.embeddings_alt")),
        cos_dist=cosine_distance(C("document.embeddings"), C("document.embeddings_alt")),
        eucl_dist=euclidean_distance(C("document.embeddings"), C("document.embeddings_alt")),
    )
    .to_pandas()
    .head()
    # .save("embeddings-differences")
)

Unnamed: 0_level_0,document,document,document,document
Unnamed: 0_level_1,key,text,embeddings,embeddings_alt
0,02e74f10e0327ad868d138f2b4fdd6f0,573\n\nBIT - SERIAL NEURAL NETWORKS\n\nAlan F....,"[-0.071039117872715, 0.018864696845412254, 0.0...","[-0.071039117872715, 0.018864696845412254, 0.0..."
1,02e74f10e0327ad868d138f2b4fdd6f0,A bit - serial VLSI neural network is describe...,"[-0.0730624571442604, 0.03725254908204079, 0.0...","[-0.0730624571442604, 0.03725254908204079, 0.0..."
2,02e74f10e0327ad868d138f2b4fdd6f0,nique is extended to a 256 (2562 synapses) net...,"[-0.05953378230333328, 0.013371129520237446, 0...","[-0.05953378230333328, 0.013371129520237446, 0..."
3,02e74f10e0327ad868d138f2b4fdd6f0,1. INTRODUCTION The functions a synthetic neur...,"[-0.08661094307899475, 0.006553625222295523, 0...","[-0.08661094307899475, 0.006553625222295523, 0..."
4,02e74f10e0327ad868d138f2b4fdd6f0,"yield, where the network degradation is approx...","[-0.07803991436958313, 0.011682862415909767, 0...","[-0.07803991436958313, 0.011682862415909767, 0..."


In the above, we use the `cosine_distance` and `euclidean_distance` Datachain built-in functions to calculate the similarity between the two embeddings for each chunk. To specify that we want to compare columns we use the `C` class from `datachain.lib.dc`.

We use the `mutate` method of datachain, which is a way to add new columns to an existing dataset. Finally we use the `.save` method once again.

In [17]:
DataChain.from_dataset("embeddings-differences").order_by(C.cos_dist).show(20)

Unnamed: 0_level_0,document,document,document,document
Unnamed: 0_level_1,text,embeddings,embeddings_alt,key
0,573\n\nBIT - SERIAL NEURAL NETWORKS\n\nAlan F....,"[-0.071039117872715, 0.018864696845412254, 0.0...","[-0.071039117872715, 0.018864696845412254, 0.0...",02e74f10e0327ad868d138f2b4fdd6f0
1,A bit - serial VLSI neural network is describe...,"[-0.0730624571442604, 0.03725254908204079, 0.0...","[-0.0730624571442604, 0.03725254908204079, 0.0...",02e74f10e0327ad868d138f2b4fdd6f0
2,nique is extended to a 256 (2562 synapses) net...,"[-0.05953378230333328, 0.013371129520237446, 0...","[-0.05953378230333328, 0.013371129520237446, 0...",02e74f10e0327ad868d138f2b4fdd6f0
3,1. INTRODUCTION The functions a synthetic neur...,"[-0.08661094307899475, 0.006553625222295523, 0...","[-0.08661094307899475, 0.006553625222295523, 0...",02e74f10e0327ad868d138f2b4fdd6f0
4,"yield, where the network degradation is approx...","[-0.07803991436958313, 0.011682862415909767, 0...","[-0.07803991436958313, 0.011682862415909767, 0...",02e74f10e0327ad868d138f2b4fdd6f0
5,states and represents the learned information ...,"[-0.09751544147729874, 0.034578703343868256, 0...","[-0.09751544147729874, 0.034578703343868256, 0...",02e74f10e0327ad868d138f2b4fdd6f0
6,XI\n\n••••\n\ni-n-l 0 ~ ii J-O\n\n(2)\n\nwhere...,"[-0.12508268654346466, 0.04674600064754486, 0....","[-0.12508268654346466, 0.04674600064754486, 0....",02e74f10e0327ad868d138f2b4fdd6f0
7,a large number of interconnects. The challenge...,"[-0.07087741047143936, 0.02634391188621521, 0....","[-0.07087741047143936, 0.02634391188621521, 0....",02e74f10e0327ad868d138f2b4fdd6f0
8,2. DESIGNING A NEURAL NETWORK IN VLSI There ar...,"[-0.07934948056936264, 0.04542921110987663, 0....","[-0.07934948056936264, 0.04542921110987663, 0....",02e74f10e0327ad868d138f2b4fdd6f0
9,therefore possible without recourse to unusual...,"[-0.09430072456598282, 0.029243627563118935, -...","[-0.09430072456598282, 0.029243627563118935, -...",02e74f10e0327ad868d138f2b4fdd6f0



[Limited by 20 rows]


We have now solved our scalability issues. When using `DataChain` locally, our computation will still be restricted to a simgle machine but for larger datasets you can use the SaaS version of DataChain available through our DVC Studio which comes with automatic computation cluster management, a graphical user interface and additional ML and data versioning features.

We have also solved our versioning needs and we can track the differences between embeddings over time and use that to choose the best embedding for our use-case.

### Adding more context - merging datasets

In our example bucket, we have not only the `pdf` files themselves but also additinal metadata stored as JSON files. We will now see how we can use Datachain to add the information about authors and the paper title to our `embeddings-differences` dataset which can help us with our evaluation.

In [49]:
import json
meta = json.load(open("sample.json"))
meta["authors"]

[{'given_name': 'Alan', 'family_name': 'Murray', 'institution': None},
 {'given_name': 'Anthony', 'family_name': 'Smith', 'institution': None},
 {'given_name': 'Zoe', 'family_name': 'Butler', 'institution': None}]

As we can see, the metadata contains information about the authors of the paper as a list of dictionaries with each author's name ane institution. Some values can be also be empty. Just as above we create a `DataModel` class to specify the outputs, keeping the name of the file as a key which we will use to join with the previous dataset.

Then we create a function to parse all this information and create a new dataset. We will now only create a single row per original dataset, so we use `return` instead of yield (and there is no need for the `Iterator` class)

In [52]:
# Define the output as a Feature class
class Metadata(DataModel):
    key: str
    title: str
    authors: list[dict[str, Optional[str]]]


# Use signatures to define input/output types (these can be Feature or regular Python types)
def extract_metadata(file: File) -> Metadata:
    import json
    # Ingest the file
    metadata = json.loads(file.get_value())

    record = dict()
    record["filename"] = file.name.removesuffix("Metadata.json")+"Paper.pdf"
    record["title"] = metadata["title"]
    record["authors"] = metadata["authors"]

    return Metadata(**record)

We now apply the `extract_metadata` function as we did with `pdf_chunks` above, except that we use the `.map` method of DataChain which is employed when there is a 1:1 correspondence between the number of rows of the orignal and the new dataset.

In [53]:
dc_meta = (
    DataChain.from_storage("gs://datachain-demo/neurips")
    .filter(C.name.glob("*Metadata.json"))
    )

(
    dc_meta.
    map(document=extract_metadata)
    .select(
        "document.key",
        "document.title",
        "document.authors",
    )
    .show(3)
)

Processed: 738 rows [00:00, 19701.34 rows/s]
Download: 0.00B [00:00, ?B/s]Traceback (most recent call last):
  File "/home/tibor/Repos/datachain/.venv/lib64/python3.12/site-packages/datachain/lib/udf.py", line 109, in process
    return self._func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/tmp/ipykernel_23662/3412983646.py", line 19, in extract_metadata
    return Metadata(**record)
           ^^^^^^^^^^^^^^^^^^
  File "/home/tibor/Repos/datachain/.venv/lib64/python3.12/site-packages/pydantic/main.py", line 193, in __init__
    self.__pydantic_validator__.validate_python(data, self_instance=self)
pydantic_core._pydantic_core.ValidationError: 1 validation error for Metadata
key
  Field required [type=missing, input_value={'filename': '02e74f10e03..., 'institution': None}]}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.8/v/missing
Download: 30.1kB [00:00, 15.9MB/s]




DataChainError: Error in user code in class 'Mapper': 1 validation error for Metadata
key
  Field required [type=missing, input_value={'filename': '02e74f10e03..., 'institution': None}]}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.8/v/missing

Finally we will merge the two datasets using the DataChain `.merge` method.

In [None]:
DataChain.from_dataset("embeddings-differences").merge(dc_meta, on="key").show(20)