Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
07b4007
feat: add context cancellation to Stream.Next and handle empty chunks…
x2d7 Feb 19, 2026
4e2345d
refactor(stream): add context parameter to Next() method for tests
x2d7 Feb 19, 2026
dc5420d
test: add context cancellation test for OpenAIStream.Next()
x2d7 Feb 19, 2026
874e171
Merge pull request #18 from x2d7/17-error-empty-events
x2d7 Feb 19, 2026
ec6444c
test: add Execute() calls to verify JSON unmarshalling in tools tests
x2d7 Feb 19, 2026
9119c32
fix: handle primitive types in tool input unmarshaling via reflection
x2d7 Feb 19, 2026
36ecaa4
fix(tests): use correct 'scores' field in TestNewTool_PrimitiveWithMa…
x2d7 Feb 19, 2026
bf9de6f
Merge pull request #21 from x2d7/20-wrapper-unmarshal-fix
x2d7 Feb 19, 2026
320c746
test: add context cancellation tests for streaming and partial tool c…
x2d7 Feb 19, 2026
92331bf
feat: assemble partial tool calls
x2d7 Feb 19, 2026
f7fd1af
refactor(chat): remove event modification logic in Session handler
x2d7 Feb 19, 2026
db8b13a
test: remove EventCompletionEnded from mock streams since Session han…
x2d7 Feb 19, 2026
caf8a8c
test: add test for interleaved token and tool call event ordering
x2d7 Feb 19, 2026
550d36a
fix: correct tool call event ordering in stream processing
x2d7 Feb 19, 2026
ed56bad
fix: handle empty choices in OpenAI streaming response
x2d7 Feb 19, 2026
5805eb0
refactor(chat): extract session state into dedicated struct
x2d7 Feb 19, 2026
4c88ac5
refactor(chat): extract lastToolCall flush logic into dedicated method
x2d7 Feb 19, 2026
7b026b9
refactor: send completion ended event via helper instead of direct ch…
x2d7 Feb 19, 2026
aeb7905
refactor(chat): move client and events into sessionState struct
x2d7 Feb 19, 2026
ab78269
refactor(chat): extract handleCompletionEnd method and move send to s…
x2d7 Feb 19, 2026
0cabafb
chore: add TODO comment for EventCompletionEnded struct
x2d7 Feb 19, 2026
04a3d21
Merge pull request #22 from x2d7/19-assembling-tool-call
x2d7 Feb 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 135 additions & 59 deletions chat/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (c *Chat) Complete(ctx context.Context, client Client) <-chan StreamEvent {
}
defer stream.Close()

for stream.Next() {
for stream.Next(ctx) {
event := stream.Current()

select {
Expand All @@ -38,99 +38,125 @@ func (c *Chat) Complete(ctx context.Context, client Client) <-chan StreamEvent {
return result
}

type sessionState struct {
// session context

client Client
events <-chan StreamEvent
send func(StreamEvent) bool

// session state variables

builder strings.Builder
toolCalls []EventNewToolCall
lastToolCall *EventNewToolCall
approval *ApproveWaiter
}

func (s *sessionState) reset() {
s.builder.Reset()
s.toolCalls = s.toolCalls[:0]
s.lastToolCall = nil
s.approval = NewApproveWaiter()
}

func (s *sessionState) flushLastToolCall() bool {
if s.lastToolCall == nil {
return true
}
ok := s.send(*s.lastToolCall)
s.lastToolCall = nil
return ok
}

func (c *Chat) Session(ctx context.Context, client Client) <-chan StreamEvent {
// insert chat context into client input configuration
client = client.SyncInput(c)

// creating the channels
result := make(chan StreamEvent, 16)
events := c.Complete(ctx, client)

// delivers a StreamEvent to the result channel
// skips nil events
send := func(event StreamEvent) bool {
if event == nil {
if ctx.Err() != nil {
return false
}
return true
}
select {
case result <- event:
return true
case <-ctx.Done():
return false
}
}

// event handling
go func() {
defer close(result)

// event collectors
var stringBuilder strings.Builder
toolCalls := make([]EventNewToolCall, 0)

approval := NewApproveWaiter()
// session state
state := &sessionState{
client: client,
events: c.Complete(ctx, client),
send: send,
}
state.reset()

for {
select {
case <-ctx.Done():
return
case ev, ok := <-events:
case ev, ok := <-state.events:
if !ok {
// adding collected events to the chat (assistant's tokens and tool calls)
if stringBuilder.Len() != 0 {
c.AppendEvent(NewEventNewToken(stringBuilder.String()))
}
for _, call := range toolCalls {
c.AppendEvent(call)
}

callAmount := len(toolCalls)

// ending current completion
result <- NewEventCompletionEnded()

if callAmount == 0 {
if !c.handleCompletionEnd(ctx, state) {
return
}
continue
}

// initializing approval waiter
verdicts := approval.Wait(ctx, callAmount)

// processing user verdicts
for verdict := range verdicts {
call := verdict.call

if verdict.Accepted {
callResult, success := c.Tools.Execute(call.Name, call.Content)
c.AppendEvent(NewEventNewToolMessage(call.CallID, callResult, success))
} else {
c.AppendEvent(NewEventNewToolMessage(call.CallID, "User declined the tool call", false))
}
// flush last tool call if event type switched away from tool call stream
if _, isToolCall := ev.(EventNewToolCall); !isToolCall {
if !state.flushLastToolCall() {
return
}

// reset collectors
stringBuilder.Reset()
toolCalls = make([]EventNewToolCall, 0)

// reset approval waiter
approval = NewApproveWaiter()

// resume text completion
client = client.SyncInput(c)
events = c.Complete(ctx, client)
continue
}

// in case of ev changes inside "collecting events" block
var modifiedEvent StreamEvent
// in case if we need to skip event
var skipEvent bool

// collecting events
switch event := ev.(type) {
case EventNewToken:
stringBuilder.WriteString(event.Content)
state.builder.WriteString(event.Content)
case EventNewToolCall:
approval.Attach(&event)
modifiedEvent = event
toolCalls = append(toolCalls, event)
// prevent adding tool call immediately — we need to wait until end of completion
skipEvent = true
if event.CallID != "" {
// flush the previous tool call — it's now complete
if !state.flushLastToolCall() {
return
}
state.approval.Attach(&event)
state.toolCalls = append(state.toolCalls, event)
state.lastToolCall = &state.toolCalls[len(state.toolCalls)-1]
} else {
// add token to the last tool call
state.lastToolCall.Content += event.Content
}
case EventNewRefusal:
c.AppendEvent(event)
}

// modifying event
if modifiedEvent != nil {
ev = modifiedEvent
// skipping event
if skipEvent {
continue
}

// sending events to the channel
select {
case result <- ev:
case <-ctx.Done():
if !send(ev) {
return
}
}
Expand All @@ -139,3 +165,53 @@ func (c *Chat) Session(ctx context.Context, client Client) <-chan StreamEvent {

return result
}

func (c *Chat) handleCompletionEnd(ctx context.Context, state *sessionState) (proceed bool) {
// adding collected events to the chat (assistant's tokens and tool calls)
if state.builder.Len() != 0 {
c.AppendEvent(NewEventNewToken(state.builder.String()))
}
for _, call := range state.toolCalls {
c.AppendEvent(call)
}

callAmount := len(state.toolCalls)

// send last tool call if it wasn't sent yet
if !state.flushLastToolCall() {
return false
}

// ending current completion
if !state.send(NewEventCompletionEnded()) {
return false
}

if callAmount == 0 {
return false
}

// initializing approval waiter
verdicts := state.approval.Wait(ctx, callAmount)

// processing user verdicts
for verdict := range verdicts {
call := verdict.call

if verdict.Accepted {
callResult, success := c.Tools.Execute(call.Name, call.Content)
c.AppendEvent(NewEventNewToolMessage(call.CallID, callResult, success))
} else {
c.AppendEvent(NewEventNewToolMessage(call.CallID, "User declined the tool call", false))
}
}

// reset state
state.reset()

// resume text completion
state.client = state.client.SyncInput(c)
state.events = c.Complete(ctx, state.client)

return true
}
Loading