In [31]:
# Imports
import os
import getpass
import base64
from typing import TypedDict, Annotated, Sequence

import operator
from typing import Annotated, Any, Dict, List, Optional, Sequence, TypedDict
import functools

from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, END
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, END
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.tools import GooglePlacesTool
from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser

# Set API Keys (temp)
os.environ['LANGCHAIN_TRACING_V2'] = 'true'
os.environ['LANGCHAIN_API_KEY'] = getpass.getpass("Enter your Langchain API key: ")
os.environ['TAVILY_API_KEY'] = getpass.getpass("Enter your Tavily API key: ")
os.environ['OPENAI_API_KEY'] = getpass.getpass("Enter your OpenAI API key: ")
os.environ["GPLACES_API_KEY"] = getpass.getpass("Enter your Google Places API key: ")

## Define Tools

In [32]:
# Search tool
search_tool = TavilySearchResults(max_results=3)
places_tool = GooglePlacesTool()
tools = [search_tool, places_tool]

## Define Execution Agent

In [33]:
from langchain import hub
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

# Get the prompt to use - you can modify this!
prompt = hub.pull("wfh/react-agent-executor")
prompt.pretty_print()

# Choose the LLM that will drive the agent
llm = ChatOpenAI(model="gpt-4o")
agent_executor = create_react_agent(llm, tools, messages_modifier=prompt)


You are a helpful assistant.


[33;1m[1;3m{{messages}}[0m


## Define State

In [34]:
from typing import List, Tuple, Annotated, TypedDict, Union
import operator


class PlanExecute(TypedDict):
    input: str
    images: List
    plan: List[str]
    past_steps: Annotated[List[Tuple], operator.add]
    response: str

In [35]:
# [("user", state["input"])]
# [HumanMessage(content=text_input + image_inputs)]

## Planning Step

In [36]:
from langchain_core.pydantic_v1 import BaseModel, Field
from langchain_core.prompts import ChatPromptTemplate

# Define Plan class
class Plan(BaseModel):
    """Plan to follow in future"""

    steps: List[str] = Field(
        description="different steps to follow, should be in sorted order"
    )

# Define planner chain
planner_sys_message = """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
If the task requires observing the images, include the string "(use_images)" at the end of the task but remain verbose and specific with the full task description. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps."""

planner_prompt = ChatPromptTemplate.from_messages(
    [
        ("system", planner_sys_message),
        ("placeholder", "{messages}"),
    ]
)
planner = planner_prompt | ChatOpenAI(
    model="gpt-4o", temperature=0
).with_structured_output(Plan)

## Re-planning Step

In [37]:
from typing import Union

# Define Response class
class Response(BaseModel):
    """Response to user."""

    response: str

# Define Act class
class Act(BaseModel):
    """Action to perform."""

    action: Union[Response, Plan] = Field(
        description="Action to perform. If you want to respond to user, use Response. "
        "If you need to further use tools to get the answer, use Plan."
    )

# Define replanner chain
replanner_prompt = ChatPromptTemplate.from_template(
    """For the given objective, come up with a simple step by step plan. \
This plan should involve individual tasks, that if executed correctly will yield the correct answer. Do not add any superfluous steps. \
If the task requires observing the images, include the string "(use_images)" at the end of the task but remain verbose and specific with the full task description. \
The result of the final step should be the final answer. Make sure that each step has all the information needed - do not skip steps.

Your objective was this:
{input}

Your original plan was this:
{plan}

You have currently done the follow steps:
{past_steps}

Update your plan accordingly. If no more steps are needed and you can return to the user, then respond with that. Otherwise, fill out the plan. Only add steps to the plan that still NEED to be done. Do not return previously done steps as part of the plan."""
)

replanner = replanner_prompt | ChatOpenAI(
    model="gpt-4o", temperature=0
).with_structured_output(Act)

## Define Graph

In [38]:
from typing import Literal

# Plan step
async def plan_step(state: PlanExecute):
    plan = await planner.ainvoke({"messages": [("user", state["input"])]})
    return {"plan": plan.steps}

# Execute step
async def execute_step(state: PlanExecute):
    plan = state["plan"]
    plan_str = "\n".join(f"{i+1}. {step}" for i, step in enumerate(plan))
    task = plan[0]
    task_formatted = f"""For the following plan:
{plan_str}\n\nYou are tasked with executing step {1}, {task}."""
    messages = [("user", task_formatted)]
    if "use_images" in task:
        messages.append(("user", state["images"]))
    agent_response = await agent_executor.ainvoke(
        {"messages": messages}
    )
    return {
        "past_steps": (task, agent_response["messages"][-1].content),
    }

# Replan step
async def replan_step(state: PlanExecute):
    output = await replanner.ainvoke(state)
    if isinstance(output.action, Response):
        return {"response": output.action.response}
    else:
        return {"plan": output.action.steps}

# Determine if the workflow should end
def should_end(state: PlanExecute) -> Literal["agent", "__end__"]:
    if "response" in state and state["response"]:
        return "__end__"
    else:
        return "agent"

In [39]:
from langgraph.graph import StateGraph

# Define workflow
workflow = StateGraph(PlanExecute)

# Add nodes
workflow.add_node("planner", plan_step)
workflow.add_node("agent", execute_step)
workflow.add_node("replan", replan_step)
workflow.set_entry_point("planner")

# Add edges
workflow.add_edge("planner", "agent")
workflow.add_edge("agent", "replan")
workflow.add_conditional_edges(
    "replan",
    should_end,
)

# Compile
app = workflow.compile()

## Run

In [40]:
# Initial prompts
json_prompt = "Only return a valid json string (RCF8259). Do provide any other commentary. Do not wrap the JSON in markdown such as ```json. Only use the data from the provided content."
prompt_template = """USER: Given a set of streetview images from a vehicle, your task is to determine the \
coordinates from which the picture was taken. It can be anywhere in the world.

Return json with the city and coordinates following the below example. {json_prompt}
output={{"city": "Orland Park, IL, 60467, USA", "latitude": "42.0099", "longitude": "-87.62317"}}

AGENT: output="""

text_prompt = prompt_template.format(json_prompt=json_prompt)

# Function to encode the image
def encode_image(image_path):
  with open(image_path, "rb") as image_file:
    return base64.b64encode(image_file.read()).decode('utf-8')

# Path to your image
id = "103"
image_dir = f'./data/{id}/'
directions = ["north"] #, "south", "east", "west"]
image_inputs = []
for direction in directions:
    base64_img = encode_image(f"{image_dir}{direction}.png")
    image_input = {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}}
    image_inputs.append(image_input)

# Finalize inputs
inputs = {"input": text_prompt, "images": image_inputs}
config = {"recursion_limit": 20}

# Run
async for event in app.astream(inputs, config=config):
    for k, v in event.items():
        if k != "__end__":
            print(v)

{'plan': ['Analyze the streetview images to identify any visible landmarks, street signs, or other identifiable features (use_images).', 'Use the identified features to determine the city and coordinates where the picture was taken.']}
{'past_steps': ('Analyze the streetview images to identify any visible landmarks, street signs, or other identifiable features (use_images).', 'In the provided street view image, the following identifiable features are visible:\n\n1. **Street Signage**:\n   - A triangular traffic sign with a bump symbol, indicating a speed bump ahead.\n   - A sign on the left side of the street with French text, possibly indicating construction or parking information ("P P A EN FACE").\n\n2. **Street Markings**:\n   - Bicycle lane markings.\n   - Pedestrian crossing markings.\n\n3. **Construction Site**:\n   - A construction site with signage that includes the name "Bouygues," which is a well-known French construction company.\n\n4. **Buildings and Surroundings**:\n   - 

CancelledError: 