In [17]:
import copy
import json
import logging
import pathlib
import re

In [18]:
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, Sequence
from langchain_core.documents import BaseDocumentTransformer, Document

class TextSplitter(BaseDocumentTransformer, ABC):
    """Interface for splitting text into chunks.

    Subclasses implement :meth:`split_text`. This base class holds the shared
    configuration, the greedy merge of small pieces into bounded chunks
    (:meth:`_merge_splits`) and the ``Document``-level helpers
    (:meth:`create_documents`, :meth:`split_documents`,
    :meth:`transform_documents`).
    """

    def __init__(
        self,
        chunk_size: int = 4000,
        chunk_overlap: int = 200,
        length_function: Callable[[str], int] = len,
        keep_separator: bool = False,
        add_start_index: bool = False,
        strip_whitespace: bool = True,
    ) -> None:
        """Create a new TextSplitter.

        Args:
            chunk_size: Maximum size of chunks to return
            chunk_overlap: Overlap between consecutive chunks, measured with
                           ``length_function``
            length_function: Function that measures the length of given chunks
            keep_separator: Whether to keep the separator in the chunks
            add_start_index: If `True`, includes chunk's start index in metadata
            strip_whitespace: If `True`, strips whitespace from the start and end of
                              every document

        Raises:
            ValueError: If ``chunk_overlap`` is larger than ``chunk_size``.
        """
        if chunk_overlap > chunk_size:
            raise ValueError(
                f"Got a larger chunk overlap ({chunk_overlap}) than chunk size "
                f"({chunk_size}), should be smaller."
            )
        self._chunk_size = chunk_size
        self._chunk_overlap = chunk_overlap
        self._length_function = length_function
        self._keep_separator = keep_separator
        self._add_start_index = add_start_index
        self._strip_whitespace = strip_whitespace

    @abstractmethod
    def split_text(self, text: str) -> List[str]:
        """Split text into multiple components."""

    def create_documents(
        self, texts: List[str], metadatas: Optional[List[dict]] = None
    ) -> List[Document]:
        """Split each text and wrap every chunk in a ``Document``.

        Args:
            texts: Raw strings to split.
            metadatas: Optional per-text metadata, aligned with ``texts``. When
                omitted (or empty) every chunk starts from an empty dict.

        Returns:
            One ``Document`` per chunk, in input order. Each chunk gets its own
            deep copy of the source metadata. When ``add_start_index`` was set,
            ``metadata["start_index"]`` holds the result of ``str.find`` for
            the chunk in its source text; this is ``-1`` if the chunk does not
            occur verbatim (for example after stripping or dropped separators).
        """
        _metadatas = metadatas or [{}] * len(texts)
        documents = []
        for i, text in enumerate(texts):
            # Search forward from just after the previous chunk's start so that
            # overlapping chunks resolve to successive positions.
            index = -1
            for chunk in self.split_text(text):
                metadata = copy.deepcopy(_metadatas[i])
                if self._add_start_index:
                    index = text.find(chunk, index + 1)
                    metadata["start_index"] = index
                documents.append(Document(page_content=chunk, metadata=metadata))
        return documents

    def split_documents(self, documents: Iterable[Document]) -> List[Document]:
        """Split existing documents, carrying each one's metadata to its chunks."""
        texts, metadatas = [], []
        for doc in documents:
            texts.append(doc.page_content)
            metadatas.append(doc.metadata)
        return self.create_documents(texts, metadatas=metadatas)

    def _join_docs(self, docs: List[str], separator: str) -> Optional[str]:
        """Join pieces with ``separator``; return ``None`` if the result is empty.

        The joined text is stripped first when ``strip_whitespace`` is enabled.
        """
        text = separator.join(docs)
        if self._strip_whitespace:
            text = text.strip()
        if text == "":
            return None
        return text

    def _merge_splits(self, splits: Iterable[str], separator: str) -> List[str]:
        """Greedily pack small pieces into chunks of at most ``chunk_size``.

        Pieces are appended to a sliding window. When the next piece would
        overflow the window, the window is emitted as a chunk and pieces are
        dropped from its front until what remains fits within
        ``chunk_overlap`` (and leaves room for the next piece). The remainder
        becomes the overlap that starts the next chunk.

        Args:
            splits: Pieces to merge, in order.
            separator: String re-inserted between pieces when joining.

        Returns:
            The merged chunks. A single piece longer than ``chunk_size`` is
            emitted as-is, with a warning.
        """
        # We now want to combine these smaller pieces into medium size
        # chunks to send to the LLM.
        logger = logging.getLogger(__name__)
        separator_len = self._length_function(separator)

        docs = []
        current_doc: List[str] = []
        total = 0  # length of current_doc once joined with separators
        for d in splits:
            _len = self._length_function(d)
            if (
                total + _len + (separator_len if len(current_doc) > 0 else 0)
                > self._chunk_size
            ):
                if total > self._chunk_size:
                    logger.warning(
                        f"Created a chunk of size {total}, "
                        f"which is longer than the specified {self._chunk_size}"
                    )
                if len(current_doc) > 0:
                    doc = self._join_docs(current_doc, separator)
                    if doc is not None:
                        docs.append(doc)
                    # Keep on popping if:
                    # - we have a larger chunk than in the chunk overlap
                    # - or if we still have any chunks and the length is long
                    while total > self._chunk_overlap or (
                        total + _len + (separator_len if len(current_doc) > 0 else 0)
                        > self._chunk_size
                        and total > 0
                    ):
                        total -= self._length_function(current_doc[0]) + (
                            separator_len if len(current_doc) > 1 else 0
                        )
                        current_doc = current_doc[1:]
            current_doc.append(d)
            total += _len + (separator_len if len(current_doc) > 1 else 0)
        # Flush whatever is left in the window as the final chunk.
        doc = self._join_docs(current_doc, separator)
        if doc is not None:
            docs.append(doc)
        return docs

    def transform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        """Transform sequence of documents by splitting them."""
        return self.split_documents(list(documents))

In [37]:
# Ten short sentences in two paragraphs (blank line between the fifth and
# sixth sentence); used as input for the character-based splitter demos.
text = (
    "This is first. This is second. This is third. This is fourth. This is fifth."
    "\n\n"
    "This is sixth. This is seventh. This is eighth. This is ninth. This is tenth."
)

# Small Python module used as input for the language-aware splitter demo.
python_code_text = """
import os
import sys
import json

def add(a, b):
    return a + b

class MyClass:
    def __init__(self, name):
        self.name = name
        self.age = 0

    def get_name(self):
        return self.name

    def get_age(self):
        return self.age
"""

In [4]:
from langchain.text_splitter import _split_text_with_regex
from langchain.text_splitter import Language

class CharacterTextSplitter(TextSplitter):
    """Split text on one separator, then merge the pieces back into chunks."""

    def __init__(
        self, separator: str = "\n\n", is_separator_regex: bool = False, **kwargs: Any
    ) -> None:
        """Create a new splitter.

        Args:
            separator: String (or regex pattern) to split on.
            is_separator_regex: If `True`, ``separator`` is used as a regex
                as-is; otherwise it is escaped and matched literally.
            **kwargs: Forwarded to ``TextSplitter.__init__``.
        """
        super().__init__(**kwargs)
        self._separator = separator
        self._is_separator_regex = is_separator_regex

    def split_text(self, text: str) -> List[str]:
        """Split ``text`` on the separator and merge the pieces into chunks."""
        # Literal separators are escaped so regex metacharacters match verbatim.
        if self._is_separator_regex:
            pattern = self._separator
        else:
            pattern = re.escape(self._separator)
        pieces = _split_text_with_regex(text, pattern, self._keep_separator)

        # When separators are kept in the pieces, nothing is re-inserted on merge.
        if self._keep_separator:
            joiner = ""
        else:
            joiner = self._separator
        return self._merge_splits(pieces, joiner)

# Demo: an empty-string separator with chunk_size=40 and chunk_overlap=20.
# The output below shows overlapping windows of at most 40 characters; the
# paragraph break ("\n\n") is not treated specially here.
splitter = CharacterTextSplitter(separator="", chunk_size=40, chunk_overlap=20)
splitter.split_text(text)

['This is first. This is second. This is t',
 'is second. This is third. This is fourth',
 'hird. This is fourth. This is fifth.\n\nTh',
 '. This is fifth.\n\nThis is sixth. This is',
 'is is sixth. This is seventh. This is ei',
 'seventh. This is eighth. This is ninth.',
 'ghth. This is ninth. This is tenth.']

In [41]:
class RecursiveCharacterTextSplitter(TextSplitter):
    """Splitting text by recursively looking at characters.

    Tries the configured separators in order and splits on the first one that
    occurs in the text. Any resulting piece that is still too long is split
    again with the remaining separators.
    """

    def __init__(
        self,
        separators: Optional[List[str]] = None,
        keep_separator: bool = True,
        is_separator_regex: bool = False,
        **kwargs: Any,
    ) -> None:
        """Create a new TextSplitter.

        Args:
            separators: Separators to try, in order. Defaults to
                ``["\\n\\n", "\\n", " ", ""]`` when omitted or empty.
            keep_separator: Whether to keep the separator in the chunks.
            is_separator_regex: If `True`, separators are used as regex
                patterns as-is; otherwise they are escaped.
            **kwargs: Forwarded to ``TextSplitter.__init__``.
        """
        super().__init__(keep_separator=keep_separator, **kwargs)
        self._separators = separators or ["\n\n", "\n", " ", ""]
        self._is_separator_regex = is_separator_regex

    def _split_text(self, text: str, separators: List[str]) -> List[str]:
        """Split incoming text and return chunks.

        Args:
            text: Text to split.
            separators: Separators still available at this recursion level.

        Returns:
            Chunks in document order.
        """
        final_chunks = []
        # Get appropriate separator to use: the first one present in the text.
        # If none matches, fall back to the last separator in the list.
        separator = separators[-1]
        new_separators = []
        for i, _s in enumerate(separators):
            _separator = _s if self._is_separator_regex else re.escape(_s)
            if _s == "":
                # The empty separator always applies and leaves nothing finer
                # to recurse into.
                separator = _s
                break
            if re.search(_separator, text):
                separator = _s
                new_separators = separators[i + 1 :]
                break

        _separator = separator if self._is_separator_regex else re.escape(separator)
        splits = _split_text_with_regex(text, _separator, self._keep_separator)

        # Now go merging things, recursively splitting longer texts.
        _good_splits = []
        _separator = "" if self._keep_separator else separator
        for s in splits:
            if self._length_function(s) < self._chunk_size:
                _good_splits.append(s)
            else:
                # Flush the small pieces collected so far before handling the
                # oversized one, so that output order follows the text.
                if _good_splits:
                    merged_text = self._merge_splits(_good_splits, _separator)
                    final_chunks.extend(merged_text)
                    _good_splits = []
                if not new_separators:
                    # Nothing finer to split on: emit the oversized piece as-is.
                    final_chunks.append(s)
                else:
                    other_info = self._split_text(s, new_separators)
                    final_chunks.extend(other_info)
        if _good_splits:
            merged_text = self._merge_splits(_good_splits, _separator)
            final_chunks.extend(merged_text)
        return final_chunks

    def split_text(self, text: str) -> List[str]:
        """Split ``text`` using the separators configured on this instance."""
        return self._split_text(text, self._separators)

    @classmethod
    def from_language(
        cls, language: Language, **kwargs: Any
    ) -> "RecursiveCharacterTextSplitter":
        """Build a splitter preconfigured with ``language``'s separators.

        Raises:
            ValueError: If ``language`` has no separator list defined here.
        """
        separators = cls.get_separators_for_language(language)
        return cls(separators=separators, is_separator_regex=True, **kwargs)

    @staticmethod
    def get_separators_for_language(language: Language) -> List[str]:
        """Return the ordered separator list for ``language``.

        Only ``Language.PYTHON`` is supported here.

        Raises:
            ValueError: For any other language. Previously this fell through
                and returned ``None``, which made ``from_language`` silently
                use the default separators.
        """
        if language == Language.PYTHON:
            return [
                # First, try to split along class definitions
                "\nclass ",
                "\ndef ",
                "\n\tdef ",
                # Now split by the normal type of lines
                "\n\n",
                "\n",
                " ",
                "",
            ]
        raise ValueError(
            f"Language {language} is not supported! "
            f"Please choose from {[Language.PYTHON]}"
        )

# Demo: split on the paragraph break first; a paragraph that is not shorter
# than chunk_size is split again with the remaining "" separator.
# Rebinds the notebook-level `splitter` used by the following cell.
splitter = RecursiveCharacterTextSplitter(separators=["\n\n", ""], chunk_size=40, chunk_overlap=20)
splitter.split_text(text)

['This is first. This is second. This is t',
 'is second. This is third. This is fourth',
 'hird. This is fourth. This is fifth.',
 'This is sixth. This is seventh. This i',
 's is seventh. This is eighth. This is ni',
 's eighth. This is ninth. This is tenth.']

In [39]:
# Length of every chunk; none should exceed the chunk_size of the current `splitter`.
list(map(len, splitter.split_text(text)))

[40, 40, 36, 38, 40, 39]

In [24]:
# Splitter using the Python separator list. chunk_size is left at the
# TextSplitter default (4000), so the short sample fits in a single chunk.
splitter = RecursiveCharacterTextSplitter.from_language(language=Language.PYTHON)

# Print each chunk preceded by its position.
for i, chunk in enumerate(splitter.split_text(python_code_text)):
    print(i, chunk, sep="\n")

0
import os
import sys
import json

def add(a, b):
    return a + b

class MyClass:
    def __init__(self, name):
        self.name = name
        self.age = 0

    def get_name(self):
        return self.name

    def get_age(self):
        return self.age


In [19]:
from typing import List, Optional

import spacy


class SpacySentenceTokenizer:
    """Sentence-window chunker on top of a spaCy pipeline.

    Text is segmented into sentences by ``self.nlp``. Chunks are sliding
    windows of consecutive sentences joined with a single space.
    """

    def __init__(self, spacy_model="en_core_web_sm"):
        """Load the spaCy pipeline named ``spacy_model``."""
        self.nlp = spacy.load(spacy_model)

    def create_documents(
        self, documents, metadatas=None, overlap: int = 0, stride: int = 1
    ) -> "List[Document]":
        """Split every input text and wrap each chunk in a ``Document``.

        Args:
            documents: Texts to split.
            metadatas: Optional per-text metadata dicts aligned with
                ``documents``. When omitted or empty, each chunk gets an
                empty dict.
            overlap: Number of extra sentences appended to each window.
            stride: Number of sentences to advance between windows.

        Returns:
            One ``Document`` per chunk, in input order. Every chunk receives
            its own (shallow) copy of the source metadata.
        """
        if not metadatas:
            # A fresh dict per text; `[{}] * n` would alias a single dict.
            metadatas = [{} for _ in range(len(documents))]
        chunks = []
        for doc, metadata in zip(documents, metadatas):
            # Pass by keyword: split_text's positional order is (stride, overlap).
            for chunk_text in self.split_text(doc, stride=stride, overlap=overlap):
                chunks.append(
                    Document(page_content=chunk_text, metadata=dict(metadata))
                )
        return chunks

    def split_text(self, text: str, stride: int = 1, overlap: int = 0) -> List[str]:
        """Return sliding windows of ``overlap + 1`` sentences.

        A window starts at every ``stride``-th sentence. Windows near the end
        of the text are shorter because fewer sentences remain.
        """
        sentences = list(self.nlp(text).sents)
        chunks = []
        for i in range(0, len(sentences), stride):
            chunk_text = " ".join(str(sent) for sent in sentences[i : i + overlap + 1])
            chunks.append(chunk_text)
        return chunks

In [24]:
# NOTE: rebinds the notebook-level `text` used by the earlier splitter demos.
text = "I love dogs. They are amazing. Cats must be the easiest pets around. Tesla robots are advanced now with AI. They will take us to mars."
  
tokenizer = SpacySentenceTokenizer()
# overlap=2 -> windows of up to three sentences; stride=1 -> a window starts at every sentence.
chunks = tokenizer.split_text(text, stride=1, overlap=2)
chunks

['I love dogs. They are amazing. Cats must be the easiest pets around.',
 'They are amazing. Cats must be the easiest pets around. Tesla robots are advanced now with AI.',
 'Cats must be the easiest pets around. Tesla robots are advanced now with AI. They will take us to mars.',
 'Tesla robots are advanced now with AI. They will take us to mars.',
 'They will take us to mars.']

In [27]:
from typing import List

from sentence_transformers import SentenceTransformer, util
import spacy


class SentenceTransformersSimilarity:
    """Scores how similar each sentence is to the one right before it."""

    def __init__(self, model="all-MiniLM-L6-v2", similarity_threshold=0.2):
        """Load the embedding model and remember the grouping threshold.

        ``similarity_threshold`` is not used by this class itself; it is
        stored as an attribute for callers to read.
        """
        self.model = SentenceTransformer(model)
        self.similarity_threshold = similarity_threshold

    def similarities(self, sentences: List[str]):
        """Return cosine similarities between neighbouring sentences.

        Element ``k`` of the result compares ``sentences[k]`` with
        ``sentences[k + 1]``, so the result has one entry fewer than the
        input (and is empty for zero or one sentence).
        """
        vectors = self.model.encode(sentences)
        return [
            util.pytorch_cos_sim(vectors[idx], vectors[idx + 1]).item()
            for idx in range(len(vectors) - 1)
        ]


class SpacySentenceSplitter():
    """Sentence splitter using the spaCy ``en_core_web_sm`` pipeline."""

    def __init__(self):
        self.nlp = spacy.load("en_core_web_sm")

    def split(self, text: str) -> List[str]:
        """Return the sentences of ``text`` as whitespace-stripped strings."""
        parsed = self.nlp(text)
        stripped_sentences = []
        for sentence in parsed.sents:
            stripped_sentences.append(str(sentence).strip())
        return stripped_sentences


class SimilarSentenceSplitter():
    """Groups consecutive sentences whose neighbour similarity is high enough."""

    def __init__(self, similarity_model, sentence_splitter):
        """Store the collaborators.

        ``similarity_model`` must provide ``similarities(sentences)`` and a
        ``similarity_threshold`` attribute; ``sentence_splitter`` must provide
        ``split(text)``.
        """
        self.model = similarity_model
        self.sentence_splitter = sentence_splitter

    def split_text(self, text: str, group_max_sentences=5) -> List[str]:
        """
        Split ``text`` into groups of related sentences.

        group_max_sentences: The maximum number of sentences in a group.

        Returns one string per group, with its sentences joined by a space.
        """
        sentences = self.sentence_splitter.split(text)
        if len(sentences) == 0:
            return []

        neighbour_scores = self.model.similarities(sentences)

        # The opening sentence seeds the first group.
        groups = [[sentences[0]]]

        for idx in range(1, len(sentences)):
            current_group = groups[-1]
            has_room = len(current_group) < group_max_sentences
            # A sentence joins the current group only if the group is not full
            # and it is similar enough to the sentence just before it.
            if has_room and neighbour_scores[idx - 1] >= self.model.similarity_threshold:
                current_group.append(sentences[idx])
            else:
                groups.append([sentences[idx]])

        return [" ".join(group) for group in groups]


# NOTE: rebinds the notebook-level `text` and `splitter` names used by earlier cells.
text = "I love dogs. They are amazing. Cats must be the easiest pets around. Tesla robots are advanced now with AI. They will take us to mars."

# Defaults: "all-MiniLM-L6-v2" embeddings with a similarity threshold of 0.2.
model = SentenceTransformersSimilarity()
sentence_splitter = SpacySentenceSplitter()
splitter = SimilarSentenceSplitter(model, sentence_splitter)
# Uses the default group_max_sentences=5.
splitter.split_text(text)

['I love dogs. They are amazing.',
 'Cats must be the easiest pets around.',
 'Tesla robots are advanced now with AI. They will take us to mars.']

In [1]:
import os
from unstructured.partition.pdf import partition_pdf
from unstructured.staging.base import elements_to_json

In [1]:
# Relative path: resolved against the notebook's working directory.
filename = "../data/gemini_v1.5.pdf"

# Extracts the elements from the PDF
elements = partition_pdf(
    filename=filename,
    # Unstructured Helpers
    # NOTE(review): these options are passed straight to unstructured's
    # partition_pdf. Presumably "hi_res" with "yolox" selects model-based
    # layout detection and infer_table_structure fills
    # metadata.text_as_html for tables; confirm against the installed version.
    strategy="hi_res",
    infer_table_structure=True,
    model_name="yolox",
)

In [44]:
# for i in range(len(elements)):
#     if "Table" in elements[i].__repr__():
#         print(i)

In [43]:
# NOTE(review): 149 is a hard-coded position in `elements`; presumably it was
# found with the commented-out "Table" scan in the previous cell. It will
# shift if the PDF or the partition settings change; confirm before reuse.
table = elements[149].metadata.text_as_html
from IPython.display import display, HTML
# Render the table's HTML representation inline.
display(HTML(table))

Unnamed: 0,Context length,AutoAIS Gemini 1.5 Pro,AIS Human Evaluation,Num. Sentences per answer
Anthropic Claude 2.1,0-shot,11.1,30.2,5.7
Gemini 1.0 Pro,0-shot,85.3,79.1,2.3
Gemini 1.5 Pro,0-shot,82.1,75.5,3.4
Anthropic Claude 2.1,4k retrieved,29.1,42.2,5.1
Gemini 1.0 Pro,4k retrieved,75.3,72.1,2.6
Gemini 1.5 Pro,4k retrieved,84.8,78.2,4.9
Gemini 1.5 Pro,710k book,91.4,80.0,5.8
