In [1]:
from USOSDataLoader import USOSDataLoader
from pinecone import Pinecone, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_core.documents.base import Document
from langchain.prompts import PromptTemplate
from langchain_groq import ChatGroq
from dotenv import load_dotenv, find_dotenv
from pydantic import BaseModel, Field
import re
import os
import time
from uuid import uuid4
import logging
import math
from tqdm import tqdm

load_dotenv(find_dotenv())

USER_AGENT environment variable not set, consider setting it to identify your requests.


True

In [2]:
documents = USOSDataLoader().get_documents()

Fetching links...


Fetching pdf links...: 100%|██████████| 34/34 [00:37<00:00,  1.10s/it]


Loading web data...


Loading pdf data...: 100%|██████████| 47/47 [00:32<00:00,  1.45it/s]
Preprocessing documents...: 100%|██████████| 81/81 [00:00<00:00, 3964.00it/s]


In [9]:
logger = logging.getLogger(__name__)
logging.basicConfig(filename="failed_chunks.log", level=logging.ERROR)

embeddings = HuggingFaceEmbeddings(model_name="jinaai/jina-embeddings-v3",
                                   model_kwargs={"trust_remote_code": True},
                                   encode_kwargs={"task": "retrieval.query"})

pc = Pinecone(os.environ.get("PINECONE_API_KEY"))

In [10]:
index_name = "usos-bot-questions"
existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]

if index_name not in existing_indexes:
    pc.create_index(
        name=index_name,
        dimension=1024,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
    while not pc.describe_index(index_name).status["ready"]:
        time.sleep(1)

index = pc.Index(index_name)

vectorstore = PineconeVectorStore(index=index, embedding=embeddings)

In [14]:
class QuestionList(BaseModel):
    question_list: list[str] = Field(..., title="List of questions generated for the document or fragment")


def clean_and_filter_questions(questions: list[str]) -> list[str]:
    cleaned_questions = []
    for question in questions:
        cleaned_question = re.sub(r'^\d+\.\s*', '', question.strip())
        if cleaned_question.endswith('?'):
            cleaned_questions.append(cleaned_question)
    return cleaned_questions


def llm_chain(llm, text, n_questions, prompt):
    chain = prompt | llm.with_structured_output(QuestionList)
    input_data = {"context": text, "num_questions": n_questions}
    try:
        result = chain.invoke(input_data)
        return result
    except Exception as e:
        match = re.search(r"\d+m\d+\.\d+s|\d+.\ds|\d+m", e.message)
        if match:
            print(e.message)
            raise Exception(e)
        else:
            return None


def generate_questions(text: str, n_questions, api_key=None) -> list[str]:
    if api_key:
        llm = ChatGroq(model="llama-3.1-70b-versatile", temperature=0, api_key=api_key)
    else:    
        llm = ChatGroq(model="llama-3.1-70b-versatile", temperature=0)
    prompt = PromptTemplate(
        input_variables=["context", "num_questions"],
        template="Using the context data: {context}\n\nGenerate a list of at least {num_questions} "
                 "possible questions that can be asked about this context. Ensure the questions are "
                 "directly answerable within the context and do not include any answers or headers. "
                 "The questions should be in the same language as the context. "
                 "Separate the questions with a new line character."
    )

    prompt_secondary = PromptTemplate(
        input_variables=["context", "num_questions"],
        template="Using the context data: {context}\n\nGenerate a list of at least {num_questions} "
                 "possible questions that can be asked about this context. Ensure the questions are "
                 "directly answerable within the context and do not include any answers or headers. "
                 "The questions should be in the Polish language. "
                 "Separate the questions with a new line character."
    )


    result = llm_chain(llm, text, n_questions, prompt) or llm_chain(llm, text, n_questions, prompt_secondary)
    if result is None:
        logger.error(f"FAILED CHUNK: {text}")
        return []
        
    questions = result.question_list

    filtered_questions = clean_and_filter_questions(questions)
    return list(set(filtered_questions))


def split_document(document: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    tokens = re.findall(r'\b\w+\b', document)
    chunks = []
    for i in range(0, len(tokens), chunk_size - chunk_overlap):
        chunk_tokens = tokens[i:i + chunk_size]
        chunks.append(chunk_tokens)
        if i + chunk_size >= len(tokens):
            break
    return [" ".join(chunk) for chunk in chunks]


def print_document(comment: str, document: Document) -> None:
    print(
        f'{comment} (type: {document.metadata["type"]}, index: {document.metadata["index"]}): {document.page_content}')

In [15]:
def parse_sleep_time(raw_sleep):
    SECONDS_IN_MINUTE = 60
    mins_match = re.search("\\d+(?=m)", raw_sleep)
    if mins_match:
        mins_match = int(mins_match.group(0))
        
    seconds_match = re.search(r"\d+\.\d+(?=s)", raw_sleep)
    if seconds_match:
        seconds_match = math.ceil(float(seconds_match.group(0)))
        
    return SECONDS_IN_MINUTE * mins_match + seconds_match

#### Routine for processing fragments with switching keys

In [16]:
from itertools import cycle

api_keys = os.environ.get("API_KEYS").split("||")
api_keys_iter = cycle(api_keys)

def add_fragment(fragment, i, j, counter, source, api_key):
    knowledge_base = [Document(
        page_content=fragment,
        metadata=dict(type="ORIGINAL", index=counter, source=source, orig_text=fragment)
    )]
    questions = generate_questions(fragment, n_questions=20, api_key=api_key)
    knowledge_base.extend([
        Document(page_content=question,
                 metadata=dict(type="AUGMENTED", index=counter + idx, orig_text=fragment))
        for idx, question in enumerate(questions)
    ])
    counter += len(questions)
    print(f'Text document {i} Text fragment {j} - generated: {len(questions)} questions')

    uuids = [str(uuid4()) for _ in range(len(knowledge_base))]
    vectorstore.add_documents(documents=knowledge_base, ids=uuids)

    return counter

fifteen_minutes_in_sec = 900

def process_fragments(documents: list[Document], chunk_size: int, chunk_overlap: int, counter: int = 0):
    current_key = next(api_keys_iter)
    for i, document in enumerate(documents):
        text = document.page_content
        text_fragments = split_document(text, chunk_size, chunk_overlap)
        
        for j, fragment in enumerate(text_fragments):
            print(f"Document {i} - split into {len(text_fragments)}")
            try:
                counter = add_fragment(fragment, i, j, counter, source=document.metadata["source"], api_key=current_key)
            except Exception as e:
                current_key = next(api_keys_iter)
                print("Switching Keys")
                logger.error(f"FAILED TO PARSE: DOCUMENT: {i}, FRAGMENT: {j}, COUNTER: {counter} - also accessible in the database.")
                # if no match - let it crash
                counter = add_fragment(fragment, i, j, counter, source=document.metadata["source"], api_key=current_key)

#### Routine for processing fragments

In [69]:
def add_fragment(fragment, i, j, counter, source):
    knowledge_base = [Document(
        page_content=fragment,
        metadata=dict(type="ORIGINAL", index=counter, source=source, orig_text=fragment)
    )]
    questions = generate_questions(fragment, n_questions=20)
    knowledge_base.extend([
        Document(page_content=question,
                 metadata=dict(type="AUGMENTED", index=counter + idx, orig_text=fragment))
        for idx, question in enumerate(questions)
    ])
    counter += len(questions)
    print(f'Text document {i} Text fragment {j} - generated: {len(questions)} questions')

    uuids = [str(uuid4()) for _ in range(len(knowledge_base))]
    vectorstore.add_documents(documents=knowledge_base, ids=uuids)

    return counter

fifteen_minutes_in_sec = 900

def process_fragments(documents: list[Document], chunk_size: int, chunk_overlap: int, counter: int = 0):
    for i, document in enumerate(documents):
        text = document.page_content
        text_fragments = split_document(text, chunk_size, chunk_overlap)
        
        for j, fragment in enumerate(text_fragments):
            print(f"Document {i} - split into {len(text_fragments)}")
            try:
                counter = add_fragment(fragment, i, j, counter, source=document.metadata["source"])
            except Exception as e:
                print(e)
                match = re.search(r"\d+m\d+\.\d+s|\d+.\ds|\d+m", e.message)
                if match:
                    match = match.group(0)
                    sleep_parsed = parse_sleep_time(match)
                    if sleep_parsed > fifteen_minutes_in_sec:
                        logger.error(f"FAILED TO PARSE: DOCUMENT: {i}, FRAGMENT: {j}, COUNTER: {counter} - also accessible in the database. MESSAGE: {e.message}")
                        raise RuntimeError("Sleep time too long")
                    print(f"Sleeping for {sleep_parsed} seconds...")
                    for _ in tqdm(range(sleep_parsed)):
                        time.sleep(1)
                # if no match - let it crash
                counter = add_fragment(fragment, i, j, counter, source=document.metadata["source"])

#### Routine for adding document by documents with logging

In [None]:
def add_document(document, chunk_size, chunk_overlap, i, counter):
    knowledge_base = []
    text = document.page_content
    text_fragments = split_document(text, chunk_size, chunk_overlap)
    print(f"Document {i} - split into {len(text_fragments)}")
    for j, fragment in enumerate(text_fragments):
        knowledge_base.append(
            Document(
                page_content=fragment,
                metadata=dict(type="ORIGINAL", index=counter, source=document.metadata["source"], text=fragment)
            )
        )
        questions = generate_questions(text, n_questions=20)
        knowledge_base.extend([
            Document(page_content=question,
                     metadata={"type": "AUGMENTED", "index": counter + idx, "text": fragment})
            for idx, question in enumerate(questions)
        ])
        counter += len(questions)
        print(f'Text document {i} Text fragment {j} - generated: {len(questions)} questions')
    
    uuids = [str(uuid4()) for _ in range(len(knowledge_base))]
    vectorstore.add_documents(documents=knowledge_base, ids=uuids)


def process_documents(documents: list[Document], chunk_size: int, chunk_overlap: int, counter: int = 0):
    for i, document in enumerate(documents):
        try:
            counter = add_document(document, chunk_size, chunk_overlap, i, counter)
        except Exception as e:
            logger.error(f"FAILED TO PARSE: DOCUMENT: {i}, COUNTER: {counter} - also accessible in the database. MESSAGE: {e.message}")

In [17]:
process_fragments(documents, 1000, 200)

Document 0 - split into 2
Error code: 429 - {'error': {'message': 'Rate limit reached for model `llama-3.1-70b-versatile` in organization `org_01jbanafxyf6w921a61ed2qvmx` on : Limit 200000, Used 199424, Requested 1991. Please try again in 10m11.057s. Visit https://console.groq.com/docs/rate-limits for more information.', 'type': '', 'code': 'rate_limit_exceeded'}}
Switching Keys
Text document 0 Text fragment 0 - generated: 21 questions
Document 0 - split into 2
Text document 0 Text fragment 1 - generated: 20 questions
Document 1 - split into 2
Text document 1 Text fragment 0 - generated: 30 questions
Document 1 - split into 2
Text document 1 Text fragment 1 - generated: 20 questions
Document 2 - split into 1
Text document 2 Text fragment 0 - generated: 21 questions
Document 3 - split into 1
Text document 3 Text fragment 0 - generated: 20 questions
Document 4 - split into 1
Text document 4 Text fragment 0 - generated: 20 questions
Document 5 - split into 1
Text document 5 Text fragment 