In [1]:
from qdrant_client import QdrantClient
from qdrant_client.http import models
from typing import List, Dict
from collections import defaultdict
import re
import pymupdf
from package.models.document_stuffs import CustomDocument
from transformers import AutoModel, AutoTokenizer
import torch

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Client for the Qdrant server at localhost:6333; assumes an instance is already
# running there -- nothing in this notebook starts one.
client = QdrantClient(host="localhost", port=6333)

In [3]:
# client.create_collection(
#     collection_name = "test_collection",
#     vectors_config = models.VectorParams(size=100, distance=models.Distance.COSINE)
# )

In [4]:
# client.get_collections()

In [5]:
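# NOTE(review): commented-out prototype of CustomSection / CustomPage / CustomDocument.
# The first cell now imports CustomDocument from package.models.document_stuffs, which
# presumably supersedes the definitions below -- confirm, then delete this cell.
# import pymupdf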
# from collections import defaultdict
# from typing import List, Dict

# class CustomSection:
#     def __init__(self, section_title, section_text):
#         self.title = section_title
#         self.text = section_text

# class CustomPage:
#     def __init__(self, page_number: int, page: pymupdf.Page, page_headings: List[str]):
#         text = page.get_text("text")
#         image_list = page.get_images(full=True)
#         self.just_text = text                                  # internal use for analysis
#         self.page_number = page_number
#         self.num_sections = len(page_headings)
#         self.num_chars = len(text)
#         self.num_images = len(image_list)
#         self.sections = self._make_sections(text, page_headings)
#         # images itself
#         # page number
#         # should have a text attribute and a sections attribute
    
#     def _make_sections(self, text: str, page_headings) -> List[CustomSection]:
#         sections = defaultdict(list)
#         lines = text.split("\n")
#         current_heading = page_headings.pop(0)
#         next_heading = page_headings.pop(0) if page_headings else None
#         for line in lines:
#             if next_heading and next_heading.split(":")[-1] in line:
#                 current_heading = next_heading
#                 next_heading = page_headings.pop(0) if page_headings else None
#             sections[current_heading].append(line)
#         page_headings += [current_heading] # here for overflow which we want
#         return [CustomSection(title, "\n".join(text)) for title, text in sections.items()]

#     # =======================================
#     #                  DUNDERS
#     # =======================================
#     def __getitem__(self, idx):
#         return self.sections[idx]

#     def __str__(self):
#         s = ""
#         for section in self.sections:
#             s += section.title + "\n"
#         return f"Page {self.page_number} of document. Has {len(self.sections)} sections: \n" + s

#     def __repr__(self):
#         return str(self)

# class CustomDocument:
#     def __init__(self, file_path):
#         pymupdf_doc = self._load_pdf(file_path)
#         labels: Dict[int, List[str]] = self._create_multilevel_headings(pymupdf_doc.get_toc())
#         self.file_path = file_path
#         self.metadata = pymupdf_doc.metadata
#         self.pages = self._create_pages(labels, pymupdf_doc)
        
#     def _load_pdf(self, file_path: str) -> pymupdf.Document:
#         """Load pdfs"""
#         pdf = pymupdf.open(file_path)
#         return pdf

#     def _create_multilevel_headings(self, table_of_content: List[List[int]]) -> Dict[int, List[str]]:
#         """given a pdf table of content we get a heading id to chunk properly"""
#         page_mapping = defaultdict(list)
#         current_heading = []
#         for heading_level, heading_title, page_number in table_of_content: 
#             # check if the criterion is met
#             if heading_level <= len(current_heading):
#                 current_heading = current_heading[:heading_level - 1] # in the case of headinglevel = 1 it returns an empty list
#             current_heading.append(heading_title)
    
#             # add the joined headers
#             page_mapping[page_number].append(":".join(current_heading))
            
#         return page_mapping

#     def _create_pages(self, labels: Dict[int, List[str]], doc: pymupdf.Document) -> List[CustomPage]:
#         pages, page_headings = [], []
#         for page_num, page in enumerate(doc):
#             page_num += 1
#             page_headings += labels.get(page_num, ["Other"])
#             pages.append(CustomPage(page_num, page, page_headings)) # headings should go away once popped
#         return pages
        
#     def get_full_text(self) -> str:
#         """Text I would need to do analysis and show end users"""
#         full_text = []
#         for page_num, page in enumerate(self.pages):
#             page_num += 1
#             full_text.append(f"Page {page_num}:")
#             for section in page.sections:
#                 full_text.append(section.text)
#         return "\n".join(full_text)
#     # =======================================
#     #                  DUNDERS
#     # =======================================
#     def __getitem__(self, idx):
#         return self.pages[idx]

#     def __str__(self):
#         return f"PDF of {self.file_path}, has {len(self.pages)} pages. Use .metadata attribute to see more"

#     def __repr__(self):
#         return str(self)    

In [6]:
class Chunk:
    """One piece of text together with an arbitrary metadata object."""

    def __init__(self, text: str, metadata):
        # Plain value holder: both arguments are stored exactly as given.
        self.text, self.metadata = text, metadata
        
class CustomCharacterTextSplitter:
    """Cut a document into chunks of ``num_chars`` characters, optionally overlapping.

    Section texts are consumed in reading order as one continuous stream, so a
    chunk may start in one section (or page) and finish in a later one. Each
    chunk records the page numbers and section titles it actually took text from.
    """

    def __init__(self, num_chars: int = 256, overlap: int = 0):
        """Configure the splitter.

        Args:
            num_chars: Length of a full chunk, in characters. Must be positive.
            overlap: Number of characters repeated at the start of the next
                chunk. Must satisfy ``0 <= overlap < num_chars``; otherwise the
                window would never advance.

        Raises:
            ValueError: If either argument is out of range.
        """
        if num_chars <= 0:
            raise ValueError(f"num_chars must be positive, got {num_chars}")
        if not 0 <= overlap < num_chars:
            raise ValueError(
                f"overlap must satisfy 0 <= overlap < num_chars, got overlap={overlap}, num_chars={num_chars}"
            )
        self.num_chars = num_chars
        self.overlap = overlap

    def split_document(self, doc: "CustomDocument") -> List["Chunk"]:
        """Split ``doc`` into chunks of at most ``num_chars`` characters.

        Args:
            doc: Iterable of pages. Each page must expose ``page_number`` and
                iterate over its sections; each section must expose ``title``,
                support ``len()`` and return a ``str`` when sliced. (Assumed to
                match the project's CustomDocument -- confirm against that class.)

        Returns:
            Chunks in reading order. Metadata is
            ``{"page": tuple_of_page_numbers, "section": tuple_of_section_titles}``.
            The final chunk may be shorter than ``num_chars``. Overlap is taken
            from the current section only; it never reaches back into a
            previous section.
        """
        chunks = []
        chunk_text = ""        # text gathered so far for the chunk being built
        section_md = []        # section titles that contributed to chunk_text
        page_number_md = []    # page numbers that contributed to chunk_text
        remainder = 0          # characters still missing from an unfinished chunk
        has_new_text = False   # False while chunk_text holds only repeated overlap

        for page in doc:
            for section in page:
                section_length = len(section)
                window_start = 0
                covered_end = 0  # end of the text in this section already emitted
                # An unfinished chunk only needs `remainder` more characters.
                window_end = remainder if chunk_text else self.num_chars

                while window_start < section_length:
                    chunk_text += section[window_start:window_end]
                    # Record provenance only when text was actually taken.
                    if not section_md or section_md[-1] != section.title:
                        section_md.append(section.title)
                    if not page_number_md or page_number_md[-1] != page.page_number:
                        page_number_md.append(page.page_number)
                    if min(window_end, section_length) > covered_end:
                        has_new_text = True

                    if window_end > section_length:
                        # Section ran out before the chunk was full; continue in the next one.
                        remainder = window_end - section_length
                        break

                    chunks.append(self._make_chunk(chunk_text, page_number_md, section_md))
                    # Start the next chunk clean so it does not inherit stale metadata.
                    chunk_text = ""
                    section_md, page_number_md = [], []
                    has_new_text = False
                    covered_end = window_end
                    # Clamp at 0: a negative start would slice from the end of the section.
                    window_start = max(window_end - self.overlap, 0)
                    window_end = window_start + self.num_chars

        # Flush the last partial chunk unless it merely repeats emitted overlap.
        if chunk_text and has_new_text:
            chunks.append(self._make_chunk(chunk_text, page_number_md, section_md))

        return chunks

    @staticmethod
    def _make_chunk(text, page_numbers, section_titles) -> "Chunk":
        """Build a Chunk whose metadata snapshots the given provenance lists as tuples."""
        return Chunk(text, {"page": tuple(page_numbers), "section": tuple(section_titles)})

In [11]:
class ChunkVectorizer:
    """Keeps a transformers model and its tokenizer, both loaded from one model name."""

    def __init__(self, model_name: str):
        # Model first, then tokenizer, from the same identifier.
        self._model = AutoModel.from_pretrained(model_name)
        self._tokenizer = AutoTokenizer.from_pretrained(model_name)

    def vectorize(self):
        """Placeholder: not implemented yet; takes no input and returns None."""
        return None

In [12]:
# Build the project's CustomDocument from the PDF; the relative path is presumably
# resolved against the kernel's working directory -- confirm.
custom_doc = CustomDocument("aiayn.pdf")

In [13]:
# Split the parsed document using 512-character windows with no overlap.
splitter = CustomCharacterTextSplitter(num_chars=512, overlap=0)
chunks = splitter.split_document(custom_doc)

In [14]:
# `model` is the model *name* (a string), not a loaded model; ChunkVectorizer
# passes it to AutoModel / AutoTokenizer.from_pretrained.
model = 'sentence-transformers/all-MiniLM-L6-v2'
vectorizer = ChunkVectorizer(model)

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to see activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


In [None]:
# Dump every embedded image of the PDF to the current working directory.
# `doc` was not defined anywhere in the notebook, so this cell raised NameError on
# a fresh kernel. Open the PDF explicitly; this assumes it is the same file that
# `custom_doc` was built from -- adjust the path if not.
with pymupdf.open("aiayn.pdf") as doc:
    for page_number, page in enumerate(doc):
        image_list = page.get_images(full=True)
        # NOTE(review): rebuilt for every page, so after the loop `images` only
        # holds the entries of the last page.
        images = []

        for img_index, img in enumerate(image_list):
            # Get the XREF of the image
            xref = img[0]

            # Extract the image bytes
            base_image = doc.extract_image(xref)

            # Get the image bytes and metadata
            image_bytes = base_image["image"]
            image_ext = base_image["ext"]

            # Save the image as a file (page numbers in the name are 1-based)
            image_filename = f"image_page{page_number+1}_{img_index}.{image_ext}"
            with open(image_filename, "wb") as image_file:
                image_file.write(image_bytes)

            # Append image data to the list
            images.append({
                "image_index": img_index,
                "image_filename": image_filename,
                "image_bytes": image_bytes,
                "image_extension": image_ext
            })
        