In [14]:
# %pip install langfuse wikipedia openai google-search-results

In [15]:
# Step 1: Load Configuration and Dependencies

import json

# Read the secrets (API keys, Langfuse credentials) from config.json so they
# never appear in the notebook source or its outputs.
with open('config.json', 'r') as config_file:
    config = json.load(config_file)

# Run settings shared by the cells below.
model_name = "gpt-4.1-2025-04-14"
temperature = 0.0
verbose = True
use_langfuse = True
username = "Shreyashgupta5"
code_link = "https://huggingface.co/spaces/Shreyashgupta5/ai_agents_course"
api_base_url = "https://agents-course-unit4-scoring.hf.space"

# Echo only the non-sensitive settings as a sanity check; the contents of
# `config` are intentionally not printed.
print("Config loaded. Sensitive keys available for use.")
print("Notebook variables set:")
for setting_name, setting_value in (
    ("model_name", model_name),
    ("temperature", temperature),
    ("verbose", verbose),
    ("use_langfuse", use_langfuse),
    ("username", username),
    ("code_link", code_link),
    ("api_base_url", api_base_url),
):
    print(f"  {setting_name}: {setting_value}")

Config loaded. Sensitive keys available for use.
Notebook variables set:
  model_name: gpt-4.1-2025-04-14
  temperature: 0.0
  verbose: True
  use_langfuse: True
  username: Shreyashgupta5
  code_link: https://huggingface.co/spaces/Shreyashgupta5/ai_agents_course
  api_base_url: https://agents-course-unit4-scoring.hf.space


In [16]:
# Step 2: Configure Langfuse Decorator-Based Client

from langfuse.decorators import langfuse_context

# Collect the Langfuse credentials from config.json, then hand them to the
# decorator-based client in a single call.
langfuse_settings = {
    "secret_key": config["langfuse_secret"],
    "public_key": config["langfuse_public_key"],
    "host": config["host"],
}
langfuse_context.configure(**langfuse_settings)

In [17]:
# Step 3: Load Questions

import json

# Upper bound on how many questions this run processes.
NUM_QUESTIONS_TO_RUN = 20  # <--- Change this number as needed

# Read the full question list from all_questions.json.
with open('all_questions.json', 'r') as questions_file:
    all_questions = json.load(questions_file)

# Keep only the first NUM_QUESTIONS_TO_RUN entries.
questions = all_questions[:NUM_QUESTIONS_TO_RUN]

# Show task_id and question text for each selected entry so the selection can
# be checked by eye.
divider = "-" * 40
for entry in questions:
    print(f"Task ID: {entry['task_id']}")
    print(f"Question: {entry['question']}")
    print(divider)

Task ID: 8e867cd7-cff9-4e6c-867a-ff5ddc2550be
Question: How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)? You can use the latest 2022 version of english wikipedia.
----------------------------------------
Task ID: a1e91b78-d3d8-4675-bb8d-62741b4b68a6
Question: In the video https://www.youtube.com/watch?v=L1vXCYZAYYM, what is the highest number of bird species to be on camera simultaneously?
----------------------------------------


In [18]:
# Step 4: Define Tools

import wikipedia
from langfuse.decorators import observe
import os
import requests

# Wikipedia Search Tool
@observe()
def wikipedia_search(query, sentences=2):
    """Look up `query` on Wikipedia and return a short summary.

    Args:
        query: Text passed to `wikipedia.summary`.
        sentences: Number of summary sentences requested (default 2).

    Returns:
        The summary text on success. On failure a descriptive string is
        returned instead of raising: a disambiguation notice listing up to
        five options, a "no page found" message, or "Error: ..." for any
        other exception.
    """
    try:
        return wikipedia.summary(query, sentences=sentences, auto_suggest=True, redirect=True)
    except wikipedia.DisambiguationError as ambiguous:
        # Surface only the first few candidates to keep the tool output short.
        top_options = ambiguous.options[:5]
        return f"Disambiguation error. Options: {top_options}"
    except wikipedia.PageError:
        return "No Wikipedia page found for the query."
    except Exception as err:
        return "Error: " + str(err)

# SerpAPI Web Search Tool
@observe()
def serpapi_search(query, timeout=30):
    """Run a Google web search through SerpAPI and return one short text result.

    Args:
        query: Search text sent as the `q` parameter.
        timeout: Seconds to wait for SerpAPI before giving up (default 30).

    Returns:
        A string: the answer-box answer when the response has one, otherwise
        the snippet of the first organic result, otherwise a message saying
        nothing relevant was found. Failures (missing key, network error,
        non-200 status, unparsable body) are also reported as strings rather
        than raised, so the agent loop keeps running.
    """
    api_key = config.get("SERPAPI_API_KEY") or os.environ.get("SERPAPI_API_KEY")
    if not api_key:
        return "No SerpAPI key provided."
    params = {
        "q": query,
        "api_key": api_key,
        "engine": "google",
        "num": 3  # three results are requested, but only the first one is used below
    }
    try:
        # Without a timeout a stalled connection would block the whole run.
        response = requests.get("https://serpapi.com/search", params=params, timeout=timeout)
    except requests.RequestException as exc:
        # Report only the exception type: the exception text can include the
        # request URL, which carries the API key as a query parameter.
        return f"SerpAPI error: request failed ({type(exc).__name__})"
    if response.status_code != 200:
        return f"SerpAPI error: {response.status_code} {response.text}"
    try:
        data = response.json()
    except ValueError:
        return "SerpAPI error: response was not valid JSON"
    if "answer_box" in data and "answer" in data["answer_box"]:
        return data["answer_box"]["answer"]
    elif "organic_results" in data and len(data["organic_results"]) > 0:
        return data["organic_results"][0].get("snippet", "No snippet found.")
    else:
        return "No relevant results found."

# SerpAPI Image Search Tool
@observe()
def serpapi_image_search(query, timeout=30):
    """
    Uses SerpAPI's Google Images API to search for images related to the query.
    Returns the first image result's URL and title.
    Docs: https://serpapi.com/images-results

    Args:
        query: Search text sent as the `q` parameter.
        timeout: Seconds to wait for SerpAPI before giving up (default 30).

    Returns:
        A dict with keys "title", "image_url", "thumbnail" and "source" taken
        from the first entry of `images_results`, or a string describing why
        no image is available (missing key, network error, non-200 status,
        unparsable body, or no results).
    """
    api_key = config.get("SERPAPI_API_KEY") or os.environ.get("SERPAPI_API_KEY")
    if not api_key:
        return "No SerpAPI key provided."
    params = {
        "q": query,
        "api_key": api_key,
        "engine": "google_images"
    }
    try:
        # Without a timeout a stalled connection would block the whole run.
        response = requests.get("https://serpapi.com/search", params=params, timeout=timeout)
    except requests.RequestException as exc:
        # Report only the exception type: the exception text can include the
        # request URL, which carries the API key as a query parameter.
        return f"SerpAPI error: request failed ({type(exc).__name__})"
    if response.status_code != 200:
        return f"SerpAPI error: {response.status_code} {response.text}"
    try:
        data = response.json()
    except ValueError:
        return "SerpAPI error: response was not valid JSON"
    images = data.get("images_results", [])
    if not images:
        return "No images found."
    first = images[0]
    return {
        "title": first.get("title"),
        "image_url": first.get("original"),
        "thumbnail": first.get("thumbnail"),
        "source": first.get("source")
    }


In [19]:
# Step 5: Agent Planning Step (OpenAI v1.x+)

import openai
from langfuse.decorators import observe  # (if not already imported)

# Build the OpenAI client used by the planning and synthesis functions in this
# notebook; the API key is read from the "openai_api_key" entry of `config`.
client = openai.OpenAI(api_key=config["openai_api_key"])

@observe(as_type="generation")
def get_agent_plan(question, model_name, temperature=0.0):
    """Ask the chat model for a step-by-step plan that names the tools to use.

    The prompt lists the three available tools (Wikipedia search, SerpAPI web
    search, SerpAPI image search) and asks the model to justify its choice of
    tool for each step.

    Args:
        question: The question text to plan for.
        model_name: Chat model identifier passed to the OpenAI client.
        temperature: Sampling temperature (default 0.0).

    Returns:
        The message content of the first completion choice.
    """
    tool_catalogue = (
        "- Wikipedia Search: For factual and encyclopedic information.\n"
        "- SerpAPI Web Search: For general web search (Google, Bing, etc.).\n"
        "- SerpAPI Image Search: For finding images or when the question is about pictures/photos.\n"
    )
    planning_rules = (
        "Create a step-by-step plan to answer this question. For each step, specify which tool you would use and why. "
        "If the question is about images, use the image search tool. "
        "If Wikipedia is insufficient, fall back to SerpAPI Web Search. "
        "Be explicit about your reasoning for tool selection."
    )
    prompt = (
        "You are an AI agent. Here is a question you need to answer:\n"
        f"Question: {question}\n\n"
        "You have access to the following tools:\n"
        + tool_catalogue
        + planning_rules
    )
    completion = client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": prompt}],
        temperature=temperature,
    )
    return completion.choices[0].message.content

In [20]:
# Step 6: Tool Execution Step

from langfuse.decorators import observe
import re

@observe()
def execute_tools(plan, question):
    """
    Executes tools based on the tools mentioned in the plan.
    - Runs only the tools mentioned in the plan.
    - Still falls back to SerpAPI web search if Wikipedia is insufficient and not already in the plan.

    Tool selection is a case-insensitive substring match on the plan text, so
    any mention of a keyword (even "do not use ...") selects that tool.

    Args:
        plan: Plan text from the planning step. None or an empty string is
            treated as a plan that names no tools.
        question: The question text; it is passed unchanged to every tool.

    Returns:
        Dict mapping tool keys ('wikipedia', 'serpapi_image', 'serpapi') to
        the output of each tool that was run.
    """
    # Wikipedia results shorter than this are considered too thin to rely on.
    min_useful_wiki_length = 50
    # Phrases in the plan that select the SerpAPI web search tool.
    web_search_aliases = ("serpapi web", "web search", "serpapi search", "google search")
    # Phrases in a Wikipedia result that mark it as a failure message.
    wiki_failure_markers = ("no wikipedia page found", "disambiguation error", "error:")

    tool_outputs = {}

    # The chat API can return a message with no text content; guard against
    # None so that case does not raise AttributeError here.
    plan_lower = (plan or "").lower()

    # Helper: check if a tool is mentioned in the plan
    def tool_in_plan(tool_name):
        return tool_name in plan_lower

    # Run tools as per plan
    wiki_result = None
    if tool_in_plan("wikipedia"):
        wiki_result = wikipedia_search(question)
        tool_outputs['wikipedia'] = wiki_result

    if tool_in_plan("image"):
        tool_outputs['serpapi_image'] = serpapi_image_search(question)

    if any(tool_in_plan(alias) for alias in web_search_aliases):
        tool_outputs['serpapi'] = serpapi_search(question)

    # Fallback: If Wikipedia was run and is insufficient, and SerpAPI web search wasn't already run, run it
    if wiki_result is not None and 'serpapi' not in tool_outputs:
        wiki_lower = wiki_result.lower()
        wiki_insufficient = (
            any(marker in wiki_lower for marker in wiki_failure_markers)
            or len(wiki_result) < min_useful_wiki_length
        )
        if wiki_insufficient:
            tool_outputs['serpapi'] = serpapi_search(question)
            print("Fallback: Used SerpAPI web search due to insufficient Wikipedia result.")

    # Print which tools were used
    print("Tools used for this question:", list(tool_outputs.keys()))

    return tool_outputs

In [21]:
# Step 7: Synthesis Step (OpenAI v1.x+)

from langfuse.decorators import observe  # (if not already imported)

@observe(as_type="generation")
def synthesize_final_answer(task_id, question, tool_outputs, gaia_doc, model_name, temperature=0.0):
    """Ask the chat model to turn the tool outputs into a GAIA-format JSON answer.

    Args:
        task_id: Identifier of the question. It is accepted but not inserted
            into the prompt, so the model is never told its value.
        question: The original question text.
        tool_outputs: Collected tool results; embedded in the prompt via
            their string representation.
        gaia_doc: Answer-format documentation text, embedded verbatim.
        model_name: Chat model identifier passed to the OpenAI client.
        temperature: Sampling temperature (default 0.0).

    Returns:
        The message content of the first completion choice, expected to be a
        JSON object as text.
    """
    context_section = (
        "You are an AI agent participating in the GAIA benchmark. "
        "Here is the official GAIA documentation for answer formatting:\n\n"
        f"{gaia_doc}\n\n"
        f"Here is the original question:\n{question}\n\n"
        f"Here are the outputs from the tools you used:\n{tool_outputs}\n\n"
    )
    instruction_section = (
        "If the Wikipedia output is sufficient and correct, use it. "
        "If not, use the SerpAPI output. If neither is sufficient, say so. "
        "Using the information above, generate the final answer in the required GAIA JSON format. "
        "Only output the JSON object, nothing else."
    )
    prompt = context_section + instruction_section
    completion = client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": prompt}],
        temperature=temperature,
    )
    return completion.choices[0].message.content

In [22]:
# Step 8: Main Agent Loop with Langfuse Traceability

from langfuse.decorators import observe, langfuse_context  # (if not already imported)

@observe()
def process_all_questions(questions, model_name, temperature, gaia_doc):
    """Run plan -> tools -> synthesis for every question, in order.

    Args:
        questions: Iterable of dicts with 'task_id' and 'question' keys.
        model_name: Chat model identifier forwarded to both LLM steps.
        temperature: Sampling temperature forwarded to both LLM steps.
        gaia_doc: Answer-format documentation forwarded to the synthesis step.

    Returns:
        List with one synthesis result per question, in input order.
    """
    final_answers = []
    for item in questions:
        task_id = item['task_id']
        print(f"Processing Task ID: {task_id}")
        question_text = item['question']
        plan = get_agent_plan(question_text, model_name, temperature)
        tool_outputs = execute_tools(plan, question_text)
        # The question's own task_id is passed along with the synthesis request.
        final_answers.append(
            synthesize_final_answer(
                task_id=task_id,
                question=question_text,
                tool_outputs=tool_outputs,
                gaia_doc=gaia_doc,
                model_name=model_name,
                temperature=temperature,
            )
        )
        # Print the current trace URL after each question for traceability.
        print("Langfuse Trace URL:", langfuse_context.get_current_trace_url())
    return final_answers

# Read the answer-format documentation; its text is passed to the pipeline
# below as `gaia_doc`.
# NOTE(review): the file name is spelled "GIAI" - confirm it matches the file
# on disk and is not a typo for "GAIA".
with open("documentation/GIAI-documentation.md", "r") as f:
    gaia_doc = f.read()

# Run the pipeline over the selected questions; `final_answers` receives one
# result per question, in the same order as `questions`.
final_answers = process_all_questions(questions, model_name, temperature, gaia_doc)

Processing Task ID: 8e867cd7-cff9-4e6c-867a-ff5ddc2550be
Tools used for this question: ['wikipedia', 'serpapi']
Langfuse Trace URL: https://us.cloud.langfuse.com/project/cmabtwja701n8ad06grpw13lr/traces/74d0887a-3098-43f6-b52e-be8aa7abc1d6
Processing Task ID: a1e91b78-d3d8-4675-bb8d-62741b4b68a6
Tools used for this question: ['serpapi_image', 'serpapi_youtube', 'serpapi']
Langfuse Trace URL: https://us.cloud.langfuse.com/project/cmabtwja701n8ad06grpw13lr/traces/74d0887a-3098-43f6-b52e-be8aa7abc1d6


In [23]:
# Step 9: Save Results (with cleaning and validation)

import json
import re

def clean_and_validate_answer(answer_str, required_fields=("task_id", "submitted_answer"), correct_task_id=None):
    """Parse a model reply into a validated answer dict.

    A leading Markdown code fence (with optional language tag) and a trailing
    fence are stripped before the text is parsed as JSON.

    Args:
        answer_str: Raw model output, expected to contain one JSON object.
        required_fields: Keys that must be present in the parsed object.
        correct_task_id: When not None, overwrites the object's "task_id"
            before the required-field check.

    Returns:
        The parsed dict, with "task_id" replaced if `correct_task_id` is given.

    Raises:
        ValueError: If the text is not valid JSON, the JSON value is not an
            object, or a required field is missing.
    """
    answer_str = answer_str.strip()
    answer_str = re.sub(r"^```[a-zA-Z]*", "", answer_str)
    answer_str = re.sub(r"```$", "", answer_str).strip()
    try:
        answer_obj = json.loads(answer_str)
    except (ValueError, RecursionError) as e:
        # json.JSONDecodeError is a ValueError; chain it so the cause is kept.
        raise ValueError(f"Invalid JSON: {e}\nRaw output: {answer_str}") from e
    # A list, string, number or null is valid JSON but cannot carry the answer
    # fields; reject it here instead of failing later with a TypeError.
    if not isinstance(answer_obj, dict):
        raise ValueError(
            f"Expected a JSON object but got {type(answer_obj).__name__}: {answer_obj!r}"
        )
    if correct_task_id is not None:
        answer_obj["task_id"] = correct_task_id
    for field in required_fields:
        if field not in answer_obj:
            raise ValueError(f"Missing required field '{field}' in answer: {answer_obj}")
    return answer_obj

def save_final_answers(final_answers, questions, filename="final_answers.jsonl"):
    """Clean each answer, stamp it with the right task_id, and write JSON Lines.

    Answers are matched to questions by position. String answers are parsed
    with `clean_and_validate_answer`; dict answers are copied and given the
    question's task_id. Answers that cannot be used are reported with a
    printed message and skipped, so one bad reply does not lose the rest.

    Args:
        final_answers: Sequence of raw answers (JSON text or dicts).
        questions: Sequence of question dicts with a "task_id" key; must be
            at least as long as `final_answers`.
        filename: Output path; the file is overwritten.

    Returns:
        The list of cleaned answer dicts that were written.
    """
    cleaned_answers = []
    for i, answer in enumerate(final_answers):
        correct_task_id = questions[i]["task_id"]
        if isinstance(answer, str):
            try:
                answer_obj = clean_and_validate_answer(answer, correct_task_id=correct_task_id)
            except Exception as e:
                # Best effort by design: report the bad answer and keep going.
                print(f"Error in answer {i}: {e}")
                continue
        elif isinstance(answer, dict):
            # Build a new dict so the caller's object is not modified.
            answer_obj = {**answer, "task_id": correct_task_id}
        else:
            # e.g. None when the model returned no text content.
            print(f"Error in answer {i}: expected str or dict, got {type(answer).__name__}")
            continue
        cleaned_answers.append(answer_obj)
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding must
    # be fixed to UTF-8 rather than left to the platform default.
    with open(filename, "w", encoding="utf-8") as f:
        for answer_obj in cleaned_answers:
            f.write(json.dumps(answer_obj, ensure_ascii=False) + "\n")
    print(f"Saved {len(cleaned_answers)} answers to {filename}")
    return cleaned_answers

# Clean the raw model answers and write them to the default output file;
# the cleaned list is what gets submitted in the validation step.
final_answers_cleaned = save_final_answers(final_answers, questions)

Saved 2 answers to final_answers.jsonl


In [24]:
# Step 10: Validate Answers (Check with GAIA API)

import requests

def validate_answers(final_answers, username, code_link, api_base_url, agent_code=None, timeout=60):
    """
    Submits answers to the GAIA evaluation endpoint for validation.
    Prints the score and which answers were correct.

    Args:
        final_answers: List of cleaned answer dicts to submit.
        username: Value sent as "username" in the payload.
        code_link: Value sent as "code_link"; also used as `agent_code` when
            that is not given and agent.ipynb cannot be read.
        api_base_url: Base URL of the scoring service; "/submit" is appended.
        agent_code: Optional agent source text. When None, the contents of
            agent.ipynb are used if the file is readable.
        timeout: Seconds to wait for the scoring service (default 60).

    Returns:
        The parsed JSON response on success, or None when the request fails,
        the status is not 200, or the body is not valid JSON.
    """
    url = f"{api_base_url}/submit"
    if agent_code is None:
        # Try to read your notebook as code, or use code_link as fallback
        try:
            with open("agent.ipynb", "r", encoding="utf-8") as f:
                agent_code = f.read()
        except (OSError, UnicodeError):
            agent_code = code_link  # fallback
    payload = {
        "username": username,
        "code_link": code_link,
        "agent_code": agent_code,
        "answers": final_answers  # <-- Use the cleaned answers here!
    }
    try:
        # Without a timeout a stalled connection would block the notebook forever.
        response = requests.post(url, json=payload, timeout=timeout)
    except requests.RequestException as exc:
        print("Submission failed:", exc)
        return None
    if response.status_code != 200:
        print("Submission failed:", response.status_code, response.text)
        return None
    try:
        result = response.json()
    except ValueError as exc:
        print("Submission failed: response was not valid JSON:", exc)
        return None
    print("Submission successful!")
    print(f"Score: {result.get('score', 'N/A')}%")
    if "results" in result:
        print("\nDetailed Results:")
        for r in result["results"]:
            status = "✅" if r.get("correct") else "❌"
            # Use .get throughout so one incomplete entry cannot abort the report.
            print(f"{status} Task ID: {r.get('task_id', 'N/A')} | Your Answer: {r.get('submitted_answer', 'N/A')} | Correct: {r.get('correct_answer', 'N/A')}")
    else:
        print("No detailed results returned.")
    return result

# Submit the cleaned answers for scoring; `validation_result` is the parsed
# response, or None if the submission failed.
validation_result = validate_answers(final_answers_cleaned, username, code_link, api_base_url)

Submission successful!
Score: 0.0%
No detailed results returned.


In [25]:
# Step 11: (Optional) Save Validation Results

def save_validation_results(validation_result, filename="validation_results.json"):
    """Write the validation response to `filename` as indented JSON.

    Nothing is written (and nothing is printed) when `validation_result` is
    None, which is what a failed submission produces.

    Args:
        validation_result: JSON-serializable response object, or None.
        filename: Output path; the file is overwritten.
    """
    if validation_result is None:
        return
    # ensure_ascii=False emits raw non-ASCII characters, so the encoding must
    # be fixed to UTF-8 rather than left to the platform default.
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(validation_result, f, indent=2, ensure_ascii=False)
    print(f"Validation results saved to {filename}")

# Persist the scoring response to the default file; this is a no-op when the
# submission failed and `validation_result` is None.
save_validation_results(validation_result)

Validation results saved to validation_results.json


In [26]:
# Step 12: Flush Langfuse

# --- Langfuse flush at the end of the notebook ---
from langfuse.decorators import langfuse_context  # (if not already imported)
langfuse_context.flush()