Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions codex-rs/core/src/tools/js_repl/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ const { Buffer } = require("node:buffer");
const crypto = require("node:crypto");
const fs = require("node:fs");
const { builtinModules, createRequire } = require("node:module");
const { createInterface } = require("node:readline");
const { performance } = require("node:perf_hooks");
const path = require("node:path");
const { URL, URLSearchParams, fileURLToPath, pathToFileURL } = require(
Expand Down Expand Up @@ -1659,6 +1658,7 @@ function handleEmitImageResult(message) {
}

let queue = Promise.resolve();
let pendingInputSegments = [];

process.on("uncaughtException", (error) => {
scheduleFatalExit("uncaught exception", error);
Expand All @@ -1668,8 +1668,7 @@ process.on("unhandledRejection", (reason) => {
scheduleFatalExit("unhandled rejection", reason);
});

const input = createInterface({ input: process.stdin, crlfDelay: Infinity });
input.on("line", (line) => {
function handleInputLine(line) {
if (!line.trim()) {
return;
}
Expand All @@ -1692,4 +1691,49 @@ input.on("line", (line) => {
if (message.type === "emit_image_result") {
handleEmitImageResult(message);
}
}

function takePendingInputFrame() {
if (pendingInputSegments.length === 0) {
return null;
}

// Keep raw stdin chunks queued until a full JSONL frame is ready so we only
// assemble the frame bytes once.
const frame =
pendingInputSegments.length === 1
? pendingInputSegments[0]
: Buffer.concat(pendingInputSegments);
pendingInputSegments = [];
return frame;
}

function handleInputFrame(frame) {
if (!frame) {
return;
}

if (frame[frame.length - 1] === 0x0d) {
frame = frame.subarray(0, frame.length - 1);
}
handleInputLine(frame.toString("utf8"));
}

process.stdin.on("data", (chunk) => {
const input = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
let segmentStart = 0;
let frameEnd = input.indexOf(0x0a);
while (frameEnd !== -1) {
pendingInputSegments.push(input.subarray(segmentStart, frameEnd));
handleInputFrame(takePendingInputFrame());
segmentStart = frameEnd + 1;
frameEnd = input.indexOf(0x0a, segmentStart);
}
if (segmentStart < input.length) {
pendingInputSegments.push(input.subarray(segmentStart));
}
});

process.stdin.on("end", () => {
handleInputFrame(takePendingInputFrame());
});
89 changes: 89 additions & 0 deletions codex-rs/core/src/tools/js_repl/mod_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1532,6 +1532,95 @@ await codex.emitImage(out);

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn js_repl_dynamic_tool_response_preserves_js_line_separator_text() -> anyhow::Result<()> {
if !can_run_js_repl_runtime_tests().await {
return Ok(());
}

for (tool_name, description, expected_text, literal) in [
(
"line_separator_tool",
"Returns text containing U+2028.",
"alpha\u{2028}omega".to_string(),
r#""alpha\u2028omega""#,
),
(
"paragraph_separator_tool",
"Returns text containing U+2029.",
"alpha\u{2029}omega".to_string(),
r#""alpha\u2029omega""#,
),
] {
let (session, turn, rx_event) =
make_session_and_context_with_dynamic_tools_and_rx(vec![DynamicToolSpec {
name: tool_name.to_string(),
description: description.to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {},
"additionalProperties": false
}),
}])
.await;

*session.active_turn.lock().await = Some(crate::state::ActiveTurn::default());

let tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::default()));
let manager = turn.js_repl.manager().await?;
let code = format!(
r#"
const out = await codex.tool("{tool_name}", {{}});
const text = typeof out === "string" ? out : out?.output;
console.log(text === {literal});
console.log(text);
"#
);

let session_for_response = Arc::clone(&session);
let expected_text_for_response = expected_text.clone();
let response_watcher = async move {
loop {
let event = tokio::time::timeout(Duration::from_secs(2), rx_event.recv()).await??;
if let EventMsg::DynamicToolCallRequest(request) = event.msg {
session_for_response
.notify_dynamic_tool_response(
&request.call_id,
DynamicToolResponse {
content_items: vec![DynamicToolCallOutputContentItem::InputText {
text: expected_text_for_response.clone(),
}],
success: true,
},
)
.await;
return Ok::<(), anyhow::Error>(());
}
}
};

let (result, response_watcher_result) = tokio::join!(
manager.execute(
Arc::clone(&session),
Arc::clone(&turn),
tracker,
JsReplArgs {
code,
timeout_ms: Some(15_000),
},
),
response_watcher,
);
response_watcher_result?;

let result = result?;
assert_eq!(result.output, format!("true\n{expected_text}"));
}

Ok(())
}

#[tokio::test]
async fn js_repl_prefers_env_node_module_dirs_over_config() -> anyhow::Result<()> {
if !can_run_js_repl_runtime_tests().await {
Expand Down
Loading