# NASA Mars Missions Q&A with Azure AI Agent
This notebook demonstrates how to:
- Connect to Azure AI Projects using `AIProjectClient`.
- Build an Agent that can search NASA documents about Mars missions.
- Use Bing search as a fallback for grounding.
- Call custom Python functions.
- Present a user-friendly chat UI with Gradio.

> **Prerequisites**:
>
> 1. An Azure AI Projects (preview) resource (your `PROJECT_CONNECTION_STRING`).
> 2. A Bing search connection (your `BING_CONNECTION_NAME`).
> 3. A local folder `nasa-data` with at least one NASA public-domain text file about Mars.
> 4. A `.env` file (optional) containing the relevant environment variables.

Let's begin!

## 1) Install & Import Packages
If you haven't installed these packages, do so with:
```bash
pip install azure-ai-projects azure-identity gradio python-dotenv
```
Then, run the imports below.

In [None]:
import os
import re
from datetime import datetime as pydatetime
from typing import Any, List, Dict

import gradio as gr
from gradio import ChatMessage

from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
from azure.ai.projects import AIProjectClient
from azure.ai.projects.models import (
    AgentEventHandler,
    RunStep,
    RunStepDeltaChunk,
    ThreadMessage,
    ThreadRun,
    MessageDeltaChunk,
    BingGroundingTool,
    FilePurpose,
    FileSearchTool,
    FunctionTool,
    ToolSet
)

# Load environment variables from .env file
load_dotenv()
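
Once `load_dotenv()` has run, it can help to confirm that the variables from the prerequisites are actually set before going further. The following is a minimal sketch; the helper name `missing_env_vars` is hypothetical and not part of any SDK.

```python
import os
from typing import Iterable, List, Mapping

# Variable names taken from the prerequisites list at the top of this notebook.
REQUIRED_VARS = ("PROJECT_CONNECTION_STRING", "BING_CONNECTION_NAME")

def missing_env_vars(required: Iterable[str], env: Mapping[str, str] = os.environ) -> List[str]:
    """Return the names in `required` that are unset or empty in `env`."""
    return [name for name in required if not env.get(name)]

missing = missing_env_vars(REQUIRED_VARS)
if missing:
    print("Missing environment variables:", ", ".join(missing))
else:
    print("All required environment variables are set.")
```

A missing `BING_CONNECTION_NAME` is not fatal here, since the Bing tool is set up inside a `try`/`except` later on.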

## 2) (Optional) Define Custom Python Functions
You might have custom logic, such as looking up upcoming rocket launches or formatting a mission summary. We'll define an example function below.

In a real project, you could have a separate file `mars_functions.py` or something similar. For simplicity, we'll define one inline.
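
Conceptually, when the agent decides to call a custom function it supplies a function name plus a JSON string of arguments (the logging code in section 5 prints exactly these two fields). The sketch below illustrates that idea with plain Python; it is not the SDK's implementation, and `describe_rover`, `function_registry`, and `dispatch` are hypothetical names used only for illustration.

```python
import json
from typing import Any, Callable, Dict

def describe_rover(name: str, landing_year: int) -> str:
    """Hypothetical example function, not part of this notebook's tool set."""
    return f"{name} landed on Mars in {landing_year}."

# Maps the name the model sees to the Python callable behind it.
function_registry: Dict[str, Callable[..., Any]] = {"describe_rover": describe_rover}

def dispatch(fn_name: str, json_args: str) -> Any:
    """Look up a function by name and call it with JSON-decoded keyword arguments."""
    if fn_name not in function_registry:
        raise KeyError(f"Unknown function: {fn_name}")
    kwargs = json.loads(json_args) if json_args else {}
    return function_registry[fn_name](**kwargs)

print(dispatch("describe_rover", '{"name": "Perseverance", "landing_year": 2021}'))
```

Because arguments arrive as keyword arguments, clear parameter names and type hints on your functions make it easier for the model to call them correctly.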

In [None]:
def get_upcoming_rocket_launches() -> str:
    """
    A placeholder function that returns a short message about upcoming NASA rocket launches.
    In reality, you'd integrate an official NASA API or a known schedule.

    :return: A one-sentence description of the next launch (hard-coded example data).
    """
    return "The next NASA rocket launch is scheduled for March 10, 2025, from Kennedy Space Center."  # example data

def format_mission_summary(mission_name: str, highlight: str) -> str:
    """
    A placeholder function that returns a formatted string summary about a mission.

    :param mission_name: Name of the mission, e.g. "Perseverance".
    :param highlight: The key fact to include in the summary.
    :return: A single-line summary string.
    """
    return f"Mission {mission_name}: Key highlight -> {highlight}"  # example data

# We'll combine these into a dictionary so we can wrap them in a FunctionTool.
mars_functions = {
    "get_upcoming_rocket_launches": get_upcoming_rocket_launches,
    "format_mission_summary": format_mission_summary,
}


## 3) Create Azure AI Client
Use `DefaultAzureCredential` (which can work with Azure CLI, VS Code, managed identity, etc.) plus your `PROJECT_CONNECTION_STRING` environment variable.
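
If client creation fails with a connection-string error, checking the string's shape is a quick first step. The sketch below assumes the four-part, semicolon-separated format (host name, subscription ID, resource group, project name) used by the preview SDK versions that offer `from_connection_string`; verify this against your installed version. The helper `split_connection_string` and the example values are hypothetical.

```python
from typing import Dict

def split_connection_string(conn_str: str) -> Dict[str, str]:
    """Split a project connection string into its four assumed fields."""
    parts = [p.strip() for p in conn_str.split(";")]
    if len(parts) != 4 or not all(parts):
        raise ValueError("Expected 4 non-empty, semicolon-separated fields")
    keys = ("host_name", "subscription_id", "resource_group", "project_name")
    return dict(zip(keys, parts))

# Placeholder values for illustration only.
example = "example.api.azureml.ms;0000-sub-id;my-rg;my-project"
print(split_connection_string(example)["project_name"])
```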

In [None]:
credential = DefaultAzureCredential()
project_client = AIProjectClient.from_connection_string(
    credential=credential,
    conn_str=os.environ["PROJECT_CONNECTION_STRING"]  # format: '<HostName>;<AzureSubscriptionId>;<ResourceGroup>;<ProjectName>'
)
project_client

## 4) Set Up Tools
We'll attempt to connect to Bing and create/reuse a vector store for our NASA docs. 

### 4.1) Bing Grounding Tool
Requires an existing Bing connection name set in your environment variable `BING_CONNECTION_NAME`.

In [None]:
try:
    bing_connection = project_client.connections.get(connection_name=os.environ["BING_CONNECTION_NAME"])
    bing_tool = BingGroundingTool(connection_id=bing_connection.id)
    print("bing > connected successfully")
except Exception as e:
    print("bing > not connected; check your BING_CONNECTION_NAME.")
    print(e)
    bing_tool = None


### 4.2) File Search Tool (Vector Store)
We'll store NASA documents in a vector store named `mars-vector-store`. If it doesn’t exist, we’ll upload documents from `nasa-data/` and create a store.
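
The "reuse if present, otherwise create" logic can be sketched independently of Azure. In the example below, `NamedResource` is a hypothetical stand-in for any SDK object with `id` and `name` attributes, and `get_or_create` is an illustrative helper, not an SDK function.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional, Tuple

@dataclass
class NamedResource:
    """Hypothetical stand-in for an SDK object that has `id` and `name`."""
    id: str
    name: str

def find_by_name(items: Iterable[NamedResource], name: str) -> Optional[NamedResource]:
    """Return the first item whose name matches, or None."""
    return next((item for item in items if item.name == name), None)

def get_or_create(
    items: Iterable[NamedResource],
    name: str,
    create: Callable[[str], NamedResource],
) -> Tuple[NamedResource, bool]:
    """Return (resource, created_flag), calling `create` only when nothing matches."""
    existing = find_by_name(items, name)
    if existing is not None:
        return existing, False
    return create(name), True

stores = [NamedResource(id="vs_1", name="other-store")]
store, created = get_or_create(
    stores, "mars-vector-store", lambda n: NamedResource(id="vs_2", name=n)
)
print(store.id, created)
```

Matching on name alone means two stores with the same name are indistinguishable; the first match wins.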

In [None]:
VECTOR_STORE_NAME = "mars-vector-store"
FOLDER_NAME = "nasa-data"  # local folder with text or PDF docs about NASA / Mars

all_stores = project_client.agents.list_vector_stores().data
existing_store = next((s for s in all_stores if s.name == VECTOR_STORE_NAME), None)

vector_store_id = None
if existing_store:
    vector_store_id = existing_store.id
    print(f"Reusing vector store: {existing_store.name} (id: {vector_store_id})")
else:
    if os.path.isdir(FOLDER_NAME):
        file_ids = []
        for fname in os.listdir(FOLDER_NAME):
            fpath = os.path.join(FOLDER_NAME, fname)
            if os.path.isfile(fpath):
                print(f"Uploading file: {fname}")
                uploaded_file = project_client.agents.upload_file_and_poll(
                    file_path=fpath,
                    purpose=FilePurpose.AGENTS
                )
                file_ids.append(uploaded_file.id)

        if file_ids:
            print(f"Creating vector store from {len(file_ids)} file(s)...")
            vs = project_client.agents.create_vector_store_and_poll(
                file_ids=file_ids,
                name=VECTOR_STORE_NAME
            )
            vector_store_id = vs.id
            print(f"Vector store created: {vs.name} (id: {vector_store_id})")
        else:
            print(f"Folder '{FOLDER_NAME}' contains no files. Add NASA docs and re-run this cell.")
    else:
        print(f"No folder '{FOLDER_NAME}' found. Create it and add NASA docs.")

file_search_tool = None
if vector_store_id:
    file_search_tool = FileSearchTool(vector_store_ids=[vector_store_id])
    print("file_search_tool > ready")


## 5) Combine Tools Into a ToolSet
We’ll create a `ToolSet` subclass that logs each tool call, then add our tools:
- BingGroundingTool
- FileSearchTool (vector store)
- FunctionTool (our custom Python functions).

In [None]:
class LoggingToolSet(ToolSet):
    def execute_tool_calls(self, tool_calls: List[Any]) -> List[dict]:
        # Before calling the parent logic, log the input arguments
        for call in tool_calls:
            if hasattr(call, 'function') and call.function:
                fn_name = call.function.name
                fn_args = call.function.arguments
                print(f"[Tool Call Start] {fn_name} with args: {fn_args}")

        results = super().execute_tool_calls(tool_calls)

        # Log the output results
        for r in results:
            print(f"[Tool Call Result] {r['output']}")

        return results

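# FunctionTool is documented as taking a set of callables rather than a
# name-to-function dict, so pass the dict's values (assumes your SDK version matches).
custom_functions_tool = FunctionTool(set(mars_functions.values()))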

toolset = LoggingToolSet()
if bing_tool:
    toolset.add(bing_tool)
if file_search_tool:
    toolset.add(file_search_tool)
toolset.add(custom_functions_tool)

# Optional: print out tool details.
# Note: `_tools` is a private attribute and may change between SDK versions.
for t in toolset._tools:
    print(f"Tool: {t.__class__.__name__}")
    for d in t.definitions:
        if hasattr(d, 'function'):
            print(f" - {d.function.name} => {d.function.description}")

## 6) Create (or Reuse) Agent
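We'll make an agent named `nasa-mars-agent` backed by an Azure OpenAI model deployment (the name comes from `MODEL_DEPLOYMENT_NAME` and defaults to `gpt-4o`).
If an agent with that name already exists, we'll reuse it, keeping its model and instructions and attaching the current toolset. Otherwise, we'll create a new one with instructions describing its role.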

In [None]:
AGENT_NAME = "nasa-mars-agent"
all_agents = project_client.agents.list_agents().data
found_agent = next((a for a in all_agents if a.name == AGENT_NAME), None)

model_deployment = os.environ.get("MODEL_DEPLOYMENT_NAME", "gpt-4o")
system_instructions = (
    "You are a helpful assistant specialized in NASA Mars missions. "
    f"Today is {pydatetime.now().strftime('%Y-%m-%d')}. "
    "You can reference NASA docs in file_search, use Bing for grounding, "
    "and call custom functions like get_upcoming_rocket_launches. "
    "Provide concise, factual, and professional responses."
)

if found_agent:
    print(f"Reusing existing agent: {found_agent.name} (id: {found_agent.id})")
    agent = project_client.agents.update_agent(
        assistant_id=found_agent.id,
        model=found_agent.model,
        instructions=found_agent.instructions,
        toolset=toolset,
    )
else:
    agent = project_client.agents.create_agent(
        model=model_deployment,
        name=AGENT_NAME,
        instructions=system_instructions,
        toolset=toolset
    )
    print(f"Created new agent: {agent.name} (id: {agent.id})")

agent

## 7) Create a Conversation Thread
A conversation *thread* is like a chat session with the agent. We can have multiple threads for different chat sessions.

In [None]:
thread = project_client.agents.create_thread()
print(f"Created thread with id: {thread.id}")
thread

## 8) Define a Custom Event Handler for Streaming
We'll implement `AgentEventHandler` to see partial responses, which helps for debugging. 
For production, you might skip or simplify the console logging.
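
The core of the handler is accumulating text fragments per message ID. Stripped of SDK types, the idea looks like the sketch below, where each delta is modeled as a plain `(message_id, fragment)` tuple; that shape is an assumption made for illustration, not the SDK's event format.

```python
from typing import Dict, List, Tuple

def accumulate_deltas(deltas: List[Tuple[str, str]]) -> Dict[str, str]:
    """Concatenate text fragments per message ID, in arrival order."""
    messages: Dict[str, str] = {}
    for message_id, fragment in deltas:
        messages[message_id] = messages.get(message_id, "") + fragment
    return messages

stream = [("msg_1", "Perse"), ("msg_1", "verance"), ("msg_2", "Curiosity")]
print(accumulate_deltas(stream))
```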

In [None]:
class MyEventHandler(AgentEventHandler):
    def __init__(self):
        super().__init__()
        self._current_message_id = None
        self._accumulated_text = ""

    def on_message_delta(self, delta: MessageDeltaChunk) -> None:
        # If a new message, we start fresh
        if delta.id != self._current_message_id:
            if self._current_message_id is not None:
                print()  # new line from the previous message
            self._current_message_id = delta.id
            self._accumulated_text = ""
            print("\nAssistant says:", end=" ")

        # Accumulate partial text
        partial_text = ""
        if delta.delta.content:
            for chunk in delta.delta.content:
                partial_text += chunk.text.get("value", "")

        self._accumulated_text += partial_text
        print(partial_text, end="", flush=True)

    def on_thread_message(self, message: ThreadMessage) -> None:
        if message.status == "completed" and message.role == "assistant":
            print("\n[Assistant message complete]\n")
            self._current_message_id = None
            self._accumulated_text = ""

    def on_thread_run(self, run: ThreadRun) -> None:
        print(f"\nThreadRun status: {run.status}")
        if run.status == "failed":
            print(f"[Error] {run.last_error}")

    def on_run_step(self, step: RunStep) -> None:
        print(f"[RunStep] {step.type} => {step.status}")

    def on_run_step_delta(self, delta: RunStepDeltaChunk) -> None:
        pass  # You could handle partial tool calls here.

    def on_unhandled_event(self, event_type: str, event_data):
        pass  # for debugging any unexpected events

    def on_error(self, data: str) -> None:
        print(f"[Stream Error] {data}")

    def on_done(self) -> None:
        print("[Streaming done]")

## 9) Main Chat Function for Gradio
We'll define a function `mars_agent_chat` that:
1. Takes user input and the existing chat history (a list of `dict` messages in Gradio's `messages` format).
2. Sends the user message to the agent.
3. Streams partial responses back (including function/tool calls), building an updated conversation.

We then yield partial updates to the Gradio UI in real-time.
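
The streaming pattern itself is just a generator that yields the conversation each time it grows. Here is a minimal sketch with plain dictionaries and a pre-made list of fragments standing in for the agent's stream; `stream_reply` is a hypothetical name.

```python
from typing import Dict, Iterable, Iterator, List

Message = Dict[str, str]

def stream_reply(
    history: List[Message], user_text: str, fragments: Iterable[str]
) -> Iterator[List[Message]]:
    """Yield a snapshot of the conversation after each new piece of text."""
    conversation = list(history) + [{"role": "user", "content": user_text}]
    yield [dict(m) for m in conversation]  # show the user's message first

    for fragment in fragments:
        if conversation[-1]["role"] == "assistant":
            # Extend the assistant bubble that is already open.
            conversation[-1]["content"] += fragment
        else:
            # Open a new assistant bubble.
            conversation.append({"role": "assistant", "content": fragment})
        yield [dict(m) for m in conversation]

for snapshot in stream_reply([], "What is Perseverance?", ["A Mars ", "rover."]):
    print(snapshot[-1])
```

Yielding copies keeps each snapshot independent; the function below yields the live list instead, which is fine for Gradio because it re-renders on every yield.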

In [None]:
def _convert_dict_to_chatmessage(msg: dict) -> ChatMessage:
    return ChatMessage(role=msg["role"], content=msg["content"], metadata=msg.get("metadata", None))

def mars_agent_chat(user_message: str, history: List[dict]):
    # Convert Gradio history to ChatMessage objects
    conversation = [_convert_dict_to_chatmessage(m) for m in history]

    # Add the new user message
    conversation.append(ChatMessage(role="user", content=user_message))
    yield (conversation, "")  # Show the user message and clear the input box

    # Post the user's message to the Azure AI thread
    project_client.agents.create_message(
        thread_id=thread.id,
        role="user",
        content=user_message
    )

    # Stream events from the run and append partial text to the conversation as it arrives.

    with project_client.agents.create_stream(
        thread_id=thread.id,
        assistant_id=agent.id,
        event_handler=MyEventHandler()
    ) as stream:
        for item in stream:
            event_type, event_data, *_ = item

            # 1) If the agent is sending partial text, handle it
            if event_type == "thread.message.delta":
                text_delta = ""
                for chunk in event_data["delta"]["content"]:
                    text_delta += chunk["text"].get("value", "")

                # If the last message is an assistant bubble, append partial text
                if conversation and conversation[-1].role == "assistant":
                    conversation[-1].content += text_delta
                else:
                    # create a new assistant message bubble
                    conversation.append(ChatMessage(role="assistant", content=text_delta))
                yield (conversation, "")

            # 2) If an assistant message has completed, finalize that bubble
            elif event_type == "thread.message.completed":
                yield (conversation, "")

            # 3) If the run has finished (or the stream has ended), stop streaming
            elif event_type in ("thread.run.completed", "done"):
                yield (conversation, "")
                break

## 10) Build a Gradio UI
We'll build a chat interface that:
1. Shows example questions (e.g. “What is NASA’s plan for Mars sample return?”)
2. Lets the user type their own question.
3. Displays the conversation with the agent in real-time.
4. Has a “Clear” button to reset the thread.


In [None]:
brand_theme = gr.themes.Default(
    primary_hue="blue",
    secondary_hue="blue",
    neutral_hue="gray",
).set(
    button_primary_background_fill="#005EB8",
    button_primary_text_color="#FFFFFF",
)

with gr.Blocks(theme=brand_theme, css="footer {visibility: hidden;}") as demo:
    gr.Markdown("# NASA Mars Missions Chatbot")
    gr.Markdown("Ask me about NASA's Mars missions! I can also do Bing searches and call custom functions.")

    def clear_conversation():
        global thread
        thread = project_client.agents.create_thread()
        return []

    chatbot = gr.Chatbot(
        type="messages",
        examples=[
            {"text": "What is the Perseverance rover?"},
            {"text": "When is the next NASA rocket launch?"},
            {"text": "Summarize the Mars Sample Return mission."},
        ]
    )
    
    textbox = gr.Textbox(placeholder="Ask about NASA Mars...")

    # On example select, fill the textbox
    def on_example(evt: gr.SelectData):
        return evt.value["text"]

    chatbot.example_select(fn=on_example, inputs=None, outputs=textbox)

    # Link the textbox submit to mars_agent_chat
    (textbox
     .submit(fn=mars_agent_chat, inputs=[textbox, chatbot], outputs=[chatbot, textbox])
     .then(fn=lambda: "", outputs=textbox)
    )

    # Clear button
    btn_clear = gr.Button("Clear Chat")
    btn_clear.click(fn=clear_conversation, inputs=None, outputs=chatbot)

demo.launch()


## 11) Optional: Resource Cleanup
If you created resources just for this demo (agent, thread, vector store) and want to delete them, you can do so here.

In [None]:
# # Uncomment if you want to clean up everything.
# # WARNING: This is permanent.
#
# try:
#     project_client.agents.delete_agent(agent.id)
#     print("Agent deleted.")
# except Exception as e:
#     print("Could not delete agent:", e)
#
# try:
#     project_client.agents.delete_thread(thread.id)
#     print("Thread deleted.")
# except Exception as e:
#     print("Could not delete thread:", e)
#
# if vector_store_id:
#     try:
#         project_client.agents.delete_vector_store(vector_store_id)
#         print("Vector store deleted.")
#     except Exception as e:
#         print("Could not delete vector store:", e)