In [1]:
#imports
import os
import requests
import operator
from dotenv import find_dotenv, load_dotenv
from langchain.output_parsers import ResponseSchema, StructuredOutputParser
from pydantic import BaseModel, Field
from langchain.tools import StructuredTool
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_huggingface import ChatHuggingFace
from langchain_huggingface import HuggingFaceEndpoint
from langgraph.graph import START, StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from typing import TypedDict, Annotated

In [2]:
#load env variables
# Loads the .env file located by find_dotenv(); api_task later reads API_URL,
# SUPABASE_KEY and SUPBASE_TOKEN through os.getenv.
# The result is bound to `_` so the cell has no displayed value.
_ = load_dotenv(find_dotenv())

In [3]:
#define the agent state schema (passed to StateGraph when the Agent builds its graph)
class AgentState(TypedDict):
    """State carried through the graph: the list of conversation messages."""
    # Graph nodes return {'messages': [...]} updates. `operator.add` is attached as
    # Annotated metadata (presumably the reducer LangGraph uses to append those
    # updates to the existing list instead of replacing it - confirm).
    messages: Annotated[list[AnyMessage], operator.add]

In [4]:
#Generate chat model from HuggingFace Meta Llama 3.3 70B

# Hub repository id of the model requested from the Hugging Face endpoint.
model_id = "meta-llama/Llama-3.3-70B-Instruct"

# Text-generation endpoint for `model_id`, limited to 520 new tokens per call.
# NOTE(review): temperature=0.5 is passed together with do_sample=False; with
# sampling disabled the temperature presumably has no effect - confirm which of
# the two settings is intended.
llm = HuggingFaceEndpoint(
    repo_id=model_id,
    task="text-generation",
    temperature=0.5,
    max_new_tokens=520,
    do_sample=False,
)

# Chat wrapper around the endpoint; this is the model handed to Agent in agent_executor.
chat_model = ChatHuggingFace(llm=llm)

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
#define schemas
# One ResponseSchema per field the model must extract from the user request.
# NOTE: the `task` description uses a backslash continuation inside the string
# literal, so the leading spaces of the continuation line are part of the text.
task_schema = ResponseSchema(name="task",
                             description="The task you are asked to do it could be create, update or delete\
                                answser create, update or delete")
product_name_schema = ResponseSchema(name="product_name",
                                      description="the name of the product to create, update or delete")
price_schema = ResponseSchema(name="price",
                                    description="extract any value related to the product price only if the task is create or update")
description_schema = ResponseSchema(name="description",
                                    description="Extract any information related to the product description only if the task is create or update")
stock_schema = ResponseSchema(name="stock",
                              description="Extract any value related to the stock amount only if the task is create or update")

# Collected schemas, in the order they are handed to the output parser.
response_schemas = [
    task_schema,
    product_name_schema,
    price_schema,
    description_schema,
    stock_schema
]

In [6]:
#set output parser
# Parser built from the schemas above; api_task calls output_parser.parse(message)
# to turn the model's JSON text into a dict.
output_parser = StructuredOutputParser.from_response_schemas(response_schemas)

In [7]:
#get format instructions
# NOTE(review): none of the cells shown here reads `format_instructions` again;
# the system prompt spells the output format out by hand instead.
format_instructions = output_parser.get_format_instructions()

In [8]:
# System prompt for the agent: lists the fields to extract, the JSON layout of the
# answer, and instructs the model to call the API_task tool afterwards.
# The field descriptions are written out by hand here rather than interpolated
# from the schemas, so they must be kept in sync manually.
system_message = '''
For the following message extract the following information:

task: The task you are asked to do it could be create, update or delete
answser create, update or delete

product_name: the name of the product to create, update or delete

price: extract any value related to the product price only if the task is create or update

description: Extract any information related to the product description only if the task is create or update

stock: Extract any value related to the stock amount only if the task is create or update

The output should be a markdown code snippet formatted in the following schema, including the leading and trailing "```json" and "```":
```json
{
    "task": string  // The task you are asked to do it could be create, update or delete answser create, update or delete
    "product_name": string  // the name of the product to create, update or delete
    "price": string  // extract any value related to the product price only if the task is create or update
    "description": string  // Extract any information related to the product description only if the task is create or update
    "stock": string  // Extract any value related to the stock amount only if the task is create or update
}
```

You have access to the following tools:
API_task

After extracting the JSON use the tool called API_task to create, update or delete the product and answer the user if the product was created, updated or deleted successfully
'''

In [9]:
#Define Agent Class
class Agent:
    """Tool-calling agent implemented as a two-node state graph.

    The ``llm`` node calls the tool-bound chat model. When the model's reply
    contains tool calls, the ``action`` node executes them and control goes
    back to ``llm``; otherwise the graph ends.

    Args:
        model: chat model exposing ``bind_tools`` and ``invoke``.
        tools: tools offered to the model; they are looked up by their ``name``.
        checkpointer: checkpointer handed to ``StateGraph.compile``.
        system: optional system prompt prepended to the messages on every
            model call.
    """

    def __init__(self, model, tools, checkpointer, system=""):
        self.system = system
        self.tools = {t.name: t for t in tools}
        self.model = model.bind_tools(tools)

        graph = StateGraph(AgentState)
        graph.add_node("llm", self.call_openai)
        graph.add_node("action", self.take_action)
        graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
        graph.add_edge("action", "llm")
        graph.set_entry_point("llm")
        self.graph = graph.compile(checkpointer=checkpointer)

    def call_openai(self, state: AgentState):
        """Invoke the chat model on the conversation and return its reply as a state update."""
        messages = state['messages']
        if self.system:
            # The system prompt is prepended per call; it is not stored in the state.
            messages = [SystemMessage(content=self.system)] + messages
        message = self.model.invoke(messages)
        return {'messages': [message]}

    def exists_action(self, state: AgentState):
        """Return True when the last message requests at least one tool call."""
        result = state['messages'][-1]
        # A message without a `tool_calls` attribute is treated as "no action".
        return bool(getattr(result, 'tool_calls', None))

    def take_action(self, state: AgentState):
        """Run every tool call of the last message and return the results as ToolMessages."""
        tool_calls = state['messages'][-1].tool_calls
        results = []
        for t in tool_calls:
            print(f"Calling: {t}")
            tool = self.tools.get(t['name'])
            if tool is None:
                # The model asked for a tool that was never registered. Report it
                # back as the tool result so the model can retry, instead of
                # aborting the whole run with a KeyError.
                available = ", ".join(self.tools)
                result = f"Unknown tool {t['name']!r}. Available tools: {available}. Retry with a valid tool name."
            else:
                result = tool.invoke(t['args'])
            results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
        print("Back to the model!")
        return {'messages': results}

In [10]:
#Create and define custom tool
# Argument schema of the API_task tool (passed as `args_schema` when the tool is built):
# a single string argument named `message`.
class ApiInput(BaseModel):
    # The Field description is the text attached to the `message` argument.
    message: str = Field(description="Should be the message in JSON format that you were asked to generate")

def api_task(message: str) -> str:
    """Use this to create, update or delete products

    Parses ``message`` with the module-level ``output_parser``, reads its
    ``task`` field and dispatches to the matching action. Only ``create`` is
    implemented; any other task yields "This task is not defined".

    Args:
        message: JSON text (optionally inside a markdown code fence) holding
            the keys ``task``, ``product_name``, ``price``, ``description``
            and ``stock``.

    Returns:
        A human-readable status string. Failures (unparseable message, missing
        configuration, invalid numbers, HTTP or network errors) are reported
        through this string rather than raised, so the agent can relay them.
    """
    request_timeout = 10  # seconds; keeps a hung API from blocking the agent forever

    endpoint = os.getenv('API_URL')

    # 'SUPBASE_TOKEN' (sic) is the variable name this notebook originally read;
    # it is kept as a fallback so existing .env files continue to work.
    auth_token = os.getenv('SUPABASE_TOKEN') or os.getenv('SUPBASE_TOKEN')
    candidate_headers = {
        'apikey': os.getenv('SUPABASE_KEY'),
        'Authorization': auth_token,
    }
    # Leave out headers whose environment variable is not set.
    headers = {key: value for key, value in candidate_headers.items() if value is not None}

    def create_product(data: dict) -> str:
        """POST the product described by ``data`` to the API and report the outcome."""
        try:
            product = {
                'name': data.get('product_name'),
                'price': float(data.get('price')),
                'description': data.get('description'),
                'stock': int(data.get('stock')),
            }
        except (TypeError, ValueError):
            # Missing fields arrive as None (TypeError); non-numeric text raises ValueError.
            return "The product price or stock is missing or is not a valid number"

        try:
            response = requests.post(endpoint, headers=headers, json=product, timeout=request_timeout)
        except requests.RequestException:
            return "There was an error with the API"

        # Any 2xx counts as success: REST inserts commonly answer 201 Created, not 200.
        if response.ok:
            return f"Product {data.get('product_name')} was created successfully"
        return "There was an error with the API"

    tasks_actions = {
        'create': create_product
    }

    try:
        data = output_parser.parse(message)
    except ValueError as exc:
        # The parser's OutputParserException derives from ValueError.
        return f"The message could not be parsed: {exc}"

    task = data.get('task')
    if isinstance(task, str):
        # Tolerate "Create", " create " and similar variants produced by the model.
        task = task.strip().lower()
    else:
        task = None

    action = tasks_actions.get(task)
    if action is None:
        return "This task is not defined"
    if not endpoint:
        return "API_URL is not configured"
    # Return the action's status text; previously it was computed and then dropped.
    return action(data)


# Wrap api_task as a structured tool. "API_task" is the name the system prompt
# tells the model to call, and the name the Agent uses to look the tool up.
# NOTE(review): the Agent graph in this notebook always routes tool results back
# to the llm node and never reads `return_direct`; confirm the flag is still wanted.
ApiTask = StructuredTool.from_function(
    func=api_task,
    name="API_task",
    description="Do CRUD tasks in an API",
    args_schema=ApiInput,
    return_direct=True
)

In [11]:
# Sample "create" request that the test cell passes to agent_executor.
user_message ="Create the product Jack Daniels Honey 1L with price 12.3, with description:The classis Jack Daniels Whiskey now with honey, and stock 48"

In [12]:
#configure thread
# Config passed as the second argument to graph.invoke in agent_executor.
# "thread_id" is presumably the key the checkpointer uses to tell conversations
# apart - confirm.
thread = {"configurable": {"thread_id": "1"}}

In [13]:
#Executor with SQLite memory implementation
def agent_executor(user_request:str) -> str:
    """Run a single user request through a freshly built agent.

    Every call opens a new in-memory SQLite checkpointer and builds a new
    Agent around the module-level chat model, tool and system prompt. The
    graph is invoked with the module-level ``thread`` config.

    Args:
        user_request: the user's request in natural language.

    Returns:
        The ``content`` of the last message in the final graph state.
    """
    initial_state = {'messages': [HumanMessage(content=user_request)]}

    with SqliteSaver.from_conn_string(':memory:') as checkpointer:
        agent = Agent(
            model=chat_model,
            tools=[ApiTask],
            checkpointer=checkpointer,
            system=system_message,
        )
        final_state = agent.graph.invoke(initial_state, thread)

    last_message = final_state['messages'][-1]
    return last_message.content


In [20]:
#test execution
# Runs the sample request end to end; the returned reply text is the cell's displayed value.
agent_executor(user_message)

''