In [None]:
import functools
import operator
import os
import warnings
from datetime import datetime
from typing import Annotated
from typing import Sequence, TypedDict

import requests
from langchain.agents import create_openai_tools_agent, AgentExecutor
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_experimental.tools import PythonREPLTool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

import chat_apps.auth_keys as auth_keys
import chat_apps.myconfig as myconfig

warnings.filterwarnings("ignore")

os.environ["OPENAI_API_KEY"] = auth_keys.openai_api_key
os.environ["OPENWEATHERMAP_API_KEY"] = auth_keys.openweather_api_key

os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = auth_keys.langchain_api_key
os.environ["LANGCHAIN_PROJECT"] = "multi_agent"

# State

In [None]:
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    next: str

In [None]:
llm = ChatOpenAI(model='gpt-4o', temperature=0)

# Tools

In [None]:
python_repl_tool = PythonREPLTool()

In [None]:
@tool("live_data")
def get_live_data():
    """Retrieves live data of the solar system"""
    base_url = myconfig.url_to_raspberry_rest_api

    try:
        response = requests.get(base_url)
        return response.json()
    except:
        return "There was an error retrieving the data."


@tool("summed_historic_data")
def get_summed_historic_data():
    """Retrieves the summed up historic solar data"""
    base_url = myconfig.url_summed_up_data
    response = requests.get(base_url)

    if response.status_code == 200:
        data = response.json()
        output_string = "Energy Historic Data last three days: \n"
        for idx, entry in enumerate(data):
            date = entry['date']
            consumption_positive = entry['consumption_positive']
            grid_negative = entry['grid_negative']
            grid_positive = entry['grid_positive']
            production_positive = entry['production_positive']

            if idx == len(data) - 1:
                date_description = datetime.strptime(date, "%Y-%m-%d").strftime("%d.%m.%Y") + " (today)"
            else:
                date_description = datetime.strptime(date, "%Y-%m-%d").strftime("%d.%m.%Y")

            entry_string = f"""
    Date: {date_description}
    - Consumption Positive: {consumption_positive}
    - Grid Negative: {grid_negative}
    - Grid Positive: {grid_positive}
    - Production Positive: {production_positive}
        """
            output_string += entry_string
        output_string += "\n \nGrid Positive is how much was drawn from the grid. \nGrid negative is how much was fed into the grid."
        return output_string
    else:
        return "There was an error retrieving the data."


@tool("weather_forecaster")
def get_weather_forecast():
    """Retrieves the current weather and the forecast for the next 3 days."""
    base_url = "https://api.openweathermap.org/data/2.5/forecast/daily"
    params = {
        'lat': '49.300652',
        'lon': '10.571460',
        'appid': auth_keys.openweather_api_key,
        'units': 'metric'
    }

    response = requests.get(base_url, params=params)
    print(response.url)
    if response.status_code == 200:
        data = response.json()
        export_data = {}
        for i in range(4):
            export_data[i] = {
                "date": data["list"][i]["dt"],
                "temp": data["list"][i]["temp"]["day"],
                "weather": data["list"][i]["weather"][0]["main"],
                "clouds": data["list"][i]["clouds"],
            }

        output_string = f"""
Weather Forecast
Today's Forecast:
Date: {datetime.utcfromtimestamp(data["list"][0]["dt"]).strftime('%Y-%m-%d')}
Temperature: {export_data[0]["temp"]} °C
Weather: {export_data[0]["weather"]}
Cloud Coverage: {export_data[0]["clouds"]}%

Next 3 Days:
Day 1 - {datetime.utcfromtimestamp(data["list"][1]["dt"]).strftime('%Y-%m-%d')}
Temperature: {export_data[1]["temp"]} °C
Weather: {export_data[1]["weather"]}
Cloud Coverage: {export_data[1]["clouds"]}%

Day 2 - {datetime.utcfromtimestamp(data["list"][2]["dt"]).strftime('%Y-%m-%d')}
Temperature: {export_data[2]["temp"]} °C
Weather: {export_data[2]["weather"]}
Cloud Coverage: {export_data[2]["clouds"]}%

Day 3 - {datetime.utcfromtimestamp(data["list"][3]["dt"]).strftime('%Y-%m-%d')}
Temperature: {export_data[3]["temp"]} °C
Weather: {export_data[3]["weather"]}
Cloud Coverage: {export_data[3]["clouds"]}%
"""
        return output_string
    else:
        return "There was an error retrieving the data."


# Helper Utils

In [None]:
def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt)
    executor = AgentExecutor(
        agent=agent,
        tools=tools,
    )
    return executor

In [None]:
def agent_node(state: AgentState, agent: AgentExecutor, name: str):
    result = agent.invoke(state)
    if name == "Energy optimizer":
        return {"messages": [HumanMessage(content=result["output"])]}
    else:
        return {"messages": [HumanMessage(content=name + ' says: \n' + result["output"])]}


def weather_state_update(state: AgentState, agent: AgentExecutor, name: str):
    print("weather_state_update called")
    result = agent.invoke(state)
    
    updated_content = (
        f"{state.get('messages')[0].content}\n"
        "____additional information____\n\n"
        f"{result['output']}\n"
        "The data has successfully been retrieved."
    )
    
    return {
        "messages": [HumanMessage(content=updated_content)],
        "next": "supervisor",
        "intermediate_steps": [(name, str(result))]
    }

In [None]:
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser

analyzers = ["Weather Retriever", "Coder"]
nodes = analyzers + ["Energy optimizer"]
system_prompt = (
    f"""You are the Supervisor. Your task is to manage the conversation between the nodes and decide which node should be called next.

Follow these guidelines:

If the user asks about the current weather or a weather forecast, route the request to the Weather Retriever.
If the user requests data on energy production, consumption, or grid interaction of the solar panel system, or any visualization route the request to the Coder.
If the user needs specific optimization suggestions regarding energy usage, route the request to the Energy optimizer.
If the user's request does not clearly fit into any of the above categories, decide based on the context which node can provide the most appropriate response. If you can't decide chose the Energy optimizer.
If the question has nothing to do with any topic or no additional data is required select Energy optimizer
If there is a question about any energy usage intense task check the weather to plan that task
If additional data is required select one of: {analyzers}

You can chose between {nodes}
     """
)
options = nodes

function_def = {
    "name": "route",
    "description": "Select the next role.",
    "parameters": {
        "title": "routeSchema",
        "type": "object",
        "properties": {
            "next": {
                "title": "Next",
                "anyOf": [
                    {"enum": options},
                ],
            }
        },
        "required": ["next"],
    },
}

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_prompt),
        MessagesPlaceholder(variable_name="messages"),
        (
            "system",
            "Given the conversation above, who should act next?"
            "If the question has nothing to do with any topic select Energy optimizer "
            "If no additional data is required to answer the question select Energy optimizer "
            "If additional data is required select one of: {options}",
        ),
    ]
).partial(options=str(options), members=", ".join(nodes))

supervisor_chain = (
        prompt
        | llm.bind_functions(functions=[function_def], function_call="route")
        | JsonOutputFunctionsParser()
)

In [None]:
weather_retriever = create_agent(llm, [get_weather_forecast],
                                 """You are the Weather Retriever. Your task is to provide the current weather and the weather forecast for a predefined location.""")
weather_retriever_node = functools.partial(weather_state_update, agent=weather_retriever, name="Weather Retriever")

code_agent = create_agent(llm, [python_repl_tool, get_summed_historic_data, get_live_data],
                          "You may generate safe Python code to analyze data and generate charts using matplotlib. If your task ist to plot solar data you can request time series data with python from the endpoint <insert url here>, it returns the solar data in csv format. The data covers the last three days and includes the following keys: production (in kWh), grid (in kWh), consumption (in kWh), timestamp, battery_status (in %). The timestamp is formatted as follows: YYYY-MM-DDTHH:MM:SS. \n. Don't use double quotes in the code \n"
                          "  Answer only with your results and never ask follow up questions.")
code_node = functools.partial(agent_node, agent=code_agent, name="Coder")

analyze_agent = create_agent(llm, [],
                             """You are an energy optimizer. You analyze solar and weather data to provide insights on energy usage and optimization. This includes identifying non-optimal energy usage periods, suggesting optimal times for high energy consumption based on solar production and weather forecast, and offering general energy-saving recommendations. Energy from the solar panel is free, so always prioritize power coming from the solar panel. Recommend times where the sun is shining for energy-intensive tasks to utilize the free energy from the solar panel. When analyzing the data, consider the following hierarchy of factors: 
                             Solar Production: Prioritize recommendations based on periods with the highest expected solar energy production.
                             Weather Conditions: Consider weather conditions such as cloud cover and precipitation that affect solar production.
                             Temperature: Suggest energy-intensive tasks during periods with favorable temperatures, if solar production is insufficient.
                             When you receive information from the coder, repeat it and analyze it according to the above factors. Please give tips on how to analyze a visualization if the coder responded with a visualization. Don't interact with the coder or weather retriever, just pass the information as it would be yours.""")
analyze_node = functools.partial(agent_node, agent=analyze_agent, name="Energy optimizer")

In [None]:
graph = StateGraph(AgentState)

graph.add_node("Weather Retriever", weather_retriever_node)
graph.add_node("Coder", code_node)
graph.add_node("Energy optimizer", analyze_node)
graph.add_node("supervisor", supervisor_chain)

In [None]:
for analyzer in analyzers:
    graph.add_edge(analyzer, "supervisor")

conditional_map = {k: k for k in analyzers}
conditional_map["Energy optimizer"] = "Energy optimizer"
graph.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)

graph.add_edge("Energy optimizer", END)

graph.set_entry_point("supervisor")

In [None]:
graph = graph.compile()

In [None]:
from IPython.display import Image

Image(graph.get_graph().draw_mermaid_png())

In [None]:
config = {"recursion_limit": 10}
output = ""
for s in graph.stream(
        {
            "messages": [HumanMessage(
                "Give me a summary of my solar data.")]
        }, config=config
):
    print(s)
    output = s

In [None]:
print(output['Energy optimizer']['messages'][0].content)