In [15]:
from pathlib import Path
from rag.data import extract_sections
from functools import partial
import ray
import langchain
from langchain.document_loaders import ReadTheDocsLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
import numpy as np
from ray.data import ActorPoolStrategy
import os
import psycopg
from pgvector.psycopg import register_vector


In [3]:
EFS_DIR = 'ray-data'

In [4]:
DOCS_DIR = Path(EFS_DIR, "docs.ray.io/en/master/")
ds = ray.data.from_items([{"path": path} for path in DOCS_DIR.rglob("*.html") if not path.is_dir()])
print(f"{ds.count()} documents")

2024-10-02 20:39:26,051	INFO worker.py:1601 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2024-10-02 20:39:26,055	INFO worker.py:1786 -- Connected to Ray cluster.


3609 documents


In [5]:
ds

MaterializedDataset(
   num_blocks=200,
   num_rows=3609,
   schema={
      path: extension<ray.data.arrow_pickled_object<ArrowPythonObjectType>>
   }
)

In [6]:
sample_html_fp = Path(EFS_DIR, "docs.ray.io/en/master/rllib/rllib-env.html")
extract_sections({"path": sample_html_fp})[0]

{'source': 'https://docs.ray.io/en/master/rllib/rllib-env.html#environments',
 'text': '\nEnvironments#\nRLlib works with several different types of environments, including Farama-Foundation Gymnasium, user-defined, multi-agent, and also batched environments.\nTip\nNot all environments work with all algorithms. See the algorithm overview for more information.\n'}

In [7]:
sections_ds = ds.flat_map(extract_sections)
sections = sections_ds.take_all()
print(len(sections))

2024-10-02 20:39:26,971	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-02_20-38-53_197114_69991/logs/ray-data
2024-10-02 20:39:26,972	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(extract_sections) 1: 0.00 row [00:00, ? row/s]

6290


In [8]:
chunk_size = 300
chunk_overlap = 50

text_splitter = RecursiveCharacterTextSplitter(separators = ["\n\n", "\n", " ", ""],chunk_size = chunk_size, chunk_overlap = chunk_overlap, length_function = len)

# chunk a sample section

sample_section = sections_ds.take(1)[0]
chunks = text_splitter.create_documents(texts = [sample_section['text']], metadatas = [{"source" : sample_section['source']}])
print(chunks[0])

2024-10-02 20:39:55,599	INFO dataset.py:2416 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-10-02 20:39:55,602	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-02_20-38-53_197114_69991/logs/ray-data
2024-10-02 20:39:55,602	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)] -> LimitOperator[limit=1]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(extract_sections) 1: 0.00 row [00:00, ? row/s]

- limit=1 2: 0.00 row [00:00, ? row/s]

page_content='Python SDK Overview#\nThe Ray Jobs Python SDK is the recommended way to submit jobs programmatically. Jump to the API Reference, or continue reading for a quick overview.' metadata={'source': 'https://docs.ray.io/en/master/cluster/running-applications/job-submission/sdk.html#python-sdk-overview'}


In [9]:
def chunk_section(section, chunk_size, chunk_overlap):
    splitter = RecursiveCharacterTextSplitter(separators = ["\n\n", "\n", " ", ""], chunk_size = chunk_size, chunk_overlap = chunk_overlap, length_function = len)
    docs = splitter.create_documents(texts = [section['text']], metadatas = [{'source': section['source']}])
    return [{'text' : doc.page_content, 'source' : doc.metadata['source']} for doc in docs]


#scaling up chunking
chunks_ds = sections_ds.flat_map(partial(chunk_section, chunk_size = chunk_size, chunk_overlap = chunk_overlap))
print(f"{chunks_ds.count()} chunks")
chunks_ds.show(1)

2024-10-02 20:39:56,534	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-02_20-38-53_197114_69991/logs/ray-data
2024-10-02 20:39:56,535	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)->FlatMap(partial)]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(extract_sections)->FlatMap(partial) 1: 0.00 row [00:00, ? row/s]

2024-10-02 20:40:26,660	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-02_20-38-53_197114_69991/logs/ray-data
2024-10-02 20:40:26,661	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)->FlatMap(partial)] -> LimitOperator[limit=1]


32086 chunks


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(extract_sections)->FlatMap(partial) 1: 0.00 row [00:00, ? row/s]

- limit=1 2: 0.00 row [00:00, ? row/s]

{'text': 'Python SDK Overview#\nThe Ray Jobs Python SDK is the recommended way to submit jobs programmatically. Jump to the API Reference, or continue reading for a quick overview.', 'source': 'https://docs.ray.io/en/master/cluster/running-applications/job-submission/sdk.html#python-sdk-overview'}


In [10]:
class Embeddings:

    def __init__(self, model_name):
        if model_name == "text-embedding-ada-002":
            self.model = OpenAIEmbeddings(model = model_name, openai_api_base=os.environ["OPENAI_API_BASE"], openai_api_key=os.environ["OPENAI_API_KEY"])

        else:
            self.model = HuggingFaceEmbeddings(model_name = model_name, model_kwargs={'device' : 'mps'}, encode_kwargs = {'device' : 'mps', 'batch_size' : 100})

    def __call__(self, batch):
        embeddings = self.model.embed_documents(batch["text"])

        return {"text": batch["text"], "source": batch["source"], "embeddings": embeddings}

In [11]:
embedding_model_name = "thenlper/gte-base"
embedded_chunks = chunks_ds.map_batches(Embeddings, fn_constructor_kwargs={'model_name' : embedding_model_name}, batch_size = 100, num_cpus = 1, num_gpus=0, concurrency =  2)

In [12]:
print(embedded_chunks.show(1))

2024-10-02 20:40:27,639	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-02_20-38-53_197114_69991/logs/ray-data
2024-10-02 20:40:27,640	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> ActorPoolMapOperator[FlatMap(extract_sections)->FlatMap(partial)->MapBatches(Embeddings)] -> LimitOperator[limit=1]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(extract_sections)->FlatMap(partial)->MapBatches(Embeddings) 1: 0.00 row [00:00, ? row/s]

- limit=1 2: 0.00 row [00:00, ? row/s]



{'text': 'ray.job_submission.JobType.endswith#\nJobType.endswith(suffix[, start[, end]]) â†’ bool#\nReturn True if S ends with the specified suffix, False otherwise.\nWith optional start, test S beginning at that position.\nWith optional end, stop comparing S at that position.', 'source': 'https://docs.ray.io/en/master/cluster/running-applications/job-submission/doc/ray.job_submission.JobType.endswith.html#ray-job-submission-jobtype-endswith', 'embeddings': [0.0066557652316987514, -0.02076447755098343, -0.007583048194646835, 0.015573485754430294, 0.028178909793496132, 0.038403771817684174, 0.030573192983865738, -0.005029419902712107, -0.031879961490631104, -0.07003326714038849, -0.04981723055243492, 0.028296630829572678, -0.05013647675514221, 0.0525132492184639, 0.008815379813313484, 0.06466109305620193, 0.04170697554945946, -0.005238746292889118, 0.0225755013525486, -0.04484710097312927, 0.007457794155925512, 0.00285573722794652, -0.008171087130904198, 0.017116604372859, 0.03693402558



In [16]:
class StoreResults:
    def __call__(self, batch):
        with psycopg.connect(os.environ["DB_CONNECTION_STRING"]) as conn:
            register_vector(conn)
            with conn.cursor() as cur:
                for text, source, embedding in zip(batch["text"], batch["source"], batch["embeddings"]):
                    cur.execute("INSERT INTO document (text, source, embedding) VALUES (%s, %s, %s)", (text, source, embedding,),)
        return {}

In [17]:
embedded_chunks.map_batches(StoreResults, batch_size = 128, concurrency = 2, num_cpus = 1, num_gpus = 0).count()

2024-10-02 20:49:29,752	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-10-02_20-38-53_197114_69991/logs/ray-data
2024-10-02 20:49:29,764	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> ActorPoolMapOperator[FlatMap(extract_sections)->FlatMap(partial)->MapBatches(Embeddings)] -> ActorPoolMapOperator[MapBatches(StoreResults)]


Running 0: 0.00 row [00:00, ? row/s]

- FlatMap(extract_sections)->FlatMap(partial)->MapBatches(Embeddings) 1: 0.00 row [00:00, ? row/s]

- MapBatches(StoreResults) 2: 0.00 row [00:00, ? row/s]

2024-10-02 20:49:49,536	ERROR streaming_executor_state.py:469 -- An exception was raised from a task of operator "MapBatches(StoreResults)". Dataset execution will now abort. To ignore this exception and continue, set DataContext.max_errored_blocks.
2024-10-02 20:49:49,553	ERROR exceptions.py:63 -- Exception occurred in user code, with the abbreviated stack trace below. By default, the Ray Data internal stack trace is omitted from stdout, and only written to the Ray Data log files at /tmp/ray/session_2024-10-02_20-38-53_197114_69991/logs/ray-data. To output the full stack trace to stdout, set `DataContext.log_internal_stack_trace_to_stdout` to True.


RayTaskError(UserCodeException): [36mray::MapBatches(StoreResults)()[39m (pid=72847, ip=127.0.0.1, actor_id=8b66d4632abb07af7e7f770a01000000, repr=MapWorker(MapBatches(StoreResults)))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/vipulsarode/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/util.py", line 78, in __call__
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/vipulsarode/anaconda3/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/Users/vipulsarode/anaconda3/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/Users/vipulsarode/anaconda3/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/pz/bcj1fwv130d2771y76qjg5780000gn/T/ipykernel_70165/2009609690.py", line 3, in __call__
  File "<frozen os>", line 679, in __getitem__
KeyError: 'DB_CONNECTION_STRING'

The above exception was the direct cause of the following exception:

[36mray::MapBatches(StoreResults)()[39m (pid=72847, ip=127.0.0.1, actor_id=8b66d4632abb07af7e7f770a01000000, repr=MapWorker(MapBatches(StoreResults)))
  File "/Users/vipulsarode/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/actor_pool_map_operator.py", line 371, in submit
    yield from _map_task(
  File "/Users/vipulsarode/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_operator.py", line 461, in _map_task
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/Users/vipulsarode/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 392, in __call__
    for data in iter:
  File "/Users/vipulsarode/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 134, in _udf_timed_iter
    output = next(input)
             ^^^^^^^^^^^
  File "/Users/vipulsarode/anaconda3/lib/python3.11/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 236, in __call__
    yield from self._batch_fn(input, ctx)
  File "/Users/vipulsarode/anaconda3/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 282, in transform_fn
    res = fn(batch)
          ^^^^^^^^^
  File "/Users/vipulsarode/anaconda3/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 186, in fn
    _handle_debugger_exception(e)
  File "/Users/vipulsarode/anaconda3/lib/python3.11/site-packages/ray/data/_internal/planner/plan_udf_map_op.py", line 210, in _handle_debugger_exception
    raise UserCodeException() from e
ray.exceptions.UserCodeException