<a href="https://colab.research.google.com/github/sumkh/ITI110_AgenticRAG/blob/main/LangGraph.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## AI Tutor Chatbot (Version 2.12)


### Setting Up - Install Requirements (Restart Session after installation)

In [1]:
%%capture
%pip install -qU vllm langchain-openai huggingface_hub transformers langchain langchain_huggingface langgraph langchain-core langchain-text-splitters langchain-community chromadb langchain-chroma langsmith docling langchain-docling sentence_transformers

### Load Packages and Utils

In [1]:
from google.colab import userdata
LANGSMITH_API_KEY="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"
HF_TOKEN = userdata.get("HF_TOKEN")

In [2]:
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false" # Disable tokenizers parallelism, as it causes issues with multiprocessing
os.environ["LANGCHAIN_TRACING_V2"] = "true" # LangSmith for Observability
os.environ["LANGCHAIN_PROJECT"] = "AgenticRAG"
os.environ["LANGSMITH_API_KEY"] = LANGSMITH_API_KEY # Optional

from huggingface_hub import login
login(token=HF_TOKEN) # May be optional for getting model download from Huggingface

# Download required files from Github repo
!wget https://github.com/sumkh/NYP_Dataset/raw/refs/heads/main/Documents.zip
!unzip -o /content/Documents.zip


--2025-02-19 07:45:19--  https://github.com/sumkh/NYP_Dataset/raw/refs/heads/main/Documents.zip
Resolving github.com (github.com)... 20.205.243.166
Connecting to github.com (github.com)|20.205.243.166|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/sumkh/NYP_Dataset/refs/heads/main/Documents.zip [following]
--2025-02-19 07:45:20--  https://raw.githubusercontent.com/sumkh/NYP_Dataset/refs/heads/main/Documents.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 18510909 (18M) [application/zip]
Saving to: ‘Documents.zip’


2025-02-19 07:45:21 (319 MB/s) - ‘Documents.zip’ saved [18510909/18510909]

Archive:  /content/Documents.zip
   creating: Documents/
   creating: Documents/general/
  inflating:

In [3]:
import os
import csv
import json
import hashlib
import uuid
import logging
from typing import List, Optional, Union, Literal, Dict
from dataclasses import dataclass, field

# LangChain & related imports
from langchain_core.tools import tool, StructuredTool
from pydantic import BaseModel, Field

from langchain_huggingface import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.retrievers import EnsembleRetriever, ContextualCompressionRetriever

# Extraction for Documents
from langchain_docling.loader import ExportType
from langchain_docling import DoclingLoader
from docling.chunking import HybridChunker

# Extraction for HTML
from langchain_community.document_loaders import WebBaseLoader
from urllib.parse import urlparse


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Configurations and Get the API key from the environment variable
EMBED_MODEL_ID = "sentence-transformers/all-MiniLM-L6-v2"



In [4]:
# =============================================================================
#                         Document Extraction Functions
# =============================================================================

def extract_documents(doc_path: str) -> List[str]:
    """
    Recursively collects all file paths from folder 'doc_path'.
    Used by ExtractDocument.load_files() to find documents to parse.
    """
    extracted_docs = []

    for root, _, files in os.walk(doc_path):
        for file in files:
            file_path = os.path.join(root, file)
            extracted_docs.append(file_path)
    return extracted_docs


def _generate_uuid(page_content: str) -> str:
    """Generate a UUID for a chunk of text using MD5 hashing."""
    md5_hash = hashlib.md5(page_content.encode()).hexdigest()
    return str(uuid.UUID(md5_hash[0:32]))


def load_file(file_path: str) -> List[Document]:
    """
    Load a file from the given path and return a list of Document objects.
    """
    _documents = []

    # Load the file and extract the text chunks
    try:
        loader = DoclingLoader(
            file_path = file_path,
            export_type = ExportType.DOC_CHUNKS,
            chunker = HybridChunker(tokenizer=EMBED_MODEL_ID),
        )
        docs = loader.load()
        logger.info(f"Total parsed doc-chunks: {len(docs)} from Source: {file_path}")

        for d in docs:
            # Tag each document's chunk with the source file and a unique ID
            doc = Document(
                page_content=d.page_content,
                metadata={
                    "source": file_path,
                    "doc_id": _generate_uuid(d.page_content),
                    "source_type": "file",
                }
            )
            _documents.append(doc)
        logger.info(f"Total generated LangChain document chunks: {len(_documents)}\n.")

    except Exception as e:
        logger.error(f"Error loading file: {file_path}. Exception: {e}\n.")

    return _documents


# Define function to load documents from a folder
def load_files_from_folder(doc_path: str) -> List[Document]:
    """
    Load documents from the given folder path and return a list of Document objects.
    """
    _documents = []
    # Extract all files path from the given folder
    extracted_docs = extract_documents(doc_path)

    # Iterate through each document and extract the text chunks
    for file_path in extracted_docs:
        _documents.extend(load_file(file_path))

    return _documents

# =============================================================================
# Load structured data in csv file to LangChain Document format
def load_mcq_csvfiles(file_path: str) -> List[Document]:
    """
    Load structured data in mcq csv file from the given file path and return a list of Document object.
    Expected format: each row of csv is comma separated into "mcq_number", "mcq_type", "text_content"
    """
    _documents = []

    # iterate through each csv file and load each row into _dict_per_question format
    # Ensure we process only CSV files
    if not file_path.endswith(".csv"):
        return _documents  # Skip non-CSV files
    try:
        # Open and read the CSV file
        with open(file_path, mode='r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            for row in reader:
                # Ensure required columns exist in the row
                if not all(k in row for k in ["mcq_number", "mcq_type", "text_content"]): # Ensure required columns exist and exclude header
                    logger.error(f"Skipping row due to missing fields: {row}")
                    continue
                # Tag each row of csv is comma separated into "mcq_number", "mcq_type", "text_content"
                doc = Document(
                    page_content = row["text_content"], # text_content segment is separated by "|"
                    metadata={
                        "source": f"{file_path}_{row['mcq_number']}",  # file_path + mcq_number
                        "doc_id": _generate_uuid(f"{file_path}_{row['mcq_number']}"),  # Unique ID
                        "source_type": row["mcq_type"],  # MCQ type
                    }
                )
                _documents.append(doc)
            logger.info(f"Successfully loaded {len(_documents)} LangChain document chunks from {file_path}.")

    except Exception as e:
        logger.error(f"Error loading file: {file_path}. Exception: {e}\n.")

    return _documents

# Define function to load documents from a folder for structured data in csv file
def load_files_from_folder_mcq(doc_path: str) -> List[Document]:
    """
    Load mcq csv file from the given folder path and return a list of Document objects.
    """
    _documents = []
    # Extract all files path from the given folder
    extracted_docs = [
        os.path.join(doc_path, file) for file in os.listdir(doc_path)
        if file.endswith(".csv")  # Process only CSV files
    ]

    # Iterate through each document and extract the text chunks
    for file_path in extracted_docs:
        _documents.extend(load_mcq_csvfiles(file_path))

    return _documents


# =============================================================================
#                         Website Extraction Functions
# =============================================================================
def _generate_uuid(page_content: str) -> str:
    """Generate a UUID for a chunk of text using MD5 hashing."""
    md5_hash = hashlib.md5(page_content.encode()).hexdigest()
    return str(uuid.UUID(md5_hash[0:32]))

def ensure_scheme(url):
    parsed_url = urlparse(url)
    if not parsed_url.scheme:
        return 'http://' + url  # Default to http, or use 'https://' if preferred
    return url

def extract_html(url: List[str]) -> List[Document]:
    if isinstance(url, str):
        url = [url]
    """
    Extracts text from the HTML content of web pages listed in 'web_path'.
    Returns a list of LangChain 'Document' objects.
    """
    # Ensure all URLs have a scheme
    web_paths = [ensure_scheme(u) for u in url]

    loader = WebBaseLoader(web_paths)
    loader.requests_per_second = 1
    docs = loader.load()

    # Iterate through each document, clean the content, removing excessive line return and store it in a LangChain Document
    _documents = []
    for doc in docs:
        # Clean the concent
        doc.page_content = doc.page_content.strip()
        doc.page_content = doc.page_content.replace("\n", " ")
        doc.page_content = doc.page_content.replace("\r", " ")
        doc.page_content = doc.page_content.replace("\t", " ")
        doc.page_content = doc.page_content.replace("  ", " ")
        doc.page_content = doc.page_content.replace("   ", " ")

        # Store it in a LangChain Document
        web_doc = Document(
            page_content=doc.page_content,
            metadata={
                "source": doc.metadata.get("source"),
                "doc_id": _generate_uuid(doc.page_content),
                "source_type": "web"
            }
        )
        _documents.append(web_doc)
    return _documents

# =============================================================================
#                         Vector Store Initialisation
# =============================================================================

embedding_model = HuggingFaceEmbeddings(model_name=EMBED_MODEL_ID)

# Initialise vector stores
general_vs = Chroma(
    collection_name="general_vstore",
    embedding_function=embedding_model,
    persist_directory="./general_db"
)

mcq_vs = Chroma(
    collection_name="mcq_vstore",
    embedding_function=embedding_model,
    persist_directory="./mcq_db"
)

in_memory_vs = Chroma(
    collection_name="in_memory_vstore",
    embedding_function=embedding_model
)

# Split the documents into smaller chunks for better embedding coverage
def split_text_into_chunks(docs: List[Document]) -> List[Document]:
    """
    Splits a list of Documents into smaller text chunks using
    RecursiveCharacterTextSplitter while preserving metadata.
    Returns a list of Document objects.
    """
    if not docs:
        return []
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, # Split into chunks of 1000 characters
        chunk_overlap=200, # Overlap by 200 characters
        add_start_index=True
    )
    chunked_docs = splitter.split_documents(docs)
    return chunked_docs # List of Document objects


# =============================================================================
#                         Retrieval Tools
# =============================================================================

# Define a simple similarity search retrieval tool on msq_vs
class MCQRetrievalTool(BaseModel):
    input: str = Field(..., title="input", description="The input text to search for.")
    k: int = Field(2, title="Number of Results", description="The number of results to retrieve.")

def mcq_retriever(input: str, k: int = 2) -> List[str]:
    # Retrieve the top k most similar mcq question documents from the vector store
    docs_func = mcq_vs.as_retriever(
        search_type="similarity",
        search_kwargs={
        'k': k,
        'filter':{"source_type": "mcq_question"}
    },
    )
    docs_qns = docs_func.invoke(input, k=k)

    # Extract the document IDs from the retrieved documents
    doc_ids = [d.metadata.get("doc_id") for d in docs_qns if "doc_id" in d.metadata]

    # Retrieve full documents based on the doc_ids
    docs = mcq_vs.get(where = {'doc_id': {"$in":doc_ids}})

    qns_list = {}
    for i, d in enumerate(docs['metadatas']):
        qns_list[d['source'] + " " + d['source_type']] = docs['documents'][i]

    return qns_list

# Create a StructuredTool from the function
mcq_retriever_tool = StructuredTool.from_function(
    func = mcq_retriever,
    name = "MCQ Retrieval Tool",
    description = (
    """
    Use this tool to retrieve MCQ questions set when Human asks to generate a quiz related to a topic.
    DO NOT GIVE THE ANSWERS to Human before Human has answered all the questions.

    If Human give answers for questions you do not know, SAY you do not have the questions for the answer
    and ASK if the Human want you to generate a new quiz and then SAVE THE QUIZ with Summary Tool before ending the conversation.


    Input must be a JSON string with the schema:
        - input (str): The search topic to retrieve MCQ questions set related to the topic.
        - k (int): Number of question set to retrieve.
        Example usage: input='What is AI?', k=5

    Returns:
    - A dict of MCQ questions:
    Key: 'metadata of question' e.g. './Documents/mcq/mcq.csv_Qn31 mcq_question' with suffix ['question', 'answer', 'answer_reason', 'options', 'wrong_options_reason']
    Value: Text Content

    """
    ),
    args_schema = MCQRetrievalTool,
    response_format="content",
    return_direct = False, # Return the response as a list of strings
    verbose = False  # To log tool's progress
    )

# -----------------------------------------------------------------------------

# Retrieve more documents with higher diversity using MMR (Maximal Marginal Relevance) from the general vector store
# Useful if the dataset has many similar documents
class GenRetrievalTool(BaseModel):
    input: str = Field(..., title="input", description="The input text to search for.")
    k: int = Field(2, title="Number of Results", description="The number of results to retrieve.")

def gen_retriever(input: str, k: int = 2) -> List[str]:
    # Use retriever of vector store to retrieve documents
    docs_func = general_vs.as_retriever(
        search_type="mmr",
        search_kwargs = {'k': k, 'lambda_mult': 0.25}
    )
    docs = docs_func.invoke(input, k=k)
    return [d.page_content for d in docs]

# Create a StructuredTool from the function
general_retriever_tool = StructuredTool.from_function(
    func = gen_retriever,
    name = "Assistant References Retrieval Tool",
    description = (
    """
    Use this tool to retrieve reference information from Assistant reference database for Human queries related to a topic or
    and when Human asked to generate guides to learn or study about a topic.

    Input must be a JSON string with the schema:
        - input (str): The user query.
        - k (int): Number of results to retrieve.
        Example usage: input='What is AI?', k=5
    Returns:
    - A list of retrieved document's content string.
    """
    ),
    args_schema = GenRetrievalTool,
    response_format="content",
    return_direct = False, # Return the content of the documents
    verbose = False  # To log tool's progress
    )

# -----------------------------------------------------------------------------

# Retrieve more documents with higher diversity using MMR (Maximal Marginal Relevance) from the in-memory vector store
# Query in-memory vector store only
class InMemoryRetrievalTool(BaseModel):
    input: str = Field(..., title="input", description="The input text to search for.")
    k: int = Field(2, title="Number of Results", description="The number of results to retrieve.")

def in_memory_retriever(input: str, k: int = 2) -> List[str]:
    # Use retriever of vector store to retrieve documents
    docs_func = in_memory_vs.as_retriever(
        search_type="mmr",
        search_kwargs = {'k': k, 'lambda_mult': 0.25}
    )
    docs = docs_func.invoke(input, k=k)
    return [d.page_content for d in docs]

# Create a StructuredTool from the function
in_memory_retriever_tool = StructuredTool.from_function(
    func = in_memory_retriever,
    name = "In-Memory Retrieval Tool",
    description = (
    """
    Use this tool when Human ask Assistant to retrieve information from documents that Human has uploaded.

    Input must be a JSON string with the schema:
        - input (str): The user query.
        - k (int): Number of results to retrieve.
    """
    ),
    args_schema = InMemoryRetrievalTool,
    response_format="content",
    return_direct = False, # Whether to return the tool’s output directly
    verbose = False  # To log tool's progress
    )

# -----------------------------------------------------------------------------

# Web Extraction Tool
class WebExtractionRequest(BaseModel):
    input: str = Field(..., title="input", description="The input text to search for.")
    url: str = Field(
        ...,
        title="url",
        description="urls to extract content from"
    )
    k: int = Field(5, title="Number of Results", description="The number of results to retrieve.")

# Extract content from a web URL, load into in_memory_vstore
def extract_web_path_tool(input: str, url: str, k: int = 5) -> List[str]:
    if isinstance(url, str):
        url = [url]
    """
    Extract content from the web URLs based on user's input.
    Args:
    - input: The input text to search for.
    - url: URLs to extract content from.
    - k: Number of results to retrieve.
    Returns:
     - A list of retrieved document's content string.
    """
    # Extract content from the web
    html_docs = extract_html(url)
    if not html_docs:
        return f"No content extracted from {url}."

    # Split the documents into smaller chunks for better embedding coverage
    chunked_texts = split_text_into_chunks(html_docs)
    in_memory_vs.add_documents(chunked_texts) # Add the chunked texts to the in-memory vector store

    # Extract content from the in-memory vector store
    # Use retriever of vector store to retrieve documents
    docs_func = in_memory_vs.as_retriever(
        search_type="mmr",
        search_kwargs={
        'k': k,
        'lambda_mult': 0.25,
        'filter':{"source": {"$in": url}}
    },
    )
    docs = docs_func.invoke(input, k=k)
    return [d.page_content for d in docs]

# Create a StructuredTool from the function
web_extraction_tool = StructuredTool.from_function(
    func = extract_web_path_tool,
    name = "Web Extraction Tool",
    description = (
        "Assistant should use this tool to extract content from web URLs based on user's input, "
        "Web extraction is initially load into database and then return k: Number of results to retrieve"
    ),
    args_schema = WebExtractionRequest,
    return_direct = False, # Whether to return the tool’s output directly
    verbose = False  # To log tool's progress
    )

# -----------------------------------------------------------------------------

# Ensemble Retrieval from General and In-Memory Vector Stores
class EnsembleRetrievalTool(BaseModel):
    input: str = Field(..., title="input", description="The input text to search for.")
    k: int = Field(5, title="Number of Results", description="The number of results to retrieve.")

def ensemble_retriever(input: str, k: int = 5) -> List[str]:
    # Use retriever of vector store to retrieve documents
    general_retrieval = general_vs.as_retriever(
        search_type="mmr",
        search_kwargs = {'k': k, 'lambda_mult': 0.25}
    )
    in_memory_retrieval = in_memory_vs.as_retriever(
        search_type="mmr",
        search_kwargs = {'k': k, 'lambda_mult': 0.25}
    )

    ensemble_retriever = EnsembleRetriever(
        retrievers=[general_retrieval, in_memory_retrieval],
        weights=[0.5, 0.5]
    )
    docs = ensemble_retriever.invoke(input)
    return [d.page_content for d in docs]

# Create a StructuredTool from the function
ensemble_retriever_tool = StructuredTool.from_function(
    func = ensemble_retriever,
    name = "Ensemble Retriever Tool",
    description = (
    """
    Use this tool to retrieve information from reference database and
    extraction of documents that Human has uploaded.

    Input must be a JSON string with the schema:
        - input (str): The user query.
        - k (int): Number of results to retrieve.
    """
    ),
    args_schema = EnsembleRetrievalTool,
    response_format="content",
    return_direct = False
    )


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/10.7k [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

1_Pooling%2Fconfig.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

### Define a Customised Wrapper from vLLM Wrapper

In [None]:
from __future__ import annotations

import re
import json
import warnings
from operator import itemgetter
from typing import (
    Any,
    AsyncIterator,
    Callable,
    Dict,
    Iterator,
    List,
    Literal,
    Mapping,
    Optional,
    Sequence,
    Tuple,
    Type,
    TypedDict,
    Union,
    cast,
)

from langchain_core._api import deprecated
from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models import LanguageModelInput
from langchain_core.language_models.chat_models import (
    BaseChatModel,
    LangSmithParams,
    agenerate_from_stream,
    generate_from_stream,
)
from langchain_core.messages import (
    AIMessage,
    AIMessageChunk,
    BaseMessage,
    BaseMessageChunk,
    ChatMessage,
    ChatMessageChunk,
    FunctionMessage,
    FunctionMessageChunk,
    HumanMessage,
    HumanMessageChunk,
    InvalidToolCall,
    SystemMessage,
    SystemMessageChunk,
    ToolCall,
    ToolMessage,
    ToolMessageChunk,
)
from langchain_core.output_parsers import (
    JsonOutputParser,
    PydanticOutputParser,
)
from langchain_core.output_parsers.base import OutputParserLike
from langchain_core.output_parsers.openai_tools import (
    JsonOutputKeyToolsParser,
    PydanticToolsParser,
    make_invalid_tool_call,
    parse_tool_call,
)
from langchain_core.outputs import ChatGeneration, ChatGenerationChunk, ChatResult
from langchain_core.runnables import Runnable, RunnableMap, RunnablePassthrough
from langchain_core.tools import BaseTool
from langchain_core.utils import (
    from_env,
    get_pydantic_field_names,
    secret_from_env,
)
from langchain_core.utils.function_calling import (
    convert_to_openai_function,
    convert_to_openai_tool,
)
from langchain_core.utils.pydantic import is_basemodel_subclass
from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    SecretStr,
    model_validator,
)
from typing_extensions import Self
import uuid

def _is_pydantic_class(obj: Any) -> bool:
    return isinstance(obj, type) and is_basemodel_subclass(obj)


class _FunctionCall(TypedDict):
    name: str

def _convert_dict_to_message(_dict: Mapping[str, Any]) -> BaseMessage:
    """Convert a dictionary to a LangChain BaseMessage."""
    role = _dict.get("role")
    name = _dict.get("name")
    id_ = _dict.get("id")

    if role == "user":
        return HumanMessage(content=_dict.get("content", ""), id=id_, name=name)
    elif role == "assistant":
        # For Azure or OpenAI-like usage, 'assistant' can have content or tool calls
        content = _dict.get("content") or ""
        additional_kwargs: Dict[str, Any] = {}
        tool_calls = []
        invalid_tool_calls = []
        if raw_tool_calls := _dict.get("tool_calls"):
            additional_kwargs["tool_calls"] = raw_tool_calls
            for raw_tool_call in raw_tool_calls:
                try:
                    tool_calls.append(parse_tool_call(raw_tool_call, return_id=True))
                except Exception as e:
                    invalid_tool_calls.append(
                        make_invalid_tool_call(raw_tool_call, str(e))
                    )
        return AIMessage(
            content=content,
            additional_kwargs=additional_kwargs,
            name=name,
            id=id_,
            tool_calls=tool_calls,
            invalid_tool_calls=invalid_tool_calls,
        )
    elif role in ("system", "developer"):
        # "developer" is a custom role used by some expansions
        # We store it in additional_kwargs if needed
        if role == "developer":
            additional_kwargs = {"__openai_role__": role}
        else:
            additional_kwargs = {}
        return SystemMessage(
            content=_dict.get("content", ""),
            name=name,
            id=id_,
            additional_kwargs=additional_kwargs,
        )
    elif role == "function":
        return FunctionMessage(
            content=_dict.get("content", ""), name=cast(str, _dict.get("name")), id=id_
        )
    elif role == "tool":
        # custom "tool" role
        additional_kwargs = {}
        if "name" in _dict:
            additional_kwargs["name"] = _dict["name"]
        return ToolMessage(
            content=_dict.get("content", ""),
            tool_call_id=cast(str, _dict.get("tool_call_id")),
            additional_kwargs=additional_kwargs,
            name=name,
            id=id_,
        )
    else:
        # fallback for other roles
        return ChatMessage(content=_dict.get("content", ""), role=role, id=id_)


def _format_message_content(content: Any) -> Any:
    """Format message content, removing blocks with type == 'tool_use' if present."""
    if content and isinstance(content, list):
        formatted_content = []
        for block in content:
            if (
                isinstance(block, dict)
                and block.get("type") == "tool_use"
            ):
                continue
            formatted_content.append(block)
        return formatted_content
    return content


def _convert_message_to_dict(message: BaseMessage) -> Dict[str, Any]:
    """Convert a LangChain BaseMessage to a dictionary (role/content)."""
    message_dict: Dict[str, Any] = {
        "content": _format_message_content(message.content)
    }

    # Some messages store 'name' in message.name or additional_kwargs['name']
    if (name := message.name or message.additional_kwargs.get("name")) is not None:
        message_dict["name"] = name

    if isinstance(message, ChatMessage):
        message_dict["role"] = message.role
    elif isinstance(message, HumanMessage):
        message_dict["role"] = "user"
    elif isinstance(message, AIMessage):
        message_dict["role"] = "assistant"
        if "tool_calls" in message.additional_kwargs:
            message_dict["tool_calls"] = message.additional_kwargs["tool_calls"]
            # filter to only certain properties
            tool_call_supported_props = {"id", "type", "function"}
            message_dict["tool_calls"] = [
                {k: v for k, v in tool_call.items() if k in tool_call_supported_props}
                for tool_call in message_dict["tool_calls"]
            ]
        # If tool calls present, empty string => None
        if "tool_calls" in message_dict:
            message_dict["content"] = message_dict["content"] or None
    elif isinstance(message, SystemMessage):
        # We store developer as __openai_role__ in message.additional_kwargs
        if "__openai_role__" in message.additional_kwargs:
            message_dict["role"] = "developer"
        else:
            message_dict["role"] = "system"
    elif isinstance(message, ToolMessage):
        message_dict["role"] = "tool"
        message_dict["tool_call_id"] = message.tool_call_id
        supported_props = {"content", "role", "tool_call_id", "name"}
        message_dict = {k: v for k, v in message_dict.items() if k in supported_props}
    else:
        raise TypeError(f"Got unknown type of message: {type(message)}")

    return message_dict

def _convert_chunk_to_message_chunk(
    chunk: Mapping[str, Any], default_class: Type[BaseMessageChunk]
) -> BaseMessageChunk:
    choice = chunk["choices"][0]
    _dict = choice["delta"]
    role = cast(str, _dict.get("role"))
    content = cast(str, _dict.get("content") or "")
    additional_kwargs: Dict = {}
    if _dict.get("function_call"):
        function_call = dict(_dict["function_call"])
        if "name" in function_call and function_call["name"] is None:
            function_call["name"] = ""
        additional_kwargs["function_call"] = function_call
    if _dict.get("tool_calls"):
        additional_kwargs["tool_calls"] = _dict["tool_calls"]

    if role == "user" or default_class == HumanMessageChunk:
        return HumanMessageChunk(content=content)
    elif role == "assistant" or default_class == AIMessageChunk:
        return AIMessageChunk(
            content=content,
            additional_kwargs=additional_kwargs,
            usage_metadata=None,  # ignoring usage stats
        )
    elif role == "system" or default_class == SystemMessageChunk:
        return SystemMessageChunk(content=content)
    elif role == "function" or default_class == FunctionMessageChunk:
        return FunctionMessageChunk(content=content, name=_dict["name"])
    elif role == "tool" or default_class == ToolMessageChunk:
        return ToolMessageChunk(content=content, tool_call_id=_dict["tool_call_id"])
    elif role or default_class == ChatMessageChunk:
        return ChatMessageChunk(content=content, role=role)
    else:
        return default_class(content=content)  # type: ignore


class BaseChat_vLLMWrapper(BaseChatModel):
    """
    A wrapper for a vLLM model for chat.
    Replace `self._client._generate(...)` calls with the actual method from your LLM client.
    """

    _client: Any
    streaming: bool = False
    """Whether to stream the results or not."""

    def __init__(self, client: Any, **kwargs: Any):
        """
        Args:
            client: The vLLM-like client instance that can be called to generate.
        """
        super().__init__(**kwargs)
        self._client = client

    def _convert_messages_to_prompt(self, messages: List[BaseMessage]) -> str:
        """
        Convert list of messages into a single string prompt.
        For HF text-based generation, we simply combine roles + content.
        """
        lines = []

        # If messages is a dictionary, convert it to a list containing the dictionary
        if isinstance(messages, dict):
            messages = [messages]

        for m in messages:
            role = m.get("type", None) if isinstance(m, dict) else (m.type if hasattr(m, 'type') else m.__class__.__name__)
            content = m.get("content", "") if isinstance(m, dict) else m.content
            lines.append(f"{role}: {content}")
        return "\n".join(lines)

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        if self.streaming:
            stream_iter = self._stream(messages, stop=stop, run_manager=run_manager, **kwargs)
            return generate_from_stream(stream_iter)

        message_dicts, params = self._create_message_dicts(messages, stop)
        params = {
            **params,
            **kwargs,
        }
        prompt_str = self._convert_messages_to_prompt(message_dicts)
        response = self._client._generate(
            prompts=prompt_str, stop=stop, run_manager=run_manager, **kwargs
        )
        return self._create_chat_result(response)

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        message_dicts, params = self._create_message_dicts(messages, stop)
        prompt_str = self._convert_messages_to_prompt(message_dicts)
        params = {**params, **kwargs, "stream": True}

        default_chunk_class: Type[BaseMessageChunk] = AIMessageChunk
        for chunk in self._client._generate(
            prompts=prompt_str, stop=stop, run_manager=run_manager, **kwargs
        ):
            if not isinstance(chunk, dict):
                if hasattr(chunk, "model_dump"):
                    chunk = chunk.model_dump()
                else:
                    chunk = dict(chunk)
            if len(chunk["choices"]) == 0:
                continue
            choice = chunk["choices"][0]
            message_chunk = _convert_chunk_to_message_chunk(chunk, default_chunk_class)
            generation_info = {}
            if finish_reason := choice.get("finish_reason"):
                generation_info["finish_reason"] = finish_reason
            logprobs = choice.get("logprobs")
            if logprobs:
                generation_info["logprobs"] = logprobs
            default_chunk_class = message_chunk.__class__
            generation_chunk = ChatGenerationChunk(
                message=message_chunk,
                generation_info=generation_info or None,
            )

            if run_manager:
                run_manager.on_llm_new_token(
                    generation_chunk.text, chunk=generation_chunk, logprobs=logprobs
                )
            yield generation_chunk


    def _create_chat_result(self, response: Any) -> ChatResult:
        """
        Parses the LLM response to detect JSON tool calls.
        If JSON is detected, returns AIMessage with a ToolCall.
        Otherwise, returns a normal AIMessage.
        """
        if not isinstance(response, dict):
            if hasattr(response, "model_dump"):
                response = response.model_dump()
            else:
                response = dict(response)

        all_generations = response.get("generations", [])
        if not all_generations:
            raise ValueError("No 'generations' found in LLM response or it's empty.")
        first_prompt_generations = all_generations[0]
        if not first_prompt_generations:
            raise ValueError("No generation output for the first prompt.")

        # Extract the first response text
        gen_obj = first_prompt_generations[0]
        text = gen_obj.get("text", "").strip()

        # Detect and extract JSON content
        try:
            # Remove markdown formatting if present
            text = re.sub(r"```json\n(.*)\n```", r"\1", text, flags=re.DOTALL).strip()

            # Attempt to parse JSON
            maybe_json = json.loads(text)
            if isinstance(maybe_json, dict) and "name" in maybe_json and "arguments" in maybe_json:
                tool_call_id = f"call_{uuid.uuid4().hex[:6]}"
                parsed_call = ToolCall(
                    name=maybe_json["name"],
                    args=maybe_json["arguments"],
                    id=tool_call_id,
                )
                return ChatResult(generations=[ChatGeneration(message=AIMessage(content="", tool_calls=[parsed_call]))])
        except json.JSONDecodeError:
            pass  # If JSON parsing fails, continue with normal AIMessage

        # Default: Return normal text message
        return ChatResult(generations=[ChatGeneration(message=AIMessage(content=text))])

    def _create_message_dicts(
        self, messages: List[BaseMessage], stop: Optional[List[str]]
    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
        params: Dict[str, Any] = {}
        if stop is not None:
            params["stop"] = stop
        message_dicts = [_convert_message_to_dict(m) for m in messages]
        return message_dicts, params

    def bind_functions(
        self,
        functions: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]],
        function_call: Optional[
            Union[_FunctionCall, str, Literal["auto", "none"]]
        ] = None,
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, BaseMessage]:
        formatted_functions = [convert_to_openai_function(fn) for fn in functions]
        if function_call is not None:
            function_call = (
                {"name": function_call}
                if isinstance(function_call, str)
                and function_call not in ("auto", "none")
                else function_call
            )
            if isinstance(function_call, dict) and len(formatted_functions) != 1:
                raise ValueError(
                    "When specifying `function_call`, you must provide exactly one function."
                )
            if (
                isinstance(function_call, dict)
                and formatted_functions[0]["name"] != function_call["name"]
            ):
                raise ValueError(
                    f"Function call {function_call} was specified, but the only "
                    f"provided function was {formatted_functions[0]['name']}."
                )
            kwargs = {**kwargs, "function_call": function_call}
        return super().bind(
            functions=formatted_functions,
            **kwargs,
        )

    def bind_tools(
        self,
        tools: Sequence[Union[Dict[str, Any], Type[BaseModel], Callable, BaseTool]],
        *,
        tool_choice: Optional[
            Union[dict, str, Literal["auto", "any", "none"], bool]
        ] = None,
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, BaseMessage]:
        formatted_tools = [convert_to_openai_tool(tool) for tool in tools]
        if tool_choice is not None and tool_choice:
            if tool_choice == "any":
                tool_choice = "required"
            if isinstance(tool_choice, str) and (
                tool_choice not in ("auto", "none", "required")
            ):
                tool_choice = {"type": "function", "function": {"name": tool_choice}}
            if isinstance(tool_choice, bool):
                if len(tools) > 1:
                    raise ValueError(
                        "tool_choice can only be True when there is one tool. "
                        f"Received {len(tools)} tools."
                    )
                tool_name = formatted_tools[0]["function"]["name"]
                tool_choice = {
                    "type": "function",
                    "function": {"name": tool_name},
                }

            kwargs["tool_choice"] = tool_choice
        return super().bind(tools=formatted_tools, **kwargs)

    def with_structured_output(
        self,
        schema: Optional[Union[Dict, Type[BaseModel]]] = None,
        *,
        method: Literal["function_calling", "json_mode"] = "function_calling",
        include_raw: bool = False,
        **kwargs: Any,
    ) -> Runnable[LanguageModelInput, Union[Dict, BaseModel]]:
        if kwargs:
            raise ValueError(f"Received unsupported arguments {kwargs}")
        is_pydantic_schema = _is_pydantic_class(schema)
        if method == "function_calling":
            if schema is None:
                raise ValueError(
                    "schema must be specified when method is 'function_calling'. "
                    "Received None."
                )
            formatted_tool = convert_to_openai_tool(schema)
            tool_name = formatted_tool["function"]["name"]
            llm = self.bind_tools(
                [schema],
                tool_choice=tool_name,
                structured_output_format={
                    "kwargs": {"method": "function_calling"},
                    "schema": formatted_tool,
                },
            )
            if is_pydantic_schema:
                output_parser: OutputParserLike = PydanticToolsParser(
                    tools=[schema],
                    first_tool_only=True,
                )
            else:
                output_parser = JsonOutputKeyToolsParser(
                    key_name=tool_name, first_tool_only=True
                )
        elif method == "json_mode":
            llm = self.bind(
                response_format={"type": "json_object"},
                structured_output_format={
                    "kwargs": {"method": "json_mode"},
                    "schema": schema,
                },
            )
            output_parser = (
                PydanticOutputParser(pydantic_object=schema)
                if is_pydantic_schema
                else JsonOutputParser()
            )
        else:
            raise ValueError(
                f"Unrecognized method argument. Expected one of 'function_calling' or 'json_mode'. Received: '{method}'"
            )

        if include_raw:
            parser_assign = RunnablePassthrough.assign(
                parsed=itemgetter("raw") | output_parser, parsing_error=lambda _: None
            )
            parser_none = RunnablePassthrough.assign(parsed=lambda _: None)
            parser_with_fallback = parser_assign.with_fallbacks(
                [parser_none], exception_key="parsing_error"
            )
            return RunnableMap(raw=llm) | parser_with_fallback
        else:
            return llm | output_parser

    @property
    def _llm_type(self) -> str:
        return "vllm"


### Load Model Using Huggingface (Experimental)

In [None]:
# https://huggingface.co/HuggingFaceTB/SmolLM2-1.7B-Instruct
from huggingface_hub import snapshot_download

local_path = "models/SmolLM2-Instruct"

snapshot_download(
    repo_id="HuggingFaceTB/SmolLM2-1.7B-Instruct",
    local_dir=local_path,
    revision="main",
    resume_download=True,
    cache_dir="./cache"
)
print("Model downloaded into "+ local_path)

In [None]:
# https://huggingface.co/docs/transformers/main_classes/pipelines
# Use a pipeline as a high-level helper
from transformers import pipeline
messages = [
    {"role": "user", "content": "Who are you?"},
]
pipe = pipeline("text-generation", model=local_path)
pipe(messages)

Device set to use cuda:0


[{'generated_text': [{'role': 'user', 'content': 'Who are you?'},
   {'role': 'assistant',
    'content': "I'm SmolLM, an AI model trained by Hugging Face. I'm here to assist"}]}]

In [None]:
print(pipe(messages)[0]['generated_text'][-1]['content'])

I'm SmolLM, an AI model trained by Hugging Face. I'm here to assist


In [None]:
from langchain_huggingface import HuggingFacePipeline, ChatHuggingFace
local_path = "models/SmolLM2-Instruct"

# https://python.langchain.com/docs/integrations/chat/huggingface/
llm = HuggingFacePipeline.from_model_id(
    #model_id=local_path,
    model_id="HuggingFaceTB/SmolLM2-1.7B-Instruct",
    task="text-generation",
    pipeline_kwargs=dict(
        max_new_tokens=8192,
        do_sample=False,
        repetition_penalty=1.03,
    ),
)

Device set to use cuda:0


In [None]:
from langchain_core.prompts import PromptTemplate

template = """Question: {question}

Answer: Let's think step by step."""
prompt = PromptTemplate.from_template(template)

chain = prompt | llm  # using llm directly from HuggingfacePipeline

question = "What is electroencephalography?"

print(chain.invoke({"question": question}))

Question: What is electroencephalography?

Answer: Let's think step by step. Electroencephalography (EEG) is a non-invasive neuroimaging technique used to measure the electrical activity of the brain. It is commonly used in clinical and research settings to diagnose and monitor neurological disorders, such as epilepsy, seizure disorders, and coma.


In [None]:
model = ChatHuggingFace(llm=llm, verbose=False)

messages = [
    {"role": "user", "content": "Who are you?"},
]

events = model.invoke(messages)
for event in events:
    print(event)

('content', "<|im_start|>system\nYou are a helpful AI assistant named SmolLM, trained by Hugging Face<|im_end|>\n<|im_start|>user\nWho are you?<|im_end|>\n<|im_start|>assistant\nI'm SmolLM, an AI model trained on the Hugging Face library. I'm here to assist you with your questions and provide information on various topics. I'm designed to be friendly, helpful, and easy to understand. Feel free to ask me anything you'd like to know!")
('additional_kwargs', {})
('response_metadata', {})
('type', 'ai')
('name', None)
('id', 'run-89bd3b9d-fb08-4f25-bf3a-14e65b6147d1-0')
('example', False)
('tool_calls', [])
('invalid_tool_calls', [])
('usage_metadata', None)


In [None]:
from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate

template = ChatPromptTemplate.from_messages([
    SystemMessagePromptTemplate.from_template("You are a helpful assistant."),
    HumanMessagePromptTemplate.from_template("{input}")
])

messages= template.format_messages(input="Who are you?")

events = model.invoke(messages)
for event in events:
    print(event)

('content', "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\nWho are you?<|im_end|>\n<|im_start|>assistant\nI'm an AI language model, developed to assist and answer questions to the best of my ability. I don't have personal experiences or opinions, but I can provide information on a wide range of topics. I'm here to help you with your queries, answer your questions, and provide information based on my training data.")
('additional_kwargs', {})
('response_metadata', {})
('type', 'ai')
('name', None)
('id', 'run-3a7d49ee-3ac1-4a18-b1b0-2414f7cd2933-0')
('example', False)
('tool_calls', [])
('invalid_tool_calls', [])
('usage_metadata', None)


In [None]:
from langchain_core.prompts import PromptTemplate

template = """Question: {question}

Answer: Let's think step by step."""
prompt = PromptTemplate.from_template(template)

chain = prompt | llm

question = "What is electroencephalography?"

print(chain.invoke({"question": question}))

You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset


Question: What is electroencephalography?

Answer: Let's think step by step. Electroencephalography (EEG) is a non-invasive neuroimaging technique used to measure the electrical activity of the brain. It is commonly used in clinical and research settings to diagnose and monitor neurological disorders, such as epilepsy, seizure disorders, and coma.


In [None]:
for chunk in chain.stream(question):
    print(chunk, end="", flush=True)

 Electroencephalography (EEG) is a non-invasive neuroimaging technique used to measure the electrical activity of the brain. It is commonly used in clinical and research settings to diagnose and monitor neurological disorders, such as epilepsy, seizure disorders, and coma.

### Load Model and Serve Online (Experimental)

In [None]:
# Load and run the model:
!vllm serve "HuggingFaceTB/SmolLM2-1.7B-Instruct" --dtype=half

2025-02-13 17:35:29.364457: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1739468129.384918   48223 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1739468129.391108   48223 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-02-13 17:35:29.418049: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
INFO 02-13 17:35:32 __init__.py:190] Automatically detected platform cuda.
INFO 02-13 17:35:33 api_server.py:840] vLL

In [None]:
import torch
from langchain_openai import ChatOpenAI

inference_server_url = "http://localhost:8000/v1"

llm = ChatOpenAI(
    model="HuggingFaceTB/SmolLM2-1.7B-Instruct",
    openai_api_key="EMPTY", # max context window of 8192
    openai_api_base=inference_server_url,
    max_tokens=512,
    temperature=0.5
)


### Load Model Offline Locally (vLLM)

#### Download and Initiate LLM  (vLLM)

In [None]:
# RTool Calling System Prompt
system_prompt = """
[SYSTEM PROMPT]

You are a tool-calling assistant. If the user’s request requires calling a tool, **ONLY** respond with a JSON object **without extra text**.

Format strictly as:

{
  "name": "<TOOL_NAME>",
  "arguments": {
    "param1": "value1",
    "param2": "value2"
  }
}

Rules:
- **Do not** return any extra text before or after the JSON.
- **Do not** wrap JSON in markdown (```json```).
- **Do not** explain anything.

Example valid responses:
{"name": "Web Extraction Tool", "arguments": {"input": "Deep Learning", "k": 3, "url": "https://www.ibm.com/think/topics/artificial-intelligence"}}
{"name": "MCQ Retrieval Tool", "arguments": {"input": "AI", "k": 1}}

Example invalid responses:
 Here is your JSON: {"name": "Web Extraction Tool", "arguments": {...}}
 ```json { "name": "Web Extraction Tool", "arguments": {...} } ```
 Let me fetch that for you. {"name": "Web Extraction Tool", "arguments": {...}}

User Query:
{user_question}


"""

# prompt_template =[
#     {"role": "system", "content": system_message},
#     {"role": "user", "content": "{question}"},
#     {"role": "assistant", "content": "{answer}"},
#     ]



In [None]:
# https://python.langchain.com/api_reference/community/llms/langchain_community.llms.vllm.VLLM.html
# https://lunary.ai/blog/vllm-langchain-tutorial#leveraging-quantization-for-improved-efficiency
# Original implementation from LangChain for non-Chat vLLM
import torch
from langchain_community.llms import VLLM

# # Initializing the vLLM model
base_vllm = VLLM(
    #model="HuggingFaceTB/SmolLM2-1.7B-Instruct",
    #model="meta-llama/Llama-3.2-1B-Instruct",
    model="TheBloke/Llama-2-7b-Chat-AWQ",
    trust_remote_code=True,  # mandatory for Hugging Face models
    max_new_tokens=512,
    top_k=10,
    top_p=0.95,
    temperature=0.5,
    dtype="float16",
    ignore_eos = False,
    verbose = True,
    download_dir = "models/vllm",
    vllm_kwargs={"quantization": "awq"}, #vLLM supports awq quantization. To enable it, pass quantization to vllm_kwargs.
)


INFO 02-18 16:38:52 __init__.py:190] Automatically detected platform cuda.


config.json:   0%|          | 0.00/777 [00:00<?, ?B/s]

INFO 02-18 16:39:12 config.py:542] This model supports multiple tasks: {'embed', 'reward', 'classify', 'generate', 'score'}. Defaulting to 'generate'.
INFO 02-18 16:39:14 llm_engine.py:234] Initializing a V0 LLM engine (v0.7.2) with config: model='TheBloke/Llama-2-7b-Chat-AWQ', speculative_config=None, tokenizer='TheBloke/Llama-2-7b-Chat-AWQ', skip_tokenizer_init=False, tokenizer_mode=auto, revision=None, override_neuron_config=None, tokenizer_revision=None, trust_remote_code=True, dtype=torch.float16, max_seq_len=4096, download_dir='models/vllm', load_format=LoadFormat.AUTO, tensor_parallel_size=1, pipeline_parallel_size=1, disable_custom_all_reduce=False, quantization=awq, enforce_eager=False, kv_cache_dtype=auto,  device_config=cuda, decoding_config=DecodingConfig(guided_decoding_backend='xgrammar'), observability_config=ObservabilityConfig(otlp_traces_endpoint=None, collect_model_forward_time=False, collect_model_execute_time=False), seed=0, served_model_name=TheBloke/Llama-2-7b-Ch

tokenizer_config.json:   0%|          | 0.00/776 [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.84M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/414 [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/188 [00:00<?, ?B/s]

INFO 02-18 16:39:19 cuda.py:179] Cannot use FlashAttention-2 backend for Volta and Turing GPUs.
INFO 02-18 16:39:19 cuda.py:227] Using XFormers backend.
INFO 02-18 16:39:20 model_runner.py:1110] Starting to load model TheBloke/Llama-2-7b-Chat-AWQ...
INFO 02-18 16:39:21 weight_utils.py:252] Using model weights format ['*.safetensors']


model.safetensors:   0%|          | 0.00/3.89G [00:00<?, ?B/s]

INFO 02-18 16:39:47 weight_utils.py:297] No model.safetensors.index.json found in remote.


Loading safetensors checkpoint shards:   0% Completed | 0/1 [00:00<?, ?it/s]


INFO 02-18 16:39:49 model_runner.py:1115] Loading model weights took 3.6711 GB
INFO 02-18 16:39:53 worker.py:267] Memory profiling takes 3.80 seconds
INFO 02-18 16:39:53 worker.py:267] the current vLLM instance can use total_gpu_memory (14.74GiB) x gpu_memory_utilization (0.90) = 13.27GiB
INFO 02-18 16:39:53 worker.py:267] model weights take 3.67GiB; non_torch_memory takes 0.03GiB; PyTorch activation peak memory takes 0.44GiB; the rest of the memory reserved for KV Cache is 9.13GiB.
INFO 02-18 16:39:54 executor_base.py:110] # CUDA blocks: 1168, # CPU blocks: 512
INFO 02-18 16:39:54 executor_base.py:115] Maximum concurrency for 4096 tokens per request: 4.56x
INFO 02-18 16:39:57 model_runner.py:1434] Capturing cudagraphs for decoding. This may lead to unexpected consequences if the model is not static. To run the model in eager mode, set 'enforce_eager=True' or use '--enforce-eager' in the CLI. If out-of-memory error occurs during cudagraph capture, consider decreasing `gpu_memory_utiliz

Capturing CUDA graph shapes: 100%|██████████| 35/35 [00:56<00:00,  1.62s/it]

INFO 02-18 16:40:53 model_runner.py:1562] Graph capturing finished in 57 secs, took 0.50 GiB
INFO 02-18 16:40:53 llm_engine.py:431] init engine (profile, create kv cache, warmup model) took 64.55 seconds





### Run vLLM SubProcess

In [5]:
# https://www.llama.com/docs/model-cards-and-prompt-formats/llama3_1/#-tool-calling-(8b/70b/405b)-
# https://medium.com/@hakimnaufal/trying-out-vllm-deepseek-r1-in-google-colab-a-quick-guide-a4fe682b8665
# https://github.com/naufalhakim23/deepseek-r1-playground/blob/main/deepseek_r1_distill_qwen_fast_api.ipynb
# https://colab.research.google.com/github/deepset-ai/haystack-cookbook/blob/main/notebooks/vllm_inference_engine.ipynb
# https://docs.vllm.ai/_/downloads/en/v0.4.2/pdf/

# https://huggingface.co/casperhansen/llama-3-8b-instruct-awq/tree/main

!wget -q -P examples/ https://github.com/vllm-project/vllm/raw/refs/heads/main/examples/tool_chat_template_llama3.1_json.jinja

# we prepend "nohup" and postpend "&" to make the Colab cell run in background
! nohup python -m vllm.entrypoints.openai.api_server \
                  --model casperhansen/llama-3-8b-instruct-awq \
                  --enable-auto-tool-choice \
                  --tool-call-parser llama3_json \
                  --chat-template examples/tool_chat_template_llama3.1_json.jinja \
                  --quantization awq \
                  --max-model-len 4096 \
                  --download-dir models/vllm \
                  > vllm.log &

nohup: redirecting stderr to stdout


In [6]:
# we check the logs until the server has been started correctly
!while ! grep -q "Application startup complete" vllm.log; do tail -n 1 vllm.log; sleep 5; done

E0000 00:00:1739951177.760811    2687 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
INFO 02-19 07:46:22 api_server.py:206] Started engine process with PID 2752
E0000 00:00:1739951189.818584    2752 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
E0000 00:00:1739951189.818584    2752 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
INFO 02-19 07:46:35 __init__.py:190] Automatically detected platform cuda.
INFO 02-19 07:46:35 __init__.py:190] Automatically detected platform cuda.
INFO 02-19 07:47:00 model_runner.py:1110] Starting to load model casperhansen/llama-3-8b-instruct-awq...
INFO 02-19 07:47:00 weight_utils.py:252] Using model weights format ['*.safetensors']
INFO 02-19 07:47:00 weight_utils.py:252] Using mo

find the process ID (PID) using a command like ps aux | grep vllm and then kill it using kill -9 <PID>.

In [144]:
# Find the process ID (PID)
!ps aux | grep vllm

root        8386  0.2  8.8 6623348 1177212 ?     Sl   05:00   0:24 python3 -m vllm.entrypoints.opena
root       45848  0.0  0.0   7376  3480 ?        S    07:26   0:00 /bin/bash -c ps aux | grep vllm
root       45850  0.0  0.0   6484  2428 ?        S    07:26   0:00 grep vllm


In [147]:
# To kill the process, look for the first set of digits
#!kill -9 8386

In [7]:
import requests

def check_vllm_status():
    try:
        response = requests.get("http://localhost:8000/health")
        if response.status_code == 200:
            print("vllm server is running")
            return True
    except requests.exceptions.ConnectionError:
        print("vllm server is not running")
        return False

try:
    # Monitor the process
    while True:
        if check_vllm_status() == True:
            print("The vllm server is ready to serve.")
            break
        else:
            print("The vllm server has stopped.")
            #stdout, stderr = vllm_process.communicate(timeout=10)
            #print(f"STDOUT: {stdout.decode('utf-8')}")
            #print(f"STDERR: {stderr.decode('utf-8')}")
            break
        time.sleep(5)  # Check every second
except KeyboardInterrupt:
    print("Stopping the check of vllm...")

vllm server is running
The vllm server is ready to serve.


In [8]:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(
    model="casperhansen/llama-3-8b-instruct-awq",
    temperature=0.5,
    timeout=None,
    max_retries=2,
    api_key="not_required",
    base_url="http://localhost:8000/v1",
    # organization="...",
    # other params...
)

messages = [
    (
        "system",
        "You are a helpful assistant that translates English to French. Translate the user sentence.",
    ),
    ("human", "I love programming."),
]
ai_msg = model.invoke(messages)
ai_msg

AIMessage(content='Je adore le programmation.', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 7, 'prompt_tokens': 55, 'total_tokens': 62, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'casperhansen/llama-3-8b-instruct-awq', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None}, id='run-fb017083-b521-4683-b997-b4eec49ee46d-0', usage_metadata={'input_tokens': 55, 'output_tokens': 7, 'total_tokens': 62, 'input_token_details': {}, 'output_token_details': {}})

In [None]:
# Wrap the Custom ChatvLLM over the base_vllm model
# Running a simple query
BaseChat_vLLMWrapper(base_vllm).invoke([
                                        {"role": "system", "content": system_prompt},
                                        {"role": "user", "content": "Search What is Deep Learning?"}]
                                       ).pretty_print()


# Invoke a single question
#result = llm.invoke([HumanMessage(content="What is Deep Learning?")])
#print("Response:", result.content)

# Or call batch
# batch_inputs = [
#     [HumanMessage(content="What is Deep Learning?")],
#     [HumanMessage(content="Explain the concept of attention in Transformers.")],
# ]
# batch_outputs = llm.batch(batch_inputs)
# for i, out in enumerate(batch_outputs):
#     print(f"Response #{i+1}:", out.content)

Processed prompts: 100%|██████████| 1/1 [00:02<00:00,  2.79s/it, est. speed input: 116.88 toks/s, output: 25.46 toks/s]

Tool Calls:
  Web Extraction Tool (call_1680ec)
 Call ID: call_1680ec
  Args:
    input: Deep Learning
    k: 3
    url: https://www.ibm.com/think/topics/artificial-intelligence





### LangGraph

In [None]:
%%capture
%pip install langchain-groq

In [None]:
from google.colab import userdata
GROQ_API_KEY = userdata.get('GROQ_API_KEY')

from langchain_groq import ChatGroq

# Initialize Groq LLM
model = ChatGroq(
    model_name="deepseek-r1-distill-llama-70b",   #"llama-3.2-3b-preview", "deepseek-r1-distill-llama-70b"
    temperature=0.6,
    api_key=GROQ_API_KEY,
    verbose=True
)

print(model.invoke("Who are you?"))

content="<think>\n\n</think>\n\nGreetings! I'm DeepSeek-R1, an artificial intelligence assistant created by DeepSeek. I'm at your service and would be delighted to assist you with any inquiries or tasks you may have." additional_kwargs={} response_metadata={'token_usage': {'completion_tokens': 44, 'prompt_tokens': 7, 'total_tokens': 51, 'completion_time': 0.16, 'prompt_time': 0.003501381, 'queue_time': 0.234579366, 'total_time': 0.163501381}, 'model_name': 'deepseek-r1-distill-llama-70b', 'system_fingerprint': 'fp_492bd52206', 'finish_reason': 'stop', 'logprobs': None} id='run-013d938d-b492-4bb4-9490-de82fb8463a9-0' usage_metadata={'input_tokens': 7, 'output_tokens': 44, 'total_tokens': 51}


In [35]:
import json
from typing import (
    Annotated,
    Sequence,
    TypedDict,
    List,
)

from pydantic import BaseModel, Field

# LangChain / LangGraph imports
from langchain_core.messages import (
    SystemMessage,
    AIMessage,
    HumanMessage,
    BaseMessage,
    ToolMessage,
)
from langchain_core.tools import StructuredTool
from langchain_core.runnables import RunnableConfig

from langgraph.prebuilt import InjectedStore
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import MemorySaver
from langchain.embeddings import init_embeddings
from langgraph.graph import StateGraph, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition

###############################################################################
# 1. Initialize memory + config
###############################################################################
in_memory_store = InMemoryStore(
    index={
        "embed": init_embeddings("huggingface:sentence-transformers/all-MiniLM-L6-v2"),
        "dims": 384,  # Embedding dimensions
    }
)

# A memory saver to checkpoint conversation states
checkpointer = MemorySaver()

# Initialize config with user & thread info
config = {}
config["configurable"] = {
    "user_id": "user_1",
    "thread_id": 0,
}

###############################################################################
# 2. Define MessagesState
###############################################################################
class MessagesState(TypedDict):
    """The state of the agent.

    The key 'messages' uses add_messages as a reducer,
    so each time this state is updated, new messages are appended.
    # See https://langchain-ai.github.io/langgraph/concepts/low_level/#reducers
    """
    messages: Annotated[Sequence[BaseMessage], add_messages]


###############################################################################
# 3. Memory Tools
###############################################################################
def save_memory(summary_text: str, *, config: RunnableConfig, store: BaseStore) -> str:
    """Save the given memory for the current user and return the key."""
    user_id = config.get("configurable", {}).get("user_id")
    thread_id = config.get("configurable", {}).get("thread_id")
    namespace = (user_id, "memories")
    memory_id = thread_id
    store.put(namespace, memory_id, {"summary": summary_text})
    return memory_id

def update_memory(state: MessagesState, config: RunnableConfig, *, store: BaseStore):
    # Extract the messages list from the event, handling potential missing key
    messages = state["messages"]
    # Convert LangChain messages to dictionaries before storing
    messages_dict = [{"role": msg.type, "content": msg.content} for msg in messages]

    # Get the user id from the config
    user_id = config.get("configurable", {}).get("user_id")
    thread_id = config.get("configurable", {}).get("thread_id")
    # Namespace the memory
    namespace = (user_id, "memories")
    # Create a new memory ID
    memory_id = f"{thread_id}_{str(uuid.uuid4())}"
    store.put(namespace, memory_id, {"memory": messages_dict})
    return memory_id


# Define a Pydantic schema for the save_memory tool (if needed elsewhere)
# https://langchain-ai.github.io/langgraphjs/reference/classes/checkpoint.InMemoryStore.html
class RecallMemory(BaseModel):
    query_text: str = Field(..., title="Search Text", description="The text to search from memories for similar records.")
    k: int = Field(5, title="Number of Results", description="Number of results to retrieve.")

def recall_memory(query_text: str, k: int = 5) -> str:
    """Retrieve user memories from in_memory_store."""
    user_id = config.get("configurable", {}).get("user_id")
    namespace = (user_id, "memories")

    memories = [m.value["data"] for m in in_memory_store.search(namespace=namespace, query=query_text, limit=k)]
    joined = f"User memories: {', '.join(memories)}"
    return joined

# Create a StructuredTool from the function
recall_memory_tool = StructuredTool.from_function(
    func=recall_memory,
    name="Recall Memory Tool",
    description="""
      Retrieve memories relevant to the user's query.
      """,
    args_schema=RecallMemory,
    response_format="content",
    return_direct=False,
    verbose=False
)

###############################################################################
# 4. Summarize Node (using StructuredTool)
###############################################################################
# Define a Pydantic schema for the Summary tool
class SummariseConversation(BaseModel):
    summary_text: str = Field(..., title="text", description="Summary of Conversation")

def summarise_node(summary_text: str):
    """
    Final node that summarizes the entire conversation for the current thread,
    saves it in memory, increments the thread_id, and ends the conversation.
    Returns a confirmation string.
    """
    user_id = config["configurable"]["user_id"]
    current_thread_id = config["configurable"]["thread_id"]
    new_thread_id = str(int(current_thread_id) + 1)

    # Prepare configuration for saving memory with updated thread id
    config_for_saving = {
        "configurable": {
            "user_id": user_id,
            "thread_id": new_thread_id
        }
    }
    key = save_memory(summary_text, config=config_for_saving, store=in_memory_store)
    return f"Summary saved under key: {key}"

# Create a StructuredTool from the function (this wraps summarise_node)
summarise_tool = StructuredTool.from_function(
    func=summarise_node,
    name="Summary Tool",
    description="""
      Summarize the current conversation in no more than
      1000 words. Also retain any unanswered quiz questions along with
      your internal answers so the next conversation thread can continue.
      Do not reveal solutions to the user yet. Use this tool to save
      the current conversation to memory and then end the conversation.
      """,
    args_schema=SummariseConversation,
    response_format="content",
    return_direct=False,
    verbose=True
)

###############################################################################
# 5. Build the Graph
###############################################################################
graph_builder = StateGraph(MessagesState)

# Use the built-in ToolNode from langgraph that calls any declared tools.
tools = [
    mcq_retriever_tool,
    web_extraction_tool,
    ensemble_retriever_tool,
    general_retriever_tool,
    in_memory_retriever_tool,
    recall_memory_tool,
    summarise_tool,  # Use the StructuredTool version, not the raw function
]

tool_node = ToolNode(tools=tools)
#end_node = ToolNode(tools=[summarise_tool])  # Use summarise_tool for end node

#from langchain.agents import AgentExecutor, create_tool_calling_agent
#from langchain_core.prompts import ChatPromptTemplate


# Wrap the Custom ChatvLLM over the base_vllm model
#model = BaseChat_vLLMWrapper(base_vllm)


# Wrap your model with tools
llm_with_tools = model.bind_tools(tools)

###############################################################################
# 6. The agent's main node: call_model
###############################################################################
def call_model(state: MessagesState, config: RunnableConfig):
    """
    The main agent node that calls the LLM with the user + system messages.
    Since our vLLM chat wrapper expects a list of BaseMessage objects,
    we convert any dict messages to HumanMessage objects.
    If the LLM requests a tool call, we'll route to the 'tools' node next
    (depending on the condition).
    """
    # Convert message dicts to HumanMessage instances if needed.
    messages = []
    for m in state["messages"]:
        if isinstance(m, dict):
            # Use role from dict (defaulting to 'user' if missing)
            messages.append(HumanMessage(content=m.get("content", ""), role=m.get("role", "user")))
        else:
            messages.append(m)

    # Invoke the LLM (with tools) using the converted messages.
    response = llm_with_tools.invoke(messages)

    return {"messages": [response]}

###############################################################################
# 7. Add Nodes & Edges, Then Compile
###############################################################################
graph_builder.add_node("agent", call_model)
graph_builder.add_node("tools", tool_node)
#graph_builder.add_node("summary", end_node)

# Entry point
graph_builder.set_entry_point("agent")

# def custom_tools_condition(llm_output: dict) -> str:
#     """Return which node to go to next based on the LLM output."""

#     # The LLM's JSON might have a field like {"name": "Recall Memory Tool", "arguments": {...}}.
#     tool_name = llm_output.get("name", None)

#     # If the LLM calls "Summary Tool", jump directly to the 'summary' node
#     if tool_name == "Summary Tool":
#         return "summary"

#     # If the LLM calls any other recognized tool, go to 'tools'
#     valid_tool_names = [t.name for t in tools]  # all tools in the main tool_node
#     if tool_name in valid_tool_names:
#         return "tools"

#     # If there's no recognized tool name, assume we're done => go to summary
#     return "__end__"

# graph_builder.add_conditional_edges(
#     "agent",
#     custom_tools_condition,
#     {
#         "tools": "tools",
#         "summary": "summary",
#         "__end__": "summary",
#     }
# )

# If LLM requests a tool, go to "tools", otherwise go to "summary"
graph_builder.add_conditional_edges("agent", tools_condition)
#graph_builder.add_conditional_edges("agent", tools_condition, {"tools": "tools", "__end__": "summary"})
#graph_builder.add_conditional_edges("agent", lambda llm_output: "tools" if llm_output.get("name", None) in [t.name for t in tools] else "summary", {"tools": "tools", "__end__": "summary"}

# If we used a tool, return to the agent for final answer or more tools
graph_builder.add_edge("tools", "agent")
#graph_builder.add_edge("agent", "summary")
#graph_builder.set_finish_point("summary")

# Compile the graph with checkpointing and persistent store
graph = graph_builder.compile(checkpointer=checkpointer, store=in_memory_store)

#from langgraph.prebuilt import create_react_agent
#graph = create_react_agent(llm_with_tools, tools=tool_node, checkpointer=checkpointer, store=in_memory_store)

from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))


OutOfMemoryError: CUDA out of memory. Tried to allocate 46.00 MiB. GPU 0 has a total capacity of 14.74 GiB of which 16.12 MiB is free. Process 18792 has 764.00 MiB memory in use. Process 29170 has 13.97 GiB memory in use. Of the allocated memory 595.16 MiB is allocated by PyTorch, and 34.84 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)

In [None]:
from typing import List
from langchain_core.messages import SystemMessage, AIMessage, HumanMessage
from langchain.agents import initialize_agent, AgentType
from langchain.tools import tool  # Correct import for @tool
from langchain.prompts import StringPromptTemplate
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, MessagesState, StateGraph

# ------------------------------------------------------------------
# Custom Prompt Template for the Agent
# ------------------------------------------------------------------
class CustomPromptTemplate(StringPromptTemplate):
    input_variables: List[str]  # Explicit typing

    def format(self, **kwargs) -> str:
        return self.template.format(**kwargs)

# Define the custom prompt
prompt_template = """You are an AI assistant for retrieving and answering questions based on provided tools.
Always respond with an action and input in the following format:

Action: <tool_name>
Action Input: <input>

Examples:
Action: rag_retrieval
Action Input: {"query": "What is Deep Learning?"}

Do not include 'Thoughts' or extraneous text unless explicitly asked.
Ensure the 'Action Input' follows the 'Action' line directly.
If the question is unclear, respond with "Action: None" and "Action Input: None".

The current question is: {input}"""

custom_prompt = CustomPromptTemplate(template=prompt_template, input_variables=["input"])

# ------------------------------------------------------------------
# Define the RAG retrieval tool
# ------------------------------------------------------------------
class GenRetrievalTool(BaseModel):
    input: str = Field(..., title="input", description="The input text to search for.")

def gen_retriever(input: str) -> List[str]:
    # Use retriever of vector store to retrieve documents
    docs_func = general_vs.as_retriever(
        search_type="mmr",
        search_kwargs = {'k': 5, 'lambda_mult': 0.25}
    )
    docs = docs_func.invoke(input, k=5)
    return [d.page_content for d in docs]

# Create a StructuredTool from the function
general_retriever_tool = StructuredTool.from_function(
    func = gen_retriever,
    name = "Assistant References Retrieval Tool",
    description = (
    """
    Use this tool to retrieve reference information from Assistant reference database for Human queries related to a topic or
    and when Human asked to generate guides to learn or study about a topic.
    """
    ),
    args_schema = GenRetrievalTool,
    response_format="content",
    return_direct = False, # Return the content of the documents
    verbose = False  # To log tool's progress
    )

@tool
def rag_retrieval(query: str):
    """
    Retrieve information related to a query from a vector store.
    Returns (retrieved_text).
    """
    if general_vs is None:
        return "Vector store not initialized.", []

    try:
        retrieved_docs = general_vs.similarity_search(query, k=2)
        if not retrieved_docs:
            return "No relevant documents found.", []
        serialized = "\n\n".join(
            (f"Source: {doc.metadata}\nContent: {doc.page_content}")
            for doc in retrieved_docs
        )
        return serialized, retrieved_docs
    except Exception as e:
        return f"Error during retrieval: {str(e)}", []

@tool
def gen_retrieval(query: str):
    """
    Retrieve information related to a query from a vector store.
    Returns (retrieved_text).
    """
    if general_vs is None:
        return "Vector store not initialized."

    try:
        docs_func = general_vs.as_retriever(
            search_type="mmr",
            search_kwargs={'k': 5, 'lambda_mult': 0.25}
        )
        docs = docs_func.invoke(query)
        return [d.page_content for d in docs]
    except Exception as e:
        return f"Error retrieving documents: {str(e)}"

# ------------------------------------------------------------------
# Process Query (Calls RAG tool via an Agent)
# ------------------------------------------------------------------
agent = initialize_agent(
        tools=[gen_retrieval, rag_retrieval],
        #tools = [general_retriever_tool],
        llm=base_vllm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        verbose=True,
        handle_parsing_errors=True,
        agent_kwargs={"prompt": custom_prompt},
    )

def process_query(state: MessagesState):
    """Processes the query and calls the RAG tool."""
    messages = state.get("messages", [])

    if not messages or not isinstance(messages[-1], HumanMessage):
        return {"messages": messages + [AIMessage(content="No valid user query detected.")]}

    query = messages[-1].content

    try:
        tool_output = agent.run({"input": query}) #gen_retrieval(query)
        return {"messages": messages + [AIMessage(content=tool_output)]}
    except Exception as e:
        return {"messages": messages + [AIMessage(content=f"Error: {str(e)}")]}

# ------------------------------------------------------------------
# Generate Response (Uses retrieved context to form an answer)
# ------------------------------------------------------------------
def generate_response(state: MessagesState):
    """Generates the final response while maintaining context."""
    messages = state.get("messages", [])

    system_message_content = (
        "You are an assistant for question-answering tasks. "
        "Use the retrieved context to answer the question. "
        "If you don't know the answer, ask the user to clarify."
    )

    conversation_messages = [
        f"User: {message.content}" if isinstance(message, HumanMessage) else f"AI: {message.content}"
        for message in messages
    ]

    prompt = system_message_content + "\n\n" + "\n".join(conversation_messages)

    try:
        final_response = agent(prompt) if agent else "LLM not initialized."
    except Exception as e:
        final_response = f"Error generating response: {str(e)}"

    return {"messages": messages + [AIMessage(content=final_response)]}

# ------------------------------------------------------------------
# Build & Compile the Graph
# ------------------------------------------------------------------
graph_builder = StateGraph(MessagesState)
graph_builder.add_node("process_query", process_query)
graph_builder.add_node("generate_response", generate_response)
graph_builder.set_entry_point("process_query")
graph_builder.add_edge("process_query", "generate_response")
graph_builder.add_edge("generate_response", END)

memory = MemorySaver()
graph = graph_builder.compile(checkpointer=memory)

# ------------------------------------------------------------------
# Example Usage
# ------------------------------------------------------------------
messages = [HumanMessage(content="What is Deep Learning Techniques?")]

config = {
    "configurable": {
        "thread_id": "example_thread",
        "checkpoint_ns": "example_namespace",
        "checkpoint_id": "query_1",
    }
}

result = graph.invoke({"messages": messages}, config=config)

for message in result["messages"]:
    print(f"{message.type.capitalize()}: {message.content}")


In [45]:
# Test to Call Tool Manually
message_with_single_tool_call = AIMessage(
    content="",
    tool_calls=[
        {
            "name": "Assistant References Retrieval Tool",
            "args": {"input": "Deep Learning"},
            "id": "tool_call_id",
            "type": "tool_call",
        }
    ],
)

tool_node.invoke({"messages": [message_with_single_tool_call]})

{'messages': [ToolMessage(content='["Deep Learning\\na machine learning technique in which layers of neural networks are used to process data and make decisions", "How Do Children Learn?\\nThe Deep Learning Revolution"]', name='Assistant References Retrieval Tool', tool_call_id='tool_call_id')]}

In [50]:
# Replacing the JSON with Tool calls
llm_with_tools.invoke(question_1).tool_calls

[{'name': 'Web Extraction Tool',
  'args': {'input': 'Deep Learning',
   'url': 'https://www.ibm.com/think/topics/artificial-intelligence',
   'k': 5},
  'id': 'chatcmpl-tool-079683f0cf764365b75627feafd48f7a',
  'type': 'tool_call'}]

In [71]:
# Test to Call Tool Manually
tool_call_msg = AIMessage(
    content="",
    tool_calls=llm_with_tools.invoke(question_2).tool_calls,
)

tool_result = tool_node.invoke({"messages": [tool_call_msg]})
tool_result

{'messages': [ToolMessage(content='{"./Documents/mcq/mcq.csv_Qn5 mcq_question": "Your organization is planning to deploy a new Azure AI solution to automate the processing of customer inquiries received through various communication channels, including email, chat, and social media. As part of the deployment process, you are tasked with creating an Azure AI resource to host the required services. The solution should be scalable, cost-effective, and comply with Responsible AI principles. Which steps should you follow to create the Azure AI resource while ensuring scalability, cost optimization, and compliance with Responsible AI principles? Select all answers that apply.", "./Documents/mcq/mcq.csv_Qn5 mcq_answer": "Answer - [A, C, D, E]", "./Documents/mcq/mcq.csv_Qn5 mcq_answer_reason": "Option A - Choosing an appropriate pricing tier and region ensures cost optimization and scalability of the Azure AI resource.|Option C - Defining resource tags helps categorize and track usage and cost

In [125]:
context = []
context.append({"role": "user", "content": question_7})
response = llm_with_tools.invoke(context)
response

AIMessage(content='To generate a study guide on Deep Learning, I will use the "Assistant References Retrieval Tool" and "Summary Tool". \n\nHere is the tool call:\n\n{"name": "Assistant References Retrieval Tool", "parameters": {"input": "What is Deep Learning?", "k": 5}}\n\nOutput:\n{\n    "metadata of question": "./Documents/assistant_references/Deep Learning_Study Guide_Qn1",\n    "question": "What is Deep Learning?",\n    "answer": "Deep learning is a subset of machine learning that involves the use of artificial neural networks to analyze and interpret data.",\n    "answer_reason": "Deep learning is a subset of machine learning that involves the use of artificial neural networks to analyze and interpret data. This is because deep learning models are inspired by the structure and function of the human brain, where complex patterns are recognized through multiple layers of abstraction.",\n    "options": ["Machine Learning", "Artificial Intelligence", "Natural Language Processing", "

In [130]:
llm_with_tools.invoke(question_7).tool_calls,

([],)

In [117]:
tool_call_msg = AIMessage(content="", tool_calls=response.tool_calls)
tool_result = tool_node.invoke({"messages": [tool_call_msg]})
tool_result

{'messages': [ToolMessage(content='{"./Documents/mcq/mcq.csv_Qn5 mcq_question": "Your organization is planning to deploy a new Azure AI solution to automate the processing of customer inquiries received through various communication channels, including email, chat, and social media. As part of the deployment process, you are tasked with creating an Azure AI resource to host the required services. The solution should be scalable, cost-effective, and comply with Responsible AI principles. Which steps should you follow to create the Azure AI resource while ensuring scalability, cost optimization, and compliance with Responsible AI principles? Select all answers that apply.", "./Documents/mcq/mcq.csv_Qn5 mcq_answer": "Answer - [A, C, D, E]", "./Documents/mcq/mcq.csv_Qn5 mcq_answer_reason": "Option A - Choosing an appropriate pricing tier and region ensures cost optimization and scalability of the Azure AI resource.|Option C - Defining resource tags helps categorize and track usage and cost

In [94]:
tool_result["messages"][0]

ToolMessage(content='["Each of these approaches is suited to different kinds of problems and data. But one of the most popular types of machine learning algorithm is called a neural network (or artificial neural network). Neural networks are modeled after the human brain\'s structure and function. A neural network consists of interconnected layers of nodes (analogous to neurons) that work together to process and analyze complex data. Neural networks are well suited to tasks that involve identifying complex patterns and relationships in large amounts of data. The simplest form of machine learning is called supervised learning, which involves the use of labeled data sets to train algorithms to classify data or predict outcomes accurately. In supervised learning, humans pair each training example with an output label. The goal is for the model to learn the mapping between inputs and outputs in the training data, so it can predict the labels of new, unseen data.     Deep learning   Deep le

In [96]:
next_response = llm_with_tools.invoke([{"role": "user", "content": question_1},
                                      {"role": "tool",
                                       "content": tool_result["messages"][0].content,
                                       "name": tool_result["messages"][0].name,
                                       "tool_call_id": tool_result["messages"][0].tool_call_id}])

BadRequestError: Error code: 400 - {'object': 'error', 'message': "This model's maximum context length is 2048 tokens. However, you requested 2503 tokens in the messages, Please reduce the length of the messages.", 'type': 'BadRequestError', 'param': None, 'code': 400}

In [86]:
next_response = llm_with_tools.invoke({"role": "user", "content": question_1},
                                      {"role": "Tool", "content": tool_result["messages"][0].content })
next_response

ValueError: Invalid input type <class 'dict'>. Must be a PromptValue, str, or list of BaseMessages.

In [73]:
# Test Example with Tool Calling
context = []
def process_query(query):

    global context
    context.append({"role": "user", "content": query})

    response = llm_with_tools.invoke(context)
    print(response)

    if response.tool_calls:
        tool_call_msg = AIMessage(content="", tool_calls=llm_with_tools.invoke(query).tool_calls)
        tool_result = tool_node.invoke({"messages": [tool_call_msg]})

        context.append(tool_node.invoke({"messages": [tool_call_msg]}))

        response = llm_with_tools.invoke(context)

    context.append({"role": "assistant", "content": response.content})

    return response.content

response_1 = process_query(question_1)
print("LLM_response:", response_1)

content='To answer the prompt, I will call the function "Web Extraction Tool" with the following arguments:\n\n{"name": "Web Extraction Tool", "parameters": {"input": "Deep Learning", "url": "https://www.ibm.com/think/topics/artificial-intelligence", "k": 5}}\n\nThis will extract content from the website related to the topic of Deep Learning.' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 77, 'prompt_tokens': 1588, 'total_tokens': 1665, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_name': 'casperhansen/llama-3-8b-instruct-awq', 'system_fingerprint': None, 'finish_reason': 'stop', 'logprobs': None} id='run-098f35fc-f67a-4ed6-bded-2584d03aea65-0' usage_metadata={'input_tokens': 1588, 'output_tokens': 77, 'total_tokens': 1665, 'input_token_details': {}, 'output_token_details': {}}
LLM_response: To answer the prompt, I will call the function "Web Extraction Tool" with the following arguments:

{"name": "Web Extractio

In [None]:
from openai import OpenAI
import json

client = OpenAI(base_url="http://localhost:8000/v1", api_key="dummy")

def get_weather(location: str, unit: str):
    return f"Getting the weather for {location} in {unit}..."
tool_functions = {"get_weather": get_weather}

tools = [{
    "type": "function",
    "function": {
        "name": "get_weather",
        "description": "Get the current weather in a given location",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {"type": "string", "description": "City and state, e.g., 'San Francisco, CA'"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
            },
            "required": ["location", "unit"]
        }
    }
}]

response = client.chat.completions.create(
    model=client.models.list().data[0].id,
    messages=[{"role": "user", "content": "What's the weather like in San Francisco?"}],
    tools=tools,
    tool_choice="auto"
)

tool_call = response.choices[0].message.tool_calls[0].function
print(f"Function called: {tool_call.name}")
print(f"Arguments: {tool_call.arguments}")
print(f"Result: {get_weather(**json.loads(tool_call.arguments))}")

In [33]:
# Check if the Model can call Web Extraction Tools
llm_with_tools.invoke("Call Web Extraction tool for Search https://www.ibm.com/think/topics/artificial-intelligence").tool_calls

[{'name': 'Web Extraction Tool',
  'args': {'input': 'Search https://www.ibm.com/think/topics/artificial-intelligence',
   'url': 'https://www.ibm.com/think/topics/artificial-intelligence',
   'k': 5},
  'id': 'chatcmpl-tool-1453bc2039334f75b13387c495ad80fa',
  'type': 'tool_call'}]

In [39]:
# Check if the Model can call Retrieval Tools
llm_with_tools.invoke("Look into references tool on Deep Learning").tool_calls

[{'name': 'Assistant References Retrieval Tool',
  'args': {'input': 'What is Deep Learning?', 'k': 5},
  'id': 'chatcmpl-tool-419049d8af5f423283c49c71b5750cee',
  'type': 'tool_call'}]

In [35]:
# Check if the Model can call MCQ Tools
llm_with_tools.invoke("Call Tool to give 1 MCQ").tool_calls

[{'name': 'MCQ Retrieval Tool',
  'args': {'input': 'What is AI?', 'k': 5},
  'id': 'chatcmpl-tool-33239ea01b274b75b1ad25d800ffaeee',
  'type': 'tool_call'}]

In [37]:
# Check if the Model can call MCQ Tools
llm_with_tools.invoke("Recall a Memory").tool_calls

[{'name': 'Recall Memory Tool',
  'args': {'query_text': 'input query', 'k': 5},
  'id': 'chatcmpl-tool-66f9d9e7de45426095df064cdb87eda9',
  'type': 'tool_call'}]

In [41]:
# Check if the Model can call MCQ Tools
llm_with_tools.invoke("Summarise tool for a Conversation").tool_calls

[{'name': 'Summary Tool',
  'args': {'summary_text': 'Summary of Conversation'},
  'id': 'chatcmpl-tool-8bbb5b6a306c47799c7f2f1150d9d09e',
  'type': 'tool_call'}]

In [43]:
# Check if the Model can call MCQ Tools
llm_with_tools.invoke("Search a Document I uploaded in memory").tool_calls

[{'name': 'In-Memory Retrieval Tool',
  'args': {'input': 'Search a Document I uploaded in memory', 'k': 1},
  'id': 'chatcmpl-tool-b8206864bfb541aa971e2d14882a7504',
  'type': 'tool_call'}]

In [30]:
# URL
url1 = "https://www.ibm.com/think/topics/artificial-intelligence"
url2 = "https://www.ibm.com/think/topics/machine-learning"

question_1 = (f"Find out about Deep Learning from databases. Then search from website: {url1} with web extraction tool")
question_2 = "Provide 5 MCQ questions on Artificial Intelligence to help me with practice."
question_3 = "Here are my answers: 1. A, 2. B, 3. C, 4. D, 5. E. Please check mu answer, provide reasons for my wrong answers and provide the correct answers with explanations."
question_4 = "Provide another 5 MCQ questions on Artificial Intelligence to help me with practice."
question_5 = "Here are my answers: 1. A, 2. B, 3. C, 4. D, 5. E. Please check mu answer, provide reasons for my wrong answers and provide the correct answers with explanations."
question_6 = "Provide a study quide to help me learn for my wrong answers for the MCQ questions."
question_7 = "Based on your reference databases only, provide a study quide on Deep Learning."
question_8 = "Based on your memory, provide a summary of our conversation."
question_9 = "Summarise the conversation so far."

# Grab the current user_id and thread_id from config
user_id = "user_1"

# Get last thread_id
last_thread_id = config["configurable"]["thread_id"]
thread_id = str(int(last_thread_id) + 1)

# Print Config
print(f"Conversation Thread ID: {last_thread_id} -> {thread_id}")

# Update the config with the new thread_id
config = {"configurable": {"thread_id": thread_id, "user_id": user_id}}


# Create a system prompt (your overall instructions to the AI)
system_msg = SystemMessage(content="""
You are a helpful AI Tutor assistant.
After response to Human, call for Summary Tool to END.
""")


#The config is the **second positional argument** to stream() or invoke()!
events = graph.stream(
    {"messages": [
        #{"role": "system", "content": system_prompt},
        {"role": "user", "content": question_2}]},
    config, # Pass the thread-level persistence to the graph
    stream_mode="values",
)
for event in events:
    event["messages"][-1].pretty_print()

update_memory(event, config, store=in_memory_store) # Update Memory

#model.invoke(question_1).pretty_print()

Conversation Thread ID: 4 -> 5

Provide 5 MCQ questions on Artificial Intelligence to help me with practice.
Tool Calls:
  MCQ Retrieval Tool (chatcmpl-tool-20d2df08345c437ca463753b55545928)
 Call ID: chatcmpl-tool-20d2df08345c437ca463753b55545928
  Args:
    input: What is AI?
    k: 5
Name: MCQ Retrieval Tool

{"./Documents/mcq/mcq.csv_Qn5 mcq_question": "Your organization is planning to deploy a new Azure AI solution to automate the processing of customer inquiries received through various communication channels, including email, chat, and social media. As part of the deployment process, you are tasked with creating an Azure AI resource to host the required services. The solution should be scalable, cost-effective, and comply with Responsible AI principles. Which steps should you follow to create the Azure AI resource while ensuring scalability, cost optimization, and compliance with Responsible AI principles? Select all answers that apply.", "./Documents/mcq/mcq.csv_Qn5 mcq_answer"

'5_b1d730e8-acbd-44d2-9606-d1556ede4e6b'

### Accessing and Testing LLM Chat Model Memory Persistence

In [31]:
user_id = "user_1"
namespace_for_memory = (user_id, "memories")
memories = in_memory_store.search(namespace_for_memory)

for i in range(len(memories)):
  print(memories[i].dict())

{'namespace': ['user_1', 'memories'], 'key': '1_8a44bf4d-5343-405c-91de-cfc4293e1c55', 'value': {'memory': 'To answer your prompt, I will use the "Web Extraction Tool" and "Ensemble Retriever Tool" to provide the information you need.\n\nHere is the JSON for the function call:\n\n{"name": "Web Extraction Tool", "parameters": {"input": "https://www.ibm.com/think/topics/artificial-intelligence", "url": "https://www.ibm.com/think/topics/artificial-intelligence", "k": 5}}\n\nThis will retrieve content from the provided URL and extract information about Deep Learning from databases.\n\nNext, I will use the "Ensemble Retriever Tool" to retrieve information from the extracted content:\n\n{"name": "Ensemble Retriever Tool", "parameters": {"input": "Deep Learning from databases", "k": 5}}\n\nThis will provide information about Deep Learning from databases.\n\nNote: Please ensure that the provided URL is correct and accessible for the Web Extraction Tool to function properly.'}, 'created_at': '2

In [None]:
# List out the saved memory
saved_memory = in_memory_store.list_namespaces()
print("List of memory saved: "+ str(saved_memory))

# Print out all memory saved
for m in saved_memory:
  print(in_memory_store.search(m))


List of memory saved: []


In [None]:
recall=memories = [m.value["data"] for m in in_memory_store.search(("memories", "user_1"), query="conversation summary", limit=5)]
print(recall)

['In this conversation, you initially set up a role-playing scenario where I act as an AI Tutor assistant. You provided a list of tools I can use and instructed me to call the Summary Tool to end the conversation. Based on your instructions, I called the Summary Tool to generate a summary of our conversation and end the interaction.', "The user is acting as an AI Tutor assistant with access to multiple tools. They instructed to use the Summary Tool to end the conversation by providing a summary of the discussion. The summary should include the user's role, available tools, and the instruction to use the Summary Tool.", 'The conversation has just begun. The user requested a summary of the conversation so far, which is currently empty as no questions or topics have been discussed yet. There are no unanswered quiz questions to retain.']


In [None]:
print_recall = in_memory_store.get(("memories", "user_1"), "memory_2")
print(print_recall)

Item(namespace=['memories', 'user_1'], key='memory_2', value={'data': "The user is acting as an AI Tutor assistant with access to multiple tools. They instructed to use the Summary Tool to end the conversation by providing a summary of the discussion. The summary should include the user's role, available tools, and the instruction to use the Summary Tool."}, created_at='2025-02-16T10:42:53.681511+00:00', updated_at='2025-02-16T10:42:53.681513+00:00')
