In [1]:
# try:
#     import fitz
# except ImportError:
#     !pip install PyMuPDF
#     !pip install fitz

# try:
#     import spacy
# except ImportError:
#     !pip install spacy

# try:
#     import sentencepiece
# except ImportError:
#     !pip install sentencepiece

# try:
#     import transformers
# except:
#     !pip install transformers

#!pip install --upgrade numpy
!pip install --upgrade transformers

Defaulting to user installation because normal site-packages is not writeable


In [2]:
import pandas as pd
import fitz  # PyMuPDF for PDF processing
from tqdm.auto import tqdm  # For progress bars
import re  # For regex operations
import chromadb
from sentence_transformers import SentenceTransformer  # For embeddings
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
import logging
import spacy
from transformers import T5ForConditionalGeneration, T5Tokenizer


In [3]:
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def log_output(string):
    #logger.info(string)
    return 

# Step 1: Text Formatter

In [4]:
class TextFormatter(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        formatted_texts = []
        
        for pages_and_text in X:
            # Replace newlines with spaces and strip leading/trailing spaces
            formatted_text = pages_and_text['text'].replace("\n", " ").strip()
            formatted_page_text = {"page_number": pages_and_text['page_number'], "formatted_text": formatted_text}
            formatted_texts.append(formatted_page_text)

        log_output("Formatted texts successfully.")
        return formatted_texts

# Step 2: Open and Read PDF

In [5]:
class PDFReader(BaseEstimator, TransformerMixin):
    def __init__(self, pdf_path):
        self.pdf_path = pdf_path

    def fit(self, X, y=None):
        return self

    def transform(self, X=None):
        """
        Reads a PDF file and extracts text from each page.

        Returns:
            list: A list of dictionaries, each containing the page number and its text.
        """
        try:
            doc = fitz.open(self.pdf_path)
        except Exception as e:
            logger.error(f"Failed to open PDF file: {self.pdf_path}. Error: {e}")

            return []
        
        

        pages_and_texts = []
        for page_number in tqdm(range(len(doc)), desc="Reading PDF pages"):
            page = doc[page_number]
            text = page.get_text()
            pages_and_texts.append({"page_number": page_number, "text": text})

        logger.info(f"Successfully read {len(pages_and_texts)} pages from {self.pdf_path}")
        return pages_and_texts


#  Transformer to detect and convert bullet points

In [6]:
import re
from sklearn.base import BaseEstimator, TransformerMixin
import logging

# Set up logging
logger = logging.getLogger(__name__)

class BulletPointTransformer(BaseEstimator, TransformerMixin):
    """
    Transformer to detect and convert bullet points into a structured format.
    This ensures only successive bullet points are combined into one sentence.
    """
    
    def __init__(self):
        """
        Initializes the BulletPointTransformer.
        """
        pass

    def fit(self, X, y=None):
        """
        Fit method does nothing as the transformer doesn't require fitting.
        """
        return self

    def transform(self, X):
        """
        Transform the input data by identifying and formatting bullet points.
        
        :param X: List of documents or paragraphs to process
        :return: List of documents with bullet points properly formatted
        """
        if not X:
            logger.warning("Input data is empty.")
            return []

        transformed_data = []

        for item in X:
            if isinstance(item, dict):
                if 'formatted_text' in item:
                    text = item['formatted_text'].strip()
                    # Apply bullet point transformation
                    transformed_text = self._transform_bullet_points(text)
                    item['formatted_text'] = transformed_text
                    transformed_data.append(item)
                else:
                    logger.warning(f"Missing 'formatted_text' key in item: {item}")
            else:
                logger.error(f"Unexpected item format: {item}")
        
        return transformed_data

    def _transform_bullet_points(self, text):
        """
        Detect and combine only successive bullet points into a single sentence.
        Bullet points can be identified by common characters such as *, -, or •.
        
        :param text: The input text to process.
        :return: Text with only successive bullet points combined into a single sentence.
        """
        # Regular expression to match bullet points (handles *, -, or •)
        bullet_point_pattern = r'([*\-•])\s?(.*?)(?=\n|\r|\Z|\s*$)'  # Match bullet points

        # Match all bullet points
        bullet_points = re.findall(bullet_point_pattern, text)

        # If there are bullet points, combine only successive ones into a single sentence
        if bullet_points:
            # Iterate through the bullet points and combine only successive ones
            combined_bullet_points = []
            last_bullet = None
            for bp in bullet_points:
                if last_bullet is None:  # First bullet point
                    combined_bullet_points.append(bp[1].strip())
                else:  # Add only if successive bullet points
                    combined_bullet_points.append(bp[1].strip())
                last_bullet = bp
            
            # Join all successive bullet points into a single sentence
            combined_bullet_points_sentence = " ".join(combined_bullet_points) + "."
            # Replace the bullet points section with the combined sentence
            text = re.sub(bullet_point_pattern, "", text)  # Remove old bullet points
            text = f"{combined_bullet_points_sentence} {text}"  # Add combined bullet points as a sentence

        return text

# SentenceChunkerWithSummarization

In [7]:
from transformers import T5ForConditionalGeneration, T5Tokenizer
from sklearn.base import BaseEstimator, TransformerMixin
import spacy
import logging
import hashlib


# Set up logging
logger = logging.getLogger(__name__)

class SentenceChunkerWithSummarization(BaseEstimator, TransformerMixin):
    def __init__(self, max_sentences_per_chunk=10, max_summary_length=500, num_beams=4):
        """
        Initialize the SentenceChunkerWithSummarization.
        
        :param max_sentences_per_chunk: The maximum number of sentences per chunk.
        :param max_summary_length: Maximum length of the generated summary.
        :param num_beams: Number of beams for beam search during summary generation.
        """
        self.max_sentences_per_chunk = max_sentences_per_chunk
        self.max_summary_length = max_summary_length
        self.num_beams = num_beams
        
        # Load the SpaCy model and add the sentencizer
        self.nlp = spacy.blank("en")
        self.nlp.add_pipe("sentencizer")
        
        # Load the T5 model and tokenizer
        self.model = T5ForConditionalGeneration.from_pretrained('t5-small')
        self.tokenizer = T5Tokenizer.from_pretrained('t5-small')

    def fit(self, X, y=None):
        """
        Fit method does nothing as the model doesn't require fitting.
        """
        return self

    def generate_summary(self, text):
        """
        Generate a summary for a given text using the T5 model.
        
        :param text: The input text to summarize
        :return: The summarized text
        """
        if not text or not isinstance(text, str):
            logger.warning("Received invalid text input.")
            return "Invalid input: Empty or non-string text"

        # Tokenize the input with the T5 summarization prompt
        input_tokens = self.tokenizer.encode("summarize: " + text, return_tensors='pt')

        # Generate the summary using the model
        # max_length=150,  # Increase the max_length for a longer summary
        # min_length=50,   # Set a minimum length to prevent too short summaries
        # num_beams=4,     # Use beam search for better quality summaries
        # early_stopping=True,  # Stop early when the model is confident
        # length_penalty=1.5,   # Apply a penalty to make sure summaries are not too short
        output = self.model.generate(input_tokens, min_length=50, max_length=self.max_summary_length, num_beams=self.num_beams, early_stopping=True,  length_penalty=1.5)

        # Decode the summary
        summary = self.tokenizer.decode(output[0], skip_special_tokens=True)
        return summary

    def generate_unique_id(self, sentence_chunk):
        """
        Generate a unique ID from a sentence chunk using SHA-256 hash.

        :param sentence_chunk: The input sentence to generate the ID from.
        :return: A unique ID (SHA-256 hash) as a hexadecimal string.
        """
        # Step 1: Preprocess the sentence (optional, you could strip, lowercase, etc.)
        processed_chunk = sentence_chunk.strip().lower()

        # Step 2: Create the SHA-256 hash of the sentence
        unique_id = hashlib.sha256(processed_chunk.encode()).hexdigest()

        return unique_id

    def transform(self, X):
        """
        Transform the input data by chunking sentences and summarizing each chunk.
        
        :param X: List of documents or paragraphs to process
        :return: List of dictionaries with sentence chunks and their summaries
        """
        if not X:
            logger.warning("Input data is empty.")
            return []

        pages_and_chunks = []
        sentences = []
        pages = []

        # Extract sentences and page numbers from the input
        for item in X:
            if isinstance(item, dict):
                if 'formatted_text' in item and 'page_number' in item:
                    text = item['formatted_text'].strip()
                    page_number = item['page_number']
                    if text:  # Check if text is not empty
                        doc = self.nlp(text)  # Process text with SpaCy
                        for sent in doc.sents:
                            sentences.append(sent.text.strip())
                            pages.append(page_number)
                        logger.info(f"Extracted sentences from page: {page_number}")
                    else:
                        logger.warning(f"Empty sentence found in item: {item}")
                else:
                    logger.error(f"Missing keys in item: {item}")
            elif isinstance(item, tuple) and len(item) == 2:
                text = item[0].strip()
                page_number = item[1]
                doc = self.nlp(text)  # Process text with SpaCy
                for sent in doc.sents:
                    sentences.append(sent.text.strip())
                    pages.append(page_number)
            else:
                logger.error(f"Unexpected item format: {item}")

        # Organize sentences by page
        sentences_by_page = {}
        for sentence, page in zip(sentences, pages):
            sentences_by_page.setdefault(page, []).append(sentence)

        # Chunk sentences into fixed-size chunks and generate summaries
        for page, sentences in sentences_by_page.items():
            if not sentences:
                continue

            for i in range(0, len(sentences), self.max_sentences_per_chunk):
                chunk_sentences = sentences[i:i + self.max_sentences_per_chunk]
                chunk_text = " ".join(chunk_sentences)
                
                # Generate the summary for the chunk of sentences
                summary = self.generate_summary(chunk_text)

                # Generate additional information
                chunk_char_count = sum(len(s) for s in chunk_sentences)
                chunk_word_count = sum(len(s.split()) for s in chunk_sentences)
                chunk_token_count = sum(len(s) // 4 for s in chunk_sentences)
                summary_char_count = len(summary)
                summary_word_count = len(summary.split())

                # Create a dictionary with both chunk data and summary data
                chunk_dict = {
                    "sentence_chunk": chunk_text,
                    "chunk_char_count": chunk_char_count,
                    "chunk_word_count": chunk_word_count,
                    "chunk_token_count": chunk_token_count,
                    "page_number": page,  # Include the page number
                    "summary_text": summary,
                    "summary_char_count": summary_char_count,
                    "summary_word_count": summary_word_count,                    
                    "para_id" : self.generate_unique_id(chunk_text)
                }

                # Only include chunks with more than 30 tokens
                if chunk_token_count > 30:
                    #logger.info(f"Generated chunk and summary: {chunk_dict}")
                    pages_and_chunks.append(chunk_dict)

        #logger.info(f"Processed {len(pages_and_chunks)} semantic chunks with summaries.")
        return pages_and_chunks


# QuestionGenerator

In [8]:
from transformers import T5Tokenizer, T5ForConditionalGeneration, BertTokenizer, BertForQuestionAnswering
import torch
import pandas as pd

class QuestionAnswerGenerator:
    def __init__(self):
        # Load Doc2Query model for question generation
        self.model_name = 'doc2query/all-with_prefix-t5-base-v1'
        self.qgen_tokenizer = T5Tokenizer.from_pretrained(self.model_name)
        self.qgen_model = T5ForConditionalGeneration.from_pretrained(self.model_name)
        
        # Load BERT or similar model for Question Answering (QA)
        self.qa_tokenizer = BertTokenizer.from_pretrained('bert-large-uncased-whole-word-masking-finetuned-squad')
        self.qa_model = BertForQuestionAnswering.from_pretrained('bert-large-uncased-whole-word-masking-finetuned-squad')

    def generate_questions(self, chunk, num_questions=5):
        """
        Generate questions from a chunk of text using the Doc2Query model.

        :param chunk: The input chunk of text to generate questions for.
        :param num_questions: The number of questions to generate (default is 5).
        :return: A list of generated questions.
        """
        # Prepare the chunk for Doc2Query
        input_text = f"generate questions: {chunk}"
        inputs = self.qgen_tokenizer(input_text, return_tensors='pt', truncation=True, max_length=512)

        # Check if we are using greedy decoding or beam search
        if num_questions == 1:
            # Use greedy decoding for one question
            outputs = self.qgen_model.generate(
                **inputs, 
                max_length=50, 
                num_return_sequences=1,  # Only generate one question
                no_repeat_ngram_size=2
            )
        else:
            # Use beam search for multiple questions
            outputs = self.qgen_model.generate(
                **inputs, 
                max_length=150, 
                num_return_sequences=num_questions,  # Generate multiple questions
                num_beams=num_questions,  # Use beam search
                no_repeat_ngram_size=2
            )

        # Decode the generated questions
        questions = [self.qgen_tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
        return questions

    def generate_answers(self, chunk, questions):
        """
        Generate answers for a list of questions given a chunk of text.

        :param chunk: The input chunk of text for answering the questions.
        :param questions: A list of questions to answer.
        :return: A list of answers corresponding to the input questions.
        """
        answers = []
        for question in questions:
            # Encode the question and the context (chunk) for QA model with truncation and padding
            inputs = self.qa_tokenizer.encode_plus(
                question, 
                chunk, 
                return_tensors='pt', 
                truncation=True,  # Ensure the input sequence is truncated to fit the max length
                padding=True,     # Pad the sequence if it's shorter than the maximum length
                max_length=512    # Set the max length for the sequence
            )

            # Get the start and end positions of the answer
            outputs = self.qa_model(**inputs)

            # If the model outputs a tuple (start_scores, end_scores)
            if isinstance(outputs, tuple):
                answer_start_scores, answer_end_scores = outputs
            else:
                # If the model returns a dict, extract the start and end scores
                answer_start_scores = outputs['start_logits']
                answer_end_scores = outputs['end_logits']

            # Get the most likely beginning and end of the answer
            start_index = torch.argmax(answer_start_scores)
            end_index = torch.argmax(answer_end_scores)

            # Decode the answer from the token indices
            answer = self.qa_tokenizer.convert_tokens_to_string(
                self.qa_tokenizer.convert_ids_to_tokens(inputs['input_ids'][0][start_index:end_index+1])
            )

            answers.append(answer)
        return answers

    def transform(self, chunk_data):
        """
        Transform the input chunk data by generating questions and answers.

        :param chunk_data: A list of chunks, each containing a sentence chunk.
        :return: A list of chunks with generated questions and answers added.
        """
        all_chunk_qa = []
        for chunk in chunk_data:
            chunk_text = chunk['sentence_chunk']  # Get the text of the chunk
            questions = self.generate_questions(chunk_text)
            answers = self.generate_answers(chunk_text, questions)
            chunk['generated_questions'] = questions
            chunk['generated_answers'] = answers
            all_chunk_qa.append(chunk)
        return all_chunk_qa


#  Step 3: Chunk Sentences

In [9]:
class SentenceChunker(BaseEstimator, TransformerMixin):
    def __init__(self, max_sentences_per_chunk=5):
        self.max_sentences_per_chunk = max_sentences_per_chunk
        # Load the SpaCy English model and add the sentencizer
        self.nlp = spacy.blank("en")
        self.nlp.add_pipe("sentencizer")

    def fit(self, X, y=None):
        return self

    def transform(self, X, document_attributes=None):
        pages_and_chunks = []
        logger.info(f"Input data for transformation: {X}")
        logger.info(f"Input data length: {len(X)}")

        if not X:
            logger.warning("Input data is empty.")
            return []

        sentences = []
        pages = []

        # Extract sentences and page numbers
        for item in X:
            if isinstance(item, dict):
                if 'formatted_text' in item and 'page_number' in item:
                    text = item['formatted_text'].strip()
                    page_number = item['page_number']
                    if text:  # Check if text is not empty
                        doc = self.nlp(text)  # Process text with SpaCy
                        for sent in doc.sents:
                            sentences.append(sent.text.strip())
                            pages.append(page_number)
                        #logger.info(f"Extracted sentences from page: {page_number}")
                    else:
                        logger.warning(f"Empty sentence found in item: {item}")
                else:
                    logger.error(f"Missing keys in item: {item}")
            elif isinstance(item, tuple) and len(item) == 2:
                text = item[0].strip()
                page_number = item[1]
                doc = self.nlp(text)  # Process text with SpaCy
                for sent in doc.sents:
                    sentences.append(sent.text.strip())
                    pages.append(page_number)
            else:
                logger.error(f"Unexpected item format: {item}")

        # Organize sentences by pages
        sentences_by_page = {}
        for sentence, page in zip(sentences, pages):
            sentences_by_page.setdefault(page, []).append(sentence)

        for page, sentences in sentences_by_page.items():
            if not sentences:
                continue

            # Chunk sentences into fixed-size chunks
            for i in range(0, len(sentences), self.max_sentences_per_chunk):
                chunk_sentences = sentences[i:i + self.max_sentences_per_chunk]
                chunk_token_count = sum(len(s) // 4 for s in chunk_sentences)
                chunk_dict = {
                    "sentence_chunk": " ".join(chunk_sentences),
                    "chunk_char_count": sum(len(s) for s in chunk_sentences),
                    "chunk_word_count": sum(len(s.split()) for s in chunk_sentences),
                    "chunk_token_count": sum(len(s) // 4 for s in chunk_sentences),  # Adjust if needed
                    "page_number": page  # Include the page number
                }
                if chunk_token_count > 30:
                    logger.info(f"Generated chunk: {chunk_dict}")
                    pages_and_chunks.append(chunk_dict)

        logger.info(f"Processed {len(pages_and_chunks)} semantic chunks.")
        return pages_and_chunks


# Step 4: Generate Embeddings

In [10]:
class EmbeddingGenerator(BaseEstimator, TransformerMixin):
    def __init__(self):
        self.model = SentenceTransformer(model_name_or_path="all-mpnet-base-v2", device="cuda")

    def fit(self, X, y=None):
        return self

    def transform(self, X, document_attributes):
        sentences = [chunk["sentence_chunk"] for chunk in X]
        embeddings = self.model.encode(sentences)
        
        for i, chunk in enumerate(X):
            chunk["embedding"] = embeddings[i]
        
        #log_output("Embedding Generator: "+ len(X))  # Log output
        return X

# Step 5: Save to ChromaDB

In [11]:
class ChromaDBSaver(BaseEstimator, TransformerMixin):
    def __init__(self, chroma_db_dir="chroma_db_dir"):  # Ensure this points to your local ChromaDB
        self.client = chromadb.PersistentClient(path=chroma_db_dir)
        self.collection = self.client.get_or_create_collection("pdf_chunks")

    def fit(self, X, y=None):
        return self

    def transform(self, X, document_attributes):
        i = 0 
        for chunk, doc_attr in zip(X, document_attributes):

            document_id = f"{doc_attr['make']}_{doc_attr['model']}_{doc_attr['year']}_{doc_attr['style']}"
            
            # Log the chunk being added
            text =  chunk["sentence_chunk"]
            chunk_char_count = chunk["chunk_char_count"]
            chunk_word_count = chunk["chunk_word_count"]
            if chunk["sentence_chunk"].strip():  # Ensure it's not empty
                #print(f"Adding document ID: {document_id}, Content: '{chunk['sentence_chunk']}'")
                
                self.collection.add(
                    documents=[text],
                    embeddings=[chunk["embedding"].tolist()],
                    metadatas=[{"source": document_id}],
                    ids = [f"{document_id}_{chunk['page_number']}_{chunk_word_count}_{chunk_char_count}"]

                )
            else:
                print(f"Skipping empty document for ID: {document_id}")
            i=i+1

        log_output("ChromaDB Saver , Data saved to ChromaDB")
        return X

# Main Pipeline Execution

In [12]:
def process_document(document):
    # Create the pipeline
    pipeline = Pipeline(steps=[
        ('pdf_reader', PDFReader),  # Step 1: Read PDF (pass the class, not an instance)
        ('text_formatter', TextFormatter()),  # Step 2: Format text
        ('bullet_point_transformer', BulletPointTransformer()),  # Step 3: Transform bullet points
        ('sentence_chunker', SentenceChunkerWithSummarization()),  # Step 4: Chunk sentences
        ('question_answer_generator', QuestionAnswerGenerator()),  # Step 5: Generate QA pairs (call the class)
        ('embedding_generator', EmbeddingGenerator()),  # Step 6: Generate embeddings
        ('chromadb_saver', ChromaDBSaver())  # Step 7: Save to ChromaDB
    ])

    # Create a document ID based on attributes
    document_id = f"{document['make']}_{document['model']}_{document['year']}_{document['style']}"

    # Instantiate PDFReader manually, as it requires the file path
    pdf_reader = PDFReader(document.get('pdf_path'))
    result = pdf_reader.fit_transform(document.get('pdf_path'))  # Read the PDF file

    # Process the document through each pipeline step
    result = pipeline.named_steps['text_formatter'].transform(result)
    result = pipeline.named_steps['bullet_point_transformer'].transform(result)
    result = pipeline.named_steps['sentence_chunker'].transform(result)
    result = pipeline.named_steps['question_answer_generator'].transform(result)  # Generate questions and answers

    # Generate embeddings and add them to the result
    embeddings = pipeline.named_steps['embedding_generator'].transform(result, document)

    # Save the embeddings and document data to ChromaDB
    pipeline.named_steps['chromadb_saver'].transform(embeddings, [document] * len(embeddings))

    # Process each chunk and add the data to the list
    all_chunk_data = []
    all_QandA =[]
    for chunk in result:
        chunk_data = {
            "sentence_chunk": chunk["sentence_chunk"],
            "chunk_char_count": chunk["chunk_char_count"],
            "chunk_word_count": chunk["chunk_word_count"],
            "chunk_token_count": chunk["chunk_token_count"],
            "page_number": chunk["page_number"],
            "summary_text": chunk["summary_text"],
            "summary_char_count": chunk["summary_char_count"],
            "summary_word_count": chunk["summary_word_count"],
            "para_id" : chunk["para_id"],               
        }
        for index, question in enumerate(chunk["generated_questions"], 0):
            qa_data  = {
               
               "page_number": chunk["page_number"],
               "para_id" : chunk["para_id"],   
               "sentence_chunk": chunk["sentence_chunk"],
               "question" : question,
               "answer" :  chunk["generated_answers"][index]
            }
            all_QandA.append(qa_data)
  
        all_chunk_data.append(chunk_data)

    # Convert the list of chunks into a pandas DataFrame
    df = pd.DataFrame(all_chunk_data)
    df_qa = pd.DataFrame(all_QandA)

    # Save the DataFrame to a CSV file
    df.to_csv("manuals/" + document_id + ".csv", index=False)
    df_qa.to_csv("manuals/" + document_id + "_QA.csv", index=False)


In [13]:
# Main Pipeline Execution
input_documents = [
    {
        "make": "Fraggles",
        "model": "X500",
        "year": "2024",
        "style": "FMS",
        "pdf_path": "manuals/FragglesX500FMS-2024-V4.pdf"  
    },
    {
        "make": "Fraggles",
        "model": "X700",
        "year": "2022",
        "style": "HCM",
        "pdf_path": "manuals/FragglesX700HCM-2022-V2.pdf"
    }
]

# Process each document
for doc in input_documents:
    process_document(doc)

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565
Some weights of the model checkpoint at bert-large-uncased-whole-word-masking-finetuned-squad were not used when initializing BertForQuestionAnswering: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForQuestionAnswering from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForQuestionAnswering from the checkpo

Reading PDF pages:   0%|          | 0/97 [00:00<?, ?it/s]

INFO:__main__:Successfully read 97 pages from manuals/FragglesX500FMS-2024-V4.pdf
INFO:__main__:Extracted sentences from page: 0
INFO:__main__:Extracted sentences from page: 1
INFO:__main__:Extracted sentences from page: 2
INFO:__main__:Extracted sentences from page: 3
INFO:__main__:Extracted sentences from page: 4
INFO:__main__:Extracted sentences from page: 5
INFO:__main__:Extracted sentences from page: 6
INFO:__main__:Extracted sentences from page: 7
INFO:__main__:Extracted sentences from page: 8
INFO:__main__:Extracted sentences from page: 9
INFO:__main__:Extracted sentences from page: 10
INFO:__main__:Extracted sentences from page: 11
INFO:__main__:Extracted sentences from page: 12
INFO:__main__:Extracted sentences from page: 13
INFO:__main__:Extracted sentences from page: 14
INFO:__main__:Extracted sentences from page: 15
INFO:__main__:Extracted sentences from page: 16
INFO:__main__:Extracted sentences from page: 17
INFO:__main__:Extracted sentences from page: 18
INFO:__main__:Ex

Batches:   0%|          | 0/9 [00:00<?, ?it/s]

Some weights of the model checkpoint at bert-large-uncased-whole-word-masking-finetuned-squad were not used when initializing BertForQuestionAnswering: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForQuestionAnswering from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForQuestionAnswering from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-mpnet-base-v2


Reading PDF pages:   0%|          | 0/28 [00:00<?, ?it/s]

INFO:__main__:Successfully read 28 pages from manuals/FragglesX700HCM-2022-V2.pdf
INFO:__main__:Extracted sentences from page: 0
INFO:__main__:Extracted sentences from page: 1
INFO:__main__:Extracted sentences from page: 2
INFO:__main__:Extracted sentences from page: 3
INFO:__main__:Extracted sentences from page: 4
INFO:__main__:Extracted sentences from page: 5
INFO:__main__:Extracted sentences from page: 6
INFO:__main__:Extracted sentences from page: 7
INFO:__main__:Extracted sentences from page: 8
INFO:__main__:Extracted sentences from page: 9
INFO:__main__:Extracted sentences from page: 10
INFO:__main__:Extracted sentences from page: 11
INFO:__main__:Extracted sentences from page: 12
INFO:__main__:Extracted sentences from page: 13
INFO:__main__:Extracted sentences from page: 14
INFO:__main__:Extracted sentences from page: 15
INFO:__main__:Extracted sentences from page: 16
INFO:__main__:Extracted sentences from page: 17
INFO:__main__:Extracted sentences from page: 18
INFO:__main__:Ex

Batches:   0%|          | 0/3 [00:00<?, ?it/s]

In [14]:
class ChromaDBSearcher:
    def __init__(self, chroma_db_dir="chroma_db_dir", model_name="all-mpnet-base-v2"):
        self.client = chromadb.PersistentClient(path=chroma_db_dir)
        self.collection = self.client.get_collection("pdf_chunks")
        self.model = SentenceTransformer(model_name)

    def search_by_id(self, document_id, query):
        try:
            query_embedding = self.model.encode(query, convert_to_tensor=True).cpu().numpy()
            results = self.collection.query(
                query_embedding.tolist(),
                where={"source": document_source},
                n_results=5
            )
            
            if results and results['documents']:
                document_content = results['documents'][0]  # Remove extra spaces
                if document_content:
                    return results['documents']
                else:
                    print("Document content is empty.")
                    return None
            else:
                print("No documents found.")
                return None
        except Exception as e:
            print(f"An error occurred during search by ID: {e}")
            return None

# Example usage
searcher = ChromaDBSearcher()
document_source = "Fraggles_X500_2024_FMS"  # Replace with the actual document ID you want to search for
#document_source = "Ford_Mustang_2023_MACH-E"
query = "how to use parking breakes?"  # Replace with the query you want to search for

searcher.search_by_id(document_source, query)


INFO:sentence_transformers.SentenceTransformer:Use pytorch device_name: cuda
INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: all-mpnet-base-v2


Batches:   0%|          | 0/1 [00:00<?, ?it/s]

[['•  Traction control may be off, or the vehicle might be  in reverse (R). Ensure the vehicle is moving forward. Q: Why doesn’t the system offer a parking space? •  Sensors could be blocked or damaged, or there may  not be enough space for the vehicle to park safely. Q: Why isn’t the vehicle correctly positioned in the  parking space? •  The system may struggle with irregular curbs, high  attachments on nearby vehicles, or if the parking  space has changed after the vehicle has already  passed. Important Precautions  •  Always Remain in the Vehicle: You must stay in  control of the vehicle and be ready to intervene if  necessary.. FragglesX500FMS 2024          49  8. Release the brake pedal and allow the system to  maneuver the vehicle. 9. The vehicle will back into the parking space and shift  into park (P) when complete.',
  'Tap the Active Park Assist icon on the touchscreen. 3. Select Perpendicular Parking mode. 4. Use the turn signal to indicate the side you want to  park. 5. Dri