In [3]:
# This file is part of astrochat.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (https://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""Demo module illustrating an LSST-style Python code layout.

This module uses DSPy for LLM-based orchestration, supports memory
of user–assistant interactions, implements RAG (Retrieval Augmented
Generation) to fetch observational data, and runs Python code via a
ProgramOfThought.

Notes
-----
- All docstrings, including these module docstrings, follow the LSST
  style constraints for docstrings (Numpydoc).
- Comments are kept within 79 characters per line.
"""

# Names exported by a star-import of this module. Note that `main` is
# not part of this list.
__all__ = [
    "UserInteraction",
    "ConversationContext",
    "ResponseWithContext",
    "respond_cot",
    "getObservingData",
    "retrieve_observing_metadata",
    "ProgramOfThought",
    "Tool",
    "rag_tool",
    "python_tool",
    "dummy_tool",
    "Worker",
    "rag_worker",
    "python_worker",
    "Boss"
]

import logging
import os
import re

from typing import List, Optional, Callable

import pandas as pd
from pydantic import BaseModel

import dspy
from dspy import (
    Signature,
    ChainOfThought,
    Module,
    settings
)
#from dspy.models import LM
from dspy.primitives.python_interpreter import CodePrompt, PythonInterpreter

# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
# NOTE(review): ``basicConfig`` configures the root logger at import time,
# which affects every program importing this module -- confirm that this is
# wanted outside of demo/notebook use.
logging.basicConfig(level=logging.INFO)

# The OpenAI-hosted alternative at the bottom of this section needs an API
# key read from disk:
# with open('/Users/cmorales/.openaikey.txt', 'r') as file:
#     api_key = file.read().strip()

# Language-model selection.
#
# The model identifier and the endpoint can be overridden without editing
# this file by setting the ASTROCHAT_LM_MODEL and ASTROCHAT_LM_API_BASE
# environment variables; unset or empty variables fall back to the values
# that were previously hard-coded here.
#
# Model identifiers that have been used with this demo:
#   ollama_chat/tulu3:70b  (default)
#   ollama_chat/olmo2:13b-1124-instruct-fp16
#   ollama_chat/olmo2:13b
#   ollama_chat/olmo2:7b
#   ollama_chat/phi4:14b-fp16
#   ollama_chat/dolphin3:latest
#   ollama_chat/llama3.3:70b-instruct-q2_K
#   ollama_chat/ALIENTELLIGENCE/cybersecuritymonitoring:latest
#   ollama_chat/ALIENTELLIGENCE/whiterabbitv2:latest
#   ollama_chat/TULU_131k_T08_70b:latest
#   ollama_chat/COT_32k_T03_qwq:latest
#   ollama_chat/qwq:latest
#   ollama_chat/qwq:32b
#   ollama_chat/tulu3:8b
#   ollama_chat/hermes3:3b
#   ollama_chat/hermes3:8b
#   ollama_chat/hermes3:70b
#   ollama_chat/hermes3:405b
#   ollama_chat/llama3.3:70b
#   ollama_chat/llama3.2-vision:90b
#   ollama_chat/llama3.2:3b
#   ollama_chat/qwen2.5:0.5b
#   ollama_chat/qwen2.5:32b
#   ollama_chat/qwen2.5:72b
#   ollama_chat/qwen2.5-coder:32b
#   ollama_chat/nemotron:70b
#   ollama_chat/snowflake-arctic-embed:latest
#   ollama_chat/nomic-embed-text:latest
lm = dspy.LM(
    # ``or`` (rather than a ``get`` default) also covers variables that are
    # set but empty.
    os.environ.get("ASTROCHAT_LM_MODEL") or "ollama_chat/tulu3:70b",
    api_base=(
        os.environ.get("ASTROCHAT_LM_API_BASE")
        or "https://5bbc-200-29-147-35.ngrok-free.app"
    ),
    api_key="",  # api_key should be empty
)
# lm = dspy.LM('ollama_chat/tulu3:8b', api_base='http://localhost:11434', api_key='') #api_key should be empty
# lm = LM("openai/gpt-4o", api_key=api_key)

# Register the model as the default LM for every DSPy module created below.
settings.configure(lm=lm)


class UserInteraction:
    """One exchange between the user and the assistant.

    Parameters
    ----------
    message : `str`
        Text sent by the user.
    response : `str`
        Text the assistant answered with.
    """

    def __init__(self, message: str, response: str):
        self.message, self.response = message, response

    def serialize_by_role(self) -> List[dict]:
        """Express the exchange as chat-style role/content records.

        Returns
        -------
        serialized : `list` of `dict`
            Two dicts, the user's turn first and the assistant's second,
            each with a ``"role"`` and a ``"content"`` key.
        """
        turns = (("user", self.message), ("assistant", self.response))
        return [{"role": role, "content": text} for role, text in turns]


class ConversationContext:
    """Stores and formats recent conversation interactions as context.

    Parameters
    ----------
    window_size : `int`, optional
        Number of recent interactions to keep in memory (default 5).
        Zero or a negative value keeps no history at all.
    """

    def __init__(self, window_size: int = 5):
        self.window_size = window_size
        self.content: List["UserInteraction"] = []

    def update(self, interaction: "UserInteraction") -> None:
        """Append a new interaction and trim the history to `window_size`.

        Parameters
        ----------
        interaction : `UserInteraction`
            The new user–assistant interaction to store.
        """
        self.content.append(interaction)
        # A plain ``content[-window_size:]`` is wrong at the edges:
        # ``-0`` is ``0`` (the whole list is kept) and a negative size
        # slices from the front. Compute the start index explicitly.
        keep = max(self.window_size, 0)
        start = max(len(self.content) - keep, 0)
        self.content = self.content[start:]

    def render(self) -> str:
        """Return a string of the stored interactions as context.

        Returns
        -------
        context_str : `str`
            One ``User: ...`` / ``Assistant: ...`` paragraph pair per
            stored interaction, oldest first; empty if nothing is stored.
        """
        return "".join(
            f"User: {entry.message}\n\nAssistant: {entry.response}\n\n"
            for entry in self.content
        )


class ResponseWithContext(Signature):
    """Signature requiring context, user message, and RAG context.

    Attributes
    ----------
    context : `str`
        Conversation context.
    message : `str`
        The user's message.
    rag_context : `str`
        Information from retrieval-augmented generation (RAG).
    response : `str`
        The contextualized response (output).
    """
    # NOTE(review): DSPy presumably uses the docstring of a Signature
    # subclass as the prompt instructions, so editing the docstring above
    # would change what the LLM sees -- confirm against the installed DSPy
    # version before rewording it.

    # Input fields: values supplied by the caller.
    context = dspy.InputField(desc="Conversation context")
    message = dspy.InputField(desc="User message")
    rag_context = dspy.InputField(
        desc="Relevant information from external or observational data"
    )
    # Output field: value to be produced by the model.
    response = dspy.OutputField(desc="Contextualized response")


# Chain-of-thought predictor built on the ResponseWithContext signature.
# It is exported through ``__all__`` but not called in the portion of the
# file shown here.
respond_cot = ChainOfThought(ResponseWithContext)


def getObservingData(dayObs: Optional[int] = None,
                     dataDir: Optional[str] = None) -> pd.DataFrame:
    """Obtain a DataFrame of observational data (dummy example).

    The data are read from ``dayObs_<dayObs>.json`` inside ``dataDir``.

    Parameters
    ----------
    dayObs : `int` or `None`, optional
        A day-identifier for observational data. If None, default to
        a placeholder value.
    dataDir : `str` or `None`, optional
        Directory holding the ``dayObs_*.json`` files. If None, the
        original hard-coded development directory is used.

    Returns
    -------
    df : `pandas.DataFrame`
        A DataFrame sorted by its index with columns from the JSON file,
        minus the columns whose label starts with an underscore.
        May be empty if file is not found or an error occurs.
    """
    defaultDayObs = 20230913  # dummy placeholder
    defaultDataDir = (
        "/home/c/carlosm/lsst/summit_extras/python/lsst/summit/extras"
    )
    if dayObs is None:
        dayObs = defaultDayObs
    if dataDir is None:
        dataDir = defaultDataDir

    filename = os.path.join(dataDir, f"dayObs_{dayObs}.json")
    if not os.path.exists(filename):
        LOG.warning("File not found: %s", filename)
        return pd.DataFrame()

    # Deliberately best-effort: any failure while reading or reshaping the
    # file is logged and reported to the caller as an empty DataFrame.
    try:
        df = pd.read_json(filename).T
        # Drop columns starting with underscore. Labels are passed through
        # ``str`` because JSON keys that look numeric can be converted to
        # non-string labels on load, and those have no ``startswith``.
        privateCols = [
            col for col in df.columns if str(col).startswith("_")
        ]
        df = df.drop(privateCols, axis=1, errors="ignore")
        return df.sort_index()
    except Exception as exc:
        LOG.error("Error reading %s: %s", filename, exc)
        return pd.DataFrame()


def retrieve_observing_metadata(query: str, dayObs: Optional[int] = None) -> str:
    """Return the observing-data rows whose text mentions ``query``.

    Parameters
    ----------
    query : `str`
        Text to look for, case-insensitively, in the string rendering of
        each row.
    dayObs : `int` or `None`, optional
        Observation day identifier, forwarded to `getObservingData`.

    Returns
    -------
    result_str : `str`
        The matching rows rendered as text without the index, or a short
        message when there is no data or no row matches.
    """
    table = getObservingData(dayObs)
    if table.empty:
        return "No observing data available."

    def _mentions_query(row) -> bool:
        # Case-insensitive substring test against the rendered row.
        return query.lower() in row.to_string().lower()

    hits = table[table.apply(_mentions_query, axis=1)]
    if hits.empty:
        return "No relevant data found."
    return hits.to_string(index=False)


class ProgramOfThought(Module):
    """Generates and executes Python code iteratively up to `max_iters`.

    This uses a `PythonInterpreter` with an import whitelist
    to control allowed packages. When an attempt fails, the failing code
    and its error are included in the next generation request so that the
    retry is not a repeat of the same prompt.

    Parameters
    ----------
    signature : `dspy.Signature`
        DSpy signature specifying input/output fields.
    max_iters : `int`, optional
        Maximum number of code generation attempts (default 3).
    """

    def __init__(self, signature: Signature, max_iters: int = 3):
        super().__init__()
        self.signature = signature
        self.max_iters = max_iters
        self.interpreter = PythonInterpreter(
            action_space={"print": print},
            import_white_list=["numpy", "astropy", "sympy", "matplotlib"]
        )

    def forward(self, question: str, context_str: str = "",
                rag_context: str = "",
                variables: Optional[dict] = None) -> dict:
        """Generate and execute Python code for `question`.

        Parameters
        ----------
        question : `str`
            The user question or request to be solved via Python code.
        context_str : `str`, optional
            Conversation context to include in code generation.
        rag_context : `str`, optional
            Data from RAG to include in code generation.
        variables : `dict` or `None`, optional
            Additional variables handed to the interpreter. None is
            treated as an empty dict.

        Returns
        -------
        result_dict : `dict`
            Dictionary with key "answer" containing the code execution
            result or None if unsuccessful.
        """
        if variables is None:
            variables = {}

        # Description of the last failed attempt; empty on the first try.
        feedback = ""
        for iteration in range(self.max_iters):
            generated_code = self._generate_code(question, context_str,
                                                 rag_context, feedback)
            print(f"\nIteration {iteration + 1}: Generated Code:\n"
                  f"{generated_code}")

            try:
                result, _ = CodePrompt(generated_code).execute(
                    self.interpreter, user_variable=variables
                )
            except Exception as exc:
                LOG.error("Execution Error: %s", exc)
                # Without this the next iteration would send exactly the
                # same prompt again (and DSPy may serve an identical,
                # cached completion), making the retry pointless.
                feedback = (
                    "A previous attempt failed.\n"
                    f"Previous code:\n{generated_code}\n\n"
                    f"Error:\n{type(exc).__name__}: {exc}\n\n"
                    "Write corrected code that avoids this error.\n\n"
                )
                continue

            print("Execution Result:", result)
            return {"answer": result}

        LOG.error("Failed to generate valid code after multiple attempts.")
        return {"answer": None}

    def _generate_code(self, question: str, context_str: str,
                       rag_context: str, feedback: str = "") -> str:
        """Helper method to produce Python code via LLM.

        Parameters
        ----------
        question : `str`
            The user question or request.
        context_str : `str`
            Relevant conversation context.
        rag_context : `str`
            Retrieved data (RAG).
        feedback : `str`, optional
            Description of a previous failed attempt, inserted into the
            prompt ahead of the final instructions. With the default
            empty string the prompt is unchanged.

        Returns
        -------
        code : `str`
            The model's answer with Markdown code fences removed.
        """
        code_signature = dspy.Signature("question -> generated_code")
        predict = dspy.Predict(code_signature)

        prompt = (
            "You are an AI that writes Python code to solve problems.\n"
            "Below is some conversation context and retrieved metadata:\n\n"
            f"Context:\n{context_str}\n\n"
            f"Observing Metadata:\n{rag_context}\n\n"
            f"{feedback}"
            "Now, generate Python code to solve the following problem.\n"
            "Do not include any explanations or markdown.\n"
            "Only output executable Python code.\n\n"
            f"Question: {question}\n\nCode:\n"
        )
        completion = predict(question=prompt)
        return self._extract_code(completion.generated_code)

    @staticmethod
    def _extract_code(text: str) -> str:
        """Strip Markdown fencing from an LLM answer.

        Parameters
        ----------
        text : `str`
            Raw text returned by the model.

        Returns
        -------
        code : `str`
            If `text` contains complete fenced blocks, only their
            contents (joined by newlines), so that prose around the
            fences is not handed to the interpreter. Otherwise `text`
            with any stray fence markers removed.
        """
        fenced = re.findall(r"```[^\n]*\n(.*?)```", text, flags=re.DOTALL)
        if fenced:
            return "\n".join(fenced)
        # No complete fenced block (e.g. an unterminated fence): fall back
        # to deleting the fence markers themselves.
        code_block = re.sub(r"```[^\n]*\n", "", text)
        return re.sub(r"```", "", code_block)


class Tool(BaseModel):
    """Base template for Tools used by Worker modules.

    Parameters
    ----------
    name : `str`
        Name of the tool.
    description : `str`
        Short description of the tool's function.
    requires : `str`
        Description of the required input parameter(s).
    func : `Callable`
        The callable used to perform the tool's operation.
    """

    # Key under which a Worker stores and looks up the tool.
    name: str
    # Human-readable summary of what the tool does.
    description: str
    # Free-text description of the argument expected by `func`.
    requires: str
    # Called by Worker.execute with a single positional argument.
    func: Callable


def _search_observations(q):
    """Look up ``q`` in the observing metadata for the default day."""
    return retrieve_observing_metadata(q)


# Tool exposing the observing-metadata search to workers.
rag_tool = Tool(
    name="rag retrieval",
    description="Search observation data for a query and return results.",
    requires="query",
    func=_search_observations,
)

code_tool_signature = dspy.Signature("question -> answer")
code_program = ProgramOfThought(signature=code_tool_signature, max_iters=3)


def _run_python_tool(request):
    """Run `code_program` for a tool request.

    Parameters
    ----------
    request : `dict` or `str`
        Either a mapping with the optional keys ``"question"``,
        ``"context"``, ``"rag_context"`` and ``"variables"``, or a plain
        value that is used as the question text. `Worker.execute` passes
        the tool argument produced by the LLM, which is text rather than
        a mapping, so both forms have to be accepted.

    Returns
    -------
    result_dict : `dict`
        Whatever `code_program` returns.
    """
    if callable(getattr(request, "get", None)):
        payload = request
    else:
        # Not dict-like: treat the whole argument as the question.
        payload = {"question": "" if request is None else str(request)}
    return code_program(
        question=payload.get("question", ""),
        context_str=payload.get("context", ""),
        rag_context=payload.get("rag_context", ""),
        variables=payload.get("variables", {})
    )


python_tool = Tool(
    name="python executor",
    description=("Generate and execute Python code with context, "
                 "using ProgramOfThought."),
    requires="question",
    func=_run_python_tool
)

def _echo_dummy_argument(arg):
    """Return a fixed message that embeds ``arg``."""
    return f"Dummy tool called with argument: {arg}"


# Placeholder tool that only echoes its argument.
dummy_tool = Tool(
    name="dummy",
    description="Example tool for future extension.",
    requires="some_argument",
    func=_echo_dummy_argument,
)


class Worker(Module):
    """Generic worker that can plan tasks, decide on tools, and execute them.

    Parameters
    ----------
    role : `str`
        Role or name of the worker (e.g., "rag_worker").
    tools : `list` of `Tool`
        List of available Tool instances.
    """

    def __init__(self, role: str, tools: List[Tool]):
        # Initialise the DSPy base class, as ProgramOfThought does.
        super().__init__()
        self.role = role
        self.tools = {t.name: t for t in tools}
        self._plan = ChainOfThought("task, context -> proposed_plan")
        self._use_tool = ChainOfThought("task, context -> tool_name, tool_argument")

    def plan(self, task: str, feedback: str = "") -> str:
        """Generate a textual plan for the given `task`.

        Parameters
        ----------
        task : `str`
            Task description or goal.
        feedback : `str`, optional
            Feedback from a supervisor or orchestration component.

        Returns
        -------
        plan_text : `str`
            Proposed plan as text.
        """
        context = (
            f"Worker role: {self.role}\n"
            f"Tools: {list(self.tools.keys())}\n"
            f"Feedback:\n{feedback}"
        )
        result = self._plan(task=task, context=context)
        return result.proposed_plan

    def execute(self, task: str, use_tool: bool, context: str = "") -> str:
        """Execute the `task`, optionally using a tool.

        Parameters
        ----------
        task : `str`
            The task or instruction to execute.
        use_tool : `bool`
            Whether to use a tool or not.
        context : `str`, optional
            Plan or context to pass to the tool decision logic. The list
            of this worker's tools is appended to it.

        Returns
        -------
        outcome : `str`
            Result of the execution or an error message. The result is
            whatever the selected tool's callable returns.
        """
        print(f"[{self.role}] Executing: {task}")
        if not use_tool:
            return f"Task '{task}' completed without tools."

        # The model can only choose a valid tool if it is told which ones
        # exist; the caller-supplied context does not contain them.
        catalog = self._describe_tools()
        tool_context = f"{context}\n\n{catalog}" if context else catalog

        tool_decision = self._use_tool(task=task, context=tool_context)
        tool_name = tool_decision.tool_name
        arg = tool_decision.tool_argument

        tool = self._find_tool(tool_name)
        if tool is not None:
            return tool.func(arg)

        return f"Tool not found: '{tool_name}'."

    def _describe_tools(self) -> str:
        """Render the available tools as text for the selection prompt."""
        lines = [
            f"- {tool.name}: {tool.description} (argument: {tool.requires})"
            for tool in self.tools.values()
        ]
        return (
            "Available tools (answer with one of these exact names):\n"
            + "\n".join(lines)
        )

    def _find_tool(self, tool_name) -> Optional[Tool]:
        """Resolve a model-provided tool name, tolerating case and quotes."""
        if tool_name in self.tools:
            return self.tools[tool_name]
        wanted = str(tool_name).strip().strip("'\"`").casefold()
        for name, tool in self.tools.items():
            if name.casefold() == wanted:
                return tool
        return None


# Worker offering the observing-metadata search (plus the dummy tool).
rag_worker = Worker(role="rag_worker", tools=[rag_tool, dummy_tool])
# Worker offering Python code generation/execution (plus the dummy tool).
python_worker = Worker(role="python_worker", tools=[python_tool, dummy_tool])


class Boss(Module):
    """Orchestrates tasks by assigning them to Workers and managing their plans.

    Parameters
    ----------
    workers : `list` of `Worker`
        List of workers available to the boss.
    """

    def __init__(self, workers: List[Worker]):
        # Initialise the DSPy base class, as ProgramOfThought does.
        super().__init__()
        self.workers = {w.role: w for w in workers}
        # The valid roles are an input of the assignment step; without
        # them the model has no way of naming an existing worker.
        self._assign = ChainOfThought("task, available_workers -> who")
        self._approve = ChainOfThought("task, plan -> approval")

    def plan_and_execute(self, task: str, worker_hint: str = "",
                         use_tool: bool = True) -> str:
        """Decide which worker to use, generate and approve a plan, then execute.

        Parameters
        ----------
        task : `str`
            The goal or instruction to achieve.
        worker_hint : `str`, optional
            Suggestion for which worker to assign (bypasses assignment logic).
        use_tool : `bool`, optional
            Whether the chosen worker should use a tool (default True).

        Returns
        -------
        exec_result : `str`
            The result of the worker's execution, or a message if no
            worker matches the assignment.
        """
        if worker_hint and worker_hint in self.workers:
            assignee = worker_hint
        else:
            assign_result = self._assign(
                task=task, available_workers=", ".join(self.workers)
            )
            assignee = self._match_role(assign_result.who)

        if assignee not in self.workers:
            return f"No worker found with role '{assignee}'."

        worker = self.workers[assignee]
        plan_text = worker.plan(task, feedback="N/A")
        print(f"Proposed Plan: {plan_text}")

        approval_res = self._approve(task=task, plan=plan_text)
        # Match "yes" as a whole word so that e.g. "yesterday" or "eyes"
        # in the answer does not count as approval.
        is_approved = re.search(
            r"\byes\b", str(approval_res.approval), flags=re.IGNORECASE
        ) is not None
        if not is_approved:
            # TODO: DM-12345 Possibly refine or iterate plan if disapproved
            print("Plan was not approved. Adjusting...")

        # Report the real verdict to the worker; execution still proceeds
        # either way until the TODO above is addressed.
        verdict = "Approved" if is_approved else "Not approved"
        context = f"Plan: {plan_text}\nBoss: {verdict}"
        exec_result = worker.execute(task, use_tool=use_tool, context=context)
        return exec_result

    def _match_role(self, who) -> str:
        """Map a model-provided worker name onto a known role if possible.

        Surrounding whitespace/quotes and letter case are ignored. The
        input is returned as a string when nothing matches so that the
        caller can report it.
        """
        if who in self.workers:
            return who
        wanted = str(who).strip().strip("'\"`").casefold()
        for role in self.workers:
            if role.casefold() == wanted:
                return role
        return str(who)


def main():
    """Run the two demo scenarios through a `Boss` and print the results."""
    boss = Boss(workers=[rag_worker, python_worker])

    # One row per demo: banner, task text, worker to use, result label.
    demos = (
        (
            # Example 1: Retrieve data about 'spectra' using RAG
            "\n=== EXAMPLE 1: RAG ===",
            "Find observation data about spectra.",
            "rag_worker",
            "Final RAG Result:",
        ),
        (
            # Example 2: Run Python code to compute potential energy
            "\n=== EXAMPLE 2: Python Code Execution ===",
            "Calculate the potential energy of an object with mass=10kg "
            "at a height=5m.",
            "python_worker",
            "Final Code Execution Result:",
        ),
    )

    for banner, task, worker_name, label in demos:
        print(banner)
        outcome = boss.plan_and_execute(
            task=task,
            worker_hint=worker_name,
            use_tool=True
        )
        print(label, outcome)


if __name__ == "__main__":
    main()


=== EXAMPLE 1: RAG ===
Proposed Plan: 1. Access MAST through their website: https://mast.stsci.edu/
2. Use the search bar to input keywords related to spectra observations, such as "spectra," "emission lines," or specific objects of interest.
3. Filter results by selecting appropriate filters like wavelength range, object type, and instrument used for observation.
4. Review the list of available datasets and select those that match your criteria.
5. Download the data files for further analysis.
Plan was not approved. Adjusting...
[rag_worker] Executing: Find observation data about spectra.
Final RAG Result: Tool not found: 'web_browser'.

=== EXAMPLE 2: Python Code Execution ===
Proposed Plan: 1. Define the mass, height, and acceleration due to gravity.
2. Use the potential energy formula PE = mgh to calculate the potential energy.
3. Print out the result.

Here is a Python code snippet that accomplishes this:

```python
# Define given values
mass = 10  # in kg
height = 5  # in meters