In [None]:
%pip install -U langchain faiss-cpu openai langchain-community langchain-openai pypdf pydantic

In [2]:
#from langchain.document_loaders import PyPDFLoader
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
#from langchain.vectorstores import FAISS
from langchain_community.vectorstores import FAISS
from pydantic import BaseModel, Field, ValidationError
from langchain.prompts import PromptTemplate

import re
from tkinter import Tk
from tkinter.filedialog import askopenfilename
import warnings
warnings.filterwarnings("ignore", category=UserWarning, message=".*NSOpenPanel.*")

#### Functions for file selecting, loading and preprocessing

In [2]:
def safe_float_input(prompt, default):
    """
    Asks the user for a floating point number and keeps asking until the input is usable.
    Args:
        prompt: text shown to the user that explains what to enter,
        default: the value returned when the user submits a blank line
    Returns:
        the entered text converted to float, or `default` for blank input
    """
    parsed = None
    while parsed is None:
        raw = input(prompt).strip()
        # A blank answer means "keep the default"
        if not raw:
            return default
        try:
            parsed = float(raw)
        except ValueError:
            # Not a number: report it and ask again
            print(f"Invalid input. Please enter a number (integer or decimal) or leave blank for default ({default}).")
    return parsed

In [3]:
def safe_int_input(prompt, default):
    """
    Prompts the user to enter an integer value and returns it as the result.
    The prompt is repeated until the input is a valid integer or a blank line.
    Args:
        prompt: a hint for the user with a brief explanation what to enter,
        default: the value that is returned if the user leaves the input blank
    Returns:
        an integer value entered by the user or the default value
    """
    while True:
        try:
            value = input(prompt).strip()
            return int(value) if value else default
        except ValueError:
            # int() rejects decimals such as "2.5", so the hint must ask for a whole number only
            print(f"Invalid input. Please enter a whole number (integer) or leave blank for default ({default}).")

In [4]:
def select_file():
    """
    Allows the user to select a PDF file using the tkinter library's UI and returns the path to the file.
    Returns:
        file_path: the selected path, or None when the dialog is closed without a selection.
    """

    # Keep a reference to the hidden root window so it can be destroyed afterwards;
    # otherwise every call would leave another Tk instance alive.
    root = Tk()
    try:
        # Hide the main Tkinter window
        root.withdraw()
        # Open file selection dialog, attached to the hidden root
        file_path = askopenfilename(
            parent=root,
            title="Select a PDF file",
            filetypes=[("PDF files", "*.pdf")],
        )
    finally:
        root.destroy()

    if not file_path:
        print("No file selected")
        return None

    print(f"Selected file: {file_path}")
    return file_path

In [5]:
def read_pdf(path):
    """
    Loads the PDF file found at `path` with langchain's PyPDFLoader.
    Args:
        path: location of the PDF file to read.
    Returns:
        whatever PyPDFLoader(path).load() produces - a list of Document objects
        (each carrying metadata such as source/page and the page text in page_content).
    """
    # Build the loader and read the file in a single step
    return PyPDFLoader(path).load()

In [6]:
def clean_text(texts):
    """
    Cleans the text of every chunk and drops chunks that end up empty.
    The page_content attribute of each passed object is updated in place.
    Args:
        texts: chunks obtained after splitting with langchain RecursiveCharacterTextSplitter
               (objects exposing a page_content string).
    Returns:
        list of the chunks whose cleaned page_content still contains non-whitespace text
    """
    cleaned = []
    for text in texts:
        content = text.page_content
        # Blank a chunk that consists only of a page number (max 4 digits)
        # with optional spaces, hyphens or # around it
        content = re.sub(r'^[\s#-]*\d{1,4}[\s#-]*$', '', content)
        # Remove ordinal suffixes like 'st', 'nd', 'rd', 'th', 'т', 'й' (up to two letters after the number).
        # The trailing \b limits the match to complete suffixes, so ordinary words that follow
        # a number (e.g. "4-door") are left intact instead of losing their first letters.
        content = re.sub(r'(\d+)-?([a-zA-Zа-яА-Я]{1,2})\b', r'\1', content)
        # Remove # and № in front of numbers
        content = re.sub(r'[#№](\d+)', r'\1', content)
        # Remove newline characters
        content = content.replace('\n', ' ')
        text.page_content = content

        # Keep only chunks with real content; whitespace-only chunks carry nothing to embed
        if content.strip():
            cleaned.append(text)

    return cleaned

#### Functions for configuring and creating a FAISS vector store

In [8]:
def FAISS_config(query_to_tune):
    """
    Returns the RAG/FAISS parameters, optionally asking the user to tune them.
    Entered values are validated; when they are inconsistent the user is asked again,
    because e.g. an overlap larger than the chunk size is rejected later by the text splitter.
    Args:
        query_to_tune: a passed request from the user to change or use default parameters ('tune' / any other input)
    Function called inside:
        safe_int_input("Hint", value_by_default)
    Returns:
       config: dictionary with keys "chunk_size", "chunk_overlap" and "n_chunks_in_context".
    """

    # Any input other than 'tune' keeps the defaults
    if query_to_tune != 'tune':
        return {
            "chunk_size": 2300,
            "chunk_overlap": 400,
            "n_chunks_in_context": 2
        }

    while True:
        config = {
            "chunk_size": safe_int_input("Input chunk size to split text for RAG (by default 2300): ", 2300),
            "chunk_overlap": safe_int_input("Input chunk overlap (by default 400): ", 400),
            "n_chunks_in_context": safe_int_input("Input number of top chunks retrieved from RAG vector store (by default 2): ", 2)
        }

        # Collect every problem so the user sees all of them at once
        problems = []
        if config["chunk_size"] < 1:
            problems.append("chunk size must be at least 1")
        if not 0 <= config["chunk_overlap"] <= config["chunk_size"]:
            problems.append("chunk overlap must be between 0 and the chunk size")
        if config["n_chunks_in_context"] < 1:
            problems.append("number of retrieved chunks must be at least 1")

        if not problems:
            return config

        print("Invalid RAG settings: " + "; ".join(problems) + ". Please enter the values again.")

In [9]:
def splite_encode_document(document, chunk_size, chunk_overlap):
    """
    Splits and encodes a document into a vector store using OpenAI embeddings and FAISS.
    Args:
        document: a pdf file read using langchain PyPDFLoader.
        chunk_size: text chunk size.
        chunk_overlap: overlap between chunks.
    Function called inside:
        clean_text(texts)
    Returns:
        vectorstore: a FAISS vector store containing the encoded texts.
    Raises:
        ValueError: if no text chunks remain after splitting and cleaning.
    """
    # Split document into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len
    )
    texts = text_splitter.split_documents(document)
    texts = clean_text(texts)

    # Fail early with a clear message instead of letting the vector store
    # break on an empty list of chunks (e.g. a PDF without extractable text)
    if not texts:
        raise ValueError(
            "No text could be extracted from the document, so the vector store cannot be built."
        )

    # Create embeddings and vector store
    embeddings = OpenAIEmbeddings()
    vectorstore = FAISS.from_documents(texts, embeddings)

    return vectorstore

#### Functions for retrieving context and answering a question

In [10]:
def retrieve_context(question, query_retriever):
    """
    Looks up the chunks relevant to a question and returns their texts.
    Args:
        question: str with user question.
        query_retriever: retriever object; its invoke(question) result is iterated.
    Returns:
        context: list with the page_content of every retrieved chunk, in retrieval order.
    """
    context = []
    # Ask the retriever and keep only the text of each returned chunk
    for chunk in query_retriever.invoke(question):
        context.append(chunk.page_content)
    return context

In [11]:
# Pydantic model that bundles one question/answer exchange: the model's answer text,
# the context chunks it was given, and the original question.
class QuestionAnswerFromContext(BaseModel):
    """A class for formatting the output data of a model according to a given structure."""
    # Text of the answer produced by the LLM
    answer: str = Field(description="Generates an answer to a query based on a given context.")
    # Context passages (one string per retrieved chunk) supplied with the question
    context: list[str] = Field(description="The context used to generate the answer.")
    # The user's question exactly as it was asked
    question: str = Field(description="The question that was answered.")


def create_question_answer_from_context_chain(llm):
    """
    Builds the question-answering chain: a prompt template piped into the given model.
    Args:
        llm: LLM model used for getting answers.
    Returns:
        the result of `prompt | llm`, where the prompt expects the
        "context" and "question" input variables.
    """

    # Prompt text, written line by line so that its exact whitespace is visible
    template_text = (
        " \n"
        "    For the question below, provide a concise but suffice answer based ONLY on the provided context:\n"
        "    {context}\n"
        "    Question:\n"
        "    {question}\n"
        "    "
    )

    prompt = PromptTemplate(
        template=template_text,
        input_variables=["context", "question"],
    )

    # Piping the prompt into the llm yields the runnable chain
    return prompt | llm


def answer_question_from_context(question, context, question_answer_chain):
    """
    Runs the chain for one question and packages the reply.
    Args:
        question: user's question
        context: retrieved context for user's question
        question_answer_chain: chain obtained from create_question_answer_from_context_chain.
    Returns:
        structured_result: a QuestionAnswerFromContext built from the reply,
                           or None when validation of that object fails
        output: the raw object returned by the chain
    """
    print("Preparing results ...")

    output = question_answer_chain.invoke({"question": question, "context": context})

    # Wrap the reply together with the inputs that produced it
    try:
        structured_result = QuestionAnswerFromContext(
            answer=output.content,
            context=context,
            question=question,
        )
    except ValidationError as e:
        print("Validation Error:", e)
        structured_result = None

    print("Done!")

    return structured_result, output


#### Main script to run the AI assistant

In [None]:
def main():
    """
    Runs the interactive AI assistant: select a PDF, build the retriever, answer questions.

    Workflow:
        1. The user picks a PDF file (after any failure they may retry or type 'exit').
        2. The user optionally tunes the RAG parameters; the document is split and indexed.
        3. Questions are answered in a loop until the user types 'exit'.
           Typing 'tune llm' changes the LLM parameters; the new settings stay in effect
           for all following questions until they are tuned again.
    Returns:
        a farewell message (str) once the user exits.
    """
    print("\n=== AI Assistant for Answering Questions About Texts ===\n=== OpenAI with Retrieval-Augmented Generation (RAG) ===\n\n")

    # Request path to PDF file with text, load and read it
    while True:
        print("Please select a pdf file in the window that appears")
        file_path = select_file()
        if file_path:
            print("Selected PDF file loading and reading...")
            try:
                document = read_pdf(file_path)
            except Exception as e:
                print(f"Error loading file: {e}")
            else:
                print("Document uploaded successfully!")
                break
        else:
            print("File not selected or not found. Please, make again a choice in the window that appears.")

        # Reached after a cancelled dialog AND after a failed load, so the user can always leave
        user_query1 = input("To try again type any symbol, to exit type exit: ").strip().lower()
        if user_query1 == "exit":
            return "The program has been successfully completed. Bye!"

    # Request for RAG tuning parameters
    user_query2 = input("To tune RAG type 'tune'. To use tuning by defolt type any symbol: ").strip().lower()
    RAG_config = FAISS_config(user_query2)
    chunk_size = RAG_config['chunk_size']
    chunk_overlap = RAG_config['chunk_overlap']
    n_chunks_in_context = RAG_config['n_chunks_in_context']

    print("Preparing a retriever with your text...")
    # split document and create FAISS vectorestore
    vectorestore = splite_encode_document(document, chunk_size, chunk_overlap)

    # Create retriever
    query_retriever = vectorestore.as_retriever(search_kwargs={"k": n_chunks_in_context})
    print("The retriever is ready to work")

    # LLM object with parameters by default; created once so that a later
    # 'tune llm' replacement is not overwritten on the next question
    llm = ChatOpenAI(temperature=0.7, model_name="gpt-4o", max_tokens=2000)

    # Basic workflow: user request – AI response
    while True:
        print("\nYou can ask the AI assistant in the window that appears. To exit the program, type 'exit'.\n")
        user_query_toAI = input("Your question: ").strip()
        command = user_query_toAI.lower()

        if command == "exit":
            return "Thank you for using the AI assistant. Bye!"

        # Nothing to answer: ask again instead of sending an empty prompt to the model
        if not user_query_toAI:
            print("The question is empty. Please type a question or 'exit'.")
            continue

        # Code for the hidden ability to change LLM parameters.
        if command == "tune llm":
            print("You just requested 'tune llm' for llm tuning\n")
            temperature = safe_float_input("temperature (0.7 by default): ", 0.7)
            model_name = input("model name (press enter for gpt-4o by default): ").strip() or "gpt-4o"
            max_tokens = safe_int_input("max tokens (2000 by default): ", 2000)
            llm = ChatOpenAI(temperature=temperature, model_name=model_name, max_tokens=max_tokens)
            print("LLM tuning done")
            # Back to the regular prompt, so 'exit' and empty input are handled as usual
            continue

        try:
            question_answer_chain = create_question_answer_from_context_chain(llm)
            context = retrieve_context(user_query_toAI, query_retriever)

            structured_result, raw_result = answer_question_from_context(user_query_toAI, context, question_answer_chain)
            # answer_question_from_context returns None when the reply failed validation
            if structured_result is None:
                print("The AI response could not be validated. Please try asking again.")
                continue
            print(f"{'Your question:':<15}{user_query_toAI}")
            print(f"{'AI answer:':<15}{structured_result.answer}")

        except Exception as e:
            print(f"Error processing request: {e}")
        

In [None]:
# Start the interactive session; main() returns a farewell string once the user exits.
main()