In [35]:
import os
from getpass import getpass

os.environ["OPENAI_API_KEY"] = getpass("Введите ваш OpenAI API ключ: ")

Введите ваш OpenAI API ключ:  ········


In [36]:
"""Default prompts used by the agent."""

SYSTEM_PROMPT = """You are a helpful AI assistant.

System time: {system_time}"""

In [37]:
"""Define the configurable parameters for the agent."""

from __future__ import annotations

from dataclasses import dataclass, field, fields
from typing import Annotated

from langchain_core.runnables import ensure_config
from langgraph.config import get_config


@dataclass(kw_only=True)
class Configuration:
    """The configuration for the agent."""

    system_prompt: str = field(
        default=SYSTEM_PROMPT,
        metadata={
            "description": "The system prompt to use for the agent's interactions. "
            "This prompt sets the context and behavior for the agent."
        },
    )

    model: Annotated[str, {"__template_metadata__": {"kind": "llm"}}] = field(
        default="openai/o3-mini",
        metadata={
            "description": "The name of the language model to use for the agent's main interactions. "
            "Should be in the form: provider/model-name."
        },
    )

    max_search_results: int = field(
        default=10,
        metadata={
            "description": "The maximum number of search results to return for each search query."
        },
    )

    @classmethod
    def from_context(cls) -> Configuration:
        """Create a Configuration instance from a RunnableConfig object."""
        try:
            config = get_config()
        except RuntimeError:
            config = None
        config = ensure_config(config)
        configurable = config.get("configurable") or {}
        _fields = {f.name for f in fields(cls) if f.init}
        return cls(**{k: v for k, v in configurable.items() if k in _fields})

In [38]:
"""Define the state structures for the agent."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Sequence

from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages
from langgraph.managed import IsLastStep
from typing_extensions import Annotated


@dataclass
class InputState:
    """Defines the input state for the agent, representing a narrower interface to the outside world.

    This class is used to define the initial state and structure of incoming data.
    """

    messages: Annotated[Sequence[AnyMessage], add_messages] = field(
        default_factory=list
    )
    """
    Messages tracking the primary execution state of the agent.

    Typically accumulates a pattern of:
    1. HumanMessage - user input
    2. AIMessage with .tool_calls - agent picking tool(s) to use to collect information
    3. ToolMessage(s) - the responses (or errors) from the executed tools
    4. AIMessage without .tool_calls - agent responding in unstructured format to the user
    5. HumanMessage - user responds with the next conversational turn

    Steps 2-5 may repeat as needed.

    The `add_messages` annotation ensures that new messages are merged with existing ones,
    updating by ID to maintain an "append-only" state unless a message with the same ID is provided.
    """


@dataclass
class State(InputState):
    """Represents the complete state of the agent, extending InputState with additional attributes.

    This class can be used to store any information needed throughout the agent's lifecycle.
    """

    is_last_step: IsLastStep = field(default=False)
    """
    Indicates whether the current step is the last one before the graph raises an error.

    This is a 'managed' variable, controlled by the state machine rather than user code.
    It is set to 'True' when the step count reaches recursion_limit - 1.
    """

    # Additional attributes can be added here as needed.
    # Common examples include:
    # retrieved_documents: List[Document] = field(default_factory=list)
    # extracted_entities: Dict[str, Any] = field(default_factory=dict)
    # api_connections: Dict[str, Any] = field(default_factory=dict)

In [43]:
"""This module provides example tools for web scraping and search functionality.

It includes a basic Tavily search function (as an example)

These tools are intended as free examples to get started. For production use,
consider implementing more robust and specialized tools tailored to your needs.
"""

from typing import Any, Callable, List, Optional, cast
from pydantic import BaseModel, Field
import arxiv


class ArxivArticle(BaseModel):
    title: str = Field(description="Заголовок статьи")
    summary: str = Field(description="Краткое содержание статьи")
    pdf_url: str = Field(description="Ссылка на PDF-файл статьи")
    
def search_arxiv(query: str) -> ArxivArticle:
    search = arxiv.Search(query=query, max_results=1)
    result = next(search.results())
    
    # Проверяем, есть ли pdf_url
    pdf_url = getattr(result, "pdf_url", None)
    if not pdf_url:
        # Формируем pdf_url вручную из entry_id, если не найден
        entry_id = result.entry_id  # например https://arxiv.org/abs/2301.12345
        # извлечь id из entry_id
        arxiv_id = entry_id.split('/')[-1]
        pdf_url = f"https://arxiv.org/pdf/{arxiv_id}.pdf"

    return ArxivArticle.model_validate({"title": result.title, "summary": result.summary, "pdf_url": pdf_url})

def download_pdf(pdf_url: str) -> str:
    """Скачивает pdf, сохраняет в temp.pdf."""
    r = requests.get(pdf_url)
    with open("temp.pdf", "wb") as f:
        f.write(r.content)
    return "temp.pdf"

def load_pdf(pdf_path: str):
    """Загружает PDF и разбивает на страницы."""
    loader = PyPDFLoader(pdf_path)
    return loader.load_and_split()

class PDFQA:
    """Класс для индексации PDF и ответа на вопросы по ней."""
    def __init__(self):
        self.qa_chain = None
    
    def index_pdf(self, pages):
        embeddings = OpenAIEmbeddings()
        vectorstore = FAISS.from_documents(pages, embeddings)
        retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
        self.qa_chain = RetrievalQA.from_chain_type(llm=OpenAI(), retriever=retriever)
    
    def ask(self, question: str) -> str:
        if not self.qa_chain:
            return "PDF не загружен и не проиндексирован."
        return self.qa_chain.run(question)

pdf_qa = PDFQA()

def summarize_pdf(pdf_path: str) -> str:
    pages = load_pdf(pdf_path)
    llm = OpenAI(model_name='o3-mini')
    chain = load_summarize_chain(llm, chain_type="map_reduce")
    summary = chain.run(pages)
    return summary

def tool_search_arxiv(query: str) -> str:
    'Поиск статьи в arXiv по запросу. Ввод: поисковый запрос, вывод: title, summary и pdf ссылка.'
    return search_arxiv(query)

def tool_load_and_index(pdf_url: str) -> str:
    'Загружает PDF по ссылке, разбивает на страницы и индексирует для поиска.'
    pdf_path = download_pdf(pdf_url)
    pages = load_pdf(pdf_path)
    pdf_qa.index_pdf(pages)
    return f"PDF загружен и проиндексирован: {len(pages)} страниц"

def tool_ask_pdf(question: str) -> str:
    'Отвечает на вопросы по загруженному PDF.'
    return pdf_qa.ask(question)

def tool_summarize(pdf_url: str) -> str:
    'Делает краткое резюме статьи по PDF-ссылке.'
    pdf_path = download_pdf(pdf_url)
    return summarize_pdf(pdf_path)


TOOLS: List[Callable[..., Any]] = [tool_search_arxiv, tool_load_and_index, tool_ask_pdf, tool_summarize]

In [44]:
"""Utility & helper functions."""

from langchain.chat_models import init_chat_model
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import BaseMessage


def get_message_text(msg: BaseMessage) -> str:
    """Get the text content of a message."""
    content = msg.content
    if isinstance(content, str):
        return content
    elif isinstance(content, dict):
        return content.get("text", "")
    else:
        txts = [c if isinstance(c, str) else (c.get("text") or "") for c in content]
        return "".join(txts).strip()


def load_chat_model(fully_specified_name: str) -> BaseChatModel:
    """Load a chat model from a fully specified name.

    Args:
        fully_specified_name (str): String in the format 'provider/model'.
    """
    provider, model = fully_specified_name.split("/", maxsplit=1)
    return init_chat_model(model, model_provider=provider)

In [53]:
"""Define a custom Reasoning and Action agent.

Works with a chat model with tool calling support.
"""

from datetime import UTC, datetime
from typing import Dict, List, Literal, cast

from langchain_core.messages import AIMessage
from langgraph.graph import StateGraph
from langgraph.prebuilt import ToolNode


# Define the function that calls the model


def call_model(state: State) -> Dict[str, List[AIMessage]]:
    """Call the LLM powering our "agent".

    This function prepares the prompt, initializes the model, and processes the response.

    Args:
        state (State): The current state of the conversation.
        config (RunnableConfig): Configuration for the model run.

    Returns:
        dict: A dictionary containing the model's response message.
    """
    configuration = Configuration.from_context()

    # Initialize the model with tool binding. Change the model or add more tools here.
    model = load_chat_model(configuration.model).bind_tools(TOOLS)

    # Format the system prompt. Customize this to change the agent's behavior.
    system_message = configuration.system_prompt.format(
        system_time=datetime.now(tz=UTC).isoformat()
    )

    # Get the model's response
    response = cast(
        AIMessage,
        model.invoke(
            [{"role": "system", "content": system_message}, *state.messages]
        ),
    )

    # Handle the case when it's the last step and the model still wants to use a tool
    if state.is_last_step and response.tool_calls:
        return {
            "messages": [
                AIMessage(
                    id=response.id,
                    content="Sorry, I could not find an answer to your question in the specified number of steps.",
                )
            ]
        }

    # Return the model's response as a list to be added to existing messages
    return {"messages": [response]}


# Define a new graph

builder = StateGraph(State, input=InputState, config_schema=Configuration)

# Define the two nodes we will cycle between
builder.add_node(call_model)
builder.add_node("tools", ToolNode(TOOLS))

# Set the entrypoint as `call_model`
# This means that this node is the first one called
builder.add_edge("__start__", "call_model")


def route_model_output(state: State) -> Literal["__end__", "tools"]:
    """Determine the next node based on the model's output.

    This function checks if the model's last message contains tool calls.

    Args:
        state (State): The current state of the conversation.

    Returns:
        str: The name of the next node to call ("__end__" or "tools").
    """
    last_message = state.messages[-1]
    if not isinstance(last_message, AIMessage):
        raise ValueError(
            f"Expected AIMessage in output edges, but got {type(last_message).__name__}"
        )
    # If there is no tool call, then we finish
    if not last_message.tool_calls:
        return "__end__"
    # Otherwise we execute the requested actions
    return "tools"


# Add a conditional edge to determine the next step after `call_model`
builder.add_conditional_edges(
    "call_model",
    # After call_model finishes running, the next node(s) are scheduled
    # based on the output from route_model_output
    route_model_output,
)

# Add a normal edge from `tools` to `call_model`
# This creates a cycle: after using tools, we always return to the model
builder.add_edge("tools", "call_model")
from langgraph.checkpoint.memory import InMemorySaver
checkpointer = InMemorySaver() 
# Compile the builder into an executable graph
graph = builder.compile(name="ReAct Agent", checkpointer=checkpointer)

In [54]:
config = {"configurable": {"thread_id": "1"}}

In [55]:
for chunk in graph.stream(
    {
        "messages": [
            {
                "role": "user",
                "content": "diffusion models 2024",
            }
        ]
    },
    config=config
):
    for node, update in chunk.items():
        print("Update from node", node)
        update["messages"][-1].pretty_print()
        print("\n\n")


Update from node call_model
Tool Calls:
  tool_search_arxiv (call_Ayc6sCDOAFonO9fui8bpSNQ4)
 Call ID: call_Ayc6sCDOAFonO9fui8bpSNQ4
  Args:
    query: diffusion models 2024





  result = next(search.results())


Update from node tools
Name: tool_search_arxiv

title="Rebuttal of Morris' criticism of the diffusive compressible Euler model" summary='This short note addresses the criticism of the diffusive compressible Euler\nmodel regarding heat diffusion, sound attenuation and material frame\nindifference put forward by M. Morris.' pdf_url='http://arxiv.org/pdf/2406.18241v1'



Update from node call_model

I found the following article on diffusion models from 2024:

Title: Rebuttal of Morris' criticism of the diffusive compressible Euler model  
Summary: This note addresses criticisms raised against the diffusive compressible Euler model—specifically concerning heat diffusion, sound attenuation, and material frame indifference—by M. Morris.  
PDF Link: http://arxiv.org/pdf/2406.18241v1

Would you like more details about this paper, or are you interested in exploring related work in diffusion models?





In [56]:
questions = [
    "Что такое диффузионные модели?",
    "Какие основные результаты в статье?",
    "Какова методология исследования в статье?"
]

for q in questions:
    for chunk in graph.stream(
        {
            "messages": [
                {
                    "role": "user",
                    "content": q,
                }
            ],
        },
        config=config
    ):
        for node, update in chunk.items():
            print("Update from node", node)
            update["messages"][-1].pretty_print()
            print("\n\n")


Update from node call_model

Диффузионные модели – это класс генеративных моделей, которые используют процесс диффузии для постепенного превращения простого шума в сложные данные, такие как изображения или аудиосигналы. Идея заключается в том, что сначала данные многократно "зашумляются" (проходит процесс добавления случайного шума), а затем модель обучается выполнять обратный процесс – шаг за шагом устранять шум, чтобы восстановить исходные данные.

Основные моменты:
1. Обучение модели происходит посредством имитации обратного процесса диффузии: модель учится постепенно преобразовывать зашумленные данные в чистые, таким образом, "генерируя" новые данные, схожие с обучающими примерами.
2. Эти модели продемонстрировали выдающиеся результаты, особенно в задачах генерации изображений, и стали конкурировать с другими подходами, такими как генеративные состязательные сети (GAN).
3. Диффузионные модели часто являются более стабильными в обучении и могут давать высококачественные результаты, 