### 1. 官网获取api key

https://tavily.com/

### 2. 官网或github有示例demo

### 3. 搜索工具类


In [None]:
# 搜索工具
from tavily import TavilyClient
from dotenv import load_dotenv
import os
import asyncio
from typing import List, Dict, Any

class TavilySearch:
    def __init__(self, api_key=None):
        """初始化 Tavily 搜索类
        
        Args:
            api_key: Tavily API 密钥，如果为None则从环境变量获取
        """
        load_dotenv()
        self.api_key = api_key or os.getenv("TAVILY_API_KEY")
        self.client = TavilyClient(api_key=self.api_key)
    
    def search(self, query: str) -> Dict[str, Any]:
        """常规搜索方法
        
        Args:
            query: 搜索查询字符串
            
        Returns:
            搜索结果字典
        """
        try:
            response = self.client.search(query)
            return response
        except Exception as e:
            print(f"搜索出错: {e}")
            return {"error": str(e)}
    
    def search_with_subqueries(self, subqueries: List[str]) -> List[Dict[str, Any]]:
        """将查询拆分为较小的子查询进行搜索
        
        Args:
            subqueries: 子查询列表
            
        Returns:
            搜索结果列表
        """
        results = []
        for query in subqueries:
            try:
                result = self.client.search(query)
                results.append(result)
            except Exception as e:
                print(f"子查询 '{query}' 搜索出错: {e}")
                results.append({"query": query, "error": str(e)})
        return results
    
    async def search_async(self, queries: List[str], max_concurrency: int = 5) -> List[Dict[str, Any]]:
        """异步搜索多个查询
        
        Args:
            queries: 查询列表
            max_concurrency: 最大并发请求数
            
        Returns:
            搜索结果列表
        """
        async def _search_one(query: str) -> Dict[str, Any]:
            try:
                loop = asyncio.get_event_loop()
                # 在异步环境中调用同步方法
                result = await loop.run_in_executor(None, lambda: self.client.search(query))
                return result
            except Exception as e:
                print(f"异步查询 '{query}' 搜索出错: {e}")
                return {"query": query, "error": str(e)}
        
        # 限制并发请求数
        semaphore = asyncio.Semaphore(max_concurrency)
        
        async def _search_with_limit(query: str) -> Dict[str, Any]:
            async with semaphore:
                return await _search_one(query)
        
        # 并发执行所有查询
        tasks = [_search_with_limit(query) for query in queries]
        results = await asyncio.gather(*tasks)
        return results

# 使用示例
if __name__ == "__main__":
    # 创建搜索工具实例
    search_tool = TavilySearch()
    
    # 1. 常规搜索
    result = search_tool.search("人工智能最新发展")
    print("常规搜索结果:", result)
    
    # 2. 子查询搜索
    subqueries = [
        "人工智能在医疗领域的应用",
        "人工智能在教育领域的应用",
        "人工智能的未来发展趋势"
    ]
    sub_results = search_tool.search_with_subqueries(subqueries)
    print("子查询搜索结果:", sub_results)
    
    # 3. 异步搜索
    async def run_async_search():
        async_results = await search_tool.search_async(subqueries)
        print("异步搜索结果:", async_results)
    
    # 运行异步搜索
    asyncio.run(run_async_search())


