Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions tests/spec/features/streaming_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,28 @@
end
end

scenario "The process can be killed after stdin is closed" do
editor.set <<~EOF
fn main() {
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
EOF

click_on("Run")

within(:stdin) do
click_on 'Execution control'
click_on 'Close stdin'
click_on 'Execution control'
click_on 'Kill process'
end

within(:output, :error) do
expect(page).to have_content 'SIGKILL'
end
end

def editor
Editor.new(page)
Expand Down
24 changes: 15 additions & 9 deletions ui/src/server_axum/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ async fn handle_core(mut socket: WebSocket, feature_flags: FeatureFlags) {
_ = active_execution_gc_interval.tick() => {
active_executions = mem::take(&mut active_executions)
.into_iter()
.filter(|(_id, (_, tx))| !tx.is_closed())
.filter(|(_id, (_, tx))| tx.as_ref().map_or(false, |tx| !tx.is_closed()))
.collect();
},

Expand Down Expand Up @@ -489,7 +489,7 @@ async fn handle_msg(
txt: String,
tx: &ResponseTx,
manager: &mut CoordinatorManager,
active_executions: &mut BTreeMap<i64, (CancellationToken, mpsc::Sender<String>)>,
active_executions: &mut BTreeMap<i64, (CancellationToken, Option<mpsc::Sender<String>>)>,
) {
use WSMessageRequest::*;

Expand All @@ -500,7 +500,7 @@ async fn handle_msg(
let token = CancellationToken::new();
let (execution_tx, execution_rx) = mpsc::channel(8);

active_executions.insert(meta.sequence_number, (token.clone(), execution_tx));
active_executions.insert(meta.sequence_number, (token.clone(), Some(execution_tx)));

// TODO: Should a single execute / build / etc. session have a timeout of some kind?
let spawned = manager
Expand All @@ -520,7 +520,7 @@ async fn handle_msg(
}

Ok(ExecuteStdin { payload, meta }) => {
let Some((_, execution_tx)) = active_executions.get(&meta.sequence_number) else {
let Some((_, Some(execution_tx))) = active_executions.get(&meta.sequence_number) else {
warn!("Received stdin for an execution that is no longer active");
return;
};
Expand All @@ -536,14 +536,20 @@ async fn handle_msg(
}

Ok(ExecuteStdinClose { meta }) => {
let execution_tx = active_executions.remove(&meta.sequence_number);
drop(execution_tx); // Signal closed
let Some((_, execution_tx)) = active_executions.get_mut(&meta.sequence_number) else {
warn!("Received stdin close for an execution that is no longer active");
return;
};

*execution_tx = None; // Drop to signal closed
}

Ok(ExecuteKill { meta }) => {
if let Some((token, _)) = active_executions.remove(&meta.sequence_number) {
token.cancel();
}
let Some((token, _)) = active_executions.get(&meta.sequence_number) else {
warn!("Received kill for an execution that is no longer active");
return;
};
token.cancel();
}

Err(e) => {
Expand Down