# 1 - Chunker

In [68]:
import os
import pickle
import bs4

from typing import List
from tqdm import tqdm
from collections import defaultdict

import langchain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import OpenAIEmbeddings
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
from langchain.vectorstores import Chroma

from dotenv import load_dotenv

load_dotenv()

True

In [3]:
# Directory passed to load_soup() when iterating over the pickled soups.
# os.getenv returns None when the variable is not set.
SOUP_OUT_DIR = os.getenv("SOUP_OUT_DIR")
# Substring a <link> href must contain for extract_url() to accept it.
# os.getenv returns None when the variable is not set, in which case
# extract_url() never matches anything.
BASE_URL = os.getenv("BASE_URL")

class Dataset:
    """
    Dataset contains the mapping between source (i.e., the website) and
    its corresponding chunks of text, extracted through the Chunker pipeline.

    The mapping lives in ``self.data``, a ``defaultdict(list)`` keyed by
    source. ``len(dataset)`` is the number of distinct sources, not the
    number of chunks.
    """

    def __init__(self):
        self.data = defaultdict(list)

    def __len__(self):
        """Return the number of distinct sources stored."""
        return len(self.data)

    def __getstate__(self):
        """Return the instance dictionary as the pickled state."""
        return self.__dict__

    def __setstate__(self, data):
        """Restore the instance dictionary from a pickled state."""
        self.__dict__ = data

    # The annotation is quoted so that defining the class does not require
    # `langchain.schema.document` to be resolvable at definition time.
    def add_data(self, source: str, chunks: "List[langchain.schema.document.Document]"):
        """
        Append chunks to the list stored for a source.

        Args:
            source (str): Key under which the chunks are stored
            chunks (list): Chunks to append to the source's existing list

        Raises:
            TypeError: If 'source' is not a str or 'chunks' is not a list
        """
        # Reject when EITHER argument has the wrong type. Without this a str
        # passed as 'chunks' would be extended character by character.
        if not isinstance(source, str) or not isinstance(chunks, list):
            raise TypeError("Make sure 'source' and 'chunks' are in the right format")
        self.data[source].extend(chunks)

    def get_chunks(self, source: str):
        """
        Return the chunks stored for a source, or None if it is unknown.

        Uses ``dict.get`` so that looking up an unknown source does not
        create an empty entry in the underlying defaultdict.
        """
        return self.data.get(source)


def load_soup(dir_path: str):
    """
    Yields one soup at a time using a generator.

    Only files whose name ends with ".pkl" are loaded. Files are visited in
    sorted filename order so that repeated runs process them in the same
    order (os.listdir returns entries in arbitrary order).

    NOTE: pickle.load can execute arbitrary code while unpickling. Only point
    this at directories whose contents you produced yourself.

    Args:
        dir_path (str): Path to the directory containing the pickled soups

    Yields:
        The object unpickled from each ".pkl" file

    Raises:
        TypeError: If dir_path is None. Because this is a generator, the
            error surfaces on the first iteration, not at call time.
    """
    # os.listdir(None) silently lists the current working directory, which
    # would hide an unset environment variable. Fail loudly instead.
    if dir_path is None:
        raise TypeError("dir_path must be a directory path, got None")

    for filename in sorted(os.listdir(dir_path)):
        if not filename.endswith(".pkl"):
            continue
        with open(os.path.abspath(os.path.join(dir_path, filename)), "rb") as f:
            yield pickle.load(f)

def extract_url(soup: bs4.BeautifulSoup):
    """
    Finds the first <link> tag whose href contains BASE_URL.

    Args:
        soup (bs4.BeautifulSoup): Parsed page to search

    Returns:
        str | None: The first matching href, or None when no <link> tag
        matches or BASE_URL is not set
    """
    # find_all runs immediately; the hrefs themselves are pulled lazily so
    # the scan stops at the first match.
    hrefs = (tag.attrs.get("href") for tag in soup.find_all("link"))
    return next(
        (candidate for candidate in hrefs if candidate and BASE_URL and BASE_URL in candidate),
        None,
    )

def extract_paragraph_text(paragraph: bs4.element.Tag):
    """
    Returns the text content of a tag, typically a <p> element.

    Args:
        paragraph (bs4.element.Tag): Tag to read the text from

    Returns:
        str | None: Result of the tag's get_text(), or None when the
        argument is not a bs4 Tag
    """
    # Guard clause: anything that is not a Tag yields no text.
    if not isinstance(paragraph, bs4.element.Tag):
        return None
    return paragraph.get_text()

def chunk_text(
        text_splitter: RecursiveCharacterTextSplitter, 
        paragraph_text: str, 
        source: str
    ):
    """
    Splits one paragraph into chunks and tags every chunk with its source.

    Args:
        text_splitter (RecursiveCharacterTextSplitter): Text splitter object provided by Langchain
        paragraph_text (str): Text extracted from a <p> tag
        source (str): URL extracted from a given soup object

    Returns:
        List[Dict[str, str]]: One dict per chunk, holding the chunk content
        under "text" and the given URL under "source"
    """

    documents = text_splitter.create_documents(
        texts=[paragraph_text],
        metadatas=[{"source": source}],
    )

    # Flatten each document produced by the splitter into a plain dict record.
    records = []
    for document in documents:
        records.append({"text": document.page_content, "source": source})
    return records

In [25]:
# Instantiate text splitter.
# Sizes are measured in characters because length_function is `len`.
chunk_size = 300
chunk_overlap = 50
text_splitter = RecursiveCharacterTextSplitter(
    separators=["\n\n", "\n", " ", ""],
    chunk_size=chunk_size,
    chunk_overlap=chunk_overlap,
    length_function=len,
)

# Instantiate Dataset
dataset = Dataset()

# Iterate over processed soups and extract the relevant data
for soup in tqdm(load_soup(SOUP_OUT_DIR)):
    url = extract_url(soup)
    if not url:
        # Without a source URL none of the page's paragraphs can be stored,
        # so skip the page before scanning its <p> tags.
        continue

    for p in soup.find_all("p"):
        text = extract_paragraph_text(p)
        if not text:
            continue

        chunks = text_splitter.create_documents(
            texts=[text],
            metadatas=[{"source": url}],
        )

        if chunks:
            dataset.add_data(url, chunks)

64429it [40:25, 26.57it/s]


In [26]:
# Number of distinct sources (URLs) in the dataset, not the number of chunks.
len(dataset)

4491

In [29]:
out_path = os.getenv("CHUNK_OUT_DIR")

# Build the target path with os.path.join so the result is correct whether or
# not CHUNK_OUT_DIR ends with a path separator. Plain string concatenation
# would produce e.g. "chunkedkworld_chunked_dataset.pkl" without one.
chunk_file_path = os.path.join(out_path, "kworld_chunked_dataset.pkl")

with open(chunk_file_path, "wb") as pickle_file:
    pickle.dump(dataset, pickle_file)

## Load persisted Dataset

In [6]:
# Location of the pickled Dataset to restore.
data_path = "../data/chunked/kworld_chunked_dataset.pkl"

# Unpickling needs the Dataset class to be defined in this session already.
with open(data_path, mode="rb") as dataset_file:
    dataset = pickle.load(dataset_file)

In [57]:
# Concatenate the per-source chunk lists into one flat list, keeping the
# order in which sources appear in the dataset.
flattened_dataset = []
for source_chunks in dataset.data.values():
    flattened_dataset.extend(source_chunks)
len(flattened_dataset)

2727267

In [70]:
# Embedding model used to vectorise every chunk.
embedding_model_name = "thenlper/gte-base"
embedder = HuggingFaceEmbeddings(
    model_name=embedding_model_name,
    # Optional settings kept from the original cell, currently disabled:
    # model_kwargs={"device": "cuda"},
    # encode_kwargs={"device": "cuda", "batch_size": 100}
)

# Embed all chunks into a Chroma store backed by the given directory,
# then persist it.
vectordb = Chroma.from_documents(
    flattened_dataset,
    embedding=embedder,
    persist_directory='../data/vector_db/',
)
vectordb.persist()

In [1]:
# Build the question-answering chain on top of the vector store.
# The retriever is configured with k=7.
llm = OpenAI()
retriever = vectordb.as_retriever(search_kwargs={'k': 7})
qa_chain = RetrievalQA.from_chain_type(
    llm=llm,
    retriever=retriever,
    return_source_documents=True,
)

# Alternative queries tried during development:
#   "Does KPMG have offices in Canada?"
#   "Did FB Barcelona win yesterday?"
query = "What services does KPMG offer?"

result = qa_chain({'query': query})
answer = result['result']
print(answer)

NameError: name 'RetrievalQA' is not defined