From 50953628fad131830d0d065048fde7c0f4dc8411 Mon Sep 17 00:00:00 2001 From: morgan fainberg Date: Thu, 26 Feb 2026 09:13:08 -0800 Subject: [PATCH 1/3] fix: route List[Path] outputs through FileOutput IPC for upload When a predict function returns List[Path] (not an iterator), the list was being processed by output::process_output which base64-encodes Path objects inline. This caused multi-megabyte data URIs in the response instead of uploaded URLs. Now process_single_output detects list/tuple returns and iterates their items through send_output_item, which routes Path objects through the FileOutput IPC path for proper upload. Adds coglet_list_path_upload_url integration test to verify List[Path] outputs are uploaded via --upload-url and not base64-encoded. --- crates/coglet-python/src/predictor.rs | 16 +++++++ .../tests/coglet_list_path_upload_url.txtar | 46 +++++++++++++++++++ 2 files changed, 62 insertions(+) create mode 100644 integration-tests/tests/coglet_list_path_upload_url.txtar diff --git a/crates/coglet-python/src/predictor.rs b/crates/coglet-python/src/predictor.rs index b28afb31e5..1bab5b3a17 100644 --- a/crates/coglet-python/src/predictor.rs +++ b/crates/coglet-python/src/predictor.rs @@ -747,6 +747,22 @@ impl PythonPredictor { return Ok(PredictionOutput::Single(serde_json::Value::Null)); } + // List/tuple output — iterate items so file outputs (Path, IOBase) + // go through the FileOutput IPC path for upload instead of being + // base64-encoded inline by process_output. 
+ if let Ok(list) = result.cast::() { + for item in list.iter() { + send_output_item(py, &item, json_module, slot_sender)?; + } + return Ok(PredictionOutput::Stream(vec![])); + } + if let Ok(tuple) = result.cast::() { + for item in tuple.iter() { + send_output_item(py, &item, json_module, slot_sender)?; + } + return Ok(PredictionOutput::Stream(vec![])); + } + // Non-file output — process normally let processed = output::process_output(py, result, None) .map_err(|e| PredictionError::Failed(format!("Failed to process output: {}", e)))?; diff --git a/integration-tests/tests/coglet_list_path_upload_url.txtar b/integration-tests/tests/coglet_list_path_upload_url.txtar new file mode 100644 index 0000000000..5b62e01830 --- /dev/null +++ b/integration-tests/tests/coglet_list_path_upload_url.txtar @@ -0,0 +1,46 @@ +# Test that List[Path] outputs are uploaded to --upload-url. +# This verifies that list returns (not just iterators) go through the +# FileOutput IPC path for upload instead of being base64-encoded inline. + +cog build -t $TEST_IMAGE + +# Start mock upload server on the host, sets $UPLOAD_SERVER_URL +upload-server-start + +cog serve --upload-url $UPLOAD_SERVER_URL + +# Run a prediction — three Path outputs should be uploaded, not base64-encoded +curl POST /predictions '{"input":{}}' +stdout '"status":"succeeded"' +# Outputs should be URLs pointing at the mock server, not data URIs +stdout '"output":\["http://host.docker.internal' +! 
stdout 'data:image/' + +# Verify the mock server received exactly 3 PUT uploads +upload-server-count 3 + +-- cog.yaml -- +build: + python_version: "3.12" + python_packages: + - "pillow==10.4.0" +predict: "predict.py:Predictor" + +-- predict.py -- +import os +import tempfile +from typing import List + +from cog import BasePredictor, Path +from PIL import Image + + +class Predictor(BasePredictor): + def predict(self) -> List[Path]: + outputs = [] + for color in ["red", "blue", "green"]: + d = tempfile.mkdtemp() + p = os.path.join(d, f"{color}.png") + Image.new("RGB", (10, 10), color).save(p) + outputs.append(Path(p)) + return outputs From 1639c91e11086dbe4872b0c242709cdacf37a802 Mon Sep 17 00:00:00 2001 From: morgan fainberg Date: Thu, 26 Feb 2026 10:29:48 -0800 Subject: [PATCH 2/3] fix: use schema Output type to preserve array shape for single-element List[Path] Single-element List[Path] returns were collapsed to a scalar string instead of a one-element array, breaking the API contract. The orchestrator now checks the schema's Output type ("type": "array") to decide Single vs Stream wrapping. For Any/missing schemas, a predictor-side is_stream flag on the Done message serves as fallback. 
- Add wrap_outputs() helper with schema + predictor signal priority - Propagate is_stream through PredictionOutcome -> worker -> Done IPC - 13 unit tests covering all wrapping combinations + serde round-trip - Integration tests for List[Path] single-element and scalar Path output --- crates/coglet-python/src/worker_bridge.rs | 7 +- crates/coglet/src/bridge/codec.rs | 3 + crates/coglet/src/bridge/protocol.rs | 6 + crates/coglet/src/orchestrator.rs | 227 ++++++++++++++++-- crates/coglet/src/worker.rs | 18 +- .../coglet_list_path_single_element.txtar | 56 +++++ .../tests/coglet_single_path_output.txtar | 44 ++++ 7 files changed, 342 insertions(+), 19 deletions(-) create mode 100644 integration-tests/tests/coglet_list_path_single_element.txtar create mode 100644 integration-tests/tests/coglet_single_path_output.txtar diff --git a/crates/coglet-python/src/worker_bridge.rs b/crates/coglet-python/src/worker_bridge.rs index e755f19015..c92f2a4567 100644 --- a/crates/coglet-python/src/worker_bridge.rs +++ b/crates/coglet-python/src/worker_bridge.rs @@ -554,7 +554,12 @@ impl PredictHandler for PythonPredictHandler { match result { Ok(r) => { - PredictResult::success(output_to_json(r.output), start.elapsed().as_secs_f64()) + let is_stream = r.output.is_stream(); + PredictResult::success( + output_to_json(r.output), + start.elapsed().as_secs_f64(), + is_stream, + ) } Err(e) => { if matches!(e, coglet_core::PredictionError::Cancelled) { diff --git a/crates/coglet/src/bridge/codec.rs b/crates/coglet/src/bridge/codec.rs index 23e25fcc37..34e3b17a74 100644 --- a/crates/coglet/src/bridge/codec.rs +++ b/crates/coglet/src/bridge/codec.rs @@ -156,6 +156,7 @@ mod tests { id: "test".to_string(), output: Some(serde_json::json!("result")), predict_time: 1.5, + is_stream: false, }; codec.encode(resp, &mut buf).unwrap(); let decoded = codec.decode(&mut buf).unwrap().unwrap(); @@ -165,10 +166,12 @@ mod tests { id, output, predict_time, + is_stream, } => { assert_eq!(id, "test"); 
assert_eq!(output, Some(serde_json::json!("result"))); assert!((predict_time - 1.5).abs() < 0.001); + assert!(!is_stream); } _ => panic!("wrong variant"), } diff --git a/crates/coglet/src/bridge/protocol.rs b/crates/coglet/src/bridge/protocol.rs index 7efbc7c523..bad14a2647 100644 --- a/crates/coglet/src/bridge/protocol.rs +++ b/crates/coglet/src/bridge/protocol.rs @@ -271,6 +271,11 @@ pub enum SlotResponse { #[serde(skip_serializing_if = "Option::is_none")] output: Option, predict_time: f64, + /// Predictor signal: true when the output is a list, generator, or + /// iterator — used as fallback when the schema Output type is `Any` + /// or unavailable. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + is_stream: bool, }, Failed { @@ -436,6 +441,7 @@ mod tests { id: "pred_123".to_string(), output: Some(json!("final result")), predict_time: 1.234, + is_stream: false, }; insta::assert_json_snapshot!(resp); } diff --git a/crates/coglet/src/orchestrator.rs b/crates/coglet/src/orchestrator.rs index 709164da5d..ac95460d0b 100644 --- a/crates/coglet/src/orchestrator.rs +++ b/crates/coglet/src/orchestrator.rs @@ -101,6 +101,35 @@ fn try_lock_prediction( } } +/// Wrap collected output items into the correct `PredictionOutput` variant. +/// +/// Priority: +/// 1. Schema says `"type": "array"` (`output_is_array = true`) → always `Stream` +/// 2. Predictor signals `is_stream` (list/generator return) → always `Stream` +/// 3. Otherwise → `Single` (`null` when empty, the item when exactly one), `Stream` for multiple +/// +/// This ensures `List[Path]` with a single element returns `["url"]` not `"url"`. 
+fn wrap_outputs( + outputs: Vec, + output_is_array: bool, + is_stream: bool, +) -> PredictionOutput { + let should_stream = output_is_array || is_stream; + + match outputs.as_slice() { + [] => { + if should_stream { + PredictionOutput::Stream(vec![]) + } else { + PredictionOutput::Single(serde_json::Value::Null) + } + } + _ if should_stream => PredictionOutput::Stream(outputs), + [single] => PredictionOutput::Single(single.clone()), + _ => PredictionOutput::Stream(outputs), + } +} + fn emit_worker_log(target: &str, level: &str, msg: &str) { use std::collections::HashMap; use std::sync::OnceLock; @@ -543,6 +572,25 @@ pub async fn spawn_worker( tracing::trace!(target: "coglet::schema", schema = %json, "OpenAPI schema"); } + // Determine whether the output type is an array from the schema so the + // event loop can correctly wrap single-element list returns as Stream + // instead of collapsing them to Single. + let output_is_array = schema + .as_ref() + .and_then(|s| s.get("components")) + .and_then(|c| c.get("schemas")) + .and_then(|schemas| { + let key = if config.is_train { + "TrainingOutput" + } else { + "Output" + }; + schemas.get(key) + }) + .and_then(|output| output.get("type")) + .and_then(|t| t.as_str()) + .is_some_and(|t| t == "array"); + let pool = Arc::new(PermitPool::new(num_slots)); let sockets = transport.drain_sockets(); @@ -585,6 +633,7 @@ pub async fn spawn_worker( cancel_rx, pool_for_loop, upload_url, + output_is_array, ) .await; }); @@ -616,6 +665,10 @@ async fn run_event_loop( mut cancel_rx: mpsc::Receiver, pool: Arc, upload_url: Option, + // Schema says Output is "type": "array" — always wrap as Stream. + // When false (schema unavailable, or Output not declared as an array), + // fall back to the predictor's is_stream flag on the Done message. 
+ output_is_array: bool, ) { let mut predictions: HashMap>> = HashMap::new(); let mut idle_senders: HashMap> = @@ -997,11 +1050,13 @@ async fn run_event_loop( } } } - Ok(SlotResponse::Done { id, output: _, predict_time }) => { + Ok(SlotResponse::Done { id, output: _, predict_time, is_stream }) => { tracing::info!( target: "coglet::prediction", prediction_id = %id, predict_time, + is_stream, + output_is_array, "Prediction succeeded" ); let uploads = pending_uploads.remove(&slot_id).unwrap_or_default(); @@ -1013,11 +1068,11 @@ async fn run_event_loop( // registered waiters; spawning a task can fire the // notification before the service registers its waiter. if let Some(mut p) = try_lock_prediction(&pred) { - let pred_output = match p.take_outputs().as_slice() { - [] => PredictionOutput::Single(serde_json::Value::Null), - [single] => PredictionOutput::Single(single.clone()), - many => PredictionOutput::Stream(many.to_vec()), - }; + let pred_output = wrap_outputs( + p.take_outputs(), + output_is_array, + is_stream, + ); p.set_succeeded(pred_output); } } else { @@ -1040,11 +1095,6 @@ async fn run_event_loop( prediction_id = %upload_pred_id, "Aborting in-flight uploads due to cancellation" ); - // JoinAll drops the JoinHandles when it goes out of - // scope at the end of this branch, but JoinHandle::drop - // does NOT abort the spawned task. The upload tasks - // were already aborted by the cancel handler in the - // event loop (cancel_rx arm), so they will terminate. 
if let Some(mut p) = try_lock_prediction(&pred) { p.set_canceled(); } @@ -1057,11 +1107,11 @@ async fn run_event_loop( } } if let Some(mut p) = try_lock_prediction(&pred) { - let pred_output = match p.take_outputs().as_slice() { - [] => PredictionOutput::Single(serde_json::Value::Null), - [single] => PredictionOutput::Single(single.clone()), - many => PredictionOutput::Stream(many.to_vec()), - }; + let pred_output = wrap_outputs( + p.take_outputs(), + output_is_array, + is_stream, + ); p.set_succeeded(pred_output); } }); @@ -1121,3 +1171,148 @@ async fn run_event_loop( tracing::info!("Event loop exiting"); } + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + // ── wrap_outputs: schema says array (output_is_array = true) ── + + #[test] + fn wrap_outputs_schema_array_empty() { + // List[Path] that returned no items → empty array + let result = wrap_outputs(vec![], true, true); + assert!(result.is_stream()); + assert_eq!(result.into_values(), Vec::::new()); + } + + #[test] + fn wrap_outputs_schema_array_single_item() { + // List[Path] with num_outputs=1 → ["url"] not "url" + let result = wrap_outputs(vec![json!("https://example.com/img.png")], true, true); + assert!(result.is_stream()); + assert_eq!( + result.into_values(), + vec![json!("https://example.com/img.png")] + ); + } + + #[test] + fn wrap_outputs_schema_array_multiple_items() { + // List[Path] with num_outputs=4 + let items = vec![ + json!("https://example.com/1.png"), + json!("https://example.com/2.png"), + json!("https://example.com/3.png"), + json!("https://example.com/4.png"), + ]; + let result = wrap_outputs(items.clone(), true, true); + assert!(result.is_stream()); + assert_eq!(result.into_values(), items); + } + + #[test] + fn wrap_outputs_schema_array_overrides_is_stream_false() { + // Schema says array but predictor didn't set is_stream (shouldn't happen, + // but schema is authoritative) + let result = wrap_outputs(vec![json!("url")], true, false); + 
+ assert!(result.is_stream()); + } + + // ── wrap_outputs: predictor signal (is_stream = true, no schema) ── + + #[test] + fn wrap_outputs_predictor_stream_empty() { + // Generator that yielded nothing, no schema + let result = wrap_outputs(vec![], false, true); + assert!(result.is_stream()); + assert_eq!(result.into_values(), Vec::::new()); + } + + #[test] + fn wrap_outputs_predictor_stream_single_item() { + // Any-typed list with one element, no schema + let result = wrap_outputs(vec![json!("only_item")], false, true); + assert!(result.is_stream()); + assert_eq!(result.into_values(), vec![json!("only_item")]); + } + + #[test] + fn wrap_outputs_predictor_stream_multiple_items() { + // Generator yielding multiple, no schema + let items = vec![json!("a"), json!("b"), json!("c")]; + let result = wrap_outputs(items.clone(), false, true); + assert!(result.is_stream()); + assert_eq!(result.into_values(), items); + } + + // ── wrap_outputs: scalar output (neither schema array nor predictor stream) ── + + #[test] + fn wrap_outputs_scalar_empty() { + // Scalar return with no collected items (the worker skips sending a null output) → null 
+ let result = wrap_outputs(vec![], false, false); + assert!(!result.is_stream()); + assert_eq!(result.final_value(), &json!(null)); + } + + #[test] + fn wrap_outputs_scalar_single() { + // return Path("output.png") → single string + let result = wrap_outputs(vec![json!("https://example.com/output.png")], false, false); + assert!(!result.is_stream()); + assert_eq!(result.final_value(), &json!("https://example.com/output.png")); + } + + #[test] + fn wrap_outputs_scalar_multiple_falls_back_to_stream() { + // Shouldn't happen for scalar returns, but if multiple items arrive + // with neither flag set, Stream is the safe choice + let items = vec![json!("a"), json!("b")]; + let result = wrap_outputs(items.clone(), false, false); + assert!(result.is_stream()); + assert_eq!(result.into_values(), items); + } + + // ── Serialization: is_stream field on Done message ── + + #[test] + fn done_is_stream_false_omitted_from_json() { + let msg = SlotResponse::Done { + id: "p1".into(), + output: None, + predict_time: 1.0, + is_stream: false, + }; + let json = serde_json::to_value(&msg).unwrap(); + assert!(json.get("is_stream").is_none(), "is_stream=false should be omitted"); + } + + #[test] + fn done_is_stream_true_present_in_json() { + let msg = SlotResponse::Done { + id: "p1".into(), + output: None, + predict_time: 1.0, + is_stream: true, + }; + let json = serde_json::to_value(&msg).unwrap(); + assert_eq!(json.get("is_stream"), Some(&json!(true))); + } + + #[test] + fn done_without_is_stream_deserializes_as_false() { + // Backward compat: old workers won't send is_stream + let json = json!({ + "type": "done", + "id": "p1", + "predict_time": 1.0 + }); + let msg: SlotResponse = serde_json::from_value(json).unwrap(); + match msg { + SlotResponse::Done { is_stream, .. 
} => assert!(!is_stream), + _ => panic!("wrong variant"), + } + } +} diff --git a/crates/coglet/src/worker.rs b/crates/coglet/src/worker.rs index fef3a3f924..524b0a6b0a 100644 --- a/crates/coglet/src/worker.rs +++ b/crates/coglet/src/worker.rs @@ -372,6 +372,8 @@ pub enum PredictionOutcome { Success { output: serde_json::Value, predict_time: f64, + /// True when the predictor returned a list, generator, or iterator. + is_stream: bool, }, /// Prediction failed with an error Failed { error: String, predict_time: f64 }, @@ -385,11 +387,12 @@ pub struct PredictResult { } impl PredictResult { - pub fn success(output: serde_json::Value, predict_time: f64) -> Self { + pub fn success(output: serde_json::Value, predict_time: f64, is_stream: bool) -> Self { Self { outcome: PredictionOutcome::Success { output, predict_time, + is_stream, }, } } @@ -815,6 +818,7 @@ async fn run_prediction( PredictionOutcome::Success { output, predict_time, + is_stream, } => { // Send output as a separate message (handles spilling for large values). // Skip if null or empty array — those mean "already streamed" (generators). @@ -838,6 +842,7 @@ async fn run_prediction( id: prediction_id.clone(), output: None, predict_time, + is_stream, } } PredictionOutcome::Cancelled { .. } => SlotResponse::Cancelled { @@ -863,10 +868,19 @@ mod tests { #[test] fn predict_result_success() { - let r = PredictResult::success(serde_json::json!("hello"), 0.5); + let r = PredictResult::success(serde_json::json!("hello"), 0.5, false); assert!(matches!(r.outcome, PredictionOutcome::Success { .. })); } + #[test] + fn predict_result_success_stream() { + let r = PredictResult::success(serde_json::json!([]), 0.5, true); + assert!(matches!( + r.outcome, + PredictionOutcome::Success { is_stream: true, .. 
} + )); + } + #[test] fn predict_result_failed() { let r = PredictResult::failed("oops".into(), 0.5); diff --git a/integration-tests/tests/coglet_list_path_single_element.txtar b/integration-tests/tests/coglet_list_path_single_element.txtar new file mode 100644 index 0000000000..7b8ddde943 --- /dev/null +++ b/integration-tests/tests/coglet_list_path_single_element.txtar @@ -0,0 +1,56 @@ +# Test that List[Path] with a single element returns an array, not a scalar. +# Regression test: the orchestrator must not collapse [url] → url for +# single-element list outputs. The schema declares Output as "type": "array", +# so the response must always be an array regardless of item count. + +cog build -t $TEST_IMAGE + +# Start mock upload server on the host, sets $UPLOAD_SERVER_URL +upload-server-start + +cog serve --upload-url $UPLOAD_SERVER_URL + +# Single element: output MUST be ["url"], not "url" +curl POST /predictions '{"input":{"count":1}}' +stdout '"status":"succeeded"' +# Match array with exactly one URL element (not a bare string) +stdout '"output":\["http://host.docker.internal[^"]*"\]' +! stdout 'data:image/' + +upload-server-count 1 + +# Multiple elements: output must be ["url1","url2"] +curl POST /predictions '{"input":{"count":2}}' +stdout '"status":"succeeded"' +stdout '"output":\["http://host.docker.internal[^"]*","http://host.docker.internal[^"]*"\]' +! 
stdout 'data:image/' + +# 1 from first prediction + 2 from second = 3 total uploads +upload-server-count 3 + +-- cog.yaml -- +build: + python_version: "3.12" + python_packages: + - "pillow==10.4.0" +predict: "predict.py:Predictor" + +-- predict.py -- +import os +import tempfile +from typing import List + +from cog import BasePredictor, Input, Path +from PIL import Image + + +class Predictor(BasePredictor): + def predict(self, count: int = Input(description="Number of images", default=1)) -> List[Path]: + outputs = [] + colors = ["red", "blue", "green", "yellow"] + for i in range(count): + d = tempfile.mkdtemp() + p = os.path.join(d, f"{colors[i % len(colors)]}.png") + Image.new("RGB", (10, 10), colors[i % len(colors)]).save(p) + outputs.append(Path(p)) + return outputs diff --git a/integration-tests/tests/coglet_single_path_output.txtar b/integration-tests/tests/coglet_single_path_output.txtar new file mode 100644 index 0000000000..4abbfac35f --- /dev/null +++ b/integration-tests/tests/coglet_single_path_output.txtar @@ -0,0 +1,44 @@ +# Test that a single Path return (not List[Path]) returns a scalar string, not an array. +# This complements coglet_list_path_single_element.txtar — verifying that +# the schema-driven output wrapping correctly distinguishes: +# -> Path → "url" (scalar) +# -> List[Path] → ["url"] (array, even with one element) + +cog build -t $TEST_IMAGE + +# Start mock upload server on the host, sets $UPLOAD_SERVER_URL +upload-server-start + +cog serve --upload-url $UPLOAD_SERVER_URL + +# Single Path output: must be "url" (scalar string), NOT ["url"] +curl POST /predictions '{"input":{}}' +stdout '"status":"succeeded"' +# Output should be a bare string URL, not wrapped in an array +stdout '"output":"http://host.docker.internal' +! stdout '"output":\[' +! 
stdout 'data:image/' + +upload-server-count 1 + +-- cog.yaml -- +build: + python_version: "3.12" + python_packages: + - "pillow==10.4.0" +predict: "predict.py:Predictor" + +-- predict.py -- +import os +import tempfile + +from cog import BasePredictor, Path +from PIL import Image + + +class Predictor(BasePredictor): + def predict(self) -> Path: + d = tempfile.mkdtemp() + p = os.path.join(d, "output.png") + Image.new("RGB", (10, 10), "red").save(p) + return Path(p) From 7e7b18917bbe483013b347f869c72e974b15f241 Mon Sep 17 00:00:00 2001 From: morgan fainberg Date: Thu, 26 Feb 2026 12:24:09 -0800 Subject: [PATCH 3/3] fix: rustfmt formatting in tests --- crates/coglet/src/orchestrator.rs | 10 ++++++++-- crates/coglet/src/worker.rs | 5 ++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/crates/coglet/src/orchestrator.rs b/crates/coglet/src/orchestrator.rs index ac95460d0b..07c26faac3 100644 --- a/crates/coglet/src/orchestrator.rs +++ b/crates/coglet/src/orchestrator.rs @@ -1262,7 +1262,10 @@ mod tests { // return Path("output.png") → single string let result = wrap_outputs(vec![json!("https://example.com/output.png")], false, false); assert!(!result.is_stream()); - assert_eq!(result.final_value(), &json!("https://example.com/output.png")); + assert_eq!( + result.final_value(), + &json!("https://example.com/output.png") + ); } #[test] @@ -1286,7 +1289,10 @@ mod tests { is_stream: false, }; let json = serde_json::to_value(&msg).unwrap(); - assert!(json.get("is_stream").is_none(), "is_stream=false should be omitted"); + assert!( + json.get("is_stream").is_none(), + "is_stream=false should be omitted" + ); } #[test] diff --git a/crates/coglet/src/worker.rs b/crates/coglet/src/worker.rs index 524b0a6b0a..72887b5f9a 100644 --- a/crates/coglet/src/worker.rs +++ b/crates/coglet/src/worker.rs @@ -877,7 +877,10 @@ mod tests { let r = PredictResult::success(serde_json::json!([]), 0.5, true); assert!(matches!( r.outcome, - PredictionOutcome::Success { is_stream: 
true, .. } + PredictionOutcome::Success { + is_stream: true, + .. + } )); }