In [3]:
# !pip install 'langchain==0.3.9'

In [4]:
import json
import pandas as pd
import os
import zipfile
import logging
from typing import List, Optional, Any, Dict

from frozendict import frozendict
from tqdm import tqdm

import requests
from langchain.chat_models.base import BaseChatModel
from langchain.schema import (
    BaseMessage,
    AIMessage,
    HumanMessage,
    SystemMessage,
    ChatResult,
    ChatGeneration,
)
from pydantic import PrivateAttr

In [5]:
import json
import ast
import requests

from typing import Any, Dict, Optional

from pydantic import create_model
from langchain.tools import StructuredTool
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate

def create_tool_function(func_name: str, func_description: str, parameters: Dict[str, Any], outputs: Dict[str, Any], instances: list):
    def simulated_api(**kwargs):
        for instance in instances:
            if instance['input'] in kwargs.values():
                return instance['output']
        return "Simulated response based on input parameters."

    InputModel = create_model(f"{func_name}Input", **parameters)

    OutputModel = create_model(f"{func_name}Output", **outputs)

    simulated_api.__doc__ = func_description

    tool = StructuredTool.from_function(
        func=simulated_api,
        name=func_name,
        description=func_description,
        args_schema=InputModel,
        return_direct=True,
    )
    return tool

In [6]:
# Define the custom LLM class
class Llama31ChatModel(BaseChatModel):
    api_key: str
    base_url: str
    model: str
    temperature: float = 0.5
    max_tokens: int = 3000
    logging_level: int = logging.INFO

    _logger: logging.Logger = PrivateAttr()

    class Config:
        arbitrary_types_allowed = True  # Allows arbitrary types like the logger

    def __init__(self, **data):
        super().__init__(**data)
        self._logger = logging.getLogger(__name__)
        logging.basicConfig(level=self.logging_level)

    @property
    def _llm_type(self) -> str:
        return "llama31"

    def _prepare_headers(self) -> Dict[str, str]:
        return {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

    def _prepare_context(self, messages: List[BaseMessage]) -> List[Dict[str, str]]:
        role_map = {
            HumanMessage: "user",
            AIMessage: "assistant",
            SystemMessage: "system"
        }

        return [{"role": role_map.get(type(message), "user"), "content": message.content} for message in messages]

    def _prepare_payload(
        self,
        context: List[Dict[str, str]],
        stop: Optional[List[str]] = None,
        **kwargs: Any
    ) -> Dict[str, Any]:
        payload = {
            "model": self.model,
            "messages": context,
            "temperature": kwargs.get("temperature", self.temperature),
            "max_tokens": kwargs.get("max_tokens", self.max_tokens),
        }
        if stop is not None:
            payload["stop"] = stop
        return payload

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> ChatResult:
        headers = self._prepare_headers()
        context = self._prepare_context(messages)
        payload = self._prepare_payload(context, stop, **kwargs)

        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload,
            )
            response.raise_for_status()
            assistant_response = response.json()["choices"][0]["message"]["content"]
            self._logger.info("REQUEST: %s", payload)
            self._logger.info("RESPONSE: %s", assistant_response)
            ai_message = AIMessage(content=assistant_response)
            generation = ChatGeneration(message=ai_message)
            return ChatResult(generations=[generation])
        except requests.RequestException as e:
            self._logger.error("API request failed: %s", e)
            raise RuntimeError(f"API request failed: {e}")

In [7]:
llm = Llama31ChatModel(
    api_key="sk-or-vv-7fcc4ab944ca013feb7608fb7c0f001e5c12c32abf66233aad414183b4191a79",
    base_url="https://api.vsegpt.ru/v1",
    model="meta-llama/llama-3.3-70b-instruct",
    temperature=0.5,
    max_tokens=3000,
)

In [8]:
from langchain.tools import StructuredTool
from langchain.agents import create_structured_chat_agent, AgentExecutor
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.prompts import SystemMessagePromptTemplate, HumanMessagePromptTemplate

In [51]:
system_prompt = '''Respond to the human as helpfully and accurately as possible. You have access to the following tools (you must use ALL the tools from the provided list):

{tools}

Use a JSON blob to specify a tool by providing an "action" key (tool name) and an "action_input" key (tool input).

Valid "action" values: "Final Answer" or {tool_names}

Important! If you encounter a value of the format "name": "", replace it with "name": " "
You must use ALL the tools from the list of provided tools.

Provide in JSON blob, as shown:

{{ [{{"action": "$TOOL_NAME_1", "action_input": "$INPUT_1"}}, {{"action": "$TOOL_NAME_2", "action_input": "$INPUT_2"}},...] }}

Follow this format:

Question: input question to answer
Thought: consider previous and subsequent steps
Action: {{[{{"action": "$TOOL_NAME", "action_input": "$INPUT"}}]}}

Observation: action result
... (repeat Thought/Action/Observation N times)
Thought:
I know what to respond
Action:

{{[{{"action": "Final Answer", "action_input": "Final response to human"}}]}}


Begin! Reminder to ALWAYS respond with a valid JSON blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:

$JSON_BLOB

 then Observation'''

human_prompt = '''{input}
{agent_scratchpad}
(Reminder to respond in a JSON blob no matter what)'''

system_message = SystemMessagePromptTemplate.from_template(
    system_prompt,
    input_variables=["tools", "tool_names"],
)
human_message = HumanMessagePromptTemplate.from_template(
    human_prompt,
    input_variables=["input", "agent_scratchpad"],
)


prompt = ChatPromptTemplate.from_messages(
    [
        system_message,
        MessagesPlaceholder(variable_name="chat_history", optional=True),
        human_message,
    ]
)

# Загрузка необходимых данных

In [10]:
data_r = zipfile.ZipFile('drive/MyDrive/Копия data.zip', 'r')

data_r.extractall()

In [11]:
filtered_cases = pd.read_csv('filtered_data.csv')

In [12]:
filtered_cases.head()

Unnamed: 0,query_id,api_count,relevant_api_count,query
0,3469,8,8,My friends and I are planning a digital nomad ...
1,3470,8,7,I'm organizing a digital nomad meetup and I wa...
2,3471,8,8,I'm a digital nomad planning to visit various ...
3,3900,8,8,"Please provide me with the number of medium, e..."
4,3901,8,8,I'm conducting research on coding proficiency....


In [13]:
filtered_cases.iloc[0]

Unnamed: 0,0
query_id,3469
api_count,8
relevant_api_count,8
query,My friends and I are planning a digital nomad ...


In [14]:
with open('data/instruction/G1_query.json', 'r') as instruction_file:
    instruction_json = json.loads(instruction_file.read())

In [15]:
def get_type(s: str):
    if s.lower() == 'string':
        return str
    if s.lower() == 'str':
        return str
    if s.lower() == 'enum':
        return str
    return Any

In [16]:
def format_tool_outputs(template_response):
    print(template_response)

In [17]:
def get_tools(instruction_json, query_id):
    tools = []
    for query_info in instruction_json:
        if query_info['query_id'] == query_id:
            for api in query_info['api_list']:
                func_name = api['api_name']
                func_description = api['api_description']
                parameters = {}
                for param in api['required_parameters']:
                    if 'default' in param:
                        parameters[param['name']] = (get_type(param['type']), param['default'])
                    else:
                        parameters[param['name']] = (get_type(param['type']), ...)
                for param in api['optional_parameters']:
                    if 'default' in param:
                        parameters[param['name']] = (get_type(param['type']), param['default'])
                    else:
                        parameters[param['name']] = (get_type(param['type']), ...)
                outputs = {}
                if 'template_response' in api:
                    for output in api['template_response'].keys():
                        # format_tool_outputs(api['template_response'])
                        outputs[output] = (type(api['template_response'][output]), api['template_response'][output])
                tool = create_tool_function(func_name, func_description, parameters, outputs, [])
                tools.append(tool)
            break
    return tools

In [54]:
cases_count = 2
for i in range(0, cases_count):
    _case = filtered_cases.iloc[i]

    user_question = _case.query
    query_id = _case.query_id
    # expected_output = _case.final_answer
    expected_output = "NoNe"

    tools = get_tools(instruction_json, query_id)

    agent = create_structured_chat_agent(
        llm=llm,
        tools=tools,
        prompt=prompt,
        stop_sequence=True,
    )

    # Create the AgentExecutor
    agent_executor = AgentExecutor.from_agent_and_tools(
        agent=agent,
        tools=tools,
        verbose=True,
        return_intermediate_steps=True,
        output_keys=["output"]
    )

    response = agent_executor.invoke({"input": user_question})

    final_answer = response["output"]

    print(f"User Question: {user_question}")
    print(f"Agent's Response: {final_answer}")
    print(f"Expected Output: {expected_output}")



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m{"action": "All citites", "action_input": {"sort_by": "internet_speed", "sort": "asc", "size": "30", "page": "1"}}[0m[33;1m[1;3mSimulated response based on input parameters.[0m
[32;1m[1;3m[0m

[1m> Finished chain.[0m
User Question: My friends and I are planning a digital nomad trip and we want to explore the best cities worldwide. Can you provide us with a list of cities sorted by overall score? We're particularly interested in sorting them in ascending order based on internet speed and cost of living. Please include 30 cities per page.
Agent's Response: Simulated response based on input parameters.
Expected Output: NoNe


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m{"action": "All citites", "action_input": {"sort_by": "temperatureC", "sort": "desc", "size": "40", "page": "1"}}[0m[33;1m[1;3mSimulated response based on input parameters.[0m
[32;1m[1;3m[0m

[1m> Finished chain.[0m
User Question: 