## 🍫 Building a RAG indexing pipeline with Fondant

> ⚠️ Please note that this notebook **is not** compatible with **Google Colab**. To complete the tutorial, you must
> start Docker containers, which is not supported within Google Colab.

This repository demonstrates a Fondant data pipeline that ingests text
data into a vector database. The pipeline uses four reusable Fondant components.  
Additionally, we provide a Docker Compose setup for Weaviate, enabling local testing and
development.

### Pipeline overview

The primary goal of this sample is to showcase how you can use a Fondant pipeline and reusable
components to load, chunk, and embed text, and to ingest the text embeddings into a vector
database.

Pipeline steps:

- [Data Loading](https://github.com/ml6team/fondant/tree/main/components/load_from_parquet): The
  pipeline begins by loading text data from a Parquet file, which serves as the
  source for subsequent processing. For this minimal example, we use a dataset from Hugging Face.
- [Text Chunking](https://github.com/ml6team/fondant/tree/main/components/chunk_text): Text data is
  chunked into manageable sections to prepare it for embedding. This step is crucial for
  performant RAG systems.
- [Text Embedding](https://github.com/ml6team/fondant/tree/main/components/embed_text): We use
  a small Hugging Face model to generate the text embeddings.
  The `embed_text` component also makes it easy to swap in a different model.
- [Write to Weaviate](https://github.com/ml6team/fondant/tree/main/components/index_weaviate): The
  final step of the pipeline involves writing the embedded text data to
  a Weaviate database.

## Environment
This section checks the prerequisites of your environment. Read any errors or warnings carefully.

**Ensure a Python version between 3.8 and 3.10 is available**

In [34]:
import sys
if sys.version_info < (3, 8, 0) or sys.version_info >= (3, 11, 0):
    raise Exception(f"A Python version between 3.8 and 3.10 is required. You are running {sys.version}")

**Check if docker compose is installed and the docker daemon is running**

In [2]:
!docker compose version
!docker ps && echo "Docker running"

Docker Compose version v2.19.1
CONTAINER ID   IMAGE                                            COMMAND                  CREATED          STATUS          PORTS                    NAMES
9b2a794da86c   fndnt/data_explorer:0.10.1                       "streamlit run /app/…"   13 minutes ago   Up 13 minutes   0.0.0.0:8501->8501/tcp   explorer_app-data_explorer-1
a69dfc883937   semitechnologies/weaviate:1.20.5                 "/bin/weaviate --hos…"   3 hours ago      Up 3 hours      0.0.0.0:8081->8080/tcp   weaviate_service-weaviate-1
1dd4925470fb   semitechnologies/contextionary:en0.16.0-v1.2.1   "/contextionary-serv…"   3 hours ago      Up 3 hours                               weaviate_service-contextionary-1
Docker running
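
The same check can be done from Python, which is handy when you want the notebook to fail early with a clear message. This is a minimal sketch using only the standard library; the helper name `docker_available` is hypothetical, and it assumes that `docker info` exits with a non-zero code when the daemon cannot be reached.

```python
import shutil
import subprocess


def docker_available(timeout: float = 10.0) -> bool:
    """Return True if the docker CLI is on PATH and the daemon responds."""
    # The CLI binary must be installed before we can ask the daemon anything.
    if shutil.which("docker") is None:
        return False
    try:
        # `docker info` talks to the daemon; assumed to fail when it is down.
        result = subprocess.run(
            ["docker", "info"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout,
            check=False,
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return result.returncode == 0


if __name__ == "__main__":
    print("Docker running" if docker_available() else "Docker is not available")
```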


**Check if GPU is available**

In [35]:
import logging
import subprocess

try:
    subprocess.check_output('nvidia-smi')
    logging.info("Found GPU, using it!")
    number_of_accelerators = 1
    accelerator_name = "GPU"
except Exception:
    logging.warning("We recommend running this pipeline on a GPU, but none could be found. Using CPU instead.")
    number_of_accelerators = None
    accelerator_name = None



**Install Fondant**

In [4]:
!pip install -r ../requirements.txt


[notice] A new release of pip is available: 23.0.1 -> 23.3.2
[notice] To update, run: pip install --upgrade pip


## Implement the pipeline

First, we need to initialize the pipeline by giving it a name, a description, and a `base_path`. The `base_path` is where the pipeline artifacts and the data generated by the components are stored.

In [42]:
from pathlib import Path
from fondant.pipeline import Pipeline, Resources

BASE_PATH = "./data"
Path(BASE_PATH).mkdir(parents=True, exist_ok=True)

pipeline = Pipeline(
    name="ingestion-pipeline",  # Add a unique pipeline name to easily track your progress and data
    description="Pipeline to prepare and process data for building a RAG solution",
    base_path=BASE_PATH,  # The demo pipeline uses a local directory to store the data.
)

For demonstration purposes, we will use a dataset available on Hugging Face, loaded with the reusable Fondant component `load_from_hf_hub`. Note that `load_from_hf_hub` does not define a fixed schema for the data it produces, so we need to provide the schema ourselves with the `produces` argument, which takes a mapping from field names to `pyarrow` types.

In [43]:
import pyarrow as pa

text = pipeline.read(
    "load_from_hf_hub",
    arguments={
        # Add arguments
        "dataset_name": "wikitext@~parquet",
        "n_rows_to_load": 100,
    },
    produces={
        "text": pa.string()
    }
)

## Implement a custom component 

You can build Fondant pipelines from reusable components in the component hub, but you can also implement your own. The easiest way to do so is to build a `lightweight_component`: you can implement and test the component code in a notebook and use the same code as part of your pipeline. Check out [our documentation](https://fondant.ai/en/latest/components/lightweight_components/) for more information.

Here, we will implement a custom chunking component using LangChain.

Text data is chunked into manageable sections to prepare it for embedding. This step is crucial for efficient RAG systems. LangChain provides an interface to chunk text snippets, and we will wrap that interface in a Fondant component.
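
To make the roles of `chunk_size` and `chunk_overlap` concrete, here is a deliberately simplified sketch of fixed-size chunking with overlap in plain Python. It is not LangChain's algorithm: `RecursiveCharacterTextSplitter` also tries to split on separators such as paragraphs and sentences, whereas this hypothetical `chunk_with_overlap` helper only slides a character window over the text.

```python
from typing import List


def chunk_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> List[str]:
    """Split text into windows of chunk_size characters that share chunk_overlap characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window has reached the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks


if __name__ == "__main__":
    print(chunk_with_overlap("abcdefghij", chunk_size=4, chunk_overlap=1))
```

With a window of 4 and an overlap of 1, each chunk starts with the last character of the previous one, so context is preserved across chunk boundaries.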

In [44]:
import typing as t

import pandas as pd
import pyarrow as pa

from fondant.component import PandasTransformComponent
from fondant.pipeline import lightweight_component

@lightweight_component(
    consumes={"text":pa.string()},
    produces={"text":pa.string(), "original_document_id":pa.string()},
    extra_requires=["langchain==0.0.329"]
)
class ChunkTextComponent(PandasTransformComponent):
    """Component that chunks text into smaller segments.
    More information about the different chunking strategies can be found here:
      - https://python.langchain.com/docs/modules/data_connection/document_transformers/
      - https://www.pinecone.io/learn/chunking-strategies/.
    """
    

    def __init__(
        self,
        *,
        chunk_size: int,
        chunk_overlap: int,
    ):
        """
        Args:
            chunk_size: maximum number of characters per chunk
            chunk_overlap: maximum number of characters shared between consecutive chunks
        """
        from langchain.text_splitter import RecursiveCharacterTextSplitter
        self.chunker = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )

    def chunk_text(self, row) -> t.List[t.Tuple]:
        # Multi-index df has id under the name attribute
        doc_id = row.name
        text_data = row["text"]
        docs = self.chunker.create_documents([text_data])

        return [
            (doc_id, f"{doc_id}_{chunk_id}", chunk.page_content)
            for chunk_id, chunk in enumerate(docs)
        ]

    def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        import itertools
        results = dataframe.apply(
            self.chunk_text,
            axis=1,
        ).to_list()

        # Flatten results
        results = list(itertools.chain.from_iterable(results))

        # Turn into dataframes
        results_df = pd.DataFrame(
            results,
            columns=["original_document_id", "id", "text"],
        )
        results_df = results_df.set_index("id")

        return results_df
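
The flattening step in `transform` can be illustrated without pandas or LangChain. In this minimal sketch, the hypothetical helper `chunk_rows` mimics what `chunk_text` returns for one document, and `itertools.chain.from_iterable` merges the per-document lists into one flat list of `(original_document_id, id, text)` rows.

```python
import itertools
from typing import List, Tuple


def chunk_rows(doc_id: str, chunks: List[str]) -> List[Tuple[str, str, str]]:
    """Build (original_document_id, chunk id, text) rows for one document."""
    return [
        (doc_id, f"{doc_id}_{chunk_id}", chunk)
        for chunk_id, chunk in enumerate(chunks)
    ]


# One list of rows per input document, as produced by applying the function row by row.
per_document = [
    chunk_rows("a", ["first part", "second part"]),
    chunk_rows("b", ["only part"]),
]

# Flatten the nested lists so that every chunk becomes its own row.
flat_rows = list(itertools.chain.from_iterable(per_document))

if __name__ == "__main__":
    for row in flat_rows:
        print(row)
```

Each chunk id combines the document id with the chunk's position, so chunks stay unique while `original_document_id` still links them back to their source document.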


We can now add the chunking component to the pipeline using `Dataset.apply()`. This method doesn't execute the component yet; it adds the component to the pipeline's execution graph and returns a lazy `Dataset` instance.
Besides our custom component, we also add two reusable components from the [Fondant Hub](https://fondant.ai/en/latest/components/hub/): `embed_text` and `index_weaviate`.

In [45]:
import utils

chunks = text.apply(
    ChunkTextComponent,
    arguments={
        "chunk_size": 512, "chunk_overlap": 32
    }
)


embeddings = chunks.apply(
    "embed_text",
    arguments={
        "model_provider": "huggingface",
        "model": "all-MiniLM-L6-v2"
    },
    resources=Resources(
        accelerator_number=number_of_accelerators,
        accelerator_name=accelerator_name,
    ),
    cluster_type="local" if number_of_accelerators is not None else "default",
    cache=False
)

embeddings.write(
    "index_weaviate",
    arguments={
        "weaviate_url": f"http://{utils.get_host_ip()}:8081",
        "class_name": "index",
    },
    consumes={
        "text": pa.string(),
        "embedding": pa.list_(pa.float32()),   
    }
)

Our pipeline now looks as follows:

`load_from_hf_hub` -> `chunk_text` -> `embed_text` -> `index_weaviate`
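
The idea of building a graph lazily, without running anything, can be sketched with a toy class. This is not Fondant's implementation: `LazyStep` and its methods are hypothetical and exist only to show that each `apply` call records a step and returns a new handle.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class LazyStep:
    """A node in a toy execution graph; creating it does not run anything."""

    name: str
    parent: Optional["LazyStep"] = None

    def apply(self, name: str) -> "LazyStep":
        # Record the new step and link it to the current one.
        return LazyStep(name=name, parent=self)

    def lineage(self) -> List[str]:
        # Walk back to the root to recover the order of execution.
        steps = []
        node: Optional[LazyStep] = self
        while node is not None:
            steps.append(node.name)
            node = node.parent
        return list(reversed(steps))


graph = (
    LazyStep("load_from_hf_hub")
    .apply("chunk_text")
    .apply("embed_text")
    .apply("index_weaviate")
)

if __name__ == "__main__":
    print(" -> ".join(graph.lineage()))
```

Only when a runner walks this kind of graph are the steps actually executed, which is why the cells above return immediately.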

## Running the pipeline

The pipeline will load and process text data, then ingest the processed data into a vector database. Before executing the pipeline, we need to start the Weaviate database. Otherwise the pipeline execution will fail.

To do this, we can use the Docker setup provided in the `weaviate_service` folder.
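
Containers can take a few seconds to accept requests after `docker compose up` returns, so it can help to poll the database before starting the run. This is a minimal sketch, meant to be run after the start command in the next cell. It assumes Weaviate's readiness endpoint `/v1/.well-known/ready` and the host port 8081 shown in the `docker ps` output above. The helper names `is_ready` and `wait_until_ready` are hypothetical.

```python
import time
import urllib.error
import urllib.request


def is_ready(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if the readiness endpoint answers with HTTP 200."""
    url = base_url.rstrip("/") + "/v1/.well-known/ready"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or a non-2xx answer: not ready yet.
        return False


def wait_until_ready(base_url: str, attempts: int = 30, delay: float = 1.0) -> bool:
    """Poll the readiness endpoint up to `attempts` times."""
    for _ in range(attempts):
        if is_ready(base_url):
            return True
        time.sleep(delay)
    return False


if __name__ == "__main__":
    # Port 8081 is the host port mapped to Weaviate in this setup (assumption).
    print(wait_until_ready("http://localhost:8081", attempts=5))
```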

In [40]:
!docker compose -f weaviate_service/docker-compose.yaml up --detach --quiet-pull

[+] Running 2/0
 ✔ Container weaviate_service-weaviate-1       Running                     0.0s
 ✔ Container weaviate_service-contextionary-1  Running                     0.0s

Finally, we can execute our pipeline.
Fondant provides multiple runners to run our pipeline:

- A Docker runner for local execution
- A Vertex AI runner for managed execution on Google Cloud
- A SageMaker runner for managed execution on AWS
- A Kubeflow Pipelines runner for execution anywhere

Here we will use the `DockerRunner` for local execution, which uses Docker Compose under the hood.

The runner will download the reusable components from the component hub. Afterwards, you will see the components execute one by one.

In [46]:
from fondant.pipeline.runner import DockerRunner

DockerRunner().run(pipeline)

INFO:root:Found reference to un-compiled pipeline... compiling
INFO:fondant.pipeline.compiler:Compiling ingestion-pipeline to .fondant/compose.yaml
INFO:fondant.pipeline.compiler:Base path found on local system, setting up ./data as mount volume
INFO:fondant.pipeline.pipeline:Sorting pipeline component graph topologically.
INFO:fondant.pipeline.pipeline:All pipeline component specifications match.
INFO:fondant.pipeline.compiler:Compiling service for load_from_hugging_face_hub
INFO:fondant.pipeline.compiler:Compiling service for chunktextcomponent
INFO:fondant.pipeline.compiler:Compiling service for embed_text
INFO:fondant.pipeline.compiler:Compiling service for index_weaviate
INFO:fondant.pipeline.compiler:Successfully compiled to .fondant/compose.yaml
 embed_text Pulling 
 load_from_hugging_face_hub Pulling 
 chunktextcomponent Pulling 
 index_weaviate Pulling 


Starting pipeline run...


 index_weaviate Pulled 
 load_from_hugging_face_hub Pulled 
 chunktextcomponent Pulled 
 embed_text Pulled 
 Container ingestion-pipeline-load_from_hugging_face_hub-1  Recreate
 Container ingestion-pipeline-load_from_hugging_face_hub-1  Recreated
 Container ingestion-pipeline-chunktextcomponent-1  Recreate
 Container ingestion-pipeline-chunktextcomponent-1  Recreated
 Container ingestion-pipeline-embed_text-1  Recreate
 Container ingestion-pipeline-embed_text-1  Recreated
 Container ingestion-pipeline-index_weaviate-1  Recreate
 Container ingestion-pipeline-index_weaviate-1  Recreated


Attaching to ingestion-pipeline-chunktextcomponent-1, ingestion-pipeline-embed_text-1, ingestion-pipeline-index_weaviate-1, ingestion-pipeline-load_from_hugging_face_hub-1


ingestion-pipeline-load_from_hugging_face_hub-1  | [2024-02-05 15:25:39,373 | fondant.cli | INFO] Component `LoadFromHubComponent` found in module main
ingestion-pipeline-load_from_hugging_face_hub-1  | [2024-02-05 15:25:39,380 | fondant.component.executor | INFO] Dask default local mode will be used for further executions.Our current supported options are limited to 'local' and 'default'.
ingestion-pipeline-load_from_hugging_face_hub-1  | [2024-02-05 15:25:39,389 | fondant.component.executor | INFO] Skipping component execution
ingestion-pipeline-load_from_hugging_face_hub-1  | [2024-02-05 15:25:39,392 | fondant.component.executor | INFO] Matching execution detected for component. The last execution of the component originated from `ingestion-pipeline-20240205151753`.
ingestion-pipeline-load_from_hugging_face_hub-1  | [2024-02-05 15:25:39,401 | fondant.component.executor | INFO] Saving output manifest to /data/ingestion-pipeline/ingestion-pipeline-20240205162532/load_from_hugging_face

ingestion-pipeline-load_from_hugging_face_hub-1 exited with code 0
ingestion-pipeline-chunktextcomponent-1          | Collecting langchain==0.0.329 (from -r requirements.txt (line 1))
ingestion-pipeline-chunktextcomponent-1          |   Obtaining dependency information for langchain==0.0.329 from https://files.pythonhosted.org/packages/42/4e/86204994aeb2e4ac367a7fade896b13532eae2430299052eb2c80ca35d2c/langchain-0.0.329-py3-none-any.whl.metadata
ingestion-pipeline-chunktextcomponent-1          |   Downloading langchain-0.0.329-py3-none-any.whl.metadata (16 kB)
ingestion-pipeline-chunktextcomponent-1          | Collecting SQLAlchemy<3,>=1.4 (from langchain==0.0.329->-r requirements.txt (line 1))
ingestion-pipeline-chunktextcomponent-1          |   Obtaining dependency information for SQLAlchemy<3,>=1.4 from https://files.pythonhosted.org/packages/7a/de/0ca53bf49d213bea164b0bd0187d3c94d6fea650b7679a8e41c91e3182d7/SQLAlchemy-2.0.25-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

ingestion-pipeline-chunktextcomponent-1          | 
ingestion-pipeline-chunktextcomponent-1          | [notice] A new release of pip is available: 23.2.1 -> 24.0
ingestion-pipeline-chunktextcomponent-1          | [notice] To update, run: pip install --upgrade pip
ingestion-pipeline-chunktextcomponent-1          | 
ingestion-pipeline-chunktextcomponent-1          | [2024-02-05 15:25:59,356 | fondant.cli | INFO] Component `ChunkTextComponent` found in module main
ingestion-pipeline-chunktextcomponent-1          | [2024-02-05 15:25:59,360 | fondant.component.executor | INFO] Dask default local mode will be used for further executions.Our current supported options are limited to 'local' and 'default'.
ingestion-pipeline-chunktextcomponent-1          | [2024-02-05 15:25:59,366 | fondant.component.executor | INFO] Previous component `load_from_hugging_face_hub` run was cached. Cached pipeline id: ingestion-pipeline-20240205151753
ingestion-pipeline-chunktextcomponent-1          | [2024-02-05

ingestion-pipeline-chunktextcomponent-1 exited with code 0


ingestion-pipeline-embed_text-1                  | [2024-02-05 15:26:04,095 | fondant.cli | INFO] Component `EmbedTextComponent` found in module main
ingestion-pipeline-embed_text-1                  | [2024-02-05 15:26:04,101 | fondant.component.executor | INFO] Dask default local mode will be used for further executions.Our current supported options are limited to 'local' and 'default'.
ingestion-pipeline-embed_text-1                  | [2024-02-05 15:26:04,104 | fondant.component.executor | INFO] Caching disabled for the component
ingestion-pipeline-embed_text-1                  | [2024-02-05 15:26:04,105 | root | INFO] Executing component
ingestion-pipeline-embed_text-1                  | [2024-02-05 15:26:07,390 | sentence_transformers.SentenceTransformer | INFO] Load pretrained SentenceTransformer: all-MiniLM-L6-v2
.gitattributes:   0%|          | 0.00/1.18k [00:00<?, ?B/s]
.gitattributes: 100%|██████████| 1.18k/1.18k [00:00<00:00, 1.26MB/s]
1_Pooling/config.json:   0%|          |

Batches: 100%|██████████| 1/1 [00:02<00:00,  2.18s/it]
Batches: 100%|██████████| 1/1 [00:02<00:00,  2.26s/it]
Batches: 100%|██████████| 1/1 [00:02<00:00,  2.36s/it]

[################                        ] | 40% Completed | 2.42 s
[########################                ] | 60% Completed | 2.52 s

Batches: 100%|██████████| 1/1 [00:02<00:00,  2.42s/it]
ingestion-pipeline-embed_text-1                  | [2024-02-05 15:26:25,883 | fondant.component.executor | INFO] Saving output manifest to /data/ingestion-pipeline/ingestion-pipeline-20240205162532/embed_text/manifest.json
ingestion-pipeline-embed_text-1                  | [2024-02-05 15:26:25,883 | fondant.component.executor | INFO] Writing cache key with manifest reference to /data/ingestion-pipeline/cache/16ad9368315b37196c96ebd4799a73f2.txt


[########################################] | 100% Completed | 2.63 s
ingestion-pipeline-embed_text-1 exited with code 0


ingestion-pipeline-index_weaviate-1              | [2024-02-05 15:26:29,587 | fondant.cli | INFO] Component `IndexWeaviateComponent` found in module main
ingestion-pipeline-index_weaviate-1              | [2024-02-05 15:26:29,601 | fondant.component.executor | INFO] Dask default local mode will be used for further executions.Our current supported options are limited to 'local' and 'default'.
ingestion-pipeline-index_weaviate-1              | [2024-02-05 15:26:29,606 | fondant.component.executor | INFO] Previous component `embed_text` is not cached. Invalidating cache for current and subsequent components
ingestion-pipeline-index_weaviate-1              | [2024-02-05 15:26:29,606 | fondant.component.executor | INFO] Caching disabled for the component
ingestion-pipeline-index_weaviate-1              | [2024-02-05 15:26:29,606 | root | INFO] Executing component
ingestion-pipeline-index_weaviate-1              |             Please consider upgrading to the latest version. See https://weavi

ingestion-pipeline-index_weaviate-1 exited with code 0
Finished pipeline run.
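
To confirm that the chunks actually landed in Weaviate, one option is to ask the database for an object count. This is a minimal sketch using only the standard library, and it rests on a few assumptions: Weaviate's GraphQL endpoint at `/v1/graphql`, the host port 8081 from the `docker ps` output above, and the `index` class being stored under the capitalized name `Index`. The helper names are hypothetical.

```python
import json
import urllib.request


def build_count_query(class_name: str) -> str:
    """Return a GraphQL Aggregate query that counts the objects of one class."""
    return "{ Aggregate { %s { meta { count } } } }" % class_name


def parse_count(response: dict, class_name: str) -> int:
    """Extract the object count from a GraphQL Aggregate response."""
    return response["data"]["Aggregate"][class_name][0]["meta"]["count"]


def count_objects(base_url: str, class_name: str) -> int:
    """POST the query to Weaviate and return the reported count."""
    payload = json.dumps({"query": build_count_query(class_name)}).encode("utf-8")
    request = urllib.request.Request(
        base_url.rstrip("/") + "/v1/graphql",
        data=payload,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return parse_count(json.load(response), class_name)
```

Calling `count_objects("http://localhost:8081", "Index")` from the notebook should then return the number of ingested chunks, provided the assumptions above hold for your setup.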


## Exploring the dataset

You can also explore the dataset using the Fondant explorer, which lets you visualize the output dataset at each component step. It might take a while to start the first time, as it needs to download the explorer Docker image. Once it is running, you can browse to
http://localhost:8501/

In [76]:
from fondant.explore import run_explorer_app

run_explorer_app(base_path=BASE_PATH)

INFO:root:Using local base path: ./data
INFO:root:This directory will be mounted to /artifacts in the container.
 data_explorer Pulling 
 c57ee5000d61 Already exists 
 be0f2e005f57 Already exists 
 1cf9e04c14ca Already exists 
 d971e6b3ab55 Already exists 
 eda4bb0752cd Already exists 
 dd9603f2fa4d Pulling fs layer 
 e5082afb5ef6 Pulling fs layer 
 4aaa5a05865b Pulling fs layer 
 c49df6b5e275 Pulling fs layer 
 4f4fb700ef54 Pulling fs layer 
 c49df6b5e275 Waiting 
 4f4fb700ef54 Waiting 
 e5082afb5ef6 Verifying Checksum 
 e5082afb5ef6 Download complete 
 dd9603f2fa4d Downloading [>                                                  ]    537kB/90.76MB
 4aaa5a05865b Downloading [>                                                  ]    537kB/380.4MB
 dd9603f2fa4d Downloading [>                                                  ]  1.078MB/90.76MB
 dd9603f2fa4d Downloading [=>                                                 ]  2.151MB/90.76MB
 4aaa5a05865b Downloading [>                        

To stop the explorer, run the cell below.

In [None]:
from fondant.explore import stop_explorer_app

stop_explorer_app()

## Clean up your environment

After your pipeline has run successfully, you should clean up your environment and stop the Weaviate database.

In [None]:
!docker compose -f weaviate_service/docker-compose.yaml down

In [None]:
stop_explorer_app()

## Scaling up
If you're happy with your dataset, it's time to scale up. Check [our documentation](https://fondant.ai/en/latest/pipeline/#compiling-and-running-a-pipeline) for more information about the available runners.

