In [1]:
%%writefile script/main.py

import torch

def acquire_data():
    """Run the ingestion pipeline: PDFs -> sentences -> embeddings -> CSV files.

    Takes no arguments; the embedding stage reads the module-level names
    ``embedding_model_name`` and ``device``, which this file only assigns
    inside the ``__main__`` guard. Nothing is returned: the results are
    written by ``save_embeddings`` and ``save_data``.
    """
    # Stage 1: read the PDFs and attach per-page metadata.
    from data_acquisition import get_pdf_paths, open_documents, get_text, get_metadata
    raw_pages = get_text(open_documents(get_pdf_paths()))
    records = get_metadata(raw_pages)

    # Stage 2: clean each page's text, then split it into sentences.
    from preprocessing_text import preprocessing_raw_text, convert_paragraphs_to_sentences
    records = convert_paragraphs_to_sentences(preprocessing_raw_text(records))

    # Stage 3: embed every sentence and persist embeddings and sentences.
    from embeddings import load_embedding_model, convert_data_to_embeddings, get_data_embeddings, convert_embeddings_to_same_dimensions, flatten, save_embeddings, save_data
    encoder = load_embedding_model(embedding_model_name, device)
    records = convert_data_to_embeddings(encoder, records, device)
    nested_vectors = get_data_embeddings(records, device)
    nested_vectors = convert_embeddings_to_same_dimensions(nested_vectors, device)
    flat_embeddings, flat_data = flatten(nested_vectors, records)
    save_embeddings(flat_embeddings)
    save_data(flat_data)

def ask(query):
    """Answer ``query`` from the saved sentence index and return the cleaned text.

    Reads the module-level names ``embedding_model_name``, ``llm_model_name``
    and ``device`` (assigned inside the ``__main__`` guard of this file).
    The 15 highest-scoring sentences are used as context for the LLM prompt.
    """
    # Load what save_embeddings / save_data wrote to their default paths.
    from embeddings import load_embeddings, load_data
    stored_vectors = load_embeddings().to(device)
    sentences = load_data()

    # Score every stored sentence against the query and keep the best ones.
    from similarity import get_similarity_score_by_query, get_top_k_scores, get_top_k_content
    scores = get_similarity_score_by_query(query, embedding_model_name, stored_vectors, device)
    _, best_indices = get_top_k_scores(scores, 15)
    context = get_top_k_content(best_indices, sentences)

    # Build the augmented prompt, generate, and post-process the output.
    from augmentation import load_llm_model, prompt_augmentation, get_answer, clean_answer
    llm = load_llm_model(llm_model_name, device)
    prompt = prompt_augmentation(llm_model_name, context, query)
    raw_output = get_answer(llm, prompt, llm_model_name, device)
    return clean_answer(raw_output)

if __name__ == "__main__":
    # Runtime configuration. acquire_data() and ask() read these module-level
    # names directly, so they must be assigned before either function runs.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    embedding_model_name = 'all-MiniLM-L12-v2'
    llm_model_name = 'google/gemma-2b-it'
    query = "What are machine learning? Explain in 500 words"

    # Build and persist the sentence index first, then answer from it.
    acquire_data()
    print("Data has been acquired")

    answer = ask(query)
    print(answer)

Overwriting script/main.py


In [2]:
%%writefile script/data_acquisition.py

from glob import glob
import fitz

def get_pdf_paths(dataset_dir=None):
    """Return the sorted paths of all ``*.pdf`` files in the dataset directory.

    Args:
        dataset_dir: Directory to search. Defaults to ``Dataset`` under the
            current working directory.

    Returns:
        A list of path strings, sorted so the processing order is the same
        on every run. Empty when the directory is missing or has no PDFs.
    """
    import os

    if dataset_dir is None:
        dataset_dir = os.path.join(".", "Dataset")
    # os.path.join uses the platform separator; the previous hard-coded
    # backslash pattern matched nothing on POSIX systems.
    pattern = os.path.join(dataset_dir, "*.pdf")
    return sorted(glob(pattern))

def open_documents(pdf_paths):
    """Open each path in ``pdf_paths`` with ``fitz.open``.

    Returns a list of the opened documents, in the same order as the paths.
    The documents are not closed here; that is left to the caller.
    """
    return [fitz.open(path) for path in pdf_paths]

def get_text(documents, skip_pages=15):
    """Extract page text from every document, skipping each one's leading pages.

    Args:
        documents: Iterable of documents; iterating a document yields pages
            that expose ``get_text()``.
        skip_pages: Number of pages to ignore at the start of *each*
            document. Defaults to 15, the value previously hard-coded.

    Returns:
        A dict mapping a running index (0, 1, 2, ... across all documents)
        to the page's text.
    """
    pages = dict()
    for doc in documents:
        for index_in_doc, page in enumerate(doc):
            if index_in_doc < skip_pages:
                continue
            # Key by the number of pages kept so far, so keys stay contiguous
            # across documents instead of restarting for each one.
            pages[len(pages)] = page.get_text()
    return pages

def get_metadata(pages):
    """Build one metadata record per page.

    ``pages`` maps a page number to that page's raw text. Each record holds
    the page number, the raw text, its character count, the character count
    divided by 4 (stored as ``number_of_tokens``; presumably a rough
    characters-per-token estimate) and its whitespace-separated word count.
    Records are returned as a list, in the dict's iteration order.
    """
    return [
        {
            'page_number': number,
            'raw_text': text,
            'number_of_characters': len(text),
            'number_of_tokens': len(text) / 4,
            'number_of_words': len(text.split()),
        }
        for number, text in pages.items()
    ]

Overwriting script/data_acquisition.py


In [3]:
%%writefile script/preprocessing_text.py

from spacy.lang.en import STOP_WORDS
import re
from spacy.lang.en import English

def convert_to_lowercase(text):
    """Return ``text`` with every cased character lowercased."""
    return text.lower()

def remove_stopwords(text):
    """Drop every whitespace-separated token that appears in ``STOP_WORDS``.

    The text is split on any whitespace and the surviving tokens are joined
    with single spaces, so newlines, tabs and repeated spaces in the input
    are also collapsed. Matching is exact: no case folding happens here.
    """
    kept_tokens = (token for token in text.split() if token not in STOP_WORDS)
    return " ".join(kept_tokens)

def remove_html_tags(text):
    """Remove HTML comments and HTML tags from ``text``.

    The previous pattern only matched ``<!-- ... -->`` comments (the same
    expression ``remove_comments`` uses), so real tags such as ``<p>`` or
    ``</b>`` were left in place. Comments are still removed, keeping the old
    behaviour as a subset.

    Returns the text with the markup deleted; the content between an opening
    and closing tag is kept.
    """
    without_comments = re.sub(r"<!--.*?-->", "", text)
    # A tag must start with a letter right after "<" or "</". This leaves
    # comparisons such as "x < y and y > z" untouched.
    new_text = re.sub(r"</?[a-zA-Z][^<>]*>", "", without_comments)
    return new_text

def remove_newlines(text):
    """Replace each run of one or more newline characters with one space."""
    newline_run = re.compile(r"\n+")
    return newline_run.sub(" ", text)

def remove_multiple_spaces(text):
    """Collapse every run of two or more spaces into a single space.

    A single ``str.replace("  ", " ")`` pass only halves each run, so four
    spaces became two. The regex handles runs of any length in one pass.
    Only the space character is affected; tabs and newlines are untouched.
    """
    new_text = re.sub(r" {2,}", " ", text)
    return new_text

def remove_comments(text):
    """Delete every ``<!-- ... -->`` span from ``text``.

    Matching is non-greedy and ``.`` does not cross newlines, so a comment
    that spans several lines is left in place.
    """
    comment_pattern = re.compile(r"<!--.*?-->")
    return comment_pattern.sub("", text)

def remove_unnecessary_text(text):
    """Strip question/answer labels and colons, then halve double spaces.

    The substitutions run in a fixed order: ``answer:`` is removed before the
    bare ``:`` so the label goes as a whole, and the final step replaces each
    pair of spaces with one space in a single pass.
    """
    substitutions = (
        ("answer:", ""),
        ("question", ""),
        (":", ""),
        ("  ", " "),
    )
    cleaned = text
    for old, new in substitutions:
        cleaned = cleaned.replace(old, new)
    return cleaned

def preprocess_text(text):
    """Run ``text`` through the full cleaning pipeline and return the result.

    Each step receives the output of the previous one, in this fixed order:
    lowercase, stop-word removal, HTML tag removal, newline removal,
    multiple-space removal, comment removal, unnecessary-text removal.
    """
    pipeline = (
        convert_to_lowercase,
        remove_stopwords,
        remove_html_tags,
        remove_newlines,
        remove_multiple_spaces,
        remove_comments,
        remove_unnecessary_text,
    )
    for step in pipeline:
        text = step(text)
    return text

def preprocessing_raw_text(pages_and_metadata):
    """Add a ``formatted_text`` entry to every page record.

    The value is ``preprocess_text`` applied to the record's ``raw_text``.
    Records are modified in place and the same list is returned.
    """
    for record in pages_and_metadata:
        record.update({"formatted_text": preprocess_text(record["raw_text"])})
    return pages_and_metadata

def convert_paragraphs_to_sentences(pages_and_metadata):
    """Split each page's ``formatted_text`` into unique sentences over 10 words.

    For every record this stores:
      * ``sentences``: stripped sentence strings with more than 10
        whitespace-separated words, duplicates removed, in order of first
        appearance;
      * ``number_of_sentences``: the length of that list.

    Records are modified in place and the same list is returned.
    """
    nlp = English()
    nlp.add_pipe("sentencizer")
    for page in pages_and_metadata:
        stripped = (str(sentence).strip() for sentence in nlp(page["formatted_text"]).sents)
        long_sentences = [sentence for sentence in stripped if len(sentence.split()) > 10]
        # dict.fromkeys removes duplicates while keeping first-seen order.
        # The previous list(set(...)) produced an order that could change
        # from run to run.
        sentences = list(dict.fromkeys(long_sentences))
        # Write to the record being iterated. Indexing the list with
        # page["page_number"] only worked while page numbers happened to
        # equal list positions.
        page["sentences"] = sentences
        page["number_of_sentences"] = len(sentences)
    return pages_and_metadata

Overwriting script/preprocessing_text.py


In [4]:
%%writefile script/embeddings.py

from sentence_transformers import SentenceTransformer
import numpy as np
import torch
import pandas as pd

def load_embedding_model(embedding_model_name, device="cpu"):
    """Create a ``SentenceTransformer`` for the given name and move it to ``device``.

    Returns whatever the model's ``.to(device)`` call returns.
    """
    encoder = SentenceTransformer(embedding_model_name)
    return encoder.to(device)

def convert_data_to_embeddings(embedding_model, pages_and_metadata, device="cpu"):
    """Embed every page's sentences and store the vectors on the page record.

    Args:
        embedding_model: Object exposing a sentence-transformers style
            ``encode`` method.
        pages_and_metadata: List of page records, each with a ``sentences``
            list of strings.
        device: Device passed through to ``encode``.

    Returns:
        The same list. Each record gains an ``embeddings`` entry: a list with
        one 1-D ``float32`` numpy array per sentence, in sentence order
        (empty when the page has no sentences).
    """
    for page in pages_and_metadata:
        sentences = page["sentences"]
        if not sentences:
            # Nothing to encode; keep the record's shape consistent.
            page["embeddings"] = []
            continue
        # Encode the whole page in one call. The previous per-sentence call
        # never used its batch_size and converted each vector
        # tensor -> list -> numpy -> tensor -> numpy.
        encoded = embedding_model.encode(
            sentences,
            batch_size=1024,
            convert_to_numpy=True,
            show_progress_bar=False,
            device=device,
        )
        matrix = np.asarray(encoded, dtype=np.float32)
        # Write to the record being iterated instead of indexing the list by
        # page["page_number"], which assumed number == list position.
        page["embeddings"] = list(matrix)
    return pages_and_metadata


def get_data_embeddings(pages_and_metadata, device="cpu"):
    """Collect every page's sentence embeddings into a nested list.

    Args:
        pages_and_metadata: List of page records, each with an
            ``embeddings`` list.
        device: Unused; kept so existing callers keep working.

    Returns:
        A list with one inner list per page, in page order. ``torch.Tensor``
        embeddings are converted to plain Python lists; any other embedding
        object is passed through unchanged.
    """
    # Read from the record being iterated. Indexing the list with
    # page["page_number"] raised IndexError, or read the wrong page, whenever
    # page numbers did not equal list positions.
    return [
        [
            embedding.tolist() if isinstance(embedding, torch.Tensor) else embedding
            for embedding in page["embeddings"]
        ]
        for page in pages_and_metadata
    ]

def convert_embeddings_to_same_dimensions(pages_and_metadata_embeddings, device="cpu"):
    """Zero-pad or truncate every embedding to one common length.

    The target length is that of the first embedding found while scanning
    the pages in order. Previously the code read ``[0][0]`` directly, which
    raised ``IndexError`` whenever the first page had no sentences.

    Args:
        pages_and_metadata_embeddings: Nested list, pages -> embeddings.
        device: Unused; kept so existing callers keep working.

    Returns:
        A new nested list of numpy arrays with equal length. The input is
        returned unchanged when it contains no embeddings at all.
    """
    embedding_dim = next(
        (len(chunk) for page in pages_and_metadata_embeddings for chunk in page),
        None,
    )
    if embedding_dim is None:
        return pages_and_metadata_embeddings
    return [
        [
            # Pad short vectors with trailing zeros; the slice trims long ones.
            np.pad(chunk, (0, max(0, embedding_dim - len(chunk))), mode='constant')[:embedding_dim]
            for chunk in page
        ]
        for page in pages_and_metadata_embeddings
    ]

def flatten(pages_and_metadata_embeddings, pages_and_metadata):
    """Flatten per-page embeddings and per-page sentences into two flat lists.

    Returns ``(flat_embeddings, flat_data)``. Both are built by walking the
    pages in order, so the two lists stay index-aligned as long as each page
    holds one embedding per sentence.
    """
    flat_embeddings = []
    for page_chunks in pages_and_metadata_embeddings:
        flat_embeddings.extend(page_chunks)

    flat_data = []
    for record in pages_and_metadata:
        flat_data.extend(record["sentences"])

    return flat_embeddings, flat_data

def save_embeddings(flat_embeddings, name="SaveFile/embeddings.csv"):
    """Write the flat embedding list to ``name`` as CSV, one embedding per row.

    The destination directory is created when missing; previously a fresh
    checkout without a ``SaveFile`` folder made ``to_csv`` fail.

    Args:
        flat_embeddings: Sequence of equal-length numeric vectors.
        name: Destination CSV path.
    """
    import os

    directory = os.path.dirname(name)
    if directory:
        os.makedirs(directory, exist_ok=True)
    df = pd.DataFrame(flat_embeddings)
    df.to_csv(name, index=False)

def save_data(flat_data, name="SaveFile/data.csv"):
    """Write the flat sentence list to ``name`` as a one-column CSV.

    The column is left unnamed, so pandas writes the header ``0``; this is
    the column ``load_data`` reads back. The destination directory is created
    when missing; previously a missing ``SaveFile`` folder made ``to_csv``
    fail.

    Args:
        flat_data: Sequence of sentence strings.
        name: Destination CSV path.
    """
    import os

    directory = os.path.dirname(name)
    if directory:
        os.makedirs(directory, exist_ok=True)
    df = pd.DataFrame(flat_data)
    df.to_csv(name, index=False)

def load_embeddings(name="SaveFile/embeddings.csv", device="cpu"):
    """Read the embeddings CSV and return it as a ``float32`` tensor on ``device``.

    Each CSV row becomes one row of the returned 2-D tensor.
    """
    frame = pd.read_csv(name)
    values = np.asarray(frame.to_numpy(), dtype=np.float32)
    tensor = torch.tensor(values, dtype=torch.float32)
    return tensor.to(device)

def load_data(name="SaveFile/data.csv", device="cpu"):
    """Read the sentences CSV and return its ``"0"`` column as a Python list.

    ``device`` is accepted but not used.
    """
    frame = pd.read_csv(name)
    sentence_column = frame["0"]
    return sentence_column.tolist()

Overwriting script/embeddings.py


In [5]:
%%writefile script/similarity.py

from sentence_transformers import SentenceTransformer
from sentence_transformers import util
import torch

def get_similarity_score_by_query(query, embedding_model_name, pages_and_metadata_embeddings, device="cpu"):
    """Score ``query`` against every stored embedding with a dot product.

    A new ``SentenceTransformer`` is created on every call. The query is
    encoded to a tensor, moved to ``device``, and compared with
    ``util.dot_score``; the first row of that result is returned.
    """
    encoder = SentenceTransformer(embedding_model_name)
    query_vector = encoder.encode(query, convert_to_tensor=True)
    query_vector = query_vector.to(device)
    score_matrix = util.dot_score(query_vector, pages_and_metadata_embeddings)
    return score_matrix[0]

def get_top_k_scores(dot_scores, k=10):
    """Return the ``k`` highest scores and their indices, best first.

    ``torch.topk`` raises when ``k`` is larger than the number of scores, so
    ``k`` is capped at the number of entries along the last dimension. A
    small corpus then yields fewer results instead of crashing.

    Args:
        dot_scores: Tensor of similarity scores (1-D in this pipeline).
        k: Maximum number of results wanted.

    Returns:
        ``(top_scores, top_indices)`` as returned by ``torch.topk``.
    """
    k = min(k, dot_scores.size(-1))
    top_scores, top_indices = torch.topk(dot_scores, k=k)
    return top_scores, top_indices

def get_top_k_content(top_indices, flat_data):
    """Look up the sentence for each index in ``top_indices``.

    Every element of ``top_indices`` must support ``.item()`` (for example,
    the elements of a 1-D tensor). Results keep the order of the indices.
    """
    return [flat_data[index.item()] for index in top_indices]

Overwriting script/similarity.py


In [6]:
%%writefile script/augmentation.py

import torch
from transformers import AutoModelForCausalLM
from transformers import AutoTokenizer
import re

def load_llm_model(llm_model_name, device="cpu"):
    """Load a causal language model in float16 and move it to ``device``.

    The model is created with ``AutoModelForCausalLM.from_pretrained`` using
    ``torch_dtype=torch.float16`` and ``low_cpu_mem_usage=False``.
    """
    # NOTE(review): float16 is requested even when device is "cpu"; confirm
    # that half-precision generation on CPU is intended and supported.
    load_options = {
        "pretrained_model_name_or_path": llm_model_name,
        "torch_dtype": torch.float16,
        "low_cpu_mem_usage": False,
    }
    llm = AutoModelForCausalLM.from_pretrained(**load_options)
    return llm.to(device)

def prompt_augmentation(llm_model_name, context, query):
    """Build the chat-formatted prompt that asks ``query`` given ``context``.

    Args:
        llm_model_name: Name passed to ``AutoTokenizer.from_pretrained``; the
            tokenizer's chat template wraps the prompt.
        context: Iterable of context strings. They are joined with
            ``"\\n -"``, so every item after the first starts on a new line
            prefixed with `` -``.
        query: The user's question.

    Returns:
        The result of ``apply_chat_template`` for a single user message,
        with ``tokenize=False`` and ``add_generation_prompt=True``.
    """
    tokenizer = AutoTokenizer.from_pretrained(llm_model_name)
    context = "\n -".join(context)
    # The f-string already interpolates context and query. The old extra
    # .format() call re-parsed the finished text, so any "{" or "}" inside
    # the retrieved sentences or the query raised an error or was rewritten.
    base_prompt = f'''Based on the following context items, please answer the query
    Context Items:
    {context}
    Query:
    {query}
    Answer:'''
    dialogue_template = [{
        "role": "user",
        "content": base_prompt,
    }]
    prompt = tokenizer.apply_chat_template(conversation=dialogue_template,
                                           tokenize=False,
                                           add_generation_prompt=True)
    return prompt

def get_answer(llm_model, prompt, llm_model_name, device="cpu", temperature=0.2, max_new_tokens=512):
    """Generate a completion for ``prompt`` and return the decoded text.

    A tokenizer is loaded for ``llm_model_name`` on every call. The prompt
    is tokenized to PyTorch tensors and moved to ``device``, then the model
    samples (``do_sample=True``) with the given ``temperature`` and
    ``max_new_tokens``. Only the first returned sequence is decoded.
    """
    tokenizer = AutoTokenizer.from_pretrained(llm_model_name)
    encoded_prompt = tokenizer(prompt, return_tensors="pt").to(device)
    generated = llm_model.generate(
        **encoded_prompt,
        temperature=temperature,
        do_sample=True,
        max_new_tokens=max_new_tokens,
    )
    first_sequence = generated[0]
    return tokenizer.decode(first_sequence)

def clean_answer(output_text):
    """Extract and tidy the model's answer from the decoded generation.

    Everything up to and including the first ``Answer:`` marker is dropped.
    Then ``**`` emphasis markers, the ``<start_of_turn>model`` header and any
    remaining ``<...>`` tokens are removed.

    When the marker is absent the whole text is cleaned. Previously ``find``
    returned -1 and the slice silently discarded the first six characters.

    Returns:
        The cleaned answer prefixed with ``"The cleaned answer is: "``.
    """
    marker = "Answer:"
    idx = output_text.find(marker)
    # Skip exactly the marker's length rather than a hard-coded offset.
    answer = output_text[idx + len(marker):] if idx != -1 else output_text
    answer = answer.replace("**", "")
    answer = answer.replace("<start_of_turn>model", "")
    answer = re.sub("<.*?>", "", answer)
    return (f"The cleaned answer is: {answer}")

Overwriting script/augmentation.py
