In [None]:
# download all needed packages and dependencies
! pip install cohere -q
! pip install cohere hnswlib unstructured -q
!pip install pdfplumber
!pip install --quiet langchain langchain_cohere langchain_experimental
!pip install langchain-cohere

In [None]:
# import all needed packages and dependencies
import cohere
import pdfplumber
import uuid
import hnswlib
import faiss
from typing import List, Dict
from unstructured.partition.html import partition_html
from unstructured.chunking.title import chunk_by_title
import os
from langchain_community.tools.tavily_search import TavilySearchResults
from pydantic import BaseModel, Field
from langchain.agents import Tool
from langchain_experimental.utilities import PythonREPL
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.tools import tool
import random
from langchain.agents import AgentExecutor
from langchain_cohere.react_multi_hop.agent import create_cohere_react_agent
from langchain_core.prompts import ChatPromptTemplate
from langchain_cohere.chat_models import ChatCohere

In [None]:
# connect to Cohere API
# Read the key from the environment instead of hardcoding it in the notebook, so it
# cannot be committed or shared by accident. CO_API_KEY is tried first, then
# COHERE_API_KEY; the empty-string fallback matches the previous placeholder value.
api_key = os.environ.get("CO_API_KEY") or os.environ.get("COHERE_API_KEY", "")
co = cohere.Client(api_key)

In [None]:
# open the PDF file and extract text as string
file_path = ""
with pdfplumber.open(file_path) as pdf:
    # One newline-terminated entry per page. A page without extractable text is
    # treated as empty: `or ""` guards against extract_text() yielding None
    # (presumably only older pdfplumber releases do this - TODO confirm), which
    # would otherwise make the string concatenation raise TypeError.
    text = "".join((page.extract_text() or "") + "\n" for page in pdf.pages)

In [None]:
# defines structure for text
# The first TITLE_LINE_COUNT lines of the extracted text are used as the title,
# everything after them as the document body.
TITLE_LINE_COUNT = 3

lines = text.splitlines()
title = "\n".join(lines[:TITLE_LINE_COUNT])
# The body gets its own name instead of overwriting `text`: `text` stays the
# untouched extraction, so re-running this cell always yields the same split
# (previously every re-run stripped another three lines).
body_text = "\n".join(lines[TITLE_LINE_COUNT:])

documents = [
    {
        "title": title,
        "text": body_text
    }
]

In [None]:
class Vectorstore:
    """
    A collection of text documents that is chunked, embedded and indexed for retrieval.

    Parameters:
    documents (list): A list of dictionaries, one per source document. Each dictionary must have 'title' and 'text' keys.

    Attributes:
    documents (list): The source documents passed to the constructor.
    docs (list): A list of dictionaries representing the document chunks, with 'title' and 'text' keys.
    docs_embs (list): A list of the associated embeddings for the document chunks.
    docs_len (int): The number of document chunks in the collection.
    retrieve_top_k (int): Maximum number of chunks fetched from the index per query, before reranking.
    rerank_top_k (int): Maximum number of chunks kept after reranking.
    idx (hnswlib.Index): The index used for document retrieval, or None when there is nothing to index.

    Methods:
    load_and_chunk(): Splits the text of every document into chunks.
    chunk_text(): Splits a string into fixed-size pieces.
    embed(): Embeds the document chunks using the Cohere API.
    index(): Indexes the document chunks for efficient retrieval.
    retrieve(): Retrieves document chunks based on the given query.

    The Cohere client is taken from the module-level name `co`.
    """

    def __init__(self, documents: List[Dict[str, str]]):
        self.documents = documents
        self.docs = []
        self.docs_embs = []
        self.docs_len = 0
        self.idx = None
        self.retrieve_top_k = 10
        self.rerank_top_k = 3
        # Build the store eagerly: chunk -> embed -> index.
        self.load_and_chunk()
        self.embed()
        self.index()


    def load_and_chunk(self) -> None:
        """
        Loads the pre-extracted text documents and stores them as chunks.

        Every chunk is stored in `self.docs` as a dictionary carrying the title
        of its source document and the chunk text. The chunk list is rebuilt
        from scratch, so calling this method again does not duplicate chunks.
        """
        print("Loading documents...")

        chunked_docs = []
        for document in self.documents:
            title = document["title"]
            text = document["text"]

            for chunk in self.chunk_text(text):
                chunked_docs.append(
                    {
                        "title": title,
                        "text": str(chunk)
                    }
                )

        self.docs = chunked_docs
        print(f"Loaded {len(self.docs)} document chunks.")

    def chunk_text(self, text: str, max_chunk_size: int = 500) -> list:
        """
        Splits the text into consecutive chunks of at most `max_chunk_size` characters.

        Parameters:
        text (str): The text to split.
        max_chunk_size (int): Maximum number of characters per chunk; must be positive.

        Returns:
        list: The chunks in order; an empty list for empty text.

        Raises:
        ValueError: If `max_chunk_size` is not positive.
        """
        # A negative size would make range() empty and silently drop all text.
        if max_chunk_size <= 0:
            raise ValueError("max_chunk_size must be a positive integer")
        return [text[i:i + max_chunk_size] for i in range(0, len(text), max_chunk_size)]


    def embed(self) -> None:
        """
        Embeds the document chunks using the Cohere API.

        Chunks are sent in batches; the resulting embeddings replace
        `self.docs_embs`, and `self.docs_len` is set to the number of chunks.
        """
        print("Embedding document chunks...")

        batch_size = 90
        self.docs_len = len(self.docs)
        embeddings = []
        for start in range(0, self.docs_len, batch_size):
            batch = self.docs[start : start + batch_size]
            texts = [item["text"] for item in batch]
            docs_embs_batch = co.embed(
                texts=texts, model="embed-english-v3.0", input_type="search_document"
            ).embeddings
            embeddings.extend(docs_embs_batch)
        # Assign once at the end so a repeated call cannot accumulate duplicates.
        self.docs_embs = embeddings

    def index(self) -> None:
        """
        Indexes the document chunks for efficient retrieval.

        When there are no embeddings, no index is built and `self.idx` is None.
        """
        print("Indexing document chunks...")

        if not self.docs_embs:
            # Nothing to index; retrieve() returns an empty result in this state.
            self.idx = None
            print("Indexing complete with 0 document chunks.")
            return

        # Take the dimension from the embeddings instead of hardcoding it.
        dim = len(self.docs_embs[0])
        self.idx = hnswlib.Index(space="ip", dim=dim)
        self.idx.init_index(max_elements=len(self.docs_embs), ef_construction=512, M=64)
        self.idx.add_items(self.docs_embs, list(range(len(self.docs_embs))))

        print(f"Indexing complete with {self.idx.get_current_count()} document chunks.")

    def retrieve(self, query: str) -> List[Dict[str, str]]:
        """
        Retrieves document chunks based on the given query.

        Parameters:
        query (str): The query to retrieve document chunks for.

        Returns:
        List[Dict[str, str]]: A list of dictionaries representing the retrieved document chunks, with 'title' and 'text' keys, in reranked order. Empty when the store holds no chunks.
        """
        if self.idx is None or not self.docs:
            return []

        # Dense retrieval
        query_emb = co.embed(
            texts=[query], model="embed-english-v3.0", input_type="search_query"
        ).embeddings

        # Never ask the index for more neighbours than there are chunks; a short
        # document can yield fewer chunks than retrieve_top_k.
        top_k = min(self.retrieve_top_k, len(self.docs))
        doc_ids = [int(doc_id) for doc_id in self.idx.knn_query(query_emb, k=top_k)[0][0]]

        # Reranking
        rank_fields = ["title", "text"] # We'll use the title and text fields for reranking

        docs_to_rerank = [self.docs[doc_id] for doc_id in doc_ids]
        rerank_results = co.rerank(
            query=query,
            documents=docs_to_rerank,
            top_n=min(self.rerank_top_k, len(docs_to_rerank)),
            model="rerank-english-v3.0",
            rank_fields=rank_fields
        )

        # result.index points into docs_to_rerank, i.e. into doc_ids.
        doc_ids_reranked = [doc_ids[result.index] for result in rerank_results.results]

        return [
            {
                "title": self.docs[doc_id]["title"],
                "text": self.docs[doc_id]["text"],
            }
            for doc_id in doc_ids_reranked
        ]

In [None]:
# Build the vector store from the prepared documents; constructing it chunks,
# embeds and indexes them right away.
vectorstore = Vectorstore(documents=documents)

In [None]:
def run_chatbot(message, chat_history=None):
    """
    Runs one chat turn, grounding the answer in retrieved document chunks when useful.

    The model is first asked only for search queries. If it produces any, each
    query is run against the module-level `vectorstore` and the retrieved chunks
    (without duplicates) are passed to the streamed chat call. The answer is
    printed as it streams, followed by any citations and cited documents.

    Parameters:
    message (str): The user message for this turn.
    chat_history (list): The chat history from previous turns; defaults to an empty history.

    Returns:
    The chat history reported by the stream's final event, or the incoming
    history if the stream ended without one.
    """
    if chat_history is None:
        chat_history = []

    # Generate search queries, if any
    query_response = co.chat(message=message,
                             model="command-r-plus",
                             search_queries_only=True,
                             chat_history=chat_history)

    # `search_queries` can be absent (None) rather than an empty list.
    search_queries = [query.text for query in (query_response.search_queries or [])]

    stream_kwargs = {
        "message": message,
        "model": "command-r-plus",
        "chat_history": chat_history,
    }

    # If there are search queries, retrieve the documents
    if search_queries:
        print("Retrieving information...", end="")

        # Retrieve document chunks for each query; different queries often hit
        # the same chunk, so keep only the first occurrence of each.
        retrieved_docs = []
        seen = set()
        for query in search_queries:
            for doc in vectorstore.retrieve(query):
                key = (doc.get("title"), doc.get("text"))
                if key not in seen:
                    seen.add(key)
                    retrieved_docs.append(doc)

        # Use document chunks to respond
        stream_kwargs["documents"] = retrieved_docs

    stream = co.chat_stream(**stream_kwargs)

    # Print the chatbot response, citations, and documents
    print("\nChatbot:")

    for event in stream:
        if event.event_type == "text-generation":
            print(event.text, end="")
        elif event.event_type == "stream-end":
            if event.response.citations:
                print("\n\nCITATIONS:")
                for citation in event.response.citations:
                    print(citation)
            if event.response.documents:
                print("\nCITED DOCUMENTS:")
                for document in event.response.documents:
                    print(document)
            # Update the chat history for the next turn
            chat_history = event.response.chat_history

    return chat_history

In [None]:
# creates web search tool

# connect to Tavily API
# Start from whatever is already exported so that a real TAVILY_API_KEY in the
# environment is never overwritten with an empty placeholder; when nothing is
# exported the variable is set to "" exactly as before.
tavily_api_key = os.environ.get("TAVILY_API_KEY", "")
os.environ["TAVILY_API_KEY"] = tavily_api_key

# creates TavilySearchResults object and gives it the name/description the agent sees
internet_search = TavilySearchResults()
internet_search.name = "internet_search"
internet_search.description = "Returns a list of relevant document snippets for a textual query retrieved from the internet."

# input schema for the web search tool
class TavilySearchInput(BaseModel):
    """
    Input schema for the internet search tool.

    Attributes:
    query (str): the query string to search the internet with
    """
    query: str = Field(description="Query to search the internet with")

# ensure format of input matches schema defined by TavilySearchInput class
internet_search.args_schema = TavilySearchInput

In [None]:
# creates python interpreter tool

# create a new PythonREPL object; its run() method is what the tool invokes
# NOTE(review): the tool hands model-generated code to python_repl.run - presumably
# this executes inside the notebook's own process rather than an isolated sandbox;
# confirm before using it with untrusted prompts.
python_repl = PythonREPL()
python_tool = Tool(
    # Created directly under its final name instead of being renamed afterwards.
    name="python_interpreter",
    description="Executes python code and returns the result. The code runs in a static sandbox without interactive mode, so print output or save output to a file.",
    func=python_repl.run,
)

class ToolInput(BaseModel):
    """
    Input schema for the python interpreter tool.

    Attributes:
    code (str): a string containing the python code to execute
    """
    code: str = Field(description="Python code to execute.")

# ensure format of input is valid
python_tool.args_schema = ToolInput

In [None]:
# Demo tool: combines two integers by multiplication or addition, picked at random.

@tool
def random_operation_tool(a: int, b: int):
    """Calculates a random operation between the inputs."""
    # One draw from [0, 1]: above one half multiplies, otherwise adds.
    draw = random.uniform(0, 1)
    result = a * b if draw > 0.5 else a + b
    return {'output': result}

# Name and description under which the tool is exposed.
random_operation_tool.name = "random_operation"
random_operation_tool.description = "Calculates a random operation between the inputs."

class random_operation_inputs(BaseModel):
    """
    Input schema for the random operation tool.

    Attributes:
    a (int): first input
    b (int): second input
    """
    a: int = Field(description="First input")
    b: int = Field(description="Second input")

# Attach the schema that describes the tool's inputs.
random_operation_tool.args_schema = random_operation_inputs

In [None]:
# Thin adapter that exposes a vector store's retrieval as a search tool
class VectorstoreSearchTool:
    """Wraps a vector store object and forwards search queries to it."""

    def __init__(self, vectorstore):
        # Keep a reference to the store; search() calls its retrieve(query) method.
        self.vectorstore = vectorstore

    def search(self, query: str):
        """Return whatever the wrapped store's retrieve() yields for `query`."""
        store = self.vectorstore
        results = store.retrieve(query)
        return results

# Wrap the notebook's vector store in the search tool
vectorstore_search = VectorstoreSearchTool(vectorstore=vectorstore)

In [None]:
# Background the LLM needs about who is asking. The trailing space matters:
# the scenario is appended directly after it.
context = (
    "We are a technology consulting company with global clients. "
    "It is important for our company and the clients of our company "
    "to stay within regulations of the strictest AI act, the EU AI Act. "
)

# The case scenario to be assessed
user_input = (
    "A client in the healthcare industry has approached our tech consulting company "
    "with a proposal for an AI doctor that can use a patient's information "
    "to detect health risk and diagnoses. "
    "How much risk does this project have according to the EU AI Act? "
    "Please provide quotes and citations from the document."
)

# Final prompt: background immediately followed by the scenario
message = "".join((context, user_input))

In [None]:
# Turn # 1 - opening message; no history yet, so the chatbot starts a fresh one
chat_history = run_chatbot(message="Hello, I have a question")

In [None]:
# Turn # 2 - send the prepared prompt, continuing from the first turn's history
chat_history = run_chatbot(message=message, chat_history=chat_history)