From 5d5fbe45849b6895c22fb4c6d5ad2c95879cd5f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20A=C3=9Fmus?= Date: Mon, 29 Jun 2026 09:31:50 +0200 Subject: [PATCH] Support streaming function calls in Vertex LlmProvider --- crates/llm/src/vertex.rs | 768 +++++++++++++++++++++++++++++++++++---- 1 file changed, 698 insertions(+), 70 deletions(-) diff --git a/crates/llm/src/vertex.rs b/crates/llm/src/vertex.rs index c13c37c1..b7a64894 100644 --- a/crates/llm/src/vertex.rs +++ b/crates/llm/src/vertex.rs @@ -2,7 +2,7 @@ use crate::{ recording::APIRecorder, types::*, utils, ApiError, LLMProvider, RateLimitHandler, StreamingCallback, StreamingChunk, }; -use anyhow::Result; +use anyhow::{bail, Result}; use async_trait::async_trait; use reqwest::{Client, Response}; use serde::{Deserialize, Serialize}; @@ -201,8 +201,24 @@ struct VertexContent { #[derive(Debug, Serialize, Deserialize)] struct VertexFunctionCall { - name: String, - args: serde_json::Value, + #[serde(skip_serializing_if = "Option::is_none")] + name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + args: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + partial_args: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + will_continue: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +struct VertexPartialArg { + json_path: String, + #[serde(skip_serializing_if = "Option::is_none")] + will_continue: Option, + #[serde(flatten)] + value: serde_json::Map, } #[derive(Debug, Serialize, Deserialize)] @@ -243,6 +259,401 @@ impl RateLimitHandler for VertexRateLimitInfo { } } +#[derive(Debug)] +struct VertexStreamingState { + content_blocks: Vec, + active_tool_calls: Vec, + tool_counter: u32, + last_usage: Option, +} + +impl VertexStreamingState { + fn new() -> Self { + Self { + content_blocks: Vec::new(), + active_tool_calls: Vec::new(), + tool_counter: 0, + last_usage: None, + } + } +} + +#[derive(Debug)] +struct ActiveVertexToolCall { + block_index: usize, + id: String, + name: String, + completed: bool, + final_input_emitted: bool, +} + +fn finish_last_block(blocks: &mut [ContentBlock]) { + let now = SystemTime::now(); + if let Some( + ContentBlock::Text { end_time, .. } + | ContentBlock::Thinking { end_time, .. } + | ContentBlock::ToolUse { end_time, .. }, + ) = blocks.last_mut() + { + if end_time.is_none() { + *end_time = Some(now); + } + } +} + +fn enable_streaming_function_call_arguments(request_json: &mut serde_json::Value) { + if request_json + .get("tools") + .is_none_or(|tools| tools.is_null()) + { + return; + } + + request_json["tool_config"]["function_calling_config"] = json!({ + "mode": "AUTO", + "stream_function_call_arguments": true, + }); +} + +fn emit_vertex_input_json_snapshot( + callback: &StreamingCallback, + tool: &ActiveVertexToolCall, + input: &serde_json::Value, +) -> Result<()> { + callback(&StreamingChunk::InputJson { + content: serde_json::to_string(input)?, + tool_name: Some(tool.name.clone()), + tool_id: Some(tool.id.clone()), + }) +} + +fn emit_vertex_tool_start(callback: &StreamingCallback, tool: &ActiveVertexToolCall) -> Result<()> { + callback(&StreamingChunk::InputJson { + content: String::new(), + tool_name: Some(tool.name.clone()), + tool_id: Some(tool.id.clone()), + }) +} + +fn vertex_tool_input_mut<'a>( + blocks: &'a mut [ContentBlock], + tool: &ActiveVertexToolCall, +) -> Result<&'a mut serde_json::Value> { + match blocks.get_mut(tool.block_index) { + Some(ContentBlock::ToolUse { input, .. }) => Ok(input), + _ => bail!("Vertex streaming tool state points to a non-tool block"), + } +} + +fn vertex_tool_input<'a>( + blocks: &'a [ContentBlock], + tool: &ActiveVertexToolCall, +) -> Result<&'a serde_json::Value> { + match blocks.get(tool.block_index) { + Some(ContentBlock::ToolUse { input, .. }) => Ok(input), + _ => bail!("Vertex streaming tool state points to a non-tool block"), + } +} + +fn start_vertex_tool_call( + state: &mut VertexStreamingState, + request_id: u64, + name: String, + thought_signature: Option, + callback: &StreamingCallback, +) -> Result { + finish_last_block(&mut state.content_blocks); + + state.tool_counter += 1; + let tool_id = format!("tool-{}-{}", request_id, state.tool_counter); + let block_index = state.content_blocks.len(); + + state.content_blocks.push(ContentBlock::ToolUse { + id: tool_id.clone(), + name: name.clone(), + input: json!({}), + thought_signature, + start_time: Some(SystemTime::now()), + end_time: None, + }); + + state.active_tool_calls.push(ActiveVertexToolCall { + block_index, + id: tool_id, + name, + completed: false, + final_input_emitted: false, + }); + + let tool = state + .active_tool_calls + .last() + .expect("newly pushed Vertex tool call must exist"); + emit_vertex_tool_start(callback, tool)?; + + Ok(state.active_tool_calls.len() - 1) +} + +fn active_vertex_tool_call_index(state: &VertexStreamingState) -> Option { + state + .active_tool_calls + .iter() + .enumerate() + .rev() + .find_map(|(index, tool)| (!tool.completed).then_some(index)) +} + +fn active_vertex_tool_call_index_by_name( + state: &VertexStreamingState, + name: &str, +) -> Option { + state + .active_tool_calls + .iter() + .enumerate() + .rev() + .find_map(|(index, tool)| (!tool.completed && tool.name == name).then_some(index)) +} + +fn handle_vertex_function_call( + function_call: &VertexFunctionCall, + thought_signature: Option, + state: &mut VertexStreamingState, + request_id: u64, + callback: &StreamingCallback, +) -> Result<()> { + let has_name = function_call + .name + .as_deref() + .is_some_and(|name| !name.is_empty()); + + let tool_index = if let Some(name) = function_call + .name + .as_deref() + .filter(|name| !name.is_empty()) + { + if let Some(index) = active_vertex_tool_call_index_by_name(state, name) { + index + } else { + start_vertex_tool_call( + state, + request_id, + name.to_string(), + thought_signature, + callback, + )? + } + } else { + match active_vertex_tool_call_index(state) { + Some(index) => index, + None => { + warn!("Vertex functionCall continuation arrived without an active tool call"); + return Ok(()); + } + } + }; + + if let Some(args) = &function_call.args { + let tool = &state.active_tool_calls[tool_index]; + *vertex_tool_input_mut(&mut state.content_blocks, tool)? = args.clone(); + } + + for partial_arg in &function_call.partial_args { + let tool = &state.active_tool_calls[tool_index]; + let input = vertex_tool_input_mut(&mut state.content_blocks, tool)?; + apply_vertex_partial_arg(input, partial_arg)?; + } + + let should_complete = function_call.args.is_some() + || function_call.will_continue == Some(false) + || (!has_name + && function_call.args.is_none() + && function_call.partial_args.is_empty() + && function_call.will_continue.is_none()); + + if should_complete { + let tool = &state.active_tool_calls[tool_index]; + let input = vertex_tool_input(&state.content_blocks, tool)?; + if !tool.final_input_emitted { + emit_vertex_input_json_snapshot(callback, tool, input)?; + } + + if let Some(ContentBlock::ToolUse { end_time, .. }) = + state.content_blocks.get_mut(tool.block_index) + { + *end_time = Some(SystemTime::now()); + } + + if let Some(tool) = state.active_tool_calls.get_mut(tool_index) { + tool.completed = true; + tool.final_input_emitted = true; + } + } + + Ok(()) +} + +fn finish_open_vertex_tool_calls( + state: &mut VertexStreamingState, + callback: &StreamingCallback, +) -> Result<()> { + for tool in &mut state.active_tool_calls { + if tool.completed { + continue; + } + + let input = vertex_tool_input(&state.content_blocks, tool)?; + if !tool.final_input_emitted { + emit_vertex_input_json_snapshot(callback, tool, input)?; + tool.final_input_emitted = true; + } + + if let Some(ContentBlock::ToolUse { end_time, .. }) = + state.content_blocks.get_mut(tool.block_index) + { + *end_time = Some(SystemTime::now()); + } + + tool.completed = true; + } + + Ok(()) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum VertexJsonPathToken { + ObjectKey(String), + ArrayIndex(usize), +} + +fn parse_vertex_json_path(path: &str) -> Result> { + let mut chars = path.chars().peekable(); + if chars.next() != Some('$') { + bail!("Vertex partialArgs jsonPath must start with '$': {path}"); + } + + let mut tokens = Vec::new(); + while let Some(ch) = chars.next() { + match ch { + '.' => { + let mut key = String::new(); + while let Some(next) = chars.peek().copied() { + if next == '.' || next == '[' { + break; + } + key.push(next); + chars.next(); + } + if key.is_empty() { + bail!("Vertex partialArgs jsonPath has an empty object key: {path}"); + } + tokens.push(VertexJsonPathToken::ObjectKey(key)); + } + '[' => { + let mut index = String::new(); + for next in chars.by_ref() { + if next == ']' { + break; + } + index.push(next); + } + let index = index + .parse::() + .map_err(|_| anyhow::anyhow!("Unsupported Vertex jsonPath index in {path}"))?; + tokens.push(VertexJsonPathToken::ArrayIndex(index)); + } + _ => bail!("Unsupported Vertex partialArgs jsonPath syntax: {path}"), + } + } + + Ok(tokens) +} + +fn vertex_partial_arg_value(partial_arg: &VertexPartialArg) -> Option { + if let Some(value) = partial_arg.value.get("stringValue") { + return value.as_str().map(|value| json!(value)); + } + if let Some(value) = partial_arg.value.get("numberValue") { + return Some(value.clone()); + } + if let Some(value) = partial_arg.value.get("boolValue") { + return value.as_bool().map(|value| json!(value)); + } + if partial_arg.value.contains_key("nullValue") { + return Some(serde_json::Value::Null); + } + None +} + +fn apply_vertex_partial_arg( + target: &mut serde_json::Value, + partial_arg: &VertexPartialArg, +) -> Result<()> { + let Some(value) = vertex_partial_arg_value(partial_arg) else { + return Ok(()); + }; + let tokens = parse_vertex_json_path(&partial_arg.json_path)?; + if tokens.is_empty() { + merge_vertex_partial_value(target, value); + return Ok(()); + } + + let mut current = target; + for (index, token) in tokens.iter().enumerate() { + let is_last = index + 1 == tokens.len(); + match token { + VertexJsonPathToken::ObjectKey(key) => { + if !current.is_object() { + *current = json!({}); + } + let object = current.as_object_mut().expect("object just created"); + if is_last { + let slot = object.entry(key.clone()).or_insert(serde_json::Value::Null); + merge_vertex_partial_value(slot, value); + return Ok(()); + } + current = object + .entry(key.clone()) + .or_insert_with(|| match tokens[index + 1] { + VertexJsonPathToken::ObjectKey(_) => json!({}), + VertexJsonPathToken::ArrayIndex(_) => json!([]), + }); + } + VertexJsonPathToken::ArrayIndex(array_index) => { + if !current.is_array() { + *current = json!([]); + } + let array = current.as_array_mut().expect("array just created"); + while array.len() <= *array_index { + array.push(serde_json::Value::Null); + } + if is_last { + merge_vertex_partial_value(&mut array[*array_index], value); + return Ok(()); + } + if array[*array_index].is_null() { + array[*array_index] = match tokens[index + 1] { + VertexJsonPathToken::ObjectKey(_) => json!({}), + VertexJsonPathToken::ArrayIndex(_) => json!([]), + }; + } + current = &mut array[*array_index]; + } + } + } + + Ok(()) +} + +fn merge_vertex_partial_value(target: &mut serde_json::Value, value: serde_json::Value) { + match (target, value) { + (serde_json::Value::String(existing), serde_json::Value::String(delta)) => { + existing.push_str(&delta); + } + (slot, value) => *slot = value, + } +} + pub struct VertexClient { client: Client, model: String, @@ -386,8 +797,10 @@ impl VertexClient { thought: None, thought_signature: thought_signature.clone(), function_call: Some(VertexFunctionCall { - name: name.clone(), - args: input.clone(), + name: Some(name.clone()), + args: Some(input.clone()), + partial_args: Vec::new(), + will_continue: None, }), function_response: None, }), @@ -548,8 +961,8 @@ impl VertexClient { let tool_id = format!("tool-{request_id}-{tool_counter}"); ContentBlock::ToolUse { id: tool_id, - name: function_call.name, - input: function_call.args, + name: function_call.name.unwrap_or_default(), + input: function_call.args.unwrap_or_default(), thought_signature: part.thought_signature.clone(), start_time: None, end_time: None, @@ -613,6 +1026,8 @@ impl VertexClient { request_json = crate::config_merge::merge_json(request_json, custom_config.clone()); } + enable_streaming_function_call_arguments(&mut request_json); + // Allow request customizer to modify the request self.request_customizer .customize_request(&mut request_json)?; @@ -658,32 +1073,13 @@ impl VertexClient { let mut response = utils::check_response_error::(response).await?; let rate_limits = VertexRateLimitInfo::from_response(&response); - let mut content_blocks = Vec::new(); - let mut last_usage: Option = None; + let mut state = VertexStreamingState::new(); let mut line_buffer = String::new(); - let mut tool_counter = 0; - - let finish_last_block = |blocks: &mut Vec| { - // Complete the previous block if it exists - let now = std::time::SystemTime::now(); - if let Some( - ContentBlock::Text { end_time, .. } - | ContentBlock::Thinking { end_time, .. } - | ContentBlock::ToolUse { end_time, .. }, - ) = blocks.last_mut() - { - if end_time.is_none() { - *end_time = Some(now); - } - } - }; // Helper function to process SSE lines let process_sse_line = |line: &str, - blocks: &mut Vec, - usage: &mut Option, + state: &mut VertexStreamingState, callback: &StreamingCallback, - tool_counter: &mut u32, request_id: u64, recorder: &Option| -> Result<()> { @@ -696,7 +1092,7 @@ impl VertexClient { if let Ok(response) = serde_json::from_str::(data) { // Always update usage metadata if present (including final responses) if let Some(usage_metadata) = response.usage_metadata { - *usage = Some(usage_metadata); + state.last_usage = Some(usage_metadata); } // Process candidates and their content parts if present if let Some(candidate) = response.candidates.first() { @@ -705,7 +1101,7 @@ impl VertexClient { // Check if this is a thinking part if part.thought == Some(true) { // Check if we can extend the last thinking block or need to create a new one - match blocks.last_mut() { + match state.content_blocks.last_mut() { Some(ContentBlock::Thinking { thinking, signature, @@ -720,10 +1116,10 @@ impl VertexClient { } _ => { // Complete the previous block if it exists - finish_last_block(blocks); + finish_last_block(&mut state.content_blocks); // Create new thinking block - blocks.push(ContentBlock::Thinking { + state.content_blocks.push(ContentBlock::Thinking { thinking: text.clone(), signature: part .thought_signature @@ -738,7 +1134,7 @@ impl VertexClient { callback(&StreamingChunk::Thinking(text.clone()))?; } else { // Check if we can extend the last text block or need to create a new one - match blocks.last_mut() { + match state.content_blocks.last_mut() { Some(ContentBlock::Text { text: last_text, .. }) => { @@ -746,10 +1142,10 @@ impl VertexClient { last_text.push_str(text); } _ => { - finish_last_block(blocks); + finish_last_block(&mut state.content_blocks); // Create new text block - blocks.push(ContentBlock::Text { + state.content_blocks.push(ContentBlock::Text { text: text.clone(), start_time: Some(SystemTime::now()), end_time: None, @@ -760,30 +1156,13 @@ impl VertexClient { callback(&StreamingChunk::Text(text.clone()))?; } } else if let Some(function_call) = &part.function_call { - // Generate a tool ID using request_id and counter - *tool_counter += 1; - let tool_id = format!("tool-{}-{}", request_id, *tool_counter); - - finish_last_block(blocks); - - // Always create a new tool use block (they don't get extended) - blocks.push(ContentBlock::ToolUse { - id: tool_id.clone(), - name: function_call.name.clone(), - input: function_call.args.clone(), - thought_signature: part.thought_signature.clone(), - start_time: Some(SystemTime::now()), - end_time: None, - }); - - // Stream the JSON input for tools - if let Ok(args_str) = serde_json::to_string(&function_call.args) { - callback(&StreamingChunk::InputJson { - content: args_str, - tool_name: Some(function_call.name.clone()), - tool_id: Some(tool_id), - })?; - } + handle_vertex_function_call( + function_call, + part.thought_signature.clone(), + state, + request_id, + callback, + )?; } } } @@ -804,10 +1183,8 @@ impl VertexClient { if !line_buffer.is_empty() { match process_sse_line( &line_buffer, - &mut content_blocks, - &mut last_usage, + &mut state, streaming_callback, - &mut tool_counter, request_id, &self.recorder, ) { @@ -816,9 +1193,9 @@ impl VertexClient { continue; } Err(e) if e.to_string().contains("Tool limit reached") => { - debug!("Tool limit reached, stopping streaming early. Collected {} blocks so far", content_blocks.len()); + debug!("Tool limit reached, stopping streaming early. Collected {} blocks so far", state.content_blocks.len()); - finish_last_block(&mut content_blocks); + finish_last_block(&mut state.content_blocks); line_buffer.clear(); // Make sure we stop processing break; // Exit chunk processing loop early @@ -836,16 +1213,15 @@ impl VertexClient { if !line_buffer.is_empty() { process_sse_line( &line_buffer, - &mut content_blocks, - &mut last_usage, + &mut state, streaming_callback, - &mut tool_counter, request_id, &self.recorder, )?; } - finish_last_block(&mut content_blocks); + finish_open_vertex_tool_calls(&mut state, streaming_callback)?; + finish_last_block(&mut state.content_blocks); // Send StreamingComplete to indicate streaming has finished streaming_callback(&StreamingChunk::StreamingComplete)?; @@ -857,8 +1233,8 @@ impl VertexClient { Ok(( LLMResponse { - content: content_blocks, - usage: if let Some(usage_metadata) = last_usage { + content: state.content_blocks, + usage: if let Some(usage_metadata) = state.last_usage { Usage { input_tokens: usage_metadata.prompt_token_count, output_tokens: usage_metadata.candidates_token_count, @@ -989,3 +1365,255 @@ Note, there is no ID associated with each function call/result, only the order. } ``` */ + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + + fn partial_string(path: &str, value: &str, will_continue: Option) -> VertexPartialArg { + let mut partial_value = serde_json::Map::new(); + partial_value.insert("stringValue".to_string(), json!(value)); + VertexPartialArg { + json_path: path.to_string(), + will_continue, + value: partial_value, + } + } + + fn partial_number(path: &str, value: i64) -> VertexPartialArg { + let mut partial_value = serde_json::Map::new(); + partial_value.insert("numberValue".to_string(), json!(value)); + VertexPartialArg { + json_path: path.to_string(), + will_continue: None, + value: partial_value, + } + } + + fn capture_callback() -> (StreamingCallback, Arc>>) { + let chunks = Arc::new(Mutex::new(Vec::new())); + let captured = chunks.clone(); + let callback = Box::new(move |chunk: &StreamingChunk| { + captured.lock().unwrap().push(chunk.clone()); + Ok(()) + }); + (callback, chunks) + } + + fn input_json_chunks(chunks: &Arc>>) -> Vec { + chunks + .lock() + .unwrap() + .iter() + .filter(|chunk| matches!(chunk, StreamingChunk::InputJson { .. })) + .cloned() + .collect() + } + + #[test] + fn streaming_requests_enable_vertex_function_call_argument_streaming() { + let mut request = json!({ + "contents": [], + "tools": [{ + "function_declarations": [] + }] + }); + + enable_streaming_function_call_arguments(&mut request); + + assert_eq!( + request["tool_config"]["function_calling_config"], + json!({ + "mode": "AUTO", + "stream_function_call_arguments": true, + }) + ); + + let mut request_without_tools = json!({ "contents": [] }); + enable_streaming_function_call_arguments(&mut request_without_tools); + assert!(request_without_tools.get("tool_config").is_none()); + } + + #[test] + fn vertex_partial_args_merge_into_nested_tool_input() -> Result<()> { + let mut input = json!({}); + + apply_vertex_partial_arg(&mut input, &partial_string("$.scene", "dot ", Some(true)))?; + apply_vertex_partial_arg(&mut input, &partial_string("$.scene", "A", Some(false)))?; + apply_vertex_partial_arg(&mut input, &partial_number("$.items[0].count", 2))?; + + assert_eq!( + input, + json!({ + "scene": "dot A", + "items": [{ + "count": 2 + }] + }) + ); + + Ok(()) + } + + #[test] + fn vertex_streaming_extends_one_tool_call_across_partial_arg_events() -> Result<()> { + let (callback, chunks) = capture_callback(); + let mut state = VertexStreamingState::new(); + + handle_vertex_function_call( + &VertexFunctionCall { + name: Some("set_scene".to_string()), + args: None, + partial_args: Vec::new(), + will_continue: Some(true), + }, + None, + &mut state, + 7, + &callback, + )?; + handle_vertex_function_call( + &VertexFunctionCall { + name: Some("set_scene".to_string()), + args: None, + partial_args: vec![partial_string("$.scene", "#A 10 ", Some(true))], + will_continue: Some(true), + }, + None, + &mut state, + 7, + &callback, + )?; + handle_vertex_function_call( + &VertexFunctionCall { + name: None, + args: None, + partial_args: vec![partial_string("$.scene", "20", Some(false))], + will_continue: Some(true), + }, + None, + &mut state, + 7, + &callback, + )?; + handle_vertex_function_call( + &VertexFunctionCall { + name: None, + args: None, + partial_args: Vec::new(), + will_continue: Some(false), + }, + None, + &mut state, + 7, + &callback, + )?; + + assert_eq!(state.content_blocks.len(), 1); + match &state.content_blocks[0] { + ContentBlock::ToolUse { + id, name, input, .. + } => { + assert_eq!(id, "tool-7-1"); + assert_eq!(name, "set_scene"); + assert_eq!(input, &json!({ "scene": "#A 10 20" })); + } + other => panic!("expected ToolUse block, got {other:?}"), + } + + let chunks = input_json_chunks(&chunks); + assert_eq!(chunks.len(), 2); + match &chunks[0] { + StreamingChunk::InputJson { + content, + tool_name, + tool_id, + } => { + assert_eq!(content, ""); + assert_eq!(tool_name.as_deref(), Some("set_scene")); + assert_eq!(tool_id.as_deref(), Some("tool-7-1")); + } + other => panic!("expected InputJson chunk, got {other:?}"), + } + match &chunks[1] { + StreamingChunk::InputJson { + content, + tool_name, + tool_id, + } => { + assert_eq!( + serde_json::from_str::(content)?, + json!({ "scene": "#A 10 20" }) + ); + assert_eq!(tool_name.as_deref(), Some("set_scene")); + assert_eq!(tool_id.as_deref(), Some("tool-7-1")); + } + other => panic!("expected InputJson chunk, got {other:?}"), + } + + Ok(()) + } + + #[test] + fn vertex_streaming_keeps_multiple_tool_calls_as_separate_blocks() -> Result<()> { + let (callback, chunks) = capture_callback(); + let mut state = VertexStreamingState::new(); + + handle_vertex_function_call( + &VertexFunctionCall { + name: Some("get_weather".to_string()), + args: Some(json!({ "location": "New Delhi" })), + partial_args: Vec::new(), + will_continue: None, + }, + None, + &mut state, + 3, + &callback, + )?; + handle_vertex_function_call( + &VertexFunctionCall { + name: Some("get_weather".to_string()), + args: Some(json!({ "location": "San Francisco" })), + partial_args: Vec::new(), + will_continue: None, + }, + None, + &mut state, + 3, + &callback, + )?; + + assert_eq!(state.content_blocks.len(), 2); + match &state.content_blocks[0] { + ContentBlock::ToolUse { id, input, .. } => { + assert_eq!(id, "tool-3-1"); + assert_eq!(input, &json!({ "location": "New Delhi" })); + } + other => panic!("expected first ToolUse block, got {other:?}"), + } + match &state.content_blocks[1] { + ContentBlock::ToolUse { id, input, .. } => { + assert_eq!(id, "tool-3-2"); + assert_eq!(input, &json!({ "location": "San Francisco" })); + } + other => panic!("expected second ToolUse block, got {other:?}"), + } + + let chunks = input_json_chunks(&chunks); + assert_eq!(chunks.len(), 4); + assert!(matches!( + &chunks[0], + StreamingChunk::InputJson { content, tool_id, .. } + if content.is_empty() && tool_id.as_deref() == Some("tool-3-1") + )); + assert!(matches!( + &chunks[2], + StreamingChunk::InputJson { content, tool_id, .. } + if content.is_empty() && tool_id.as_deref() == Some("tool-3-2") + )); + + Ok(()) + } +}