Skip to content

feat(extension): emit session turn lifecycle events#23

Merged
omarluq merged 4 commits into
mainfrom
feat/session-turn-events
May 18, 2026
Merged

feat(extension): emit session turn lifecycle events#23
omarluq merged 4 commits into
mainfrom
feat/session-turn-events

Conversation

@omarluq
Copy link
Copy Markdown
Owner

@omarluq omarluq commented May 18, 2026

Summary

  • emit session/input/turn lifecycle events from the assistant runtime
  • expose bounded lifecycle payloads through the runtime-neutral extension host
  • add Lua-backed runtime tests for session load/start, turn end, errors, and message append events
  • document the implemented Phase 5.2 lifecycle events

Validation

  • mise exec -- go test ./...
  • mise exec -- task build
  • mise exec -- task ci
  • cr review --agent -t uncommitted --base main

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 18, 2026

Review Change Stack

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Extensions can observe bounded assistant lifecycle events via Lua handlers (input, prompt_prepare, session_start/session_load, before_agent_start, agent_start, turn_start, message_append, turn_end, agent_end).
  • Documentation

    • Extension API and roadmap updated with concrete event names, Lua examples, and guidance to treat the reactive stream as observational (not for deterministic mutations); lifecycle payloads are intentionally bounded.
  • Bug Fixes

    • Terminal: copying now reliably captures the last non-empty assistant message.

Walkthrough

Adds bounded assistant lifecycle events emitted during prompt/session/turn/entry flows, payload builders, dispatch helpers, runtime integration (Runtime.Prompt), reactive event-bus APIs, tests validating event ordering/streams, and docs/roadmap updates describing the event model.

Changes

Assistant Lifecycle Events

Layer / File(s) Summary
Lifecycle foundation types and constants
internal/assistant/lifecycle.go, internal/assistant/client.go
promptTurnLifecycle type tracks per-turn completion; constant payload key strings and timestamp helper functions added; new internal JSON keys for usage/session fields.
Lifecycle payload builders
internal/assistant/lifecycle.go, internal/assistant/usage_events.go
Constructs prompt/session/turn/turn-end/entry payload maps, token-usage mapping, and pointer/timestamp helpers; usage-event keys unified to named constants.
Lifecycle dispatch helpers
internal/assistant/lifecycle.go
Adds Runtime methods to dispatch lifecycle events: dispatchLifecycle, dispatchMessageAppend, dispatchTurnStartLifecycle, dispatchTurnEndLifecycle, dispatchTurnErrorLifecycle.
Runtime.Prompt lifecycle integration
internal/assistant/runtime.go
Refactors Prompt to emit input/prompt_prepare/session/turn-start/message_append/turn-end lifecycle events; creates per-turn lifecycle guard; persists and dispatches side-effect appends; resolveSession now returns a lifecycle event name; EventBus accessor exposed.
Event bus reactive API & plumbing
internal/event/bus.go
Adds EnvelopeHandler, Stream()/Channel() observables, OnEnvelope registration, and central subscribe(...) subscription management for reactive envelope streams and channel filtering.
Tests and test runtime wiring
internal/assistant/runtime_lifecycle_test.go, internal/assistant/runtime_test.go
Adds lifecycle tests verifying ordered events, session_load vs start, error payloads, side-effect append sequences, reactive stream behavior, and handler error resilience; refactors test runtime factories to create extension.Manager and event.NewBus.
Event bus tests
internal/event/bus_test.go
Adds TestBus_ReactiveStreams using ro observers and helper runners; verifies Stream, Channel filtering, and OnEnvelope reception.
Documentation and roadmap updates
docs/extension-api.md, docs/extension-roadmap.md
Documents the observational reactive stream vs ordered middleware dispatcher, enumerates Assistant lifecycle event names, explains { stop = true } short-circuit behavior, bounded payload semantics, and marks Phase 5.2 prompt-time events implemented.
Terminal clipboard helper change
internal/terminal/session_commands.go
Replaces global error-returning clipboard helper with func (app *App) copyTextToClipboard(text string) and simplifies copyLastAssistantMessage accordingly.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant Runtime
  participant EventBus
  participant Extensions
  Client->>Runtime: Prompt(request)
  Runtime->>EventBus: dispatchLifecycle(input)
  Runtime->>EventBus: dispatchLifecycle(prompt_prepare)
  Runtime->>Runtime: resolveSession -> (session, eventName)
  Runtime->>EventBus: dispatchLifecycle(session_start/load)
  Runtime->>EventBus: dispatchMessageAppend (user)
  Runtime->>Runtime: create promptTurnLifecycle
  Runtime->>EventBus: dispatchTurnStartLifecycle
  Runtime->>EventBus: dispatchMessageAppend (thinking/toolResult)
  Runtime->>EventBus: dispatchMessageAppend (assistant)
  Runtime->>Runtime: turnLifecycle.dispatchEnd -> dispatchTurnEndLifecycle
  Runtime->>Extensions: optional DispatchLifecycle forwarding
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • omarluq/librecode#21: Introduces lifecycle dispatcher contracts used by runtime.extensions.DispatchLifecycle calls in this PR.
  • omarluq/librecode#10: Related changes touching token-usage event emission and Prompt token handling referenced by usage-event key alignment.

Poem

🐰 Events hop in bounded, tidy streams,
Turns and sessions, payloads and dreams,
Handlers listen, emit, and play,
A rabbit cheers the lifecycle ballet,
Logs and tests keep order in the fray!

🚥 Pre-merge checks | ✅ 4
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat(extension): emit session turn lifecycle events' accurately describes the main objective of the changeset—implementing lifecycle event emission for sessions and turns.
Description check ✅ Passed The description clearly relates to the changeset by summarizing the key implementation goals: emitting lifecycle events, exposing bounded payloads, adding tests, and documenting Phase 5.2.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/session-turn-events

Comment @coderabbitai help to get the list of available commands and usage tips.

@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented May 18, 2026

Codecov Report

❌ Patch coverage is 74.60317% with 64 lines in your changes missing coverage. Please review.
✅ Project coverage is 57.29%. Comparing base (03a6b4c) to head (8925bf7).

Files with missing lines Patch % Lines
internal/assistant/runtime.go 46.66% 35 Missing and 5 partials ⚠️
internal/assistant/lifecycle.go 90.47% 6 Missing and 6 partials ⚠️
internal/event/bus.go 79.54% 8 Missing and 1 partial ⚠️
internal/terminal/session_commands.go 0.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main      #23      +/-   ##
==========================================
+ Coverage   56.93%   57.29%   +0.36%     
==========================================
  Files         159      160       +1     
  Lines       15572    15758     +186     
==========================================
+ Hits         8866     9029     +163     
- Misses       5752     5767      +15     
- Partials      954      962       +8     
Flag Coverage Δ
unittests 57.29% <74.60%> (+0.36%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
internal/assistant/runtime_lifecycle_test.go (2)

46-57: ⚡ Quick win

Assert lifecycle ordering explicitly instead of only checking substrings.

Line 48-Line 57 validate presence but not sequence, so ordering regressions can slip through even if all event names exist.

Suggested update
 import (
 	"context"
 	"errors"
+	"strings"
 	"testing"
@@
 	output, err := manager.ExecuteCommand(context.Background(), "events", "")
 	require.NoError(t, err)
-	assert.Contains(t, output, "input::")
-	assert.Contains(t, output, "prompt_prepare::")
-	assert.Contains(t, output, "session_start:"+response.SessionID+":")
-	assert.Contains(t, output, "before_agent_start:"+response.SessionID+":")
-	assert.Contains(t, output, "agent_start:"+response.SessionID+":")
-	assert.Contains(t, output, "turn_start:"+response.SessionID+":")
-	assert.Contains(t, output, "message_append:"+response.SessionID+":user")
-	assert.Contains(t, output, "message_append:"+response.SessionID+":assistant")
-	assert.Contains(t, output, "turn_end:"+response.SessionID+":")
-	assert.Contains(t, output, "agent_end:"+response.SessionID+":")
+	lines := strings.Split(strings.TrimSpace(output), "\n")
+	require.GreaterOrEqual(t, len(lines), 10)
+	assert.Equal(t, []string{
+		"input::",
+		"prompt_prepare::",
+		"session_start:" + response.SessionID + ":",
+		"before_agent_start:" + response.SessionID + ":",
+		"agent_start:" + response.SessionID + ":",
+		"turn_start:" + response.SessionID + ":",
+		"message_append:" + response.SessionID + ":user",
+		"message_append:" + response.SessionID + ":assistant",
+		"turn_end:" + response.SessionID + ":",
+		"agent_end:" + response.SessionID + ":",
+	}, lines[:10])
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/assistant/runtime_lifecycle_test.go` around lines 46 - 57, The test
currently only asserts presence of lifecycle event substrings from
manager.ExecuteCommand output but not their order; modify the test in
internal/assistant/runtime_lifecycle_test.go to assert ordering by locating each
event occurrence (use response.SessionID with the exact event tokens like
"input::", "prompt_prepare::", "session_start:"+response.SessionID+":",
"before_agent_start:"+response.SessionID+":",
"agent_start:"+response.SessionID+":", "turn_start:"+response.SessionID+":",
"message_append:"+response.SessionID+":user",
"message_append:"+response.SessionID+":assistant",
"turn_end:"+response.SessionID+":", "agent_end:"+response.SessionID+":"), record
their positions (e.g. with strings.Index or by splitting into lines and finding
indices) and assert that each successive position is greater than the previous
one so the sequence is enforced rather than only presence.

15-173: 🏗️ Heavy lift

Consider consolidating these lifecycle behavior tests into table-driven cases.

These are core behavior tests with repeated setup/assert flow; table-driven structure would reduce duplication and make adding lifecycle scenarios safer.

As per coding guidelines, **/*_test.go: Prefer table-driven tests for core behavior and regression tests for terminal rendering bugs.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/assistant/runtime_lifecycle_test.go` around lines 15 - 173,
Consolidate the five similar tests
(TestRuntime_PromptEmitsSessionTurnLifecycleEvents,
TestRuntime_PromptEmitsSessionLoadForExistingSession,
TestRuntime_PromptEmitsTurnEndOnPromptError,
TestRuntime_PromptEmitsSideEffectMessageAppendEvents,
TestRuntime_PromptLifecycleIgnoresHandlerErrors) into a single table-driven
test: create a slice of cases where each case holds a name, setup function (to
produce the client and extension Lua string using newTestRuntimeWithManager and
loadRuntimeExtension), the Prompt request (or modified SessionID), and the
expected post-conditions to verify via manager.ExecuteCommand output or response
(e.g., contains strings or equals); then iterate cases with t.Run, create
runtime/manager inside the loop, call runtime.Prompt and manager.ExecuteCommand
as needed, and perform the same assertions per-case. Keep existing helpers
(newTestRuntimeWithManager, loadRuntimeExtension, manager.ExecuteCommand,
runtime.Prompt) and preserve each test’s exact assertions by encoding them in
the case definitions so behavior remains identical.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/assistant/runtime.go`:
- Around line 302-305: In resolveSession, wrap all repository errors returned
from runtime.sessions calls (e.g., runtime.sessions.GetSession, any
Create/Update/Ack calls referenced near variables loadedSession/found/err) using
the package's oops pattern — e.g., oops.In("assistant").Code(...).Wrapf(err,
"context message") — instead of returning raw err; ensure each returned error
includes an appropriate oops.Code and a descriptive Wrapf message that mentions
the operation (GetSession/CreateSession/UpdateSession/AckSession) so the
assistant-domain metadata is preserved.

---

Nitpick comments:
In `@internal/assistant/runtime_lifecycle_test.go`:
- Around line 46-57: The test currently only asserts presence of lifecycle event
substrings from manager.ExecuteCommand output but not their order; modify the
test in internal/assistant/runtime_lifecycle_test.go to assert ordering by
locating each event occurrence (use response.SessionID with the exact event
tokens like "input::", "prompt_prepare::",
"session_start:"+response.SessionID+":",
"before_agent_start:"+response.SessionID+":",
"agent_start:"+response.SessionID+":", "turn_start:"+response.SessionID+":",
"message_append:"+response.SessionID+":user",
"message_append:"+response.SessionID+":assistant",
"turn_end:"+response.SessionID+":", "agent_end:"+response.SessionID+":"), record
their positions (e.g. with strings.Index or by splitting into lines and finding
indices) and assert that each successive position is greater than the previous
one so the sequence is enforced rather than only presence.
- Around line 15-173: Consolidate the five similar tests
(TestRuntime_PromptEmitsSessionTurnLifecycleEvents,
TestRuntime_PromptEmitsSessionLoadForExistingSession,
TestRuntime_PromptEmitsTurnEndOnPromptError,
TestRuntime_PromptEmitsSideEffectMessageAppendEvents,
TestRuntime_PromptLifecycleIgnoresHandlerErrors) into a single table-driven
test: create a slice of cases where each case holds a name, setup function (to
produce the client and extension Lua string using newTestRuntimeWithManager and
loadRuntimeExtension), the Prompt request (or modified SessionID), and the
expected post-conditions to verify via manager.ExecuteCommand output or response
(e.g., contains strings or equals); then iterate cases with t.Run, create
runtime/manager inside the loop, call runtime.Prompt and manager.ExecuteCommand
as needed, and perform the same assertions per-case. Keep existing helpers
(newTestRuntimeWithManager, loadRuntimeExtension, manager.ExecuteCommand,
runtime.Prompt) and preserve each test’s exact assertions by encoding them in
the case definitions so behavior remains identical.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 89724e58-3547-4bba-b3d3-9f1138b892ec

📥 Commits

Reviewing files that changed from the base of the PR and between 03a6b4c and a516878.

📒 Files selected for processing (8)
  • docs/extension-api.md
  • docs/extension-roadmap.md
  • internal/assistant/client.go
  • internal/assistant/lifecycle.go
  • internal/assistant/runtime.go
  • internal/assistant/runtime_lifecycle_test.go
  • internal/assistant/runtime_test.go
  • internal/assistant/usage_events.go

Comment thread internal/assistant/runtime.go Outdated
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
internal/event/bus_test.go (1)

77-133: ⚡ Quick win

Collapse the new bus behavior cases into a table-driven test.

These three tests exercise the same subscription/emit/assert shape, so a single table over subscription strategy and expected envelopes would be easier to extend and aligns with the repo’s test style. As per coding guidelines, **/*_test.go: Prefer table-driven tests for core behavior and regression tests for terminal rendering bugs.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/event/bus_test.go` around lines 77 - 133, Collapse the three tests
into a single table-driven test that iterates cases describing the subscription
strategy and expected results: for each case include a name, a subscribe
function (using event.NewBus().Stream().Subscribe, .Channel("agent").Subscribe,
or bus.OnEnvelope) that sets up the collector (envelopes or calls), a sequence
of bus.Emit calls, and the expected slice to assert; inside the loop create a
fresh bus via event.NewBus(testLogger()), call the case's subscribe function to
collect outputs (and defer Unsubscribe when applicable), perform the emits with
bus.Emit, then assert the collected results match the case.expected (use
require/ assert as in existing tests). Reference
TestBus_StreamExposesReactiveEnvelopeStream,
TestBus_ChannelReturnsFilteredReactiveStream and
TestBus_OnEnvelopeReceivesAllChannels and the methods bus.Stream, bus.Channel,
bus.OnEnvelope, and bus.Emit to locate the code to refactor.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/event/bus.go`:
- Around line 94-95: On and OnEnvelope build the observable outside of bus.lock
so Clear() can rotate bus.subject and cause subscriptions to attach to an
already-completed subject; fix by acquiring bus.lock before building the
observable and before calling subscribe so the observable is created against the
current bus.subject, or alternatively change subscribe to re-check/obtain the
current bus.subject under lock before registering; update the code paths in On,
OnEnvelope and the subscribe call (referencing functions/methods: On,
OnEnvelope, subscribe, Clear, bus.subject and Channel) to ensure the lock is
held while creating the observable and attaching the observer.

---

Nitpick comments:
In `@internal/event/bus_test.go`:
- Around line 77-133: Collapse the three tests into a single table-driven test
that iterates cases describing the subscription strategy and expected results:
for each case include a name, a subscribe function (using
event.NewBus().Stream().Subscribe, .Channel("agent").Subscribe, or
bus.OnEnvelope) that sets up the collector (envelopes or calls), a sequence of
bus.Emit calls, and the expected slice to assert; inside the loop create a fresh
bus via event.NewBus(testLogger()), call the case's subscribe function to
collect outputs (and defer Unsubscribe when applicable), perform the emits with
bus.Emit, then assert the collected results match the case.expected (use
require/ assert as in existing tests). Reference
TestBus_StreamExposesReactiveEnvelopeStream,
TestBus_ChannelReturnsFilteredReactiveStream and
TestBus_OnEnvelopeReceivesAllChannels and the methods bus.Stream, bus.Channel,
bus.OnEnvelope, and bus.Emit to locate the code to refactor.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0b4e20b3-8e38-4c51-896d-c6ce32fe29d5

📥 Commits

Reviewing files that changed from the base of the PR and between a516878 and 6d2dc3c.

📒 Files selected for processing (8)
  • docs/extension-api.md
  • docs/extension-roadmap.md
  • internal/assistant/runtime.go
  • internal/assistant/runtime_lifecycle_test.go
  • internal/assistant/runtime_test.go
  • internal/event/bus.go
  • internal/event/bus_test.go
  • internal/terminal/session_commands.go
✅ Files skipped from review due to trivial changes (2)
  • docs/extension-api.md
  • docs/extension-roadmap.md
🚧 Files skipped from review as they are similar to previous changes (3)
  • internal/assistant/runtime_test.go
  • internal/assistant/runtime_lifecycle_test.go
  • internal/assistant/runtime.go

Comment thread internal/event/bus.go Outdated
@sonarqubecloud
Copy link
Copy Markdown

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/event/bus_test.go`:
- Line 172: Replace the unordered assertion with an ordered assertion: in the
test where assert.ElementsMatch(t, expected, got) is used (the emitted stream
sequence check), change it to assert.Equal(t, expected, got) so the test
verifies the exact ordering of the emitted events; locate the assertion call by
the symbol assert.ElementsMatch and update it to use assert.Equal with the same
t, expected, got parameters.

In `@internal/event/bus.go`:
- Around line 63-78: Stream() and Channel() currently return observables that
can be subscribed to after the bus lock is released, allowing Clear() to
replace/complete the subject and causing subscribers to bind to a completed
subject; fix by making Stream and Channel create observables whose Subscribe
action acquires bus.lock and attaches the subscription to the current
bus.subject while the lock is held (i.e., reuse the existing subscribe helper or
implement the same acquire-lock + subject.Subscribe semantics inside
Stream/Channel), and for Channel apply the filter after obtaining the
subscription to ensure the subscribe is atomic with respect to Clear()/subject
replacement.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a742b4da-6b04-4989-9ff6-8a49d80f391d

📥 Commits

Reviewing files that changed from the base of the PR and between 6d2dc3c and 8925bf7.

📒 Files selected for processing (5)
  • internal/assistant/lifecycle.go
  • internal/assistant/runtime.go
  • internal/assistant/runtime_lifecycle_test.go
  • internal/event/bus.go
  • internal/event/bus_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
  • internal/assistant/lifecycle.go
  • internal/assistant/runtime.go

bus.Emit(context.Background(), envelope.Channel, envelope.Data)
}

assert.ElementsMatch(t, expected, got)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use ordered assertion for emitted stream sequence.

At Line 172, assert.ElementsMatch can mask ordering regressions. These cases emit in a deterministic sequence, so assert.Equal is a stronger guard.

Suggested diff
-	assert.ElementsMatch(t, expected, got)
+	assert.Equal(t, expected, got)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
assert.ElementsMatch(t, expected, got)
assert.Equal(t, expected, got)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/event/bus_test.go` at line 172, Replace the unordered assertion with
an ordered assertion: in the test where assert.ElementsMatch(t, expected, got)
is used (the emitted stream sequence check), change it to assert.Equal(t,
expected, got) so the test verifies the exact ordering of the emitted events;
locate the assertion call by the symbol assert.ElementsMatch and update it to
use assert.Equal with the same t, expected, got parameters.

Comment thread internal/event/bus.go
Comment on lines +63 to +78
func (bus *Bus) Stream() ro.Observable[Envelope] {
bus.lock.Lock()
bus.nextID++
subscriptionID := bus.nextID
subject := bus.subject
subscription := subject.Subscribe(bus.observer(channel, handler))
bus.subscriptions[subscriptionID] = subscription
bus.lock.Unlock()
defer bus.lock.Unlock()

return func() {
bus.unsubscribe(subscriptionID)
}
return bus.subject.AsObservable()
}

// Channel returns a filtered view of Stream for one channel.
func (bus *Bus) Channel(channel string) ro.Observable[Envelope] {
return ro.Pipe1(
bus.Stream(),
ro.Filter(func(envelope Envelope) bool {
return envelope.Channel == channel
}),
)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Direct reactive subscriptions (potentially affected):"
rg -nP --type=go '\.(Stream|Channel)\([^)]*\)\.Subscribe\s*\(' -C2

echo
echo "Clear() call sites to evaluate concurrent rotation risk:"
rg -nP --type=go '\.Clear\s*\(' -C2

Repository: omarluq/librecode

Length of output: 2257


🏁 Script executed:

cat -n internal/event/bus.go | head -100

Repository: omarluq/librecode

Length of output: 3589


🏁 Script executed:

tail -100 internal/event/bus.go

Repository: omarluq/librecode

Length of output: 2701


🏁 Script executed:

rg -A10 "ro\.NewPublishSubject" internal/event/bus.go

Repository: omarluq/librecode

Length of output: 648


🏁 Script executed:

# Check test cases that might show the race condition or how Clear() is used with subscriptions
rg -B5 -A10 "Clear.*Stream|Stream.*Clear|Channel.*Clear" internal/event/bus_test.go

Repository: omarluq/librecode

Length of output: 43


🏁 Script executed:

cat -n internal/event/bus_test.go | head -200

Repository: omarluq/librecode

Length of output: 6060


🏁 Script executed:

# Search for any documentation or tests about PublishSubject behavior with completed subjects
web_search "samber/ro PublishSubject Complete behavior subscription"

Repository: omarluq/librecode

Length of output: 112


🏁 Script executed:

# Check if there are any other files in the codebase using ro.Subject or similar patterns
rg "\.Complete\(\)" --type=go

Repository: omarluq/librecode

Length of output: 107


🏁 Script executed:

# Check if ro package is a vendored dependency or external
fd -e go -path "*/ro/*" | head -20

Repository: omarluq/librecode

Length of output: 230


🏁 Script executed:

cat go.mod | grep samber

Repository: omarluq/librecode

Length of output: 449


🏁 Script executed:

# Check how Stream() and Channel() are actually used in the broader codebase
rg "\.Stream\(\)" --type=go -A2 | head -50

Repository: omarluq/librecode

Length of output: 478


🏁 Script executed:

# Let's trace the exact control flow for Stream().Subscribe() pattern
rg -B3 -A8 "\.Stream\(\)\.Subscribe" internal/event/bus_test.go

Repository: omarluq/librecode

Length of output: 441


🏁 Script executed:

# Let's verify the exact sequence by looking at ro.Pipe1 and understanding if Filter is lazy
# Check if there's any documentation in the codebase about reactive stream semantics
rg "ro\.Pipe" internal/event/bus.go -B2 -A5

Repository: omarluq/librecode

Length of output: 540


🏁 Script executed:

# Let's verify if this is actually a real issue by checking the overall design
# Are there any concurrent tests that would catch this?
rg -B5 -A10 "t\.Parallel\(\)" internal/event/bus_test.go | head -40

Repository: omarluq/librecode

Length of output: 935


Stream()/Channel() expose a race condition where subscriptions can bind to a completed subject.

When Stream() returns an observable (while holding the lock) and then Channel() applies filtering, the lock is released before .Subscribe() is called. If Clear() runs in that window, it completes the original subject and creates a new one. The subsequent subscription binds to the now-completed subject and silently misses all future events emitted to the new subject.

The On() and OnEnvelope() methods are safe because they use the subscribe() helper which holds the lock for the entire atomic operation of creating the observable and attaching the subscription. Direct calls to .Stream().Subscribe() and .Channel().Subscribe() (as shown in TestBus_ReactiveStreams) have this vulnerability.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/event/bus.go` around lines 63 - 78, Stream() and Channel() currently
return observables that can be subscribed to after the bus lock is released,
allowing Clear() to replace/complete the subject and causing subscribers to bind
to a completed subject; fix by making Stream and Channel create observables
whose Subscribe action acquires bus.lock and attaches the subscription to the
current bus.subject while the lock is held (i.e., reuse the existing subscribe
helper or implement the same acquire-lock + subject.Subscribe semantics inside
Stream/Channel), and for Channel apply the filter after obtaining the
subscription to ensure the subscribe is atomic with respect to Clear()/subject
replacement.

@omarluq omarluq merged commit 559f9f5 into main May 18, 2026
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants