# Graph that calculates the footprint

Let's start by having all agents share the same state. The graph will have the following nodes:

* Planner
* Manufacturing
* Raw materials
* Packaging
* Transportation
* Use phase
* End of life


In [1]:
%load_ext autoreload
%autoreload 2

from dotenv import load_dotenv
from time import sleep
import os

# Walk up the directory tree until the working directory is the folder that
# contains requirements.txt, so relative paths such as '.env.local' resolve
# from there.
while 'requirements.txt' not in os.listdir():
    previous_dir = os.getcwd()
    os.chdir('..')
    # At the filesystem root '..' resolves to the same directory; stop instead
    # of looping forever when no ancestor contains requirements.txt.
    if os.getcwd() == previous_dir:
        raise FileNotFoundError(
            "requirements.txt was not found in the working directory or any parent directory"
        )

# Load environment variables from the .env.local file (the path is relative to
# the current working directory)
load_dotenv(dotenv_path='.env.local')

# LANGCHAIN_PROJECT is hard-coded here; update it with your name to group your
# own traces
os.environ['LANGCHAIN_PROJECT'] = 'steve-fap-sandbox'

In [2]:
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from pydantic import Field, BaseModel
from langchain_openai import ChatOpenAI
from state import FootprintState

# Tools

## Page Analyzer

In [3]:
from langchain_core.tools import tool

def page_analyzer(url: str, questions: list[str]) -> list:
    """Answer questions about a product page (stub returning canned answers).

    Each question is matched against a fixed list of keywords; the first
    keyword found in the question decides the answer. Matching ignores case,
    so "Brand?" and "what brand is it?" are treated the same.

    Args:
        url: Address of the product page. Not used by this stub yet.
        questions: Free-text questions about the product.

    Returns:
        One ``{"question": ..., "answer": ...}`` dict per input question, in
        input order. The answer is ``"not found"`` when no keyword matches.
    """
    # TODO: if the url has not been crawled, crawl and cache it

    # Checked in order; the first keyword contained in the question wins.
    canned_answers = (
        ("packaging", "A cardboard box"),
        ("brand", "Apple"),
        ("category", "cellphone"),
        ("description", "An Apple iPhone 15 Pro"),
        ("weight", 0.17),
    )

    answers = []
    for question in questions:
        # Lower-case once so capitalised keywords ("Packaging ...") still match.
        normalized = question.lower()
        answer = next(
            (value for keyword, value in canned_answers if keyword in normalized),
            "not found",
        )
        answers.append({"question": question, "answer": answer})
    return answers

@tool
def page_analyzer_tool(url: str, questions: list[str]) -> list:
    """Given a list of questions about a product described at url, returns a
    list of question and answer pairs. If an answer for a question cannot be
    found, it will be "not found"."""
    # Thin tool wrapper: all the work is delegated to page_analyzer so the same
    # logic can also be called directly (as the planner does).
    # NOTE: the docstring above doubles as the tool description, so treat it as
    # runtime text rather than as a comment.
    qa_pairs = page_analyzer(url=url, questions=questions)
    return qa_pairs


## Material Estimator

In [4]:
def material_estimator(url: str) -> list[dict]:
    """Return placeholder emissions estimates for the product at ``url``.

    Stub implementation: always produces a single hard-coded entry whose
    description embeds the given url.

    Args:
        url: Address of the product page.

    Returns:
        A list with one dict holding the keys ``kgCO2e``, ``description`` and
        ``source``.
    """
    return [{
        "kgCO2e": 100,
        "description": f"This is a test description for {url}",
        "source": "This is a test source"
    }]

@tool
def material_estimator_tool(url: str) -> list[dict]:
    """For a given product url, returns a list of materials in the product."""
    # `url` is annotated so the tool's argument schema advertises a string;
    # the return annotation now matches the list that material_estimator
    # actually produces.
    # NOTE: the docstring above doubles as the tool description, so it is left
    # unchanged.
    return material_estimator(url)

#material_estimator("6.12 inch LCD display for a cell phone", "manufacturing")

# Agents

## Planner

In [5]:
from langchain_openai.chat_models import ChatOpenAI
from langgraph.prebuilt import create_react_agent

# The planner orchestrates the work
def planner(state: FootprintState):
    """Orchestrator node: gather the high-level product details.

    Asks the page analyzer for brand, category, description and weight, runs
    the material estimator, prints both results for debugging, and returns a
    state update with a planner message plus brand, category and description.
    The weight answer is requested but is not part of the returned update.
    """
    print('Running Planner')

    product_url = state["url"]

    # Order matters: the answers are read back by position further down.
    starting_questions = [
        "What is the brand of the product?",
        "What is the product category?",
        "What is a 1 sentence, factual description of the product?",
        "What is the product's weight?",
    ]

    # Call the page analyzer directly (not through the tool wrapper)
    details = page_analyzer(product_url, starting_questions)
    print(details)
    print(material_estimator(product_url))

    brand_entry, category_entry, description_entry = details[0], details[1], details[2]

    update = {"messages": [{"role": "ai", "content": "planner"}]}
    update["brand"] = brand_entry["answer"]
    update["category"] = category_entry["answer"]
    update["description"] = description_entry["answer"]
    return update

# Quick check: call the planner directly, outside the graph, with a sample product url
planner({"url": "https://www.apple.com/iphone-15-pro/"})

Running Planner
[{'question': 'What is the brand of the product?', 'answer': 'Apple'}, {'question': 'What is the product category?', 'answer': 'cellphone'}, {'question': 'What is a 1 sentence, factual description of the product?', 'answer': 'An Apple iPhone 15 Pro'}, {'question': "What is the product's weight?", 'answer': 0.17}]
[{'kgCO2e': 100, 'description': 'This is a test description for https://www.apple.com/iphone-15-pro/', 'source': 'This is a test source'}]


{'messages': [{'role': 'ai', 'content': 'planner'}],
 'brand': 'Apple',
 'category': 'cellphone',
 'description': 'An Apple iPhone 15 Pro'}

## Manufacturing

In [6]:
# The manufacturing agent estimates the footprint of assembling the product
def manufacturing_phase(state: FootprintState):
    """Placeholder manufacturing node.

    Prints a start marker, sleeps for 4 seconds to stand in for real work,
    prints a finish marker, and returns a single "manufacturing" AI message
    as the state update. ``state`` is not read.
    """
    print('Running raw materials and manufacturing')
    sleep(4)
    print('Finished raw materials and manufacturing')

    ai_message = dict(role="ai", content="manufacturing")
    return dict(messages=[ai_message])

## Summarizer

In [7]:
def summarizer(state: FootprintState):
    """Placeholder summarizer node.

    Prints a start marker, sleeps for 3 seconds to stand in for real work,
    prints a finish marker, and returns a single "summarizer" AI message as
    the state update. ``state`` is not read.
    """
    print('Running summarizer')
    sleep(3)
    print('Finished summarizer')

    ai_message = dict(role="ai", content="summarizer")
    return dict(messages=[ai_message])


# Assemble the Analysis Graph

In [8]:
from psycopg_pool import ConnectionPool
from langgraph.checkpoint.postgres import PostgresSaver
from agents.packaging import packaging_phase
from agents.transportation import transportation_phase
from agents.use import use_phase
from agents.eol import eol_phase

# Postgres connection settings used by the checkpointer.
# See https://langchain-ai.github.io/langgraph/how-tos/persistence_postgres/
DB_URI = os.environ['DB_DEV_CONNECTION']
connection_kwargs = dict(autocommit=True, prepare_threshold=0)

model = ChatOpenAI(model_name="o3", temperature=0)

# Build the graph on top of the shared footprint state
graph_builder = StateGraph(FootprintState)

# Register every node first (same order as they appear in the flow) ...
node_functions = {
    "planner": planner,
    "manufacturing_phase": manufacturing_phase,
    "packaging_phase": packaging_phase,
    "transportation_phase": transportation_phase,
    "use_phase": use_phase,
    "eol_phase": eol_phase,
    "summarizer": summarizer,
}
for node_name, node_fn in node_functions.items():
    graph_builder.add_node(node_name, node_fn)

# ... then wire them up: START -> planner -> every phase -> summarizer -> END
graph_builder.add_edge(START, "planner")

phases = ["manufacturing_phase", "packaging_phase", "transportation_phase", "use_phase", "eol_phase"]
for phase in phases:
    graph_builder.add_edge("planner", phase)

# Passing the list of phases as the first argument makes the summarizer wait
# for all of them to finish before it runs
graph_builder.add_edge(phases, "summarizer")
graph_builder.add_edge("summarizer", END)

# Compile with no checkpointer just to draw the graph. ASCII output is used
# because Mermaid rendering was having issues:
#from IPython.display import Image
#Image(graph_builder.compile().get_graph().draw_mermaid_png())
preview_graph = graph_builder.compile()
preview_graph.get_graph().print_ascii()

                                                              +-----------+                                                              
                                                              | __start__ |                                                              
                                                              +-----------+                                                              
                                                                    *                                                                    
                                                                    *                                                                    
                                                                    *                                                                    
                                                          ****+---------+******                                                          
                                  

In [None]:
config = {"configurable": {"thread_id": "10"}}

# Input state for the run
initial_state = {
    "user_input": "calculate the cradle to gate footprint",
    "url": "https://www.apple.com/iphone-15-pro/",
    "messages": [("human", "This is a test message")],
}

# Prefix printed in front of each streamed event, keyed by stream mode
event_labels = {"updates": "UPDATE:", "values": "VALUES:"}

with ConnectionPool(conninfo=DB_URI, max_size=20, kwargs=connection_kwargs) as pool:
    checkpointer = PostgresSaver(pool)

    # For testing, start over each time and don't use a checkpointer.
    # To persist state, compile with the checkpointer instead:
    #graph = graph_builder.compile(checkpointer=checkpointer)
    graph = graph_builder.compile()

    # Non-streaming alternative:
    # res = graph.invoke({
    #     "user_input": "calculate the cradle to gate footprint",
    #     "url": "https://www.apple.com/iphone-15-pro/",
    #     "messages": [("human", "This is a test message")]
    # }, config)
    # checkpoint = checkpointer.get(config)

    # See https://langchain-ai.github.io/langgraph/how-tos/streaming/
    events = graph.stream(initial_state, config, stream_mode=["updates", "values"])

    # Heads up: some of the printed lines are debug prints from the tools and
    # agents themselves, not from this loop
    for mode, event in events:
        # Adding the "messages" mode would stream the LLM calls as tokens are
        # generated (suppressed here, it gets messy):
        # if mode == "messages":
        #     print("MESSAGES:")
        #     msg, metadata = event
        #     if msg.content:
        #         print(msg.content, end="", flush=True)
        label = event_labels.get(mode)
        if label is not None:
            print(label, event)

#display(res)
# print(checkpoint)

VALUES: {'messages': [HumanMessage(content='This is a test message', additional_kwargs={}, response_metadata={}, id='cf486573-2c66-437d-aa35-242705072784')], 'user_input': 'calculate the cradle to gate footprint', 'url': 'https://www.apple.com/iphone-15-pro/'}
Running Planner
[{'question': 'What is the brand of the product?', 'answer': 'Apple'}, {'question': 'What is the product category?', 'answer': 'cellphone'}, {'question': 'What is a 1 sentence, factual description of the product?', 'answer': 'An Apple iPhone 15 Pro'}, {'question': "What is the product's weight?", 'answer': 0.17}]
[{'kgCO2e': 100, 'description': 'This is a test description for https://www.apple.com/iphone-15-pro/', 'source': 'This is a test source'}]
UPDATE: {'planner': {'messages': [{'role': 'ai', 'content': 'planner'}], 'brand': 'Apple', 'category': 'cellphone', 'description': 'An Apple iPhone 15 Pro'}}
VALUES: {'messages': [HumanMessage(content='This is a test message', additional_kwargs={}, response_metadata={}

# Testing

In [18]:
from agents.transportation import transportation_phase
from agents.use import use_phase
from agents.eol import eol_phase
from tools.emissions_factors import emissions_factor_finder, epa_ef_finder
# Direct single-phase calls, kept for ad-hoc debugging:
#transportation_phase({"brand": "Apple", "category": "cellphone", "description": "An iPhone 15"})
#use_phase({"brand": "Apple", "category": "cellphone", "description": "An iPhone 15"})
#eol_phase({"brand": "Apple", "category": "cellphone", "description": "An iPhone 15"})

# Run both finders on the same query so their results can be compared
ef_query = ("electricity usage in Hawaii", "manufacturing")
for finder in (emissions_factor_finder, epa_ef_finder):
    out = finder(*ef_query)
    display(out)

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


{'kgCO2e': 0.5,
 'units': 'kWh',
 'description': 'The carbon emissions factor for electricity usage in Hawaii during the manufacturing phase. This estimate considers the energy mix in Hawaii, which includes a significant portion of renewable energy sources such as solar and wind, alongside traditional fossil fuels.',
 'citation_desc': "This value is from gpt-4o's parametric knowledge",
 'citation_url': 'N/A'}

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


kgCO2e=1122.1 units='MWh' description='Total output emission factor for electricity usage in the HICC Miscellaneous subregion, which includes Maui County islands & Hawaiʻi (“Big Island”) grids.'


{'kgCO2e': 1122.1,
 'units': 'MWh',
 'description': 'Total output emission factor for electricity usage in the HICC Miscellaneous subregion, which includes Maui County islands & Hawaiʻi (“Big Island”) grids.',
 'citation_desc': 'The 2025 annual update of the Emission Factors Hub (January 2025) includes updates to emission factors for purchased electricity from eGRID, mobile combustion, upstream and downstream transportation, business travel, product transport, and employee commuting.',
 'citation_url': 'https://www.epa.gov/climateleadership/ghg-emission-factors-hub'}

In [None]:
from langgraph.graph import StateGraph, START, END
from tools.emissions_factors import EFState
from tools.emissions_factors_sources.epa_emissions_factors_hub import epa_ef_finder
from tools.emissions_factors_sources.parametric_knowledge import parametric_knowledge_ef_finder


def source_picker(state:EFState):
    """Choose the best emissions factor among the candidates held in ``state``.

    Candidates with a negative ``CO2e_factor`` are treated as invalid and
    dropped. If exactly one candidate remains it is returned directly;
    otherwise an LLM is asked for the index of the best remaining candidate.

    Args:
        state: Reads ``ef_candidates`` (mappings that each carry a
            ``CO2e_factor``), ``process_desc`` and ``phase``.

    Returns:
        ``{"emissions_factor": <chosen candidate>}``.

    Raises:
        ValueError: If no candidate has a non-negative ``CO2e_factor``.
        IndexError: If the model returns an index outside the candidate list.
    """
    # Remove invalid candidates (those with a negative factor value)
    valid_candidates = [c for c in state["ef_candidates"] if c["CO2e_factor"] >= 0]

    # Nothing to choose from: fail with a clear message instead of spending an
    # LLM call and then hitting an opaque IndexError on the final lookup.
    if not valid_candidates:
        raise ValueError(
            "source_picker received no valid emissions factor candidates "
            "(every candidate had a negative CO2e_factor or none were supplied)"
        )

    # If there's only one valid candidate, just return it
    if len(valid_candidates) == 1:
        return {"emissions_factor": valid_candidates[0]}

    class BestIndex(BaseModel):
        best_index: int = Field(description="The index (starting at zero) of the best emission factor candidate")

    ef_picker_llm = ChatOpenAI(
        model_name="gpt-4o", 
        temperature=0,
    ).with_structured_output(BestIndex)

    sys_prompt = """
    You will be provided with 2 or more CO2 emissions factor values. Evaluate
    the sources and decide which is the best fit for the supplied process and
    analysis phase. Always prefer an emissions factor with a real citation
    over an emissions factor based on parametric knowledge.
    """
    data_prompt = f"Process: {state['process_desc']}, Phase: {state['phase']}\n\n---\n\n"
    data_prompt += "\n\n".join([f"[{i}]: {c}" for i, c in enumerate(valid_candidates)])

    response:BestIndex = ef_picker_llm.invoke([
        {"role": "system", "content": sys_prompt},
        {"role": "user", "content": data_prompt}
    ])

    # Validate the model's answer: a negative index would otherwise silently
    # select a candidate counted from the end of the list.
    best_index = response.best_index
    if not 0 <= best_index < len(valid_candidates):
        raise IndexError(
            f"LLM returned best_index={best_index}, "
            f"but only {len(valid_candidates)} candidates were offered"
        )

    return {"emissions_factor": valid_candidates[best_index]}

# Emissions-factor graph: START fans out to every source node, and all source
# nodes fan in to source_picker, which leads to END.
builder = StateGraph(EFState)
builder.add_node(source_picker)

sources = [parametric_knowledge_ef_finder, epa_ef_finder]
# Node names are positional: "source-0", "source-1", ...
named_sources = dict((f"source-{i}", source) for i, source in enumerate(sources))

# Fan out: register each source node, then connect START to each of them
for name, source in named_sources.items():
    builder.add_node(name, source)
for name in named_sources:
    builder.add_edge(START, name)

# Fan in
builder.add_edge(named_sources.keys(), "source_picker")
builder.add_edge("source_picker", END)

ef_graph = builder.compile()
ef_graph.get_graph().print_ascii()

           +-----------+             
           | __start__ |             
           +-----------+             
           ***        ***            
          *              *           
        **                **         
+----------+           +----------+  
| source-0 |           | source-1 |  
+----------+           +----------+  
           ***        ***            
              *      *               
               **  **                
         +---------------+           
         | source_picker |           
         +---------------+           
                  *                  
                  *                  
                  *                  
            +---------+              
            | __end__ |              
            +---------+              


In [59]:
# Other queries that were tried with this graph:
#ef_graph.invoke({"process_desc": "electricity usage in Hawaii", "phase": "manufacturing"})
#ef_graph.invoke({"process_desc": "pig skin", "phase": "manufacturing"})
# The phase text ends up in the picker's LLM prompt, so it is spelled correctly
# here (it previously read "transporation").
ef_graph.invoke({"process_desc": "Heavy duty truck", "phase": "transportation"})

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"


{'ef_candidates': [{'CO2e_factor': 0.2,
   'units': 'kgCO2e/ton-mile',
   'description': 'The emissions factor for heavy-duty trucks during the transportation phase represents the average carbon dioxide equivalent emissions per ton of freight transported over a mile. This factor accounts for the fuel efficiency, load capacity, and typical operating conditions of heavy-duty trucks.',
   'citation_desc': 'parametric knowledge',
   'citation_url': 'N/A'},
  {'CO2e_factor': 0.2050278,
   'units': 'kgCO2/tonne-mile',
   'description': 'The emissions factor for medium- and heavy-duty trucks during transportation, measured in kg CO2 per short ton-mile.',
   'citation_desc': 'The 2025 annual update of the Emission Factors Hub (January 2025)',
   'citation_url': 'https://www.epa.gov/climateleadership/ghg-emission-factors-hub'}],
 'emissions_factor': {'CO2e_factor': 0.2050278,
  'units': 'kgCO2/tonne-mile',
  'description': 'The emissions factor for medium- and heavy-duty trucks during transport

In [6]:
from langgraph.prebuilt import create_react_agent
from tools.emissions_factors import emissions_factor_finder_tool

# Ad-hoc check that a ReAct agent can drive the emissions factor tool.
# NOTE: the task is supplied through `prompt` and the agent is then invoked
# with an empty input.
agent_llm = ChatOpenAI(model_name="gpt-4o", temperature=0)
agent_task = "Calculate the emissions of a banana, assume it goes by truck from South America to Seattle."

packaging_agent = create_react_agent(
    model=agent_llm,
    tools=[emissions_factor_finder_tool],
    prompt=agent_task,
    name="test_agent",
)

packaging_agent.invoke({})

TOOL: Emissions Factor Finder transportation of goods by truck transport
TOOL: Emissions Factor Finder banana production production


{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_dYHRFGF2I8HLD4ja2RFXv9W9', 'function': {'arguments': '{"process_desc": "transportation of goods by truck", "phase": "transport"}', 'name': 'emissions_factor_finder_tool'}, 'type': 'function'}, {'id': 'call_JdebX0uUkNlH68SgflyyPRLE', 'function': {'arguments': '{"process_desc": "banana production", "phase": "production"}', 'name': 'emissions_factor_finder_tool'}, 'type': 'function'}], 'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 69, 'prompt_tokens': 72, 'total_tokens': 141, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-2024-08-06', 'system_fingerprint': 'fp_47f3ebdcfb', 'id': 'chatcmpl-BVB9avlOeD2kYoc4iuQzSIhGEgq3Z', 'finish_reason': 'tool_calls', 'logprobs': None}, name='test_agent', id='run-82af74