# Brief

This guide aims to help you get started with generating embeddings for your own data. 

## Quick refresher

Embeddings the what and why

**The What ?**
  - Embeddings are a way to represent unstructured data of different modalities text, audio or image data as vectors in a high-dimensional space. 
  - The distance between the embedding of a query and the embedding of a data point is small if the data point is relevant to the query.
  - A search index makes use of approximate nearest neighbor search to retrieve the most relevant data for a given query.

**The Why ?**
  - Semantic search
    -  [gong.io](https://www.gong.io/) for [searching sales conversations](https://www.pinecone.io/customers/gong/).
  - QA using Retrieval augmented generation
    - Notion AI's [Question Answering functionality](https://www.notion.so/help/notion-ai-security-practices)


## Goal of this guide
The goal of this guide is to show how to **generate embeddings at scale using Ray and Pinecone**.

More specifically, we will cover how to:
- Build a production-ready embeddings pipeline.
- Use Ray Data to easily scale the generation of embeddings.
- Assess Ray Data's pipeline's performance.
- Upsert embeddings at scale with pinecone.
- Query a search index with pinecone.

## The road ahead

Here is our roadmap for this guide:

<div class="alert alert-block alert-info">

1. Setup
2. Embeddings pipeline overview
3. Simplest possible embedding pipeline
4. Simple pipeline for a real use-case
5. Migrating the simple pipeline to Ray Data
6. Scaling the pipeline with Ray Data
7. Upserting embeddings to Pinecone
8. Querying Pinecone

</div>

## Setup

### Imports

In [4]:
import os
import json
import shutil
from pathlib import Path

import numpy as np
import pandas as pd
import joblib
import psutil
import ray
import torch
from pinecone.grpc import PineconeGRPC as Pinecone
from pinecone import ServerlessSpec, PodSpec
from bs4 import BeautifulSoup
from langchain.text_splitter import RecursiveCharacterTextSplitter
from sentence_transformers import SentenceTransformer

### Constants

In [6]:
DATA_DIR = Path("/mnt/cluster_storage/")
shutil.copytree(Path("../data/"), DATA_DIR, dirs_exist_ok=True)

PosixPath('/mnt/cluster_storage')

## Embeddings pipeline overview

What are the steps involved in generating embeddings? In the most common case for text data, the steps are as follows:

1. Load documents
2. Process documents into chunks
   1. Process documents into chunks
   2. Optionally persist chunks
3. Generate embeddings from chunks
   1. Generate embeddings from chunks
   2. Optionally persist embeddings
4. Upsert embeddings into a database

## Simplest possible pipeline

Let's start with the simplest implementation of these steps. As we go along, we will replace the simple implementation with real components.

### 1. Load documents

In [4]:
dataset = ["this is a document", "this is another document"]

### 2. Process documents into chunks

In our case, we will chunk our documents into words - the simplest chunk.

In [5]:
def chunk_fn(doc):
    return doc.split(" ")

chunks = []
for doc in dataset:
    chunks.extend(chunk_fn(doc))
chunks

['this', 'is', 'a', 'document', 'this', 'is', 'another', 'document']

### 3. Generate embeddings from chunks

To keep it very simple, our embedding model is a lookup function.

In [6]:
word_to_vec = {
    "this": [0.1, 0.2],
    "is": [0.3, 0.4],
    "a": [0.5, 0.6],
    "document": [0.7, 0.8],
    "another": [0.9, 1.0],
}
word_to_vec["<UNK>"] = [0.0, 0.0]


def embed_model(word):
    return word_to_vec.get(word, word_to_vec["<UNK>"])

In [7]:
embeddings = [embed_model(chunk) for chunk in chunks]
embeddings

[[0.1, 0.2],
 [0.3, 0.4],
 [0.5, 0.6],
 [0.7, 0.8],
 [0.1, 0.2],
 [0.3, 0.4],
 [0.9, 1.0],
 [0.7, 0.8]]

#### 3b. Persist embeddings to disk

We will use a simple json file to persist the embeddings.

In [8]:
dest_dir = DATA_DIR / "simplest_pipeline"

dest_dir.mkdir(exist_ok=True, parents=True)
with open(dest_dir / "embeddings.json", "w") as f:
    json.dump(embeddings, f)

We confirm the data has been saved to the correct location by listing the contents of the directory:

In [9]:
!ls -llh {dest_dir}

total 8.0K
-rw-r--r-- 1 ray users  2 Mar  9 11:57 air.jsonl
-rw-r--r-- 1 ray users 96 Mar  9 12:08 embeddings.json


### 4. Upsert embeddings to vector store

The final step is to upsert the embeddings into a database. We will skip this step for now.

## Simple pipeline for a real use-case

Let's now assume we want to "embed the ray documentation website". 

We will circle back and start with a small sample dataset taken from the ray documentation. 

To visualize our pipeline, see the diagram below:

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/rag-bootcamp-mar-2024/simple_embeddings_pipeline.svg" width="800px">

### 1. Load documents

First step, we load the data using `pandas`.

In [30]:
#df = pd.read_json(DATA_DIR / "small_sample" / "sample-input.json", lines=True)

df = pd.read_json(DATA_DIR / "small_sample" / "air.jsonl", lines=True)

[33m(raylet, ip=10.0.44.131)[0m [2024-03-09 13:09:48,471 E 2354 2354] (raylet) node_manager.cc:2948: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a, IP: 10.0.44.131) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.44.131`
[33m(raylet, ip=10.0.44.131)[0m 
[33m(raylet, ip=10.0.44.131)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


We have a dataset of 4 documents fetched from online content and stored as objects in a json file.

Here are some of the notable columns:
- `text` column which contains the text of the document that we want to embed.
- `section_url` column which contains the section under which the document is found.
- `page_url` column which contains the page under which the document is found.

In [11]:
df

Unnamed: 0,id,source,text,metadata
0,2210.03945,http://arxiv.org/pdf/2210.03945,UNDERSTANDING HTML WITH LARGE LANGUAGE\nMODELS...,"{'primary_category': 'cs.LG', 'published': '20..."
1,1711.05101,http://arxiv.org/pdf/1711.05101,Published as a conference paper at ICLR 2019\n...,"{'primary_category': 'cs.LG', 'published': '20..."
2,2305.17493,http://arxiv.org/pdf/2305.17493,THECURSE OF RECURSION :\nTRAINING ON GENERATED...,"{'primary_category': 'cs.LG', 'published': '20..."
3,2205.09712,http://arxiv.org/pdf/2205.09712,2022-5-20\nSelection-Inference: Exploiting Lar...,"{'primary_category': 'cs.AI', 'published': '20..."
4,2104.06001,http://arxiv.org/pdf/2104.06001,Gender Bias in Machine Translation\nBeatrice S...,"{'primary_category': 'cs.CL', 'published': '20..."
...,...,...,...,...
418,2204.06745,http://arxiv.org/pdf/2204.06745,GPT-NeoX-20B: An Open-Source Autoregressive La...,"{'primary_category': 'cs.CL', 'published': '20..."
419,1707.06347,http://arxiv.org/pdf/1707.06347,Proximal Policy Optimization Algorithms\nJohn ...,"{'primary_category': 'cs.LG', 'published': '20..."
420,2307.09288,http://arxiv.org/pdf/2307.09288,L/l.sc/a.sc/m.sc/a.sc /two.taboldstyle : Open ...,"{'primary_category': 'cs.CL', 'published': '20..."
421,2211.05100,http://arxiv.org/pdf/2211.05100,BLOOM: A 176B-Parameter Open-Access Multilingu...,"{'primary_category': 'cs.CL', 'published': '20..."


<div class="alert alert-block alert-secondary">

**Considerations for scaling the pipeline:**
- Memory: We currently load the entire file into memory. This is not a problem for small files, but can be a problem for large files.
- Latency: Reading the file from disk is slow. We can speed this up by using a faster disk, but we can also speed this up by splitting the file into smaller files and reading them in parallel (more on this later).

</div>

### 2. Process documents into chunks

We will use langchain's `RecursiveCharacterTextSplitter` to split the text into chunks. 

It works by first splitting on paragraphs, then sentences, then words, then characters. It is a recursive algorithm that will stop once the chunk size is satisfied.

Let's try it out on a sampe document.

In [12]:
text = """
This is the first part. Estimate me like 12 words long.

This is the second part. Estimate me like 12 words long.

This is the third part. Estimate me like 12 words long.
"""

splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],  # The default separators used by the splitter
    chunk_size=24,
    chunk_overlap=0,
    length_function=lambda x: len(x.split(" ")),
)
splitter.split_text(text)

['This is the first part. Estimate me like 12 words long.\n\nThis is the second part. Estimate me like 12 words long.',
 'This is the third part. Estimate me like 12 words long.']

If we change the paragraphs, the chunk contents will change

In [13]:
text = """
This is the first part. Estimate me like 12 words long.

This is the second part. Estimate me like 12 words long.
This is the third part. Estimate me like 12 words long.
"""

splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],  # The default separators used by the splitter
    chunk_size=24,
    chunk_overlap=0,
    length_function=lambda x: len(x.split(" ")),
)
splitter.split_text(text)

['This is the first part. Estimate me like 12 words long.',
 'This is the second part. Estimate me like 12 words long.\nThis is the third part. Estimate me like 12 words long.']

We now proceed to:

1. Configure the `RecursiveCharacterTextSplitter`
2. Run it over all the documents in the dataset

In [15]:
chunk_size = 128  #  Chunk size is usually specified in tokens
words_to_tokens = 1.2  # Heuristic for converting tokens to words
chunk_size_in_words = int(chunk_size // words_to_tokens)


splitter = RecursiveCharacterTextSplitter(
    chunk_size=chunk_size_in_words,
    length_function=lambda x: len(x.split()),
    chunk_overlap=0,
)

chunks = []
for idx, row in df.iterrows():
    for chunk in splitter.split_text(row["text"]):
        chunks.append(
            {
                "text": chunk,
                "section_url": row["id"],
                "page_url": row["source"],
            }
        )

<div class="alert alert-block alert-secondary">

**Considerations for choosing the chunk size**

  - We want the chunks small enough to:
    - Fit into the context window of our chosen embedding model
    - Be semantically coherent - i.e. concentrate on ideally a single topic
  - We want the chunks large enough to:
    - Contain enough information to be semantically meaningful.
    - Avoid creating too many embeddings which can be expensive to store and query.

</div>

Let's inspect the chunks produced for the first document.

In [16]:
first_document = df["text"].iloc[0]
print("first document is", len(first_document.split()), "words")

first document is 10438 words


In [17]:
for k, v in chunks[0].items():
    if k == "text":
        print("first chunk of first document is", len(v.split()), "words")
    else:
        print(k, v)

first chunk of first document is 98 words
section_url 2210.03945
page_url http://arxiv.org/pdf/2210.03945


In [18]:
for k, v in chunks[1].items():
    if k == "text":
        print("second chunk of first document is", len(v.split()), "words")
    else:
        print(k, v)

second chunk of first document is 102 words
section_url 2210.03945
page_url http://arxiv.org/pdf/2210.03945


<div class="alert alert-block alert-secondary">

**Considerations for switching to a recursive chunker:**

- CPU: Recursive chunking is a CPU-intensive task which is being done in serial, iterating over every row. 
- Latency: Recursive chunking is slower than naive text splitting and is a blocking operation. We need to wait for the chunking to finish before we can start embedding.

</div>

### 3. Generate embeddings from chunks

For our third step, we want to load a good embedding model. 

**Suggested steps to choosing an embedding model:**
1. Visit the [MTEB leaderboard](https://huggingface.co/spaces/mteb/leaderboard) on HuggingFace.
2. Find a model that satisfies the following considerations:
  - Does the model perform well overall and in the task you are interested in?
  - Is the model closed-source or open-source?
    - If it is closed-source:
      - What are the costs, security, and privacy implications?
    - If it is open-source:
      - What are its resource requirements if you want to self-host it?
      - Is it readily available as a service by third-party providers like Anyscale, Fireworks, or Togther AI?

We will use `thenlper/gte-large` model from the [HuggingFace Model Hub](https://huggingface.co/thenlper/gte-large) given it is an open-source model and is available as a service by Anyscale and performs relatively well in the MTEB leaderboard.

In [19]:
svmem = psutil.virtual_memory()

# memory used in GB
memory_used = svmem.total - svmem.available
memory_used_gb_before_model_load = memory_used / (1024**3)
memory_used_gb_before_model_load

5.053916931152344

In [20]:
%%time
model = SentenceTransformer('thenlper/gte-large', device='cpu')

.gitattributes:   0%|          | 0.00/1.52k [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/191 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/67.9k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/619 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/670M [00:00<?, ?B/s]

onnx/config.json:   0%|          | 0.00/632 [00:00<?, ?B/s]

model.onnx:   0%|          | 0.00/1.34G [00:00<?, ?B/s]

onnx/special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

onnx/tokenizer.json:   0%|          | 0.00/712k [00:00<?, ?B/s]

onnx/tokenizer_config.json:   0%|          | 0.00/342 [00:00<?, ?B/s]

onnx/vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/670M [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/57.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/712k [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/342 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

modules.json:   0%|          | 0.00/385 [00:00<?, ?B/s]

CPU times: user 2.92 s, sys: 6.25 s, total: 9.17 s
Wall time: 16.5 s


In [21]:
svmem = psutil.virtual_memory()
memory_used = svmem.total - svmem.available
memory_used_gb_after_model_load = memory_used / (1024**3)
memory_used_gb_after_model_load

6.332324981689453

In [22]:
model_memory_usage = memory_used_gb_after_model_load - memory_used_gb_before_model_load
model_memory_usage

1.2784080505371094

Loading the embedding model took around 1 GB of memory.

Let's see how slow it is to generate an embedding.

In [23]:
%%time

embeddings = model.encode([chunk["text"] for chunk in chunks])

KeyboardInterrupt: 

In [None]:
len(chunks)

It takes on the order of a few seconds to embed 8 chunks on our CPU. We will most definitely need a GPU to speed things up.

#### Save embeddings to disk

As a fourth step, we want to store our generated embeddings as a parquet file.

In [None]:
df_output = pd.DataFrame(chunks)

In [None]:
df_output["embeddings"] = embeddings.tolist()

In [None]:
df_output

In [None]:
df_output.to_parquet(DATA_DIR / "small_sample" / "sample-output.parquet")

### 4. Upsert embeddings to vector store

The final step is to upsert the embeddings into a database. We will skip this step for now.

## Migrating the simple pipeline to Ray Data

We now want to migrate our implementation to use Ray Data to drastically scale our pipeline for larger datasets.

### 1. Load documents

Let's start with a first pass conversion of our data pipeline to use Ray Data. 

Instead of `pandas.read_json`, use `ray.data.read_json` to instantiate a `ray.data.Dataset` that will eventually read our file.

In [4]:
ds = ray.data.read_json(DATA_DIR / "small_sample" / "air.jsonl")
type(ds)

2024-03-09 13:58:55,737	INFO worker.py:1569 -- Connecting to existing Ray cluster at address: 10.0.18.223:6379...
2024-03-09 13:58:55,746	INFO worker.py:1744 -- Connected to Ray cluster. View the dashboard at [1m[32mhttps://session-ibknczdgu4wks3dek2647tkxx6.i.anyscaleuserdata.com [39m[22m
2024-03-09 13:58:55,748	INFO packaging.py:358 -- Pushing file package 'gcs://_ray_pkg_ce5950edc27d0d99aea3e3379ec688aa89757f6d.zip' (0.24MiB) to Ray cluster...
2024-03-09 13:58:55,749	INFO packaging.py:371 -- Successfully pushed file package 'gcs://_ray_pkg_ce5950edc27d0d99aea3e3379ec688aa89757f6d.zip'.


ray.data.dataset.Dataset

`ray.data.read_json` returns a `ray.data.Dataset` which is a distributed collection of data. Execution in Ray Data by default is:
- **Lazy**: `Dataset` transformations aren’t executed until you call a consumption operation.
- **Streaming**: `Dataset` transformations are executed in a streaming way, incrementally on the base data, one block at a time.

Accordingly `ray.data.Dataset` will only fetch back some high-level metadata and schema information about the file, but not the actual data.

In [5]:
ds.schema()

Column    Type
------    ----
id        string
source    string
text      string
metadata  struct<primary_category: string, published: string, title: string, updated: string>

[36m(autoscaler +2m50s)[0m Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.


### Under the hood

Ray Data uses Ray tasks to read files in parallel. Each read task reads one or more files and produces one or more output blocks.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/rag-bootcamp-mar-2024/dataset-read-cropped-2.svg" width="500px">

### 2. Process documents into chunks

Given a `ray.data.Dataset`, we can apply transformations to it. There are two types of transformations:
1. **row-wise transformations**
  - `map`: a 1-to-1 function that is applied to each row in the dataset.
  - `filter`: a 1-to-1 function that is applied to each row in the dataset and filters out rows that don’t satisfy the condition.
  - `flat_map`: a 1-to-many function that is applied to each row in the dataset and then flattens the results into a single dataset.
2. **batch-wise transformations**
  - `map_batches`: a 1-to-n function that is applied to each batch in the dataset.


We chose to make use of `flat_map` to generate a list of chunk rows. `flat_map` will create `FlatMap` tasks which will be scheduled in parallel to process as many rows as possible at once.

In [10]:
def chunk_row(row):
    chunk_size = 128
    words_to_tokens = 1.2
    num_tokens = int(chunk_size // words_to_tokens)

    def get_num_words(text):
        return len(text.split())

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=num_tokens,
        keep_separator=True, 
        length_function=get_num_words, 
        chunk_overlap=0,
    )

    chunks = []
    for chunk in splitter.split_text(row["text"]):
        chunks.append(
            {
                "text": chunk,
                "section_url": row["source"],
                "page_url": row["id"],
            }
        )
    return chunks

#ds = ds.flat_map(chunk_row)

To verify our `flat_map` is working, we can consume a limited number of rows from the dataset.

To do so, we an either call
- `take` to specify a limited number of rows from the dataset.
- `take_batch` to specify a limited number of batches from the dataset.

Here we call `take(2)` to return 2 rows.

In [34]:
ds.take(2)

2024-03-09 13:10:43,635	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=64 for operator ReadJSON to satisfy parallelism at least twice the available number of CPUs (32).
2024-03-09 13:10:43,635	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 64, each read task output is split into 64 smaller blocks.
2024-03-09 13:10:43,636	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[FlatMap(chunk_row)] -> LimitOperator[limit=2]
2024-03-09 13:10:43,636	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 13:10:43,637	INFO streaming_executor.py:113 -- Tip: For detailed progress reporting, run `r

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[{'text': 'UNDERSTANDING HTML WITH LARGE LANGUAGE\nMODELS\nIzzeddin Gur, Oﬁr Nachum, Yingjie Miao, Mustafa Safdari, Austin Huang\nAakanksha Chowdhery, Sharan Narang, Noah Fiedel, Aleksandra Faust\nGoogle Research\nfizzeddin,ofirnachum,yingjiemiao,msafdari,austinvhuang\nchowdhery,sharannarang,nfiedel,sandrafaust g@google.com\nABSTRACT\nLarge language models (LLMs) have shown exceptional performance on a va-\nriety of natural language tasks. Yet, their capabilities for HTML understanding\n– i.e., parsing the raw HTML of a webpage, with applications to automation of\nweb-based tasks, crawling, and browser-assisted retrieval – have not been fully\nexplored. We contribute HTML understanding models (ﬁne-tuned LLMs) and an\nin-depth analysis of their capabilities under three tasks: (i) Semantic Classiﬁca-',
  'section_url': '2210.03945',
  'page_url': 'http://arxiv.org/pdf/2210.03945'},
 {'text': 'tionof HTML elements, (ii) Description Generation for HTML inputs, and (iii)\nAutonomous Web Nav

### 3. Generate embeddings from chunks

For our third step, we apply the embeddings using `map_batches`, which will be implemented using `MapBatches` tasks scheduled in parallel.

In [35]:
def embed_batch(batch):
    assert isinstance(batch, dict)
    for key in batch.keys():
        assert key in ["text", "section_url", "page_url"]
    for val in batch.values():
        assert isinstance(val, np.ndarray), type(val)

    model = SentenceTransformer('thenlper/gte-large')
    text = batch["text"].tolist()
    embeddings = model.encode(text, batch_size=len(text))
    batch["embeddings"] = embeddings.tolist()
    return batch

ds = ds.map_batches(embed_batch)

#### Save embeddings to disk

For our fourth step, we write our dataset to JSON using `write_json`.

In [39]:
%%time

output_path = DATA_DIR / "small_sample" / "sample-output"
if output_path.exists():
    shutil.rmtree(output_path)

ds.write_json(output_path)

2024-03-09 13:26:33,260	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=64 for operator ReadJSON to satisfy parallelism at least twice the available number of CPUs (32).
2024-03-09 13:26:33,261	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 64, each read task output is split into 64 smaller blocks.
2024-03-09 13:26:33,262	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[FlatMap(chunk_row)->MapBatches(embed_batch)->Write]
2024-03-09 13:26:33,262	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 13:26:33,263	INFO streaming_executor.py:113 -- Tip: For detailed progress reporting, 

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: b3fd36b91ac59816d50355f690d039b4a9c8ba1703000000 Worker ID: 945214dbeb3b735ad11581b51e8e2de03aaa58b1a3e3c697c29c150c Node ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a Worker IP address: 10.0.44.131 Worker port: 10052 Worker PID: 13013 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.


[33m(raylet, ip=10.0.44.131)[0m [2024-03-09 13:26:48,497 E 2354 2354] (raylet) node_manager.cc:2948: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a, IP: 10.0.44.131) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.44.131`
[33m(raylet, ip=10.0.44.131)[0m 
[33m(raylet, ip=10.0.44.131)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: a36eeeba162a1182989c067640f77e892be88aea03000000 Worker ID: 8dc79c2d16857231b5aabf9a4a5d6986eeca80f23a7886de7c079694 Node ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2 Worker IP address: 10.0.18.223 Worker port: 10064 Worker PID: 26821 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned. RPC Error message: Socket closed; RPC Error details: 
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 3f784f24f7e5a5db02e4deb997b28263fa30147d03000000 Worker ID: 6ef1398f322e2f222267b9739c3601e6101215152298dcf9998eea55 Node ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2 Worke

[33m(raylet)[0m [2024-03-09 13:26:59,779 E 2996 2996] (raylet) node_manager.cc:2948: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2, IP: 10.0.18.223) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.18.223`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 9742a0065dc7affb066125c61a519a48bbd29a6303000000 Worker ID: 58a64ad3ec60177dad46fadfd35e711a9a6dd81c205d70a581d2a1fc Node ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a Worker IP address: 10.0.44.131 Worker port: 10047 Worker PID: 12535 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 20871f66953b44813b1cc73dade0fb4fc389deeb0300

[33m(raylet, ip=10.0.44.131)[0m [2024-03-09 13:27:48,499 E 2354 2354] (raylet) node_manager.cc:2948: 11 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a, IP: 10.0.44.131) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.44.131`
[33m(raylet, ip=10.0.44.131)[0m 
[33m(raylet, ip=10.0.44.131)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: e97deee0abf46a5c843f91bb74b7462c9f53e5c103000000 Worker ID: 4218ae836d3786056cef00719265d84e3fc495ece1f2d92a72f34080 Node ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a Worker IP address: 10.0.44.131 Worker port: 10072 Worker PID: 14255 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.[32m [repeated 2x across cluster][0m
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: b66c19

[33m(raylet)[0m [2024-03-09 13:27:59,783 E 2996 2996] (raylet) node_manager.cc:2948: 7 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2, IP: 10.0.18.223) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.18.223`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 9839f68687c53f1310aa223bc4036c9b5247477103000000 Worker ID: 7a49cf63ca083107e538e66ecb393bcd03e8c87ae92dfdf02798e529 Node ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2 Worker IP address: 10.0.18.223 Worker port: 10080 Worker PID: 28230 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 86a4bca0c70e094a4004af8f1e70f90be7c3baeb0300

[33m(raylet, ip=10.0.44.131)[0m [2024-03-09 13:28:48,502 E 2354 2354] (raylet) node_manager.cc:2948: 7 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a, IP: 10.0.44.131) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.44.131`
[33m(raylet, ip=10.0.44.131)[0m 
[33m(raylet, ip=10.0.44.131)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 12d9b07cbda429053ef48d7939f18407deaebfa203000000 Worker ID: e3cffc6bb3bbdbca1c231c30a37286eea8a22a00b5bb9b4abba3d544 Node ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a Worker IP address: 10.0.44.131 Worker port: 10101 Worker PID: 16572 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: e3778d63156d48f6d2262b0abc76ca16625434e20300

[33m(raylet)[0m [2024-03-09 13:29:05,300 E 2996 2996] (raylet) node_manager.cc:2948: 6 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2, IP: 10.0.18.223) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.18.223`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 48450f2c28fbf06dcae9f2273e343d1d55c7cf1703000000 Worker ID: 198c269df8ef87f1d496411ed69b9f40dad92b323f2905b20a197705 Node ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2 Worker IP address: 10.0.18.223 Worker port: 10089 Worker PID: 30360 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned. RPC Error message: Socket closed; RPC Error details: [32m [repeated 2x across cluster][0m
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 27b33a2e4f0543239fe1f32b87c3a127b432a9a603000000 Worker ID: cd3e65b62066f9f8de46b6a1c0f81a226d93bf3b1567ee381b92edf9 Node ID: 17a8cb9889c4716653cf7628

[33m(raylet, ip=10.0.44.131)[0m [2024-03-09 13:29:48,504 E 2354 2354] (raylet) node_manager.cc:2948: 9 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a, IP: 10.0.44.131) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.44.131`
[33m(raylet, ip=10.0.44.131)[0m 
[33m(raylet, ip=10.0.44.131)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 499b86546f4ea8d104454dd8f9dd2b4aabf6d7a603000000 Worker ID: a097a34fa92b36932c63afc8af97fc041a3f61eb033fd9f02deb46cc Node ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a Worker IP address: 10.0.44.131 Worker port: 10116 Worker PID: 17487 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.[32m [repeated 2x across cluster][0m
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 72efc4

[33m(raylet)[0m [2024-03-09 13:30:05,305 E 2996 2996] (raylet) node_manager.cc:2948: 6 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2, IP: 10.0.18.223) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.18.223`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 09fda81fc2a942d2b167f43f1ec9624f5d7a7f3203000000 Worker ID: 5be80e9316798d02a0f2a6660a77ef27f074042ef0f72d8c4599220b Node ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a Worker IP address: 10.0.44.131 Worker port: 10117 Worker PID: 17624 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.[32m [repeated 2x across cluster][0m
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: debbc6

[33m(raylet, ip=10.0.44.131)[0m [2024-03-09 13:30:48,506 E 2354 2354] (raylet) node_manager.cc:2948: 7 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a, IP: 10.0.44.131) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.44.131`
[33m(raylet, ip=10.0.44.131)[0m 
[33m(raylet, ip=10.0.44.131)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 4b566aa22cea1a7a84ab04d840f622a21f5012af03000000 Worker ID: f0f366332c801b78900dd629c4beea5645e5d4957611297c8175365a Node ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a Worker IP address: 10.0.44.131 Worker port: 10133 Worker PID: 18547 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned. RPC Error message: Socket closed; RPC Error details: 
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 7d0bd741dabe6eb6373d0b9df3aab26fda60e36a03000000 Worker ID: 1167de9a0576d2a50289324b7d98b2d40a8c771c0ff283e11a20875d Node ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2 Worke

[33m(raylet)[0m [2024-03-09 13:31:06,608 E 2996 2996] (raylet) node_manager.cc:2948: 10 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2, IP: 10.0.18.223) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.18.223`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: cda54ce49e4da2dc6009a4b2ddc0cf08a6bb1ad003000000 Worker ID: fb4696d630b5b4a88ea654bf615ce026682c54d3989943750bcf70f7 Node ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2 Worker IP address: 10.0.18.223 Worker port: 10119 Worker PID: 32339 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: cff0304dfe6dc9072d2b09598fcb72774d5266d00300

[33m(raylet, ip=10.0.44.131)[0m [2024-03-09 13:31:48,509 E 2354 2354] (raylet) node_manager.cc:2948: 7 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a, IP: 10.0.44.131) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.44.131`
[33m(raylet, ip=10.0.44.131)[0m 
[33m(raylet, ip=10.0.44.131)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 2d3d777bfac1447454573277aa9f06a03b91140003000000 Worker ID: 935d159e029132344e8f240a04735a61b013bd60476d2f9e68f63fde Node ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2 Worker IP address: 10.0.18.223 Worker port: 10128 Worker PID: 32933 Worker exit type: SYSTEM_ERROR Worker exit detail: The leased worker has unrecoverable failure. Worker is requested to be destroyed when it is returned. RPC Error message: Socket closed; RPC Error details: [32m [repeated 2x across cluster][0m
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: efddc842b0b32b5e139421074875527851ce101003000000 Worker ID: f007b6145b1f756e16c2e792bd03bda62b6e851cc48ff47a761e9052 Node ID: 17a8cb9889c4716653cf7628

[33m(raylet)[0m [2024-03-09 13:32:06,611 E 2996 2996] (raylet) node_manager.cc:2948: 5 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 17a8cb9889c4716653cf7628f2f6966736f442fa8464c3074f6774d2, IP: 10.0.18.223) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.18.223`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 2cbbde8655007b8b0f83591221cc62247b19645703000000 Worker ID: 1684064fef635f5185b0176a30541828b9937bab9d39f25de55cf8b1 Node ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a Worker IP address: 10.0.44.131 Worker port: 10156 Worker PID: 19981 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 7aa2f8161fd3cdbe2967deaaf600407dabdc5f150300

[33m(raylet, ip=10.0.44.131)[0m [2024-03-09 13:32:48,511 E 2354 2354] (raylet) node_manager.cc:2948: 6 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a, IP: 10.0.44.131) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.44.131`
[33m(raylet, ip=10.0.44.131)[0m 
[33m(raylet, ip=10.0.44.131)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: b20e19e60ae0140bd05106ab8517c5b2cb714f3903000000 Worker ID: 8dfb2983bb139f0090d07e40c745055d09d242b4d53f0bdb1e3beeb4 Node ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a Worker IP address: 10.0.44.131 Worker port: 10166 Worker PID: 20674 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.
[33m(raylet)[0m A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: fd272c63e4ba895681f2528b3b53eab4db9af1fe0300

KeyboardInterrupt: 

We inspect the created JSON output directory. Every write task will create a separate file in the output directory.

In [37]:
!ls -llah {output_path} 

total 8.0K
drwxr-xr-x 2 ray users 6.0K Mar  9 13:10 .
drwxr-xr-x 3 ray users 6.0K Mar  9 13:10 ..


In [38]:
ray.data.read_json(DATA_DIR / "small_sample" / "sample-output").to_pandas()

ValueError: not enough values to unpack (expected 2, got 0)

[33m(raylet, ip=10.0.44.131)[0m [2024-03-09 13:12:48,477 E 2354 2354] (raylet) node_manager.cc:2948: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: d9e5357aad0282ee3a993afcc0ec4c5c9985a2ce3658c5d3bf9d961a, IP: 10.0.44.131) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 10.0.44.131`
[33m(raylet, ip=10.0.44.131)[0m 
[33m(raylet, ip=10.0.44.131)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


### 4. Upsert embeddings to vector store

The final step is to upsert the embeddings into a database. We will skip this step for now.

**Recap**

Here is our entire pipeline:

```python
(
    ray.data.read_json(DATA_DIR / "small_sample" / "sample-input.json")
    .flat_map(chunk_row)
    .map_batches(embed_batch)
    .write_json(DATA_DIR / "small_sample" / "sample-output")
)
```

<div class="alert alert-block alert-info">

### Activity: Use a different embedding model

Re-run the entire data pipeline but this time use a different embedding model `BAAI/bge-large-en-v1.5` which outperforms `thenlper/gte-large` on certain parts of the MTEB leaderboard.

NOTE: make sure to output the results to a different directory.

<details> 

<summary>Click here to see the solution </summary>

```python
def embed_batch(batch):
    # Load the embedding model
    model = SentenceTransformer("BAAI/bge-large-en-v1.5")
    text = batch["text"].tolist()
    embeddings = model.encode(text, batch_size=len(text))
    batch["embeddings"] = embeddings.tolist()
    return batch

(
    ray.data.read_json(DATA_DIR / "small_sample" / "sample-input.json")
    .flat_map(chunk_row)
    .map_batches(embed_batch)
    .write_json(DATA_DIR / "small_sample" / "sample-output-bge")
)

# inspect output
ray.data.read_json(DATA_DIR / "small_sample" / "sample-output-bge").to_pandas()
```

</details>
</div>

In [None]:
# Write your solution here

## Scaling the pipeline with Ray Data

Let's explore how to scale our pipeline to a larger dataset using Ray Data.

<img src="https://anyscale-materials.s3.us-west-2.amazonaws.com/rag-bootcamp-mar-2024/full_scale_embeddings_pipeline.svg" width="1000px">



### Phase 1: Preparing input files

First, we need to prepare our documents by performing the following steps
1. Fetch all the Ray documentation from the web.
2. Parse the web pages to extract the text.
3. Store the text into input files that are ideal for Ray Data.

Consider converting to parquet files which allow for pruning to improve reads:
- When working with column-oriented file formats like parquet, specify which columns you want to read. This might help significantly reduce the memory footprint of the read task.
- Similarly, you can pass in a filter to `ray.data.read_parquet()` (filter pushdown) which is applied at the file scan so only rows that match the filter predicate are returned.

#### 1. Fetch all the Ray documentation from the web.

We can make use of `wget` to crawl the web and download all the webpages under a given domain.

In [None]:
raw_web_pages_dir = DATA_DIR / "full_scale" / "00_raw_web_pages"

If you uncomment the following cell, it will crawl the web pages and save them to the `raw_web_pages_dir` directory.This will take a long time.

In [None]:
# !wget https://docs.ray.io/en/master/ -e robots=off --recursive --page-requisites \
#   --html-extension --convert-links --restrict-file-names=windows \
#   --domains docs.ray.io --no-parent --accept=html --retry-on-http-error=429 \
#   -P {raw_web_pages_dir}

Instead we will fetch a zip file containing the webpages and extract it.

In [None]:
!rm -rf {raw_web_pages_dir}
!wget https://anyscale-materials.s3.us-west-2.amazonaws.com/rag-ray-documentation-html-files/ray_docs_web_pages.zip \
    -P {str(raw_web_pages_dir)}
!ls -ll {raw_web_pages_dir}
!unzip -o {raw_web_pages_dir / "ray_docs_web_pages.zip"} -d {raw_web_pages_dir}

We count the total number of files in the directory.

In [None]:
!ls -R {raw_web_pages_dir} | wc -l

We also take the total size of the raw web pages directory.

In [None]:
!du -sh {raw_web_pages_dir}

Note that this only includes the latest version of the ray documentation. This size would easily be in the gegabytes if we included all versions of the documentation.

#### Parse the web pages to extract the text.

We first read all HTML files in the raw web pages directory into a `ray.data.Dataset`.

In [None]:
ds = ray.data.from_items(
    [{"path": path} for path in raw_web_pages_dir.rglob("*.html") if not path.is_dir()]
)
ds

We then implement a function to extract the text from the HTML files. Given for each HTML file, we extract a single document, we will use `map` to apply the function to each row in the dataset.

In [None]:
def path_to_uri(
    path: str, scheme: str = "https://", domain: str = "docs.ray.io"
) -> str:
    return scheme + domain + str(path).split(domain)[-1]


def extract_document_from_html(row: dict) -> list[dict]:
    """Extract a document from an HTML file."""
    # 1. Request the page and extract the text using BeautifulSoup
    with open(row["path"], "r", encoding="utf-8") as html_file:
        soup = BeautifulSoup(html_file, "html.parser")

    # 2. Create a document object with the text and page_url
    return {
        "text": soup.text,
        "page_url": path_to_uri(row["path"]),
    }


ds = ds.map(extract_document_from_html)

For now we write out the data to a "01_documents" directory

In [None]:
%%time

if (DATA_DIR / "full_scale" / "01_documents").exists():
    shutil.rmtree(DATA_DIR / "full_scale" / "01_documents")
ds.write_json(DATA_DIR / "full_scale" / "01_documents", num_rows_per_file=100)

We inspect the produced documents

In [1]:
ray.data.read_json(DATA_DIR / "small_sample").count()

NameError: name 'ray' is not defined

##### Utilize inherent structure to improve the documents 

Documentation [webpages](https://docs.ray.io/en/latest/rllib/rllib-env.html) are naturally split into sections. We can use this to our advantage by returning our documents as sections. This will facilitate producing semantically coherent chunks. 

<img src="https://images.ctfassets.net/xjan103pcp94/1eFnKmG5xqPIFtPupZ327X/f6152723e18322b90aaa8be5d2d5a6e4/image5.png" >


We re-instantiate the dataset from the HTML files.

In [None]:
ds = ray.data.from_items(
    [{"path": path} for path in raw_web_pages_dir.rglob("*.html") if not path.is_dir()]
)

This time we are producing multiple documents from each HTML file. We will use the `flat_map` method to produce multiple documents from each HTML file.

In [None]:
def extract_sections_from_html(record: dict) -> list[dict]:
    documents = []
    # 1. Request the page and parse it using BeautifulSoup
    with open(record["path"], "r", encoding="utf-8") as html_file:
        soup = BeautifulSoup(html_file, "html.parser")

    url = path_to_uri(record["path"])

    # 2. Find all sections
    sections = soup.find_all("section")
    for section in sections:
        # 3. Extract text from the section but not from the subsections
        section_text = "\n".join(
            [child.text for child in section.children if child.name != "section"]
        )
        # 4. Construct the section url
        section_url = url + "#" + section["id"]
        # 5. Create a document object with the text, source page, source section uri
        documents.append(
            {
                "text": section_text,
                "section_url": section_url,
                "page_url": url,
            }
        )
    return documents


ds = ds.flat_map(extract_sections_from_html)

#### Store the text into input files that are ideal for Ray Data.

The following are good heuristics to keep in mind:
- Avoid reading large (1 GiB or more) binary files.
  - `ray.data.read_*` cannot parallelize reading a single file - i.e., it maps 1 file to 1 read task.
- Avoid too many tiny files (less than 1 MiB).
  - There is a default minimum block size that `ray.data` uses. This means that `ray.data` will need to group together the tiny blocks into larger blocks, which can be expensive.
- Avoid transforming a Dataset where individual rows are large (100 MiB or more).
  - There is a default maximum block size that `ray.data` uses. This means that `ray.data` will need spill the output into multiple blocks, which could lead to OOM errors.

We choose a `num_rows_per_file` of 400 so our produced files are not of a reasonable size given the above heuristics.

In [None]:
%%time
if (DATA_DIR / "full_scale" / "02_sections").exists():
    shutil.rmtree(DATA_DIR / "full_scale" / "02_sections")
ds.write_json(DATA_DIR / "full_scale" / "02_sections", num_rows_per_file=400)

In [None]:
!ls -llh {DATA_DIR / "full_scale" / "02_sections"}

Let's count how many documents we will have after processing the sections.

In [None]:
ray.data.read_json(DATA_DIR / "full_scale" / "02_sections").count()

<div class="alert alert-block alert-secondary">

**Further considerations for creating input files for Ray Data:**

Consider converting to parquet files which allow for pruning to improve reads:
- When working with column-oriented file formats like parquet, you can specify which columns you want to read. This might help significantly reduce the memory footprint of the read task.
- Similarly, you can pass in a filter to `ray.data.read_parquet()` (filter pushdown) which is applied at the file scan so only rows that match the filter predicate are returned.

In our case the bulk of the memory is taken up by the text column. Using parquet files will not significantly help us reduce the memory footprint of the read task.

</div>



### Phase 2: Generating Embeddings

Now that we have our documents, we can proceed to generate embeddings.

#### 1. Load documents
We begin by reading the documents from the "02_sections" directory.

In [1]:
#ds = ray.data.read_json(DATA_DIR / "full_scale" / "02_sections")

ds = ray.data.read_json(DATA_DIR / "small_sample")

#ds.show()

NameError: name 'ray' is not defined

#### Applying chunking as a transformation

We apply our chunking transformation using `flat_map`, which applies a 1-to-many function to each row in the dataset and then flattens the results into a single dataset.

In [13]:
ds = ds

We could have used `map_batches` instead to apply a many-to-many function to each batch of rows in the dataset. However, given our chunking transformation is not vectorized, `map_batches` will not be faster.

Let's run the chunking and count our total number of chunks.

In [16]:
ds.count()



Read progress 0:   0%|          | 0/2 [00:00<?, ?it/s]

9

#### Applying embedding as a transformation

For the embedding part, we will want to run the embedding model on the fastest possible device.
Let's check the available devices.

In [17]:
def get_device():
    if torch.cuda.is_available():
        device = "cuda"
    elif torch.has_mps:
        device = "mps"
    else:
        device = "cpu"
    return device

device = get_device()
device

'cuda'

We want to load the embedding model once and reuse it across multiple transformation tasks.

To do so, we want to use call `map_batches` with **stateful transform** instead of a *stateless transform*. 

This means we create a pool of processes called actors where the model is already loaded in memory.

Each actor will run a `MapBatch` task where:
  - initial state is handled in `__init__`
  - task is invoked using `__call__` method

In [18]:
num_gpus = 2
num_cpus = psutil.cpu_count()

class EmbedBatch:
    def __init__(self):
        self.model = SentenceTransformer("thenlper/gte-large", device=device)

    def __call__(self, batch):
        text = batch["text"].tolist()
        embeddings = self.model.encode(text, batch_size=len(text))
        batch["embeddings"] = embeddings.tolist()
        return batch

ds = ds.map_batches(
    EmbedBatch,
    # Maximum number of actors to launch.
    concurrency=num_gpus if device == "cuda" else num_cpus,
    # Size of batches passed to embeddings actor.
    batch_size=100,
    # 1 GPU for each actor.
    num_gpus=1 if device == "cuda" else 0,
)

#### Writing the embeddings to disk

When writing, we can use the `num_rows_per_file` parameter to control the number of rows per file.

In [19]:
%%time

if (DATA_DIR / "full_scale" / "03_embeddings").exists():
    shutil.rmtree(DATA_DIR / "full_scale" / "03_embeddings")
(
    ds
    .write_json(
        num_rows_per_file=50,
        path=DATA_DIR / "full_scale" / "03_embeddings",
    )
)

2024-03-09 15:35:49,061	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=64 for operator ReadJSON to satisfy parallelism at least twice the available number of CPUs (32).
2024-03-09 15:35:49,062	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 64, each read task output is split into 32 smaller blocks.
2024-03-09 15:35:49,063	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> ActorPoolMapOperator[MapBatches(EmbedBatch)] -> TaskPoolMapOperator[Write]
2024-03-09 15:35:49,063	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 15:35:49,064	INFO streaming_executor.py:113 -- Tip: For detailed progress reportin

Running 0:   0%|          | 0/2 [00:00<?, ?it/s]

[36m(MapWorker(MapBatches(EmbedBatch)) pid=51617)[0m   return transform_pyarrow.concat(tables)
[36m(MapWorker(MapBatches(EmbedBatch)) pid=51617)[0m Could not construct Arrow block from numpy array; encountered values of unsupported numpy type `17` in column named 'metadata', which cannot be casted to an Arrow data type. Falling back to using pandas block type, which is slower and consumes more memory. For maximum performance, consider applying the following suggestions before ingesting into Ray Data in order to use native Arrow block types:
[36m(MapWorker(MapBatches(EmbedBatch)) pid=51617)[0m - Expand out each key-value pair in the dict column into its own column
[36m(MapWorker(MapBatches(EmbedBatch)) pid=51617)[0m - Replace `None` values with an Arrow supported data type
[36m(MapWorker(MapBatches(EmbedBatch)) pid=51617)[0m 


CPU times: user 337 ms, sys: 52.9 ms, total: 390 ms
Wall time: 8.89 s


##### Inspecting the ray data dashboard

If we take a look at the metrics tab of the ray data dashboard, we can check to see:

- The GPU utilization
    - Ideally, we would like to see the GPU utilization at 100% for the duration of the embedding process
- The time spent on io and network by different tasks

We can then use this information to optimize our pipeline.

##### Inspecting the output

We check to see if the embeddings were written to disk.

In [21]:
!ls -llh {DATA_DIR / "full_scale" / "03_embeddings"}

total 512K
-rw-r--r-- 1 ray users 512K Mar  9 15:35 64_000000_000000.json


### Recap of the pipeline

Here is our entire pipeline so far:

```python
ds = (
    ray.data.read_json(
        DATA_DIR / "full_scale" / "02_sections",
    )
    .flat_map(chunk_row)
    .map_batches(
        EmbedBatch,
        concurrency=num_gpus,
        batch_size=100,
        num_gpus=1,
    )
    .write_json(
        path=DATA_DIR / "full_scale" / "03_embeddings_tuning",
        num_rows_per_file=50,
    )
)
```


<div class="alert alert-block alert-info">

### Activity: Tuning the pipeline

Proceed to tune your pipeline by:
- Changing the batch size on `map_batches` and see what effect it has on the GPU utilization.
- Changing the number of GPUs and see whether it helps to scale the pipeline.

</div>

In [None]:
# Write your solution here

### Upserting embeddings to Pinecone

We will use [Pinecone](https://www.pinecone.io/) to index our document embeddings in a vector store. Pinecone is a fully managed vector database optimized for similarity search and is user-friendly. We chose Pinecone for its ease of use and its free tier, which meets our needs.

Index your document embeddings in Pinecone as follows:


1. Create a Pinecone client.
2. Create a Pinecone index.
3. Load the embeddings from disk.
4. Transform the embeddings into Pinecone’s index format.
5. Upsert the embeddings into the Pinecone index.
6. Query the Pinecone index.

#### 1. Create a Pinecone client 
1. Sign up for a free account at https://www.pinecone.io/ and obtain an API key.

Follow the instructions on the Pinecone website to sign up and obtain an API key.

2. Initialize a Pinecone client with your API key.

Replace YOUR_API_KEY with your actual API key to initialize a Pinecone client.

In [22]:
YOUR_PINECONE_API_KEY = "9386359a-0227-4d5b-80d9-b1bb7600dd08"

In [23]:
pinecone_api_key = os.environ.get("PINECONE_API_KEY", YOUR_PINECONE_API_KEY)
pc = Pinecone(api_key=pinecone_api_key)

Let's list out the available indices in your Pinecone account by running the following command:

In [24]:
pc.list_indexes()

{'indexes': [{'dimension': 1024,
              'host': 'canopy--shanker-index-zhxkhfk.svc.apw5-4e34-81fa.pinecone.io',
              'metric': 'cosine',
              'name': 'canopy--shanker-index',
              'spec': {'serverless': {'cloud': 'aws', 'region': 'us-west-2'}},
              'status': {'ready': True, 'state': 'Ready'}},
             {'dimension': 1024,
              'host': 'canopy--cong-index-zhxkhfk.svc.apw5-4e34-81fa.pinecone.io',
              'metric': 'cosine',
              'name': 'canopy--cong-index',
              'spec': {'serverless': {'cloud': 'aws', 'region': 'us-west-2'}},
              'status': {'ready': True, 'state': 'Ready'}},
             {'dimension': 1024,
              'host': 'canopy--atlas-test-zhxkhfk.svc.apw5-4e34-81fa.pinecone.io',
              'metric': 'cosine',
              'name': 'canopy--atlas-test',
              'spec': {'serverless': {'cloud': 'aws', 'region': 'us-west-2'}},
              'status': {'ready': True, 'state': 'Ready

#### 2. Create a Pinecone index 

##### Choosing an index type
Pinecone offers two types of indexes - [see the docs for more details](https://docs.pinecone.io/docs/indexes): 
- pod-based indices
  -  you choose:
     -  pod type
     -  pod size
     -  number of pods
  - Depending on your choice, you get:
     -  different amounts of storage
     -  higher or lower latency
     -  higher or lower throughput
  - A starter index is free and has a limit of 100,00 vectors.
- serverless indices
  - You don't configure or manage any compute or storage resources.
  - Scales automatically based on usage.
  - You pay only for the amount of data stored and operations performed, with no minimums.


##### Choosing a distance metric
When creating an index, you will need to specify the dimension of the embeddings and the metric you want to use for similarity search.

Pinecone offers these two main metrics:
- Euclidean:
  - The Euclidean distance between two vectors is the square root of the sum of the squared differences between the elements of the vectors.
  - The lower the distance, the more similar the vectors.
- Cosine:
  - The cosine similarity between two vectors is the cosine of the angle between them.
  - Between -1 and 1, where 1 means the vectors are identical, -1 means they are diametrically opposed, and 0 means they are orthogonal.

It is common to choose cosine similarity for high-dimensional embeddings, as it is suffers less from the curse of dimensionality than Euclidean distance. [See this article for more details.](https://www.imaurer.com/which-vector-similarity-metric-should-i-use/) 

Note pinecone also offers dot-product which you can think of as an un-normlized cosine similarity (i.e. not bound between -1 and 1).

##### Configuring the metadata

By default Pinecone will attempt to index all the metadata provided. This can be expensive and slow. We can configure the metadata to only index the fields we are interested in by passing in a `metadata_config` parameter.


##### Implementation

To create a new index in Pinecone use the `create_index` method on the Pinecone client. In case you want to overwrite an existing index, you can use the `delete_index` method to delete the existing index before creating a new one. We implement a `create_index` method that parameterizes the main configuration options we discussed above.

In [25]:
def create_index(
    index_name: str,
    cloud: str,
    region: str,
    metric: str,
    embedding_dimension: int,
    index_type: str,
    **kwargs,
) -> None:
    pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY", YOUR_PINECONE_API_KEY))
    existing_index_names = {index.name for index in pc.list_indexes().indexes}

    if index_name in existing_index_names:
        pc.delete_index(index_name)

    if index_type == "serverless":
        pc.create_index(
            name=index_name,
            dimension=embedding_dimension,
            metric=metric,
            spec=ServerlessSpec(cloud=cloud, region=region),
        )
    elif index_type == "pod":
        pc.create_index(
            name=index_name,
            dimension=embedding_dimension,
            metric=metric,
            spec=PodSpec(
                environment="gcp-starter",
                metadata_config={"indexed": ["source", "source", "id"]},
                **kwargs,
            ),
        )

In [26]:
cloud = "aws"
region = "us-west-2"
metric = "cosine"
index_type = "serverless"  # "serverless" or "pod"
index_name = "shanker-index" # A unique name for the index under your organization
embedding_dimension = 1024  # From the model page of thenlper/gte-large

In [27]:
create_index(
    index_name=index_name,
    cloud=cloud,
    region=region,
    metric=metric,
    index_type=index_type,
    embedding_dimension=embedding_dimension,
)

#### 3. Load the embeddings from disk 

We will load the embeddings from disk using `ray.data.read_json` to initiate a distributed upsert of the embeddings to Pinecone.

In [28]:
ds = ray.data.read_json(DATA_DIR / "full_scale" / "03_embeddings/")
ds

Dataset(
   num_blocks=64,
   num_rows=9,
   schema={
      id: string,
      source: string,
      text: string,
      meta...: struct<primary_category: string, published: string, title: string, updated: string>,
      section_url: string,
      page_url: string,
      embeddings: list<item: double>
   }
)

#### 4. Transform the embeddings into Pinecone index format 

Pinecone requires vectors in a specific format. Construct a list of dictionaries where each dictionary contains the following.

- `id`: a unique identifier for the vector.
- `values`: the embedding vector for the document chunk.
- `metadata`: a dictionary with the document chunk’s metadata, including the original text.

On building an `id` value:
- The `id` should be unique across all the vectors in the index.
- For RAG, Pinecone offers [ID prefixing](https://docs.pinecone.io/docs/manage-rag-documents#use-id-prefixes-to-reference-parent-documents)
  - [ID prefixing]((https://docs.pinecone.io/docs/manage-rag-documents#use-id-prefixes-to-reference-parent-documents)) allows to quickly filter on a common prefix `ID` without having to rely on `metadata` fields.

On building a `metadata` value:
- High cardinality metadata can slow down the indexing process.
- When creating the index, you can configure which metadata fields are indexed and which are not.
  -  This can help to reduce the size of the index and improve the indexing speed.
  -  At the moment this is a feature specific to pod-based indexes.

In [32]:
def convert_to_pinecone_vectors(row):
    row_hash = joblib.hash(row)
    page_name = row["source"].split("/")[-1]
    section_name = row["source"].split("#")[-1]
    return {
        "id": f"{page_name}#{section_name}#{row_hash}", # sample ID prefix
        "values": row["embeddings"],
        "metadata": {
            "section_url": row["source"], # not needed if ID prefix is used
            "page_url": row["id"], # not needed if ID prefix is used
            "text": row["text"], # Perhaps this is stored on a separate storage if metadata will index it
        },
    }


ds = ds.map(convert_to_pinecone_vectors)

We inspect a single row to see the structure of the data.

In [33]:
sample = ds.take_batch(1)

2024-03-09 15:40:58,575	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=64 for operator ReadJSON to satisfy parallelism at least twice the available number of CPUs (32).
2024-03-09 15:40:58,576	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 64, each read task output is split into 64 smaller blocks.
2024-03-09 15:40:58,576	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[Map(convert_to_pinecone_vectors)->Map(convert_to_pinecone_vectors)] -> LimitOperator[limit=1]
2024-03-09 15:40:58,577	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 15:40:58,577	INFO streaming_executor.py:113

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

2024-03-09 15:40:58,619	ERROR streaming_executor_state.py:453 -- An exception was raised from a task of operator "Map(convert_to_pinecone_vectors)->Map(convert_to_pinecone_vectors)". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.


RayTaskError(AttributeError): [36mray::Map(convert_to_pinecone_vectors)->Map(convert_to_pinecone_vectors)()[39m (pid=29333, ip=10.0.44.131)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 232, in transform_fn
    for row in rows:
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 248, in __call__
    for block in blocks:
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 233, in transform_fn
    out_row = fn(row)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
    return op_fn(item, *fn_args, **fn_kwargs)
  File "/tmp/ipykernel_49712/2085184142.py", line 3, in convert_to_pinecone_vectors
AttributeError: 'NoneType' object has no attribute 'split'

#### 5. Upsert the embeddings into the Pinecone index. 

Insert the chunks into the Pinecone index by performing these two steps:

1. Instantiate a Pinecone connection object using the client’s Index.
2. Use the `Index.upsert` method to insert the data into the index.

Here are some considerations:
- Avoid creating a new connection object for every transform task. 
  - Make use of stateful transforms to launch a pool of actors with their connection loaded and ready.
- Avoid launching too large of a pool of actors.
  - While pinecone is able to handle a large number of connections, it is still a good practice to limit the number of connections to avoid overwhelming the pinecone server.
- Avoid the batch size being too large.
  - Find the batch size that fits into the allowed limit of the upsert operation.
    - [Max size for an upsert request is 2MB](https://docs.pinecone.io/v1/docs/limits#upserts)
    - [Recommended number of vectors per upsert request is 100](https://docs.pinecone.io/v1/docs/limits#upserts)

<div class="alert alert-box alert-secondary">

Additionally for further control and isolation, you can create namespaces within an index. This is useful when you want to:
- Store different types of data in the same index.
- Store data from different sources in the same index.

For our purposes, we will only use a single namespace. For more details see the [Pinecone documentation](https://docs.pinecone.io/docs/namespaces).

</div>

Let's first determine the ideal batch size. We do so by looking at the statistics of size in MB of the upsert requests. We start by checking if a batch size of 100 (the recommended maximum by pinecone) is too large.

In [31]:
batch_size = 100

def get_size_of_batch(batch):
    size_of_batch_in_bytes = pd.DataFrame(batch).memory_usage(deep=True).sum().sum()
    size_of_batch_in_mb = size_of_batch_in_bytes / 1024**2
    return {"size_in_mb": [size_of_batch_in_mb]}


out = ds.map_batches(get_size_of_batch, batch_size=batch_size).to_pandas()
out["size_in_mb"].describe()

2024-03-09 15:38:12,624	INFO set_read_parallelism.py:115 -- Using autodetected parallelism=64 for operator ReadJSON to satisfy parallelism at least twice the available number of CPUs (32).
2024-03-09 15:38:12,625	INFO set_read_parallelism.py:122 -- To satisfy the requested parallelism of 64, each read task output is split into 64 smaller blocks.
2024-03-09 15:38:12,626	INFO streaming_executor.py:110 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadJSON] -> TaskPoolMapOperator[Map(convert_to_pinecone_vectors)->MapBatches(get_size_of_batch)]
2024-03-09 15:38:12,626	INFO streaming_executor.py:111 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-03-09 15:38:12,627	INFO streaming_executor.py:113 -- Tip: For detailed progres

Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

2024-03-09 15:38:12,784	ERROR streaming_executor_state.py:453 -- An exception was raised from a task of operator "Map(convert_to_pinecone_vectors)->MapBatches(get_size_of_batch)". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.


RayTaskError(AttributeError): [36mray::Map(convert_to_pinecone_vectors)->MapBatches(get_size_of_batch)()[39m (pid=29333, ip=10.0.44.131)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 139, in apply_transform
    iter = transform_fn(iter, ctx)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 288, in __call__
    first = next(block_iter, None)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 371, in __call__
    for data in iter:
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 196, in __call__
    yield from self._row_fn(input, ctx)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 233, in transform_fn
    out_row = fn(row)
  File "/home/ray/anaconda3/lib/python3.10/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 119, in fn
    return op_fn(item, *fn_args, **fn_kwargs)
  File "/tmp/ipykernel_49712/2085184142.py", line 3, in convert_to_pinecone_vectors
AttributeError: 'NoneType' object has no attribute 'split'

Turns out a batch_size of 100 is still well below the 2MB limit that pinecone has set so we proceed with it.

Next, we will determine the number of concurrent connections to use for the upsert. We do so by:
- counting how many batches we need to upload
- reasoning about the latency per batch, how fast we want the upsert and avoiding overloading the network

So we compute `approx_total_batches` as the total number of chunks divided by the batch size.

In [None]:
approx_total_batches = ds.count() // batch_size
approx_total_batches

We then choose a concurrency of 10 actor pools given this means running 13 upserts per connection which is a reasonable wait time.

In [None]:
concurrency = 9
approx_num_upserts_per_connection = approx_total_batches // concurrency
approx_num_upserts_per_connection

Finally we apply the upsert as a stateful transform using `map_batches`

In [None]:
pinecone_namespace = "example-namespace"


class UpsertVectors:
    def __init__(self):
        self.pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY", YOUR_PINECONE_API_KEY))
        self.index = self.pc.Index(index_name)
        self.namespace = pinecone_namespace

    def __call__(self, batch):
        self.index.upsert(
            vectors=[
                {
                    "id": id_,
                    "values": values,
                    "metadata": metadata,
                }
                for id_, values, metadata in zip(
                    batch["id"], batch["values"], batch["metadata"]
                )
            ],
            namespace=self.namespace,
        )
        return batch


ds = ds.map_batches(
    UpsertVectors,
    concurrency=concurrency,
    batch_size=batch_size,
    num_cpus=1,
)

One thing to note is that pinecone offers a simple solution to run [upserts in parallel from a single machine](https://docs.pinecone.io/docs/upsert-data#send-upserts-in-parallel) We are not making use of this feature in this guide, in favor of an even more distributed approach for large scale data.

We finally proceed to "materialize" the upserts by calling `to_pandas` which is a consumption operation triggering the execution of the pipeline.

In [None]:
df_written = ds.to_pandas().drop_duplicates(subset=["id"])

A small note from the cell above is given the id is produced by taking a hash of the chunk, for certain edge cases were the same text is repeated in a document the same id will be produced. Therefore we drop duplicates from the dataframe.

### 2. Verify the index

Ensure the upsert operation is complete by checking the index status and getting the number of vectors in the index.

Pinecone is eventually consistent, so it may take a few seconds for the index to be updated.

In [None]:
import time


def verify_index(index_name: str, num_expected_vectors: int):
    index = pc.Index(index_name)
    stats = index.describe_index_stats()

    while stats.total_vector_count != num_expected_vectors:
        time.sleep(5)
        stats = index.describe_index_stats()


verify_index(index_name=index_name, num_expected_vectors=df_written.shape[0])

### Querying the Pinecone index

Given we have indexed our embeddings, we can now query the index to retrieve the most similar documents to a given query.

In [None]:
query = "What is the default number of replicas for a Ray Serve deployment?"

In [None]:
model = SentenceTransformer('thenlper/gte-large', device=get_device())
query_embedding = model.encode(query).tolist()

In [None]:
index = pc.Index(index_name)
result = index.query(
    vector=query_embedding,
    top_k=5,
    namespace=pinecone_namespace,
)

In [None]:
result["matches"]

If want to include the metadata in the result, we can use the following code:

In [None]:
result = index.query(
    vector=query_embedding, top_k=5, include_metadata=True, namespace=pinecone_namespace
)

In [None]:
result["matches"]

We can additionally introduce a filter on score to only return results with a score above a certain threshold based on the fetched results.

In [None]:
scores = [match["score"] for match in result["matches"]]
scores

In [None]:
score_threshold = 0.93  # determined based on data distribution
matches_above_threshold = [
    match for match in result["matches"] if match["score"] > score_threshold
]
len(matches_above_threshold), len(result["matches"])

###  Understanding Pinecone query pricing

Pinecone measures usage of a query in terms of read units. Read units are priced based on the chosen cloud (see [pinecone pricing page](https://www.pinecone.io/pricing/) for more details.)

More specifically, the number of read units used by a query depends on:
- Record count: 
  - the number of records in your namespace
- Record size: 
  - the dimension of the vector you use
  - whether you are retrieving the vector or its metadata or just fetching back the ID

Read units have good scaling properties as we will see below.

See the [documentation page](https://docs.pinecone.io/docs/understanding-cost#query ) for more details

Let's run a sample query to inspect the recorded usage.

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=5,
    namespace=pinecone_namespace,
)

We can inspect `usage` to view how much this query cost us in terms of units. 

In [None]:
response["usage"]

We can see we consumed 5 Read Units (RUs) running the query with top_k = 5. What is cool, is this is the same cost for running with a top_k of 100. Think of this as an index scan is of complexity O(1) and the cost of the query is the cost of fetching the results.

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=100,
    namespace=pinecone_namespace,
)
response["usage"]

Running with `include_metadata=True` however will increase the cost of the query. 

For top_k = 1-10, this incurs an additional 1 RU for fetching the metadata. For top_k = 11-20, this incurs an additional 2 RU for fetching the metadata and so on.

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=1,
    namespace=pinecone_namespace,
    include_metadata=True,
)
response["usage"]

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=10,
    namespace=pinecone_namespace,
    include_metadata=True,
)
response["usage"]

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=11,
    namespace=pinecone_namespace,
    include_metadata=True,
)
response["usage"]

In [None]:
response = index.query(
    vector=query_embedding,
    top_k=20,
    namespace=pinecone_namespace,
    include_metadata=True,
)
response["usage"]