In [1]:
import os
import re
import nltk
import string
import random
import pinecone
import numpy as np
from openai import OpenAI
from dotenv import load_dotenv
from datasets import load_dataset
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from pinecone import Pinecone, ServerlessSpec
from typing import List, Tuple, Callable, Dict
from langchain.document_loaders import PyPDFLoader
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.metrics.pairwise import euclidean_distances
from sklearn.feature_extraction.text import TfidfVectorizer
from langchain.text_splitter import RecursiveCharacterTextSplitter

  Referenced from: <ABE0EE74-6D97-3B8C-B690-C44754774FBC> /Users/carlosmayorga/anaconda3/envs/RAG_ARCHITECTURE/lib/python3.8/site-packages/torchvision/image.so
  warn(


In [2]:
# Fetch the NLTK resources used by DataSource.preprocess_text_advanced:
# 'punkt' for nltk.word_tokenize, 'stopwords' for the English stop-word list,
# and 'wordnet' for WordNetLemmatizer.
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/carlosmayorga/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/carlosmayorga/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/carlosmayorga/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


True

In [3]:
load_dotenv()  # Load the .env file

# Read the service API keys from the environment; os.getenv returns None
# when a variable is not set.
pinecone_api_key = os.getenv("PINECONE_API_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")

In [4]:

class Document_Processing:
    """Loads PDF files and splits them into plain-text chunks."""

    def __init__(self, file_paths: List[str], chunk_size: int = 1000, chunk_overlap: int = 0):
        """
        Initialize the Document_Processing class with a list of file paths.

        Args:
            file_paths (List[str]): A list of file paths to PDF files.
            chunk_size (int): Maximum number of characters per chunk. Default is 1000.
            chunk_overlap (int): Number of characters shared by consecutive chunks. Default is 0.
        """
        self.documents = file_paths
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    @staticmethod
    def _chunk_text(chunk) -> str:
        """
        Return the plain text carried by one split chunk.

        ``str(chunk)`` must not be used here: for a LangChain Document it yields
        a repr-like string (``page_content='...' metadata={...}``), which would
        leak field names and metadata into the text that is later embedded.
        """
        return chunk.page_content

    def preprocess_documents(self, file_path: str) -> List[str]:
        """
        Preprocess a single document by loading and splitting it into smaller chunks.

        Args:
            file_path (str): The path to the PDF file to be processed.

        Returns:
            List[str]: A list of text chunks from the PDF document.
        """
        # Load the PDF through the LangChain loader
        pages = PyPDFLoader(file_path).load()

        # Split the loaded pages into chunks of at most `chunk_size` characters
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
        )
        chunks = text_splitter.split_documents(pages)

        # Keep only the text of each chunk
        return [self._chunk_text(chunk) for chunk in chunks]

    def process_documents(self) -> List[str]:
        """
        Process all documents in the list of file paths and return their text chunks.

        Returns:
            List[str]: The text chunks of every document, in file order.
        """
        all_processed_documents = []

        # Chunk each file and append its chunks to the combined list
        for file_path in self.documents:
            all_processed_documents.extend(self.preprocess_documents(file_path))

        return all_processed_documents

In [5]:
class DataSource:
    """Holds a list of raw text items and provides text-cleaning helpers for them."""

    # NLTK helpers are built on first use and shared afterwards, instead of
    # re-reading the stop-word list and re-creating the lemmatizer per sentence.
    _stop_words = None
    _lemmatizer = None

    def __init__(self, data: List[str]):
        """
        Initializes the DataSource object with a list of data.

        Args:
            data (List[str]): The input list of text data.
        """
        self.data = data

    @classmethod
    def _nltk_resources(cls):
        """Return the cached (stop_words, lemmatizer) pair, building it on first use."""
        if cls._stop_words is None:
            cls._stop_words = frozenset(stopwords.words('english'))
        if cls._lemmatizer is None:
            cls._lemmatizer = WordNetLemmatizer()
        return cls._stop_words, cls._lemmatizer

    def preprocess_text(self, text: str) -> str:
        """
        Preprocesses the text by lowercasing, removing punctuation, and removing extra whitespace.

        Args:
            text (str): The input text to be preprocessed.

        Returns:
            str: The preprocessed text.
        """
        text = text.lower()  # Convert text to lowercase
        text = text.translate(str.maketrans("", "", string.punctuation))  # Remove punctuation
        text = re.sub(r"\s+", " ", text).strip()  # Collapse whitespace runs and trim the ends
        return text

    def preprocess_text_advanced(self, text: str) -> str:
        """
        Preprocesses the text by lowercasing, removing every character that is not
        a letter or whitespace, removing English stop words, and lemmatizing the words.

        Args:
            text (str): The input text to be preprocessed.

        Returns:
            str: The cleaned tokens joined by single spaces.
        """
        stop_words, lemmatizer = self._nltk_resources()

        # Lowercase, then drop punctuation, digits and any other non-letter character
        text = re.sub(r'[^a-zA-Z\s]', '', text.lower())

        # Tokenize, drop stop words, lemmatize what is left
        tokens = nltk.word_tokenize(text)
        kept_tokens = [lemmatizer.lemmatize(token) for token in tokens if token not in stop_words]

        return ' '.join(kept_tokens)

    def tokenize(self, text: str) -> List[str]:
        """
        Applies the advanced preprocessing to the text and splits the result into words.

        Args:
            text (str): The text to be tokenized.

        Returns:
            List[str]: A list of tokens (words) from the text.
        """
        return self.preprocess_text_advanced(text).split()

    def process_data(self) -> List[str]:
        """
        Processes the data by applying advanced preprocessing to each item in the data list.

        Updates the object's processed_data attribute with the cleaned text.

        Returns:
            List[str]: The preprocessed items, in the same order as ``self.data``.
        """
        self.processed_data = [self.preprocess_text_advanced(sentence) for sentence in self.data]

        return self.processed_data


In [6]:
class UserQuery:
    """Wraps the raw text of a single user query."""

    def __init__(self, query: str) -> None:
        """
        Store the query text.

        Args:
            query (str): The user's query text.
        """
        self.query = query

In [7]:
class Embedding:
    """Wraps a SentenceTransformer model used to turn text into vectors."""

    def __init__(self, model_name: str, device: str = 'cpu', use_local: bool = False):
        """
        Initializes the Embedding object with the specified model name, device, and whether to use a local model.

        Args:
            model_name (str): The name of the embedding model to be used.
            device (str): The device to be used for running the model ("cpu" or "cuda"). Default is "cpu".
            use_local (bool): Whether to use a locally available model. Default is False.

        Raises:
            ValueError: If ``use_local`` is True (local models are not implemented yet).
        """
        self.model_name = model_name
        self.device = device
        self.use_local = use_local
        # Mapping of model names to their expected dimensions
        self.model_dimensions = {
            'all-MiniLM-L6-v2': 384,  # Example dimension for a SentenceTransformer model
            'jina-v2-base-en-embed': 768,  # Specified dimension for your local model
            # Add other models and their dimensions here
        }
        self.current_model_dimension = self.model_dimensions.get(model_name, None)

        if use_local:
            raise ValueError("Not yet implemented")

        self.model = self._load_model(model_name, device)
        if self.current_model_dimension is None:
            # Model is not in the table: ask the model itself, so that embed()
            # does not report a mismatch against None on every call.
            self.current_model_dimension = self.model.get_sentence_embedding_dimension()

    def _load_model(self, model_name: str, device: str):
        """Load the SentenceTransformer model `model_name` on `device`."""
        return SentenceTransformer(model_name, device=device)

    def switch_model(self, model_name: str, device: str):
        """
        Switches the embedding model to a different model and device.

        Only models listed in ``self.model_dimensions`` can be switched to; for
        any other name a message is printed and the current model is kept.

        Args:
            model_name (str): The name of the new embedding model to switch to.
            device (str): The device to be used for the new model ("cpu" or "cuda").
        """
        if model_name not in self.model_dimensions:
            print(f"Model '{model_name}' not recognized. Unable to switch models.")
            return

        # Load first: if loading fails, the exception propagates and the
        # current model/state stays untouched.
        new_model = self._load_model(model_name, device)

        self.model = new_model
        self.model_name = model_name
        self.device = device
        self.current_model_dimension = self.model_dimensions[model_name]
        print(f"Switched to model '{model_name}' with dimension {self.current_model_dimension}")

    def embed(self, text: str) -> List[float]:
        """
        Embeds the input text into a vector representation using the current embedding model.

        Args:
            text (str): The input text to be embedded.

        Returns:
            List[float]: The vector representation of the input text.

        Raises:
            ValueError: If the object is configured for a local model (not implemented yet).
        """
        if self.use_local:
            raise ValueError("Not yet implemented!")

        result = self.model.encode(text).tolist()

        # Only warn when an expected dimension is actually known
        expected = self.current_model_dimension
        if expected is not None and len(result) != expected:
            print(f"Dimension mismatch detected: Expected {expected}, got {len(result)}")
        return result


In [8]:
class VectorStorage:
    """
    Placeholder base class for vector stores.

    Both operations are no-ops that return None; concrete stores override them.
    The optional ``metadatas`` / ``filter_metadata`` parameters mirror the
    signatures used by PineconeVectorStorage so that a subclass can be called
    through the base interface.
    """

    def __init__(self):
        pass

    def store_vectors(self, vectors: List[List[float]], metadatas: List[dict] = None):
        """
        Placeholder method for storing vectors.

        Args:
            vectors (List[List[float]]): Vectors to store.
            metadatas (List[dict], optional): One metadata dict per vector. Defaults to None.
        """
        pass

    def search_vectors(self, query_vector: List[float], top_n: int, filter_metadata: Dict[str, str] = None) -> List[int]:
        """
        Placeholder method for searching vectors.

        Args:
            query_vector (List[float]): The query vector.
            top_n (int): The number of results requested.
            filter_metadata (Dict[str, str], optional): Metadata filter. Defaults to None.
        """
        pass

In [9]:
class PineconeVectorStorage(VectorStorage):
    """Vector store backed by a Pinecone serverless index."""

    def __init__(self, index_name: str, embedding: Embedding):
        """
        Initializes the PineconeVectorStorage object with the specified index name and embedding model.

        The API key is read from the PINECONE_API_KEY environment variable. If no
        index called ``index_name`` exists, one is created with the embedding
        model's dimension and the cosine metric.

        Args:
            index_name (str): The name of the Pinecone index to be created or used.
            embedding (Embedding): An instance of the Embedding class providing the embedding model.
        """
        super().__init__()
        PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
        self.pinecone = Pinecone(api_key=PINECONE_API_KEY)
        if index_name not in self.pinecone.list_indexes().names():
            self.pinecone.create_index(
                name=index_name,
                dimension=embedding.model.get_sentence_embedding_dimension(),
                metric='cosine',
                spec=ServerlessSpec(cloud='aws', region='us-east-1')
            )
        self.index = self.pinecone.Index(index_name)

    def store_vectors(self, vectors: List[List[float]], metadatas: List[dict],
                      ids: List[str] = None, batch_size: int = 100):
        """
        Stores vectors and associated metadata in the Pinecone index.

        Args:
            vectors (List[List[float]]): List of vectors to be stored in the index.
            metadatas (List[dict]): List of dictionaries containing metadata associated with each vector.
            ids (List[str], optional): One record id per vector. Defaults to the
                positional ids "0", "1", ...; note that with the default, a later
                call overwrites the records written by an earlier call under the
                same ids.
            batch_size (int): Maximum number of records sent per upsert request. Default is 100.

        Raises:
            ValueError: If the input lists differ in length or ``batch_size`` is smaller than 1.
        """
        if len(vectors) != len(metadatas):
            # zip() would silently drop the unmatched tail
            raise ValueError(
                f"Got {len(vectors)} vectors but {len(metadatas)} metadata entries"
            )
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        if ids is None:
            ids = [str(i) for i in range(len(vectors))]
        elif len(ids) != len(vectors):
            raise ValueError(f"Got {len(vectors)} vectors but {len(ids)} ids")

        records = list(zip(ids, vectors, metadatas))

        # Upload in slices so that a large document set is not sent as one request
        for start in range(0, len(records), batch_size):
            self.index.upsert(vectors=records[start:start + batch_size])

    def search_vectors(self, query_vector: List[float], top_n: int, filter_metadata: Dict[str, str] = None) -> List[dict]:
        """
        Searches for vectors similar to the given query vector in the Pinecone index.

        Args:
            query_vector (List[float]): The query vector for similarity search.
            top_n (int): The number of top results to return.
            filter_metadata (Dict[str, str], optional): Metadata filters to apply during search. Defaults to None.

        Returns:
            List[dict]: The matches returned by the index, each including its metadata.
        """
        query_params = {
            'top_k': top_n,
            'vector': query_vector,
            'include_metadata': True,
            'include_values': False
        }
        if filter_metadata is not None:
            # Pinecone's query() takes the metadata filter under the `filter` keyword
            query_params['filter'] = filter_metadata

        results = self.index.query(**query_params)
        return results['matches']


In [10]:
class RetrievalAndRanking:
    """Retrieves the stored text chunks most similar to a query."""

    def __init__(self, embedding: Embedding, vector_storage: VectorStorage):
        """
        Initializes the RetrievalAndRanking object with an embedding model and a vector storage.

        Args:
            embedding (Embedding): The embedding model used to generate vector representations of text.
            vector_storage (VectorStorage): The storage for vector representations of text data.
        """
        self.embedding = embedding
        self.vector_storage = vector_storage

    def retrieve_relevant_chunks_pinecone(self, queries: List[str], top_n: int = 2, filter_metadata: Dict[str, str] = None) -> List[List[str]]:
        """
        Retrieves the most relevant chunks for each query from the vector storage.

        Each query is embedded and the ``top_n`` nearest matches are fetched. When
        ``filter_metadata`` is given, it is applied to those matches afterwards,
        so fewer than ``top_n`` chunks may be returned. Matches whose metadata has
        no 'text' entry are skipped.

        Args:
            queries (List[str]): A list of user queries.
            top_n (int): The number of top relevant chunks to retrieve for each query (default is 2).
            filter_metadata (Dict[str, str], optional): A dictionary containing metadata key-value pairs to filter the results (default is None).

        Returns:
            List[List[str]]: For each query, the text of its retained matches.
        """
        relevant_chunks = []

        for query in queries:
            query_embedding = self.embedding.embed(query)
            matches = self.vector_storage.search_vectors(query_embedding, top_n)

            chunk_texts = []
            for match in matches:
                metadata = match.get('metadata', {}) or {}

                # Keep only matches whose metadata agrees with every requested key/value
                if filter_metadata is not None and any(
                    metadata.get(key) != value for key, value in filter_metadata.items()
                ):
                    continue

                text = metadata.get('text')
                if text is None:
                    # No stored text for this match: nothing to hand to the LLM
                    continue
                chunk_texts.append(text)

            relevant_chunks.append(chunk_texts)

        return relevant_chunks
    

In [11]:
class LLM:
    """Answers queries from a supplied context through the OpenAI chat API."""

    def __init__(self, api_key: str = None, model_name: str = None, device: str = "cuda", consumer_group: str = "mistral"):
        """
        Initializes the LLM object with the specified API key, model name, device, and consumer group.
        Sets up the OpenAI client if an API key is provided.

        Args:
            api_key (str, optional): The API key for accessing the OpenAI service. Default is None.
            model_name (str, optional): The name of the chat model to be used. When None,
                "gpt-3.5-turbo" is used. Default is None.
            device (str, optional): The device to be used for running the model ("cuda" or "cpu"). Default is "cuda".
            consumer_group (str, optional): The consumer group to be used. Default is "mistral".
        """
        self.api_key = api_key
        self.model_name = model_name
        self.device = device
        self.consumer_group = consumer_group

        # Both backends start unset so later code can test for them explicitly
        self.client = None
        self.takeoff_client = None  # no takeoff backend is wired up in this class yet

        if self.api_key:
            self.client = OpenAI(api_key=self.api_key)  # Set up the OpenAI client with the provided API key
        else:
            print('Coming Soon')  # Placeholder message for when API key is not provided

    def _require_takeoff_client(self):
        """Return the takeoff client, or raise ValueError when none is configured."""
        takeoff_client = getattr(self, 'takeoff_client', None)
        if takeoff_client is None:
            raise ValueError("Not yet implemented: no takeoff client is configured")
        return takeoff_client

    def switch_model(self, model_name: str, device: str):
        """
        Switches to a different model and device for the LLM. Creates a new reader if necessary.

        Args:
            model_name (str): The name of the new model to switch to.
            device (str): The device to be used for the new model ("cuda" or "cpu").

        Raises:
            ValueError: If no takeoff client is configured.
        """
        takeoff_client = self._require_takeoff_client()
        current_readers = takeoff_client.get_readers()  # Retrieve the current readers from the takeoff client

        # Look for an existing reader that already serves the desired model
        reader_id = None
        for group, readers in current_readers.items():
            for reader in readers:
                if reader['model_name'] == model_name:
                    reader_id = reader['reader_id']
                    break
            if reader_id:
                break

        if reader_id:
            print(f"Reader for model '{model_name}' already exists with reader_id: {reader_id}")
        else:
            reader_config = {
                "model_name": model_name,
                "device": device,
                "consumer_group": self.consumer_group
            }

            reader_id, _ = takeoff_client.create_reader(reader_config=reader_config)  # Create a new reader
            print(f"Created a new reader with reader_id {reader_id}")

    def answer_query(self, query: str, context: str) -> str:
        """
        Generates an answer to a query based on the provided context using the LLM.

        Args:
            query (str): The user's query.
            context (str): The context to be used for answering the query.

        Returns:
            str: The generated answer to the query.

        Raises:
            ValueError: If neither an API key nor a takeoff client is configured.
        """
        prompt = f"Based on the provided context, answer the following query: {query}\n\nContext:\n{context}. Do not use your knowledge, only the context"

        if self.api_key:
            chat_completion = self.client.chat.completions.create(
                messages=[
                    {
                        "role": "system",
                        "content": prompt
                    },
                    {
                        "role": "user",
                        "content": query,
                    },
                ],
                # Honour the configured model; fall back to the previous fixed default
                model=self.model_name or "gpt-3.5-turbo",
            )
            content = chat_completion.choices[0].message.content
            return (content or "").strip()  # content can be None when the model returns no text

        takeoff_client = self._require_takeoff_client()
        response = takeoff_client.generate(prompt, consumer_group=self.consumer_group)  # Generate response using the takeoff client
        if 'text' in response:
            return response['text'].strip()  # Return the generated response
        print(f"Error generating response: {response}")
        return "Unable to generate a response."


In [12]:
def process_query(queries: List[str], retrieval_and_ranking: RetrievalAndRanking, llm: LLM, retrieval_method: str = "default") -> List[str]:
    """
    Answers each query using context retrieved with the chosen retrieval method.

    Args:
        queries (List[str]): The user queries to answer.
        retrieval_and_ranking (RetrievalAndRanking): The object that provides the retrieval methods.
        llm (LLM): The language model used to generate the answers.
        retrieval_method (str): One of "default", "euclidean" or "pinecone". Defaults to "default".

    Returns:
        List[str]: One answer per query, in the same order as ``queries``.

    Raises:
        ValueError: If ``retrieval_method`` is unknown, or if ``retrieval_and_ranking``
            does not provide the method it maps to.
    """
    # Retrieval method name -> name of the method that implements it
    retriever_names = {
        "default": "retrieve_relevant_chunks",
        "euclidean": "retrieve_relevant_chunks_euclidean",
        "pinecone": "retrieve_relevant_chunks_pinecone",
    }

    user_queries = [UserQuery(query) for query in queries]
    answers = []

    for user_query in user_queries:
        if retrieval_method not in retriever_names:
            raise ValueError(f"Unknown retrieval method: {retrieval_method}")

        retriever_name = retriever_names[retrieval_method]
        retrieve = getattr(retrieval_and_ranking, retriever_name, None)
        if retrieve is None:
            # Fail with a readable message instead of a bare AttributeError
            raise ValueError(
                f"Retrieval method '{retrieval_method}' is not implemented: "
                f"{type(retrieval_and_ranking).__name__} has no method '{retriever_name}'"
            )

        if retrieval_method == "pinecone":
            # The Pinecone retriever takes a list of queries and returns one chunk list per query
            relevant_chunks = retrieve([user_query.query])
            context = "\n".join("\n".join(chunks) for chunks in relevant_chunks)
        else:
            relevant_chunks = retrieve(user_query.query)
            context = "\n".join(relevant_chunks)

        answers.append(llm.answer_query(user_query.query, context))

    return answers

In [13]:
def main(retrieval_method: str = "default",
         model_choice: str = "openai",
         model_name: str = None,
         device: str = "cpu",
         embedding_model_name = 'all-MiniLM-L6-v2',
         use_local=False,
         index_name="my-index"
         ):
    """
    Runs an interactive question-answering loop over the documents stored in the vector index.

    Queries are read from standard input until the user types 'exit'. Several
    queries can be entered at once, separated by semicolons.

    Args:
        retrieval_method (str, optional): The retrieval method passed to process_query
            ("default", "euclidean" or "pinecone"). Defaults to "default".
        model_choice (str, optional): The LLM provider; only "openai" is supported. Defaults to "openai".
        model_name (str, optional): The name of the LLM model, passed to the LLM object. Defaults to None.
        device (str, optional): The device to be used for embedding. Defaults to "cpu".
        embedding_model_name (str, optional): The name of the embedding model to be used. Defaults to 'all-MiniLM-L6-v2'.
        use_local (bool, optional): Whether to use a local model for embedding. Defaults to False.
        index_name (str, optional): The name of the index for vector storage. Defaults to "my-index".

    Raises:
        ValueError: If ``model_choice`` is not "openai".
    """
    # Reject unsupported providers before loading the model or touching Pinecone
    if model_choice != "openai":
        raise ValueError("Not yet implemented")

    embedding = Embedding(model_name=embedding_model_name,
                          device=device,
                          use_local=use_local)
    vector_storage = PineconeVectorStorage(index_name, embedding)
    retrieval_and_ranking = RetrievalAndRanking(embedding, vector_storage)
    llm = LLM(api_key=os.environ.get("OPENAI_API_KEY"), model_name=model_name)

    while True:
        user_input = input("Enter your query (or type 'exit' to quit): ")
        if user_input.strip().lower() == 'exit':
            break

        # Split on semicolons; drop blank pieces (e.g. from a trailing ';')
        queries = [part.strip() for part in user_input.split(";") if part.strip()]
        if not queries:
            continue

        answers = process_query(queries, retrieval_and_ranking, llm, retrieval_method)

        print("User Queries:")
        for query, answer in zip(queries, answers):
            print(f"Query: {query}")
            print(f"Answer: {answer}\n")


In [17]:
def store_new_documents(data_source:DataSource,
         device: str = "cpu",
         embedding_model_name = 'all-MiniLM-L6-v2',
         use_local=False,
         index_name="my-index",
         category: str = 'politics'
         ):
    """
    Preprocesses the texts of a data source, embeds them and stores them in the vector index.

    Each stored record carries the preprocessed text and the category as metadata.
    Texts that are empty after preprocessing are skipped.

    NOTE: PineconeVectorStorage.store_vectors assigns the ids "0", "1", ... by
    default, so storing a second batch in the same index overwrites records of
    the first one.

    Args:
        data_source (DataSource): An instance of the DataSource class providing the data.
        device (str, optional): The device to be used for embedding. Defaults to "cpu".
        embedding_model_name (str, optional): The name of the embedding model to be used. Defaults to 'all-MiniLM-L6-v2'.
        use_local (bool, optional): Whether to use a local model for embedding. Defaults to False.
        index_name (str, optional): The name of the index for vector storage. Defaults to "my-index".
        category (str, optional): Value stored under the 'category' metadata key of every record. Defaults to 'politics'.
    """
    embedding = Embedding(model_name=embedding_model_name,
                          device=device,
                          use_local=use_local)
    vector_storage = PineconeVectorStorage(index_name, embedding)

    # Drop chunks that preprocessing reduced to an empty string
    processed_data = [text for text in data_source.process_data() if text]
    if not processed_data:
        return  # nothing to embed or store

    metadatas = [{'text': text, 'category': category} for text in processed_data]
    vectors = [embedding.embed(text) for text in processed_data]
    vector_storage.store_vectors(vectors, metadatas)


# Embeddings Vector Mapper
Map new PDF documents to the vector database.

In [None]:
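# One-off ingestion example: uncomment to chunk a PDF, preprocess it and upload it to the index.
# text_documents = Document_Processing(["./data/Choi et al.pdf"])
# data = text_documents.process_documents()
# data_source = DataSource(data)
# data_source.process_data()  # optional: store_new_documents() calls process_data() itself
# store_new_documents(data_source, index_name="my-index-5")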

# Chat Interface

In [22]:
# Start the interactive chat loop against the "my-index-5" Pinecone index
# (blocks on input() until 'exit' is typed).
main(retrieval_method='pinecone', index_name="my-index-5", embedding_model_name="all-MiniLM-L6-v2")

User Queries:
Query: how is power rooted?
Answer: Power is rooted in both formal and informal sources within a network. Formal power is derived from one's position or authority within a hierarchy, such as pulling rank or making decisions based on one's formal position. On the other hand, informal power comes from holding a central position within a network, allowing for greater influence and access to valuable resources. Power can also be rooted in personal attributes, cognitive abilities, technical expertise, shared resources, and social relationships within the network. Overall, power in a network is influenced by the structural configuration and interactions among its members.

User Queries:
Query: what is political based power?
Answer: Political-based power refers to a form of influence and authority that stems from individuals or organizations having astute knowledge of the political dynamics within a network. This type of power is acquired through actions such as cooptation and n