<a href="https://colab.research.google.com/github/noahwei682/MCP/blob/main/open_deep_researcher.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install nest_asyncio
import nest_asyncio
nest_asyncio.apply()



In [None]:
import ast
import asyncio
import json
import os

import aiohttp
from openai import OpenAI

# =======================
# Configuration Constants
# =======================
# Credentials are read from the environment so that secrets are never stored in
# the notebook itself. Set OPENROUTER_API_KEY, SERPAPI_API_KEY and JINA_API_KEY
# (e.g. via os.environ in an earlier cell) before running this cell. A missing
# variable yields an empty string.
# SECURITY: any key that was previously hard-coded here has been exposed and
# should be revoked and regenerated with the respective provider.
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY", "")
SERPAPI_API_KEY = os.environ.get("SERPAPI_API_KEY", "")
JINA_API_KEY = os.environ.get("JINA_API_KEY", "")

# Endpoints
OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
SERPAPI_URL = "https://serpapi.com/search"
JINA_BASE_URL = "https://r.jina.ai/"

# Default LLM model (can be changed if desired)
DEFAULT_MODEL = "qwen/qwen3-32b:free"

# Initialize the OpenAI-compatible client, pointed at OpenRouter.
# The two extra headers are sent with every request made through this client.
client = OpenAI(
    base_url=OPENROUTER_BASE_URL,
    api_key=OPENROUTER_API_KEY,
    default_headers={
        "HTTP-Referer": "https://github.com/mshumer/OpenDeepResearcher",
        "X-Title": "OpenDeepResearcher",
    },
)

# ============================
# API Validation Functions
# ============================

def validate_openrouter_api_key():
    """
    Check that the configured OpenRouter API key is present and well-formed.

    Only the format of OPENROUTER_API_KEY is inspected; no request is sent.

    Returns:
        True when the key is non-empty and starts with "sk-or-v1-".
        False otherwise, after printing a diagnostic message.
    """
    key = OPENROUTER_API_KEY

    # Happy path first: a non-empty key carrying the expected prefix.
    if key and key.startswith("sk-or-v1-"):
        return True

    if not key:
        print("错误: OpenRouter API密钥未设置")
    else:
        for line in (
            "错误: OpenRouter API密钥格式不正确",
            "API密钥应该以 'sk-or-v1-' 开头",
            "请访问 https://openrouter.ai/keys 获取正确的API密钥",
        ):
            print(line)
    return False

# ============================
# Asynchronous Helper Functions
# ============================

async def call_openrouter_async(session, messages, model=DEFAULT_MODEL):
    """
    Send a chat-completion request to OpenRouter without blocking the event loop.

    The module-level ``client`` is synchronous, so the request is executed in
    the event loop's default executor and awaited from there.

    Args:
        session: aiohttp session object. Not used by this function (the request
            goes through ``client``); kept so the signature stays compatible.
        messages: List of chat messages to send.
        model: Name of the model to use.

    Returns:
        The content of the first choice's message, or None if the request or
        the handling of its response raised an exception.
    """
    try:
        print("\n正在发送请求到OpenRouter API...")
        print(f"使用模型: {model}")

        loop = asyncio.get_running_loop()
        # Run the blocking HTTP call in a worker thread so other coroutines
        # scheduled on this loop can make progress while it is in flight.
        completion = await loop.run_in_executor(
            None,
            lambda: client.chat.completions.create(
                model=model,
                messages=messages,
                extra_body={},
            ),
        )

        return completion.choices[0].message.content

    except Exception as e:
        # Best-effort helper: report the failure and let the caller handle None.
        print(f"\n请求发生错误: {str(e)}")
        print("请检查网络连接是否正常")
        return None


async def generate_search_queries_async(session, user_query):
    """
    Ask the LLM to produce up to four precise search queries for the user's query.

    The model is instructed to answer with a Python list literal of strings.
    The reply is parsed with ``ast.literal_eval`` rather than ``eval`` so that
    model output can never be executed as code.

    Args:
        session: aiohttp session object. Not used by this function; the request
            goes through the module-level ``client``.
        user_query: The research question entered by the user.

    Returns:
        The parsed list of queries, or an empty list when the reply is empty,
        is not a valid Python literal, or is not a list.
    """
    prompt = (
        "You are an expert research assistant. Given the user's query, generate up to four distinct, "
        "precise search queries that would help gather comprehensive information on the topic. "
        "Return only a Python list of strings, for example: ['query1', 'query2', 'query3']."
    )
    messages = [
        {"role": "system", "content": "You are a helpful and precise research assistant."},
        {"role": "user", "content": f"User Query: {user_query}\n\n{prompt}"}
    ]

    completion = client.chat.completions.create(
        model=DEFAULT_MODEL,
        messages=messages,
        extra_body={}
    )

    response = completion.choices[0].message.content
    if not response:
        return []

    try:
        # Expect exactly a Python list literal (e.g., "['query1', 'query2']").
        # literal_eval accepts literals only, unlike eval(), which would run
        # arbitrary expressions contained in the model's reply.
        search_queries = ast.literal_eval(response.strip())
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        print("Error parsing search queries:", e, "\nResponse:", response)
        return []

    if isinstance(search_queries, list):
        return search_queries

    print("LLM did not return a list. Response:", response)
    return []


async def perform_search_async(session, query):
    """
    Run a Google search through SERPAPI and collect the result links.

    Args:
        session: aiohttp session object used to issue the GET request.
        query: The search query string.

    Returns:
        A list with the "link" value of every organic result that has one.
        An empty list is returned on a non-200 status, when the response has
        no "organic_results" entry, or when any exception is raised.
    """
    request_params = {
        "q": query,
        "api_key": SERPAPI_API_KEY,
        "engine": "google"
    }
    try:
        async with session.get(SERPAPI_URL, params=request_params) as resp:
            # Guard clauses: bail out early on the two failure shapes.
            if resp.status != 200:
                text = await resp.text()
                print(f"SERPAPI error: {resp.status} - {text}")
                return []

            payload = await resp.json()
            if "organic_results" not in payload:
                print("No organic results in SERPAPI response.")
                return []

            found_links = []
            for entry in payload["organic_results"]:
                if "link" in entry:
                    found_links.append(entry.get("link"))
            return found_links
    except Exception as e:
        print("Error performing SERPAPI search:", e)
        return []


async def perform_image_search_async(session, image_url):
    """
    Run a reverse image search through SERPAPI.

    Args:
        session: aiohttp session object used to issue the GET request.
        image_url: URL of the image to search for.

    Returns:
        At most ten dicts, each with the keys 'thumbnail', 'title' and
        'source_url' (taken from the result's 'thumbnail', 'title' and 'source'
        fields, defaulting to an empty string). An empty list is returned on a
        non-200 status, when the response has no "image_results" entry, or
        when any exception is raised.
    """
    request_params = {
        "engine": "google_reverse_image",
        "image_url": image_url,
        "api_key": SERPAPI_API_KEY,
    }

    try:
        async with session.get(SERPAPI_URL, params=request_params) as resp:
            if resp.status != 200:
                text = await resp.text()
                print(f"SERPAPI error: {resp.status} - {text}")
                return []

            payload = await resp.json()
            if "image_results" not in payload:
                print("No image results in SERPAPI response.")
                return []

            # Normalise every entry first, then keep only the leading ten.
            matches = [
                {
                    'thumbnail': entry.get('thumbnail', ''),
                    'title': entry.get('title', ''),
                    'source_url': entry.get('source', ''),
                }
                for entry in payload["image_results"]
            ]
            return matches[:10]
    except Exception as e:
        print("Error performing SERPAPI image search:", e)
        return []


async def fetch_webpage_text_async(session, url):
    """
    Fetch the text of a webpage through the Jina reader endpoint.

    The target URL is appended to JINA_BASE_URL and the request carries the
    Jina API key as a bearer token.

    Args:
        session: aiohttp session object used to issue the GET request.
        url: Address of the page whose text should be retrieved.

    Returns:
        The response body on HTTP 200; otherwise an empty string (also when
        any exception is raised).
    """
    reader_url = "{}{}".format(JINA_BASE_URL, url)
    auth_headers = {"Authorization": f"Bearer {JINA_API_KEY}"}
    try:
        async with session.get(reader_url, headers=auth_headers) as resp:
            if resp.status != 200:
                text = await resp.text()
                print(f"Jina fetch error for {url}: {resp.status} - {text}")
                return ""
            return await resp.text()
    except Exception as e:
        print("Error fetching webpage text with Jina:", e)
        return ""


async def is_page_useful_async(session, user_query, page_text):
    """
    Ask the LLM whether a webpage is useful for answering the user's query.

    The model is told to reply with exactly "Yes" or "No". Its reply is
    interpreted case-insensitively and by whole words, so "yes", "YES" or
    "Yes." count as "Yes", while words that merely contain those letters
    (e.g. "Yesterday") do not.

    Args:
        session: aiohttp session object. Not used by this function; the request
            goes through the module-level ``client``.
        user_query: The research question entered by the user.
        page_text: Text of the webpage; only the first 20000 characters are sent.

    Returns:
        "Yes" if the reply contains the word "yes" (in any case), otherwise "No".
        An empty reply also yields "No".
    """
    prompt = (
        "You are a critical research evaluator. Given the user's query and the content of a webpage, "
        "determine if the webpage contains information relevant and useful for addressing the query. "
        "Respond with exactly one word: 'Yes' if the page is useful, or 'No' if it is not. Do not include any extra text."
    )
    messages = [
        {"role": "system", "content": "You are a strict and concise evaluator of research relevance."},
        {"role": "user", "content": f"User Query: {user_query}\n\nWebpage Content (first 20000 characters):\n{page_text[:20000]}\n\n{prompt}"}
    ]

    completion = client.chat.completions.create(
        model=DEFAULT_MODEL,
        messages=messages,
        extra_body={}
    )

    response = completion.choices[0].message.content
    if not response:
        return "No"

    # Split the reply into lowercase alphabetic words so punctuation, quotes and
    # letter case do not hide the verdict. As before, a "yes" anywhere in the
    # reply wins; everything else falls back to "No".
    words = "".join(ch if ch.isalpha() else " " for ch in response.lower()).split()
    return "Yes" if "yes" in words else "No"


async def extract_relevant_context_async(session, user_query, search_query, page_text):
    """
    Have the LLM pull out the parts of a webpage that help answer the user's query.

    Args:
        session: aiohttp session object. Not used by this function; the request
            goes through the module-level ``client``.
        user_query: The research question entered by the user.
        search_query: The search query that led to this page.
        page_text: Text of the webpage; only the first 20000 characters are sent.

    Returns:
        The model's reply with surrounding whitespace removed, or an empty
        string when the reply is empty.
    """
    prompt = (
        "You are an expert information extractor. Given the user's query, the search query that led to this page, "
        "and the webpage content, extract all pieces of information that are relevant to answering the user's query. "
        "Return only the relevant context as plain text without commentary."
    )
    system_turn = {"role": "system", "content": "You are an expert in extracting and summarizing relevant information."}
    user_turn = {"role": "user", "content": f"User Query: {user_query}\nSearch Query: {search_query}\n\nWebpage Content (first 20000 characters):\n{page_text[:20000]}\n\n{prompt}"}

    reply = client.chat.completions.create(
        model=DEFAULT_MODEL,
        messages=[system_turn, user_turn],
        extra_body={}
    )

    extracted = reply.choices[0].message.content
    return extracted.strip() if extracted else ""


async def get_new_search_queries_async(session, user_query, previous_search_queries, all_contexts):
    """
    Ask the LLM whether more research is needed and, if so, for new search queries.

    The model is instructed to answer with either a Python list literal of up
    to four queries or the token "<done>". The reply is parsed with
    ``ast.literal_eval`` rather than ``eval``: the prompt embeds text extracted
    from fetched webpages, so the reply must never be executed as code.

    Args:
        session: aiohttp session object. Not used by this function; the request
            goes through the module-level ``client``.
        user_query: The research question entered by the user.
        previous_search_queries: Search queries that have already been run.
        all_contexts: Extracted context strings gathered so far; they are
            joined with newlines and included in the prompt.

    Returns:
        The string "<done>" when the model says research is complete, the
        parsed list of new queries, or an empty list when the reply is empty,
        is not a valid Python literal, or is not a list.
    """
    context_combined = "\n".join(all_contexts)
    prompt = (
        "You are an analytical research assistant. Based on the original query, the search queries performed so far, "
        "and the extracted contexts from webpages, determine if further research is needed. "
        "If further research is needed, provide up to four new search queries as a Python list (for example, "
        "['new query1', 'new query2']). If you believe no further research is needed, respond with exactly <done>."
        "\nOutput only a Python list or the token <done> without any additional text."
    )
    messages = [
        {"role": "system", "content": "You are a systematic research planner."},
        {"role": "user", "content": f"User Query: {user_query}\nPrevious Search Queries: {previous_search_queries}\n\nExtracted Relevant Contexts:\n{context_combined}\n\n{prompt}"}
    ]

    completion = client.chat.completions.create(
        model=DEFAULT_MODEL,
        messages=messages,
        extra_body={}
    )

    response = completion.choices[0].message.content
    if not response:
        return []

    cleaned = response.strip()
    if cleaned == "<done>":
        return "<done>"

    try:
        # literal_eval accepts literals only, unlike eval(), which would run
        # arbitrary expressions contained in the model's reply.
        new_queries = ast.literal_eval(cleaned)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        print("Error parsing new search queries:", e, "\nResponse:", response)
        return []

    if isinstance(new_queries, list):
        return new_queries

    print("LLM did not return a list for new search queries. Response:", response)
    return []


async def generate_final_report_async(session, user_query, all_contexts):
    """
    Ask the LLM to write the final report from everything gathered so far.

    Args:
        session: aiohttp session object. Not used by this function; the request
            goes through the module-level ``client``.
        user_query: The research question entered by the user.
        all_contexts: Extracted context strings; they are joined with newlines
            and included in the prompt.

    Returns:
        The content of the first choice's message in the model's reply.
    """
    prompt = (
        "You are an expert researcher and report writer. Based on the gathered contexts below and the original query, "
        "write a comprehensive, well-structured, and detailed report that addresses the query thoroughly. "
        "Include all relevant insights and conclusions without extraneous commentary."
    )
    context_combined = "\n".join(all_contexts)

    system_turn = {"role": "system", "content": "You are a skilled report writer."}
    user_turn = {"role": "user", "content": f"User Query: {user_query}\n\nGathered Relevant Contexts:\n{context_combined}\n\n{prompt}"}

    reply = client.chat.completions.create(
        model=DEFAULT_MODEL,
        messages=[system_turn, user_turn],
        extra_body={}
    )
    first_choice = reply.choices[0]
    return first_choice.message.content


async def process_link(session, link, user_query, search_query):
    """
    Fetch one link, judge its usefulness and extract relevant context from it.

    Args:
        session: aiohttp session object passed on to the helper coroutines.
        link: URL of the page to process.
        user_query: The research question entered by the user.
        search_query: The search query that produced this link.

    Returns:
        The extracted context when the page could be fetched, was judged "Yes"
        and yielded non-empty context; None in every other case.
    """
    print(f"Fetching content from: {link}")
    page_text = await fetch_webpage_text_async(session, link)
    if not page_text:
        return None

    usefulness = await is_page_useful_async(session, user_query, page_text)
    print(f"Page usefulness for {link}: {usefulness}")
    if usefulness != "Yes":
        return None

    context = await extract_relevant_context_async(session, user_query, search_query, page_text)
    if not context:
        return None

    print(f"Extracted context from {link} (first 200 chars): {context[:200]}")
    return context


async def process_image_search(session, image_url):
    """
    Run a reverse image search and report how many results came back.

    Args:
        session: aiohttp session object passed on to the search coroutine.
        image_url: URL of the image to search for.

    Returns:
        The results from ``perform_image_search_async``, or an empty list when
        that call produced nothing.
    """
    print(f"\n开始图片搜索: {image_url}")

    # Delegate the actual request to the SERPAPI helper.
    results = await perform_image_search_async(session, image_url)

    if results:
        print(f"找到 {len(results)} 个相关图片结果")
        return results

    print("未找到相关图片结果")
    return []


# =========================
# API Test Functions
# =========================

async def test_api_connection():
    """
    Send a single test message through the OpenRouter client.

    Returns:
        True when the request succeeded and the first 100 characters of the
        reply could be printed; False when any exception was raised.
    """
    print("\n=== 测试API连接 ===")

    probe_messages = [
        {"role": "user", "content": "Hello, this is a test message."}
    ]

    try:
        print("发送测试请求...")
        completion = client.chat.completions.create(
            model=DEFAULT_MODEL,
            messages=probe_messages,
            extra_body={}
        )
        print("\n✓ API连接测试成功！")
        print("测试响应:", completion.choices[0].message.content[:100])
    except Exception as e:
        print(f"\n✗ API连接测试失败: {str(e)}")
        print("请检查网络连接并重试")
        return False

    return True

# =========================
# Main Asynchronous Routine
# =========================

async def _run_text_research(session, user_query, max_rounds=10):
    """
    Run the iterative search / extract / plan loop for one question and print the report.

    Args:
        session: aiohttp session object passed on to the helper coroutines.
        user_query: The research question entered by the user.
        max_rounds: Upper bound on the number of search rounds, so the loop
            always terminates even if the planner keeps proposing queries.
    """
    search_queries = await generate_search_queries_async(session, user_query)
    if not search_queries:
        print("无法生成搜索查询")
        return

    all_contexts = []
    used_queries = []
    rounds = 0

    # Continue only while the planner hands back a non-empty list. "<done>" and
    # an empty list (including a reply that failed to parse) both end the loop.
    while isinstance(search_queries, list) and search_queries and rounds < max_rounds:
        rounds += 1
        ran_new_query = False

        for query in search_queries:
            if query in used_queries:
                continue
            ran_new_query = True

            print(f"\n执行搜索查询: {query}")
            links = await perform_search_async(session, query)

            for link in links:
                context = await process_link(session, link, user_query, query)
                if context:
                    all_contexts.append(context)

            used_queries.append(query)

        # Stop when the planner only repeated earlier queries, or when nothing
        # useful has been gathered: asking again would repeat the same request.
        if not ran_new_query or not all_contexts:
            break

        search_queries = await get_new_search_queries_async(
            session, user_query, used_queries, all_contexts
        )

    if all_contexts:
        final_report = await generate_final_report_async(session, user_query, all_contexts)
        print("\n研究报告:")
        print(final_report)
    else:
        print("\n未找到相关信息")


async def async_main():
    """
    Main asynchronous function that coordinates the research process.

    Validates the OpenRouter API key, then repeatedly shows a menu offering a
    text research run (1), a reverse image search (2) or exit (3). Each run
    uses its own aiohttp session. Returns without showing the menu when key
    validation fails.
    """
    if not validate_openrouter_api_key():
        return

    print("\n欢迎使用 OpenDeepResearcher!")
    print("这是一个强大的研究助手，可以帮助你进行深入的网络搜索和图片搜索。")

    while True:
        print("\n请选择搜索类型:")
        print("1. 文本搜索")
        print("2. 图片搜索")
        print("3. 退出")

        choice = input("\n请输入选项 (1/2/3): ").strip()

        if choice == "3":
            print("\n感谢使用 OpenDeepResearcher!")
            break

        if choice == "1":
            user_query = input("\n请输入你的研究问题: ").strip()
            if not user_query:
                print("查询不能为空")
                continue

            async with aiohttp.ClientSession() as session:
                await _run_text_research(session, user_query)

        elif choice == "2":
            image_url = input("\n请输入图片URL: ").strip()
            if not image_url:
                print("图片URL不能为空")
                continue

            async with aiohttp.ClientSession() as session:
                results = await process_image_search(session, image_url)

                if results:
                    print("\n相似图片结果:")
                    for i, result in enumerate(results, 1):
                        print(f"\n{i}. 标题: {result['title']}")
                        print(f"   缩略图: {result['thumbnail']}")
                        print(f"   来源: {result['source_url']}")
                else:
                    print("\n未找到相关图片")
        else:
            print("\n无效的选项，请重试")

def main():
    """
    Synchronous entry point: run ``async_main`` to completion with ``asyncio.run``.

    A KeyboardInterrupt is reported as a normal termination. Any other
    exception is printed together with a troubleshooting hint instead of
    being propagated.
    """
    try:
        asyncio.run(async_main())
    except KeyboardInterrupt:
        print("\n程序已终止")
    except Exception as e:
        failure_lines = (
            f"\n发生错误: {str(e)}",
            "请检查网络连接和API密钥是否正确",
        )
        print(*failure_lines, sep="\n")


if __name__ == "__main__":
    main()