In [1]:
import os
import ray
import sys
from pathlib import Path
from bs4 import BeautifulSoup, NavigableString

In [2]:
import sys; sys.path.append("..")
import warnings; warnings.filterwarnings("ignore")
from dotenv import load_dotenv; load_dotenv()
%load_ext autoreload
%autoreload 2

# Ray setup

In [3]:
ray.init(runtime_env={
    "env_vars": {
        "OPENAI_API_BASE": os.environ["OPENAI_API_BASE"],
        "OPENAI_API_KEY": os.environ["OPENAI_API_KEY"], 
        "DB_CONNECTION_STRING": os.environ["DB_CONNECTION_STRING"],
    },
    "working_dir": './'
})

2024-04-27 04:26:56,007	INFO worker.py:1749 -- Started a local Ray instance.
2024-04-27 04:27:20,281	INFO packaging.py:530 -- Creating a file package for local directory './'.


RuntimeEnvSetupError: Failed to set up runtime environment.
Failed to upload working_dir ./ to the Ray cluster: Package size (576.67MiB) exceeds the maximum size of 500.00MiB. You can exclude large files using the 'excludes' option to the runtime_env or provide a remote URI of a zip file using protocols such as 's3://', 'https://' and so on, refer to https://docs.ray.io/en/latest/ray-core/handling-dependencies.html#api-reference.

# Build Ray Materialized Dataset

In [None]:
EFS_DIR = './docs'
DOCS_DIR = Path(EFS_DIR, 'docs.ray.io/en/master/')

ds = ray.data.from_items([{'path': path} for path in DOCS_DIR.rglob('*.html') if not path.is_dir()])

print(f"{ds.count()} documents")

2024-04-15 18:58:58,166	INFO worker.py:1743 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


3042 documents


[33m(raylet)[0m [2024-04-16 02:37:13,337 E 9002 38730532] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2024-04-15_18-58-53_847573_64993 is over 95% full, available space: 12458246144; capacity: 250790436864. Object creation will fail if spilling is required.
[33m(raylet)[0m [2024-04-16 02:37:34,306 E 9002 38730532] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2024-04-15_18-58-53_847573_64993 is over 95% full, available space: 12503347200; capacity: 250790436864. Object creation will fail if spilling is required.
[33m(raylet)[0m [2024-04-16 02:37:44,362 E 9002 38730532] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2024-04-15_18-58-53_847573_64993 is over 95% full, available space: 12456562688; capacity: 250790436864. Object creation will fail if spilling is required.
[33m(raylet)[0m [2024-04-16 02:37:54,436 E 9002 38730532] (raylet) file_system_monitor.cc:111: /tmp/ray/session_2024-04-15_18-58-53_847573_64993 is over 95% full, available space: 12456546

# Data extraction

In [9]:
def extract_text_from_section(section):
    texts = []
    for elem in section.children:
        if isinstance(elem, NavigableString):
            if elem.strip():
                texts.append(elem.strip())
        elif elem.name == "section":
            continue
        else:
            texts.append(elem.get_text().strip())
    return "\n".join(texts)

def extract_sections(record):
    with open(record["path"], "r", encoding="utf-8") as html_file:
        soup = BeautifulSoup(html_file, "html.parser")
    sections = soup.find_all("section")
    section_list = []
    for section in sections:
        section_id = section.get("id")
        section_text = extract_text_from_section(section)
        if section_id:
            uri = path_to_uri(path=record["path"])
            section_list.append({"source": f"{uri}#{section_id}", "text": section_text})
    return section_list

def path_to_uri(path, scheme="https://", domain="docs.ray.io"):
    return scheme + domain + str(path).split(domain)[-1]

In [11]:
sections_ds = ds.flat_map(extract_sections)
sections = sections_ds.take_all()
print (len(sections))

2024-04-15 19:19:37,023	INFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-04-15_18-58-53_847573_64993/logs/ray-data.log
2024-04-15 19:19:37,025	INFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)]



5654


In [13]:
from langchain.document_loaders import ReadTheDocsLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

chunk_size = 300
chunk_overlap = 50
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],
    chunk_size=chunk_size,
    chunk_overlap=chunk_overlap,
    length_function=len
)

# chunk a sample section
sample_section = sections_ds.take(1)[0]
chunks = text_splitter.create_documents(texts=[sample_section["text"]],
                                        metadatas=[{"source": sample_section["source"]}])

2024-04-15 21:31:24,365	INFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-04-15_18-58-53_847573_64993/logs/ray-data.log
2024-04-15 21:31:24,367	INFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)] -> LimitOperator[limit=1]



In [20]:
from functools import partial

def chunk_section(section, chunk_size, chunk_overlap):
    text_splitter = RecursiveCharacterTextSplitter(
        separators=["\n\n", "\n", " ", ""],
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len
    )
    chunks = text_splitter.create_documents(
        texts=[section["text"]],
        metadatas=[{"source": section["source"]}]
    )
    return [{"text": chunk.page_content, "source": chunk.metadata["source"]} for chunk in chunks]

# scale chunking
chunks_ds = sections_ds.flat_map(partial(
    chunk_section,
    chunk_size=chunk_size,
    chunk_overlap=chunk_overlap
))

print(f"{chunks_ds.count()} chunks")
chunks_ds.show(1)

2024-04-15 22:18:17,095	INFO streaming_executor.py:115 -- Starting execution of Dataset. Full log is in /tmp/ray/session_2024-04-15_18-58-53_847573_64993/logs/ray-data.log
2024-04-15 22:18:17,096	INFO streaming_executor.py:116 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[FlatMap(extract_sections)->FlatMap(partial)] -> LimitOperator[limit=1]



3042 chunks
{'text': 'Quickstart using the Ray Jobs CLI#\nThis guide walks through the Ray Jobs CLI commands available for submitting and interacting with a Ray Job.\nTo use the Jobs API programmatically with a Python SDK instead of a CLI, see Python SDK Overview.', 'source': 'https://docs.ray.io/en/master/cluster/running-applications/job-submission/quickstart.html#quickstart-using-the-ray-jobs-cli'}


In [22]:
import numpy as np
from ray.data import ActorPoolStrategy
from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceBgeEmbeddings

class EmbedChunks:
    def __init__(self, model_name) -> None:
        if model_name == 'text-embedding-ada-002':
            self.embedding_model = OpenAIEmbeddings(
                model=model_name,
                openai_api_base=os.environ['OPENAI_API_BASE'],
                openai_api_key=os.environ['OPENAI_API_KEY']
            )
        else:
            self.embedding_model = HuggingFaceBgeEmbeddings(
                model_name=model_name,
                model_kwargs={'device': 'cpu'},
                encode_kwargs={'device': 'cpu', 'batch_size': 100}
            )
    def __call__(self, batch):
        embeddings = self.embedding_model.embed_documents(batch['text'])
        return {'text': batch['text'], 'source': batch['source'], 'embeddings': embeddings}

In [25]:
# embed chunks
embedding_model_name = 'thenlper/gte-base'
embedded_chunks = chunks_ds.map_batches(
    EmbedChunks,
    fn_constructor_kwargs={'model_name': embedding_model_name},
    batch_size=100,
    concurrency=2
)

In [None]:
class StoreResults:
    def __call__(self, batch):
        with psycopg.connect(os.environ['DB_CONNECTION_STRING']) as conn:
            register_vector(conn)
            with conn.cursor() as cur:
                for text, source, embedding in zip (batch['text'], batch['source'], batch['embeddings']):
                    cur.execute("INSERT INTO document (text, source, embedding)")