In [None]:
# import os
# import requests

# pdf_path = ""

# if not os.path.exists(pdf_path):
#   print("File doesn't exist, downloading...")

#   url = ""

#   filename = pdf_path

#   response = requests.get(url)

#   if response.status_code == 200:
#       with open(filename, "wb") as file:
#           file.write(response.content)
#       print(f"The file has been downloaded and saved as {filename}")
#   else:
#       print(f"Failed to download the file. Status code: {response.status_code}")
# else:
#   print(f"File {pdf_path} exists.")
import fitz

def text_formatter(text: str) -> str:
    """Flatten raw page text into a single trimmed line.

    Every newline and every occurrence of the literal three-character
    sequence "/xo" is replaced by one space, then leading and trailing
    whitespace is stripped.
    """
    # NOTE(review): "/xo" is matched literally; it may have been intended as
    # an escape sequence (e.g. a non-breaking space) - confirm with the author.
    flattened = text.replace("\n", " ")
    flattened = flattened.replace("/xo", " ")
    return flattened.strip()

def open_and_read_pdf(pdf_path: str) -> list[dict]:
    """Extract the text of every page of a PDF together with simple size statistics.

    Args:
        pdf_path: Path of the PDF file to open with PyMuPDF (``fitz``).

    Returns:
        One dict per page, in document order, with the keys ``page_number``
        (0-based index from ``enumerate``), ``page_char_count``,
        ``page_word_count`` (number of pieces after splitting on single
        spaces), ``page_token_count`` (character count / 4, a rough estimate)
        and ``text`` (the page text after ``text_formatter``).
    """
    pages_and_texts = []
    # Open the document as a context manager so it is closed again even when
    # text extraction fails part-way through; previously it was never closed.
    with fitz.open(pdf_path) as doc:
        for page_number, page in enumerate(doc):
            text = text_formatter(page.get_text())
            pages_and_texts.append({"page_number": page_number,
                                    "page_char_count": len(text),
                                    "page_word_count": len(text.split(" ")),
                                    "page_token_count": len(text) / 4,
                                    "text": text})
    return pages_and_texts
# Location of the book to index, relative to the notebook's working directory.
pdf_path = "books/Atomic-Habits.pdf"

# Per-page text records; the later cells extend these dicts in place.
pages_and_texts = open_and_read_pdf(pdf_path)

In [None]:
from spacy.lang.en import English

# Blank English pipeline with only the rule-based sentence splitter added.
# It has its own name because `nlp` is the name through which `ask` calls the
# question-answering pipeline; binding the spaCy object to `nlp` as well meant
# that re-running this cell later silently broke `ask`.
sentence_splitter = English()
sentence_splitter.add_pipe("sentencizer")

for itm in pages_and_texts:
    # Keep the sentences as plain strings rather than spaCy Span objects.
    itm["sentence"] = [str(s) for s in sentence_splitter(itm["text"]).sents]
    itm["sent_count"] = len(itm["sentence"])

In [None]:
import pandas as pd
# Tabulate the per-page records and show summary statistics of the numeric columns.
df = pd.DataFrame(data=pages_and_texts)
df.describe()

In [None]:
# Number of sentences grouped into one chunk.
num_sen_sz = 10


def split_lst(in_lst: list[str], slc_sz: int = num_sen_sz) -> list[list[str]]:
    """Split a list into consecutive slices of at most ``slc_sz`` items.

    Args:
        in_lst: Items to split; the order is preserved.
        slc_sz: Maximum length of each slice. Defaults to the value
            ``num_sen_sz`` had when this function was defined.

    Returns:
        The slices in order; only the last one may be shorter than
        ``slc_sz``. An empty input gives an empty list.

    Raises:
        ValueError: If ``slc_sz`` is smaller than 1. A negative size used to
            yield an empty result, silently discarding every item.
    """
    if slc_sz < 1:
        raise ValueError(f"slc_sz must be a positive integer, got {slc_sz}")
    return [in_lst[start:start + slc_sz] for start in range(0, len(in_lst), slc_sz)]

# Group each page's sentences into chunks and record how many chunks resulted.
for page in pages_and_texts:
    chunks = split_lst(page["sentence"])
    page["sent_chunk"] = chunks
    page["num_chunk"] = len(chunks)

In [None]:
import re
# One record per sentence chunk, flattened across all pages.
pg_chunk = []
for itm in pages_and_texts:
    for sent_ch in itm["sent_chunk"]:
        # The sentences were stored as str(span), which does not keep the
        # whitespace that followed each sentence. Joining with "" therefore
        # glued neighbours together ("Why?Because"); join with a space instead.
        join_chnk = " ".join(sent_ch)
        # Collapse every run of spaces; a single replace("  ", " ") pass left
        # longer runs only partially reduced.
        join_chnk = re.sub(r" {2,}", " ", join_chnk).strip()
        # Re-insert the space after a full stop that is glued to a capital letter.
        join_chnk = re.sub(r'\.([A-Z])', r'. \1', join_chnk)

        pg_chunk.append({
            "page_num": itm["page_number"],
            "sentence_chunk": join_chnk,
            "chunk_char_count": len(join_chnk),
            "chunk_word_count": len(join_chnk.split(" ")),
            # Rough estimate: about four characters per token.
            "chunk_token_count": len(join_chnk) / 4,
        })
len(pg_chunk)

In [None]:
# Tabulate the chunk records and show summary statistics of the numeric columns.
df = pd.DataFrame(data=pg_chunk)
df.describe()

In [None]:
# Chunks whose estimated token count is not above this threshold are dropped.
min_token = 20

keep_mask = df["chunk_token_count"] > min_token
pg_chunk = df.loc[keep_mask].to_dict(orient="records")

# Preview the first two surviving chunks.
pg_chunk[:2]

In [None]:
# Rebuild the table from the filtered chunk records and summarise it again.
df = pd.DataFrame(data=pg_chunk)
df.describe()

In [None]:
from sentence_transformers import SentenceTransformer
# Sentence-embedding model, kept on the CPU.
embed_model = SentenceTransformer(
    'sentence-transformers/all-MiniLM-L6-v2',
    device="cpu",
)
embed_model.to("cpu")

# Embed the text of every remaining chunk, 32 chunks per batch.
txt_chnks = [chunk["sentence_chunk"] for chunk in pg_chunk]
txt_chnks_embed = embed_model.encode(txt_chnks, batch_size=32)

In [None]:
import faiss

# Width of one embedding vector (second axis of the embedding matrix).
emb_dim = txt_chnks_embed.shape[1]

# Flat L2 index over all chunk embeddings, also persisted to disk.
index = faiss.IndexFlatL2(emb_dim)
index.add(txt_chnks_embed)
faiss.write_index(index, "sentence_emb.index")

In [None]:
import textwrap
import numpy as np
def pr_wr(text, wr=80):
    """Print ``text`` re-flowed by ``textwrap.fill`` to a width of ``wr`` columns."""
    print(textwrap.fill(text, width=wr))
    
def retrieve(query:str,model:SentenceTransformer=embed_model,n:int=4):
    """Look up the ``n`` nearest stored vectors for ``query`` in the global ``index``.

    Args:
        query: Text to embed and search for.
        model: Embedding model used to encode ``query``.
        n: Number of neighbours requested from the index.

    Returns:
        The ``(scores, indices)`` pair exactly as returned by ``index.search``.
    """
    query_vec = model.encode(query)
    # The search is given a 2-D array, so add a leading axis of length 1.
    query_batch = query_vec[np.newaxis, :]
    scores, neighbours = index.search(query_batch, n)
    return scores, neighbours

In [None]:
from transformers import AutoModelForQuestionAnswering, AutoTokenizer, pipeline

# Extractive question-answering checkpoint.
model_name = "deepset/tinyroberta-squad2"
generator_model = AutoModelForQuestionAnswering.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Hand the already-loaded objects to the pipeline. Passing `model_name` again
# made the pipeline load a second copy and left the two objects above unused.
# `ask` calls this pipeline through the name `nlp`.
nlp = pipeline('question-answering', model=generator_model, tokenizer=tokenizer)

In [None]:
def prompt_gen(query: str, cntxt_itm: list[dict]) -> dict[str, str]:
    """Build the input mapping for the question-answering pipeline.

    Args:
        query: The question to answer.
        cntxt_itm: Chunk records; each must provide a ``"sentence_chunk"`` string.

    Returns:
        A dict with the keys ``"question"`` (the query unchanged) and
        ``"context"`` (the chunk texts joined by newlines, in the given
        order). The previous ``-> str`` annotation did not match this.
    """
    context = "\n".join(itm["sentence_chunk"] for itm in cntxt_itm)
    return {
        'question': query,
        'context': context,
    }

In [None]:
def ask(query):
    """Answer ``query`` from the indexed chunks and print the answer.

    Retrieves up to four chunks via ``retrieve``, builds the pipeline input
    with ``prompt_gen``, runs the question-answering pipeline bound to ``nlp``
    and prints the ``'answer'`` field of its result.

    Args:
        query: The question to answer.

    Returns:
        None. The answer (or a notice that nothing was retrieved) is printed.
    """
    _, indice = retrieve(query, embed_model, 4)
    # FAISS pads the result with -1 when the index holds fewer vectors than
    # requested; used as a list index, -1 would silently pick the last chunk.
    cntxt_it = [pg_chunk[i] for i in indice.flatten() if i >= 0]
    if not cntxt_it:
        print("No context could be retrieved for this query.")
        return

    prompt = prompt_gen(query, cntxt_it)
    res = nlp(prompt)
    print(res['answer'])