In [2]:
# Helper function for printing docs
def pretty_print_docs(docs):
    print(
        f"\n{'-' * 100}\n".join(
            [f"Document {i+1}:\n\n" + d.page_content for i, d in enumerate(docs)]
        )
    )

In [None]:
recommender = None


In [None]:
from urllib import urllib
def download_pdf(url, output_path):
    urllib.request.urlretrieve(url, output_path)

In [None]:
import re
def preprocess(text):
    text = text.replace('\n', ' ')
    text = re.sub('\s+', ' ', text)
    return text

In [None]:
import fitz
def pdf_to_text(path, start_page=1, end_page=None):
    doc = fitz.open(path)
    total_pages = doc.page_count

    if end_page is None:
        end_page = total_pages

    text_list = []

    for i in range(start_page - 1, end_page):
        text = doc.load_page(i).get_text("text")
        text = preprocess(text)
        text_list.append(text)

    doc.close()
    return text_list

In [None]:
def text_to_chunks(texts, word_length=150, start_page=1):
    text_toks = [t.split(' ') for t in texts]
    page_nums = []
    chunks = []

    for idx, words in enumerate(text_toks):
        for i in range(0, len(words), word_length):
            chunk = words[i : i + word_length]
            if (
                (i + word_length) > len(words)
                and (len(chunk) < word_length)
                and (len(text_toks) != (idx + 1))
            ):
                text_toks[idx + 1] = chunk + text_toks[idx + 1]
                continue
            chunk = ' '.join(chunk).strip()
            chunk = f'[Page no. {idx+start_page}]' + ' ' + '"' + chunk + '"'
            chunks.append(chunk)
    return chunks

In [None]:
import tensorflow_hub as hub
from sklearn.neighbors import NearestNeighbors
class SemanticSearch:
    def __init__(self):
        self.use = hub.load('https://tfhub.dev/google/universal-sentence-encoder/4')
        self.fitted = False

    def fit(self, data, batch=1000, n_neighbors=5):
        self.data = data
        self.embeddings = self.get_text_embedding(data, batch=batch)
        n_neighbors = min(n_neighbors, len(self.embeddings))
        self.nn = NearestNeighbors(n_neighbors=n_neighbors)
        self.nn.fit(self.embeddings)
        self.fitted = True

    def __call__(self, text, return_data=True):
        inp_emb = self.use([text])
        neighbors = self.nn.kneighbors(inp_emb, return_distance=False)[0]

        if return_data:
            return [self.data[i] for i in neighbors]
        else:
            return neighbors

    def get_text_embedding(self, texts, batch=1000):
        embeddings = []
        for i in range(0, len(texts), batch):
            text_batch = texts[i : (i + batch)]
            emb_batch = self.use(text_batch)
            embeddings.append(emb_batch)
        embeddings = np.vstack(embeddings)
        return embeddings

In [None]:
def load_recommender(path, start_page=1):
    global recommender
    if recommender is None:
        recommender = SemanticSearch()

    texts = pdf_to_text(path, start_page=start_page)
    chunks = text_to_chunks(texts, start_page=start_page)
    recommender.fit(chunks)
    return 'Corpus Loaded.'


In [None]:
import openai
def generate_text(openAI_key, prompt, engine="text-davinci-003"):
    openai.api_key = openAI_key
    completions = openai.Completion.create(
        engine=engine,
        prompt=prompt,
        max_tokens=512,
        n=1,
        stop=None,
        temperature=0.7,
    )
    message = completions.choices[0].text
    return message

In [None]:
def generate_answer(question, openAI_key):
    topn_chunks = recommender(question)
    prompt = ""
    prompt += 'search results:\n\n'
    for c in topn_chunks:
        prompt += c + '\n\n'

    prompt += (
        "Instructions: Compose a comprehensive reply to the query using the search results given. "
        "Cite each reference using [ Page Number] notation (every result has this number at the beginning). "
        "Citation should be done at the end of each sentence. If the search results mention multiple subjects "
        "with the same name, create separate answers for each. Only include information found in the results and "
        "don't add any additional information. Make sure the answer is correct and don't output false content. "
        "If the text does not relate to the query, simply state 'Text Not Found in PDF'. Ignore outlier "
        "search results which has nothing to do with the question. Only answer what is asked. The "
        "answer should be short and concise. Answer step-by-step. \n\nQuery: {question}\nAnswer: "
    )

    prompt += f"Query: {question}\nAnswer:"
    answer = generate_text(openAI_key, prompt, "text-davinci-003")
    return answer

In [None]:
def load_openai_key() -> str:
    key = os.environ.get("OPENAI_API_KEY")
    if key is None:
        raise ValueError(
            "[ERROR]: Please pass your OPENAI_API_KEY. Get your key here : https://platform.openai.com/account/api-keys"
        )
    return key


In [None]:
@serving
def ask_url(url: str, question: str):
    download_pdf(url, 'corpus.pdf')
    load_recommender('corpus.pdf')
    openAI_key = load_openai_key()
    return generate_answer(question, openAI_key)

In [None]:
@serving
async def ask_file(file: UploadFile, question: str) -> str:
    suffix = Path(file.filename).suffix
    with NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        shutil.copyfileobj(file.file, tmp)
        tmp_path = Path(tmp.name)

    load_recommender(str(tmp_path))
    openAI_key = load_openai_key()
    return generate_answer(question, openAI_key)

# Mine


In [None]:
def download_pdf(url: str, output_path: str) -> None:
    urllib.request.urlretrieve(url, output_path)


def preprocess(data: List[Document] | str) -> List[Document] | str:
    if not data:
        return data
    if isinstance(data, str):
        data = re.sub("\s+", " ", data)
        data = data.replace("\n", " ")
        return data
    if isinstance(data[0], str):
        data = re.sub("\s+", " ", data)
        data = data.replace("\n", " ")
        return data
    else:
        for d in data:
            d.page_content = re.sub("\s+", " ", d.page_content)
            d.page_content = d.page_content.replace("\n", " ")
        return data


def pdf_to_text(path: str, start_page: int = 1, end_page: int = 0) -> List[str]:
    try:
        doc = fitz.open(path)
    except OSError:
        print(f"Error: could not open file {path}")
        return []
    total_pages: int = doc.page_count

    if end_page == 0:
        end_page = total_pages

    text_list = []

    for i in range(start_page - 1, end_page):
        try:
            text = doc.load_page(i).get_text("text")
            text = preprocess(text)
            text_list.append(text)
        except Exception:
            print(f"Error: could not extract text from page {i+1} in file {path}")

    doc.close()
    return text_list


def text_to_chunks(
    texts: List[str], path: str, word_length: int = 300, start_page: int = 1
) -> List[str]:
    text_toks = [t.split(" ") for t in texts]
    page_nums = []
    chunks = []

    for idx, words in enumerate(text_toks):
        for i in range(0, len(words), word_length):
            chunk = words[i : i + word_length]
            if (
                (i + word_length) > len(words)
                and (len(chunk) < word_length)
                and (len(text_toks) != (idx + 1))
            ):
                text_toks[idx + 1] = chunk + text_toks[idx + 1]
                continue
            chunk = " ".join(chunk).strip()
            chunk = f"[Page: {idx+start_page} from {path}]" + " " + '"' + chunk + '"'
            chunks.append(chunk)
    return chunks


def normal_process(path: str) -> List[Document]:
    documents: List[Document] = []
    texts = pdf_to_text(path)
    data = text_to_chunks(texts, path)
    for text in data:
        documents.append(Document(page_content=text))
    print(f"Processed {len(documents)} documents from {path}")
    return documents


def flatten_array(arr):
    result = []
    for i in arr:
        if isinstance(i, list):
            result.extend(flatten_array(i))
        else:
            result.append(i)
    return result


def embed_directory(path: str) -> List[Document]:
    documents: List[Document] = []
    pdf_files = glob.glob("pdfs/*.pdf")
    for pdf_file in pdf_files:
        docs.append(normal_process(pdf_file))

    docs = flatten_array([normal_process(pdf_file) for pdf_file in pdf_files])

    print(f"Processed {len(documents)} documents from {path}")
    return documents

In [None]:
docs = []
pdf_files = glob.glob("pdfs/*.pdf")
docs = [flatten_array(normal_process(pdf_file)) for pdf_file in pdf_files]

Processed 8 documents from pdfs/Joseph Sambrook, David W. Russel - Molecular Cloning_ A Laboratory Manual. Volume 1, 2, & 3 (in one file).pdf
Processed 1531 documents from pdfs/Diagnostic%20and%20statistical%20manual%20of%20mental%20disorders%20_%20DSM-5%20%28%20PDFDrive.com%20%29.pdf
Processed 4 documents from pdfs/Asthma-1.pdf
Processed 6 documents from pdfs/Neoplasia contd.pdf
Processed 2992 documents from pdfs/Robbins and Cotran Pathologic Basis of Disease, 10th Edition (VetBooks.ir).pdf
Processed 7 documents from pdfs/Nutritional Diseases.pdf
Processed 4 documents from pdfs/Kevin M. G. Taylor, Michael E. Aulton - Aulton's Pharmaceutics_ The Design and Manufacture of Medicines-Elsevier (2021).pdf
Processed 2 documents from pdfs/Micriobiology mod5.pdf
Processed 4 documents from pdfs/Systemic Effects of Inflammation & Tissue Repair.pdf
Processed 2222 documents from pdfs/Joanne Willey, Kathleen Sandman, Dorothy Wood - Prescott's Microbiology 11th Edition-McGraw-Hill Education (2019).p

In [None]:
d = []
for doc in docs:
    d.append([flatten_array(doc) for doc in docs])
d = flatten_array(d)

Document(page_content='[Page: 300 from pdfs/Joseph Sambrook, David W. Russel - Molecular Cloning_ A Laboratory Manual. Volume 1, 2, & 3 (in one file).pdf] ""', metadata={})

In [None]:
d[0]

In [None]:
db = FAISS.from_documents(d, embeddings)
db.save_local(folder_path="dbs", index_name="manual_db")

In [None]:
db = FAISS.load_local(folder_path="dbs", index_name="manual_db", embeddings=embeddings)
compressor = LLMChainExtractor.from_llm(llm=OpenAI(temperature=0, max_tokens=512))
retriever = db.as_retriever()
retriever = ContextualCompressionRetriever(
    base_retriever=retriever, base_compressor=compressor
)

In [None]:
answer_prompt = PromptTemplate(
    template="Context:\n{context}\n\n"
    "Instruction: Using the context above, answer the question below."
    "Do not leave any information out."
    "Do not add any information that is not in the context."
    "Answer step-by-step. \n\nQuery: {query}\nAnswer: ",
    input_variables=["query", "context"],
)

summary_prompt = PromptTemplate(
    template="Context:\n{context}\n\n"
    "Instruction: Using the context above, write a concise summary of the text as it relates to the query below."
    "Make sure to include the source of the information. The source is indicated by [Page: X from Y.pdf]."
    "Answer step-by-step.\nQuery:\n{query}\n\nSummary: ",
    input_variables=["context", "query"],
)

In [None]:
answer_chain = LLMChain(llm=OpenAI(temperature=0, max_tokens=512), prompt=answer_prompt)
summmary_chain = LLMChain(
    llm=OpenAI(temperature=0, max_tokens=1048), prompt=summary_prompt
)


def qa(query: str) -> str:
    context = retriever.get_relevant_documents(query=query)
    pretty_print_docs(context)
    summary = summmary_chain.run({"query": query, "context": context})
    return summary

In [None]:
qa("What is edema")

Document 1:

"Edema is easily recognized grossly; microscopically, it is appreciated as clearing and separation of the extracellular matrix (ECM) and subtle cell swelling. Edema is most commonly seen in subcutaneous tissues, the lungs, and the brain. Subcutaneous edema can be diffuse or more conspicuous in regions with high hydrostatic pressures. Its distribution is often influenced by gravity (e.g., it appears in the legs when standing and the sacrum when recumbent), a feature termed dependent edema. Finger pressure over markedly edematous subcutaneous tissue displaces the interstitial fluid and leaves a depression, a sign called pitting edema. Edema resulting from renal dysfunction often appears initially in parts of the body containing loose connective tissue, such as the eyelids; periorbital edema is thus a characteristic finding in severe renal disease. With pulmonary edema, the lungs are often two to three times their normal weight, and sectioning yields frothy, blood-tinged flui

"\nEdema is a condition characterized by swelling due to an accumulation of fluid in the body's tissues. It is most commonly seen in subcutaneous tissues, the lungs, and the brain. It is easily recognized by its gross appearance and microscopically, it is appreciated as clearing and separation of the extracellular matrix (ECM) and subtle cell swelling. Edema can be diffuse or more conspicuous in regions with high hydrostatic pressures and its distribution is often influenced by gravity. It can also be caused by renal dysfunction and appears initially in parts of the body containing loose connective tissue, such as the eyelids. With pulmonary edema, the lungs are often two to three times their normal weight and sectioning yields frothy, blood-tinged fluid. Brain edema can be localized or generalized depending on the nature and extent of the pathologic process or injury. [Source: Page 1 from Edema.pdf]."