In [39]:
import os
import logging
import json
import uuid
import warnings
from tqdm import trange
from dotenv import load_dotenv
import io
import zipfile
import re
import time

# Adobe PDF Services imports
from adobe.pdfservices.operation.auth.service_principal_credentials import ServicePrincipalCredentials
from adobe.pdfservices.operation.exception.exceptions import ServiceApiException, ServiceUsageException, SdkException
from adobe.pdfservices.operation.io.cloud_asset import CloudAsset
from adobe.pdfservices.operation.io.stream_asset import StreamAsset
from adobe.pdfservices.operation.pdf_services import PDFServices
from adobe.pdfservices.operation.pdf_services_media_type import PDFServicesMediaType
from adobe.pdfservices.operation.pdfjobs.jobs.extract_pdf_job import ExtractPDFJob
from adobe.pdfservices.operation.pdfjobs.params.extract_pdf.extract_element_type import ExtractElementType
from adobe.pdfservices.operation.pdfjobs.params.extract_pdf.extract_pdf_params import ExtractPDFParams
from adobe.pdfservices.operation.pdfjobs.result.extract_pdf_result import ExtractPDFResult

# Pinecone and Langchain imports
from pinecone import Pinecone, ServerlessSpec
from pinecone_text.sparse import BM25Encoder
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Logging: timestamped INFO-level records; every Python warning is silenced
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
warnings.filterwarnings("ignore")

# Read API keys / endpoints from the environment (values from .env are loaded first);
# each variable is None when the corresponding key is not set
load_dotenv()
groq_api_key = os.environ.get('GROQ_API_KEY')
hf_key = os.environ.get('HUGGINGFACE_API_KEY')
pinecone_api_key = os.environ.get('PINECONE_API_KEY')
openai_api_key = os.environ.get('OPENAI_API_KEY')
dense_embedder_api = os.environ.get("HF_API_URL")

# Identifier of the chat model
chat_model = "llama3-8b-8192"
# Import other necessary modules
from llama_index.legacy import Document
from llama_index.legacy.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.legacy.vector_stores import PineconeVectorStore
from typing import Any, Callable, List, Optional, Sequence, TypedDict

import numpy as np

from llama_index.legacy.bridge.pydantic import Field
from llama_index.legacy.callbacks.base import CallbackManager
from llama_index.legacy.embeddings.base import BaseEmbedding
from llama_index.legacy.embeddings.openai import OpenAIEmbedding
from llama_index.legacy.node_parser import NodeParser
from llama_index.legacy.node_parser.interface import NodeParser
from llama_index.legacy.node_parser.node_utils import (
    build_nodes_from_splits,
    default_id_func,
)
from llama_index.legacy.node_parser.text.utils import split_by_sentence_tokenizer
from llama_index.legacy.schema import BaseNode, Document
from llama_index.legacy.utils import get_tqdm_iterable

# Default value for the `original_text_metadata_key` argument of
# SemanticSplitterNodeParser.from_defaults (defined further down in this notebook).
DEFAULT_OG_TEXT_METADATA_KEY = "original_text"


In [2]:
# Initialize the logger.
# NOTE: the first cell of this notebook already calls logging.basicConfig(...) with a
# format string; when that cell has run, this second call leaves the existing
# configuration unchanged (basicConfig does nothing once the root logger has handlers).
logging.basicConfig(level=logging.INFO)

class ExtractTextTableInfoFromPDF:
    """Run an Adobe PDF Services extract job on a local PDF and keep the result in memory.

    Despite the class name, only ``ExtractElementType.TEXT`` elements are requested.

    Attributes:
        unique_id: Random UUID4 string generated for this extraction run.
        extracted_data: Mapping of archive member name -> raw bytes for every file in
            the result zip returned by the service. It is an empty dict when the
            Adobe SDK raised an error (the error is logged, not re-raised).
    """

    def __init__(self, file_path):
        """Upload ``file_path`` to Adobe PDF Services and download the extraction result.

        Args:
            file_path: Path of the PDF file to extract.

        Raises:
            OSError: If the file cannot be opened or read (not caught here).
        """
        self.unique_id = str(uuid.uuid4())
        # Always define the attribute so callers can inspect it even when the
        # Adobe SDK call fails and the exception is swallowed below.
        self.extracted_data = {}
        try:
            # Context manager guarantees the handle is closed even if read() fails
            with open(file_path, 'rb') as file:
                input_stream = file.read()

            # Initial setup, create credentials instance
            credentials = ServicePrincipalCredentials(
                client_id=os.getenv('ADOBE_SERVICES_CLIENT_ID'),
                client_secret=os.getenv('ADOBE_SERVICES_CLIENT_SECRET')
            )

            # Creates a PDF Services instance
            pdf_services = PDFServices(credentials=credentials)

            # Creates an asset(s) from source file(s) and upload
            input_asset = pdf_services.upload(input_stream=input_stream, mime_type=PDFServicesMediaType.PDF)

            # Create parameters for the job
            extract_pdf_params = ExtractPDFParams(
                elements_to_extract=[ExtractElementType.TEXT],
            )

            # Creates a new job instance
            extract_pdf_job = ExtractPDFJob(input_asset=input_asset, extract_pdf_params=extract_pdf_params)

            # Submit the job and gets the job result
            location = pdf_services.submit(extract_pdf_job)
            pdf_services_response = pdf_services.get_job_result(location, ExtractPDFResult)

            # Get content from the resulting asset(s)
            result_asset: CloudAsset = pdf_services_response.get_result().get_resource()
            stream_asset: StreamAsset = pdf_services.get_content(result_asset)

            # The result is a zip archive; read every member into memory
            zip_bytes = io.BytesIO(stream_asset.get_input_stream())
            with zipfile.ZipFile(zip_bytes, 'r') as zip_ref:
                self.extracted_data = {name: zip_ref.read(name) for name in zip_ref.namelist()}

        except (ServiceApiException, ServiceUsageException, SdkException) as e:
            # Best-effort: log the SDK failure and leave extracted_data empty
            logging.exception(f'Exception encountered while executing operation: {e}')

    # Generates a string containing a directory structure and file name for the output file using unique_id
    @staticmethod
    def create_output_file_path(unique_id: str) -> str:
        """Return the zip output path for ``unique_id``, creating the target directory if needed."""
        os.makedirs("../data/Extracted_data", exist_ok=True)
        return f"../data/Extracted_data/{unique_id}.zip"

    @classmethod
    def create_with_unique_id(cls, file_path):
        """Build an extractor for ``file_path`` and return ``(instance, instance.unique_id)``."""
        instance = cls(file_path)
        return instance, instance.unique_id

In [3]:
# Get the extracted data from the extractor
def get_extracted_data(extracted_data):
    """Parse the ``structuredData.json`` member of an extraction result.

    Args:
        extracted_data: Mapping of archive member name -> JSON text/bytes, as stored
            in ``ExtractTextTableInfoFromPDF.extracted_data``.

    Returns:
        The decoded JSON content of ``structuredData.json``.

    Raises:
        ValueError: If ``structuredData.json`` is not present in ``extracted_data``
            (previously this surfaced as an UnboundLocalError).
    """
    if 'structuredData.json' not in extracted_data:
        logging.error("Missing 'structuredData.json' in extracted_data")
        raise ValueError("Missing 'structuredData.json' in extracted_data")
    return json.loads(extracted_data['structuredData.json'])

# Function to initialise a flexible text splitter
def initialise_text_splitter(chunk_size, chunk_overlap):
    """Create a RecursiveCharacterTextSplitter with this notebook's separator list.

    Args:
        chunk_size: Passed through as the splitter's ``chunk_size``.
        chunk_overlap: Passed through as the splitter's ``chunk_overlap``.

    Returns:
        The configured splitter, or ``None`` if construction raised (the error is logged).
    """
    # NOTE(review): "" sits before ")" and "(" in this list; if the splitter stops at
    # the empty-string separator, the last two entries are never used — confirm.
    splitter_options = {
        "chunk_size": chunk_size,
        "chunk_overlap": chunk_overlap,
        "length_function": len,
        "separators": ["\n\n", "\n", ". ", "!", "?", ",", " ", "", ")", "("],
    }
    try:
        return RecursiveCharacterTextSplitter(**splitter_options)
    except Exception as e:
        logging.error(f"Failed to initialize text splitter: {e}")
        return None

# Function to obtain text chunks using the text splitter
def get_text_chunks(file_path, json_data):
    """Group the extracted text elements into one chunk per page.

    Elements are read from ``json_data['elements']`` in order. A list label
    (an element whose ``Path`` ends with ``"Lbl"`` and does not start with
    ``"//Document/Table"``) is held back and prefixed to the next text element.

    Args:
        file_path: Path of the source PDF; only its last ``/``-separated component
            is stored in the chunks.
        json_data: Parsed structured data containing an ``'elements'`` list whose
            items carry ``'Page'`` and, for text elements, ``'Path'`` and ``'Text'``.

    Returns:
        A list of dicts with keys ``'ElementType'``, ``'file_name'``, ``'Page'`` and
        ``'Text'``. ``'Page'`` is the element page index plus one (counting starts
        from page 0 in this function), applied consistently to every page
        including the last one.

    Raises:
        ValueError: If ``json_data`` has no ``'elements'`` key.
    """
    if 'elements' not in json_data:
        logging.error("Missing 'elements' key in json_data")
        raise ValueError("Missing 'elements' key in json_data")

    file_name = file_path.split("/")[-1]

    all_texts = []
    page_text = ""
    list_label = ""
    start_page = 0  # page index whose text is currently being accumulated

    for i, element in enumerate(json_data['elements']):
        try:
            current_page = element['Page']
        except KeyError:
            logging.warning(f"Missing 'Page' key in element at index {i}")
            continue

        if current_page > start_page:
            # Page boundary: flush the text gathered for the page that just ended.
            # The label is start_page + 1, which equals the value the previous
            # implementation produced for consecutive pages.
            all_texts.append({'ElementType': 'Text', 'file_name': file_name, 'Page': start_page + 1, 'Text': page_text})
            start_page = current_page
            page_text = ""
            list_label = ""

        # The element that opens a new page is processed like any other element
        # (it used to be silently dropped).
        try:
            if 'Text' in element:
                path = element['Path']
                if path.endswith("Lbl") and not path.startswith("//Document/Table"):
                    list_label = element['Text']
                elif list_label:
                    page_text += f"{list_label} {element['Text']}\n"
                    list_label = ""  # Label consumed by the item that follows it
                else:
                    page_text += f"{element['Text']}\n"
        except KeyError as e:
            logging.warning(f"Key error in json_data['elements'][i] processing at index {i}: {e}")

    # Flush the last page using the same numbering as every earlier page
    if page_text:
        all_texts.append({'ElementType': 'Text', 'file_name': file_name, 'Page': start_page + 1, 'Text': page_text})

    return all_texts

# Function to derive the nodes from the text chunks
def convert_pagetexts_to_nodes(text_chunks):
    """Wrap each page chunk in a ``Document`` and clean its text.

    Args:
        text_chunks: Iterable of dicts with ``'Text'``, ``'file_name'`` and ``'Page'``
            keys (the shape produced by ``get_text_chunks``).

    Returns:
        A list of ``Document`` objects, one per chunk, whose ``text`` has been cleaned
        and whose metadata holds ``file_name`` and ``page``.
    """

    def clean_up_text(content: str) -> str:
        """
        Strip unwanted characters and patterns from a text.
        :param content: Raw text.
        :return: The cleaned text.
        """
        # Re-join words that were hyphenated across a line break
        content = re.sub(r'(\w+)-\n(\w+)', r'\1\2', content)

        # Patterns deleted outright, applied one after another in this order.
        # NOTE(review): as a regex, "\\n" matches a real newline character, so lines
        # are joined without a space before whitespace is normalised — confirm intent.
        unwanted_patterns = (
            "\\n", "  —", "——————————", "—————————", "—————",
            r'\\u[\dA-Fa-f]{4}', r'\uf075', r'\uf0b7'
        )
        for unwanted in unwanted_patterns:
            content = re.sub(unwanted, "", content)

        # Tighten spacing around hyphens, then collapse whitespace runs to one space
        content = re.sub(r'(\w)\s*-\s*(\w)', r'\1-\2', content)
        return re.sub(r'\s+', ' ', content)

    page_nodes = []
    for chunk in text_chunks:
        # Build the document from the raw page text and its provenance metadata
        document = Document(
            text=chunk['Text'],
            metadata={
                "file_name": chunk['file_name'],
                "page": chunk['Page']
            },
            excluded_llm_metadata_keys=["file_name"],
            metadata_seperator="::",
            metadata_template="{key}=>{value}",
            text_template="Metadata: {metadata_str}\n-----\nContent: {content}",
        )
        # Replace the raw text with its cleaned form
        document.text = clean_up_text(document.text)
        page_nodes.append(document)
    return page_nodes


In [4]:
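# One-off extraction step, kept for reference: it runs the Adobe extractor on the PDF
# and caches the structured JSON at ../data/<pdf file name>.json, which the next cell loads.
# file_path = '../PDF/HSI1000-chapter1.pdf'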
# extractor, unique_id = ExtractTextTableInfoFromPDF.create_with_unique_id(file_path)
# extracted_data = extractor.extracted_data
# pdf_data = get_extracted_data(extracted_data)
# fname = file_path.split("/")[-1]
# with open(f"../data/{fname}.json", "w", encoding="utf-8") as fout:
#     json.dump(pdf_data, fout)

In [5]:
# Load the cached extraction JSON for the chapter and build cleaned page-level documents
file_path = '../PDF/HSI1000-chapter1.pdf'
fname = file_path.rsplit("/", 1)[-1]
with open(f"../data/{fname}.json", "r", encoding='utf-8') as fin:
    pdf_data = json.loads(fin.read())

page_texts = get_text_chunks(file_path, pdf_data)
page_documents = convert_pagetexts_to_nodes(page_texts)

## Doing the semantic chunking myself


In [6]:
class SentenceCombination(TypedDict):
    """Per-sentence record used by SemanticSplitterNodeParser while chunking."""

    # One split returned by the sentence splitter
    sentence: str
    # Position of the split within the document's list of splits
    index: int
    # The sentence joined with up to `buffer_size` neighbouring splits on each side
    combined_sentence: str
    # Embedding of `combined_sentence`; an empty list until it is filled in
    combined_sentence_embedding: List[float]

class SemanticSplitterNodeParser(NodeParser):
    """Semantic node parser.

    Splits a document into Nodes, with each node being a group of semantically related sentences.

    Args:
        buffer_size (int): number of neighbouring sentences on each side that are
            combined with a sentence when evaluating semantic similarity
        embed_model: (BaseEmbedding): embedding model to use
        sentence_splitter (Optional[Callable]): splits text into sentences
        include_metadata (bool): whether to include metadata in nodes
        include_prev_next_rel (bool): whether to include prev/next relationships
    """

    sentence_splitter: Callable[[str], List[str]] = Field(
        default_factory=split_by_sentence_tokenizer,
        description="The text splitter to use when splitting documents.",
        exclude=True,
    )

    embed_model: BaseEmbedding = Field(
        description="The embedding model to use to for semantic comparison",
    )

    buffer_size: int = Field(
        default=1,
        description=(
            "The number of sentences to group together when evaluating semantic similarity. "
            "Set to 1 to consider each sentence individually. "
            "Set to >1 to group sentences together."
        ),
    )

    breakpoint_percentile_threshold = Field(
        default=95,
        description=(
            "The percentile of cosine dissimilarity that must be exceeded between a "
            "group of sentences and the next to form a node.  The smaller this "
            "number is, the more nodes will be generated"
        ),
    )

    @classmethod
    def class_name(cls) -> str:
        """Return the name identifying this parser class."""
        return "SemanticSplitterNodeParser"

    @classmethod
    def from_defaults(
        cls,
        embed_model: Optional[BaseEmbedding] = None,
        breakpoint_percentile_threshold: Optional[int] = 95,
        buffer_size: Optional[int] = 1,
        sentence_splitter: Optional[Callable[[str], List[str]]] = None,
        original_text_metadata_key: str = DEFAULT_OG_TEXT_METADATA_KEY,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        callback_manager: Optional[CallbackManager] = None,
        id_func: Optional[Callable[[int, Document], str]] = None,
    ) -> "SemanticSplitterNodeParser":
        """Build a parser, filling in defaults for every argument left unset.

        Falls back to an empty ``CallbackManager``, ``split_by_sentence_tokenizer()``,
        ``OpenAIEmbedding()`` and ``default_id_func`` when the corresponding argument
        is falsy.
        """
        callback_manager = callback_manager or CallbackManager([])

        sentence_splitter = sentence_splitter or split_by_sentence_tokenizer()
        embed_model = embed_model or OpenAIEmbedding()

        id_func = id_func or default_id_func

        return cls(
            embed_model=embed_model,
            breakpoint_percentile_threshold=breakpoint_percentile_threshold,
            buffer_size=buffer_size,
            sentence_splitter=sentence_splitter,
            original_text_metadata_key=original_text_metadata_key,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
            callback_manager=callback_manager,
            id_func=id_func,
        )

    def _parse_nodes(
        self,
        nodes: Sequence[BaseNode],
        show_progress: bool = False,
        **kwargs: Any,
    ) -> List[BaseNode]:
        """Parse document into nodes.

        Each input node is chunked on its own; the resulting nodes are concatenated
        in input order.
        """
        all_nodes: List[BaseNode] = []
        nodes_with_progress = get_tqdm_iterable(nodes, show_progress, "Parsing nodes")

        for node in nodes_with_progress:
            semantic_nodes = self.build_semantic_nodes_from_documents(
                [node], show_progress
            )
            all_nodes.extend(semantic_nodes)

        return all_nodes

    def build_semantic_nodes_from_documents(
        self,
        documents: Sequence[Document],
        show_progress: bool = False,
    ) -> List[BaseNode]:
        """Build semantically grouped nodes from documents.

        For every document: split its text into sentences, embed each sentence
        together with its neighbours, measure the distance between consecutive
        embeddings, cut the sentence sequence at the largest distances and turn
        the resulting text chunks into nodes.
        """
        all_nodes: List[BaseNode] = []
        for doc in documents:
            text = doc.text
            text_splits = self.sentence_splitter(text)

            sentences = self._build_sentence_groups(text_splits)

            combined_sentence_embeddings = self.embed_model.get_text_embedding_batch(
                [s["combined_sentence"] for s in sentences],
                show_progress=show_progress,
            )

            for i, embedding in enumerate(combined_sentence_embeddings):
                sentences[i]["combined_sentence_embedding"] = embedding

            distances = self._calculate_distances_between_sentence_groups(sentences)

            chunks = self._build_node_chunks(sentences, distances)

            nodes = build_nodes_from_splits(
                chunks,
                doc,
                id_func=self.id_func,
            )

            all_nodes.extend(nodes)

        return all_nodes

    def _build_sentence_groups(
        self, text_splits: List[str]
    ) -> List[SentenceCombination]:
        """Create one record per split and fill in its ``combined_sentence``.

        ``combined_sentence`` is the split concatenated (without a separator) with up
        to ``buffer_size`` splits before it and up to ``buffer_size`` splits after it.
        """
        sentences: List[SentenceCombination] = [
            {
                "sentence": x,
                "index": i,
                "combined_sentence": "",
                "combined_sentence_embedding": [],
            }
            for i, x in enumerate(text_splits)
        ]

        # Group sentences and calculate embeddings for sentence groups
        for i in range(len(sentences)):
            combined_sentence = ""

            for j in range(i - self.buffer_size, i):
                if j >= 0:
                    combined_sentence += sentences[j]["sentence"]

            combined_sentence += sentences[i]["sentence"]

            for j in range(i + 1, i + 1 + self.buffer_size):
                if j < len(sentences):
                    combined_sentence += sentences[j]["sentence"]

            sentences[i]["combined_sentence"] = combined_sentence

        return sentences

    def _calculate_distances_between_sentence_groups(
        self, sentences: List[SentenceCombination]
    ) -> List[float]:
        """Return ``1 - similarity`` for every pair of consecutive sentence groups.

        The result has ``len(sentences) - 1`` entries; entry ``i`` compares group ``i``
        with group ``i + 1`` using ``embed_model.similarity``.
        """
        distances = []
        for i in range(len(sentences) - 1):
            embedding_current = sentences[i]["combined_sentence_embedding"]
            embedding_next = sentences[i + 1]["combined_sentence_embedding"]

            similarity = self.embed_model.similarity(embedding_current, embedding_next)

            distance = 1 - similarity

            distances.append(distance)

        return distances

    def _build_node_chunks(
        self, sentences: List[SentenceCombination], distances: List[float]
    ) -> List[str]:
        """Cut the sentence sequence wherever the distance exceeds the percentile threshold.

        ``distances[i]`` is the distance between sentence group ``i`` and ``i + 1``,
        so a breakpoint at ``i`` closes the current chunk *after* sentence ``i`` and
        starts the next chunk at sentence ``i + 1``.

        Returns:
            The chunk texts in document order. With no distances (at most one
            sentence) a single chunk holding all sentences is returned.
        """
        chunks = []
        if len(distances) > 0:
            breakpoint_distance_threshold = np.percentile(
                distances, self.breakpoint_percentile_threshold
            )

            indices_above_threshold = [
                i for i, x in enumerate(distances) if x > breakpoint_distance_threshold
            ]

            # Chunk sentences into semantic groups based on percentile breakpoints
            start_index = 0

            for index in indices_above_threshold:
                # Include sentence `index` in the chunk being closed. Ending the
                # slice one sentence earlier put the boundary on the wrong side of
                # the breakpoint and produced an empty chunk when index == 0.
                group = sentences[start_index : index + 1]
                combined_text = "".join([d["sentence"] for d in group])
                chunks.append(combined_text)

                start_index = index + 1

            # Whatever follows the last breakpoint forms the final chunk
            if start_index < len(sentences):
                combined_text = "".join(
                    [d["sentence"] for d in sentences[start_index:]]
                )
                chunks.append(combined_text)
        else:
            # If, for some reason we didn't get any distances (i.e. very, very small documents) just
            # treat the whole document as a single node
            chunks = [" ".join([s["sentence"] for s in sentences])]

        return chunks

In [None]:
# Embedding model is customisable.
# NOTE: the Pinecone index further down is created with dimension=768, so a
# replacement model must produce vectors of that size (or the index dimension
# must be changed to match).
embed_model = HuggingFaceEmbedding(model_name='sentence-transformers/all-mpnet-base-v2')

In [22]:
parser = SemanticSplitterNodeParser.from_defaults(
    embed_model = embed_model,  # embedding model used to compare neighbouring sentence groups
    buffer_size = 1,  # each sentence is embedded together with 1 neighbour on either side
    breakpoint_percentile_threshold = 95,  # cut where the distance exceeds the 95th percentile
)
# Semantically chunk the page-level documents into nodes.
# Only the first three page documents are processed here.
semantic_nodes = parser._parse_nodes(page_documents[:3], show_progress=False)

In [48]:
# Embed every semantic node so its text can be upserted together with its vector
node_texts = [node.text for node in semantic_nodes]
# One embedding per node text, in the same order
embeddings = embed_model._embed(node_texts)

# Attach each embedding to the node it was computed from
for position, node in enumerate(semantic_nodes):
    node.embedding = embeddings[position]

# Pinecone payload: node id, dense vector, and the node text as metadata
pinecone_text_upserts = [
    {
        'id': node.id_,
        'values': node.embedding,
        'metadata': {'text': node.text},
    }
    for node in semantic_nodes
]

In [80]:
# Connect to Pinecone and make sure the target index exists
pc = Pinecone(api_key=pinecone_api_key)
index_name = "rag-testing"

# Hybrid search uses the dot-product metric; dense-only search uses cosine similarity
hybrid_search = False
pc_similarity_metric = "dotproduct" if hybrid_search else "cosine"

# Create the index only when it is not there yet
if index_name in pc.list_indexes().names():
    logging.info(f"Pinecone index with name: \"{index_name}\" already created")
else:
    logging.info("Creating pinecone index...")
    pc.create_index(
        index_name,
        dimension=768,
        metric=pc_similarity_metric,
        spec=ServerlessSpec(cloud="aws", region="us-east-1")
    )

# Handle used for all subsequent reads and writes on the index
pinecone_index = pc.Index(index_name)

def upsert_pinecone_data(pinecone_text_upserts, index=None, max_wait_seconds=8, poll_interval=1):
    """Upsert vectors into a fresh, randomly named namespace and verify the count.

    After the upsert the index statistics are polled until the namespace reports
    as many vectors as were sent, or until ``max_wait_seconds`` have elapsed.

    Args:
        pinecone_text_upserts: List of vector records to upsert.
        index: Index object exposing ``upsert`` and ``describe_index_stats``.
            Defaults to the notebook-level ``pinecone_index``.
        max_wait_seconds: Upper bound on how long to wait for the statistics to
            reflect the upsert.
        poll_interval: Seconds to sleep between two statistics checks.

    Returns:
        ``(namespace, success)`` where ``namespace`` is the generated UUID string and
        ``success`` is ``True`` only if the namespace holds exactly
        ``len(pinecone_text_upserts)`` vectors.
    """
    target_index = pinecone_index if index is None else index
    expected_count = len(pinecone_text_upserts)

    # Generate a new UUID for the namespace
    namespace = str(uuid.uuid4())

    # Upsert data into the new namespace
    target_index.upsert(vectors=pinecone_text_upserts, namespace=namespace)
    logging.info("Updating pinecone index...")

    deadline = time.monotonic() + max_wait_seconds
    while True:
        namespaces = target_index.describe_index_stats()['namespaces']
        # A namespace that is not listed yet counts as holding zero vectors
        vector_count = namespaces[namespace]['vector_count'] if namespace in namespaces else 0
        if vector_count == expected_count or time.monotonic() >= deadline:
            break
        time.sleep(poll_interval)

    # Success means the counts match (the comparison used to be inverted)
    if vector_count != expected_count:
        logging.error(f"Not all vectors were upserted to namespace {namespace}. Exiting...")
        return namespace, False

    logging.info(f"All vectors uploaded successfully to namespace {namespace}")
    return namespace, True

# Push the prepared vectors and report the outcome
namespace, success = upsert_pinecone_data(pinecone_text_upserts)
if not success:
    logging.error(f"Failed to upsert data into namespace: {namespace}")
else:
    logging.info(f"Data successfully upserted into namespace: {namespace}")

2024-08-26 02:20:06,626 - INFO - Discovering subpackages in _NamespacePath(['/Users/Spare/Desktop/chat-pdf/venv/lib/python3.10/site-packages/pinecone_plugins'])
2024-08-26 02:20:06,629 - INFO - Looking for plugins in pinecone_plugins.inference
2024-08-26 02:20:06,629 - INFO - Installing plugin inference into Pinecone
2024-08-26 02:20:07,645 - INFO - Pinecone index with name: "rag-testing" already created


Allowing pinecone index to update...


2024-08-26 02:20:17,798 - INFO - All vectors uploaded successfully to namespace 63562882-96ef-43dc-946f-97cb2bb050c4
2024-08-26 02:20:17,800 - INFO - Data successfully upserted into namespace: 63562882-96ef-43dc-946f-97cb2bb050c4


In [75]:
pinecone_index.describe_index_stats()

{'dimension': 768,
 'index_fullness': 0.0,
 'namespaces': {'0efdb654-4c54-4b9f-af96-c75dba4e49ed': {'vector_count': 7},
                '69f84bb0-c8d3-4abb-8161-95b4bcda2217': {'vector_count': 7},
                'e195d0e0-47e5-46dc-9228-e62631c97c88': {'vector_count': 7}},
 'total_vector_count': 21}