# Brief

This guide aims to help you get started with generating embeddings for your own data. 

## Quick refresher

Embeddings the what and why

**The What ?**
  - Embeddings are a way to represent unstructured data of different modalities text, audio or image data as vectors in a high-dimensional space. 
  - The distance between the embedding of a query and the embedding of a data point is small if the data point is relevant to the query.
  - A search index makes use of approximate nearest neighbor search to retrieve the most relevant data for a given query.

**The Why ?**
  - Semantic search
    -  [gong.io](https://www.gong.io/) for [searching sales conversations](https://www.pinecone.io/customers/gong/).
  - QA using Retrieval augmented generation
    - Notion AI's [Question Answering functionality](https://www.notion.so/help/notion-ai-security-practices)


## Goal of this guide
The goal of this guide is to show how to **generate embeddings at scale using Ray and Pinecone**.

More specifically, we will cover how to:
- Build a production-ready embeddings pipeline.
- Use Ray Data to easily scale the generation of embeddings.
- Assess Ray Data's pipeline's performance.
- Upsert embeddings at scale with pinecone.
- Query a search index with pinecone.

## The road ahead

Here is our roadmap for this guide:

<div class="alert alert-block alert-info">

1. Setup
2. Embeddings pipeline overview
3. Simplest possible embedding pipeline
4. Simple pipeline for a real use-case
5. Migrating the simple pipeline to Ray Data
6. Scaling the pipeline with Ray Data
7. Upserting embeddings to Pinecone
8. Querying Pinecone

</div>

## Setup

### Imports

In [1]:
import os
import json
import shutil
from pathlib import Path

import numpy as np
import pandas as pd
import joblib
import psutil
import ray
import torch
from pinecone.grpc import PineconeGRPC as Pinecone
from pinecone import ServerlessSpec, PodSpec
from bs4 import BeautifulSoup
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer

### Constants

In [2]:
DATA_DIR = Path("/mnt/cluster_storage/")
shutil.copytree(Path("../data/"), DATA_DIR, dirs_exist_ok=True)

PosixPath('/mnt/cluster_storage')

## Embeddings pipeline overview

What are the steps involved in generating embeddings? In the most common case for text data, the steps are as follows:

1. Load documents
2. Process documents into chunks
   1. Process documents into chunks
   2. Optionally persist chunks
3. Generate embeddings from chunks
   1. Generate embeddings from chunks
   2. Optionally persist embeddings
4. Upsert embeddings into a database

## Simplest possible pipeline

Let's start with the simplest implementation of these steps. As we go along, we will replace the simple implementation with real components.

### 1. Load documents

In [3]:
dataset = ["this is a document", "this is another document"]

### 2. Process documents into chunks

In our case, we will chunk our documents into words - the simplest chunk.

In [4]:
def chunk_fn(doc):
    return doc.split(" ")

chunks = []
for doc in dataset:
    chunks.extend(chunk_fn(doc))
chunks

['this', 'is', 'a', 'document', 'this', 'is', 'another', 'document']

### 3. Generate embeddings from chunks

To keep it very simple, our embedding model is a lookup function.

In [5]:
word_to_vec = {
    "this": [0.1, 0.2],
    "is": [0.3, 0.4],
    "a": [0.5, 0.6],
    "document": [0.7, 0.8],
    "another": [0.9, 1.0],
}
word_to_vec["<UNK>"] = [0.0, 0.0]


def embed_model(word):
    return word_to_vec.get(word, word_to_vec["<UNK>"])

In [6]:
embeddings = [embed_model(chunk) for chunk in chunks]
embeddings

[[0.1, 0.2],
 [0.3, 0.4],
 [0.5, 0.6],
 [0.7, 0.8],
 [0.1, 0.2],
 [0.3, 0.4],
 [0.9, 1.0],
 [0.7, 0.8]]

#### 3b. Persist embeddings to disk

We will use a simple json file to persist the embeddings.

In [7]:
dest_dir = DATA_DIR / "simplest_pipeline"

dest_dir.mkdir(exist_ok=True, parents=True)
with open(dest_dir / "embeddings.json", "w") as f:
    json.dump(embeddings, f)

We confirm the data has been saved to the correct location by listing the contents of the directory:

In [8]:
!ls -llh {dest_dir}

total 4.0K
-rw-r--r-- 1 ray users 96 Mar  9 11:48 embeddings.json


### 4. Upsert embeddings to vector store

The final step is to upsert the embeddings into a database. We will skip this step for now.

## Simple pipeline for a real use-case

Let's now assume we want to "embed the ray documentation website". 

We will circle back and start with a small sample dataset taken from the ray documentation. 

To visualize our pipeline, see the diagram below:

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/rag-bootcamp-mar-2024/simple_embeddings_pipeline.svg" width="800px">

### 1. Load documents

First step, we load the data using `pandas`.

In [9]:
df = pd.read_json(DATA_DIR / "small_sample" / "sample-input.json", lines=True)

We have a dataset of 4 documents fetched from online content and stored as objects in a json file.

Here are some of the notable columns:
- `text` column which contains the text of the document that we want to embed.
- `section_url` column which contains the section under which the document is found.
- `page_url` column which contains the page under which the document is found.

In [10]:
df

Unnamed: 0,text,section_url,page_url
0,\n\n\nConfiguring Environments#\n\n\nYou can p...,https://docs.ray.io/en/master/rllib-env.html#c...,https://docs.ray.io/en/master/rllib-env.html
1,\n\nGymnasium#\n\n\nRLlib uses Gymnasium as it...,https://docs.ray.io/en/master/rllib-env.html#g...,https://docs.ray.io/en/master/rllib-env.html
2,\n\nPerformance#\n\n\n\nTip\nAlso check out th...,https://docs.ray.io/en/master/rllib-env.html#p...,https://docs.ray.io/en/master/rllib-env.html
3,\n\nExpensive Environments#\n\n\nSome environm...,https://docs.ray.io/en/master/rllib-env.html#e...,https://docs.ray.io/en/master/rllib-env.html


<div class="alert alert-block alert-secondary">

**Considerations for scaling the pipeline:**
- Memory: We currently load the entire file into memory. This is not a problem for small files, but can be a problem for large files.
- Latency: Reading the file from disk is slow. We can speed this up by using a faster disk, but we can also speed this up by splitting the file into smaller files and reading them in parallel (more on this later).

</div>

### 2. Process documents into chunks

We will use langchain's `RecursiveCharacterTextSplitter` to split the text into chunks. 

It works by first splitting on paragraphs, then sentences, then words, then characters. It is a recursive algorithm that will stop once the chunk size is satisfied.

Let's try it out on a sampe document.

In [11]:
text = """
This is the first part. Estimate me like 12 words long.

This is the second part. Estimate me like 12 words long.

This is the third part. Estimate me like 12 words long.
"""

splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],  # The default separators used by the splitter
    chunk_size=24,
    chunk_overlap=0,
    length_function=lambda x: len(x.split(" ")),
)
splitter.split_text(text)

['This is the first part. Estimate me like 12 words long.\n\nThis is the second part. Estimate me like 12 words long.',
 'This is the third part. Estimate me like 12 words long.']

If we change the paragraphs, the chunk contents will change

In [12]:
text = """
This is the first part. Estimate me like 12 words long.

This is the second part. Estimate me like 12 words long.
This is the third part. Estimate me like 12 words long.
"""

splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],  # The default separators used by the splitter
    chunk_size=24,
    chunk_overlap=0,
    length_function=lambda x: len(x.split(" ")),
)
splitter.split_text(text)

['This is the first part. Estimate me like 12 words long.',
 'This is the second part. Estimate me like 12 words long.\nThis is the third part. Estimate me like 12 words long.']

We now proceed to:

1. Configure the `RecursiveCharacterTextSplitter`
2. Run it over all the documents in the dataset

In [13]:
chunk_size = 128  #  Chunk size is usually specified in tokens
words_to_tokens = 1.2  # Heuristic for converting tokens to words
chunk_size_in_words = int(chunk_size // words_to_tokens)


splitter = RecursiveCharacterTextSplitter(
    chunk_size=chunk_size_in_words,
    length_function=lambda x: len(x.split()),
    chunk_overlap=0,
)

chunks = []
for idx, row in df.iterrows():
    for chunk in splitter.split_text(row["text"]):
        chunks.append(
            {
                "text": chunk,
                "section_url": row["section_url"],
                "page_url": row["page_url"],
            }
        )

<div class="alert alert-block alert-secondary">

**Considerations for choosing the chunk size**

  - We want the chunks small enough to:
    - Fit into the context window of our chosen embedding model
    - Be semantically coherent - i.e. concentrate on ideally a single topic
  - We want the chunks large enough to:
    - Contain enough information to be semantically meaningful.
    - Avoid creating too many embeddings which can be expensive to store and query.

</div>

Let's inspect the chunks produced for the first document.

In [14]:
first_document = df["text"].iloc[0]
print("first document is", len(first_document.split()), "words")

first document is 327 words


In [15]:
for k, v in chunks[0].items():
    if k == "text":
        print("first chunk of first document is", len(v.split()), "words")
    else:
        print(k, v)

first chunk of first document is 104 words
section_url https://docs.ray.io/en/master/rllib-env.html#configuring-environments
page_url https://docs.ray.io/en/master/rllib-env.html


In [16]:
for k, v in chunks[1].items():
    if k == "text":
        print("second chunk of first document is", len(v.split()), "words")
    else:
        print(k, v)

second chunk of first document is 83 words
section_url https://docs.ray.io/en/master/rllib-env.html#configuring-environments
page_url https://docs.ray.io/en/master/rllib-env.html


<div class="alert alert-block alert-secondary">

**Considerations for switching to a recursive chunker:**

- CPU: Recursive chunking is a CPU-intensive task which is being done in serial, iterating over every row. 
- Latency: Recursive chunking is slower than naive text splitting and is a blocking operation. We need to wait for the chunking to finish before we can start embedding.

</div>

### 3. Generate embeddings from chunks

For our third step, we want to load a good embedding model. 

**Suggested steps to choosing an embedding model:**
1. Visit the [MTEB leaderboard](https://huggingface.co/spaces/mteb/leaderboard) on HuggingFace.
2. Find a model that satisfies the following considerations:
  - Does the model perform well overall and in the task you are interested in?
  - Is the model closed-source or open-source?
    - If it is closed-source:
      - What are the costs, security, and privacy implications?
    - If it is open-source:
      - What are its resource requirements if you want to self-host it?
      - Is it readily available as a service by third-party providers like Anyscale, Fireworks, or Togther AI?

We will use `thenlper/gte-large` model from the [HuggingFace Model Hub](https://huggingface.co/thenlper/gte-large) given it is an open-source model and is available as a service by Anyscale and performs relatively well in the MTEB leaderboard.

In [17]:
svmem = psutil.virtual_memory()

# memory used in GB
memory_used = svmem.total - svmem.available
memory_used_gb_before_model_load = memory_used / (1024**3)
memory_used_gb_before_model_load

4.6450653076171875

In [18]:
%%time
model = SentenceTransformer('thenlper/gte-large', device='cpu')

.gitattributes:   0%|          | 0.00/1.52k [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/191 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/67.9k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/619 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/670M [00:00<?, ?B/s]

onnx/config.json:   0%|          | 0.00/632 [00:00<?, ?B/s]

model.onnx:   0%|          | 0.00/1.34G [00:00<?, ?B/s]

onnx/special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

onnx/tokenizer.json:   0%|          | 0.00/712k [00:00<?, ?B/s]

onnx/tokenizer_config.json:   0%|          | 0.00/342 [00:00<?, ?B/s]

onnx/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/670M [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/57.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/712k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/342 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

modules.json:   0%|          | 0.00/385 [00:00<?, ?B/s]

CPU times: user 2.66 s, sys: 6.69 s, total: 9.35 s
Wall time: 16.4 s


In [19]:
svmem = psutil.virtual_memory()
memory_used = svmem.total - svmem.available
memory_used_gb_after_model_load = memory_used / (1024**3)
memory_used_gb_after_model_load

5.906482696533203

In [20]:
model_memory_usage = memory_used_gb_after_model_load - memory_used_gb_before_model_load
model_memory_usage

1.2614173889160156

Loading the embedding model took around 1 GB of memory.

Let's see how slow it is to generate an embedding.

In [21]:
%%time

embeddings = model.encode([chunk["text"] for chunk in chunks])

CPU times: user 30.7 s, sys: 3.77 s, total: 34.5 s
Wall time: 4.54 s


In [22]:
len(chunks)

8

It takes on the order of a few seconds to embed 8 chunks on our CPU. We will most definitely need a GPU to speed things up.

#### Save embeddings to disk

As a fourth step, we want to store our generated embeddings as a parquet file.

In [23]:
df_output = pd.DataFrame(chunks)

In [24]:
df_output["embeddings"] = embeddings.tolist()

In [25]:
df_output

Unnamed: 0,text,section_url,page_url,embeddings
0,Configuring Environments#\n\n\nYou can pass ei...,https://docs.ray.io/en/master/rllib-env.html#c...,https://docs.ray.io/en/master/rllib-env.html,"[0.014681331813335419, 0.01771390251815319, -0..."
1,You can also register a custom env creator fun...,https://docs.ray.io/en/master/rllib-env.html#c...,https://docs.ray.io/en/master/rllib-env.html,"[-0.006544388365000486, 0.0010215046349912882,..."
2,"In the above example, note that the env_creato...",https://docs.ray.io/en/master/rllib-env.html#c...,https://docs.ray.io/en/master/rllib-env.html,"[0.0012164587387815118, 0.02453259937465191, -..."
3,"Tip\nWhen using logging in an environment, the...",https://docs.ray.io/en/master/rllib-env.html#c...,https://docs.ray.io/en/master/rllib-env.html,"[0.004050213377922773, -0.007204730529338121, ..."
4,Gymnasium#\n\n\nRLlib uses Gymnasium as its en...,https://docs.ray.io/en/master/rllib-env.html#g...,https://docs.ray.io/en/master/rllib-env.html,"[0.0061655230820178986, 0.01083178911358118, -..."
5,Performance#\n\n\n\nTip\nAlso check out the sc...,https://docs.ray.io/en/master/rllib-env.html#p...,https://docs.ray.io/en/master/rllib-env.html,"[0.006059643812477589, 0.03671540692448616, -0..."
6,Distribute across multiple processes: You can ...,https://docs.ray.io/en/master/rllib-env.html#p...,https://docs.ray.io/en/master/rllib-env.html,"[0.004182981792837381, 0.017631979659199715, -..."
7,Expensive Environments#\n\n\nSome environments...,https://docs.ray.io/en/master/rllib-env.html#e...,https://docs.ray.io/en/master/rllib-env.html,"[0.006403152830898762, 0.010620699264109135, -..."


In [26]:
df_output.to_parquet(DATA_DIR / "small_sample" / "sample-output.parquet")

### 4. Upsert embeddings to vector store

The final step is to upsert the embeddings into a database. We will skip this step for now.

## Migrating the simple pipeline to Ray Data

We now want to migrate our implementation to use Ray Data to drastically scale our pipeline for larger datasets.

### 1. Load documents

Let's start with a first pass conversion of our data pipeline to use Ray Data. 

Instead of `pandas.read_json`, use `ray.data.read_json` to instantiate a `ray.data.Dataset` that will eventually read our file.

In [27]:
ds = ray.data.read_json(DATA_DIR / "small_sample" / "sample-input.json")
type(ds)

2024-03-09 11:54:20,955	INFO worker.py:1569 -- Connecting to existing Ray cluster at address: 10.0.49.91:6379...
2024-03-09 11:54:20,963	INFO worker.py:1744 -- Connected to Ray cluster. View the dashboard at [1m[32mhttps://session-1tuhiybbbivknukaewhmrj5ayw.i.anyscaleuserdata.com [39m[22m
2024-03-09 11:54:20,965	INFO packaging.py:358 -- Pushing file package 'gcs://_ray_pkg_10abfe4d59a072895d94ace3376c67df84c5cc84.zip' (0.09MiB) to Ray cluster...
2024-03-09 11:54:20,966	INFO packaging.py:371 -- Successfully pushed file package 'gcs://_ray_pkg_10abfe4d59a072895d94ace3376c67df84c5cc84.zip'.


ray.data.dataset.Dataset

`ray.data.read_json` returns a `ray.data.Dataset` which is a distributed collection of data. Execution in Ray Data by default is:
- **Lazy**: `Dataset` transformations aren’t executed until you call a consumption operation.
- **Streaming**: `Dataset` transformations are executed in a streaming way, incrementally on the base data, one block at a time.

Accordingly `ray.data.Dataset` will only fetch back some high-level metadata and schema information about the file, but not the actual data.

In [28]:
ds.schema()

Column       Type
------       ----
text         string
section_url  string
page_url     string

### Under the hood

Ray Data uses Ray tasks to read files in parallel. Each read task reads one or more files and produces one or more output blocks.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/rag-bootcamp-mar-2024/dataset-read-cropped-2.svg" width="500px">

### 2. Process documents into chunks

Given a `ray.data.Dataset`, we can apply transformations to it. There are two types of transformations:
1. **row-wise transformations**
  - `map`: a 1-to-1 function that is applied to each row in the dataset.
  - `filter`: a 1-to-1 function that is applied to each row in the dataset and filters out rows that don’t satisfy the condition.
  - `flat_map`: a 1-to-many function that is applied to each row in the dataset and then flattens the results into a single dataset.
2. **batch-wise transformations**
  - `map_batches`: a 1-to-n function that is applied to each batch in the dataset.


We chose to make use of `flat_map` to generate a list of chunk rows. `flat_map` will create `FlatMap` tasks which will be scheduled in parallel to process as many rows as possible at once.

In [30]:
def chunk_row(row):
    chunk_size = 128
    words_to_tokens = 1.2
    num_tokens = int(chunk_size // words_to_tokens)

    def get_num_words(text):
        return len(text.split())

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=num_tokens,
        keep_separator=True, 
        length_function=get_num_words, 
        chunk_overlap=0,
    )

    chunks = []
    for chunk in splitter.split_text(row["text"]):
        chunks.append(
            {
                "text": chunk,
                "section_url": row["section_url"],
                "page_url": row["page_url"],
            }
        )
    return chunks

ds = ds.flat_map(chunk_row)

To verify our `flat_map` is working, we can consume a limited number of rows from the dataset.

To do so, we an either call
- `take` to specify a limited number of rows from the dataset.
- `take_batch` to specify a limited number of batches from the dataset.

Here we call `take(2)` to return 2 rows.

In [31]:
ds.take(2)

2024-03-09 12:01:58,465	INFO dataset.py:2380 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-03-09 12:01:58,471	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=64 for operator ReadJSON to satisfy parallelism at least twice the available number of CPUs (32).
2024-03-09 12:01:58,471	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 64, each read task output is split into 64 smaller blocks.
2024-03-09 12:01:58,473	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[FlatMap(chunk_row)->FlatMap(chunk_row)] -> LimitOperator[limit=2]
2024-03-09 12:01:58,474	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, prese

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[{'text': 'Configuring Environments#\n\n\nYou can pass either a string name or a Python class to specify an environment. By default, strings will be interpreted as a gym environment name.\nCustom env classes passed directly to the algorithm must take a single env_config parameter in their constructor:\n\n\nimport gymnasium as gym\nimport ray\nfrom ray.rllib.algorithms import ppo\n\nclass MyEnv(gym.Env):\n    def __init__(self, env_config):\n        self.action_space = <gym.Space>\n        self.observation_space = <gym.Space>\n    def reset(self, seed, options):\n        return <obs>, <info>\n    def step(self, action):\n        return <obs>, <reward: float>, <terminated: bool>, <truncated: bool>, <info: dict>\n\nray.init()\nalgo = ppo.PPO(env=MyEnv, config={\n    "env_config": {},  # config to pass to env class\n})\n\nwhile True:\n    print(algo.train())',
  'section_url': 'https://docs.ray.io/en/master/rllib-env.html#configuring-environments',
  'page_url': 'https://docs.ray.io/en/mas

### 3. Generate embeddings from chunks

For our third step, we apply the embeddings using `map_batches`, which will be implemented using `MapBatches` tasks scheduled in parallel.

In [32]:
def embed_batch(batch):
    assert isinstance(batch, dict)
    for key in batch.keys():
        assert key in ["text", "section_url", "page_url"]
    for val in batch.values():
        assert isinstance(val, np.ndarray), type(val)

    model = SentenceTransformer('thenlper/gte-large')
    text = batch["text"].tolist()
    embeddings = model.encode(text, batch_size=len(text))
    batch["embeddings"] = embeddings.tolist()
    return batch

ds = ds.map_batches(embed_batch)

#### Save embeddings to disk

For our fourth step, we write our dataset to JSON using `write_json`.

In [33]:
%%time

output_path = DATA_DIR / "small_sample" / "sample-output"
if output_path.exists():
    shutil.rmtree(output_path)

ds.write_json(output_path)

2024-03-09 12:02:07,191	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=64 for operator ReadJSON to satisfy parallelism at least twice the available number of CPUs (32).
2024-03-09 12:02:07,191	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 64, each read task output is split into 64 smaller blocks.
2024-03-09 12:02:07,192	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[FlatMap(chunk_row)->FlatMap(chunk_row)->MapBatches(embed_batch)->Write]
2024-03-09 12:02:07,193	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 12:02:07,194	INFO streaming_executor.py:113 -- Tip: For detailed 

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[36m(FlatMap(chunk_row)->FlatMap(chunk_row)->MapBatches(embed_batch)->Write pid=17706)[0m Skipped writing empty block to /mnt/cluster_storage/small_sample/sample-output
[36m(FlatMap(chunk_row)->FlatMap(chunk_row)->MapBatches(embed_batch)->Write pid=17706)[0m Skipped writing empty block to /mnt/cluster_storage/small_sample/sample-output
.gitattributes: 100%|██████████| 1.52k/1.52k [00:00<00:00, 16.3MB/s])->Write pid=7173, ip=10.0.12.185)[0m 
1_Pooling/config.json: 100%|██████████| 191/191 [00:00<00:00, 2.64MB/s]Write pid=7173, ip=10.0.12.185)[0m 
README.md: 100%|██████████| 67.9k/67.9k [00:00<00:00, 47.6MB/s]batch)->Write pid=7173, ip=10.0.12.185)[0m 
config.json: 100%|██████████| 619/619 [00:00<00:00, 8.19MB/s]d_batch)->Write pid=7173, ip=10.0.12.185)[0m 
model.safetensors:   0%|          | 0.00/670M [00:00<?, ?B/s]d_batch)->Write pid=7246, ip=10.0.12.185)[0m 
model.safetensors:   3%|▎         | 21.0M/670M [00:00<00:03, 175MB/s]->Write pid=7246, ip=10.0.12.185)[0m 
model.safe

CPU times: user 623 ms, sys: 237 ms, total: 859 ms
Wall time: 28 s


We inspect the created JSON output directory. Every write task will create a separate file in the output directory.

In [34]:
!ls -llah {output_path} 

total 132K
drwxr-xr-x 2 ray users 6.0K Mar  9 12:02 .
drwxr-xr-x 3 ray users 6.0K Mar  9 12:02 ..
-rw-r--r-- 1 ray users  57K Mar  9 12:02 4_000000_000000.json
-rw-r--r-- 1 ray users  14K Mar  9 12:02 4_000001_000000.json
-rw-r--r-- 1 ray users  29K Mar  9 12:02 4_000002_000000.json
-rw-r--r-- 1 ray users  14K Mar  9 12:02 4_000003_000000.json


In [36]:
ray.data.read_json(DATA_DIR / "small_sample" / "sample-output").to_pandas()

Read progress 0:   0%|          | 0/4 [00:00<?, ?it/s]

Read progress 0:   0%|          | 0/4 [00:00<?, ?it/s]

  return transform_pyarrow.concat(tables)


Unnamed: 0,text,section_url,page_url,embeddings
0,Configuring Environments#\n\n\nYou can pass ei...,https://docs.ray.io/en/master/rllib-env.html#c...,https://docs.ray.io/en/master/rllib-env.html,"[0.0146813318, 0.0177139025, -0.0276165493, -0..."
1,You can also register a custom env creator fun...,https://docs.ray.io/en/master/rllib-env.html#c...,https://docs.ray.io/en/master/rllib-env.html,"[-0.0065443884, 0.0010215046, -0.0364210084, -..."
2,"In the above example, note that the env_creato...",https://docs.ray.io/en/master/rllib-env.html#c...,https://docs.ray.io/en/master/rllib-env.html,"[0.0012164587, 0.0245325994, -0.050914079, -0...."
3,"Tip\nWhen using logging in an environment, the...",https://docs.ray.io/en/master/rllib-env.html#c...,https://docs.ray.io/en/master/rllib-env.html,"[0.0040502134, -0.0072047305, -0.0348574072, -..."
4,Gymnasium#\n\n\nRLlib uses Gymnasium as its en...,https://docs.ray.io/en/master/rllib-env.html#g...,https://docs.ray.io/en/master/rllib-env.html,"[0.0061655231, 0.0108317891, -0.0290550683, -0..."
5,Performance#\n\n\n\nTip\nAlso check out the sc...,https://docs.ray.io/en/master/rllib-env.html#p...,https://docs.ray.io/en/master/rllib-env.html,"[0.0060596438, 0.0367154144, -0.037084613, -0...."
6,Distribute across multiple processes: You can ...,https://docs.ray.io/en/master/rllib-env.html#p...,https://docs.ray.io/en/master/rllib-env.html,"[0.0041829948, 0.0176319741, -0.0411699414, -0..."
7,Expensive Environments#\n\n\nSome environments...,https://docs.ray.io/en/master/rllib-env.html#e...,https://docs.ray.io/en/master/rllib-env.html,"[0.0064031528, 0.0106206993, -0.024834536, -0...."


### 4. Upsert embeddings to vector store

The final step is to upsert the embeddings into a database. We will skip this step for now.

**Recap**

Here is our entire pipeline:

```python
(
    ray.data.read_json(DATA_DIR / "small_sample" / "sample-input.json")
    .flat_map(chunk_row)
    .map_batches(embed_batch)
    .write_json(DATA_DIR / "small_sample" / "sample-output")
)
```

<div class="alert alert-block alert-info">

### Activity: Use a different embedding model

Re-run the entire data pipeline but this time use a different embedding model `BAAI/bge-large-en-v1.5` which outperforms `thenlper/gte-large` on certain parts of the MTEB leaderboard.

NOTE: make sure to output the results to a different directory.

<details> 

<summary>Click here to see the solution </summary>

```python
def embed_batch(batch):
    # Load the embedding model
    model = SentenceTransformer("BAAI/bge-large-en-v1.5")
    text = batch["text"].tolist()
    embeddings = model.encode(text, batch_size=len(text))
    batch["embeddings"] = embeddings.tolist()
    return batch

(
    ray.data.read_json(DATA_DIR / "small_sample" / "sample-input.json")
    .flat_map(chunk_row)
    .map_batches(embed_batch)
    .write_json(DATA_DIR / "small_sample" / "sample-output-bge")
)

# inspect output
ray.data.read_json(DATA_DIR / "small_sample" / "sample-output-bge").to_pandas()
```

</details>
</div>

In [46]:
def chunk_row(row):
    chunk_size = 128
    words_to_tokens = 1.2
    num_tokens = int(chunk_size // words_to_tokens)

    def get_num_words(text):
        return len(text.split())

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=num_tokens,
        keep_separator=True, 
        length_function=get_num_words, 
        chunk_overlap=0,
    )

    chunks = []
    for chunk in splitter.split_text(row["text"]):
        chunks.append(
            {
                "text": chunk,
                "section_url": row["section_url"],
                "page_url": row["page_url"],
            }
        )
    return chunks

ds = ds.flat_map(chunk_row)



def embed_batch(batch):
    assert isinstance(batch, dict)
    for key in batch.keys():
        assert key in ["text", "section_url", "page_url"]
    for val in batch.values():
        assert isinstance(val, np.ndarray), type(val)

    model = SentenceTransformer('BAAI/bge-large-en-v1.5')
    text = batch["text"].tolist()
    embeddings = model.encode(text, batch_size=len(text))
    batch["embeddings"] = embeddings.tolist()
    return batch

ds = ds.map_batches(embed_batch)

## Scaling the pipeline with Ray Data

Let's explore how to scale our pipeline to a larger dataset using Ray Data.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/rag-bootcamp-mar-2024/full_scale_embeddings_pipeline.svg" width="1000px">



### Phase 1: Preparing input files

First, we need to prepare our documents by performing the following steps
1. Fetch all the Ray documentation from the web.
2. Parse the web pages to extract the text.
3. Store the text into input files that are ideal for Ray Data.

Consider converting to parquet files which allow for pruning to improve reads:
- When working with column-oriented file formats like parquet, specify which columns you want to read. This might help significantly reduce the memory footprint of the read task.
- Similarly, you can pass in a filter to `ray.data.read_parquet()` (filter pushdown) which is applied at the file scan so only rows that match the filter predicate are returned.

#### 1. Fetch all the Ray documentation from the web.

We can make use of `wget` to crawl the web and download all the webpages under a given domain.

In [37]:
raw_web_pages_dir = DATA_DIR / "full_scale" / "00_raw_web_pages"

If you uncomment the following cell, it will crawl the web pages and save them to the `raw_web_pages_dir` directory.This will take a long time.

In [38]:
# !wget https://docs.ray.io/en/master/ -e robots=off --recursive --page-requisites \
#   --html-extension --convert-links --restrict-file-names=windows \
#   --domains docs.ray.io --no-parent --accept=html --retry-on-http-error=429 \
#   -P {raw_web_pages_dir}

Instead we will fetch a zip file containing the webpages and extract it.

In [39]:
!rm -rf {raw_web_pages_dir}
!wget https://anyscale-materials.s3.us-west-2.amazonaws.com/rag-ray-documentation-html-files/ray_docs_web_pages.zip \
    -P {str(raw_web_pages_dir)}
!ls -ll {raw_web_pages_dir}
!unzip -o {raw_web_pages_dir / "ray_docs_web_pages.zip"} -d {raw_web_pages_dir}

--2024-03-09 12:03:06--  https://anyscale-materials.s3.us-west-2.amazonaws.com/rag-ray-documentation-html-files/ray_docs_web_pages.zip
Resolving anyscale-materials.s3.us-west-2.amazonaws.com (anyscale-materials.s3.us-west-2.amazonaws.com)... 52.218.182.137, 52.218.236.97, 3.5.77.181, ...
Connecting to anyscale-materials.s3.us-west-2.amazonaws.com (anyscale-materials.s3.us-west-2.amazonaws.com)|52.218.182.137|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 85522196 (82M) [application/zip]
Saving to: ‘/mnt/cluster_storage/full_scale/00_raw_web_pages/ray_docs_web_pages.zip’


2024-03-09 12:03:07 (101 MB/s) - ‘/mnt/cluster_storage/full_scale/00_raw_web_pages/ray_docs_web_pages.zip’ saved [85522196/85522196]

total 83520
-rw-r--r-- 1 ray users 85522196 Feb 11 12:47 ray_docs_web_pages.zip
Archive:  /mnt/cluster_storage/full_scale/00_raw_web_pages/ray_docs_web_pages.zip
   creating: /mnt/cluster_storage/full_scale/00_raw_web_pages/docs.ray.io/
   creating: /mnt/clust

We count the total number of files in the directory.

In [40]:
!ls -R {raw_web_pages_dir} | wc -l

3557


We also take the total size of the raw web pages directory.

In [41]:
!du -sh {raw_web_pages_dir}

676M	/mnt/cluster_storage/full_scale/00_raw_web_pages


Note that this only includes the latest version of the ray documentation. This size would easily be in the gegabytes if we included all versions of the documentation.

#### Parse the web pages to extract the text.

We first read all HTML files in the raw web pages directory into a `ray.data.Dataset`.

In [42]:
ds = ray.data.from_items(
    [{"path": path} for path in raw_web_pages_dir.rglob("*.html") if not path.is_dir()]
)
ds

MaterializedDataset(num_blocks=200, num_rows=3057, schema={path: object})

We then implement a function to extract the text from the HTML files. Given for each HTML file, we extract a single document, we will use `map` to apply the function to each row in the dataset.

In [43]:
def path_to_uri(
    path: str, scheme: str = "https://", domain: str = "docs.ray.io"
) -> str:
    return scheme + domain + str(path).split(domain)[-1]


def extract_document_from_html(row: dict) -> list[dict]:
    """Extract a document from an HTML file."""
    # 1. Request the page and extract the text using BeautifulSoup
    with open(row["path"], "r", encoding="utf-8") as html_file:
        soup = BeautifulSoup(html_file, "html.parser")

    # 2. Create a document object with the text and page_url
    return {
        "text": soup.text,
        "page_url": path_to_uri(row["path"]),
    }


ds = ds.map(extract_document_from_html)

For now we write out the data to a "01_documents" directory

In [44]:
%%time

if (DATA_DIR / "full_scale" / "01_documents").exists():
    shutil.rmtree(DATA_DIR / "full_scale" / "01_documents")
ds.write_json(DATA_DIR / "full_scale" / "01_documents", num_rows_per_file=100)

2024-03-09 12:06:12,979	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[Map(extract_document_from_html)->Write]
2024-03-09 12:06:12,980	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 12:06:12,980	INFO streaming_executor.py:113 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

CPU times: user 499 ms, sys: 166 ms, total: 665 ms
Wall time: 27.1 s


We inspect the produced documents

In [45]:
ray.data.read_json(DATA_DIR / "full_scale" / "01_documents").count()

Read progress 0:   0%|          | 0/34 [00:00<?, ?it/s]

3057

##### Utilize inherent structure to improve the documents 

Documentation [webpages](https://docs.ray.io/en/latest/rllib/rllib-env.html) are naturally split into sections. We can use this to our advantage by returning our documents as sections. This will facilitate producing semantically coherent chunks. 

<img src="https://images.ctfassets.net/xjan103pcp94/1eFnKmG5xqPIFtPupZ327X/f6152723e18322b90aaa8be5d2d5a6e4/image5.png" >


We re-instantiate the dataset from the HTML files.

In [47]:
ds = ray.data.from_items(
    [{"path": path} for path in raw_web_pages_dir.rglob("*.html") if not path.is_dir()]
)

This time we are producing multiple documents from each HTML file. We will use the `flat_map` method to produce multiple documents from each HTML file.

In [48]:
def extract_sections_from_html(record: dict) -> list[dict]:
    documents = []
    # 1. Request the page and parse it using BeautifulSoup
    with open(record["path"], "r", encoding="utf-8") as html_file:
        soup = BeautifulSoup(html_file, "html.parser")

    url = path_to_uri(record["path"])

    # 2. Find all sections
    sections = soup.find_all("section")
    for section in sections:
        # 3. Extract text from the section but not from the subsections
        section_text = "\n".join(
            [child.text for child in section.children if child.name != "section"]
        )
        # 4. Construct the section url
        section_url = url + "#" + section["id"]
        # 5. Create a document object with the text, source page, source section uri
        documents.append(
            {
                "text": section_text,
                "section_url": section_url,
                "page_url": url,
            }
        )
    return documents


ds = ds.flat_map(extract_sections_from_html)

#### Store the text into input files that are ideal for Ray Data.

The following are good heuristics to keep in mind:
- Avoid reading large (1 GiB or more) binary files.
  - `ray.data.read_*` cannot parallelize reading a single file - i.e., it maps 1 file to 1 read task.
- Avoid too many tiny files (less than 1 MiB).
  - There is a default minimum block size that `ray.data` uses. This means that `ray.data` will need to group together the tiny blocks into larger blocks, which can be expensive.
- Avoid transforming a Dataset where individual rows are large (100 MiB or more).
  - There is a default maximum block size that `ray.data` uses. This means that `ray.data` will need spill the output into multiple blocks, which could lead to OOM errors.

We choose a `num_rows_per_file` of 400 so our produced files are not of a reasonable size given the above heuristics.

In [49]:
%%time
if (DATA_DIR / "full_scale" / "02_sections").exists():
    shutil.rmtree(DATA_DIR / "full_scale" / "02_sections")
ds.write_json(DATA_DIR / "full_scale" / "02_sections", num_rows_per_file=400)

2024-03-09 13:07:35,819	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections_from_html)->Write]
2024-03-09 13:07:35,819	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 13:07:35,820	INFO streaming_executor.py:113 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

CPU times: user 663 ms, sys: 155 ms, total: 819 ms
Wall time: 1min 3s


In [50]:
!ls -llh {DATA_DIR / "full_scale" / "02_sections"}

total 7.8M
-rw-r--r-- 1 ray users 768K Mar  9 13:08 19_000000_000000.json
-rw-r--r-- 1 ray users 364K Mar  9 13:08 19_000001_000000.json
-rw-r--r-- 1 ray users 2.5M Mar  9 13:08 19_000002_000000.json
-rw-r--r-- 1 ray users 571K Mar  9 13:08 19_000003_000000.json
-rw-r--r-- 1 ray users 631K Mar  9 13:08 19_000004_000000.json
-rw-r--r-- 1 ray users 1.7M Mar  9 13:08 19_000005_000000.json
-rw-r--r-- 1 ray users 426K Mar  9 13:08 19_000006_000000.json
-rw-r--r-- 1 ray users 1.1M Mar  9 13:08 19_000007_000000.json


Let's count how many documents we will have after processing the sections.

In [51]:
ray.data.read_json(DATA_DIR / "full_scale" / "02_sections").count()

Read progress 0:   0%|          | 0/8 [00:00<?, ?it/s]

5636

<div class="alert alert-block alert-secondary">

**Further considerations for creating input files for Ray Data:**

Consider converting to parquet files which allow for pruning to improve reads:
- When working with column-oriented file formats like parquet, you can specify which columns you want to read. This might help significantly reduce the memory footprint of the read task.
- Similarly, you can pass in a filter to `ray.data.read_parquet()` (filter pushdown) which is applied at the file scan so only rows that match the filter predicate are returned.

In our case the bulk of the memory is taken up by the text column. Using parquet files will not significantly help us reduce the memory footprint of the read task.

</div>



### Phase 2: Generating Embeddings

Now that we have our documents, we can proceed to generate embeddings.

#### 1. Load documents
We begin by reading the documents from the "02_sections" directory.

In [52]:
ds = ray.data.read_json(DATA_DIR / "full_scale" / "02_sections")

ds

Dataset(
   num_blocks=64,
   num_rows=?,
   schema={text: string, section_url: string, page_url: string}
)

#### Applying chunking as a transformation

We apply our chunking transformation using `flat_map`, which applies a 1-to-many function to each row in the dataset and then flattens the results into a single dataset.

In [53]:
ds = ds.flat_map(chunk_row)

We could have used `map_batches` instead to apply a many-to-many function to each batch of rows in the dataset. However, given our chunking transformation is not vectorized, `map_batches` will not be faster.

Let's run the chunking and count our total number of chunks.

In [54]:
ds.count()

2024-03-09 13:12:59,705	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=64 for operator ReadJSON to satisfy parallelism at least twice the available number of CPUs (32).
2024-03-09 13:12:59,706	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 64, each read task output is split into 8 smaller blocks.
2024-03-09 13:12:59,706	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[FlatMap(chunk_row)]
2024-03-09 13:12:59,707	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 13:12:59,707	INFO streaming_executor.py:113 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_cur

Running 0:   0%|          | 0/8 [00:00<?, ?it/s]

11711

#### Applying embedding as a transformation

For the embedding part, we will want to run the embedding model on the fastest possible device.
Let's check the available devices.

In [55]:
def get_device():
    if torch.cuda.is_available():
        device = "cuda"
    elif torch.has_mps:
        device = "mps"
    else:
        device = "cpu"
    return device

device = get_device()
device

'cuda'

We want to load the embedding model once and reuse it across multiple transformation tasks.

To do so, we want to use call `map_batches` with **stateful transform** instead of a *stateless transform*. 

This means we create a pool of processes called actors where the model is already loaded in memory.

Each actor will run a `MapBatch` task where:
  - initial state is handled in `__init__`
  - task is invoked using `__call__` method

In [56]:
num_gpus = 2
num_cpus = psutil.cpu_count()

class EmbedBatch:
    def __init__(self):
        self.model = SentenceTransformer("thenlper/gte-large", device=device)

    def __call__(self, batch):
        text = batch["text"].tolist()
        embeddings = self.model.encode(text, batch_size=len(text))
        batch["embeddings"] = embeddings.tolist()
        return batch

ds = ds.map_batches(
    EmbedBatch,
    # Maximum number of actors to launch.
    concurrency=num_gpus if device == "cuda" else num_cpus,
    # Size of batches passed to embeddings actor.
    batch_size=100,
    # 1 GPU for each actor.
    num_gpus=1 if device == "cuda" else 0,
)

#### Writing the embeddings to disk

When writing, we can use the `num_rows_per_file` parameter to control the number of rows per file.

In [57]:
%%time

if (DATA_DIR / "full_scale" / "03_embeddings").exists():
    shutil.rmtree(DATA_DIR / "full_scale" / "03_embeddings")
(
    ds
    .write_json(
        num_rows_per_file=50,
        path=DATA_DIR / "full_scale" / "03_embeddings",
    )
)

2024-03-09 13:13:11,214	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=64 for operator ReadJSON to satisfy parallelism at least twice the available number of CPUs (32).
2024-03-09 13:13:11,215	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 64, each read task output is split into 8 smaller blocks.
2024-03-09 13:13:11,215	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[FlatMap(chunk_row)] -> ActorPoolMapOperator[MapBatches(EmbedBatch)] -> TaskPoolMapOperator[Write]
2024-03-09 13:13:11,216	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 13:13:11,216	INFO streaming_executor.py:

Running 0:   0%|          | 0/8 [00:00<?, ?it/s]

[36m(MapWorker(MapBatches(EmbedBatch)) pid=26473)[0m Could not construct Arrow block from numpy array; encountered values of unsupported numpy type `17` in column named 'embeddings', which cannot be casted to an Arrow data type. Falling back to using pandas block type, which is slower and consumes more memory. For maximum performance, consider applying the following suggestions before ingesting into Ray Data in order to use native Arrow block types:
[36m(MapWorker(MapBatches(EmbedBatch)) pid=26473)[0m - Expand out each key-value pair in the dict column into its own column
[36m(MapWorker(MapBatches(EmbedBatch)) pid=26473)[0m - Replace `None` values with an Arrow supported data type
[36m(MapWorker(MapBatches(EmbedBatch)) pid=26473)[0m 


CPU times: user 1.85 s, sys: 680 ms, total: 2.53 s
Wall time: 2min 11s


##### Inspecting the ray data dashboard

If we take a look at the metrics tab of the ray data dashboard, we can check to see:

- The GPU utilization
    - Ideally, we would like to see the GPU utilization at 100% for the duration of the embedding process
- The time spent on io and network by different tasks

We can then use this information to optimize our pipeline.

##### Inspecting the output

We check to see if the embeddings were written to disk.

In [58]:
!ls -llh {DATA_DIR / "full_scale" / "03_embeddings"}

total 163M
-rw-r--r-- 1 ray users 2.4M Mar  9 13:13 26_000000_000000.json
-rw-r--r-- 1 ray users 3.2M Mar  9 13:13 26_000001_000000.json
-rw-r--r-- 1 ray users 1.7M Mar  9 13:13 26_000002_000000.json
-rw-r--r-- 1 ray users 2.2M Mar  9 13:13 26_000003_000000.json
-rw-r--r-- 1 ray users 2.6M Mar  9 13:13 26_000004_000000.json
-rw-r--r-- 1 ray users 2.7M Mar  9 13:13 26_000005_000000.json
-rw-r--r-- 1 ray users 1.3M Mar  9 13:13 26_000006_000000.json
-rw-r--r-- 1 ray users 1.5M Mar  9 13:13 26_000007_000000.json
-rw-r--r-- 1 ray users 1.1M Mar  9 13:13 26_000008_000000.json
-rw-r--r-- 1 ray users 958K Mar  9 13:13 26_000009_000000.json
-rw-r--r-- 1 ray users 899K Mar  9 13:13 26_000010_000000.json
-rw-r--r-- 1 ray users 1.1M Mar  9 13:13 26_000011_000000.json
-rw-r--r-- 1 ray users 1.2M Mar  9 13:13 26_000012_000000.json
-rw-r--r-- 1 ray users 1.1M Mar  9 13:13 26_000013_000000.json
-rw-r--r-- 1 ray users 1.6M Mar  9 13:13 26_000014_000000.json
-rw-r--r-- 1 ray users 925K Mar  9 13:13 26_

### Recap of the pipeline

Here is our entire pipeline so far:

```python
ds = (
    ray.data.read_json(
        DATA_DIR / "full_scale" / "02_sections",
    )
    .flat_map(chunk_row)
    .map_batches(
        EmbedBatch,
        concurrency=num_gpus,
        batch_size=100,
        num_gpus=1,
    )
    .write_json(
        path=DATA_DIR / "full_scale" / "03_embeddings_tuning",
        num_rows_per_file=50,
    )
)
```


<div class="alert alert-block alert-info">

### Activity: Tuning the pipeline

Proceed to tune your pipeline by:
- Changing the batch size on `map_batches` and see what effect it has on the GPU utilization.
- Changing the number of GPUs and see whether it helps to scale the pipeline.

</div>

In [76]:
num_gpus = 2
num_cpus = psutil.cpu_count()

class EmbedBatch:
    def __init__(self):
        self.model = SentenceTransformer("thenlper/gte-large", device=device)

    def __call__(self, batch):
        text = batch["text"].tolist()
        embeddings = self.model.encode(text, batch_size=len(text))
        batch["embeddings"] = embeddings.tolist()
        return batch

ds = ds.map_batches(
    EmbedBatch,
    # Maximum number of actors to launch.
    concurrency=num_gpus if device == "cuda" else num_cpus,
    # Size of batches passed to embeddings actor.
    batch_size=400,
    # 1 GPU for each actor.
    num_gpus=2 if device == "cuda" else 0,
)

### Upserting embeddings to Pinecone

We will use [Pinecone](https://www.pinecone.io/) to index our document embeddings in a vector store. Pinecone is a fully managed vector database optimized for similarity search and is user-friendly. We chose Pinecone for its ease of use and its free tier, which meets our needs.

Index your document embeddings in Pinecone as follows:


1. Create a Pinecone client.
2. Create a Pinecone index.
3. Load the embeddings from disk.
4. Transform the embeddings into Pinecone’s index format.
5. Upsert the embeddings into the Pinecone index.
6. Query the Pinecone index.

#### 1. Create a Pinecone client 
1. Sign up for a free account at https://www.pinecone.io/ and obtain an API key.

Follow the instructions on the Pinecone website to sign up and obtain an API key.

2. Initialize a Pinecone client with your API key.

Replace YOUR_API_KEY with your actual API key to initialize a Pinecone client.

In [59]:
YOUR_PINECONE_API_KEY = "9386359a-0227-4d5b-80d9-b1bb7600dd08"

In [60]:
pinecone_api_key = os.environ.get("PINECONE_API_KEY", YOUR_PINECONE_API_KEY)
pc = Pinecone(api_key=pinecone_api_key)

Let's list out the available indices in your Pinecone account by running the following command:

In [61]:
pc.list_indexes()

{'indexes': [{'dimension': 1024,
              'host': 'canopy--dufwagwitfip-zhxkhfk.svc.apw5-4e34-81fa.pinecone.io',
              'metric': 'cosine',
              'name': 'canopy--dufwagwitfip',
              'spec': {'serverless': {'cloud': 'aws', 'region': 'us-west-2'}},
              'status': {'ready': True, 'state': 'Ready'}},
             {'dimension': 1024,
              'host': 'canopy--shanker-index-zhxkhfk.svc.apw5-4e34-81fa.pinecone.io',
              'metric': 'cosine',
              'name': 'canopy--shanker-index',
              'spec': {'serverless': {'cloud': 'aws', 'region': 'us-west-2'}},
              'status': {'ready': True, 'state': 'Ready'}},
             {'dimension': 1024,
              'host': 'canopy--cong-index-zhxkhfk.svc.apw5-4e34-81fa.pinecone.io',
              'metric': 'cosine',
              'name': 'canopy--cong-index',
              'spec': {'serverless': {'cloud': 'aws', 'region': 'us-west-2'}},
              'status': {'ready': True, 'state': 'R

#### 2. Create a Pinecone index 

##### Choosing an index type
Pinecone offers two types of indexes - [see the docs for more details](https://docs.pinecone.io/docs/indexes): 
- pod-based indices
  -  you choose:
     -  pod type
     -  pod size
     -  number of pods
  - Depending on your choice, you get:
     -  different amounts of storage
     -  higher or lower latency
     -  higher or lower throughput
  - A starter index is free and has a limit of 100,00 vectors.
- serverless indices
  - You don't configure or manage any compute or storage resources.
  - Scales automatically based on usage.
  - You pay only for the amount of data stored and operations performed, with no minimums.


##### Choosing a distance metric
When creating an index, you will need to specify the dimension of the embeddings and the metric you want to use for similarity search.

Pinecone offers these two main metrics:
- Euclidean:
  - The Euclidean distance between two vectors is the square root of the sum of the squared differences between the elements of the vectors.
  - The lower the distance, the more similar the vectors.
- Cosine:
  - The cosine similarity between two vectors is the cosine of the angle between them.
  - Between -1 and 1, where 1 means the vectors are identical, -1 means they are diametrically opposed, and 0 means they are orthogonal.

It is common to choose cosine similarity for high-dimensional embeddings, as it is suffers less from the curse of dimensionality than Euclidean distance. [See this article for more details.](https://www.imaurer.com/which-vector-similarity-metric-should-i-use/) 

Note pinecone also offers dot-product which you can think of as an un-normlized cosine similarity (i.e. not bound between -1 and 1).

##### Configuring the metadata

By default Pinecone will attempt to index all the metadata provided. This can be expensive and slow. We can configure the metadata to only index the fields we are interested in by passing in a `metadata_config` parameter.


##### Implementation

To create a new index in Pinecone use the `create_index` method on the Pinecone client. In case you want to overwrite an existing index, you can use the `delete_index` method to delete the existing index before creating a new one. We implement a `create_index` method that parameterizes the main configuration options we discussed above.

In [62]:
def create_index(
    index_name: str,
    cloud: str,
    region: str,
    metric: str,
    embedding_dimension: int,
    index_type: str,
    **kwargs,
) -> None:
    pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY", YOUR_PINECONE_API_KEY))
    existing_index_names = {index.name for index in pc.list_indexes().indexes}

    if index_name in existing_index_names:
        pc.delete_index(index_name)

    if index_type == "serverless":
        pc.create_index(
            name=index_name,
            dimension=embedding_dimension,
            metric=metric,
            spec=ServerlessSpec(cloud=cloud, region=region),
        )
    elif index_type == "pod":
        pc.create_index(
            name=index_name,
            dimension=embedding_dimension,
            metric=metric,
            spec=PodSpec(
                environment="gcp-starter",
                metadata_config={"indexed": ["source", "section_url", "page_url"]},
                **kwargs,
            ),
        )

In [63]:
cloud = "aws"
region = "us-west-2"
metric = "cosine"
index_type = "serverless"  # "serverless" or "pod"
index_name = None # A unique name for the index under your organization
embedding_dimension = 1024  # From the model page of thenlper/gte-large

In [65]:
create_index(
    index_name='banda-ki',
    cloud=cloud,
    region=region,
    metric=metric,
    index_type=index_type,
    embedding_dimension=embedding_dimension,
)

#### 3. Load the embeddings from disk 

We will load the embeddings from disk using `ray.data.read_json` to initiate a distributed upsert of the embeddings to Pinecone.

In [78]:
ds = ray.data.read_json(DATA_DIR / "full_scale" / "03_embeddings/")
ds

Dataset(
   num_blocks=192,
   num_rows=?,
   schema={
      text: string,
      section_url: string,
      page_url: string,
      embeddings: list<item: double>
   }
)

#### 4. Transform the embeddings into Pinecone index format 

Pinecone requires vectors in a specific format. Construct a list of dictionaries where each dictionary contains the following.

- `id`: a unique identifier for the vector.
- `values`: the embedding vector for the document chunk.
- `metadata`: a dictionary with the document chunk’s metadata, including the original text.

On building an `id` value:
- The `id` should be unique across all the vectors in the index.
- For RAG, Pinecone offers [ID prefixing](https://docs.pinecone.io/docs/manage-rag-documents#use-id-prefixes-to-reference-parent-documents)
  - [ID prefixing]((https://docs.pinecone.io/docs/manage-rag-documents#use-id-prefixes-to-reference-parent-documents)) allows to quickly filter on a common prefix `ID` without having to rely on `metadata` fields.

On building a `metadata` value:
- High cardinality metadata can slow down the indexing process.
- When creating the index, you can configure which metadata fields are indexed and which are not.
  -  This can help to reduce the size of the index and improve the indexing speed.
  -  At the moment this is a feature specific to pod-based indexes.

In [79]:
def convert_to_pinecone_vectors(row):
    row_hash = joblib.hash(row)
    page_name = row["page_url"].split("/")[-1]
    section_name = row["section_url"].split("#")[-1]
    return {
        "id": f"{page_name}#{section_name}#{row_hash}", # sample ID prefix
        "values": row["embeddings"],
        "metadata": {
            "section_url": row["section_url"], # not needed if ID prefix is used
            "page_url": row["page_url"], # not needed if ID prefix is used
            "text": row["text"], # Perhaps this is stored on a separate storage if metadata will index it
        },
    }


ds = ds.map(convert_to_pinecone_vectors)

We inspect a single row to see the structure of the data.

In [80]:
sample = ds.take_batch(1)

2024-03-09 13:54:21,824	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=162 for operator ReadJSON to satisfy output blocks of size at least DataContext.get_current().target_min_block_size=1.0MiB.
2024-03-09 13:54:21,825	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 162, each read task output is split into 3 smaller blocks.
2024-03-09 13:54:21,825	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[Map(convert_to_pinecone_vectors)] -> LimitOperator[limit=1]
2024-03-09 13:54:21,826	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 13:54:21,826	INFO streaming_executor.py:113 -- Tip:

Running 0:   0%|          | 0/64 [00:00<?, ?it/s]

#### 5. Upsert the embeddings into the Pinecone index. 

Insert the chunks into the Pinecone index by performing these two steps:

1. Instantiate a Pinecone connection object using the client’s Index.
2. Use the `Index.upsert` method to insert the data into the index.

Here are some considerations:
- Avoid creating a new connection object for every transform task. 
  - Make use of stateful transforms to launch a pool of actors with their connection loaded and ready.
- Avoid launching too large of a pool of actors.
  - While pinecone is able to handle a large number of connections, it is still a good practice to limit the number of connections to avoid overwhelming the pinecone server.
- Avoid the batch size being too large.
  - Find the batch size that fits into the allowed limit of the upsert operation.
    - [Max size for an upsert request is 2MB](https://docs.pinecone.io/v1/docs/limits#upserts)
    - [Recommended number of vectors per upsert request is 100](https://docs.pinecone.io/v1/docs/limits#upserts)

<div class="alert alert-box alert-secondary">

Additionally for further control and isolation, you can create namespaces within an index. This is useful when you want to:
- Store different types of data in the same index.
- Store data from different sources in the same index.

For our purposes, we will only use a single namespace. For more details see the [Pinecone documentation](https://docs.pinecone.io/docs/namespaces).

</div>

Let's first determine the ideal batch size. We do so by looking at the statistics of size in MB of the upsert requests. We start by checking if a batch size of 100 (the recommended maximum by pinecone) is too large.

In [81]:
batch_size = 100

def get_size_of_batch(batch):
    size_of_batch_in_bytes = pd.DataFrame(batch).memory_usage(deep=True).sum().sum()
    size_of_batch_in_mb = size_of_batch_in_bytes / 1024**2
    return {"size_in_mb": [size_of_batch_in_mb]}


out = ds.map_batches(get_size_of_batch, batch_size=batch_size).to_pandas()
out["size_in_mb"].describe()

2024-03-09 13:54:25,631	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=162 for operator ReadJSON to satisfy output blocks of size at least DataContext.get_current().target_min_block_size=1.0MiB.
2024-03-09 13:54:25,632	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 162, each read task output is split into 3 smaller blocks.
2024-03-09 13:54:25,633	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[Map(convert_to_pinecone_vectors)->MapBatches(get_size_of_batch)]
2024-03-09 13:54:25,633	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 13:54:25,633	INFO streaming_executor.py:113 --

Running 0:   0%|          | 0/64 [00:00<?, ?it/s]

[36m(Map(convert_to_pinecone_vectors)->MapBatches(get_size_of_batch) pid=7820, ip=10.0.12.185)[0m   return transform_pyarrow.concat(tables)
  return transform_pyarrow.concat(tables)


count    162.000000
mean       0.034374
std        0.012990
min        0.001091
25%        0.028333
50%        0.037538
75%        0.045472
max        0.052682
Name: size_in_mb, dtype: float64

Turns out a batch_size of 100 is still well below the 2MB limit that pinecone has set so we proceed with it.

Next, we will determine the number of concurrent connections to use for the upsert. We do so by:
- counting how many batches we need to upload
- reasoning about the latency per batch, how fast we want the upsert and avoiding overloading the network

So we compute `approx_total_batches` as the total number of chunks divided by the batch size.

In [82]:
approx_total_batches = ds.count() // batch_size
approx_total_batches

2024-03-09 13:54:34,269	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=162 for operator ReadJSON to satisfy output blocks of size at least DataContext.get_current().target_min_block_size=1.0MiB.
2024-03-09 13:54:34,269	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 162, each read task output is split into 3 smaller blocks.
2024-03-09 13:54:34,270	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[Map(convert_to_pinecone_vectors)]
2024-03-09 13:54:34,271	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 13:54:34,271	INFO streaming_executor.py:113 -- Tip: For detailed progress rep

Running 0:   0%|          | 0/64 [00:00<?, ?it/s]

117

We then choose a concurrency of 10 actor pools given this means running 13 upserts per connection which is a reasonable wait time.

In [83]:
concurrency = 9
approx_num_upserts_per_connection = approx_total_batches // concurrency
approx_num_upserts_per_connection

13

Finally we apply the upsert as a stateful transform using `map_batches`

In [84]:
pinecone_namespace = "banda-ki-namespace"


class UpsertVectors:
    def __init__(self):
        self.pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY", YOUR_PINECONE_API_KEY))
        self.index = self.pc.Index(index_name)
        self.namespace = pinecone_namespace

    def __call__(self, batch):
        self.index.upsert(
            vectors=[
                {
                    "id": id_,
                    "values": values,
                    "metadata": metadata,
                }
                for id_, values, metadata in zip(
                    batch["id"], batch["values"], batch["metadata"]
                )
            ],
            namespace=self.namespace,
        )
        return batch


ds = ds.map_batches(
    UpsertVectors,
    concurrency=concurrency,
    batch_size=batch_size,
    num_cpus=1,
)

One thing to note is that pinecone offers a simple solution to run [upserts in parallel from a single machine](https://docs.pinecone.io/docs/upsert-data#send-upserts-in-parallel) We are not making use of this feature in this guide, in favor of an even more distributed approach for large scale data.

We finally proceed to "materialize" the upserts by calling `to_pandas` which is a consumption operation triggering the execution of the pipeline.

In [85]:
df_written = ds.to_pandas().drop_duplicates(subset=["id"])

2024-03-09 13:54:47,192	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=162 for operator ReadJSON to satisfy output blocks of size at least DataContext.get_current().target_min_block_size=1.0MiB.
2024-03-09 13:54:47,192	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 162, each read task output is split into 3 smaller blocks.
2024-03-09 13:54:47,193	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> ActorPoolMapOperator[Map(convert_to_pinecone_vectors)->MapBatches(UpsertVectors)]
2024-03-09 13:54:47,194	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 13:54:47,194	INFO streaming_executor.py:113 -- Ti

RayActorError: The actor died because of an error raised in its creation task, [36mray::_MapWorker.__init__()[39m (pid=13814, ip=10.0.12.185, actor_id=8f641d5df29182460dc8481903000000, repr=MapWorker(Map(convert_to_pinecone_vectors)->MapBatches(UpsertVectors)))
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/actor_pool_map_operator.py", line 391, in __init__
    self._map_transformer.init()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 122, in init
    self._init_fn()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 158, in fused_init_fn
    other_init_fn()
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 112, in init_fn
    ray.data._cached_fn = op_fn(
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/util.py", line 67, in __init__
    super().__init__(*args, **kwargs)
  File "/tmp/ipykernel_15097/218924667.py", line 7, in __init__
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pinecone/grpc/pinecone.py", line 130, in Index
    index_host = self.index_host_store.get_host(self.index_api, self.config, name)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pinecone/control/index_host_store.py", line 38, in get_host
    key = self._key(config, index_name)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/pinecone/control/index_host_store.py", line 22, in _key
    return ":".join([config.api_key, index_name])
TypeError: sequence item 1: expected str instance, NoneType found

A small note from the cell above is given the id is produced by taking a hash of the chunk, for certain edge cases were the same text is repeated in a document the same id will be produced. Therefore we drop duplicates from the dataframe.

### 2. Verify the index

Ensure the upsert operation is complete by checking the index status and getting the number of vectors in the index.

Pinecone is eventually consistent, so it may take a few seconds for the index to be updated.

In [None]:
import time


def verify_index(index_name: str, num_expected_vectors: int):
    index = pc.Index(index_name)
    stats = index.describe_index_stats()

    while stats.total_vector_count != num_expected_vectors:
        time.sleep(5)
        stats = index.describe_index_stats()


verify_index(index_name=index_name, num_expected_vectors=df_written.shape[0])

### Querying the Pinecone index

Given we have indexed our embeddings, we can now query the index to retrieve the most similar documents to a given query.

In [None]:
query = "What is the default number of replicas for a Ray Serve deployment?"

In [None]:
model = SentenceTransformer('thenlper/gte-large', device=get_device())
query_embedding = model.encode(query).tolist()

In [None]:
index = pc.Index(index_name)
result = index.query(
    vector=query_embedding,
    top_k=5,
    namespace=pinecone_namespace,
)

In [None]:
result["matches"]

If want to include the metadata in the result, we can use the following code:

In [None]:
result = index.query(
    vector=query_embedding, top_k=5, include_metadata=True, namespace=pinecone_namespace
)

In [None]:
result["matches"]

We can additionally introduce a filter on score to only return results with a score above a certain threshold based on the fetched results.

In [None]:
scores = [match["score"] for match in result["matches"]]
scores

In [None]:
score_threshold = 0.93  # determined based on data distribution
matches_above_threshold = [
    match for match in result["matches"] if match["score"] > score_threshold
]
len(matches_above_threshold), len(result["matches"])

###  Understanding Pinecone query pricing

Pinecone measures usage of a query in terms of read units. Read units are priced based on the chosen cloud (see [pinecone pricing page](https://www.pinecone.io/pricing/) for more details.)

More specifically, the number of read units used by a query depends on:
- Record count: 
  - the number of records in your namespace
- Record size: 
  - the dimension of the vector you use
  - whether you are retrieving the vector or its metadata or just fetching back the ID

Read units have good scaling properties as we will see below.

See the [documentation page](https://docs.pinecone.io/docs/understanding-cost#query ) for more details

Let's run a sample query to inspect the recorded usage.

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=5,
    namespace=pinecone_namespace,
)

We can inspect `usage` to view how much this query cost us in terms of units. 

In [None]:
response["usage"]

We can see we consumed 5 Read Units (RUs) running the query with top_k = 5. What is cool, is this is the same cost for running with a top_k of 100. Think of this as an index scan is of complexity O(1) and the cost of the query is the cost of fetching the results.

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=100,
    namespace=pinecone_namespace,
)
response["usage"]

Running with `include_metadata=True` however will increase the cost of the query. 

For top_k = 1-10, this incurs an additional 1 RU for fetching the metadata. For top_k = 11-20, this incurs an additional 2 RU for fetching the metadata and so on.

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=1,
    namespace=pinecone_namespace,
    include_metadata=True,
)
response["usage"]

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=10,
    namespace=pinecone_namespace,
    include_metadata=True,
)
response["usage"]

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=11,
    namespace=pinecone_namespace,
    include_metadata=True,
)
response["usage"]

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=20,
    namespace=pinecone_namespace,
    include_metadata=True,
)
response["usage"]