<a href="https://colab.research.google.com/github/weedge/doraemon-nb/blob/main/gemini/Get_started_LiveAPI_tools.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

##### Copyright 2025 Google LLC.

In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

- https://ai.google.dev/gemini-api/docs/live-tools?hl=zh-cn

# Gemini - Multimodal live API: Tool use

本笔记本提供了如何使用 Gemini 2.5 的多模态实时 API 来操作工具的示例。

该 API 提供 Google 搜索、代码执行和函数调用工具。早期的 Gemini 模型支持这些工具的不同版本。Gemini 2.5（实时 API）最大的变化在于，基本上所有工具都由代码执行来处理。有了这项变化，您可以在单个 API 调用中使用**多个工具**，模型也可以在单个代码执行块中使用多个工具。

本教程假设您已熟悉实时 API，如[本教程](../quickstarts/Get_started_LiveAPI.ipynb)中所述。

## Setup

### 安装 SDK

全新的 **[Google Gen AI SDK](https://ai.google.dev/gemini-api/docs/sdks)** 提供对 Gemini 2.5（及更早版本）的编程访问，支持使用 [Google AI for Developers](https://ai.google.dev/gemini-api/docs) 和 [Vertex AI](https://cloud.google.com/vertex-ai/generative-ai/docs/overview) API。除少数例外情况外，在一个平台上运行的代码可以在两个平台上运行。这意味着您可以使用开发者 API 创建应用程序原型，然后将应用程序迁移到 Vertex AI，而无需重写代码。

有关此新 SDK 的更多详细信息，请参阅 [文档](https://ai.google.dev/gemini-api/docs/sdks) 或 [入门指南](../quickstarts/Get_started.ipynb)。

In [1]:
%pip install -U -q google-genai

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.8/46.8 kB[0m [31m2.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m261.2/261.2 kB[0m [31m9.9 MB/s[0m eta [36m0:00:00[0m
[?25h

### 设置 API 密钥

要运行以下单元格，您的 API 密钥必须存储在名为 `GOOGLE_API_KEY` 的 Colab Secret 中。如果您还没有 API 密钥，或者不确定如何创建 Colab Secret，请参阅 [Authentication ![image](https://storage.googleapis.com/generativeai-downloads/images/colab_icon16.png)](../quickstarts/Authentication.ipynb) 获取示例。

In [2]:
from google.colab import userdata
import os

os.environ['GOOGLE_API_KEY']=userdata.get('GOOGLE_API_KEY')

### 初始化 SDK 客户端

客户端将从环境变量中获取您的 API 密钥。

要使用正式 API，您需要将客户端版本设置为 `v1alpha`。

In [3]:
from google import genai

client = genai.Client(http_options={"api_version": "v1alpha"})

### 选择模型

您可以选择最新的稳定版模型或预览版模型。请注意，像 `gemini-2.5-flash-native-audio-preview-09-2025` 这样的原生音频模型无法在此笔记本中使用，因为它们无法输出纯文本，而 Colab 对此有严格的限制。

In [16]:
MODEL_ID = 'gemini-2.5-flash-native-audio-preview-09-2025'  # @param ['gemini-2.0-flash-live-001', 'gemini-live-2.5-flash-preview', 'gemini-2.5-flash-native-audio-preview-09-2025'] {allow-input: true, isTemplate: true}

### Imports

In [17]:
import asyncio
import contextlib
import json
import wave

from IPython.display import display, Markdown, Audio, HTML

from google import genai
from google.genai import types

### Utilities

您将使用 Live API 的音频输出，在 Colab 中收听音频的最简单方法是将 `PCM` 数据写入 `WAV` 文件：

In [18]:
@contextlib.contextmanager
def wave_file(filename, channels=1, rate=24000, sample_width=2):
    with wave.open(filename, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        yield wf

使用日志记录器可以更轻松地开启/关闭调试消息。

In [19]:
import logging
logger = logging.getLogger('Live')
logger.setLevel('INFO')
#logger.setLevel('DEBUG')  # Switch between "INFO" and "DEBUG" to toggle debug messages.

## Get started

大部分 Live API 的设置与[入门教程](../quickstarts/Get_started_LiveAPI.ipynb)类似。由于本教程并未着重讲解 API 的实时交互功能，因此代码已简化：此代码使用 Live API，但仅发送一条文本提示，并监听一轮回复。

您可以在任何示例中设置 `modality="AUDIO"` 以获取语音输出。

In [20]:
n = 0
async def run(prompt, modality="TEXT", tools=None):
  global n
  if tools is None:
    tools=[]

  config = {
          "tools": tools,
          "response_modalities": [modality]
  }

  async with client.aio.live.connect(model=MODEL_ID, config=config) as session:
    display(Markdown(prompt))
    display(Markdown('-------------------------------'))
    await session.send_client_content(
      turns={"role": "user", "parts": [{"text": prompt}]}, turn_complete=True
    )

    audio = False
    filename = f'audio_{n}.wav'
    with wave_file(filename) as wf:
      async for response in session.receive():
        logger.debug(str(response))
        if response.server_content and response.server_content.model_turn and response.server_content.model_turn.parts and hasattr(response.server_content.model_turn.parts[0], 'text'):
          if text := response.server_content.model_turn.parts[0].text:
            display(Markdown(text))
            continue

        if response.server_content and response.server_content.model_turn and response.server_content.model_turn.parts and hasattr(response.server_content.model_turn.parts[0], 'data'):
          if data := response.server_content.model_turn.parts[0].data:
            print('.', end='')
            wf.writeframes(data)
            audio = True
            continue

        server_content = response.server_content
        if server_content is not None:
          handle_server_content(wf, server_content)
          continue

        tool_call = response.tool_call
        if tool_call is not None:
          await handle_tool_call(session, tool_call)


  if audio:
    display(Audio(filename, autoplay=True))
    n = n+1

由于本教程演示了多个工具，您需要编写更多代码来处理它们返回的不同类型的对象。

- `code_execution` 工具可以返回 `executable_code` 和 `code_execution_result` 两部分。

- `google_search` 工具可能会附加一个 `grounding_metadata` 对象。

In [21]:
def handle_server_content(wf, server_content):
  model_turn = server_content.model_turn
  if model_turn:
    for part in model_turn.parts:
      executable_code = part.executable_code
      if executable_code is not None:
        display(Markdown('-------------------------------'))
        display(Markdown(f'``` python\n{executable_code.code}\n```'))
        display(Markdown('-------------------------------'))

      code_execution_result = part.code_execution_result
      if code_execution_result is not None:
        display(Markdown('-------------------------------'))
        display(Markdown(f'``` \n{code_execution_result.output}\n```'))
        display(Markdown('-------------------------------'))

  grounding_metadata = getattr(server_content, 'grounding_metadata', None)
  if grounding_metadata is not None:
    display(
        HTML(grounding_metadata.search_entry_point.rendered_content))

  return

最后，使用 `function_declarations` 工具时，API 可能会返回 `tool_call` 对象。为了保持代码简洁，`tool_call` 处理程序只需对每个函数调用回复 `"ok"` 即可。

In [22]:
async def handle_tool_call(session, tool_call):
  print("Tool call:")
  function_responses = []
  for fc in tool_call.function_calls:
    function_response = types.FunctionResponse(
        id=fc.id,
        name=fc.name,
        response={"result": "ok"},
    )
    function_responses.append(function_response)
  print('>>> ', function_responses)
  await session.send_tool_response(function_responses=function_responses)

Try running it for a first time:

In [23]:
await run(prompt="Hello?", tools=None, modality = "TEXT")

Hello?

-------------------------------

Hello!

 How can I help you today?


## Simple function call

API 的函数调用功能可以处理多种类型的函数。SDK 的支持仍在开发中。因此，请尽量简化操作，只需发送一个最基本的函数定义：仅包含函数名称即可。

请注意，在实际的 API 中，函数调用与聊天轮次无关。即使函数调用正在处理中，对话也可以继续进行。

In [25]:
turn_on_the_lights = {'name': 'turn_on_the_lights'}
turn_off_the_lights = {'name': 'turn_off_the_lights'}

In [26]:
prompt = "Turn on the lights"

tools = [
    {'function_declarations': [turn_on_the_lights, turn_off_the_lights]}
]

await run(prompt, tools=tools, modality = "TEXT")

Turn on the lights

-------------------------------

-------------------------------

``` python
print(default_api.turn_on_the_lights())

```

-------------------------------

Tool call:
>>>  [FunctionResponse(
  id='function-call-7466577173747406610',
  name='turn_on_the_lights',
  response={
    'result': 'ok'
  }
)]


-------------------------------

``` 
{'result': 'ok'}

```

-------------------------------

OK. I'

ve turned on the lights.


## 异步函数调用

**异步函数调用** 允许模型异步管理其函数调用，而不会阻塞用户输入。

您可以决定函数调用结束时模型的行为：不输出任何内容、中断当前操作或等待完成当前任务。

接下来的单元格将使用略微更新的代码来调用 Live API，以便会话保持打开状态 20 秒，并接受每 10 秒发送给模型的多个请求。如果您对此实现感兴趣，请展开下一个单元格。

In [27]:
# @title Live class with multiple messages (just run this cell)

import collections.abc
import inspect
from asyncio.exceptions import CancelledError
import traceback

class Live:
  def __init__(self, client):
    self.client = client


  async def run(self, config, functions=None, messages=None):
    self.config = config
    self.send_queue = asyncio.Queue()
    self.tool_call_queue = asyncio.Queue()

    try:
      async with (
            client.aio.live.connect(model=MODEL_ID, config=config) as session,
            asyncio.TaskGroup() as tg
      ):
        self.session = session
        recv_task = tg.create_task(self._recv())
        send_task = tg.create_task(self._send())
        tool_call_task = tg.create_task(self._run_tool_calls(functions))
        read_text= tg.create_task(self._read_text(messages))


        await read_text
        await asyncio.sleep(20) # Keeping the socket open for 20s to wait for the FC and different messages

        raise CancelledError
    except CancelledError:
      pass
    except ExceptionGroup as EG:
      traceback.print_exception(EG)

  async def _recv(self):
    try:
      mode = None
      while True:
        async for response in self.session.receive():
          logger.debug(str(response))
          if response.server_content and response.server_content.model_turn and response.server_content.model_turn.parts and hasattr(response.server_content.model_turn.parts[0], 'text'):
            if text := response.server_content.model_turn.parts[0].text:
              if mode != 'text':
                mode = 'text'
                print()
              print(text)
          else:
            if mode == 'text':
              mode = 'other'
              print()
            print(f'<<<  {response.model_dump_json(exclude_none=True)}\n')

          tool_call = response.tool_call
          if tool_call is not None:
            await self.tool_call_queue.put(tool_call)

    except asyncio.CancelledError:
      pass

  async def _send(self):
    while True:
      msg = await self.send_queue.get()
      print(f'>>> {repr(msg)}\n')
      await self.session.send_client_content(turns=msg,turn_complete=True)

  async def _run_tool_calls(self, functions):
    while True:
      tool_call = await self.tool_call_queue.get()
      for fc in tool_call.function_calls:
        fun = functions[fc.name]
        called = fun(**fc.args)
        if inspect.iscoroutine(called):
          print(f'>> Starting {fc.name}\n')
          result = await called
          print(f'>> Done {fc.name} >>> {repr(result)}\n')
          result = self._wrap_function_result(fc, result)
          await self.session.send_tool_response(function_responses=[result])
        elif isinstance(called, collections.abc.AsyncIterable):
          async for result in called:
            result.will_continue=True
            result = self._wrap_function_result(fc, result)
            print(f">>> {repr(result)}\n")
            await self.session.send_tool_response(function_responses=[result])

          result = self._wrap_function_result(
              fc,
              types.FunctionResponse(will_continue=False)
          )
          print(f">>> {repr(result)}\n")
          await self.session.send_tool_response(
              function_responses=[result]
          )


        else:
          raise TypeError(f"expected {fc.name} to return a coroutine, or an "
                          f"AsyncIterable, got {type(fun)}")

  def _wrap_function_result(self, fc, result):
    if result is None:
      return types.FunctionResponse(
          name=fc.name,
          id=fc.id,
          response={'result': 'ok'}
      )
    elif isinstance(result, types.FunctionResponse):
      result.name = fc.name
      result.id = fc.id
      return result
    else:
      return types.FunctionResponse(
          name=fc.name,
          id=fc.id,
          response= {'result': result}
      )

  async def _read_text(self, messages):
    if messages:
        for n, message in enumerate(messages):
            await self.send_queue.put({
                'role': 'user',
                'parts': [{'text': message}]
            })
            if n+1 < len(messages):
              await asyncio.sleep(5)
    else:
        while True:
            message = await asyncio.to_thread(input, "message > ")
            if message.lower() == "q":
                break
            await self.send_queue.put({
                'role': 'user',
                'parts': [{'text': message}]
            })

### 默认行为：阻塞

我们先来看默认行为。首先定义一个模拟天气函数，通过等待 10 秒来模拟计算时间。

默认行为类似于先进先出 (FIFO) 队列：函数调用会被添加到队列中，任何后续请求都会被排队（阻塞）在其后，直到该函数处理完成。

In [30]:
# Mock function, takes 10s to process
async def get_weather_vegas():
  await asyncio.sleep(10)
  return {'weather': "Sunny, 42 degrees"}

# multiple prompts, they are going to be asked with 5s delay between each of them.
questions = [
    "拉斯维加斯的天气怎么样？",
    "与此同时，请告诉我有关巴黎赌场的情况。"
]

await Live(client).run(
    messages=questions,
    functions={
        'get_weather_vegas': get_weather_vegas,
    },
    config={
        "response_modalities": ["TEXT"],
        "tools": [
            {
                'function_declarations': [
                    {'name': 'get_weather_vegas',  "behavior": "UNSPECIFIED"}, # This is default behavior, equivalent to BLOCKING
                ]
            }
        ]
    }
)

>>> {'role': 'user', 'parts': [{'text': '拉斯维加斯的天气怎么样？'}]}

<<<  {"tool_call":{"function_calls":[{"id":"function-call-12948117924596791327","args":{},"name":"get_weather_vegas"}]}}

>> Starting get_weather_vegas

>>> {'role': 'user', 'parts': [{'text': '与此同时，请告诉我有关巴黎赌场的情况。'}]}

>> Done get_weather_vegas >>> {'weather': 'Sunny, 42 degrees'}


拉斯维加斯
现在是晴天，42度。


<<<  {"server_content":{"generation_complete":true}}

<<<  {"server_content":{"turn_complete":true},"usage_metadata":{"prompt_token_count":175,"response_token_count":28,"total_token_count":203,"prompt_tokens_details":[{"modality":"TEXT","token_count":175}],"response_tokens_details":[{"modality":"TEXT","token_count":28}]}}


我对巴黎
赌场一无所知。

<<<  {"server_content":{"generation_complete":true}}

<<<  {"server_content":{"turn_complete":true},"usage_metadata":{"prompt_token_count":198,"response_token_count":8,"total_token_count":206,"prompt_tokens_details":[{"modality":"TEXT","token_count":198}],"response_tokens_details":[{"modality":"TE

如您所见，模型立即调用了 `get_weather_vegas` 函数，但由于模型仍在等待函数调用结果，因此忽略了第二个问题。直到函数调用结果出来后，模型才开始回答第二个问题。

### **中断**：停止当前操作并处理结果

这次，`behavior` 设置为 `NON_BLOCKING`，这意味着它将使用异步函数调用。

此时，您需要定义模型在收到函数调用结果后将执行的操作。这可以在函数内部或处理函数调用的脚本中进行管理（因为自动函数调用不可用），方法是在 `FunctionResponse` 中添加 `scheduling` 值。

这次的 `scheduling` 行为是“**`INTERRUPT`**”，这意味着模型一旦收到响应，就会立即停止当前操作并处理响应。

In [31]:
# Mock function, takes 10s to process
async def get_weather_vegas():
  await asyncio.sleep(10)
  return types.FunctionResponse(
      response={'weather': "Sunny, 42 degrees"},
      scheduling="INTERRUPT"
  )

# multiple prompts, they are going to be asked with 5s delay between each of them.
questions = [
    "拉斯维加斯的天气怎么样？",
    "与此同时，告诉我你对巴黎赌场了解多少，里面都有哪些娱乐活动和景点。然后继续跟我说说拉斯维加斯的赌场，直到我让你闭嘴为止。别问我，只管不停地说。"
    "那么，你能告诉我你最喜欢的太阳马戏团表演是什么吗？"
]

await Live(client).run(
    messages=questions,
    functions={
        'get_weather_vegas': get_weather_vegas,
    },
    config={
        "response_modalities": ["TEXT"],
        "tools": [
            {
                'function_declarations': [
                    {'name': 'get_weather_vegas',  "behavior": "NON_BLOCKING"},
                ]
            }
        ]
    }
)

>>> {'role': 'user', 'parts': [{'text': '拉斯维加斯的天气怎么样？'}]}

<<<  {"tool_call":{"function_calls":[{"id":"function-call-111365119741752038","args":{},"name":"get_weather_vegas"}]}}

>> Starting get_weather_vegas


我现在
正在获取拉斯维加斯的天气信息。请稍等。


<<<  {"server_content":{"generation_complete":true}}

<<<  {"server_content":{"turn_complete":true},"usage_metadata":{"prompt_token_count":456,"response_token_count":30,"total_token_count":486,"prompt_tokens_details":[{"modality":"TEXT","token_count":456}],"response_tokens_details":[{"modality":"TEXT","token_count":30}]}}

>>> {'role': 'user', 'parts': [{'text': '与此同时，告诉我你对巴黎赌场了解多少，里面都有哪些娱乐活动和景点。然后继续跟我说说拉斯维加斯的赌场，直到我让你闭嘴为止。别问我，只管不停地说。那么，你能告诉我你最喜欢的太阳马戏团表演是什么吗？'}]}


我知道
您对巴黎赌场和拉斯维加斯赌场有兴趣，也想了解
我最喜欢的太阳马戏团表演。但是，由于我还在获取拉斯维加斯的天气信息，而且您要求我提供大量的信息，我担心
可能会超出我的能力范围。首先，让我专注于提供您要求的拉斯维加斯天气信息。一旦我完成了，我将尽力满足您其他的要求
。请您耐心等待。


<<<  {"server_content":{"generation_complete":true}}

<<<  {"server_content":{"turn_complete":true},"usage_metadata":{"prompt_token_count":531,"resp

如您所见，这次模型确认了我们的请求，并说了类似“‘拉斯维加斯天气请求正在运行。完成后我会通知您’”之类的话，然后继续处理您向它提出的问题，当函数响应返回时，它停止了正在做的事情，告诉我们天气情况，然后继续谈论它正在谈论的内容。

### **等待空闲**：在处理此结果之前，请先完成当前操作

同样，`behavior` 设置为 `NON_BLOCKING`，这意味着它将使用异步函数调用，并且您需要在 `FunctionResponse` 中添加 `scheduling` 值。

这次的 `scheduling` 行为设置为 `W​​hen_idle`，这意味着模型将**等待完成**当前操作，然后再告知您请求的结果。

In [33]:
import time

# Mock function, takes 6s to process
async def get_weather_vegas():
  await asyncio.sleep(6)
  return types.FunctionResponse(
      response={'weather': "Sunny, 42 degres"},
      scheduling="WHEN_IDLE"
  )

# multiple prompts, they are going to be asked with 5s delay between each of them.
questions = [
    "拉斯维加斯的天气怎么样？",
    "与此同时，请不要使用任何工具，告诉我你对巴黎赌场以及里面所有娱乐和景点的了解。也请告诉我拉斯维加斯大道上每家赌场的情况！"
]

await Live(client).run(
    messages=questions,
    functions={
        'get_weather_vegas': get_weather_vegas,
    },
    config={
        "response_modalities": ["TEXT"],
        "tools": [
            {
                'function_declarations': [
                    {'name': 'get_weather_vegas',  "behavior": "NON_BLOCKING"},
                ]
            }
        ]
    }
)

>>> {'role': 'user', 'parts': [{'text': '拉斯维加斯的天气怎么样？'}]}

<<<  {"tool_call":{"function_calls":[{"id":"function-call-16238183825586039880","args":{},"name":"get_weather_vegas"}]}}

>> Starting get_weather_vegas


我
正在查询拉斯维加斯的天气。请稍后查看。


<<<  {"server_content":{"generation_complete":true}}

<<<  {"server_content":{"turn_complete":true},"usage_metadata":{"prompt_token_count":458,"response_token_count":30,"total_token_count":488,"prompt_tokens_details":[{"modality":"TEXT","token_count":458}],"response_tokens_details":[{"modality":"TEXT","token_count":30}]}}

>>> {'role': 'user', 'parts': [{'text': '与此同时，请不要使用任何工具，告诉我你对巴黎赌场以及里面所有娱乐和景点的了解。也请告诉我拉斯维加斯大道上每家赌场的情况！'}]}


好吧
，我会尽力告诉你我知道的关于巴黎赌场和拉斯维加斯大道
上每家赌场的信息，但我无法访问实时数据库或网络。因此，我的信息可能并不完全是最新的。

**巴黎拉斯维加斯赌场
**

巴黎拉斯维加斯赌场当然以其标志性的埃菲尔铁塔复制品而闻名，它本身就是一个景点。游客可以乘坐电梯到
塔顶，欣赏拉斯维加斯大道的壮丽景色。赌场内部的设计旨在营造一种巴黎的氛围，拥有鹅卵石人行道、法国
风格的商店和餐厅。

*   **赌场:** 巴黎赌场拥有各种赌博游戏，包括老虎机、扑克和桌上游戏。
*
   **娱乐:** 除了赌场，巴黎还提供各种娱乐选择，如音乐会、喜剧表演和歌舞表演。
*   **餐饮:** 
巴黎拉斯维加斯拥有各种餐厅，从小咖啡馆到高档餐厅，提供各种美食，

如您所见，这一次，即使在回答有关赌场的问题时收到了函数调用响应（参见 `>> Done get_weather_vegas >>> [...] response={'weather': 'Sunny, 42 degres'})` 行），它也等到完成当前回答后才告知天气情况。

### 静默模式：将所学信息保留在自己手中

这次，`behavior` 仍然设置为 `NON_BLOCKING`，这意味着它将使用异步函数调用，并且需要在 `FunctionResponse` 中设置 `scheduling` 值。

这次的 `scheduling` 行为设置为“SILENT”(静默)，这意味着模型不会通知您函数调用何时完成，但它可能在后续对话中仍然会用到这些信息。

In [34]:
import time

# Mock function, takes 5s to process
async def get_weather_vegas():
  time.sleep(10)
  return types.FunctionResponse(
      response={'weather': "Sunny, 42 degres"},
      scheduling="SILENT"
  )

# multiple prompts, they are going to be asked with 5s delay between each of them.
questions = [
    "拉斯维加斯的天气怎么样？",
    "与此同时，请你告诉我一些关于巴黎赌场的情况。",
    "温度超过40度了吗？"
]

await Live(client).run(
    messages=questions,
    functions={
        'get_weather_vegas': get_weather_vegas,
    },
    config={
        "response_modalities": ["TEXT"],
        "tools": [
            {
                'function_declarations': [
                    {'name': 'get_weather_vegas',  "behavior": "NON_BLOCKING"},
                ]
            }
        ]
    }
)

>>> {'role': 'user', 'parts': [{'text': '拉斯维加斯的天气怎么样？'}]}

<<<  {"tool_call":{"function_calls":[{"id":"function-call-11792910172795837443","args":{},"name":"get_weather_vegas"}]}}

>> Starting get_weather_vegas

>> Done get_weather_vegas >>> FunctionResponse(
  response={
    'weather': 'Sunny, 42 degres'
  },
  scheduling=<FunctionResponseScheduling.SILENT: 'SILENT'>
)


我
正在查询拉斯维加斯的天气。请稍后回来查看。

<<<  {"server_content":{"generation_complete":true}}

<<<  {"server_content":{"turn_complete":true},"usage_metadata":{"prompt_token_count":458,"response_token_count":30,"total_token_count":488,"prompt_tokens_details":[{"modality":"TEXT","token_count":458}],"response_tokens_details":[{"modality":"TEXT","token_count":30}]}}

>>> {'role': 'user', 'parts': [{'text': '与此同时，请你告诉我一些关于巴黎赌场的情况。'}]}


我对巴黎
赌场一无所知。

<<<  {"server_content":{"generation_complete":true}}

<<<  {"server_content":{"turn_complete":true},"usage_metadata":{"prompt_token_count":546,"response_token_count":8,"total_token_count":554

这一次，正如你所看到的，当函数调用结束时，模型什么也没做，但当再次询问同一问题时，它却在没有进行新的函数调用的情况下回答了这个问题。

## Code execution

`code_execution` 允许模型编写并运行 Python 代码。尝试用它来解决一个模型无法凭记忆解决的数学问题：

In [35]:
prompt="你能计算出小于 100000 的最大素数回文串吗？"

tools = [
    {'code_execution': {}}
]

await run(prompt, tools=tools, modality="TEXT")

你能计算出小于 100000 的最大素数回文串吗？

-------------------------------

以下是我

解决此问题的方法：

1.  **回文定义：**首先

，回文数字从前往后和从后往前读取时都一样。例如，121, 353 和 9009 

都是回文。
2.  **素数定义：**素数是可以被 1 和它自身整除的数字。
3.  **

范围：**我们需要在小于 100000 的数字中搜索。
4.  **策略：**我将生成回文数

（从最大可能的数字开始），然后检查它们是否为素数，直到我找到一个素数回文数。由于我们正在寻找最大值，因此

从较大的数字开始会更快。

现在，我将使用 Python 来实现这个逻辑。



-------------------------------

``` python
def is_prime(n):
    """检查数字是否为素数."""
    if n < 2:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True

def generate_palindromes():
    """生成小于 100000 的回文数，从最大的开始."""
    # 5 位回文数：abcba
    for a in range(9, -1, -1):
        for b in range(9, -1, -1):
            for c in range(9, -1, -1):
                yield int(str(a) + str(b) + str(c) + str(b) + str(a))

    # 4 位回文数：abba
    for a in range(9, -1, -1):
        for b in range(9, -1, -1):
            yield int(str(a) + str(b) + str(b) + str(a))

    # 3 位回文数：aba
    for a in range(9, -1, -1):
        for b in range(9, -1, -1):
            yield int(str(a) + str(b) + str(a))

    # 2 位回文数：aa
    for a in range(9, -1, -1):
      yield int(str(a) + str(a))

    # 1 位回文数：a
    for a in range(9, -1, -1):
        yield a

# 查找最大的素数回文数
for palindrome in generate_palindromes():
    if palindrome < 100000 and is_prime(palindrome):
        print(palindrome)
        break

```

-------------------------------

-------------------------------

``` 
98689

```

-------------------------------

最大

素数回文小于 100000 是 986

89。

## 组合式函数调用

组合式函数调用是指能够使用 `code_execution` 工具将用户自定义函数组合起来。模型会将这些函数写入更大的代码块中，然后在等待您为每个调用返回响应时暂停执行。

In [37]:
#prompt="Can write some code to loop through and print integers from 1-20, and every time you hit a multiple of 3 turn on the lights, and every time you hit a multiple of 5 turn them off?"
prompt = "能否编写一些代码，循环遍历并打印 1 到 20 之间的整数，每次遇到 3 的倍数时打开灯，每次遇到 5 的倍数时关闭灯？"
tools = [
    {'code_execution': {}},
    {'function_declarations': [turn_on_the_lights, turn_off_the_lights]}
]

await run(prompt, tools=tools, modality="TEXT")

能否编写一些代码，循环遍历并打印 1 到 20 之间的整数，每次遇到 3 的倍数时打开灯，每次遇到 5 的倍数时关闭灯？

-------------------------------

当然

，这是实现该目标的 Python 代码：



-------------------------------

``` python
for i in range(1, 21):
    print(i)
    if i % 3 == 0:
        default_api.turn_on_the_lights()
    if i % 5 == 0:
        default_api.turn_off_the_lights()

```

-------------------------------

Tool call:
>>>  [FunctionResponse(
  id='function-call-3739458337160023292',
  name='turn_on_the_lights',
  response={
    'result': 'ok'
  }
)]
Tool call:
>>>  [FunctionResponse(
  id='function-call-15146055125664033134',
  name='turn_off_the_lights',
  response={
    'result': 'ok'
  }
)]
Tool call:
>>>  [FunctionResponse(
  id='function-call-15407822005768767923',
  name='turn_on_the_lights',
  response={
    'result': 'ok'
  }
)]
Tool call:
>>>  [FunctionResponse(
  id='function-call-15146055125664032351',
  name='turn_on_the_lights',
  response={
    'result': 'ok'
  }
)]
Tool call:
>>>  [FunctionResponse(
  id='function-call-6349727572494443973',
  name='turn_off_the_lights',
  response={
    'result': 'ok'
  }
)]
Tool call:
>>>  [FunctionResponse(
  id='function-call-15407822005768767030',
  name='turn_on_the_lights',
  response={
    'result': 'ok'
  }
)]
Tool call:
>>>  [FunctionResponse(
  id='function-call-6349727572494444250',
  name='turn_on_the_lights',
  response={
    '

-------------------------------

``` 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

```

-------------------------------

此

代码循环访问数字 1 到 20。对于每个数字，它

首先打印该数字。然后，它检查该数字是否是 3 的倍数。如果是，则使用 `default_api.turn_on_

the_lights()` 打开灯。接下来，它检查该数字是否是 5 的倍数。如果是，则使用 `default_api.turn_

off_the_lights()` 关闭灯。


## Google search

`google_search` 工具允许模型执行 Google 搜索。例如，您可以尝试询问一些发生时间太近、尚未包含在训练数据中的事件。

搜索仍将以 `AUDIO` 模式执行，但您不会看到详细的搜索结果：

In [38]:
prompt="巴西对阵阿根廷的足球比赛是什么时候进行的？最终比分是多少？"

tools = [
   {'google_search': {}}
]

await run(prompt, tools=tools, modality="TEXT")

巴西对阵阿根廷的足球比赛是什么时候进行的？最终比分是多少？

-------------------------------

-------------------------------

``` python
print(google_search.search(queries=["巴西对阵阿根廷足球比赛时间及比分", "Brazil vs Argentina soccer game date and score"]))

```

-------------------------------

-------------------------------

``` 
Looking up information on Google Search.

```

-------------------------------

根据

我搜索到的信息，巴西和阿根廷最近一次足球比赛是在20

25年3月26日进行的世界杯预选赛上，阿根廷以4-1战胜巴西。比赛在阿根廷布

宜诺斯艾利斯的Estadio Mâs Monumental体育场举行。


## Multi-tool


新版 API 的最大区别在于，您不再受限于每次请求只能使用一个工具。尝试将前面章节中的任务组合起来使用：

In [39]:
prompt = """\
  嘿，我需要你帮我做三件事。

1. 计算小于 100000 的最大素数平面。

2. 用谷歌搜索查找关于 2024 年 12 月 5 日当周加州最大地震的信息。

3. 打开灯。

谢谢！
  """

tools = [
    {'google_search': {}},
    {'code_execution': {}},
    {'function_declarations': [turn_on_the_lights, turn_off_the_lights]}
]

await run(prompt, tools=tools, modality="TEXT")

  嘿，我需要你帮我做三件事。

1. 计算小于 100000 的最大素数平面。

2. 用谷歌搜索查找关于 2024 年 12 月 5 日当周加州最大地震的信息。

3. 打开灯。

谢谢！
  

-------------------------------

好的

，没问题！我会依次处理这些任务。

首先，计算小于 10

0000 的最大素数。


-------------------------------

``` python
import sympy

def find_largest_prime_below(limit):
  """Finds the largest prime number less than the given limit."""
  n = limit
  while n > 2:
    n -= 1
    if sympy.isprime(n):
      return n
  return None

largest_prime = find_largest_prime_below(100000)
print(largest_prime)

```

-------------------------------

-------------------------------

``` 
99991

```

-------------------------------

好的

，小于 100000 的最大素数是 9

9991。

接下来，我将搜索关于 2024 年 12 月 5 日当周加州最大地震的信息。


-------------------------------

``` python
concise_search("largest earthquake California week of December 5, 2024", max_num_results=5)

```

-------------------------------

-------------------------------

``` 
Looking up information on Google Search.

```

-------------------------------

根据

搜索结果，2024 年 12 月 5 日左右

加州发生了一次大地震，震级为 7.0 级，震中位于门多西诺角附近的海上。 它引发了海

啸警报，但造成的破坏很小。

最后，我将尝试打开灯。


-------------------------------

``` python
try:
  result = default_api.turn_on_the_lights()
  print(result)
except Exception as e:
  print(f"Error turning on lights: {e}")

```

-------------------------------

Tool call:
>>>  [FunctionResponse(
  id='function-call-12208865589107067837',
  name='turn_on_the_lights',
  response={
    'result': 'ok'
  }
)]


-------------------------------

``` 
{'result': 'ok'}

```

-------------------------------

好的

，灯已经打开了。

我已经完成了你要求的所有三项任务：

计算了小于 100000 的最大素数（99991），搜索了关于 2024 年 1

2 月 5 日当周加州最大地震的信息，并打开了灯。


## 后续步骤

- 有关 SDK 的更多信息，请参阅 [SDK 文档](https://googleapis.github.io/python-genai/)

或者，您可以查看 [Cookbook](https://github.com/google-gemini/cookbook/tree/main/quickstarts) 中的其他 Gemini 2.5 功能，特别是另一个 [Search_Grounding](https://github.com/google-gemini/cookbook/blob/main/quickstarts/Search_Grounding.ipynb) 示例以及关于 Gemini [空间功能](https://github.com/google-gemini/cookbook/blob/main/quickstarts/Spatial_understanding.ipynb) 的示例。