> AI Agents

In [None]:


'''

AI Agent Architectures
├── Profiling Module or Perception Module (The Eyes and Ears of the Agent)
│   ├── Sensory expertise
│   ├── Perceives and interprets the environment and communicates with other agents.
│   ├──  --> the agent collects and analyzes information from its environment, much as human senses do.
│   ├──  --> helps it comprehend visual signals, recognize speech patterns, and sense tactile inputs.
│   └── Example: Recognizing objects via sensors in self-driving cars
│
├── Memory Module
│   ├── Stores data, rules, and patterns
│   ├── Enables knowledge recall and decision-making
│   └── Example: Chatbots recalling customer preferences
│   
├── Planning Module
│   ├── Analyzes current situations
│   ├── Strategizes actions to meet goals
│   └── Example: Optimizing delivery routes
│
├── Action Module
│   ├── Executes planned actions
│   ├── Interfaces with external systems
│   └── Example: Robotic arms assembling parts
│
├── Learning Module
│   ├── Adapts and improves performance
│   ├── Methods include:
│   │   ├── Supervised Learning
│   │   ├── Unsupervised Learning
│   │   └── Reinforcement Learning
│   └── Example: Agents learning optimal decisions from feedback
│
├── Data Structuring and Transformation Module
│   ├── Organizes and preprocesses data both from the environment as well as from the memory module
│   ├── Converts data into trainable formats
│   └── Example: Formatting images for neural network training
│
├── Training Module
│   ├── Performs training operations and updates
│   ├── Use methods like:
│   │   ├── Supervised, Unsupervised, and Reinforcement Learning
│   │   ├── Computer Vision, LLM, Time series Learning
│   │   ├── ANN, CNN, Transformers, GANs, GNNs, 
│   │   └── Tools like TensorFlow, PyTorch, Keras, Scikit-learn
│   └── Example: AI training itself in virtual environments
│
└── Other Modules

 
'''



> requirements.txt

In [None]:
'''
langchain_community
tiktoken
langchainhub
langchain
chromadb
langgraph
tavily-python
python-dotenv
google-generativeai
langchain_google_genai
langchain-nomic
langchain-text-splitters
langchain_mistralai
wikipedia
langchain_huggingface
google-search-results
faiss-cpu
sentence-transformers
youtube-search


'''

> n8n

In [None]:
# n8n interface

# n8n is a workflow automation tool that enables you to connect your favorite apps, services, and devices.
# It allows you to automate workflows and integrate your apps, services, and devices with each other.

    # workflow: a sequence of connected steps that automate a process.
    # node: a single step in a workflow.
    # connection: a link between two nodes that passes data from one node to another.
    # execution: a single run of a workflow.

# Types of Nodes
    # Trigger Node: The starting point of a workflow. It initiates the execution of a workflow.
    # Regular Node: A node that performs a specific action or operation.
    # Parameter Node: A node that stores and provides data to other nodes in the workflow.
    # Sub-Workflow Node: A node that allows you to reuse a workflow within another workflow.
    # Webhook Node: A node that receives data from an external service or application.
    # Error Node: A node that handles errors that occur during the execution of a workflow.
    # No-Operation Node: A node that does nothing. It is used for debugging and testing purposes.
    
    # OR
    # Trigger Nodes: These nodes initiate the execution of a workflow. They are the starting points of a workflow.
    # Data Transformation Nodes: These nodes perform operations on data. They transform, filter, or manipulate data in some way.
    # Action Nodes: These nodes perform actions such as sending an email, making an API call, or updating a database.
    # Logic Nodes: These nodes control the flow of a workflow. They make decisions based on conditions and determine the path a workflow should take.





## LangChain Repo

> Langchain Agents

In [None]:
from langchain.agents import agent_types
from langchain.agents.react.agent import create_react_agent
from langchain.agents import tools, tool 

""" 
Directory structure:
└── agents/
    ├── __init__.py
    ├── agent.py
    ├── agent_iterator.py
    ├── agent_types.py
    ├── initialize.py
    ├── load_tools.py
    ├── loading.py
    ├── schema.py
    ├── tools.py
    ├── types.py
    ├── utils.py
    ├── agent_toolkits/
    │   ├── __init__.py
    │   ├── azure_cognitive_services.py
    │   ├── base.py
    │   ├── ainetwork/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── amadeus/
    │   │   └── toolkit.py
    │   ├── clickup/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── conversational_retrieval/
    │   │   ├── __init__.py
    │   │   ├── openai_functions.py
    │   │   └── tool.py
    │   ├── csv/
    │   │   └── __init__.py
    │   ├── file_management/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── github/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── gitlab/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── gmail/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── jira/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── json/
    │   │   ├── __init__.py
    │   │   ├── base.py
    │   │   ├── prompt.py
    │   │   └── toolkit.py
    │   ├── multion/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── nasa/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── nla/
    │   │   ├── __init__.py
    │   │   ├── tool.py
    │   │   └── toolkit.py
    │   ├── office365/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── openapi/
    │   │   ├── __init__.py
    │   │   ├── base.py
    │   │   ├── planner.py
    │   │   ├── planner_prompt.py
    │   │   ├── prompt.py
    │   │   ├── spec.py
    │   │   └── toolkit.py
    │   ├── pandas/
    │   │   └── __init__.py
    │   ├── playwright/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── powerbi/
    │   │   ├── __init__.py
    │   │   ├── base.py
    │   │   ├── chat_base.py
    │   │   ├── prompt.py
    │   │   └── toolkit.py
    │   ├── python/
    │   │   └── __init__.py
    │   ├── slack/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── spark/
    │   │   └── __init__.py
    │   ├── spark_sql/
    │   │   ├── __init__.py
    │   │   ├── base.py
    │   │   ├── prompt.py
    │   │   └── toolkit.py
    │   ├── sql/
    │   │   ├── __init__.py
    │   │   ├── base.py
    │   │   ├── prompt.py
    │   │   └── toolkit.py
    │   ├── steam/
    │   │   ├── __init__.py
    │   │   └── toolkit.py
    │   ├── vectorstore/
    │   │   ├── __init__.py
    │   │   ├── base.py
    │   │   ├── prompt.py
    │   │   └── toolkit.py
    │   ├── xorbits/
    │   │   └── __init__.py
    │   └── zapier/
    │       ├── __init__.py
    │       └── toolkit.py
    ├── chat/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── output_parser.py
    │   └── prompt.py
    ├── conversational/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── output_parser.py
    │   └── prompt.py
    ├── conversational_chat/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── output_parser.py
    │   └── prompt.py
    ├── format_scratchpad/
    │   ├── __init__.py
    │   ├── log.py
    │   ├── log_to_messages.py
    │   ├── openai_functions.py
    │   ├── openai_tools.py
    │   ├── tools.py
    │   └── xml.py
    ├── json_chat/
    │   ├── __init__.py
    │   ├── base.py
    │   └── prompt.py
    ├── mrkl/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── output_parser.py
    │   └── prompt.py
    ├── openai_assistant/
    │   ├── __init__.py
    │   └── base.py
    ├── openai_functions_agent/
    │   ├── __init__.py
    │   ├── agent_token_buffer_memory.py
    │   └── base.py
    ├── openai_functions_multi_agent/
    │   ├── __init__.py
    │   └── base.py
    ├── openai_tools/
    │   ├── __init__.py
    │   └── base.py
    ├── output_parsers/
    │   ├── __init__.py
    │   ├── json.py
    │   ├── openai_functions.py
    │   ├── openai_tools.py
    │   ├── react_json_single_input.py
    │   ├── react_single_input.py
    │   ├── self_ask.py
    │   ├── tools.py
    │   └── xml.py
    ├── react/
    │   ├── __init__.py
    │   ├── agent.py
    │   ├── base.py
    │   ├── output_parser.py
    │   ├── textworld_prompt.py
    │   └── wiki_prompt.py
    ├── self_ask_with_search/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── output_parser.py
    │   └── prompt.py
    ├── structured_chat/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── output_parser.py
    │   └── prompt.py
    ├── tool_calling_agent/
    │   ├── __init__.py
    │   └── base.py
    └── xml/
        ├── __init__.py
        ├── base.py
        └── prompt.py


"""

> Langchain Tools

In [None]:
### Custom Tools
# Imports mirror the working ones used in the "Create Tool Calling Agent" cell:
# the Wikipedia tool is WikipediaQueryRun (it needs an API wrapper) and the
# Tavily tools live in the `tavily_search` module.
from langchain_community.tools import WikipediaQueryRun, YouTubeSearchTool
from langchain_community.tools.tavily_search import TavilyAnswer, TavilySearchResults
from langchain_community.utilities import WikipediaAPIWrapper
from langchain.agents import tool

# Ready-made community tools.
tool_1 = YouTubeSearchTool()
tool_2 = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
tool_3 = TavilySearchResults()


# Custom tool built from a plain function via the @tool decorator.
@tool
def get_word_length(word: str) -> int:
    """Return the length of a word."""
    return len(word)


# The interpolated expression must sit inside the f-string's quotes.
print(f'Length of the word {get_word_length.invoke("hello")}')

# Inspect the metadata attached to the decorated tool.
print(get_word_length.name)
print(get_word_length.description)
print(get_word_length.args)

In [None]:
from langchain.tools import retriever

""" 
Directory structure:
└── tools/
    ├── __init__.py
    ├── base.py
    ├── convert_to_openai.py
    ├── ifttt.py
    ├── plugin.py
    ├── render.py
    ├── retriever.py
    ├── yahoo_finance_news.py
    ├── ainetwork/
    │   ├── __init__.py
    │   ├── app.py
    │   ├── base.py
    │   ├── owner.py
    │   ├── rule.py
    │   ├── transfer.py
    │   └── value.py
    ├── amadeus/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── closest_airport.py
    │   └── flight_search.py
    ├── arxiv/
    │   ├── __init__.py
    │   └── tool.py
    ├── azure_cognitive_services/
    │   ├── __init__.py
    │   ├── form_recognizer.py
    │   ├── image_analysis.py
    │   ├── speech2text.py
    │   ├── text2speech.py
    │   └── text_analytics_health.py
    ├── bearly/
    │   ├── __init__.py
    │   └── tool.py
    ├── bing_search/
    │   ├── __init__.py
    │   └── tool.py
    ├── brave_search/
    │   ├── __init__.py
    │   └── tool.py
    ├── clickup/
    │   ├── __init__.py
    │   └── tool.py
    ├── dataforseo_api_search/
    │   ├── __init__.py
    │   └── tool.py
    ├── ddg_search/
    │   ├── __init__.py
    │   └── tool.py
    ├── e2b_data_analysis/
    │   ├── __init__.py
    │   └── tool.py
    ├── edenai/
    │   ├── __init__.py
    │   ├── audio_speech_to_text.py
    │   ├── audio_text_to_speech.py
    │   ├── edenai_base_tool.py
    │   ├── image_explicitcontent.py
    │   ├── image_objectdetection.py
    │   ├── ocr_identityparser.py
    │   ├── ocr_invoiceparser.py
    │   └── text_moderation.py
    ├── eleven_labs/
    │   ├── __init__.py
    │   ├── models.py
    │   └── text2speech.py
    ├── file_management/
    │   ├── __init__.py
    │   ├── copy.py
    │   ├── delete.py
    │   ├── file_search.py
    │   ├── list_dir.py
    │   ├── move.py
    │   ├── read.py
    │   └── write.py
    ├── github/
    │   ├── __init__.py
    │   └── tool.py
    ├── gitlab/
    │   ├── __init__.py
    │   └── tool.py
    ├── gmail/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── create_draft.py
    │   ├── get_message.py
    │   ├── get_thread.py
    │   ├── search.py
    │   └── send_message.py
    ├── golden_query/
    │   ├── __init__.py
    │   └── tool.py
    ├── google_cloud/
    │   ├── __init__.py
    │   └── texttospeech.py
    ├── google_finance/
    │   ├── __init__.py
    │   └── tool.py
    ├── google_jobs/
    │   ├── __init__.py
    │   └── tool.py
    ├── google_lens/
    │   ├── __init__.py
    │   └── tool.py
    ├── google_places/
    │   ├── __init__.py
    │   └── tool.py
    ├── google_scholar/
    │   ├── __init__.py
    │   └── tool.py
    ├── google_search/
    │   ├── __init__.py
    │   └── tool.py
    ├── google_serper/
    │   ├── __init__.py
    │   └── tool.py
    ├── google_trends/
    │   ├── __init__.py
    │   └── tool.py
    ├── graphql/
    │   ├── __init__.py
    │   └── tool.py
    ├── human/
    │   ├── __init__.py
    │   └── tool.py
    ├── interaction/
    │   ├── __init__.py
    │   └── tool.py
    ├── jira/
    │   ├── __init__.py
    │   └── tool.py
    ├── json/
    │   ├── __init__.py
    │   └── tool.py
    ├── memorize/
    │   ├── __init__.py
    │   └── tool.py
    ├── merriam_webster/
    │   ├── __init__.py
    │   └── tool.py
    ├── metaphor_search/
    │   ├── __init__.py
    │   └── tool.py
    ├── multion/
    │   ├── __init__.py
    │   ├── close_session.py
    │   ├── create_session.py
    │   └── update_session.py
    ├── nasa/
    │   ├── __init__.py
    │   └── tool.py
    ├── nuclia/
    │   ├── __init__.py
    │   └── tool.py
    ├── office365/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── create_draft_message.py
    │   ├── events_search.py
    │   ├── messages_search.py
    │   ├── send_event.py
    │   └── send_message.py
    ├── openapi/
    │   ├── __init__.py
    │   └── utils/
    │       ├── __init__.py
    │       ├── api_models.py
    │       └── openapi_utils.py
    ├── openweathermap/
    │   ├── __init__.py
    │   └── tool.py
    ├── playwright/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── click.py
    │   ├── current_page.py
    │   ├── extract_hyperlinks.py
    │   ├── extract_text.py
    │   ├── get_elements.py
    │   ├── navigate.py
    │   └── navigate_back.py
    ├── powerbi/
    │   ├── __init__.py
    │   └── tool.py
    ├── pubmed/
    │   ├── __init__.py
    │   └── tool.py
    ├── python/
    │   └── __init__.py
    ├── reddit_search/
    │   ├── __init__.py
    │   └── tool.py
    ├── requests/
    │   ├── __init__.py
    │   └── tool.py
    ├── scenexplain/
    │   ├── __init__.py
    │   └── tool.py
    ├── searchapi/
    │   ├── __init__.py
    │   └── tool.py
    ├── searx_search/
    │   ├── __init__.py
    │   └── tool.py
    ├── shell/
    │   ├── __init__.py
    │   └── tool.py
    ├── slack/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── get_channel.py
    │   ├── get_message.py
    │   ├── schedule_message.py
    │   └── send_message.py
    ├── sleep/
    │   ├── __init__.py
    │   └── tool.py
    ├── spark_sql/
    │   ├── __init__.py
    │   └── tool.py
    ├── sql_database/
    │   ├── __init__.py
    │   ├── prompt.py
    │   └── tool.py
    ├── stackexchange/
    │   ├── __init__.py
    │   └── tool.py
    ├── steam/
    │   ├── __init__.py
    │   └── tool.py
    ├── steamship_image_generation/
    │   ├── __init__.py
    │   └── tool.py
    ├── tavily_search/
    │   ├── __init__.py
    │   └── tool.py
    ├── vectorstore/
    │   ├── __init__.py
    │   └── tool.py
    ├── wikipedia/
    │   ├── __init__.py
    │   └── tool.py
    ├── wolfram_alpha/
    │   ├── __init__.py
    │   └── tool.py
    ├── youtube/
    │   ├── __init__.py
    │   └── search.py
    └── zapier/
        ├── __init__.py
        └── tool.py

"""

> Langchain Chains

In [None]:
from langchain.chains import *
# from langchain_community.chains import *

""" 
Directory structure:
└── chains/
    ├── __init__.py
    ├── base.py
    ├── example_generator.py
    ├── history_aware_retriever.py
    ├── llm.py
    ├── llm_requests.py
    ├── loading.py
    ├── mapreduce.py
    ├── moderation.py
    ├── prompt_selector.py
    ├── retrieval.py
    ├── sequential.py
    ├── transform.py
    ├── api/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── news_docs.py
    │   ├── open_meteo_docs.py
    │   ├── podcast_docs.py
    │   ├── prompt.py
    │   ├── tmdb_docs.py
    │   └── openapi/
    │       ├── __init__.py
    │       ├── chain.py
    │       ├── prompts.py
    │       ├── requests_chain.py
    │       └── response_chain.py
    ├── chat_vector_db/
    │   ├── __init__.py
    │   └── prompts.py
    ├── combine_documents/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── map_reduce.py
    │   ├── map_rerank.py
    │   ├── reduce.py
    │   ├── refine.py
    │   └── stuff.py
    ├── constitutional_ai/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── models.py
    │   ├── principles.py
    │   └── prompts.py
    ├── conversation/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── memory.py
    │   └── prompt.py
    ├── conversational_retrieval/
    │   ├── __init__.py
    │   ├── base.py
    │   └── prompts.py
    ├── elasticsearch_database/
    │   ├── __init__.py
    │   ├── base.py
    │   └── prompts.py
    ├── ernie_functions/
    │   ├── __init__.py
    │   └── base.py
    ├── flare/
    │   ├── __init__.py
    │   ├── base.py
    │   └── prompts.py
    ├── graph_qa/
    │   ├── __init__.py
    │   ├── arangodb.py
    │   ├── base.py
    │   ├── cypher.py
    │   ├── cypher_utils.py
    │   ├── falkordb.py
    │   ├── gremlin.py
    │   ├── hugegraph.py
    │   ├── kuzu.py
    │   ├── nebulagraph.py
    │   ├── neptune_cypher.py
    │   ├── neptune_sparql.py
    │   ├── ontotext_graphdb.py
    │   ├── prompts.py
    │   └── sparql.py
    ├── hyde/
    │   ├── __init__.py
    │   ├── base.py
    │   └── prompts.py
    ├── llm_bash/
    │   └── __init__.py
    ├── llm_checker/
    │   ├── __init__.py
    │   ├── base.py
    │   └── prompt.py
    ├── llm_math/
    │   ├── __init__.py
    │   ├── base.py
    │   └── prompt.py
    ├── llm_summarization_checker/
    │   ├── __init__.py
    │   ├── base.py
    │   └── prompts/
    │       ├── are_all_true_prompt.txt
    │       ├── check_facts.txt
    │       ├── create_facts.txt
    │       └── revise_summary.txt
    ├── llm_symbolic_math/
    │   └── __init__.py
    ├── natbot/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── crawler.py
    │   └── prompt.py
    ├── openai_functions/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── citation_fuzzy_match.py
    │   ├── extraction.py
    │   ├── openapi.py
    │   ├── qa_with_structure.py
    │   ├── tagging.py
    │   └── utils.py
    ├── openai_tools/
    │   ├── __init__.py
    │   └── extraction.py
    ├── qa_generation/
    │   ├── __init__.py
    │   ├── base.py
    │   └── prompt.py
    ├── qa_with_sources/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── loading.py
    │   ├── map_reduce_prompt.py
    │   ├── refine_prompts.py
    │   ├── retrieval.py
    │   ├── stuff_prompt.py
    │   └── vector_db.py
    ├── query_constructor/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── ir.py
    │   ├── parser.py
    │   ├── prompt.py
    │   └── schema.py
    ├── question_answering/
    │   ├── __init__.py
    │   ├── chain.py
    │   ├── map_reduce_prompt.py
    │   ├── map_rerank_prompt.py
    │   ├── refine_prompts.py
    │   └── stuff_prompt.py
    ├── retrieval_qa/
    │   ├── __init__.py
    │   ├── base.py
    │   └── prompt.py
    ├── router/
    │   ├── __init__.py
    │   ├── base.py
    │   ├── embedding_router.py
    │   ├── llm_router.py
    │   ├── multi_prompt.py
    │   ├── multi_prompt_prompt.py
    │   ├── multi_retrieval_prompt.py
    │   └── multi_retrieval_qa.py
    ├── sql_database/
    │   ├── __init__.py
    │   ├── prompt.py
    │   └── query.py
    ├── structured_output/
    │   ├── __init__.py
    │   └── base.py
    └── summarize/
        ├── __init__.py
        ├── chain.py
        ├── map_reduce_prompt.py
        ├── refine_prompts.py
        └── stuff_prompt.py


"""

## Langchain Tools

In [None]:
### Custom Tools
# Imports mirror the working ones used in the "Create Tool Calling Agent" cell:
# the Wikipedia tool is WikipediaQueryRun (it needs an API wrapper) and the
# Tavily tools live in the `tavily_search` module.
from langchain_community.tools import WikipediaQueryRun, YouTubeSearchTool
from langchain_community.tools.tavily_search import TavilyAnswer, TavilySearchResults
from langchain_community.utilities import WikipediaAPIWrapper
from langchain.agents import tool

# Ready-made community tools.
tool_1 = YouTubeSearchTool()
tool_2 = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
tool_3 = TavilySearchResults()


# Custom tool built from a plain function via the @tool decorator.
@tool
def get_word_length(word: str) -> int:
    """Return the length of a word."""
    return len(word)


# The interpolated expression must sit inside the f-string's quotes.
print(f'Length of the word {get_word_length.invoke("hello")}')

# Inspect the metadata attached to the decorated tool.
print(get_word_length.name)
print(get_word_length.description)
print(get_word_length.args)

In [None]:
#----------------------------------------------------------------------------------------
### Custom Tools
# Imports mirror the working ones used in the "Create Tool Calling Agent" cell:
# the Wikipedia tool is WikipediaQueryRun (it needs an API wrapper) and the
# Tavily tools live in the `tavily_search` module.
from langchain_community.tools import WikipediaQueryRun, YouTubeSearchTool
from langchain_community.tools.tavily_search import TavilyAnswer, TavilySearchResults
from langchain_community.utilities import WikipediaAPIWrapper

tool_1 = YouTubeSearchTool()
tool_2 = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
tool_3 = TavilySearchResults()

# Collected so the whole set can be handed to an agent or ToolNode at once.
tools = [tool_1, tool_2, tool_3]

#----------------------------------------------------------------------------------------
#----------------------------------------------------------------------------------------
from langchain.agents import tool

@tool
def get_word_length(text: str) -> int:
    """Return the length of a word."""
    # NOTE(review): the docstring is kept verbatim because @tool appears to
    # surface it as the tool description (printed just below) — confirm.
    char_count = len(text)
    return char_count

print(get_word_length.invoke("hello"))

print(get_word_length.name)
print(get_word_length.description)
print(get_word_length.args)


#------------------------------------------------------------------------------------------
#------------------------------------------------------------------------------------------
from langchain.agents import Tool
from langchain.utilities import GoogleSearchAPIWrapper

# Client whose `run` method is exposed to the agent as a single tool.
google_search = GoogleSearchAPIWrapper()

# One-element tool list; the Tool delegates every call to google_search.run.
tools = [
    Tool(
        name="Web Answer",
        description="Get an intermediate answer to a question.",
        func=google_search.run,
        verbose=True,
    ),
]


#-----------------------------------------Custom Tool from a LangChain Chain ----------------------------------------
#--------------------------------------------------------------------------------------------------------------------

from langchain.tools import tool
from langchain.prompts import ChatPromptTemplate
from langchain.chat_models import ChatOpenAI

@tool("query_param_checker")
def query_param_checker(user_query: str, generated_query_params: str) -> str:
    """
    This tool checks if the query parameters generated by query_param_generator are valid.
    It uses an LLM to evaluate the parameters based on a simple prompt.
    """
    # System instruction first, then a human turn carrying both values to compare.
    validation_messages = [
        ("system", "Check if the generated query parameters are valid for the user query."),
        ("human", "User Query: {user_query}\nGenerated Query Parameters: {generated_query_params}"),
    ]
    validation_prompt = ChatPromptTemplate.from_messages(validation_messages)

    # Pipe the prompt into a freshly constructed chat model (built on every call).
    checker_chain = validation_prompt | ChatOpenAI(model="gpt-3.5-turbo")

    # Keys must match the placeholders used in the human message above.
    chain_inputs = {
        "user_query": user_query,
        "generated_query_params": generated_query_params,
    }
    llm_reply = checker_chain.invoke(chain_inputs)

    # Only the text content of the model's reply is handed back to the caller.
    return llm_reply.content


#-----------------------------Custom Tool from Class (RECOMMENDED 1) ------------------------------------------
#----------------------------------------------------------------------------------------------

# Define Input Schema
# Pydantic model describing the arguments of the search tool; it is passed as
# `args_schema` to the Tool created further down in this section.
# NOTE(review): BaseModel, Field and Optional are only imported later in this
# cell — make sure they are in scope before running this snippet.
class SearchToolInput(BaseModel):
    # Required field (no default): the text to search for.
    query: str = Field(..., description="The search query to look up.")
    # Optional cap on the number of results; defaults to 10.
    max_results: Optional[int] = Field(default=10, description="The maximum number of search results to return.")

# Define the Tool
from typing import Any, Dict, List, Optional, Union


class TavilySearchTool:
    """Small wrapper that runs Tavily web searches with a configurable result cap.

    Failures are not raised: ``search`` reports them as an
    ``{"error": message}`` dict so the calling agent can keep going.
    """

    def __init__(self, max_results: int = 10):
        # Default cap, used whenever a call does not supply its own.
        self.max_results = max_results

    def search(
        self, query: str, max_results: Optional[int] = None
    ) -> Union[List[Dict[str, Any]], Dict[str, str]]:
        """
        Perform a web search using the Tavily search engine.

        Args:
            query: The search query to look up.
            max_results: Optional per-call cap on the number of results. When
                omitted (``None``) the cap given to the constructor is used.
                Accepting it here keeps the method compatible with the
                ``SearchToolInput`` schema, which advertises this field.

        Returns:
            Whatever the underlying search tool returns on success, or
            ``{"error": message}`` if anything goes wrong.
        """
        limit = self.max_results if max_results is None else max_results
        try:
            # A fresh search tool per call, configured with the effective cap.
            search_tool = TavilySearchResults(max_results=limit)
            return search_tool.invoke({"query": query})
        except Exception as e:
            # Tool boundary: surface the failure as data instead of crashing the agent.
            return {"error": str(e)}

# Create the LangChain Tool
search_tool = Tool(
    name="Tavily Search",
    func=TavilySearchTool().search,
    description="Performs web searches using the Tavily search engine, providing accurate and trusted results for general queries.",
    args_schema=SearchToolInput
)


#-----------------------------Structured Tool from Class (BEST AND MOST RECOMMENDED) ------------------------------------------
#----------------------------------------------------------------------------------------------
from langchain.tools.base import StructuredTool

# Define Input Schema
# Pydantic model describing the arguments of the search tool; it is passed as
# `args_schema` to the StructuredTool created further down in this section.
# NOTE(review): BaseModel, Field and Optional are only imported later in this
# cell — make sure they are in scope before running this snippet.
class SearchToolInput(BaseModel):
    # Required field (no default): the text to search for.
    query: str = Field(..., description="The search query to look up.")
    # Optional cap on the number of results; defaults to 10.
    max_results: Optional[int] = Field(default=10, description="The maximum number of search results to return.")

# Define the Tool
from typing import Any, Dict, List, Optional, Union


class TavilySearchTool:
    """Small wrapper that runs Tavily web searches with a configurable result cap.

    Failures are not raised: ``search`` reports them as an
    ``{"error": message}`` dict so the calling agent can keep going.
    """

    def __init__(self, max_results: int = 10):
        # Default cap, used whenever a call does not supply its own.
        self.max_results = max_results

    def search(
        self, query: str, max_results: Optional[int] = None
    ) -> Union[List[Dict[str, Any]], Dict[str, str]]:
        """
        Perform a web search using the Tavily search engine.

        Args:
            query: The search query to look up.
            max_results: Optional per-call cap on the number of results. When
                omitted (``None``) the cap given to the constructor is used.
                A structured tool may forward this field from
                ``SearchToolInput`` as a keyword argument, so the method has
                to accept it.

        Returns:
            Whatever the underlying search tool returns on success, or
            ``{"error": message}`` if anything goes wrong.
        """
        limit = self.max_results if max_results is None else max_results
        try:
            # A fresh search tool per call, configured with the effective cap.
            search_tool = TavilySearchResults(max_results=limit)
            return search_tool.invoke({"query": query})
        except Exception as e:
            # Tool boundary: surface the failure as data instead of crashing the agent.
            return {"error": str(e)}

# Create the LangChain Tool
search_tool = StructuredTool(
    name="Tavily Search",
    func=TavilySearchTool().search,
    description="Performs web searches using the Tavily search engine, providing accurate and trusted results for general queries.",
    args_schema=SearchToolInput
)


# ------------------------------------------------ Convert Tool to Structured Tool ------------------------------------------
#----------------------------------------------------------------------------------------------------------------------------
from langchain.tools.base import StructuredTool
from langchain.agents import Tool, load_tools
from langchain_core.tools import StructuredTool

def convert_to_structured_tool(tool):
    """Rebuild *tool* as a StructuredTool, reusing its func, name and description."""
    return StructuredTool.from_function(
        func=tool.func,
        name=tool.name,
        description=tool.description,
    )

tools = load_tools(['serpapi'])
tools = [convert_to_structured_tool(tool) for tool in tools]


#---------------------------------- Custom Tool (RECOMMENDED 2) -------------------------------------------------
#------------------------------------------------------------------------------------------------
from langchain_core.tools import tool
from pydantic import BaseModel, Field
from typing import Optional, Union
import requests
import os

# Define Input Schema: Use Pydantic to define input parameters and descriptions for your tool.
# This model is passed as `args_schema` to the @tool decorator below.
class MyToolInput(BaseModel):
    # Required string argument (no default).
    param1: str = Field(..., description="Description of param1.")
    # Integer argument; defaults to 10 when the caller omits it.
    param2: int = Field(default=10, description="Description of param2.")
    
# Create the Tool: Use the @tool decorator to define a custom tool.
# NOTE(review): `api_key` is not defined in this snippet; it must already be in
# scope (another cell assigns it from an environment variable) — confirm
# before running.
@tool("my_tool_function", args_schema=MyToolInput, return_direct=True)
def my_tool_function(param1: str, param2: int = 10) -> Union[dict, str]:
    """
    Description of what the tool does.
    """
    # param1 is sent as the `ticker` query parameter and param2 as `limit`.
    try:
        response = requests.get(
            'https://api.financialdatasets.ai/insider-transactions',
            # Let requests build and URL-encode the query string.
            params={'ticker': param1, 'limit': param2},
            headers={'X-API-Key': api_key},
            # Never wait forever on a stalled connection.
            timeout=30,
        )
        # Turn HTTP 4xx/5xx into an exception so it is reported as an error below.
        response.raise_for_status()
        # Return the decoded payload rather than the raw Response object.
        return response.json()
    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        return {"error": str(e)}

# NOTE(review): `annual_report_tool` is only created further down in this cell,
# so this line fails if the cell is run top to bottom — define it first.
tools = [my_tool_function, annual_report_tool, get_word_length]


#-----------------------------Custom Tool from a Custom Chain----------------------------------
#----------------------------------------------------------------------------------------------

from langchain.chains.base import Chain
from typing import Dict, List

class AnnualReportChain(Chain):
    """Chain that retrieves documents for a query and delegates answering to a wrapped chain."""

    # The wrapped chain that produces the final answer from the retrieved documents.
    chain: Chain

    @property
    def input_keys(self) -> List[str]:
        """Expose the same input keys as the wrapped chain."""
        return list(self.chain.input_keys)

    @property
    def output_keys(self) -> List[str]:
        """This chain always produces a single ``output`` key."""
        return ['output']

    def _call(self, inputs: Dict[str, str]) -> Dict[str, str]:
        """Look up documents similar to the query and run the wrapped chain on them."""
        # Queries the database to get the relevant documents for a given query
        query = inputs.get("input_documents", "")
        # NOTE(review): `vectorstore` is a module-level name that is not
        # defined in the visible source — it must exist before this runs.
        docs = vectorstore.similarity_search(query, include_metadata=True)
        # Use the chain held by this instance, not whatever module-level
        # `chain` variable happens to exist.
        output = self.chain.run(input_documents=docs, question=query)
        return {'output': output}
    
    

from langchain.agents import Tool
from langchain.tools.retriever import create_retriever_tool
from langchain.chains.question_answering import load_qa_chain
from langchain.llms import OpenAI

# Initialize your custom Chain
# NOTE(review): OPENAI_API_KEY is not defined anywhere in the visible source —
# it has to be supplied before this snippet runs.
llm = OpenAI(temperature=0, openai_api_key=OPENAI_API_KEY, model_name="gpt-3.5-turbo")
# Question-answering chain built on the LLM, then wrapped by the custom chain above.
chain = load_qa_chain(llm)
annual_report_chain = AnnualReportChain(chain=chain)

# Initialize your custom Tool
# The tool forwards each call to annual_report_chain.run; the description below
# is the text supplied as the tool's description.
annual_report_tool = Tool(
    name="Annual Report",
    func=annual_report_chain.run,
    description="""
    useful for when you need to answer questions about a company's income statement,
    cash flow statement, or balance sheet. This tool can help you extract data points like
    net income, revenue, free cash flow, and total debt, among other financial line items.
    """
)



#---------------------------------- Creating a Node from Tools ----------------------------------
#------------------------------------------------------------------------------------------------
from langgraph.prebuilt import ToolNode, tools_condition
from langgraph.graph import StateGraph, START

# NOTE(review): `State` is not defined in the visible source — the graph's
# state schema has to be declared before this snippet runs.
builder = StateGraph(State)
# Wrap the current `tools` list in a ToolNode and register it under the name "tools".
tool_node = ToolNode(tools=tools)
builder.add_node("tools", tool_node)

> LangChain, LangGraph, and LangSmith

In [None]:
# LangChain 
    # Tools
    # Agents
    # Chains
    # Multi-Agent Systems
    # Plan and Execute
    # Reflection and Learning
    # Communication
    # Perception

# LangChain is an open-source framework for building applications powered by large language models (LLMs).
# It provides composable building blocks (prompts, chains, tools, agents, memory, and retrievers) that simplify developing, testing, and deploying LLM applications.



# Types of LangChain Agents
    # LangChain offers several agentic patterns, each tailored to specific needs. These include:

    # Tool Calling Agents: Designed for straightforward tool usage.
    # React Agents: Use reasoning and action mechanisms to dynamically decide the best steps.
    # Structured Chat Agents: Parse inputs and outputs into structured formats like JSON.
    # Self-Ask with Search: Handle queries by splitting them into smaller, manageable steps.

## Langchain Agent

> Create Tool Calling Agent

In [None]:
from langchain.agents import create_tool_calling_agent
from langchain.agents.tool_calling_agent import base
from langchain_core.messages import HumanMessage
from langchain import hub
from langchain_openai import ChatOpenAI as LangchainChatDeepSeek
from langchain_community.tools.tavily_search import TavilySearchResults, TavilyAnswer
from langchain_community.tools import YouTubeSearchTool, WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain.agents import AgentExecutor
import os

# Load API key
api_key = os.getenv("DEEPSEEK_API_KEY")

# Prompt
# Option A: pull a ready-made prompt from the LangChain Hub (network call).
# prompt = hub.pull("hwchase17/openai-functions-agent")
# Option B (used here): build the prompt locally.
# ChatPromptTemplate is not imported at the top of this cell, so import it here.
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", "You are a helpful assistant."),
        # First put the history
        ("placeholder", "{chat_history}"),
        # Then the new input
        ("human", "{input}"),
        # Finally the scratchpad
        ("placeholder", "{agent_scratchpad}"),
    ]
)

# Tools
tool_1 = YouTubeSearchTool()
tool_2 = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
tool_3 = TavilySearchResults(max_results=10)

tools = [tool_1, tool_2, tool_3]

# LLM (DeepSeek exposes an OpenAI-compatible endpoint, hence the ChatOpenAI alias)
llm = LangchainChatDeepSeek(
    api_key=api_key,
    model="deepseek-chat",
    base_url="https://api.deepseek.com",
)

# Agent

# Create a tool-calling agent
agent = create_tool_calling_agent(llm, tools, prompt)
# agent = base.create_tool_calling_agent()

# Agent Executor
agent_executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,
    return_intermediate_steps=False,  # Only final output. If True, returns all intermediate steps
    handle_parsing_errors=True,  # Graceful parsing errors
)


query = input("Enter your query: ")

# The prompt's human slot is the text template "{input}", so pass the raw
# string; a list of message objects would be stringified into the prompt.
response = agent_executor.invoke({"input": query})

> ReAct Agent

In [None]:

# The ReActAgent employs the ReAct (Reason+Act) framework, enabling the agent to perform both reasoning and actions within a 
# single framework. It integrates chain-of-thought reasoning with action execution, allowing the agent to handle complex, 
# multi-step tasks effectively.

from langchain.agents import create_react_agent
from langchain.prompts import PromptTemplate
from langchain_core.tools import tool
from langchain_experimental.utilities import PythonREPL
from langchain.agents import AgentType, initialize_agent, AgentExecutor
from typing import Annotated

template = '''Answer the following questions as best as you can. You have access to the following tools:
{tools}

Use the following format:
Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation sequence can be repeated N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Here is an example of how to use the tools:
Question: Generate a chart of the Fibonacci sequence.
Thought: I need to write Python code to generate the Fibonacci sequence and plot it.
Action: python_repl_tool
Action Input: 
```python
import matplotlib.pyplot as plt

def fibonacci(n):
    fib_sequence = [0, 1]
    for i in range(2, n):
        fib_sequence.append(fib_sequence[-1] + fib_sequence[-2])
    return fib_sequence

n = 10  # Number of Fibonacci numbers to generate
fib_sequence = fibonacci(n)
plt.plot(fib_sequence)
plt.title("Fibonacci Sequence")
plt.show()
```
Observation: The chart was successfully generated.
Thought: I now know the final answer.
Final Answer: The chart of the Fibonacci sequence has been generated.

Begin!
Question: {input}
Thought: {agent_scratchpad}
'''



repl = PythonREPL()

@tool
def python_repl_tool(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    # NOTE: the docstring and the Annotated text above are kept verbatim —
    # presumably `@tool` surfaces them to the model as the tool description.
    try:
        stdout = repl.run(code)
    except BaseException as exc:
        # Report the failure as text instead of raising out of the tool.
        return f"Failed to execute. Error: {repr(exc)}"
    else:
        # Echo the executed code together with whatever the REPL returned.
        return f"Successfully executed:\n```python\n{code}\n```\nStdout: {stdout}"
    
# Turn the ReAct text template into a PromptTemplate; its variables are
# {tools}, {tool_names}, {input} and {agent_scratchpad}.
prompt = PromptTemplate.from_template(template)
# Build the ReAct agent with the Python REPL as its only tool (`llm` comes from an earlier cell).
search_agent = create_react_agent(llm, tools = [python_repl_tool], prompt=prompt)

# The executor runs the agent with the same tool list; intermediate steps are returned with the result.
agent_executor = AgentExecutor(agent=search_agent, tools=[python_repl_tool], verbose=True, return_intermediate_steps=True, handle_parsing_errors=True)
agent_executor.invoke({"input": "create a visualization of some advanced dataset."})
# agent_executor.invoke({"input": [HumanMessage(content="What is the capital of France?")]})

> LangGraph ReAct Agent

In [None]:
""" 
LangGraph's prebuilt create_react_agent does not take a prompt template directly as a parameter, but instead takes a prompt parameter. 
This modifies the graph state before the llm is called, and can be one of four values:

    1. A SystemMessage, which is added to the beginning of the list of messages.
    2. A string, which is converted to a SystemMessage and added to the beginning of the list of messages.
    3. A Callable, which should take in full graph state. The output is then passed to the language model.
    4. Or a Runnable, which should take in full graph state. The output is then passed to the language model.

"""

from langgraph.prebuilt import create_react_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
# `AgentMessage` and `AgentMessageWithScratchpad` are not exported by
# langchain_core.messages; importing them raises ImportError, so they are dropped.
from langchain_core.messages import SystemMessage, AnyMessage, HumanMessage

# ----------------------------------------System Message-------------------
system_message = "You are a helpful assistant. Respond only in Spanish."
# This could also be a SystemMessage object
# system_message = SystemMessage(content="You are a helpful assistant. Respond only in Spanish.")

# Use the `llm` chat model created earlier in this notebook, consistent with
# the other create_react_agent calls in this cell.
langgraph_agent_executor = create_react_agent(llm, tools, prompt=system_message)
# -------------------------------------------------------------------------------------------------


# ----------------------------------------ChatPromptTemplate-------------------
# prompt = ChatPromptTemplate.from_messages(
#     [
#         ("system", "You are a helpful assistant."),
#         # First put the history
#         ("placeholder", "{chat_history}"),
#         # Then the new input
#         ("human", "{input}"),
#         # Finally the scratchpad
#         ("placeholder", "{agent_scratchpad}"),
#     ]
# )

# The system text is passed as a ("system", template) tuple rather than a
# SystemMessage instance: a message *instance* is treated as fixed text, so
# {tools} and {tool_names} would never be substituted by `.partial(...)`.
react_agent_prompt = ChatPromptTemplate.from_messages([
    ("system", """You are an advanced AI assistant designed to solve complex tasks using a systematic, step-by-step approach. 

CORE AGENT INSTRUCTIONS:
1. ALWAYS follow the React (Reasoning and Acting) paradigm
2. For EACH task, you must:
   a) REASON about the problem
   b) DETERMINE which TOOL to use
   c) Take ACTION using the selected tool
   d) OBSERVE the results
   e) REFLECT and decide next steps

AVAILABLE TOOLS:
{tools}

TOOL USAGE PROTOCOL:
- You have access to the following tools: {tool_names}
- BEFORE using any tool, EXPLICITLY state:
  1. WHY you are using this tool
  2. WHAT specific information you hope to retrieve
  3. HOW this information will help solve the task

TOOL INTERACTION FORMAT:
When using a tool, you MUST follow this strict format:
Thought: [Your reasoning for using the tool]
Action: [Exact tool name]
Action Input: [Precise input for the tool]

After receiving the observation, you will:
Observation: [Tool's response]
Reflection: [Analysis of the observation and next steps]

FINAL OUTPUT EXPECTATIONS:
- Provide a comprehensive, step-by-step solution
- Cite sources and tools used
- Explain your reasoning at each stage
- Offer clear conclusions or recommendations

Are you ready to solve the task systematically and intelligently?"""),
    MessagesPlaceholder(variable_name="chat_history", optional=True),
    # The graph state carries the conversation as a list under "messages";
    # a placeholder splices those messages in, whereas ("human", "{messages}")
    # would stringify the whole list into a single human message.
    MessagesPlaceholder(variable_name="messages"),
])

# Initialize the language model
# ChatOpenAI is not imported under that name in the visible cells above (only
# aliased as LangchainChatDeepSeek), so import it explicitly here.
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Pre-fill the prompt's {tools} and {tool_names} slots from the tool list
prompt = react_agent_prompt.partial(
    tools="\n".join(f"- {tool.name}: {tool.description}" for tool in tools),
    tool_names=", ".join(tool.name for tool in tools),
)

# Create the React agent
agent = create_react_agent(
    model=llm,
    tools=tools,
    prompt=prompt,
)

query = "Calculate the total cost of 15 items priced at $24.50 each, including a 7% sales tax"

# Invoke with the question as a single human message.
messages = agent.invoke({"messages": [("human", query)]})


# -------------------------------------- STREAM MODE --------------------------------------
# Create the React agent (same model, tools and prompt as the non-streaming example above)
langgraph_agent_executor = create_react_agent(
    model=llm, 
    tools=tools, 
    prompt=prompt
)


# Iterate over what the agent streams back instead of waiting for the final
# result; each `step` yielded with stream_mode="updates" is printed as-is.
for step in langgraph_agent_executor.stream(
    {"messages": [("human", query)]}, stream_mode="updates"
):
    print(step)
    
    
# --------------------------- FOR MAX ITERATIONS, USE RECURSION LIMIT ---------------------------
from langgraph.errors import GraphRecursionError
from langgraph.prebuilt import create_react_agent

# Rule of thumb used here: recursion_limit = 2 * max_iterations + 1
# (presumably one model step plus one tool step per iteration, plus a final model step).
RECURSION_LIMIT = 2 * 3 + 1

# Use the `llm` defined above, consistent with the other agents in this cell.
langgraph_agent_executor = create_react_agent(llm, tools=tools)

try:
    for chunk in langgraph_agent_executor.stream(
        {"messages": [("human", query)]},
        {"recursion_limit": RECURSION_LIMIT},
        stream_mode="values",
    ):
        print(chunk["messages"][-1])
except GraphRecursionError:
    # Raised when the graph exceeds the configured number of steps.
    print({"input": query, "output": "Agent stopped due to max iterations."})



# --------------------------- FOR early_stopping_method ---------------------------
from langgraph.errors import GraphRecursionError
from langgraph.prebuilt import create_react_agent

# Same pattern with a budget of a single iteration.
RECURSION_LIMIT = 2 * 1 + 1

langgraph_agent_executor = create_react_agent(llm, tools=tools)

try:
    for chunk in langgraph_agent_executor.stream(
        {"messages": [("human", query)]},
        {"recursion_limit": RECURSION_LIMIT},
        stream_mode="values",
    ):
        print(chunk["messages"][-1])
except GraphRecursionError:
    print({"input": query, "output": "Agent stopped due to max iterations."})

> Self Ask with Search Agent

In [9]:
# This agent incorporates a self-asking mechanism combined with search capabilities. It can autonomously formulate internal queries 
# to gather additional information necessary to answer user questions comprehensively

from langchain.agents import create_self_ask_with_search_agent
from langchain import hub
from langchain.agents import Tool
from langchain.utilities import GoogleSearchAPIWrapper

google_search = GoogleSearchAPIWrapper()
# create_self_ask_with_search_agent expects exactly one tool, and it must be
# named "Intermediate Answer"; any other name is rejected with a ValueError.
tools = [
    Tool(
        name="Intermediate Answer",
        func=google_search.run,
        description="Get an intermediate answer to a question.",
        verbose=True,
    )
]

# Hub prompt written for the self-ask-with-search format.
prompt = hub.pull("hwchase17/self-ask-with-search")
search_agent = create_self_ask_with_search_agent(llm, tools, prompt)

agent_executor = AgentExecutor(agent=search_agent, tools=tools, verbose=True, return_intermediate_steps=True, handle_parsing_errors=True)
agent_executor.invoke({"input": "What is the capital of France?"})
# agent_executor.invoke({"input": [HumanMessage(content="What is the capital of France?")]})

> StructuredChatAgent (can use multiple inputs)

In [None]:
# param format_instructions: str = 'Use a json blob to specify a tool by providing an action key (tool name) and an action_input 
# key (tool input).\n\nValid "action" values: "Final Answer" or {tool_names}\n\nProvide only ONE action per $JSON_BLOB, 
# as shown:
# \n
# \n```
# \n{{{{
    # \n  "action": $TOOL_NAME,
    # \n  "action_input": $INPUT
    # \n}}}}
    # \n```
# 
# \n\nFollow this format:
# \n\nQuestion: input question to answer
# \nThought: consider previous and subsequent steps
# \nAction:\n```
# \n$JSON_BLOB\n```
# \nObservation: action result
# \n... (repeat Thought/Action/Observation N times)
# \nThought: I know what to respond\nAction:
# \n```
# \n{{{{
    # \n  "action": "Final Answer",
    # \n  "action_input": "Final response to human"
    # \n}}}}
    # \n```'

from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder

# Literal JSON braces are doubled ({{ }}) so the prompt template does not try
# to treat them as variables. {tools} and {tool_names} are both required by
# create_structured_chat_agent (together with {agent_scratchpad}).
system_prompt = """
You are a helpful assistant that uses tools to answer the user's queries. Always respond with a JSON object that specifies the 
action to take. Use the following format:

{{
    "action": "ToolName",
    "action_input": "Input for the tool"
}}

If the answer can be provided without using any tools, use the following format:
{{
    "action": "Final Answer",
    "action_input": "Your final answer to the user."
}}

Available tools: {tools}
Valid "action" values: "Final Answer" or one of [{tool_names}]
Begin!
"""

human_template = """
User: {input}
Chat History: {agent_scratchpad}
"""

prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    # Optional: the invocation below passes no chat history.
    MessagesPlaceholder(variable_name="chat_history", optional=True),
    ("human", human_template),
])

# langchain.chat_models.ChatOpenAI is the deprecated import path; the
# langchain_openai package is already used elsewhere in this notebook.
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_structured_chat_agent

# Initialize the language model
llm = ChatOpenAI(temperature=0)

# Define the tools the agent can use
tools = [tool_1, tool_2]

# Create the StructuredChatAgent
agent = create_structured_chat_agent(
    llm=llm,
    tools=tools,
    prompt=prompt,
    stop_sequence=True  # Ensures the agent stops at the defined stop token
)

# Create the AgentExecutor
agent_executor = AgentExecutor(agent=agent, tools=tools)



import json

# Define the input
input_data = {
    "input": "What is 15 multiplied by 7?"
}

# Invoke the agent
response = agent_executor.invoke(input_data)

# Print the structured output
print(json.dumps(response, indent=4))

> CrewAI Agents (Advanced Collaboration)

In [None]:
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool
from langchain_community.llms import OpenAI

# Define agents
# Researcher: search-enabled agent capped at 5 iterations.
# NOTE(review): `ChatOpenAI` is not imported in this cell — it must already be in scope from an earlier cell.
researcher = Agent(
    role='Senior Research Analyst',
    goal='Uncover cutting-edge technologies and market trends',
    backstory="You are an experienced technology analyst with a knack for identifying emerging trends.",
    max_iter=5,  # Limit reasoning steps
    llm=ChatOpenAI(model="gpt-4-turbo", temperature=0.3),
    verbose=True,
    memory=True,  # Maintains conversation history
    tools=[SerperDevTool()],  # Search tool
    allow_delegation=True
)

# # Usage
# research_result = research_agent.execute(
#     "Find recent breakthroughs in AI-driven drug discovery"
# )

# Writer: no tools, higher temperature than the researcher.
writer = Agent(
    role='Tech Content Strategist',
    goal='Create compelling content about technology trends',
    backstory="You transform complex technical concepts into engaging content.",
    llm=OpenAI(temperature=0.7),
    verbose=True
)

# Define tasks
# Research task, assigned to the researcher.
research_task = Task(
    description="Research emerging AI technologies focusing on practical applications in healthcare",
    expected_output="A comprehensive report on emerging AI in healthcare with at least 5 specific technologies",
    agent=researcher
)

# Writing task, assigned to the writer; it lists the research task as its context.
writing_task = Task(
    description="Create an engaging blog post based on the research findings",
    expected_output="A 1000-word blog post with sections covering each major technology",
    agent=writer,
    context=[research_task]
)

# Create the crew
# Tasks run in the order listed (sequential process).
tech_crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, writing_task],
    process=Process.sequential,
    # `verbose` is a boolean in current CrewAI releases; the integer level 2
    # fails validation there, while True is accepted by old and new versions.
    verbose=True,
)

# Execute the crew
result = tech_crew.kickoff()

> AutoGen Conversational Agents (Microsoft)

In [None]:
from autogen import ConversableAgent, GroupChat, GroupChatManager

# Create specialized agents
data_scientist = ConversableAgent(
    name="Data_Scientist",
    system_message="Expert in statistical analysis and ML modeling",
    llm_config={"config_list": [{"model": "gpt-4"}]}
)

domain_expert = ConversableAgent(
    name="Medical_Expert",
    system_message="Healthcare domain expert with clinical trial experience",
    llm_config={"config_list": [{"model": "gpt-4"}]}
)

# The participants, round limit and admin name belong to a GroupChat object;
# GroupChatManager does not accept `groupchat_participants`, `max_round` or
# `admin_name` itself — it wraps a GroupChat passed as `groupchat`.
group_chat = GroupChat(
    agents=[data_scientist, domain_expert],
    messages=[],
    max_round=10,
    admin_name="Moderator",
)

# Advanced group chat manager (it needs its own LLM config to pick the next speaker)
group_chat_manager = GroupChatManager(
    groupchat=group_chat,
    llm_config={"config_list": [{"model": "gpt-4"}]},
)

> Google Vertex AI Agents

In [None]:
# `ToolUseBlock` is not part of the Vertex AI SDK (importing it raises
# ImportError); function calls are exposed on each response Part via
# `part.function_call`, and conversation turns are modelled with `Content`.
from vertexai.preview.generative_models import (
    Content,
    FunctionDeclaration,
    GenerativeModel,
    Part,
    Tool,
    ToolConfig,
)
import vertexai

# Initialize Vertex AI
vertexai.init(project="your-project-id", location="us-central1")

# Define tools
def get_weather(location: str) -> str:
    """Gets the current weather for a given location."""
    # This would typically call a weather API
    return f"Sunny, 72°F in {location}"

# Tool expects FunctionDeclaration objects rather than raw dicts.
get_weather_declaration = FunctionDeclaration(
    name="get_weather",
    description="Gets the current weather for a given location",
    parameters={
        "type": "object",
        "properties": {
            "location": {
                "type": "string",
                "description": "The location to get weather for, e.g. 'San Francisco, CA'"
            }
        },
        "required": ["location"]
    },
)

tools = [Tool(function_declarations=[get_weather_declaration])]

# Create the model with tool configuration
model = GenerativeModel(
    "gemini-pro",
    tools=tools,
    generation_config={"temperature": 0.2}
)

# Handle function execution
def handle_tool_call(tool_call):
    """Dispatch a model-issued function call to the matching local function.

    Returns the function's result, or the string "Unknown tool" when the
    requested name is not handled here.
    """
    if tool_call.name == "get_weather":
        return get_weather(tool_call.args["location"])
    return "Unknown tool"

# Generate content with tool use
user_prompt = Content(
    role="user",
    parts=[Part.from_text("What's the weather like in Seattle right now?")],
)
response = model.generate_content(user_prompt)

# Process any tool calls in the response
candidates = getattr(response, "candidates", None)
if candidates:
    model_turn = candidates[0].content
    for part in model_turn.parts:
        function_call = part.function_call
        # Plain text parts carry an empty function call; skip them.
        if not function_call or not function_call.name:
            continue

        # Process and respond to the tool call
        tool_result = handle_tool_call(function_call)

        # Continue the conversation: user turn, the model's function-call
        # turn, then the function response (which must be a dict).
        follow_up = model.generate_content(
            [
                user_prompt,
                model_turn,
                Content(
                    parts=[
                        Part.from_function_response(
                            name=function_call.name,
                            response={"content": tool_result},
                        )
                    ]
                ),
            ]
        )
        print(follow_up.text)

> PandasAI Data Agent

In [None]:
import os

# The class exported by pandasai is `SmartDataframe` (lower-case "f");
# `SmartDataFrame` does not exist and fails at import time.
from pandasai import SmartDataframe
from pandasai.llm import OpenAI

# Initialize advanced data agent
# Read the key from the environment instead of hard-coding a token in the notebook.
llm = OpenAI(api_token=os.getenv("OPENAI_API_KEY"), model="gpt-4")
df = SmartDataframe(
    "medical_data.csv",
    config={
        "llm": llm,
        "enable_cache": False,
        "max_retries": 5,
        "custom_prompts": {
            "clean_data": "Automatically clean and preprocess this dataset"
        }
    }
)

# Execute complex analysis
response = df.chat(
    "Predict which drug candidates have >80% efficacy probability "
    "using Bayesian regression analysis"
)

> Create Custom Agent

In [None]:
from typing import List, Dict, Tuple, Optional
from pydantic import BaseModel
import re
from langchain_core.tools import BaseTool
from langchain_core.language_models import BaseLanguageModel
from langchain_core.messages import HumanMessage, SystemMessage

class CustomAgent(BaseModel):
    """Minimal hand-rolled ReAct-style agent loop.

    On every iteration the agent formats ``PROMPT_TEMPLATE`` with the tool
    catalogue, the question and the transcript so far, asks the LLM for the
    next step, and either returns the final answer or runs the requested tool
    and appends its observation to the transcript.
    """

    llm: BaseLanguageModel  # The LLM to use for decision-making
    tools: List[BaseTool]  # List of tools the agent can use
    max_loops: int = 5  # Maximum number of loops to prevent infinite execution
    stop_pattern: List[str]  # Stop patterns for the LLM to avoid hallucinations

    @property
    def tool_by_names(self) -> Dict[str, BaseTool]:
        """Map tool names to tool objects."""
        return {tool.name: tool for tool in self.tools}

    def run(self, question: str) -> str:
        """Run the agent loop until a final answer is produced.

        Args:
            question: The user question to answer.

        Returns:
            The text following ``FINAL_ANSWER_TOKEN`` in the LLM output, or a
            fixed "max loops" message when ``max_loops`` iterations pass
            without a final answer.

        Raises:
            ValueError: If an LLM output contains neither a final answer nor a
                parsable ``Action`` / ``Action Input`` pair.
        """
        name_to_tool_map = self.tool_by_names
        # These prompt pieces do not change between iterations; build them once.
        tool_description = "\n".join(f"{tool.name}: {tool.description}" for tool in self.tools)
        tool_names = ", ".join(tool.name for tool in self.tools)
        previous_responses: List[str] = []

        for _ in range(self.max_loops):
            # Format the prompt with the current state
            curr_prompt = PROMPT_TEMPLATE.format(
                tool_description=tool_description,
                tool_names=tool_names,
                question=question,
                previous_responses="\n".join(previous_responses),
            )

            # Get the next action from the LLM
            output, tool, tool_input = self._get_next_action(curr_prompt)

            # If the final answer is found, return it
            if tool == "Final Answer":
                return tool_input

            selected_tool = name_to_tool_map.get(tool)
            if selected_tool is None:
                # The model named a tool that does not exist. Feed that back as
                # the observation so it can correct itself, instead of aborting
                # the whole run with a bare KeyError.
                tool_result = f"Unknown tool `{tool}`. Valid tools: {tool_names}"
            else:
                # Execute the tool and get the result
                tool_result = selected_tool.run(tool_input)

            output += f"\n{OBSERVATION_TOKEN} {tool_result}\n{THOUGHT_TOKEN}"
            print(output)  # Print the agent's reasoning
            previous_responses.append(output)

        return "Max loops reached without finding a final answer."

    def _get_next_action(self, prompt: str) -> Tuple[str, str, str]:
        """Query the LLM once and return ``(raw_output, tool, tool_input)``."""
        result = self.llm.generate([prompt], stop=self.stop_pattern)
        output = result.generations[0][0].text  # Get the first generation

        # Parse the output to extract the tool and input
        tool, tool_input = self._get_tool_and_input(output)
        return output, tool, tool_input

    def _get_tool_and_input(self, generated: str) -> Tuple[str, str]:
        """Parse the LLM output into ``(tool, tool_input)``.

        A final answer is reported as the pseudo-tool ``"Final Answer"`` with
        the answer text as its input.

        Raises:
            ValueError: If no ``Action:`` / ``Action Input:`` pair is found.
        """
        if FINAL_ANSWER_TOKEN in generated:
            return "Final Answer", generated.split(FINAL_ANSWER_TOKEN)[-1].strip()

        # DOTALL lets the action input span several lines.
        regex = r"Action: (.*?)\nAction Input:[\s]*(.*)"
        match = re.search(regex, generated, re.DOTALL)
        if not match:
            raise ValueError(f"Output of LLM is not parsable for next tool use: `{generated}`")

        tool = match.group(1).strip()
        # Drop surrounding spaces, then surrounding double quotes.
        tool_input = match.group(2).strip(" ").strip('"')
        return tool, tool_input
    

# Markers shared by CustomAgent and PROMPT_TEMPLATE:
# FINAL_ANSWER_TOKEN is searched for in the LLM output to detect the final answer;
# OBSERVATION_TOKEN and THOUGHT_TOKEN are appended around each tool result in the transcript.
FINAL_ANSWER_TOKEN = "Final Answer:"
OBSERVATION_TOKEN = "Observation:"
THOUGHT_TOKEN = "Thought:"
PROMPT_TEMPLATE = """Answer the question as best as you can using the following tools: 

{tool_description}

Use the following format:

Question: the input question you must answer
Thought: comment on what you want to do next
Action: the action to take, exactly one element of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation repeats N times, use it until you are sure of the answer)
Thought: I now know the final answer
Final Answer: your final answer to the original input question

Begin!

Question: {question}
Thought: {previous_responses}
"""

# The tool(s) that your Agent will use
tools = [annual_report_tool]

# The question that you will ask your Agent
question = "What was Meta's net income in 2022? What was net income the year before that?"

# Preview of the prompt the Agent builds on its first loop. CustomAgent formats
# PROMPT_TEMPLATE itself on every iteration, so this value is for inspection
# only and is not passed to the agent.
prompt = PROMPT_TEMPLATE.format(
  tool_description="\n".join([f"{tool.name}: {tool.description}" for tool in tools]),
  tool_names=", ".join([tool.name for tool in tools]),
  question=question,
  previous_responses='{previous_responses}',
)

# The LLM that your Agent will use
# Import the LangChain OpenAI wrapper explicitly: the PandasAI cell above
# imports a different class under the same name `OpenAI`.
from langchain_community.llms import OpenAI

llm = OpenAI(temperature=0, openai_api_key=OPENAI_API_KEY, model_name="gpt-3.5-turbo")

# Initialize your Agent
# (`prompt` is not a CustomAgent field, so it is no longer passed here.)
agent = CustomAgent(
  llm=llm,
  tools=tools,
  stop_pattern=[f'\n{OBSERVATION_TOKEN}', f'\n\t{OBSERVATION_TOKEN}'],
)

# CustomAgent implements its own loop in `run`; it is not a LangChain agent or
# Runnable, so it cannot be wrapped in AgentExecutor.
# Run the Agent!
result = agent.run(question)

print(result)

> Use LangChain "initialize_agent" function

In [None]:
from langchain.agents import AgentType, initialize_agent, AgentExecutor
from langchain_core.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler


tools = [tool_1, tool_2]  # Add more tools as needed

# Custom instructions placed before the tool list in the ReAct prompt.
# The question itself is supplied through the agent's standard "input"
# variable, so no separate query placeholder belongs in this text.
prompt_template = """
You are an advanced agent with access to multiple tools. Your task is to resolve customer queries by:
1. Identifying the problem or request.
2. Using the tools provided to gather additional information if needed.
3. Synthesizing the information into a clear, concise response.

You can chain tools if required. If you are unsure, respond with 'I need more details.'
"""
llm = ChatOpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    model="deepseek-chat",
    base_url="https://api.deepseek.com",
    streaming=True,
    callbacks=AsyncCallbackManager([StreamingStdOutCallbackHandler()]),
)

# Initialize the agent
# initialize_agent already returns an AgentExecutor, so the executor options
# are passed here instead of wrapping the result in a second AgentExecutor.
# The zero-shot ReAct agent takes custom instructions via `prefix`;
# `prompt_template` is not a recognised option.
agent_executor = initialize_agent(
    tools=tools,
    llm=llm,
    agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    agent_kwargs={"prefix": prompt_template},
    verbose=True,
    return_intermediate_steps=True,
    handle_parsing_errors=True,
)
advanced_agent = agent_executor  # keep the original name available

# The executor's input key is "input".
agent_executor.invoke({"input": "What is the capital of France?"})

## LangGraph

In [3]:
# 1. Key Concepts
    # Graph : A workflow of nodes and edges.
    # Nodes : Functions or agents that perform tasks.
    # Edges : Connections between nodes that define the flow.
    # State : A shared data structure passed between nodes.
    # StateGraph : A graph that manages state transitions.

# Draw a directory tree for the src directory for a LangChain project.
"""
src/
├── agents/
│   ├── __init__.py
│   ├── agent.py
│   ├── graph.py
│   ├── tools.py
│   ├── configuration.py
│   ├── state.py
│   ├── prompts.py
│   └── utils.py

"""

> LangGraph Workflow

In [None]:
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import create_react_agent
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from pydantic import BaseModel
from typing import Literal, Sequence, List, Annotated
from typing_extensions import TypedDict
import functools
import operator

#------------------ Define the Memory Saver-----------------------
memory = MemorySaver()

#------------------ Define the State-----------------------     # You can write a custom state class by extending the TypedDict class.
class AgentState(TypedDict):
    # Shared state schema passed to StateGraph for this workflow.

    # Messages: Stores conversation history
    # NOTE(review): the `operator.add` annotation presumably makes LangGraph
    # append each node's returned "messages" to the existing list — confirm.
    messages: Annotated[Sequence[BaseMessage], operator.add]
    
    # Selected analysts: names chosen by the supervisor (set in supervisor_router)
    selected_analysts: List[str]
    
    # Current analyst index: position within `selected_analysts` (the supervisor sets it to 0)
    current_analyst_idx: int

workflow = StateGraph(AgentState)   # Initialize the Graph

#------------------ Create Nodes-----------------------
def supervisor_router(state):
    """Route the query to the analyst(s) chosen by the routing chain.

    Invokes ``routing_chain`` with the current state and parses its text
    output as a comma-separated list of analyst names.

    Returns:
        A state update containing one supervisor message announcing the
        routing, the selected analyst names, and the analyst index reset to 0.
    """
    result = routing_chain.invoke(state)
    # Skip empty fragments (blank output, trailing commas) so an empty reply
    # yields an empty selection rather than a single "" analyst name.
    selected_analysts = [name.strip() for name in result.content.split(',') if name.strip()]
    announcement = SystemMessage(
        content=f"Routing query to: {', '.join(selected_analysts)}",
        name="supervisor",
    )
    return {
        # AgentState declares `messages` with an operator.add reducer, so only
        # the NEW message is returned; returning the full history would append
        # it to itself and duplicate every earlier message.
        "messages": [announcement],
        "selected_analysts": selected_analysts,
        "current_analyst_idx": 0
    }

# or
def agent_node(state: AgentState, agent, name: str) -> AgentState:
    """
    Generic node function for an agent.
    - `state`: The current state of the workflow.
    - `agent`: The agent or function to process the state.
    - `name`: The name of the agent (for logging or identification).

    Returns a state update holding the agent's last message (tagged with
    `name`) and the analyst index advanced by one.
    """
    # Invoke the agent with the current state
    result = agent.invoke(state)
    reply = HumanMessage(content=result["messages"][-1].content, name=name)

    return {
        # AgentState declares `messages` with an operator.add reducer, so only
        # the new message is returned; returning the whole history would
        # duplicate it.
        "messages": [reply],
        # Use the key names declared on AgentState and written by the
        # supervisor (`selected_analysts` / `current_analyst_idx`); the
        # `*_agent*` spellings are not part of the state.
        "selected_analysts": state["selected_analysts"],
        "current_analyst_idx": state["current_analyst_idx"] + 1
    }


#--------------------- Wrap the agent in a node--------------------------

# Create the analysts with their specific tools
# (`quant_strategist_tools` / `macro_analyst_tools` are not defined in this cell.)
# functools.partial pre-binds `agent` and `name`, leaving a callable that only takes the state.
quant_strategist = create_react_agent(llm, tools=quant_strategist_tools)
quant_strategist_node = functools.partial(agent_node, agent=quant_strategist, name="quant_strategist")

macro_analyst = create_react_agent(llm, tools=macro_analyst_tools)
macro_analyst_node = functools.partial(agent_node, agent=macro_analyst, name="macro_analyst")


#------------------- Add Nodes to Graph-----------------------
# NOTE(review): no "final_summary" node is added here although the edges below
# route to it, and the analyst nodes get no outgoing edges in this cell —
# confirm they are registered elsewhere before compiling.
workflow = StateGraph(AgentState)   # Initialize the Graph
workflow.add_node("supervisor", supervisor_router)  # Add the supervisor node
workflow.add_node("quant_strategist", quant_strategist_node)    # Add the quant_strategist node
workflow.add_node("macro_analyst", macro_analyst_node)        # Add the macro_analyst node

#------------------- Define the Prompt-----------------------
class SupervisorPrompt(ChatPromptTemplate):
    """Prompt for the supervisor node"""
    # NOTE(review): this class is not referenced anywhere else in the visible
    # notebook, and these annotations mirror the AgentState field names on a
    # ChatPromptTemplate subclass — confirm this is intended.
    messages: MessagesPlaceholder
    selected_analysts: List[str]
    current_analyst_idx: int

#------------------- Define Conditional Edge-----------------------
def get_next_step(state: "AgentState") -> str:
    """
    Determines the next step in the workflow.
    - If no analysts are selected, go to the final summary.
    - If all selected analysts have processed, go to the final summary.
    - Otherwise, go to the analyst at the current index.

    Reads `selected_analysts` and `current_analyst_idx`, the keys declared on
    AgentState and written by the supervisor node.
    """
    selected = state["selected_analysts"]
    if not selected:
        return "final_summary"
    current_idx = state["current_analyst_idx"]
    if current_idx >= len(selected):
        return "final_summary"
    return selected[current_idx]


# Add conditional edges:
# After "supervisor" runs, get_next_step(state) returns a key of the mapping
# below; the mapped value is the node to run next.
workflow.add_conditional_edges(
    "supervisor",  # Source node
    get_next_step,  # Router node/Function to determine the next step
    {
        "quant_strategist": "quant_strategist",  # Route to quant_strategist node
        "macro_analyst": "macro_analyst",        # Route to macro_analyst node
        "final_summary": "final_summary"         # Route to final_summary node
    }
)

#------------------ Add Final Edges ------------------------------------
# Entry point: the graph starts at the supervisor.
workflow.add_edge(START, "supervisor")
# NOTE(review): "final_summary" is used as a node name here, but no node with
# that name is added in this cell — confirm it is registered elsewhere.
workflow.add_edge("final_summary", END)

#-------------------- Compile the Graph --------------------------------
# Three alternative ways to compile; the last assignment wins.
graph = workflow.compile()
# or
graph = workflow.compile(checkpointer=memory)   # Compile the graph with memory
# or
# interrupt_before takes node names as registered with add_node, i.e.
# "quant_strategist" (not the Python variable name quant_strategist_node).
graph = workflow.compile(checkpointer=memory, interrupt_before=["quant_strategist"])  # Compile with memory and pause before the quant_strategist node


#------------------ Stream the Graph with Memory--------------------------------
from langchain_core.messages import HumanMessage

config = {"configurable": {"thread_id": "1"}}   # memory thread: checkpoints are stored per thread_id
# `messages` must be a list of message objects. The original `{"..."}` was a
# set literal containing a bare string, which cannot be concatenated with the
# message lists the nodes return and has no pretty_print().
events = graph.stream(
    {"messages": [HumanMessage(content="Hi there, my name is Paul")]},
    config,
    stream_mode="values",
)

for event in events:    # Iterate over the events
    event['messages'][-1].pretty_print()

memory.get(config)  # Retrieve the memory for a specific configuration or thread_id


#-------------------- Accessing the Graph State --------------------------------
# State is only persisted when a checkpointer is attached; get_state and
# update_state fail on a graph compiled without one.
graph = workflow.compile(checkpointer=memory)
graph.get_state(config).values  # get the state of the graph
graph.get_state(config).values.get("messages", "")  # get the messages from the state
# Updates must use keys declared on AgentState ("input" is not one of them).
graph.update_state(config, {"messages": [HumanMessage(content="Hello, World!")]})  # update the state of the graph

> Nice way to execute the LangGraph

In [None]:
#---------------------------ALTERNATIVE WAY TO RUN THE GRAPH IN A BEAUTIFUL WAY------------------------------



#------------------------- Run the Graph------------------------------------
#------------------------- Custom Function----------------------------------
from typing import Dict, Any
import json
import re
from langchain_core.messages import HumanMessage
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.rule import Rule

#---------- Formatting Functions
# Format Bold Text
def format_bold_text(content: str) -> Text:
    """Build a rich ``Text`` in which every ``**marked**`` span of *content* is bold."""
    rendered = Text()
    # Splitting on a pattern with one capture group alternates between plain
    # segments (even positions) and the captured inner text (odd positions).
    for position, segment in enumerate(re.split(r'\*\*(.*?)\*\*', content)):
        inside_markers = position % 2 == 1
        if inside_markers:
            rendered.append(segment, style="bold")
        else:
            rendered.append(segment)
    return rendered

# Format Message Content
def format_message_content(content: str) -> Union[str, Text]:
    """Format message content for display.

    Args:
        content: Raw message content.

    Returns:
        The content re-serialised as indented JSON when it parses as JSON;
        otherwise a rich ``Text`` with bold spans when it contains ``**``
        markers; otherwise the content unchanged.
    """
    try:
        data = json.loads(content)
    except (ValueError, TypeError):
        # Not JSON (json.JSONDecodeError is a ValueError; non-string content
        # raises TypeError). Only these are caught so that KeyboardInterrupt
        # and genuine programming errors are not silently swallowed.
        if '**' in content:
            return format_bold_text(content)
        return content
    return json.dumps(data, indent=2)

# Format Agent Message
def format_agent_message(message: HumanMessage) -> Union[str, Text]:
    """Return the display form of one agent message (delegates to ``format_message_content``)."""
    raw_content = message.content
    return format_message_content(raw_content)

# Get Agent Title
def get_agent_title(agent: str, message: HumanMessage) -> str:
    """Return the panel title for an agent.

    Args:
        agent: Node name, e.g. ``"macro_analyst"``.
        message: Message produced by the node; its ``name`` attribute, when
            set, takes precedence over *agent*.

    Returns:
        The chosen name with underscores replaced by spaces, title-cased.
        Falls back to the title derived from *agent* when the message has no
        usable ``name``.
    """
    base_title = agent.replace('_', ' ').title()
    name = getattr(message, 'name', None)
    if name is None:
        return base_title
    try:
        return name.replace('_', ' ').title()
    except (AttributeError, TypeError):
        # ``name`` is present but not string-like; use the node name instead.
        return base_title

# Print a Single Step
def print_step(step: Dict[str, Any]) -> None:
    """Pretty print a single step of the agent execution.

    Args:
        step: One streamed update mapping a node name to that node's output.
            An output containing ``'next'`` is rendered as a supervision
            panel; an output containing ``'messages'`` is rendered as a report
            panel, or as the final summary when the node is
            ``"final_summary"``.
    """
    console = Console()
    for agent, data in step.items():
        # Updates that are not mappings (e.g. ``None``) carry nothing to
        # render and would break the key lookups below.
        if not isinstance(data, dict):
            continue

        # Handle supervisor steps
        if 'next' in data:
            next_agent = data['next']
            text = Text()
            text.append("Portfolio Manager ", style="bold magenta")
            text.append("assigns next task to ", style="white")
            if next_agent == "final_summary":
                text.append("FINAL SUMMARY", style="bold yellow")
            elif next_agent == "END":
                text.append("END", style="bold red")
            else:
                text.append(f"{next_agent}", style="bold green")
            console.print(Panel(
                text,
                title="[bold blue]Supervision Step",
                border_style="blue"
            ))

        # Handle agent responses and final summary
        messages = data.get('messages')
        if not messages:
            continue
        # A node may return a single message instead of a list of messages.
        message = messages[0] if isinstance(messages, (list, tuple)) else messages
        formatted_content = format_agent_message(message)
        if agent == "final_summary":
            # Final summary formatting
            console.print(Rule(style="yellow", title="Portfolio Analysis"))
            console.print(Panel(
                formatted_content,
                title="[bold yellow]Investment Summary and Recommendation",
                border_style="yellow",
                padding=(1, 2)
            ))
            console.print(Rule(style="yellow"))
        else:
            # Regular analyst reports
            title = get_agent_title(agent, message)
            console.print(Panel(
                formatted_content,
                title=f"[bold blue]{title} Report",
                border_style="green"
            ))

# Stream the Execution
def stream_agent_execution(graph, input_data: Dict, config: Dict) -> None:
    """Run *graph* on *input_data* and pretty print every streamed step.

    Steps containing the ``"__end__"`` key are skipped; all others are handed
    to ``print_step`` and followed by a blank line.
    """
    out = Console()
    out.print("\n[bold blue]Starting Agent Execution...[/bold blue]\n")
    for update in graph.stream(input_data, config):
        if "__end__" in update:
            continue
        print_step(update)
        out.print("\n")
    out.print("[bold blue]Analysis Complete[/bold blue]\n")


# Run the Graph
# Define the input data
# Initial graph input: a single user question.
input_data = dict(
    messages=[HumanMessage(content="What is AAPL's current price and latest revenue?")],
)

# Run configuration (here only the recursion limit).
config = dict(recursion_limit=10)

# Stream the execution and render each step.
stream_agent_execution(graph, input_data, config)

> LangGraph States

In [None]:
# LangGraph State: --> Example
# What is a LangGraph State?
    # A LangGraph state is a data structure that holds the current state of the workflow. It is passed between nodes in the graph, 
    # and each node can modify the state as needed. The state typically contains all the information required for the workflow to function, 
    # such as inputs, intermediate results, and outputs.

from dataclasses import dataclass, field
from typing import Any, Optional, Annotated
import operator
from langgraph.graph import Graph, StateGraph, MessageGraph, MessagesState

 
#------------------ Define the State (State.py) -----------------------
# JSON schema used as the default ``extraction_schema`` of the state classes:
# an object describing a company, of which only ``company_name`` is required.
DEFAULT_EXTRACTION_SCHEMA = dict(
    title="CompanyInfo",
    description="Basic information about a company",
    type="object",
    properties=dict(
        company_name=dict(
            type="string",
            description="Official name of the company",
        ),
        founding_year=dict(
            type="integer",
            description="Year the company was founded",
        ),
        founder_names=dict(
            type="array",
            items=dict(type="string"),
            description="Names of the founding team members",
        ),
        product_description=dict(
            type="string",
            description="Brief description of the company's main product or service",
        ),
        funding_summary=dict(
            type="string",
            description="Summary of the company's funding history",
        ),
    ),
    required=["company_name"],
)

class SampleState(MessagesState):
    """Sample state: adds ``company`` on top of the ``messages`` key that ``MessagesState`` already defines."""

    # Company to research provided by the user.
    company: str
    

@dataclass(kw_only=True)
class InputState:
    """Input state defines the interface between the graph and the user (external API)."""

    # Messages: Stores conversation history
    messages: Annotated[Sequence[BaseMessage], operator.add]
    
    company: str
    "Company to research provided by the user."

    extraction_schema: dict[str, Any] = field(
        default_factory=lambda: DEFAULT_EXTRACTION_SCHEMA
    )
    "The json schema defines the information the agent is tasked with filling out."

    user_notes: Optional[dict[str, Any]] = field(default=None)
    "Any notes from the user to start the research process."


@dataclass(kw_only=True)
class OverallState:
    """Input state defines the interface between the graph and the user (external API)."""

    # Messages: Stores conversation history
    messages: Annotated[Sequence[BaseMessage], operator.add]
    
    company: str
    "Company to research provided by the user."

    extraction_schema: dict[str, Any] = field(
        default_factory=lambda: DEFAULT_EXTRACTION_SCHEMA
    )
    "The json schema defines the information the agent is tasked with filling out."

    user_notes: str = field(default=None)
    "Any notes from the user to start the research process."

    search_queries: list[str] = field(default=None)
    "List of generated search queries to find relevant information"

    completed_notes: Annotated[list, operator.add] = field(default_factory=list)
    "Notes from completed research related to the schema"

    info: dict[str, Any] = field(default=None)
    """
    A dictionary containing the extracted and processed information
    based on the user's query and the graph's execution.
    This is the primary output of the enrichment process.
    """

    is_satisfactory: bool = field(default=None)
    "True if all required fields are well populated, False otherwise"

    reflection_steps_taken: int = field(default=0)
    "Number of times the reflection node has been executed"

    
@dataclass(kw_only=True)
class OutputState:
    """The response object for the end user.

    This class defines the structure of the output that will be provided
    to the user after the graph's execution is complete.
    """

    info: dict[str, Any]
    """
    A dictionary containing the extracted and processed information
    based on the user's query and the graph's execution.
    This is the primary output of the enrichment process.
    """

#------------------ Define the Configuration (Configuration.py) -----------------------
@dataclass(kw_only=True)
class Configuration:
    """The configurable fields for the chatbot."""

    max_search_queries: int = 3  # Max search queries per company
    max_search_results: int = 3  # Max search results per query
    max_reflection_steps: int = 0  # Max reflection steps

    @classmethod
    def from_runnable_config(
        cls, config: Optional[RunnableConfig] = None
    ) -> "Configuration":
        """Create a Configuration instance from a RunnableConfig."""
        configurable = (
            config["configurable"] if config and "configurable" in config else {}
        )
        values: dict[str, Any] = {
            f.name: os.environ.get(f.name.upper(), configurable.get(f.name))
            for f in fields(cls)
            if f.init
        }
        return cls(**{k: v for k, v in values.items() if v})
#-------------------------------------------------------------------------------
from langgraph.graph import START, END, StateGraph
from agent.configuration import Configuration

# Graph builder over OverallState, with separate input/output schemas and
# Configuration as the config schema.
builder = StateGraph(OverallState, input=InputState, output=OutputState, config_schema=Configuration)

> LangGraph Nodes

In [None]:
# DAG: Directed Acyclic Graph 
    # Definition : A graph where nodes are connected in a linear, directional manner without forming closed loops .
    # Use Case : Used by LangChain to represent workflows where tasks are executed in a non-repeating, linear sequence .
        ''' Start → Node A → Node B → Node C → End '''
            # No loops : Once a node is processed, it doesn’t revisit previous nodes.
            # Linear flow : Tasks are executed in a strict sequence.
            
            
# DCG: Directed Cyclic Graph --> used by LangGraph to represent the workflow of nodes and edges.
    # Definition : A graph where nodes are connected in a directional manner and can form loops or cycles .
    # Use Case : Used by LangGraph to represent workflows with complex patterns , including loops and conditional branching .
        '''
        Start → Node A → Node B → Node C
                ↑              ↓
                └──────────────┘
        '''
            # Loops allowed : Nodes can revisit previous nodes (e.g., for iterative tasks).
            # Complex flow : Supports conditional edges, loops, and dynamic routing.


# Edges:
    # Simple Edge:
        # A direct connection between two nodes in the graph. Used when the flow is fixed and unconditional.
        ''' Start → Node A → Node B → Node C → End '''
    
    # Conditional Edge:
        # A connection between two nodes that is determined by a condition or decision function.
        '''
            Start → Node A
                    ↓
                ┌─────┴─────┐
            Condition 1   Condition 2
                ↓             ↓
            Node B         Node C
                ↓             ↓
            Node D         Node E
                └─────┬─────┘
                    ↓
                    End
        '''

In [None]:
from langgraph.graph import START, END, StateGraph, Graph # Import the necessary classes
from IPython.display import Image, display
from pydantic import BaseModel, Field
from IPython.display import Image, display

# Define the state as a Pydantic model
class CustomerSupportState(BaseModel):
    # Workflow state for the customer-support example. Only ``query`` is
    # required; fields that default to None are typed as optional so the
    # annotation matches the default and an explicit None validates.
    query: str = Field(..., description="The customer's query")
    response: str | None = Field(None, description="The response to the customer")
    issue_type: str | None = Field(None, description="The type of issue (FAQ, Escalation, Recommendation)")
    escalation_required: bool = Field(False, description="Whether the issue requires escalation")
    product_recommendation: str | None = Field(None, description="Product recommendation for the customer")

# Create the workflow graph
workflow = StateGraph(CustomerSupportState)


#--------------------------------------------- NODE WITHOUT LLM ---------------------------------------------
#------------------------------------------------------------------------------------------------------------
# Node A: Classify the customer's query
def classify_query(state: CustomerSupportState) -> dict:
    """Classify the customer's query by keyword and return ``{"issue_type": ...}``.

    Rules are tried in order (FAQ, Escalation, Recommendation) against the
    lower-cased query; the first rule with a matching keyword wins, and
    ``"Unknown"`` is returned when none match.
    """
    text = state.query.lower()
    rules = (
        ("FAQ", ("faq", "how to", "what is")),
        ("Escalation", ("issue", "problem", "error")),
        ("Recommendation", ("recommend", "suggest")),
    )
    for label, keywords in rules:
        if any(keyword in text for keyword in keywords):
            return {"issue_type": label}
    return {"issue_type": "Unknown"}

#--------------------------------------------- TOOL NODE ---------------------------------------------
#-----------------------------------------------------------------------------------------------------
from langchain.tools.retriever import create_retriever_tool
from langgraph.prebuilt import ToolNode

# Index the document chunks in a Chroma collection.
vectorstore = Chroma.from_documents(
    documents=doc_splits,
    collection_name="rag-chrome",
    embedding=embeddings,
)

# Wrap the vector store's retriever as a tool.
retriever = vectorstore.as_retriever()
retriever_tool = create_retriever_tool(
    retriever,
    "retrieve_blog_posts",
    (
        "Search and return information about Lilian Weng blog posts on LLM agents, prompt engineering, and adversarial attacks on LLMs."
        "You are a specialized assistant. "
        "Use the 'retriever_tool' **only** when the query explicitly relates to LangChain blog data. "
        "For all other queries, respond directly without using any tool. "
        "For simple queries like 'hi', 'hello', or 'how are you', provide a normal response."
    ),
)

# Tool list (bound to the LLM elsewhere) and the graph node that executes it.
tools = [retriever_tool]
retrieve = ToolNode([retriever_tool])



#--------------------------------------------- TOOL NODE WITH FALLBACK ---------------------------------------------
#-------------------------------------------------------------------------------------------------------------------

from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode


def handle_tool_error(state) -> dict:
    """Turn a tool failure into one error ``ToolMessage`` per pending tool call.

    Args:
        state: Mapping with the failure under ``"error"`` and the message
            history under ``"messages"``; the last message's ``tool_calls``
            are the calls being answered.

    Returns:
        ``{"messages": [...]}`` holding one ``ToolMessage`` per tool call, each
        tagged with that call's id.
    """
    failure = state.get("error")
    pending_calls = state["messages"][-1].tool_calls

    replies = []
    for call in pending_calls:
        replies.append(
            ToolMessage(
                content=f"Error: {repr(failure)}\n please fix your mistakes.",
                tool_call_id=call["id"],
            )
        )
    return {"messages": replies}

def create_tool_node_with_fallback(tools: list):
    """Create a tool node with fallback error handling.

    Args:
        tools: Tools the node should be able to execute.

    Returns:
        The runnable produced by ``ToolNode(tools).with_fallbacks(...)`` (not
        a dict): it runs the tools and, on failure, calls
        ``handle_tool_error`` with the exception stored under the ``"error"``
        key of its input.
    """
    # RunnableLambda adapts the plain handler function to the runnable
    # interface expected by with_fallbacks.
    fallbacks = [RunnableLambda(handle_tool_error)]
    return ToolNode(tools).with_fallbacks(
        fallbacks,
        exception_key="error",  # key under which the exception is passed to the fallback
    )

# New builder whose "tools" node executes the tools behind the error fallback.
builder = StateGraph(OverallState)
builder.add_node(
    "tools",
    create_tool_node_with_fallback([retriever_tool, tool_1, tool_2]),
)


#--------------------------------------------- NODE with LLM 1 ---------------------------------------------
#-----------------------------------------------------------------------------------------------------------
from langgraph.graph import add_messages
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage

class AgentState(TypedDict):
    """Graph state with a single ``messages`` channel."""

    # Message history; ``add_messages`` is attached as the channel's reducer.
    messages: Annotated[Sequence[BaseMessage], add_messages]
    
def ai_assistant(state:AgentState):
    """Answer the conversation and return the reply under ``"messages"``.

    With at most one message in the history the tool-bound LLM is invoked on
    the history itself; otherwise the content of the last message is answered
    through a plain prompt | llm chain.
    """
    print("---CALL AGENT---")
    history = state['messages']

    # Short history: let the model decide whether to call a tool.
    if len(history) <= 1:
        tool_aware_llm = llm.bind_tools(tools)
        #response=handle_query(messages)
        return {"messages": [tool_aware_llm.invoke(history)]}

    # Longer history: answer the latest message without tools.
    question = history[-1].content
    question_prompt = PromptTemplate(
        template="""You are a helpful assistant whatever question has been asked to find out that in the given question and answer.
                        Here is the question:{question}
                        """,
        input_variables=["question"],
    )
    answer = (question_prompt | llm).invoke({"question": question})
    return {"messages": [answer]}



#--------------------------------------------- NODE with LLM + structured output ---------------------------------------------
#-----------------------------------------------------------------------------------------------------------
# Structured-output schema passed to llm.with_structured_output in
# grade_documents: a single 'yes'/'no' relevance verdict.
class grade(BaseModel):
    binary_score:str=Field(description="Relevance score 'yes' or 'no'")
    
def grade_documents(state:AgentState)->Literal["generator", "rewriter"]:
    """Grade the last message against the first message and pick the next route.

    The first message's content is used as the question and the last
    message's content as the document; the LLM returns a structured ``grade``.

    Returns:
        ``"generator"`` when the grade is "yes" (case/whitespace-insensitive),
        otherwise ``"rewriter"``. These must be node names or keys of the
        conditional edge's path map; the return annotation lists exactly the
        values that are returned.
    """
    llm_with_structure_op=llm.with_structured_output(grade)

    prompt=PromptTemplate(
        template="""You are a grader deciding if a document is relevant to a user’s question.
                    Here is the document: {context}
                    Here is the user’s question: {question}
                    If the document talks about or contains information related to the user’s question, mark it as relevant. 
                    Give a 'yes' or 'no' answer to show if the document is relevant to the question.""",
        input_variables=["context", "question"]
                    )
    chain = prompt | llm_with_structure_op

    messages = state["messages"]
    last_message = messages[-1]
    question = messages[0].content
    docs = last_message.content
    scored_result = chain.invoke({"question": question, "context": docs})
    # Normalise so answers such as "Yes" or "yes " are not treated as "no".
    score = scored_result.binary_score.strip().lower()

    if score == "yes":
        print("---DECISION: DOCS RELEVANT---")
        return "generator" #this should be a node name
    else:
        print("---DECISION: DOCS NOT RELEVANT---")
        return "rewriter" #this should be a node name


#--------------------------------------------- NODE with LLM + structured output 2 ----------------------------------------------------------
#--------------------------------------------------------------------------------------------------------------------------------------------
# Prompt template for generate_query. It is filled with str.format
# (placeholder: research_topic), which is why the literal braces of the JSON
# example are doubled.
query_writer_instructions="""Your goal is to generate targeted web search query.

The query will gather information related to a specific topic.

Topic:
{research_topic}

Return your query as a JSON object:
{{
    "query": "string",
    "aspect": "string",
    "rationale": "string"
}}
"""

def generate_query(state: SummaryState, config: RunnableConfig):
    """Generate a query for web search.

    Formats ``query_writer_instructions`` with ``state.research_topic``, asks
    a JSON-mode ``ChatOllama`` model (model name taken from the configuration)
    for a query and returns it.

    Returns:
        ``{"search_query": <query>}``. When the model's reply is not valid
        JSON or has no ``"query"`` key, the research topic itself is used as
        the query instead of failing the node.
    """
    # Format the prompt
    query_writer_instructions_formatted = query_writer_instructions.format(research_topic=state.research_topic)

    # Generate a query
    configurable = Configuration.from_runnable_config(config)
    llm_json_mode = ChatOllama(model=configurable.local_llm, temperature=0, format="json")
    result = llm_json_mode.invoke(
        [SystemMessage(content=query_writer_instructions_formatted),
        HumanMessage(content="Generate a query for web search:")]
    )

    try:
        search_query = json.loads(result.content)["query"]
    except (ValueError, KeyError, TypeError):
        # Model output is not guaranteed to be well-formed JSON with a "query"
        # key; fall back to searching for the topic directly.
        search_query = state.research_topic

    return {"search_query": search_query}



#------------------------------------------------ NODE with LLM + structured output 3 ----------------------------------------------------------
#-----------------------------------------------------------------------------------------------------------------------------------------------

# System prompt for recommendations_node_2.
# NOTE(review): recommendations_node_2 sends this text verbatim as the system
# message (no .format call), so the {query} placeholder below is not
# substituted there; the user query travels in a separate user message.
# The braces of the JSON example are single, so str.format would fail on
# this text as written — confirm whether formatting was intended.
RECOMMENDATION_PROMPT_2 = """
You are a specialized travel recommendation assistant. 
Generate at least 10 unique recommendations for the user. 
Output your response as a list of dictionaries, each containing the fields: 
  - "key": A short label, e.g. "Crime Rate"
  - "value": A concise recommendation

For example:
[
    {"key": "Crime Rate", "value": "The city is generally safe but beware of pickpockets in tourist areas."},
    {"key": "Weather Advice", "value": "Spring is mild; pack light jackets and an umbrella."}
]

### User Query:
{query}
"""

def recommendations_node_2(state: OverallState) -> OverallState:
    """Generate travel recommendations with an OpenAI JSON-schema response.

    Joins the content of all ``state.messages`` and the entries of
    ``state.user_preferences`` into one user query, requests a completion
    constrained to a ``{"recommendations": [{"key", "value"}, ...]}`` schema
    and stores the result as a list of single-entry ``{key: value}`` dicts in
    ``state.recommendations``.

    Returns:
        The updated *state* on success, or
        ``{"error": "Failed to generate recommendations."}`` when the reply
        cannot be parsed into the expected structure.
    """
    import openai

    # If you haven't set up your API key globally, do so here:
    # openai.api_key = "YOUR_OPENAI_API_KEY"

    client = openai  # or adapt to your environment if needed

    # Combine all user messages into a single query
    all_messages = "\n".join(message.content for message in state.messages)
    preferences_text = "\n".join(f"{key}: {value}" for key, value in state.user_preferences.items())
    query = f"{all_messages}\n\nUser Preferences:\n{preferences_text}"

    # JSON Schema of one recommendation entry
    recommendation_item_schema = {
        "type": "object",
        "properties": {
            "key": {
                "type": "string",
                "description": "Short label for the recommendation"
            },
            "value": {
                "type": "string",
                "description": "Concise recommendation content"
            }
        },
        "required": ["key", "value"],
        "additionalProperties": False
    }

    # The 'response_format' parameter needs 'json_schema' -> 'name' + 'schema'
    response_format = {
        "type": "json_schema",
        "json_schema": {
            "name": "recommendation_schema",
            "schema": {
                "type": "object",
                "properties": {
                    "recommendations": {
                        "type": "array",
                        "items": recommendation_item_schema,
                        "description": "A list of travel recommendations."
                    }
                },
                "required": ["recommendations"],
                "additionalProperties": False
            }
        },
    }

    completion = client.chat.completions.create(
        model="gpt-4o",  # Replace with a valid model you have access to.
        messages=[
            {"role": "system", "content": RECOMMENDATION_PROMPT_2},
            {"role": "user", "content": query},
        ],
        response_format=response_format,
    )

    # Parse and return the generated structured output
    try:
        structured_output = completion.choices[0].message.content
        parsed_output = json.loads(structured_output)
        recommendation_list = parsed_output["recommendations"]  # This is the list of dictionaries
        transformed_list = [{item["key"]: item["value"]} for item in recommendation_list]

        state.recommendations = transformed_list

        return state

    # Only failures of reading/parsing the reply are handled here; anything
    # else is a programming error and should surface.
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
        print(f"Error occurred: {e}")
        return {"error": "Failed to generate recommendations."}



#--------------------------------------------- NODE with LLM + Bind Tools (Used for User call) ---------------------------------------------
#--------------------------------------------------------------------------------------------------------------------------------------------
from langchain_core.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable
# from langchain_aws import ChatBedrock
import boto3
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import tools_condition

@tool
def compute_savings(monthly_cost: float) -> dict:
    """
    Tool to compute the potential savings when switching to solar energy based on the user's monthly electricity cost.
    
    Args:
        monthly_cost (float): The user's current monthly electricity cost.
    
    Returns:
        dict: A dictionary containing:
            - 'number_of_panels': The estimated number of solar panels required.
            - 'installation_cost': The estimated installation cost.
            - 'net_savings_10_years': The net savings over 10 years after installation costs.
    """
    # Assumptions for the calculation
    cost_per_kWh = 0.28
    cost_per_watt = 1.50
    sunlight_hours_per_day = 3.5
    panel_wattage = 350
    system_lifetime_years = 10

    # Monthly electricity consumption in kWh
    monthly_consumption_kWh = monthly_cost / cost_per_kWh

    # Required system size in kW (daily consumption spread over the sunlight hours)
    daily_energy_production = monthly_consumption_kWh / 30
    system_size_kW = daily_energy_production / sunlight_hours_per_day

    # Number of panels and installation cost
    number_of_panels = system_size_kW * 1000 / panel_wattage
    installation_cost = system_size_kW * 1000 * cost_per_watt

    # Annual and net savings (the whole current bill is counted as saved)
    annual_savings = monthly_cost * 12
    total_savings_10_years = annual_savings * system_lifetime_years
    net_savings = total_savings_10_years - installation_cost

    return {
        "number_of_panels": round(number_of_panels),
        "installation_cost": round(installation_cost, 2),
        "net_savings_10_years": round(net_savings, 2)
    }

# Define the state for the workflow
class State(TypedDict):
    """Workflow state with a single ``messages`` channel."""

    # Message list; ``add_messages`` is attached as the channel's reducer.
    messages: Annotated[list[AnyMessage], add_messages]

# Define the assistant class used for invoking the runnable
class Assistant:
    """Graph node that invokes a runnable until it yields a non-empty reply."""

    def __init__(self, runnable: Runnable):
        # The runnable that is invoked with the state.
        self.runnable = runnable

    def __call__(self, state: State):
        """Invoke the runnable, re-prompting while the reply is empty.

        Each retry appends the user message "Respond with a real output." to
        a copy of the state. Returns ``{"messages": <reply>}``.
        """
        reply = self.runnable.invoke(state)
        while self._is_blank(reply):
            nudged_history = state["messages"] + [("user", "Respond with a real output.")]
            state = {**state, "messages": nudged_history}
            reply = self.runnable.invoke(state)
        return {"messages": reply}

    @staticmethod
    def _is_blank(reply):
        """Tell whether *reply* has no tool calls and no usable content."""
        if reply.tool_calls:
            return False
        if not reply.content:
            return True
        # List-style content counts as empty when its first part has no text.
        return isinstance(reply.content, list) and not reply.content[0].get("text")


# Chat model: DeepSeek served through the OpenAI-compatible client, streaming
# tokens to stdout via the callback handler.
llm = ChatOpenAI(
    api_key=os.getenv("DEEPSEEK_API_KEY"),
    model="deepseek-chat",
    base_url="https://api.deepseek.com",
    streaming=True,
    callbacks=AsyncCallbackManager([StreamingStdOutCallbackHandler()]),
)

# System instructions followed by the running conversation.
primary_assistant_prompt = ChatPromptTemplate.from_messages([
    (
        "system",
        '''You are a helpful customer support assistant for Solar Panels Belgium.
            You should get the following information from them:
            - monthly electricity cost
            If you are not able to discern this info, ask them to clarify! Do not attempt to wildly guess.
            After you are able to discern all the information, call the relevant tool.
            ''',
    ),
    ("placeholder", "{messages}"),
])

# Tools available to the assistant.
part_1_tools = [compute_savings]

# Prompt piped into the tool-bound model (tool_choice="any").
part_1_assistant_runnable = primary_assistant_prompt | llm.bind_tools(part_1_tools, tool_choice="any")

# Graph with the assistant registered as a node.
builder = StateGraph(State)
builder.add_node("assistant", Assistant(part_1_assistant_runnable))




#--------------------------------------- NODE with LLM + Tools with multiple parameters ----------------------------
#-------------------------------------------------------------------------------------------------------------------


def accommodation_finder_node(state: OverallState) -> OverallState:
    """
    Extract accommodation details from the last message in state.messages,
    run the booking tool with them and store the tool's result on the state.

    Steps:
        1. Ask the LLM (structured output: AccommodationOutput) to extract
           location, dates, guests, rooms and currency from the query.
        2. Copy those fields into a BookingSearchInput.
        3. Call booking_tool.func with that input.
        4. Assign the result to state.accommodation and return the state.

    Returns:
        The same state object, with ``accommodation`` set to the booking results.
    """

    # Schema the LLM must fill in. It is redefined on every call because it is
    # local to the function.
    # NOTE(review): the schema default for ``adults`` is 2, while the prompt
    # below tells the model to default to "1 adult" — confirm which is intended.
    class AccommodationOutput(BaseModel):
        location: str = Field(..., description="The exact location or neighborhood where the traveler wants to stay (e.g., 'Brooklyn').")
        checkin_date: str = Field(..., description="The check-in date in YYYY-MM-DD format.")
        checkout_date: str = Field(..., description="The check-out date in YYYY-MM-DD format.")
        adults: int = Field(default=2, description="The number of adult guests.")
        rooms: int = Field(default=1, description="The number of rooms.")
        currency: str = Field(default="USD", description="The currency for the prices.")

    # Create a new LLM with structured output
    llm_with_structure = llm.with_structured_output(AccommodationOutput)

    # Define the prompt template
    # NOTE(review): the instructions refer to defaults "provided in the state",
    # but only {query} is passed to the template — the model never sees them.
    prompt = PromptTemplate(
        template="""
        You are an advanced travel planner assistant. Your task is to extract accommodation details
        from the traveler's query. Use the following information to generate a structured output for
        booking accommodations:

        ### Traveler Query:
        {query}

        ### Instructions:
        1. Extract the exact location or neighborhood where the traveler wants to stay (e.g., "Brooklyn").
           - If the traveler does not specify a location, use the city or city code provided in the state.
        2. Extract the check-in and check-out dates from the query.
           - If the dates are not explicitly mentioned, use the default dates from the state.
        3. Extract the number of adults and rooms from the query.
           - If not specified, use the default values: 1 adult and 1 room.
        4. Use the default currency 'USD' unless specified otherwise.
        5. Return the structured output in the following format:
           - location: The exact location or neighborhood.
           - checkin_date: The check-in date in YYYY-MM-DD format.
           - checkout_date: The check-out date in YYYY-MM-DD format.
           - adults: The number of adult guests.
           - rooms: The number of rooms.
           - currency: The currency for the prices.

        ### Example Output:
        - location: "Brooklyn"
        - checkin_date: "2023-12-01"
        - checkout_date: "2023-12-10"
        - adults: 2
        - rooms: 1
        - currency: "USD"
        """,
        input_variables=["query"]
    )

    # Create the chain
    chain = prompt | llm_with_structure

    # Extract the user's query from state.messages
    query = state.messages[-1].content  # Assuming the last message is the user's query

    # Invoke the chain to generate the structured output
    structured_output = chain.invoke({"query": query})

    # Build the input for the booking (accommodation) search tool from the
    # extracted fields.
    booking_search_input = BookingSearchInput(
        location=structured_output.location,
        checkin_date=structured_output.checkin_date,
        checkout_date=structured_output.checkout_date,
        adults=structured_output.adults,
        rooms=structured_output.rooms,
        currency=structured_output.currency,
    )

    # Call the tool's underlying function directly with the input object.
    booking_results = booking_tool.func(booking_search_input)

    # Store the booking results on the state (mutates the passed-in state)
    state.accommodation = booking_results

    # Return the updated state
    return state



#--------------------------------------------- NODE with Agent 1 ---------------------------------------------
#-------------------------------------------------------------------------------------------------------------
from langchain.agents import Tool, create_react_agent

# Define a REACT-based agent node
def react_agent_node(state: CustomerSupportState):
    """Answer ``state.query`` with a ReAct agent and store the result on the state.

    Args:
        state: Workflow state; ``state.query`` is read and ``state.response``
            is overwritten with the executor's result.

    Returns:
        The same state object, with ``response`` set to the dict returned by
        ``AgentExecutor.invoke``.
    """
    # Imported locally so this node always uses LangChain's ReAct factory,
    # even if `create_react_agent` is rebound at module level (the LangGraph
    # prebuilt helper has the same name but a different signature).
    from langchain.agents import AgentExecutor, create_react_agent
    from langchain.prompts import PromptTemplate

    tools = [retriever_tool]  # Add your tool(s) here

    # LangChain's ReAct agent requires a prompt object exposing the `tools`,
    # `tool_names` and `agent_scratchpad` variables; a bare string is rejected.
    prompt = PromptTemplate.from_template(
        """You are a reasoning and acting agent.
    Use the tools available to gather or verify information as needed.
    Respond directly if no tools are required.

    You have access to the following tools:
    {tools}

    Use the following format:

    Question: the input question you must answer
    Thought: you should always think about what to do
    Action: the action to take, should be one of [{tool_names}]
    Action Input: the input to the action
    Observation: the result of the action
    ... (this Thought/Action/Action Input/Observation can repeat N times)
    Thought: I now know the final answer
    Final Answer: the final answer to the original input question

    Question: {query}
    Thought:{agent_scratchpad}"""
    )
    react_agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)

    agent_executor = AgentExecutor(agent=react_agent, tools=tools, verbose=True)
    # Execute the REACT agent
    state.response = agent_executor.invoke({"query": state.query})
    return state


#--------------------------------------------- NODE with Agent 2 ---------------------------------------------
#-------------------------------------------------------------------------------------------------------------
import functools

def agent_node(state: OverallState, agent, name: str) -> OverallState:
    """
    Run one agent against the workflow state and append its reply.

    - `state`: mapping-style workflow state; the keys "messages",
      "selected_agents" and "current_agent_idx" are read.
    - `agent`: object exposing `.invoke(state)` whose result has a "messages" list.
    - `name`: label attached to the appended message.

    Returns a new dict with the agent's last message appended (re-wrapped as a
    `HumanMessage` tagged with `name`), the selected agents carried over, and
    the agent index advanced by one.
    """
    agent_output = agent.invoke(state)

    history = state["messages"]
    last_reply = agent_output["messages"][-1]
    tagged_reply = HumanMessage(content=last_reply.content, name=name)

    updated_state = {
        "messages": history + [tagged_reply],
        "selected_agents": state["selected_agents"],
        "current_agent_idx": state["current_agent_idx"] + 1,
    }
    return updated_state

# wrap the agent in a node
def query_param_generator_node(agent_node):
    """Return `agent_node` pre-bound to a ReAct agent named "query_param_generator".

    The agent is built from the module-level `llm`, `retriever_tool` and
    `prompt`; the returned partial only needs the workflow state.
    """
    react_agent = create_react_agent(llm, tools=[retriever_tool], prompt=prompt)
    return functools.partial(
        agent_node,
        agent=react_agent,
        name="query_param_generator",
    )



#--------------------------------------------- NODE with Agent 3 ---------------------------------------------
#-----------------------------------------------------------------------------------------------------------
# more advanced node

from langchain.agents import initialize_agent, Tool, AgentExecutor
from langchain.prompts import PromptTemplate
from langchain_core.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.agents import AgentType, initialize_agent, AgentExecutor


def advanced_multi_tool_agent_node(state: CustomerSupportState):
    """
    An advanced agent that uses multiple tools to handle queries.

    Reads ``state.query``, runs a zero-shot ReAct agent over it and writes the
    executor's result (or an ``"Error: ..."`` string on failure) to
    ``state.response``. Returns the same state object.
    """
    tools = [retriever_tool]  # Add more tools as needed

    # Instructions placed ahead of the tool list in the zero-shot ReAct prompt.
    # The question itself is injected by the agent's default suffix through the
    # `input` variable, so it must not be templated here.
    prompt_prefix = """
    You are an advanced agent with access to multiple tools. Your task is to resolve customer queries by:
    1. Identifying the problem or request.
    2. Using the tools provided to gather additional information if needed.
    3. Synthesizing the information into a clear, concise response.

    You can chain tools if required. If you are unsure, respond with 'I need more details.'

    You have access to the following tools:"""
    llm = ChatOpenAI(
        api_key=os.getenv("DEEPSEEK_API_KEY"),
        model="deepseek-chat",
        base_url="https://api.deepseek.com",
        streaming=True,
        callbacks=AsyncCallbackManager([StreamingStdOutCallbackHandler()]),
    )

    # initialize_agent already returns an AgentExecutor, so executor options
    # are passed here instead of wrapping the result in a second executor
    # (which previously also listed tools the agent was never given).
    agent_executor = initialize_agent(
        tools=tools,
        llm=llm,
        agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
        agent_kwargs={"prefix": prompt_prefix},
        verbose=True,
        return_intermediate_steps=True,
        handle_parsing_errors=True,
    )

    # Execute the agent; the zero-shot ReAct prompt reads the question from "input".
    try:
        state.response = agent_executor.invoke({"input": state.query})
    except Exception as e:
        # Best-effort node: surface the failure in the state instead of aborting the graph.
        state.response = f"Error: {str(e)}"
    return state




#--------------------------------------------- NODE with Custom Agent --------------------------------------
#-----------------------------------------------------------------------------------------------------------

from langchain.agents import BaseAgent
from typing import Optional

class AdvancedCustomAgent(BaseAgent):
    """
    LLM-backed agent that retries up to `max_iterations` times, optionally
    asking a human to approve each generated answer.
    """
    # NOTE(review): __init__ does not call super().__init__(); confirm that
    # `BaseAgent` (imported from langchain.agents) exists in the installed
    # version and tolerates plain attribute assignment.
    def __init__(self, llm, tools=None, max_iterations: int = 3):
        self.llm = llm
        self.tools = tools or []
        self.max_iterations = max_iterations

    async def run(self, query: str, human_review: bool = False, **kwargs) -> str:
        """
        Generate a response for `query`, looping while a reviewer rejects it.

        Args:
            query (str): User's query; replaced by the reviewer's corrections
                after a rejection.
            human_review (bool): When True, each response is shown on stdout
                and approval is requested through `input()`.

        Returns:
            str: The last generated response, an "Error: ..." string if any
            step raised, or "" when no iteration ran.
        """
        response = ""

        for attempt in range(1, self.max_iterations + 1):
            print(f"--- Iteration {attempt}/{self.max_iterations} ---")

            # Tool names are only listed in the prompt; tools are not invoked here.
            prompt = f"""
            You are an advanced customer support agent. Use the tools provided to solve the query. 
            Tools: {', '.join([tool.name for tool in self.tools]) if self.tools else 'None'}

            Query: {query}

            If you need clarification or further details, request them from the user.
            """
            try:
                response = await self.llm.apredict(prompt)
                print(f"Generated Response: {response}")

                # Without review the first answer is final.
                if not human_review:
                    break

                verdict = input("Do you approve this response? (yes/no): ")
                if verdict.lower() == "yes":
                    break

                # Rejected: the next iteration runs on the reviewer's text.
                query = input("Provide additional details or corrections: ")

            except Exception as exc:
                response = f"Error: {str(exc)}"
                break

        return response


# Define the custom agent node
async def custom_agent_node(state: CustomerSupportState):
    """
    Run `AdvancedCustomAgent` on `state.query` with human review switched on
    and store the answer in `state.response`.
    """
    reviewer_agent = AdvancedCustomAgent(
        llm=llm,
        tools=[retriever_tool],
        max_iterations=3,
    )

    # Human-in-the-loop enabled for critical queries
    state.response = await reviewer_agent.run(state.query, human_review=True)
    return state


#--------------------------------------------- Supervisor NODE with Router + Command ---------------------------------------------
#-----------------------------------------------------------------------------------------------------------------------
# Agent Supervisor Node
from typing import Literal
from typing_extensions import TypedDict

from langchain_anthropic import ChatAnthropic
from langgraph.graph import MessagesState
from langgraph.types import Command

'''
User
 ├── Supervisor
 │     ├── [direct] Agent 1
 │     ├── [conditional] Agent 2 (if condition A is met)
 │     └── [direct] Agent 3
 │           ├── [conditional] Sub-Agent 3.1 (if condition B is met)
 │           └── [direct] Sub-Agent 3.2
 │
 └── Feedback Loop (User <--> Supervisor)

'''
members = ["researcher", "coder"]

def make_supervisor_node(llm: BaseChatModel, members: list[str]) -> str:
    options = ["FINISH"] + members
    system_prompt = (
        "You are a supervisor tasked with managing a conversation between the"
        f" following workers: {members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH."
    )

    class Router(TypedDict):
        """Worker to route to next. If no workers needed, route to FINISH."""
        next: Literal[*options]

    def supervisor_node(state: MessagesState) -> Command[Literal[*members, "__end__"]]:
        """An LLM-based router."""
        messages = [
            {"role": "system", "content": system_prompt},
        ] + state["messages"]
        response = llm.with_structured_output(Router).invoke(messages)
        goto = response["next"]
        if goto == "FINISH":
            goto = END

        return Command(goto=goto)

    return supervisor_node

# Build a supervisor that can route to the "search" and "web_scraper" workers.
llm = ChatAnthropic(model="claude-3-5-sonnet-latest")
supervisor_node = make_supervisor_node(llm, ["search", "web_scraper"])

# Only the supervisor node is registered in this snippet.
# NOTE(review): nodes named "search" and "web_scraper" must be added (and the
# graph compiled) before the supervisor's routing targets can resolve.
builder = StateGraph(MessagesState)
builder.add_node("supervisor", supervisor_node)
builder.add_edge(START, "supervisor")


#--------------------------------------------- More Advanced Supervisor NODE -------------------------------------------
#-----------------------------------------------------------------------------------------------------------------------

from langchain_openai import ChatOpenAI
from langgraph_supervisor import create_supervisor
from langgraph.prebuilt import create_react_agent

model = ChatOpenAI(model="gpt-4o")

# Worker agents. The `name` given here is the identifier the supervisor uses
# to hand work off, so the supervisor prompt below must use the same names.
math_agent = create_react_agent(
    model=model,
    tools=[tool_1, tool_2],
    name="math_expert",
    prompt="You are a math expert. Always use one tool at a time."
)

research_agent = create_react_agent(
    model=model,
    tools=[tool_1, tool_2],
    name="research_expert",
    prompt="You are a world class researcher with access to web search. Do not do any math."
)

# Create supervisor workflow
workflow = create_supervisor(
    [research_agent, math_agent],
    model=model,
    output_mode="last_message",    # what is passed back from an agent to the supervisor; optional
    prompt=(
        "You are a team supervisor managing a research expert and a math expert. "
        "For current events, use research_expert. "
        "For math problems, use math_expert."
    )
)

# Compile and run
app = workflow.compile()
result = app.invoke({
    "messages": [
        {
            "role": "user",
            "content": "what's the combined headcount of the FAANG companies in 2024?"
        }
    ]
})


#--------------------------------------------- Supervisor managing other Supervisors------------------------------------
#-----------------------------------------------------------------------------------------------------------------------

# Each sub-supervisor is compiled under its own name; that compiled name is
# what the top-level supervisor routes to.
# NOTE(review): both sub-supervisors manage the same two agents here; give
# each team its own agents in a real deployment.
math_supervisor = create_supervisor(
    [research_agent, math_agent],
    model=model,
    output_mode="last_message",    # what is passed back from an agent to the supervisor; optional
    prompt=(
        "You are a team supervisor managing a research expert and a math expert. "
        "For current events, use research_expert. "
        "For math problems, use math_expert."
    )
).compile(name="math_supervisor")


research_supervisor = create_supervisor(
    [research_agent, math_agent],
    model=model,
    output_mode="last_message",    # what is passed back from an agent to the supervisor; optional
    prompt=(
        "You are a team supervisor managing a research expert and a math expert. "
        "For current events, use research_expert. "
        "For math problems, use math_expert."
    )
).compile(name="research_supervisor")

# The top-level prompt names the members it actually manages (the two
# compiled supervisors), not the leaf agents underneath them.
workflow = create_supervisor(
    [math_supervisor, research_supervisor],
    supervisor_name="top_level_supervisor",
    model=model,
    prompt=(
        "You are a top-level supervisor managing a research team and a math team. "
        "For current events, use research_supervisor. "
        "For math problems, use math_supervisor."
    )
).compile()




#--------------------------------------------- NODE with Command (used with tool node) ---------------------------------------------
#-----------------------------------------------------------------------------------------------------------------------
# Rebind `llm` to a tool-aware model; every later use of `llm` in this cell
# (e.g. inside call_model) sees the tool-bound version.
llm = llm.bind_tools([retriever_tool, tool_1, tool_2])

def call_model(state:OverallState) -> Command[Literal['tools', END]]:
    """Send the latest message text to the model and route on its tool calls.

    Goes to "tools" when the model response contains at least one tool call,
    otherwise to END. The response is also written to the "messages" state key
    through the Command's `update` (optional; drop it if no state change is needed).
    """
    latest_text = state.messages[-1].content
    response = llm.invoke(latest_text)
    next_node = "tools" if len(response.tool_calls) > 0 else END
    return Command(goto=next_node, update={"messages": response})

workflow = StateGraph(OverallState)
workflow.add_node("call_model", call_model)
workflow.add_node("tools", create_tool_node_with_fallback([retriever_tool, tool_1, tool_2]))
workflow.add_edge(START, "call_model")
# No static edge from "call_model": routing is done by the Command it returns.
# A static edge would run in addition to the Command's `goto`, so "tools"
# would execute even when the model chose END.
graph = workflow.compile()


# If you are using subgraphs, you might want to navigate from a node inside one subgraph to a different subgraph
# (i.e. a different node in the parent graph).
def my_node(state: State) -> Command[Literal["other_subgraph"]]:
    """Update `foo` and jump to the `other_subgraph` node of the parent graph.

    The return annotation lists the node named in `goto` so the declared
    routing target matches the actual one.
    """
    return Command(
        update={"foo": "bar"},
        goto="other_subgraph",  # where `other_subgraph` is a node in the parent graph
        graph=Command.PARENT
    )

#--------------------------------------------- LangGraph Workflow ---------------------------------------------   
#-----------------------------------------------------------------------------------------------------------
# Add nodes to the workflow
# Register the nodes of the customer-support workflow.
# NOTE(review): the edges below also reference "Answer FAQ" and
# "Recommend Products"; those nodes are not added in this snippet and must be
# registered elsewhere before compile() can succeed.
workflow.add_node("Classify Query", classify_query)
workflow.add_node("End Conversation", end_conversation)

# Define edges between nodes
workflow.add_edge(START, "Classify Query")
workflow.add_edge("Classify Query", "Answer FAQ")
workflow.add_edge("Recommend Products", "End Conversation")
workflow.add_edge("End Conversation", END)

# Alternative way to mark entry/finish nodes. These duplicate the START/END
# edges above; keep only one of the two styles in real code.
workflow.set_entry_point("Classify Query")  # Start the conversation. Use this only when START is not used.
workflow.set_finish_point("End Conversation")   # End the conversation. Use this only when END is not used.

# Compile the workflow into a runnable app
app = workflow.compile()

# Test the workflow with a sample query
initial_state = CustomerSupportState(query="which do you recommend between product A and product B?")
result = app.invoke(initial_state)


> LangGraph Graph Call

In [None]:
from langgraph.errors import GraphRecursionError
from langchain.schema import HumanMessage, SystemMessage, AIMessage  # Import AIMessage for assistant responses

#----------------------------------------------------- Graph Invoke-----------------------------------

# **Input Collection**
user_input = "I want to travel from New York to Paris on 2023-12-15 and return on 2023-12-22. There are 2 adults and 1 child. My budget is $5000. I need 1 room. I prefer a hotel with free breakfast and a swimming pool. I also want to visit the museums and enjoy local cuisine, and go to the club at night. I might also want a massage."    

# **Input State**
input_state = {"messages": [HumanMessage(content=user_input)]}

# **Graph Invocation**
output = graph.invoke(input_state, {"recursion_limit": 300})



#--------------------------------------------------------- Graph Stream-----------------------------------
# Stream the Graph
# Define the input data
input_data = {
    "messages": [HumanMessage(content="I want to travel from New York to Paris on 2023-12-15 and return on 2023-12-22. There are 2 adults and 1 child. My budget is $5000. I need 1 room. I prefer a hotel with free breakfast and a swimming pool. I also want to visit the museums and enjoy local cuisine, and go to the club at night. I might also want a massage.")]
}

# Define the configuration (e.g., recursion limit)
config = {"recursion_limit": 10}

# Stream the execution
events = graph.stream(input_data, config)
for event in events:
    print(event)
    
#--------------------------------------------------------- Graph Stream with Memory-----------------------------------


# Identify the conversation thread for this run.
# NOTE(review): presumably `graph` was compiled with a checkpointer so that
# state is kept per thread_id; confirm against where the graph is built.
thread_config = {"configurable": {"thread_id": "1"}}

try:
    # stream_mode="values" yields the state after each step; only the most
    # recent message of each emitted state is inspected.
    for event in graph.stream(input_state, thread_config, stream_mode = "values", ):
        messages = event['messages'][-1]
        # Filter and print only the AIMessage content
        if isinstance(messages, AIMessage):
            print(messages.content)

except GraphRecursionError:
    # Raised when the run exceeds the configured recursion limit.
    print("Recursion Error")


#--------------------------------------------------------- Graph Stream with pretty print-----------------------------------

from langgraph.pregel.remote import RemoteGraph
from langchain_core.messages import convert_to_messages
from langchain_core.messages import HumanMessage, SystemMessage

# Name of the deployed graph to connect to.
graph_name = "task_maistro" 

# Connect to the deployment
# NOTE(review): `local_deployment_url` is not defined in this cell; it must be
# set to the deployment's base URL beforehand.
remote_graph = RemoteGraph(graph_name, url=local_deployment_url)

user_input = "Hi I'm Lance. I live in San Francisco with my wife and have a 1 year old."
config = {"configurable": {"user_id": "Test-Deployment-User"}}
# Stream state snapshots and pretty-print the latest message of each one.
for chunk in remote_graph.stream({"messages": [HumanMessage(content=user_input)]}, stream_mode="values", config=config):
    convert_to_messages(chunk["messages"])[-1].pretty_print()
    
    

> Langchain Tool Call

In [None]:
#-------------------------------------------------- Tool with Input Class and Tool Class ----------------------------------
#--------------------------------------------------------------------------------------------------------------------------
# Define the input class
class MyToolInput(BaseModel):
    """Input schema for `MyTool`: one string and one integer parameter."""
    # Both fields are echoed back in MyTool's "Processed: ..." result.
    param1: str
    param2: int

# Define the tool class
class MyTool:
    """Callable tool implementation that echoes the two input fields."""

    def __call__(self, input: MyToolInput) -> str:
        # Tool logic here
        first, second = input.param1, input.param2
        return f"Processed: {first}, {second}"

# Wrap a MyTool instance as a LangChain Tool and attach the input schema.
my_tool = Tool(
    name="my_tool",
    func=MyTool(),
    description="Tool description.",
    args_schema=MyToolInput
)

# Call the tool
# This calls the wrapped callable directly with a MyToolInput instance,
# bypassing the Tool's own run/invoke machinery.
result = my_tool.func(MyToolInput(param1="value1", param2=42))
print(result)



#-------------------------------------------------- Tool Using @tool Decorator ----------------------------------
#----------------------------------------------------------------------------------------------------------------
from langchain.tools import tool

@tool
def my_tool(param1: str, param2: int) -> str:
    """Tool description."""
    # The docstring above is what @tool exposes as the tool's description.
    processed = f"{param1}, {param2}"
    return f"Processed: {processed}"

# Call the tool through the Runnable interface; calling the tool object
# directly (`my_tool({...})`) is deprecated in LangChain.
result = my_tool.invoke({"param1": "value1", "param2": 42})
print(result)



#-------------------------------- Tool with Structured Inputs Using "BaseTool" ----------------------------------
#----------------------------------------------------------------------------------------------------------------
from langchain.tools import BaseTool
from pydantic import BaseModel

class MyToolInput(BaseModel):
    param1: str
    param2: int

class MyTool(BaseTool):
    """Structured tool that echoes its two validated inputs."""
    name: str = "my_tool"
    description: str = "Tool description."
    # Wire in the input model declared above so arguments are validated
    # against it instead of a schema inferred from `_run`'s signature.
    args_schema: type[BaseModel] = MyToolInput

    def _run(self, param1: str, param2: int) -> str:
        """Tool logic."""
        return f"Processed: {param1}, {param2}"

# Create an instance of the tool
my_tool = MyTool()

# Call the tool
result = my_tool.run({"param1": "value1", "param2": 42})
print(result)



#--------------------------------------------------  Tool call with Agent ----------------------------------
#-----------------------------------------------------------------------------------------------------------
from langchain.agents import initialize_agent, Tool

def my_tool_func(param1: str, param2: int) -> str:
    """Return a "Processed: ..." string echoing both arguments."""
    processed = f"{param1}, {param2}"
    return f"Processed: {processed}"

# Create the tool
# `my_tool_func` takes two arguments, so it is wrapped as a StructuredTool;
# the plain `Tool` wrapper passes a single string and would fail when the
# agent calls it.
from langchain.agents import AgentType
from langchain.tools import StructuredTool

my_tool = StructuredTool.from_function(
    func=my_tool_func,
    name="my_tool",
    description="Tool description."
)

# Add the tool to an agent
# The structured-chat agent type supports multi-input tools; the zero-shot
# ReAct agent only supports single-input tools.
tools = [my_tool]
agent = initialize_agent(tools, llm, agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION)

# Call the tool via the agent
result = agent.invoke("Call my_tool with param1='value1' and param2=42")
print(result)

> Langchain Prompt Templates

In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain.prompts import PromptTemplate

#--------------------------------------------- Style 1 ---------------------------------------------
#---------------------------------------------------------------------------------------------------

# Add a node for a model to generate a query based on the question and schema
query_gen_system = """You are a SQL expert with a strong attention to detail.

Given an input question, output a syntactically correct SQLite query to run, then look at the results of the query and return the answer.
DO NOT call any tool besides SubmitFinalAnswer to submit the final answer.
"""

query_gen_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", query_gen_system),
        ("placeholder", "{messages}"),
    ]
)


#--------------------------------------------- Style 2 ---------------------------------------------
#---------------------------------------------------------------------------------------------------
primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            '''You are a helpful customer support assistant for Solar Panels Belgium.
            You should get the following information from them:
            - monthly electricity cost
            If you are not able to discern this info, ask them to clarify! Do not attempt to wildly guess.
            After you are able to discern all the information, call the relevant tool.
            ''',
        ),
        ("placeholder", "{messages}"),
    ]
)


#--------------------------------------------- Style 3 ---------------------------------------------
#---------------------------------------------------------------------------------------------------
template = ''' 
You are a travel suggestion agent. Answer the user's questions based on their travel preferences. 
If you need to find information about a specific destination, use the search_tool. Understand that the information was retrieved from the web,
interpret it, and generate a response accordingly.

Answer the following questions as best as you can. You have access to the following tools:
{tools}

Use the following format:

"Question": the input question you must answer
"Thought": your reasoning about what to do next
"Action": the action you should take, one of [{tool_names}] (if no action is needed, write "None")
"Action Input": the input to the action (if no action is needed, write "None")
"Observation": the result of the action (if no action is needed, write "None")
"Thought": your reasoning after observing the action
"Final Answer": the final answer to the original input question

Ensure every Thought is followed by an Action, Action Input, and Observation. If no tool is needed, explicitly write "None" for Action, Action Input, and Observation.

Begin!
Question: {input}
Thought: {agent_scratchpad}
'''

# Turn the ReAct-style template above into a PromptTemplate.
prompt = PromptTemplate.from_template(template)

# NOTE(review): `search_agent`, `tools` and `latest_query` are not defined in
# this cell; presumably `search_agent` is built from `prompt` elsewhere.
agent_executor = AgentExecutor(
    agent=search_agent,
    tools=tools,
    verbose=True,
    return_intermediate_steps=False,
    handle_parsing_errors=True,
)

# Run the agent on the latest user query.
response = agent_executor.invoke({
    "input": latest_query,
    "agent_scratchpad": ""  # Initialize with an empty scratchpad
})

> Display or Visualize LangGraph

In [36]:
#--------------------------------------------------------
print(app.get_graph().draw_mermaid())       # Converting a Graph to a Mermaid Diagram


#-------------------------Using Mermaid.Ink--------------------------------
from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles

# Render the compiled graph to a PNG with the API draw method and show it inline.
display(
    Image(
        app.get_graph().draw_mermaid_png(
            draw_method=MermaidDrawMethod.API,
        )
    )
)


#-------------------------Using Mermaid + Pyppeteer--------------------------------
import nest_asyncio

nest_asyncio.apply()  # Required for Jupyter Notebook to run async functions

# Render the graph with the Pyppeteer draw method, customising edge curves,
# node colours, label wrapping, background and padding. With
# output_file_path=None no file path is supplied; the PNG is passed straight
# to Image for inline display.
display(
    Image(
        app.get_graph().draw_mermaid_png(
            curve_style=CurveStyle.LINEAR,
            node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"),
            wrap_label_n_words=9,
            output_file_path=None,
            draw_method=MermaidDrawMethod.PYPPETEER,
            background_color="white",
            padding=10,
        )
    )
)


#-------------------------Using Graphviz--------------------------------
%pip install pygraphviz

display(Image(app.get_graph().draw_png()))

%%{init: {'flowchart': {'curve': 'linear'}}}%%
graph TD;
	__start__([<p>__start__</p>]):::first
	Node_A(Node A)
	Node_B(Node B)
	__end__([<p>__end__</p>]):::last
	Node_A --> Node_B;
	Node_B --> __end__;
	__start__ --> Node_A;
	classDef default fill:#f2f0ff,line-height:1.2
	classDef first fill-opacity:0
	classDef last fill:#bfb6fc



> LangGraph Deployment

In [None]:
# Build the graph
agent = workflow.compile()


# requirements.txt
langgraph==0.1.0
langchain_core==0.1.0


# Langgraph.json
{
  "name": "todo_agent",
  "description": "A simple ToDo list agent",
  "graphs": {
    "todo_agent": {
      "entrypoint": "agent",
      "file": "agent.py"
    }
  }
}


# Dockerfile
FROM python:3.9-slim

WORKDIR /app
COPY . .

RUN pip install -r requirements.txt

CMD ["langgraph", "serve", "--host", "0.0.0.0", "--port", "8123"]



# or
# Use docker-compose.yml to create containers for Redis, PostgreSQL, and the LangGraph API.
$ cd module-6/deployment
$ docker compose up

# Building Docker Image
$ langgraph build -t todo_agent

# Run the Docker Image
docker run -p 8123:8123 todo_agent

In [None]:
#-------------------------Deployment Setup--------------------------------
# Use docker-compose.yml to create containers for Redis, PostgreSQL, and the LangGraph API.
$ cd module-6/deployment
$ docker compose up

# Building Docker Image
$ langgraph build -t todo_agent

# Run the Docker Image
docker run -p 8123:8123 todo_agent

#-------------------------Assistants-------------------------------------
#---------------------
# Creating Assistants (Connect to the Deployment)
#---------------------
from langgraph_sdk import get_client
client = get_client(url="http://localhost:8123")

# Create a personal assistant
personal_assistant = await client.assistants.create(
    "task_maistro",
    config={"configurable": {"todo_category": "personal"}}
)

#---------------------
# Updating Assistants
#---------------------
personal_assistant = await client.assistants.update(
    personal_assistant["assistant_id"],
    config={"configurable": {"todo_category": "personal", "user_id": "lance"}}
)

#---------------------
# Searching and Deleting Assistants
#---------------------
assistants = await client.assistants.search()
for assistant in assistants:
    print(assistant['assistant_id'], assistant['config'])
    

await client.assistants.delete("assistant_id")  # Delete an assistant


#-------------------------Threads and Runs--------------------------------

#---------------------
# Creating Threads
#---------------------
thread = await client.threads.create()


#---------------------
# Running a Graph
#---------------------
run = await client.runs.create(
    thread["thread_id"],
    "task_maistro",
    input={"messages": [HumanMessage(content="Add a ToDo")]},
    config={"configurable": {"user_id": "Test"}}
)

#---------------------
# Streaming Runs
#---------------------
async for chunk in client.runs.stream(
    thread["thread_id"],
    "task_maistro",
    input={"messages": [HumanMessage(content="What ToDo should I focus on?")]},
    stream_mode="messages-tuple"
):
    if chunk.event == "messages":
        print(chunk.data)



#---------------------
# Background Runs
#---------------------
run = await client.runs.create(thread["thread_id"], "task_maistro", input={"messages": [...]})
print(await client.runs.get(thread["thread_id"], run["run_id"]))


#-------------------------Double Texting Strategies--------------------------------

#---------------------
# Reject
#---------------------
# Start a run on the thread using the "reject" double-texting strategy.
await client.runs.create(
    thread["thread_id"],
    "task_maistro",
    input={"messages": [HumanMessage(content="New ToDo")]},
    multitask_strategy="reject" # Reject this new run if another run is already in progress on the thread
)


#---------------------
# Enqueue
#---------------------
await client.runs.create(
    thread["thread_id"],
    "task_maistro",
    input={"messages": [HumanMessage(content="New ToDo")]},
    multitask_strategy="enqueue"    # Enqueue new runs (or Interrupt, Rollback etc.)
)


#-------------------------Human-in-the-Loop--------------------------------

#---------------------
# Forking Threads
#---------------------
copied_thread = await client.threads.copy(thread["thread_id"])

#---------------------
# Editing State
#---------------------
forked_input = {"messages": HumanMessage(content="Updated ToDo", id=message_id)}
await client.threads.update_state(
    thread["thread_id"],
    forked_input,
    checkpoint_id=checkpoint_id
)




> LangGraph Studio

In [None]:
# Step 1: Create the langgraph.json file (see example)
{
    "dockerfile_lines": [], 
    "graphs": {
        "chat": "./src/react_agent/graph.py:graph",
        "researcher": "./src/react_agent/graph.py:researcher",
        "agent": "./src/react_agent/graph.py:agent"
    },
    "env": [
        "OPENAI_API_KEY",
        "WEAVIATE_API_KEY",
        "WEAVIATE_URL",
        "ANTHROPIC_API_KEY",
        "ELASTIC_API_KEY"
    ],
    "python_version": "3.11",
    "dependencies": [
        "."
    ]
}

# Step 2: Install the langgraph-cli package
!pip install "langgraph-cli[inmem]==0.1.55" # Install the langgraph-cli package

# Step 3: Move to the directory containing the langgraph.json file

# Step 4: Install the dependencies
    # If you are using requirements.txt:
    python -m pip install -r requirements.txt

    # If you are using pyproject.toml or setuptools:
    python -m pip install -e .

# Step 5: Run the LangGraph server
langgraph dev # start a local development server


> LangGraph Conditional Edges

In [None]:
#------------------------------------Basic Conditional Edge-------------------------------------
#-----------------------------------------------------------------------------------------------
from langgraph.graph import StateGraph, START, END
from typing import TypedDict

# Define the state structure
class State(TypedDict):
    """Graph state for the basic conditional-edge example."""
    # Counter updated by the nodes and inspected by `conditional_edge`.
    value: int
    # Declared for completeness; not read or written by this example's nodes.
    query: str
    response: str
    
# Define the conditional function
def conditional_edge(state: State) -> str:
    """Pick the next node: "node_b" when `value` exceeds 10, otherwise "__end__"."""
    return "node_b" if state["value"] > 10 else "__end__"

# Define the graph
builder = StateGraph(State)
builder.add_node("node_a", lambda state: {"value": state["value"] + 1})
builder.add_node("node_b", lambda state: {"value": state["value"] - 1})
builder.add_edge(START, "node_a")
builder.add_conditional_edges("node_a", conditional_edge)
builder.add_edge("node_b", "node_a")
graph = builder.compile()

# Test the graph
initial_state = {"value": 5}
result = graph.invoke(initial_state)


#------------------------------------Router with Multiple Conditions-------------------------------------
#---------------------------------------------------------------------------------------------------------
# NOTE(review): this module-level list is not read by the function below, which uses its
# own `members` parameter.
members = ["researcher", "coder"]

def make_supervisor_node(llm: BaseChatModel, members: list[str]) -> str:
    """Build a supervisor node that lets ``llm`` pick which worker acts next.

    Args:
        llm: Chat model; it is called through ``with_structured_output(Router)``.
        members: Names of the worker nodes the supervisor may route to.

    Returns:
        The inner ``supervisor_node`` function.

    NOTE(review): the ``-> str`` annotation does not match the returned value, which is a
    function taking the graph state and returning a ``Command``.
    """
    # "FINISH" is the extra choice the model uses to stop routing.
    options = ["FINISH"] + members
    system_prompt = (
        "You are a supervisor tasked with managing a conversation between the"
        f" following workers: {members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH."
    )

    # Schema for the structured reply; `next` is limited to the names in `options`.
    # Star-unpacking inside a subscript (Literal[*options]) needs Python 3.11+.
    class Router(TypedDict):
        """Worker to route to next. If no workers needed, route to FINISH."""
        next: Literal[*options]

    def supervisor_node(state: MessagesState) -> Command[Literal[*members, "__end__"]]:
        """An LLM-based router."""
        # Prepend the system prompt to the conversation held in the state.
        messages = [
            {"role": "system", "content": system_prompt},
        ] + state["messages"]
        response = llm.with_structured_output(Router).invoke(messages)
        goto = response["next"]
        # Map the model's "FINISH" answer onto the graph's END target.
        if goto == "FINISH":
            goto = END

        return Command(goto=goto)

    return supervisor_node

# Chat model the supervisor uses to choose the next worker.
llm = ChatAnthropic(model="claude-3-5-sonnet-latest")
# The worker names passed here must match the node names registered below.
supervisor_node = make_supervisor_node(llm, ["search", "web_scraper"])

builder = StateGraph(MessagesState)
builder.add_node("supervisor", supervisor_node)
# NOTE(review): search_node and web_scraper_node are not defined in this cell.
builder.add_node("search", search_node)
builder.add_node("web_scraper", web_scraper_node)
# The only static edge is START -> supervisor; supervisor_node routes onward by returning
# Command(goto=...).
# NOTE(review): no edge leads from the workers back to the supervisor here; presumably
# the worker nodes return their own Command — confirm where they are defined.
builder.add_edge(START, "supervisor")
graph = builder.compile()


#------------------------------------Using Tool Conditions-------------------------------------
#---------------------------------------------------------------------------------------------------
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph, START
from langgraph.prebuilt import tools_condition

# NOTE(review): tools_condition looks for a "messages" entry in the state, so `State`
# here presumably carries one — confirm against the State used in this cell.
builder = StateGraph(State)


# Define nodes: these do the work
# NOTE(review): Assistant, part_1_assistant_runnable, ToolNode and retriever_tool are not
# defined or imported in this cell.
builder.add_node("assistant", Assistant(part_1_assistant_runnable))
builder.add_node("tools", ToolNode([retriever_tool]))
# Define edges: these determine how the control flow moves
builder.add_edge(START, "assistant")
# No path map is given, so the names returned by tools_condition are used as node names.
builder.add_conditional_edges(
    "assistant",
    tools_condition,
)
# After the tools node runs, control returns to the assistant.
builder.add_edge("tools", "assistant")

# The checkpointer lets the graph persist its state
# this is a complete memory for the entire graph.
memory = MemorySaver()
part_1_graph = builder.compile(checkpointer=memory)



#------------------------------------Using Tool conditions from Scratch-------------------------------------
#-----------------------------------------------------------------------------------------------------------
def tools_condition(
    state: Union[list[AnyMessage], dict[str, Any], BaseModel],
    messages_key: str = "messages",
) -> Literal["tools", "__end__"]:
    """Route to the tool node when the most recent message requests tool calls.

    Intended for use as a conditional-edge function.

    Args:
        state: Either the message list itself, a dict holding the messages under
            ``messages_key``, or an object exposing them as an attribute of that name.
        messages_key: Key / attribute name under which the messages are stored.

    Returns:
        ``"tools"`` if the last message has a non-empty ``tool_calls``,
        otherwise ``"__end__"``.

    Raises:
        ValueError: If no messages can be found in a non-list ``state``.
    """
    if isinstance(state, list):
        # The list itself is the message history.
        last_message = state[-1]
    else:
        history = state.get(messages_key, []) if isinstance(state, dict) else []
        if not history:
            # Not a dict, or the dict entry is missing/empty: try attribute access.
            history = getattr(state, messages_key, [])
        if not history:
            raise ValueError(f"No messages found in input state to tool_edge: {state}")
        last_message = history[-1]

    wants_tools = hasattr(last_message, "tool_calls") and len(last_message.tool_calls) > 0
    # "__end__" can be swapped for any other node name if the graph should continue.
    return "tools" if wants_tools else "__end__"


# Attach the routing function to "agent" and translate its return values into node names.
# NOTE(review): `workflow` is not created in this snippet; it must already contain the
# "agent" and "retrieve" nodes.
workflow.add_conditional_edges(
    "agent",
    tools_condition,  # decides if the agent is calling a tool or finishing
    {
        "tools": "retrieve",        # the dictionary is helpful if we named the nodes differently from the default tool condition function
        END: END,  # if the agent does not call any tool, we end the graph
    },
)

#------------------------------------Custom Condition functions-------------------------------------
#---------------------------------------------------------------------------------------------------

from typing import Literal, Union, List, Dict, Any
from langchain_core.messages import AnyMessage, HumanMessage

def data_api_condition(
    state: Union[List[AnyMessage], Dict[str, Any]],
    messages_key: str = "messages",
) -> Literal["data_api_node", "assistant_node"]:
    """Decide whether the latest message should go to the data API node.

    Args:
        state: The message list itself, or a dict holding it under ``messages_key``.
        messages_key: Dict key under which the messages are stored.

    Returns:
        ``"data_api_node"`` when the last message is a ``HumanMessage`` whose lowercased
        content contains "weather", "stock" or "price"; otherwise ``"assistant_node"``.

    Raises:
        ValueError: If ``state`` is neither a list nor a dict with a non-empty message list.
    """
    if isinstance(state, list):
        latest = state[-1]
    else:
        history = state.get(messages_key, []) if isinstance(state, dict) else None
        if not history:
            raise ValueError(f"No messages found in input state: {state}")
        latest = history[-1]

    # Only human-authored messages are considered for the data API route.
    if not isinstance(latest, HumanMessage):
        return "assistant_node"

    # Check if the query involves fetching information
    for keyword in ("weather", "stock", "price"):
        if keyword in latest.content.lower():
            return "data_api_node"
    return "assistant_node"



#------------------------------------Custom Conditional Edges 2---------------------------------
#-----------------------------------------------------------------------------------------------

from langgraph.graph import END

def should_continue(state: AgentState) -> Literal["tools", "agent", END]:
    """Pick the next step of the agent loop from the most recent message.

    Routing rules, in order:
      * no messages yet -> ``"agent"``;
      * last message is a ``ToolMessage``: ``"agent"`` if its content is exactly
        "File added successfully" (and ``state["file_added"]`` is set), else ``END``;
      * last message is an ``AIMessage``: ``"agent"`` while a file is added but not yet
        indexed, ``"tools"`` if it carries tool calls;
      * anything else -> ``"agent"``.

    NOTE(review): this function writes ``state["file_added"]`` from inside a routing
    function; confirm that the graph actually persists that write.
    """
    history = state["messages"]
    if not history:
        # Start the conversation
        return "agent"

    latest = history[-1]

    # Tool results: either confirm the file upload or finish the run.
    if isinstance(latest, ToolMessage):
        if latest.content != "File added successfully":
            print("🏁 Search complete, ending workflow")
            return END
        state["file_added"] = True
        print("📌 File addition confirmed")
        return "agent"

    # Everything that is not an AI message simply goes back to the agent.
    if not isinstance(latest, AIMessage):
        return "agent"

    # File is added but indexing has not finished: keep waiting on the agent.
    if state.get("file_added") and not state.get("indexed"):
        print("⏳ Waiting for indexing to complete...")
        return "agent"

    # The AI asked for one or more tool calls.
    if hasattr(latest, "tool_calls") and latest.tool_calls:
        print("🛠️ Executing tool calls...")
        return "tools"

    return "agent"

# Build the agent/tools loop that should_continue routes.
# NOTE(review): AgentState, call_model, ToolNode, tool_1 and tool_2 are not defined in
# this cell.
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", ToolNode([tool_1, tool_2]))

# Add edges
workflow.add_edge(START, "agent")
# NOTE(review): no edge leaves "tools" in this snippet, so nothing shown here routes a
# tool result back to the agent.
workflow.add_conditional_edges(
    "agent",
    should_continue,
    {
        "tools": "tools",  # Route to tools if tool calls are detected
        "agent": "agent",  # Continue with the agent if no tool calls
        END: END,          # End the workflow if conditions are met
    }
)

In [2]:
from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles

# Draw the compiled `graph` as a Mermaid PNG and show it inline in the notebook.
# NOTE(review): MermaidDrawMethod.API presumably renders through a remote service and so
# needs network access — confirm.
display(
    Image(
        graph.get_graph().draw_mermaid_png(
            draw_method=MermaidDrawMethod.API,
        )
    )
)

> LangChain Messages (HumanMessage, AIMessage, SystemMessage, BaseMessage)

In [None]:
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, BaseMessage, ChatMessage, ToolMessage, RemoveMessage

# BaseMessage
# The base class for all message types. Inherited by HumanMessage, AIMessage, and SystemMessage
class CustomMessage(BaseMessage):
    content: str
    role: str  # e.g., "user", "assistant", "system"
    # BaseMessage declares `type` as a required field, so a custom subclass needs a
    # default; without it the constructor call below fails validation.
    type: str = "custom"

custom_msg = CustomMessage(content="Hello, world!", role="user")

#------------------ Message Types -------------------------------------------------------------
#----------------------------------------------------------------------------------------------
# HumanMessage
# Represents a message from a human user.
human_msg = HumanMessage(content="My name is John Doe.", name = "Paul Okafor")  # you can use the name of the node or agent

# or
# Conceptually, HumanMessage is just a BaseMessage subclass. The sketch is kept as a
# comment on purpose: defining a class with this name here would shadow the imported
# HumanMessage, and isinstance(..., HumanMessage) checks elsewhere would stop matching
# messages created by the library.
#   class HumanMessage(BaseMessage):
#       content: str
#
#   human_msg = HumanMessage(content="My name is John Doe.")

# AIMessage
# Represents a message generated by an AI agent.
ai_msg = AIMessage(content="I am a helpful assistant.")

# SystemMessage
# Represents a system message or prompt.
system_msg = SystemMessage(content="You are a helpful assistant.")

# ChatMessage
# Represents a message in a chat conversation.
chat_msg = ChatMessage(role="custom_role", content="This is a custom message.")

# ToolMessage
# Represents a message generated by a tool.
# The tool's name goes in the `name` field; `tool_call_id` links the result to the call.
tool_msg = ToolMessage(content="This is a tool message.", tool_call_id="123", name="GradeMaster", id="123")

# RemoveMessage
# Represents a message to remove a message from the conversation.
# Marks every message except the last two for removal.
# NOTE(review): `state` is the graph state, so this line only works inside a node (or
# wherever a state dict with "messages" is in scope).
remove_msg = [RemoveMessage(id=m.id) for m in state['messages'][:-2]]
    
#---------------------------------- When to use it----------------------------------------------
#-----------------------------------------------------------------------------------------------
# Example 1: build a conversation turn by turn
messages = [SystemMessage(content="Welcome! Please provide your name.")]
user_input = "My name is John Doe."
messages.append(HumanMessage(content=user_input))
# Ask the model for its reply and store it as the assistant turn
# (the same pattern user_node uses in Example 5).
response = llm.invoke(messages)
messages.append(AIMessage(content=response.content))

# Example 2: a hand-written conversation history
messages = [
    HumanMessage(content="My name is John Doe."),
    AIMessage(content="Hello, John Doe! How can I assist you today?"),
]

# Example 3: Prompt Template
# MessagesPlaceholder is filled with the list passed under "messages" at invoke time.
chat_template = ChatPromptTemplate.from_messages(
    [
        SystemMessage(content="Welcome! Please provide your name."),
        MessagesPlaceholder(variable_name="messages"),
    ]
)

chat_template.invoke({"messages": [HumanMessage(content="John Doe")]})
chat_template.messages

# Example 4: Agent Invocation
# The agent is invoked with a {"messages": [...]} payload below, which is the LangGraph
# prebuilt agent's interface; that factory takes the chat model as `model`, not `llm`.
agent = create_react_agent(model=llm, tools=tools)
messages = [    # Simulate a conversation
    SystemMessage(content="You are a helpful assistant."),
    HumanMessage(content="Search for LangChain documentation."),
]

# Invoke the agent
response = agent.invoke({"messages": messages})


# Example 5: Node Example
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langgraph.graph import StateGraph

class OverallState(TypedDict):
    # Conversation history carried between nodes.
    messages: Sequence[BaseMessage]


def user_node(state: OverallState) -> OverallState:
    """Seed the conversation if needed and answer the latest human message.

    ``OverallState`` is a ``TypedDict``, i.e. a plain dict at runtime, so the history is
    read by key; attribute access such as ``state.messages`` raises ``AttributeError``.

    Args:
        state: Current graph state; ``state["messages"]`` may be missing or empty.

    Returns:
        A copy of ``state`` whose ``"messages"`` list contains the welcome message (when
        the history was empty), the model's reply (when the last message was a
        ``HumanMessage``), or a ``SystemMessage`` describing an error that occurred.
    """
    # Work on a copy so the caller's list is never mutated in place.
    messages = list(state.get("messages") or [])
    try:
        # Initialize the conversation if no messages exist
        if not messages:
            messages = [SystemMessage(content="Welcome! Please provide your name.")]

        # Only call the model when the last message is from the user
        if isinstance(messages[-1], HumanMessage):
            response = llm.invoke(messages)
            # Append the LLM's response as an assistant message
            messages.append(AIMessage(content=response.content))
    except Exception as e:
        # Deliberate best-effort: surface the failure in the conversation, don't crash
        messages = messages + [SystemMessage(content=f"An error occurred: {str(e)}")]

    return {**state, "messages": messages}


> LangGraph Workflow

In [None]:
# BaseAgent : The base class for all agent nodes. Provides a common interface for all agents.
# LandGraphNode : A generic template for any node in the LangGraph workflow.
# Workflow : Compiles and executes the workflow by connecting all nodes.


#--------------------------------------------- BaseAgent ---------------------------------------------
#-----------------------------------------------------------------------------------------------------

from typing import Dict, Any, List, Optional
from .utils.views import print_agent_output

class BaseAgent:
    """
    Base class for all agents. Provides common functionality and a standardized interface.
    """
    def __init__(self, websocket=None, stream_output=None, headers=None, tools: Optional[List[Any]] = None, **kwargs):
        """
        Initialize the agent.
        Args:
            websocket: WebSocket connection for real-time communication.
            stream_output: Awaitable callback used by `log`; it is called as
                stream_output("logs", agent_name, message, websocket).
            headers: Additional headers or metadata for the agent.
            tools: Tools (functions or objects) the agent can use. The sequence is
                copied, so later `add_tool` calls never modify the caller's list.
            **kwargs: Additional configuration for the agent, stored in `self.config`.
        """
        self.websocket = websocket
        self.stream_output = stream_output
        self.headers = headers or {}
        # Copy instead of aliasing: add_tool appends to this list in place.
        self.tools = list(tools) if tools else []
        self.config = kwargs

    async def log(self, message: str, agent_name: str = "AGENT"):
        """
        Log messages to the console or stream them via WebSocket.
        Args:
            message: The message to log.
            agent_name: The name of the agent (for logging purposes).
        """
        # Streaming needs both the socket and the callback; otherwise print locally.
        if self.websocket and self.stream_output:
            await self.stream_output("logs", agent_name.lower(), message, self.websocket)
        else:
            print_agent_output(message, agent_name)

    async def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute the agent's task. Subclasses must implement this method.
        Args:
            state: The current state of the workflow.
        Returns:
            Updated state after the agent's execution.
        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError("Subclasses must implement the `run` method.")

    def add_tool(self, tool: Any):
        """
        Add a single tool to the agent's toolkit.
        Args:
            tool: A function or object that the agent can use.
        """
        self.tools.append(tool)

    def get_tool(self, tool_name: str) -> Optional[Any]:
        """
        Retrieve a tool by name or identifier.
        Args:
            tool_name: Matched against each tool's `__name__` first, then its `name`.
        Returns:
            The first matching tool if found, otherwise None.
        """
        for tool in self.tools:
            if hasattr(tool, "__name__") and tool.__name__ == tool_name:
                return tool
            if hasattr(tool, "name") and tool.name == tool_name:
                return tool
        return None
    

#--------------------------------------------- LandGraphNode ---------------------------------------------
#---------------------------------------------------------------------------------------------------------

class LandGraphNode(BaseAgent):
    """
    A generic node template for the workflow graph. Can be customized for any task.
    """
    def __init__(self, node_name: str, **kwargs):
        """
        Initialize the node.
        Args:
            node_name: The name of the node (for identification and logging).
            **kwargs: Additional configuration for the node. An optional `table` entry
                is exposed as `self.table` and handed to the search tool in `process`.
        """
        super().__init__(node_name=node_name, **kwargs)
        # BaseAgent only stores unknown keywords in self.config, so the attributes that
        # process() reads have to be set explicitly.
        self.node_name = node_name
        # NOTE(review): `table` was read but never assigned; taking it from kwargs
        # (default None) is an assumption — confirm where the table should come from.
        self.table = kwargs.get("table")
        # add_tool registers one tool per call; passing the list would store the list
        # itself as a single "tool" that get_tool can never match.
        for tool in (tool_1, tool_2):
            self.add_tool(tool)

    async def process(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process the input state and return the updated state.
        Args:
            state: The current state of the workflow; the query is read from
                state["task"]["query"] and defaults to "".
        Returns:
            {"node_name", "results"} when the "tabular_search_tool" tool is registered,
            otherwise {"node_name", "error"}.
        """
        query = state.get("task", {}).get("query", "")
        await self.log(f"Processing task in node: {query}", self.node_name.upper())

        # Use the tabular search tool
        tool = self.get_tool("tabular_search_tool")
        if tool:
            results = await tool(query, self.table)
            return {"node_name": self.node_name, "results": results}
        else:
            return {"node_name": self.node_name, "error": "Tool not found"}

    async def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run the node's task by delegating to `process`.
        Args:
            state: The current state of the workflow.
        Returns:
            Updated state after the node's execution.
        """
        return await self.process(state)





#--------------------------------------------- Workflow ----------------------------------------------------
#-----------------------------------------------------------------------------------------------------------
# This class is responsible for:
    # Initializing all nodes.
    # Defining the workflow graph.
    # Compiling and executing the workflow.

from langgraph.graph import StateGraph, END
import time

class Workflow:
    """
    The Workflow class wires all nodes into a StateGraph and executes the workflow.
    """
    def __init__(self, task: Dict[str, Any], websocket=None, stream_output=None, headers=None):
        """
        Initialize the workflow.
        Args:
            task: The task to execute. Must include a "query" and can include additional metadata.
            websocket: WebSocket connection for real-time communication.
            stream_output: Awaitable callback used to stream log output to the client.
            headers: Additional headers or metadata.
        """
        self.task = task
        self.websocket = websocket
        self.stream_output = stream_output
        self.headers = headers or {}
        self.task_id = self._generate_task_id()
        self.nodes = self._initialize_nodes()

    async def log(self, message: str, agent_name: str = "WORKFLOW"):
        """
        Log a message, mirroring BaseAgent.log.

        Workflow does not inherit from BaseAgent, so `execute` needs its own `log`;
        without it the call in `execute` raises AttributeError.
        Args:
            message: The message to log.
            agent_name: Label used for the log entry.
        """
        if self.websocket and self.stream_output:
            await self.stream_output("logs", agent_name.lower(), message, self.websocket)
        else:
            print_agent_output(message, agent_name)

    def _generate_task_id(self) -> int:
        """Return the current Unix time in whole seconds as the task ID.

        Two workflows created within the same second get the same ID.
        """
        return int(time.time())

    def _initialize_nodes(self) -> Dict[str, LandGraphNode]:
        """
        Initialize all nodes for the workflow.
        Returns:
            A dictionary of nodes, keyed by their names.
        """
        # Every node shares the workflow's websocket, streaming callback and headers.
        shared = {
            "websocket": self.websocket,
            "stream_output": self.stream_output,
            "headers": self.headers,
        }
        return {
            name: LandGraphNode(node_name=name, **shared)
            for name in ("node_1", "node_2", "node_3")
        }

    def _create_workflow_graph(self) -> StateGraph:
        """
        Create the workflow graph using the initialized nodes.
        Returns:
            The uncompiled StateGraph; `execute` compiles it.
        """
        workflow = StateGraph(ResearchState)

        # Add nodes to the graph
        for node_name, node in self.nodes.items():
            workflow.add_node(node_name, node.run)

        # Linear pipeline: node_1 -> node_2 -> node_3 -> END
        workflow.add_edge("node_1", "node_2")
        workflow.add_edge("node_2", "node_3")
        workflow.set_entry_point("node_1")
        workflow.add_edge("node_3", END)

        return workflow

    async def execute(self) -> Dict[str, Any]:
        """
        Execute the workflow.
        Returns:
            The final result of the workflow.
        """
        workflow_graph = self._create_workflow_graph()
        compiled_workflow = workflow_graph.compile()

        await self.log(f"Starting workflow for task: {self.task.get('query')}", "WORKFLOW")
        result = await compiled_workflow.ainvoke({"task": self.task})
        return result


#----------------------------------------------
# call the workflow
#----------------------------------------------
# Task payload handed to every node as state["task"].
task = {
    "query": "Process some data",  # The main query or input
    "verbose": True,               # Optional: Whether to log detailed output
}

# Initialize the workflow
# NOTE(review): this rebinds the name `workflow`, which an earlier cell uses for a
# StateGraph instance.
workflow = Workflow(task=task)

# Execute the workflow
# Top-level `await` only works where an event loop is already running (e.g. a notebook);
# in a plain script wrap the call in asyncio.run(...).
result = await workflow.execute()
print(result)

> LangSmith

In [None]:

# LangSmith is a platform by LangChain that helps developers trace, debug, and evaluate LLM (Large Language Model) applications. 
# It provides tools for observability (seeing how your app works), testing (ensuring your app behaves as expected), and feedback collection 
# (improving your app based on user input).

# Why is LangSmith Useful?
    # Tracing : See how your app processes inputs and generates outputs (Logs every step of your app's execution).
    # Testing : Evaluate your app's performance with datasets (Runs your app on datasets to measure performance).
    # Feedback : Collect user feedback to improve your app.
    # Observability : Monitor your app in production/real-time to catch issues early.

In [None]:
# --------------------------------- Setting Up LangSmith ---------------------------------
import os
from langsmith import Client, traceable, wrappers
from langchain.smith import RunEvalConfig, run_on_dataset


os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com"
os.environ["LANGCHAIN_PROJECT"] = "my-project"
# Load environment variables from a .env file (optional); override=True lets values from
# the file replace anything already set, including the defaults above.
from dotenv import load_dotenv
load_dotenv(dotenv_path="../../.env", override=True)

# The API key is only checked, never re-assigned: str(os.getenv(...)) would store the
# literal string "None" as the key whenever the variable is missing.
if not os.getenv("LANGCHAIN_API_KEY"):
    print("Warning: LANGCHAIN_API_KEY is not set; define it in the environment or in ../../.env")



# --------------------------------- Tracing with LangSmith ---------------------------------
# @traceable Decorator : Automatically logs function calls.
# Use trace context manager for specific blocks of code.

from langsmith import traceable

# Use @traceable to log function calls
@traceable(run_type="chain")
def retrieve_documents(question: str):
    """Return whatever ``retriever.invoke(question)`` yields; the call is traced as a "chain" run.

    NOTE(review): ``retriever`` is not defined in this cell.
    """
    return retriever.invoke(question)

# Use context manager for fine-grained tracing
from langsmith import trace
# NOTE(review): call_openai and messages are not defined in this cell.
with trace(name="Generate Response", run_type="chain") as ls_trace:
    response = call_openai(messages)
    # Record the response as the traced run's output.
    ls_trace.end(outputs={"output": response})

# Wrap OpenAI client for automatic tracing
from langsmith.wrappers import wrap_openai
# NOTE(review): the `openai` module is not imported in this cell.
openai_client = wrap_openai(openai.Client())



# --------------------------------- Testing and Evaluation with LangSmith ---------------------------------
# Use create_dataset to create and manage datasets.
# Define custom evaluators to score your app's outputs.
# Use evaluate to run experiments and measure performance.

from langsmith import Client

# Create a dataset
client = Client()

#---------------------------------
# Create a dataset
#---------------------------------

# Create dataset for testing our AI agents
dataset_input = [
    {"input": "What is the capital of France?", "output": "Paris"},
    {"input": "Who wrote the book '1984'?",  "output": "George Orwell"},
    {"input": "What is the square root of 16?",  "output": "4"},
]

dataset = client.create_dataset(
    dataset_name = "my-dataset",
    description="A dataset for testing AI agents.")

# Option A: add the examples one at a time so a single failure doesn't abort the rest
for data in dataset_input:
    try:
        client.create_example(
            inputs={"question": data['input']},  # Wrapping the input into a dictionary
            outputs={"answer": data['output']},  # Wrapping the output into a dictionary
            dataset_id=dataset.id  # Assuming dataset.id is already created
        )
    except Exception as e:
        print(f"Failed to create example for input: {data['input']}, Error: {e}")

# Option B: or use create_examples to add multiple examples at once.
# Use either the loop above or this call, not both: running both stores every example twice.
# The lists are built from the whole dataset (not the leftover loop variable), and the
# reference key is "answer" because the evaluator reads reference_outputs["answer"].
client.create_examples(
    inputs=[{"question": data["input"]} for data in dataset_input],
    outputs=[{"answer": data["output"]} for data in dataset_input],
    dataset_id=dataset.id,
)

#---------------------------------
# Create a Target or label
#---------------------------------
# Define the application logic you want to evaluate inside a target function
# The SDK will automatically send the inputs from the dataset to your target function
def target(inputs: dict) -> dict:
    """Answer one dataset question with ``gpt-4o-mini``.

    Args:
        inputs: Example inputs; ``inputs["question"]`` is sent as the user message.

    Returns:
        ``{"response": <stripped text of the first completion choice>}``.
    """
    completion = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "Answer the following question accurately"},
            {"role": "user", "content": inputs["question"]},
        ],
    )
    answer_text = completion.choices[0].message.content.strip()
    return {"response": answer_text}


#---------------------------------
# Define an Evaluator
#---------------------------------

# Define instructions for the LLM judge evaluator
instructions = """Evaluate Student Answer against Ground Truth for conceptual similarity and classify true or false: 
- False: No conceptual match and similarity
- True: Most or full conceptual match and similarity
- Key criteria: Concept should match, not exact wording.
"""

# Define output schema for the LLM judge
# Structured-output schema for the judge: a single boolean verdict.
# (Documented with comments rather than a class docstring so the generated schema is unchanged.)
class Grade(BaseModel):
  # True when the judged response is accurate relative to the reference answer.
  score: bool = Field(
      description="Boolean that indicates whether the response is accurate relative to the reference answer"
  )

# Define LLM judge that grades the accuracy of the response relative to reference output
def accuracy(outputs: dict, reference_outputs: dict) -> bool:
  """Ask the LLM judge whether the response matches the reference answer.

  Args:
      outputs: Target output; ``outputs["response"]`` is the answer being graded.
      reference_outputs: Dataset reference; ``reference_outputs["answer"]`` is the ground truth.

  Returns:
      The boolean ``score`` parsed from the judge's ``Grade`` reply.
  """
  # `instructions` is the system prompt; response_format=Grade requests a parsed Grade object.
  response = openai_client.beta.chat.completions.parse(
      model="gpt-4o-mini",
      messages=[
          { "role": "system", "content": instructions },
          {
              "role": "user",
              "content": f"""Ground Truth answer: {reference_outputs["answer"]}; 
              Student's Answer: {outputs["response"]}"""
          },
      ],
      response_format=Grade,
  )
  return response.choices[0].message.parsed.score



#---------------------------------
# Run and View results
#---------------------------------
# After running the evaluation, a link will be provided to view the results in langsmith
# Run `target` over the "my-dataset" examples and score each output with `accuracy`.
experiment_results = client.evaluate(
  target,
  data="my-dataset",
  evaluators=[
      accuracy,
      # can add multiple evaluators here
  ],
  experiment_prefix="first-eval-in-langsmith",
  max_concurrency=2,
)





# --------------------------------- Prompt Engineering ---------------------------------

from langsmith import Client
from langsmith.client import convert_prompt_to_openai_format

# Pull a prompt from LangSmith Prompt Hub
client = Client()
# "your-prompt-id" is a placeholder; replace it with a real prompt identifier.
prompt = client.pull_prompt("your-prompt-id")

# Use the prompt in your app
# Fill the prompt's "question" variable, then convert it to OpenAI-style messages.
hydrated_prompt = prompt.invoke({"question": "What is LangSmith?"})
messages = convert_prompt_to_openai_format(hydrated_prompt)["messages"]
response = openai_client.chat.completions.create(model="gpt-4", messages=messages)




# --------------------------------- Collecting Human Feedback ---------------------------------
# Add feedback to a run
from langsmith import Client
from langsmith import traceable
import uuid

@traceable
def foo():
    """Return a fixed string; exists only to produce a traced run to attach feedback to."""
    return "This is a sample Run!"


client = Client()
# "your-run-id" is a placeholder; replace it with the ID of an existing run.
client.create_feedback(
    run_id="your-run-id",
    key="user_feedback",
    score=1.0,
    comment="The response was helpful."
)

# Pre-generate run IDs for feedback
# Choosing the ID up front lets feedback be attached without looking the run up afterwards.
pre_defined_run_id = uuid.uuid4()
foo(langsmith_extra={"run_id": pre_defined_run_id})
client.create_feedback(pre_defined_run_id, "user_feedback", score=1)



# --------------------------------- Production Observability ---------------------------------
# Filter runs in production
from langsmith import Client
from datetime import datetime, timedelta

client = Client()
# List root runs of the "langsmith-academy" project that started within the last day.
runs = client.list_runs(
    project_name="langsmith-academy",
    filter="eq(is_root, true)",
    start_time=datetime.now() - timedelta(days=1)
)

for run in runs:
    print(run)

# Run your app to trigger online evaluations
# NOTE(review): `app` is a project-local module that is not part of this file.
from app import langsmith_rag
question = "How do I set up tracing?"
langsmith_rag(question)


> Human in the Loop

In [None]:
# NOTE: the breakpoint patterns below are kept for reference; for new code, use the `interrupt` function instead.

#------------------------- Basic Human-in-the-Loop with Breakpoints--------------------------------
# Compile graph with breakpoint
# NOTE(review): builder, memory and inputs come from other cells.
graph = builder.compile(
    checkpointer=memory,
    interrupt_before=["step_for_human_in_the_loop"] # Add breakpoint
)

# Run graph up to breakpoint
# The same thread_config is reused below so the resumed run continues this thread.
thread_config = {"configurable": {"thread_id": "1"}}
for event in graph.stream(inputs, thread_config, stream_mode="values"):
    print(event)

# Perform human action (e.g., approve, edit, input)
# Resume graph execution
# Human approval step
user_approval = input("Do you want to call the tool? (yes/no): ")
if user_approval.lower() == "yes":
    # Resume graph execution
    # Passing None as the input resumes the thread instead of starting a new run.
    for event in graph.stream(None, thread_config, stream_mode="values"):
        print(event)
else:
    print("Operation cancelled by user.")

#------------------------- Dynamic Breakpoints--------------------------------
# Dynamic breakpoints allow the graph to interrupt itself based on conditions defined within a node.
# can define some *condition* that must be met for a breakpoint to be triggered
from langgraph.errors import NodeInterrupt

# Define a node with dynamic breakpoint
def my_node(state: State) -> State:
    """Pass the state through unchanged unless its input is too long.

    Raises:
        NodeInterrupt: When ``state['input']`` has more than 5 items/characters.
    """
    payload = state['input']
    if len(payload) <= 5:
        return state
    # Condition for breakpoint
    raise NodeInterrupt(f"Input too long: {payload}")

# Resume after dynamic breakpoint
# Shorten the input so my_node's length check passes, then resume the same thread.
graph.update_state(config=thread_config, values={"input": "foo"})  # Update state to pass condition
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)

# Skip node entirely
# `config` is update_state's first parameter, so the config must be passed exactly once;
# a leading positional None plus config=... raises
# "TypeError: got multiple values for argument 'config'".
# values=None with as_node="my_node" records the update as if my_node had run.
graph.update_state(config=thread_config, values=None, as_node="my_node")  # Skip node
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)
    

#------------------------- Editing State with Human Feedback--------------------------------
# You can modify the graph state during interruptions to incorporate human feedback.

# Get current state after interruption
state = graph.get_state(thread_config)
print(state)

# Update state with human feedback
# as_node="human_input" attributes the update to that node.
# NOTE(review): presumably "human_input" is a node of this graph — confirm.
graph.update_state(
    thread_config,
    {"user_input": "human feedback"},  # Add human input
    as_node="human_input"  # Treat update as a node
)

# Resume graph execution
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)


#------------------------- Input Pattern or Tool Call--------------------------------
    
 # Compile graph with input breakpoint
# NOTE(review): `checkpointer` is created in the Memory Module cell further down, so that
# cell has to run first.
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["human_input"]  # Node for human input
)

# Run graph up to input breakpoint
for event in graph.stream(inputs, thread_config, stream_mode="values"):
    print(event)

# Add human input and resume
# The update is attributed to "human_input", the node the run paused before.
graph.update_state(
    thread_config,
    {"user_input": "human input"},  # Provide human input or tool call
    as_node="human_input"  # Treat update as node
)
for event in graph.stream(None, thread_config, stream_mode="values"):
    print(event)
    


#------------------------- LangGraph API Integration--------------------------------
from langgraph_sdk import get_client

# Connect to LangGraph Studio
client = get_client(url="http://localhost:56091")

# Stream graph with breakpoint
# NOTE(review): `thread` and `initial_input` are not defined in this cell; `thread` must
# be a mapping with a "thread_id" entry.
# Top-level `async for` only works where an event loop is already running (e.g. a notebook).
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="agent",
    input=initial_input,
    stream_mode="values",
    interrupt_before=["tools"],  # Set breakpoint
):
    print(f"Receiving new event of type: {chunk.event}...")
    print(chunk.data)

# Resume from breakpoint
# input=None continues the same thread rather than starting with new input.
async for chunk in client.runs.stream(
    thread["thread_id"],
    assistant_id="agent",
    input=None,
    stream_mode="values",
):
    print(f"Receiving new event of type: {chunk.event}...")
    print(chunk.data)



> LangGraph Memory Module

In [None]:
#------------------------- Short term Memory --------------------------------
# Short-term memory is managed using checkpointers , which save the state of a graph at each step.

from langgraph.checkpoint.memory import MemorySaver

# Initialize a checkpointer for short-term memory
checkpointer = MemorySaver()

# Compile a graph with the checkpointer
# NOTE(review): builder and inputs come from other cells.
graph = builder.compile(checkpointer=checkpointer)

# Run the graph and save state
# The thread_id in the config identifies which saved conversation the run belongs to.
thread_config = {"configurable": {"thread_id": "1"}}
for event in graph.stream(inputs, thread_config, stream_mode="values"):
    print(event)

# Retrieve the state from the checkpointer
state = graph.get_state(thread_config)
print(state)


#------------------------- Long-Term Memory with Stores (Memory Store) --------------------------------
# Long-term memory is managed using stores , which persist data across threads or sessions.
from langgraph.store.memory import InMemoryStore

# Initialize a store for long-term memory
store = InMemoryStore()

#---------------------
# Save a memory
#----------------------
# Save a memory
user_id = "1"
# NOTE(review): the namespace here is (user_id, "memories"), while update_memory further
# down uses ("memory", user_id); the two do not address the same entries.
namespace = (user_id, "memories") # Namespace for user-specific memories
key = "profile"
value = {"name": "Lance", "interests": ["biking", "bakeries"]}
store.put(namespace, key, value)    # Save a memory to the store
#---------------------
#---------------------


#---------------------
# Save a memory 2 (Memory Schema Collection)
#---------------------
# `uuid` is needed for the generated keys below and is not imported anywhere
# else in this cell.
import uuid

from pydantic import BaseModel, Field

# Define a memory schema
# (documented with comments rather than a docstring so the generated model
# schema is left exactly as it was)
class Memory(BaseModel):
    content: str = Field(description="The main content of the memory.")

# Create a collection of memories
memory_collection = [
    Memory(content="User likes biking"),
    Memory(content="User enjoys bakeries")
]

# Save memories to the store
# Each entry gets its own random key, so repeated runs add new items instead of
# overwriting earlier ones. `namespace` and `store` come from the snippet above.
for memory in memory_collection:
    key = str(uuid.uuid4())
    store.put(namespace, key, memory.model_dump())
#---------------------
#---------------------

#---------------------------------
# Retrieve a memory from the store
#---------------------------------
# Fetch a single item by (namespace, key).
# NOTE: `key` is whatever was assigned last. If the collection loop above has
# run, it is the final generated UUID, not "profile".
memories = store.get(namespace, key)
print(memories.value)
# or
# List every item stored under the namespace.
memories = store.search(namespace)
for memory in memories:
    print(memory.value)

#---------------------
#---------------------

#---------------------------------
# Dynamic Memory Updates
#---------------------------------
# Dynamic memory updates allow the agent to decide when to save memories and what type of memory to update 
# (e.g., profile, collection, or instructions)

def update_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Refresh the user's stored memories from the current conversation.

    Existing items under ``("memory", <user_id>)`` are handed to
    ``trustcall_extractor`` together with the chat messages. Every response
    it returns is written back to the store: under the ``json_doc_id`` from
    its metadata when present, otherwise under a freshly generated UUID.
    """
    ns = ("memory", config["configurable"]["user_id"])
    schema_name = "Memory"

    # (key, schema name, value) triples describing what is already stored.
    existing = []
    for item in store.search(ns):
        existing.append((item.key, schema_name, item.value))

    extraction = trustcall_extractor.invoke(
        {"messages": state["messages"], "existing": existing}
    )

    pairs = zip(extraction["responses"], extraction["response_metadata"])
    for response, metadata in pairs:
        fallback_key = str(uuid.uuid4())
        memory_key = metadata.get("json_doc_id", fallback_key)
        store.put(ns, memory_key, response.model_dump())



#---------------------------------------------- Memory Agents ------------------------------------------
#-------------------------------------------------------------------------------------------------------
from langgraph.graph import StateGraph, MessagesState
from langgraph.checkpoint.memory import MemorySaver

# Define nodes for the agent
def call_model(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Answer the conversation with the user's stored memories as context.

    Reads every item under ``("memory", <user_id>)``, joins their
    ``"content"`` fields into one system message, and invokes ``model`` with
    that message placed ahead of the chat history.
    """
    ns = ("memory", config["configurable"]["user_id"])
    remembered = [item.value["content"] for item in store.search(ns)]
    memory_block = "\n".join(remembered)
    preamble = [SystemMessage(content=f"Memory: {memory_block}")]
    reply = model.invoke(preamble + state["messages"])
    return {"messages": reply}

def write_memory(state: MessagesState, config: RunnableConfig, store: BaseStore):
    """Extract memories from the chat history and persist each as a new item.

    Every response returned by ``trustcall_extractor`` is stored under
    ``("memory", <user_id>)`` with a fresh UUID key, so nothing already in the
    store is overwritten.
    """
    ns = ("memory", config["configurable"]["user_id"])
    extraction = trustcall_extractor.invoke({"messages": state["messages"]})
    for extracted in extraction["responses"]:
        store.put(ns, str(uuid.uuid4()), extracted.model_dump())

# Build the graph: START -> call_model -> write_memory -> END
# NOTE: START and END are not imported in this cell (only StateGraph and
# MessagesState are); they must already be in scope.
builder = StateGraph(MessagesState)
builder.add_node("call_model", call_model)
builder.add_node("write_memory", write_memory)
builder.add_edge(START, "call_model")
builder.add_edge("call_model", "write_memory")
builder.add_edge("write_memory", END)

# Compile with memory store and checkpointer
# The checkpointer holds per-thread state (short-term memory); the store is the
# object passed to the nodes as their `store` argument (long-term memory).
graph = builder.compile(checkpointer=MemorySaver(), store=InMemoryStore())





#------------------------- Long-Term Memory with Stores (Memory Store) --------------------------------









#------------------------- Long-Term Memory with Stores (Memory Store) --------------------------------





> LangGraph Project 1 (code that works)

In [None]:
from langgraph.graph import StateGraph, MessagesState, START, END
from langgraph.graph.message import add_messages
from typing import Annotated, Literal
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, BaseMessage
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import ToolNode
from langchain.prompts import PromptTemplate
from langchain.agents import create_react_agent, AgentExecutor
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
import os
import json # Parse JSON response



class Chatbot:
    """
    Travel-suggestion chatbot assembled as a LangGraph workflow.

    Calling an instance (`Chatbot()()`) builds and compiles a graph with an
    "agent" node (`call_model`) and a "tools" node (a `ToolNode` around Tavily
    search), connected by `router_function`.
    """

    def __init__(self):
        # The DeepSeek endpoint is reached through ChatOpenAI by overriding
        # `base_url`; the key is read from the DEEPSEEK_API_KEY environment
        # variable. Streamed tokens are echoed to stdout by the callback handler.
        self.llm = ChatOpenAI(
            api_key=os.getenv("DEEPSEEK_API_KEY"),
            model="deepseek-chat",
            base_url="https://api.deepseek.com",
            streaming=True,
            callbacks=AsyncCallbackManager([StreamingStdOutCallbackHandler()]),
        )

    def call_tool(self):
        """
        Create the Tavily search tool and everything derived from it.

        Sets `self.tools`, `self.tool_node` and `self.llm_with_tool`. It must
        run before `call_model`, which reads `self.tools` and
        `self.llm_with_tool`; `__call__` takes care of that.
        """
        tool = TavilySearchResults(max_results=2)
        self.tools = [tool]
        self.tool_node = ToolNode(tools=[tool])
        self.llm_with_tool = self.llm.bind_tools(self.tools)

    def call_model(self, state: MessagesState):
        """
        LLM node to process the user's query and invoke tools if needed.

        Runs a ReAct `AgentExecutor` on the content of the latest message and
        appends the result to `state['messages']` as an `AIMessage`. Any
        exception is caught and appended as an "Error: ..." message instead of
        being raised. The same (mutated) state object is returned.
        """
        messages = state['messages']
        latest_query = messages[-1].content if messages else "No query provided."

        # An earlier version of this prompt used unquoted `Thought:` / `Action:` /
        # `Final Answer:` labels; the variant below quotes the labels.
        # Improved prompt to enforce JSON output
        template = ''' 
        You are a travel suggestion agent. Answer the user's questions based on their travel preferences. 
        If you need to find information about a specific destination, use the search_tool. Understand that the information was retrieved from the web,
        interpret it, and generate a response accordingly.

        Answer the following questions as best as you can. You have access to the following tools:
        {tools}

        Use the following format:
        
        "Question": the input question you must answer
        "Thought": your reasoning about what to do next
        "Action": the action you should take, one of [{tool_names}] (if no action is needed, write "None")
        "Action Input": the input to the action (if no action is needed, write "None")
        "Observation": the result of the action (if no action is needed, write "None")
        "Thought": your reasoning after observing the action
        "Final Answer": the final answer to the original input question
        
        Ensure every Thought is followed by an Action, Action Input, and Observation. If no tool is needed, explicitly write "None" for Action, Action Input, and Observation.

        Begin!
        Question: {input}
        Thought: {agent_scratchpad}
        '''

        # The prompt, agent and executor are rebuilt on every call of this node.
        prompt = PromptTemplate.from_template(template)
        search_agent = create_react_agent(
            llm=self.llm_with_tool,
            prompt=prompt,
            tools=self.tools
        )

        agent_executor = AgentExecutor(
            agent=search_agent,
            tools=self.tools,
            verbose=True,
            return_intermediate_steps=False,
            handle_parsing_errors=True,
        )

        try:
            response = agent_executor.invoke({
                "input": latest_query,
                "agent_scratchpad": ""  # Initialize with an empty scratchpad
            })

            # Check if the response is already a dictionary
            if isinstance(response, dict):
                final_answer = response.get("output", "No final answer provided.")
            else:
                raise ValueError("Unexpected response type. Expected a dictionary.")

            state['messages'].append(AIMessage(content=final_answer))  # Append clean response to messages
            return state

        except Exception as e:
            # Deliberate best-effort: the failure is reported to the user as a
            # chat message rather than aborting the graph run.
            error_message = f"Error: {e}"
            state['messages'].append(AIMessage(content=error_message))
            return state

    def router_function(self, state: MessagesState) -> Literal["tools", END]:
        """
        Determine the next node based on tool invocation.

        Returns "tools" when the last message has a non-empty `tool_calls`
        attribute, otherwise END.
        """
        # NOTE(review): call_model only ever appends AIMessage(content=<text>),
        # so the "tools" branch looks unreachable from this graph - confirm.
        messages = state['messages']
        last_message = messages[-1]
        if hasattr(last_message, "tool_calls") and last_message.tool_calls:
            return "tools"
        return END

    def __call__(self):
        """
        Build and return the workflow graph.

        Topology: START -> agent -> (tools | END), tools -> agent. The compiled
        graph is also kept on `self.app`.
        """
        self.call_tool()
        workflow = StateGraph(MessagesState)
        workflow.add_node("agent", self.call_model)
        workflow.add_node("tools", self.tool_node)
        workflow.add_edge(START, "agent")
        workflow.add_conditional_edges("agent", self.router_function, {"tools": "tools", END: END})
        workflow.add_edge("tools", "agent")
        self.app = workflow.compile()
        return self.app


# Script entry point: build the chatbot graph, run one query through it and
# print the content of the last message in the resulting state.
if __name__ == "__main__":
    mybot = Chatbot()
    workflow = mybot()  # Chatbot.__call__ returns the compiled graph

    # Properly initialize MessagesState with HumanMessage objects
    initial_state = {
        "messages": [
            HumanMessage(content="Search tthe web and tell me about Airi Shimamura from Oklahoma?")
        ]
    }
    
    response = workflow.invoke(initial_state)
    print(response['messages'][-1].content)

> LangGraph Project 2 (code that works)

In [None]:
from langchain_core.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import Runnable
# from langchain_aws import ChatBedrock
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
import os
import boto3
from typing import Annotated
from langchain_core.callbacks.manager import AsyncCallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain_openai import ChatOpenAI
from typing_extensions import TypedDict
from langgraph.graph.message import AnyMessage, add_messages
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import tools_condition


@tool
def compute_savings(monthly_cost: float) -> dict:
    """
    Tool to compute the potential savings when switching to solar energy based on the user's monthly electricity cost.
    
    Args:
        monthly_cost (float): The user's current monthly electricity cost.
    
    Returns:
        dict: A dictionary containing:
            - 'number_of_panels': The estimated number of solar panels required.
            - 'installation_cost': The estimated installation cost.
            - 'net_savings_10_years': The net savings over 10 years after installation costs.
    """
    # NOTE: the docstring above is kept verbatim because the `@tool` decorator
    # uses it as the tool description shown to the model.
    # The return annotation used to say `float` although a dict is returned.

    # Assumptions for the calculation
    cost_per_kWh = 0.28            # electricity price per kWh
    cost_per_watt = 1.50           # installation price per watt of capacity
    sunlight_hours_per_day = 3.5   # productive sun hours per day
    panel_wattage = 350            # rated output of one panel, in watts
    system_lifetime_years = 10     # horizon used for the savings figure

    # Monthly electricity consumption in kWh, derived from the bill
    monthly_consumption_kWh = monthly_cost / cost_per_kWh

    # Required system size in kW (a month is taken as 30 days)
    daily_energy_production = monthly_consumption_kWh / 30
    system_size_kW = daily_energy_production / sunlight_hours_per_day

    # Number of panels and installation cost
    system_size_watts = system_size_kW * 1000
    number_of_panels = system_size_watts / panel_wattage
    installation_cost = system_size_watts * cost_per_watt

    # Annual and net savings: the whole bill is assumed to be saved every month
    annual_savings = monthly_cost * 12
    total_savings_10_years = annual_savings * system_lifetime_years
    net_savings = total_savings_10_years - installation_cost

    return {
        "number_of_panels": round(number_of_panels),
        "installation_cost": round(installation_cost, 2),
        "net_savings_10_years": round(net_savings, 2),
    }

def handle_tool_error(state) -> dict:
    """
    Turn a failed tool run into one error ToolMessage per pending tool call.

    Args:
        state (dict): Current agent state. The exception is read from its
            "error" key and the tool calls from the last entry of "messages".

    Returns:
        dict: ``{"messages": [...]}`` holding one ToolMessage for every tool
        call of the last message, each tied to that call's id.
    """
    error = state.get("error")
    pending_calls = state["messages"][-1].tool_calls

    # Every tool call gets its own reply, each carrying the same error text.
    replies = []
    for call in pending_calls:
        replies.append(
            ToolMessage(
                content=f"Error: {repr(error)}\n please fix your mistakes.",
                tool_call_id=call["id"],
            )
        )
    return {"messages": replies}

def create_tool_node_with_fallback(tools: list) -> Runnable:
    """
    Function to create a tool node with fallback error handling.

    Args:
        tools (list): A list of tools to be included in the node.

    Returns:
        Runnable: The ToolNode wrapped by `.with_fallbacks(...)`. This is a
        runnable, not a dict as the previous annotation claimed.
    """
    # If the ToolNode raises, `handle_tool_error` runs instead. With
    # `exception_key="error"` the raised exception is added to the fallback's
    # input under "error", which is the key `handle_tool_error` reads.
    return ToolNode(tools).with_fallbacks(
        [RunnableLambda(handle_tool_error)],  # Wrap the plain function as a runnable
        exception_key="error",
    )

# Graph state schema: a single `messages` channel holding the conversation.
class State(TypedDict):
    # `add_messages` is attached as Annotated metadata; LangGraph uses it as the
    # reducer that merges node outputs into this list.
    messages: Annotated[list[AnyMessage], add_messages]

class Assistant:
    """Graph node that invokes a runnable until it yields a usable reply.

    A reply is usable when it carries tool calls or has non-empty content.
    Otherwise a "Respond with a real output." user message is added to a copy
    of the state and the runnable is invoked again.
    """

    def __init__(self, runnable: Runnable):
        # The runnable (prompt piped into the tool-bound model) that produces replies
        self.runnable = runnable

    def __call__(self, state: State):
        reply = self.runnable.invoke(state)
        while self._is_blank(reply):
            # Nudge the model; the caller's state object is not modified.
            nudged = state["messages"] + [("user", "Respond with a real output.")]
            state = {**state, "messages": nudged}
            reply = self.runnable.invoke(state)
        # Only the accepted reply is returned, not the nudge messages.
        return {"messages": reply}

    @staticmethod
    def _is_blank(reply) -> bool:
        """Return True when the reply has neither tool calls nor usable text."""
        if reply.tool_calls:
            return False
        body = reply.content
        if not body:
            return True
        # List-style content counts as blank when its first part has no text.
        return isinstance(body, list) and not body[0].get("text")

# def get_bedrock_client(region):
#     return boto3.client("bedrock-runtime", region_name=region)

# def create_bedrock_llm(client):
#     return ChatBedrock(model_id='anthropic.claude-3-sonnet-20240229-v1:0', client=client, model_kwargs={'temperature': 0}, region_name='us-east-1')

# llm = create_bedrock_llm(get_bedrock_client(region='us-east-1'))


# Chat model behind the assistant. Streamed tokens are echoed to stdout by the
# callback handler.
# NOTE: OPENAI_API_KEY is not defined in this cell; it must already exist in
# the session.
llm = ChatOpenAI(
    api_key=OPENAI_API_KEY,
    model="gpt-4o",
    streaming=True,
    callbacks=AsyncCallbackManager([StreamingStdOutCallbackHandler()]),
)

# System prompt plus a placeholder that is filled with the conversation so far.
primary_assistant_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            '''You are a helpful customer support assistant for Solar Panels Belgium.
            You should get the following information from them:
            - monthly electricity cost
            If you are not able to discern this info, ask them to clarify! Do not attempt to wildly guess.
            After you are able to discern all the information, call the relevant tool.
            ''',
        ),
        ("placeholder", "{messages}"),
    ]
)

# Define the tools the assistant will use
part_1_tools = [
    compute_savings
]

# Bind the tools to the assistant's workflow.
# No `tool_choice` is forced here. With `tool_choice="any"` the model had to
# emit a tool call on every turn, so it could never ask the user to clarify (as
# the system prompt requires) and never give a final answer. `tools_condition`
# would then route assistant -> tools -> assistant indefinitely.
part_1_assistant_runnable = primary_assistant_prompt | llm.bind_tools(part_1_tools)


# Graph: START -> assistant -> (tools | END), tools -> assistant
builder = StateGraph(State)
builder.add_node("assistant", Assistant(part_1_assistant_runnable))
builder.add_node("tools", create_tool_node_with_fallback(part_1_tools))

builder.add_edge(START, "assistant")  # Start with the assistant
builder.add_conditional_edges("assistant", tools_condition)  # Go to tools only when the reply has tool calls
builder.add_edge("tools", "assistant")  # Return to assistant after tool execution

# The checkpointer keeps per-thread state so successive questions on the same
# thread_id share one conversation.
memory = MemorySaver()
graph = builder.compile(checkpointer=memory)

# import shutil
import uuid

def _print_event(event, _printed):
    """
    Helper function to print events generated by the graph.
    
    Args:
        event: The event to print.
        _printed: A set to track already printed events to avoid duplicates.
    """
    if event["messages"]:
        for message in event["messages"]:
            if message.id not in _printed:
                # Check the type of the message and print accordingly
                if hasattr(message, "type"):
                    print(f"Type: {message.type}, Content: {message.content}")
                elif hasattr(message, "role"):
                    print(f"Role: {message.role}, Content: {message.content}")
                else:
                    print(f"Message: {message}")
                _printed.add(message.id)
                
                
# Let's create an example conversation a user might have with the assistant
tutorial_questions = [
    'hey',
    'can you calculate my energy saving',
    "my montly cost is $100, what will i save"
]
# A fresh thread id per run, so the checkpointer starts a new conversation.
thread_id = str(uuid.uuid4())
config = {
    "configurable": {
        "thread_id": thread_id,
    }
}
# Shared across all questions so a message is never printed twice.
_printed = set()
for question in tutorial_questions:
    # stream_mode="values" yields the state after each step; _print_event
    # filters out messages it has already shown.
    events = graph.stream(
        {"messages": ("user", question)}, config, stream_mode="values"
    )
    for event in events:
        _print_event(event, _printed)

## Multi-Agent System

In [2]:
# Multi-Agent Systems
    # Definition : A system where multiple agents interact to achieve a common goal.
    # Use Case : Used by LangChain to model complex workflows involving multiple agents with different capabilities.
    # Example : A multi-agent system for customer support, where agents handle different types of customer queries (FAQs, escalations, recommendations).
    
    # Agent Supervisor Node : Routes queries to specific agents based on the query type.
    # Hierarchical Agent Teams : Organizes agents into teams based on their expertise or function.
    # Multi-Agent Collaboration : Enables agents to share information and coordinate tasks to achieve a common goal.
    

> Advanced Agent Supervisor

In [None]:
import inspect
from typing import Callable, Literal

from langchain_core.tools import BaseTool
from langchain_core.language_models import LanguageModelLike
from langgraph.graph import StateGraph, START
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt.chat_agent_executor import (
    AgentState,
    StateSchemaType,
    Prompt,
    create_react_agent,
)

from langgraph_supervisor.handoff import (
    create_handoff_tool,
    create_handoff_back_messages,
)


OutputMode = Literal["full_history", "last_message"]
"""Mode for adding agent outputs to the message history in the multi-agent workflow

- `full_history`: add the entire agent message history
- `last_message`: add only the last message
"""


def _make_call_agent(
    agent: CompiledStateGraph,
    output_mode: OutputMode,
    add_handoff_back_messages: bool,
    supervisor_name: str,
) -> Callable[[dict], dict]:
    if output_mode not in OutputMode.__args__:
        raise ValueError(
            f"Invalid agent output mode: {output_mode}. "
            f"Needs to be one of {OutputMode.__args__}"
        )

    def call_agent(state: dict) -> dict:
        output = agent.invoke(state)
        messages = output["messages"]
        if output_mode == "full_history":
            pass
        elif output_mode == "last_message":
            messages = messages[-1:]
        else:
            raise ValueError(
                f"Invalid agent output mode: {output_mode}. "
                f"Needs to be one of {OutputMode.__args__}"
            )

        if add_handoff_back_messages:
            messages.extend(create_handoff_back_messages(agent.name, supervisor_name))

        return {"messages": messages}

    return call_agent


def create_supervisor(
    agents: list[CompiledStateGraph],
    *,
    model: LanguageModelLike,
    tools: list[Callable | BaseTool] | None = None,
    prompt: Prompt | None = None,
    state_schema: StateSchemaType = AgentState,
    output_mode: OutputMode = "last_message",
    add_handoff_back_messages: bool = True,
    supervisor_name: str = "supervisor",
) -> StateGraph:
    """Assemble an uncompiled supervisor graph around a set of worker agents.

    The supervisor is a ReAct agent equipped with one handoff tool per worker
    (plus any extra ``tools``). Each worker becomes a node whose outgoing edge
    leads back to the supervisor.

    Args:
        agents: Compiled worker agents; each needs a unique, explicit name.
        model: Language model driving the supervisor.
        tools: Extra tools for the supervisor besides the handoff tools.
        prompt: Optional supervisor prompt, passed through to
            ``create_react_agent``.
        state_schema: State schema of the supervisor graph.
        output_mode: ``"full_history"`` adds a worker's whole message history
            to the shared history; ``"last_message"`` (default) adds only its
            final message.
        add_handoff_back_messages: Whether to append an (AIMessage,
            ToolMessage) pair marking the return of control to the supervisor.
        supervisor_name: Name of the supervisor node.

    Returns:
        The ``StateGraph`` builder; the caller compiles it.

    Raises:
        ValueError: If an agent has no name, still has the default name
            ``"LangGraph"``, or shares its name with another agent.
    """
    seen_names = set()
    for candidate in agents:
        name = candidate.name
        if name is None or name == "LangGraph":
            raise ValueError(
                "Please specify a name when you create your agent, either via `create_react_agent(..., name=agent_name)` "
                "or via `graph.compile(name=name)`."
            )
        if name in seen_names:
            raise ValueError(
                f"Agent with name '{name}' already exists. Agent names must be unique."
            )
        seen_names.add(name)

    # The supervisor's toolbox: caller-supplied tools first, then one handoff per worker.
    handoff_tools = [create_handoff_tool(agent_name=worker.name) for worker in agents]
    all_tools = (tools or []) + handoff_tools

    # When the model's bind_tools accepts `parallel_tool_calls`, bind up front
    # with it switched off.
    can_disable_parallel = False
    if hasattr(model, "bind_tools"):
        bind_params = inspect.signature(model.bind_tools).parameters
        can_disable_parallel = "parallel_tool_calls" in bind_params
    if can_disable_parallel:
        model = model.bind_tools(all_tools, parallel_tool_calls=False)

    supervisor_agent = create_react_agent(
        name=supervisor_name,
        model=model,
        tools=all_tools,
        prompt=prompt,
        state_schema=state_schema,
    )

    builder = StateGraph(state_schema)
    builder.add_node(supervisor_agent, destinations=tuple(seen_names))
    builder.add_edge(START, supervisor_agent.name)

    for worker in agents:
        node_fn = _make_call_agent(
            worker,
            output_mode,
            add_handoff_back_messages,
            supervisor_name,
        )
        builder.add_node(worker.name, node_fn)
        # Every worker hands control straight back to the supervisor.
        builder.add_edge(worker.name, supervisor_agent.name)

    return builder

In [None]:
import re
import uuid
from typing_extensions import Annotated

from langchain_core.messages import AIMessage, ToolMessage, ToolCall
from langchain_core.tools import tool, BaseTool, InjectedToolCallId
from langgraph.types import Command


WHITESPACE_RE = re.compile(r"\s+")


def _normalize_agent_name(agent_name: str) -> str:
    """Normalize an agent name to be used inside the tool name."""
    return WHITESPACE_RE.sub("_", agent_name.strip()).lower()


def create_handoff_tool(*, agent_name: str) -> BaseTool:
    """Build the tool the supervisor calls to pass control to ``agent_name``.

    Args:
        agent_name: Name of the target agent, i.e. of its node in the
            multi-agent graph. The tool is named
            ``transfer_to_<normalized agent_name>``, so the name has to be
            acceptable both as a LangGraph node name and as an LLM tool name;
            short, unique snake_case names are the safe choice.
    """
    handoff_name = "transfer_to_" + _normalize_agent_name(agent_name)

    @tool(handoff_name)
    def handoff_to_agent(
        tool_call_id: Annotated[str, InjectedToolCallId],
    ):
        """Ask another agent for help."""
        # Answer the tool call so it has a matching ToolMessage in the history.
        ack = ToolMessage(
            content=f"Successfully transferred to {agent_name}",
            name=handoff_name,
            tool_call_id=tool_call_id,
        )
        # Jump to the target node in the parent graph and record the
        # acknowledgement there.
        return Command(
            goto=agent_name,
            graph=Command.PARENT,
            update={"messages": [ack]},
        )

    return handoff_to_agent


def create_handoff_back_messages(
    agent_name: str, supervisor_name: str
) -> tuple[AIMessage, ToolMessage]:
    """Build the message pair that records a worker returning control.

    The AIMessage (attributed to ``agent_name``) carries a
    ``transfer_back_to_<supervisor>`` tool call, and the ToolMessage answers
    it. Both share one freshly generated tool-call id.
    """
    call_id = str(uuid.uuid4())
    back_tool = f"transfer_back_to_{_normalize_agent_name(supervisor_name)}"

    announcement = AIMessage(
        content=f"Transferring back to {supervisor_name}",
        tool_calls=[ToolCall(name=back_tool, args={}, id=call_id)],
        name=agent_name,
    )
    confirmation = ToolMessage(
        content=f"Successfully transferred back to {supervisor_name}",
        name=back_tool,
        tool_call_id=call_id,
    )
    return announcement, confirmation

> Agent Supervisor

In [None]:
# Agent Supervisor Node
from typing import Literal
from typing_extensions import TypedDict

from langchain_anthropic import ChatAnthropic
from langgraph.graph import MessagesState
from langgraph.types import Command

'''
User
 ├── Supervisor
 │     ├── [direct] Agent 1
 │     ├── [conditional] Agent 2 (if condition A is met)
 │     └── [direct] Agent 3
 │           ├── [conditional] Sub-Agent 3.1 (if condition B is met)
 │           └── [direct] Sub-Agent 3.2
 │
 └── Feedback Loop (User <--> Supervisor)

'''
members = ["researcher", "coder"]

def make_supervisor_node(llm: BaseChatModel, members: list[str]) -> str:
    options = ["FINISH"] + members
    system_prompt = (
        "You are a supervisor tasked with managing a conversation between the"
        f" following workers: {members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH."
    )

    class Router(TypedDict):
        """Worker to route to next. If no workers needed, route to FINISH."""
        next: Literal[*options]

    def supervisor_node(state: MessagesState) -> Command[Literal[*members, "__end__"]]:
        """An LLM-based router."""
        messages = [
            {"role": "system", "content": system_prompt},
        ] + state["messages"]
        response = llm.with_structured_output(Router).invoke(messages)
        goto = response["next"]
        if goto == "FINISH":
            goto = END

        return Command(goto=goto)

    return supervisor_node

llm = ChatAnthropic(model="claude-3-5-sonnet-latest")
# Route between the workers declared in `members`. The previous hard-coded
# ["search", "web_scraper"] matched neither `members` nor the "researcher" node
# registered in the graph, so every routing decision named a missing node.
# NOTE: each name in `members` needs a node of the same name in the graph.
supervisor_node = make_supervisor_node(llm, members)

def research_node(state: MessagesState) -> Command[Literal["supervisor"]]:
    """Run the research agent and hand control back to the supervisor.

    Invokes `research_agent` with the current state, wraps the content of the
    last message it produced in a `HumanMessage` named "researcher", and
    returns a `Command` that adds that message to the state and routes to the
    "supervisor" node.
    """
    agent_output = research_agent.invoke(state)
    final_reply = agent_output["messages"][-1].content
    reply_message = HumanMessage(content=final_reply, name="researcher")
    # Always return to the supervisor node after the worker has run.
    return Command(update={"messages": [reply_message]}, goto="supervisor")

# Assemble the graph: register both nodes, wire the entry edge, then compile.
builder = StateGraph(MessagesState)

# Nodes
builder.add_node("supervisor", supervisor_node)
builder.add_node("researcher", research_node)

# Execution starts at the supervisor.
builder.add_edge(START, "supervisor")

graph = builder.compile()

> Combining Several Graphs into a Supergraph

In [None]:
# https://github.com/MindzKonnectedAI/SkillSync/tree/gaurav 

# Step 1: Export Compiled Graphs

    # github_graph.py
    def github_team_supervisor():
        """Build the GitHub team graph and return it compiled.

        Flow: START -> "query_param_generator" -> "fetch_users", then a
        conditional edge decided by `condition`; "supervisor" leads to END.
        """
        workflow = StateGraph(GithubTeamState)

        # Register the three nodes in their original order.
        for node_name, node_action in (
            ("query_param_generator", query_param_node),
            ("fetch_users", fetch_node),
            ("supervisor", supervisor_agent),
        ):
            workflow.add_node(node_name, node_action)

        # Fixed edges.
        workflow.add_edge(START, "query_param_generator")
        workflow.add_edge("query_param_generator", "fetch_users")

        # Routing after the fetch is delegated to `condition`.
        workflow.add_conditional_edges("fetch_users", condition)
        workflow.add_edge("supervisor", END)

        return workflow.compile()

    # sql_graph.py
    def sql_agent_team_supervisor():
        """Build the SQL agent graph and return it compiled.

        The graph enters at "assistant"; `tools_condition` sends it either to
        "tools" or to END, and "tools" always loops back to "assistant".
        """
        workflow = StateGraph(State)

        assistant_node = Assistant(assistant_runnable)
        workflow.add_node("assistant", assistant_node)

        tool_node = create_tool_node_with_fallback(tools)
        workflow.add_node("tools", tool_node)

        workflow.set_entry_point("assistant")

        # Map each outcome of `tools_condition` to its destination node.
        assistant_routes = {"tools": "tools", END: END}
        workflow.add_conditional_edges("assistant", tools_condition, assistant_routes)
        workflow.add_edge("tools", "assistant")

        return workflow.compile()


# Step 2: Import Graphs into the Supergraph

    # app.py
    from github_graph import github_team_supervisor
    from sql_graph import sql_agent_team_supervisor

    # Call each factory to build and compile its team graph; the returned
    # compiled graphs are what get added to the supergraph as nodes.
    github_chain = github_team_supervisor()
    sql_chain = sql_agent_team_supervisor()


# Step 3: Define the Supergraph

    from langgraph.graph import StateGraph, START, END

    # Define the supergraph state (the schema handed to StateGraph below).
    class SuperState(TypedDict):
        # Conversation messages. The `operator.add` annotation is presumably
        # the reducer LangGraph uses to append node outputs -- TODO confirm.
        messages: Annotated[List[BaseMessage], operator.add]
        # Routing key read by the conditional edge on "super_supervisor".
        next: str

    # Create the supergraph builder over the shared state schema.
    super_graph = StateGraph(SuperState)

    # Each compiled team graph becomes a node, followed by the routing supervisor.
    for team_name, team_graph in (
        ("GithubTeam", github_chain),
        ("SqlTeam", sql_chain),
    ):
        super_graph.add_node(team_name, team_graph)
    super_graph.add_node("super_supervisor", supervisor_node)

    # Execution starts at the supervisor, and every team reports back to it.
    super_graph.add_edge(START, "super_supervisor")
    for team_name in ("GithubTeam", "SqlTeam"):
        super_graph.add_edge(team_name, "super_supervisor")

    # The supervisor's "next" value selects which node runs next.
    route_map = {
        "GithubTeam": "GithubTeam",
        "SqlTeam": "SqlTeam",
        "FINISH": END,
    }
    super_graph.add_conditional_edges(
        "super_supervisor",
        lambda x: x["next"],
        route_map,
    )

    # Rebind the name to the compiled, runnable graph.
    super_graph = super_graph.compile()



# Step 4: Run the Supergraph

    # Example request that needs both teams.
    user_input = "Find GitHub users in India and query their data from the SQL database."

    # Seed the state with the request as a single human message and run the graph.
    initial_state = {"messages": [HumanMessage(content=user_input)]}
    result = super_graph.invoke(input=initial_state)

    # Show the content of the last message in the final state.
    final_message = result["messages"][-1]
    print(final_message.content)



