Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
338 changes: 338 additions & 0 deletions docs/plans/2026-05-09-001-feat-event-driven-architecture-plan.md

Large diffs are not rendered by default.

162 changes: 89 additions & 73 deletions pkg/services/llm/anthropic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"encoding/json"
"fmt"
"io"
"iter"
"net/http"
"os"
"strings"
"time"
)

const anthropicVersion = "2023-06-01"
Expand Down Expand Up @@ -115,13 +117,9 @@ func (p *anthropicProvider) Chat(ctx context.Context, cfg *config, messages []Me
return result, nil
}

func (p *anthropicProvider) StreamChat(ctx context.Context, cfg *config, messages []Message, tools []ToolDefinition) (<-chan StreamResult, error) {
ch := make(chan StreamResult, 100)
func (p *anthropicProvider) StreamChat(ctx context.Context, cfg *config, messages []Message, tools []ToolDefinition) iter.Seq2[*Event, error] {

go func() {
defer close(ch)

// 构建请求
return func(yield func(*Event, error) bool) {
endpoint := anthropicMessagesEndpoint(cfg.baseURL)
anthropicMessages, systemText := toAnthropicMessages(messages)

Expand All @@ -144,7 +142,7 @@ func (p *anthropicProvider) StreamChat(ctx context.Context, cfg *config, message
if len(tools) > 0 {
converted, err := toAnthropicTools(tools)
if err != nil {
ch <- StreamResult{Error: err}
yield(nil, err)
return
}
reqBody.Tools = converted
Expand All @@ -160,19 +158,17 @@ func (p *anthropicProvider) StreamChat(ctx context.Context, cfg *config, message
"messages", MessagesLogged(messages),
)

// 序列化请求体,保存用于错误时打印
reqBodyBytes, err := json.Marshal(reqBody)
if err != nil {
logger().Warnw("marshal stream request failed", "err", err)
ch <- StreamResult{Error: err}
yield(nil, err)
return
}

// 构建请求
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(reqBodyBytes))
if err != nil {
logger().Warnw("create stream request failed", "err", err, "reqBody", string(reqBodyBytes))
ch <- StreamResult{Error: err}
yield(nil, err)
return
}

Expand All @@ -185,7 +181,6 @@ func (p *anthropicProvider) StreamChat(ctx context.Context, cfg *config, message
req.Header.Set(k, v)
}

// 发送请求
hc := cfg.httpClient
if hc == nil {
hc = &http.Client{Timeout: 0}
Expand All @@ -194,39 +189,43 @@ func (p *anthropicProvider) StreamChat(ctx context.Context, cfg *config, message
resp, err := hc.Do(req)
if err != nil {
logger().Warnw("stream request failed", "err", err, "reqBody", string(reqBodyBytes))
ch <- StreamResult{Error: err}
yield(nil, err)
return
}

if cfg.debug || resp.StatusCode >= 300 {
fmt.Fprintf(os.Stderr, "\n%s\n%d bytes\n", string(reqBodyBytes), len(reqBodyBytes))
}

// 检查响应状态码
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
resp.Body.Close()
errMsg := fmt.Errorf("http %d: %s", resp.StatusCode, strings.TrimSpace(string(respBody)))
logger().Warnw("stream response error",
"status", resp.StatusCode,
// "reqBody", string(reqBodyBytes),
"respBody", string(respBody))
ch <- StreamResult{Error: errMsg}
yield(nil, errMsg)
return
}
defer resp.Body.Close()

// 解析流响应
if err := p.parseStreamResponse(resp.Body, ch, cfg.debug, cfg.logDir, cfg.model, messages, tools); err != nil {
ch <- StreamResult{Error: err}
// pusher 适配:补全 author 后直接 yield
push := func(event *Event, err error) bool {
if err != nil {
return yield(nil, err)
}
event.Author = "assistant"
return yield(event, nil)
}
}()

return ch, nil
if err := p.parseStreamResponse(resp.Body, push, cfg.debug, cfg.logDir, cfg.model, messages, tools); err != nil {
yield(nil, err)
}
}
}

// parseStreamResponse 解析流式响应
func (p *anthropicProvider) parseStreamResponse(body io.Reader, ch chan<- StreamResult, debug bool, logDir, model string, messages []Message, tools []ToolDefinition) error {
// parseStreamResponse 解析流式响应,通过 push 直接产出 *Event。
func (p *anthropicProvider) parseStreamResponse(body io.Reader, push Pusher, debug bool, logDir, model string, messages []Message, tools []ToolDefinition) error {
var currentToolCalls []ToolCall
var currentText strings.Builder
var thinkContent string
Expand All @@ -237,7 +236,7 @@ func (p *anthropicProvider) parseStreamResponse(body io.Reader, ch chan<- Stream
line, err := bufReader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
ch <- StreamResult{Done: true}
push(&Event{Done: true}, nil)
} else {
logger().Infow("read stream response failed", "err", err)
return fmt.Errorf("read: %w", err)
Expand All @@ -261,22 +260,20 @@ func (p *anthropicProvider) parseStreamResponse(body io.Reader, ch chan<- Stream

data := bytes.TrimSpace(line[5:])
if string(data) == "[DONE]" {
ch <- StreamResult{Done: true}
push(&Event{Done: true}, nil)
return nil
}

event, err := p.parseStreamEvent(data)
se, err := p.parseStreamEvent(data)
if err != nil {
continue
}
// logger().Debugw("stream event parsed", "type", event.Type, "index", event.Index,
// "delta", &event.Delta)

done, toolCalls := p.handleStreamEvent(event, &currentText, currentToolCalls, ch, logDir, model, messages, tools, &thinkContent)
done, toolCalls := p.handleStreamEvent(se, &currentText, currentToolCalls, push, logDir, model, messages, tools, &thinkContent)
currentToolCalls = toolCalls

if done {
logger().Infow("stream done", "event_type", event.Type, "tool_calls_count", len(currentToolCalls))
logger().Infow("stream done", "event_type", se.Type, "tool_calls_count", len(currentToolCalls))
return nil
}
}
Expand Down Expand Up @@ -336,97 +333,116 @@ func (p *anthropicProvider) parseStreamEvent(data []byte) (streamEvent, error) {
return event, nil
}

// handleStreamEvent 处理流事件,返回是否结束及更新后的 toolCalls
func (p *anthropicProvider) handleStreamEvent(event streamEvent, currentText *strings.Builder, currentToolCalls []ToolCall, ch chan<- StreamResult, logDir, model string, messages []Message, tools []ToolDefinition, thinkContent *string) (bool, []ToolCall) {
switch event.Type {
// handleStreamEvent 处理流事件,通过 push 直接产出 *Event。返回是否结束及更新后的 toolCalls。
func (p *anthropicProvider) handleStreamEvent(se streamEvent, currentText *strings.Builder, currentToolCalls []ToolCall,
push Pusher, logDir, model string, messages []Message, tools []ToolDefinition, thinkContent *string) (
bool, []ToolCall) {
switch se.Type {
case "content_block_start":
// 开始新的内容块,检查是否是 tool_use 类型
if event.ContentBlock != nil && event.ContentBlock.Type == "tool_use" {
toolID := event.ContentBlock.ID
// 开始新的内容块,检查是否是 tool_use 类型
if se.ContentBlock != nil && se.ContentBlock.Type == "tool_use" {
toolID := se.ContentBlock.ID
if toolID == "" {
toolID = fmt.Sprintf("toolu_%d", event.Index)
toolID = fmt.Sprintf("toolu_%d", se.Index)
}
currentToolCalls = append(currentToolCalls, ToolCall{
ID: toolID,
Type: "function",
Function: ToolCallFunc{
Name: event.ContentBlock.Name,
Name: se.ContentBlock.Name,
},
})
logger().Debugw("tool_use started", "id", toolID, "name", event.ContentBlock.Name)
logger().Debugw("tool_use started", "id", toolID, "name", se.ContentBlock.Name)
}
case "content_block_delta":
if event.Delta.Type == "text_delta" {
currentText.WriteString(event.Delta.Text)
ch <- StreamResult{
Delta: event.Delta.Text,
if se.Delta.Type == "text_delta" {
currentText.WriteString(se.Delta.Text)
if !push(&Event{
ID: NewEventID(),
Timestamp: time.Now(),
Delta: se.Delta.Text,
ToolCalls: currentToolCalls,

}, nil) {
return true, currentToolCalls
}
} else if event.Delta.Type == "thinking_delta" {
*thinkContent += event.Delta.Thinking
// thinking 独立于 tool_use,不附带正在构建的 tool_calls
ch <- StreamResult{
Think: event.Delta.Thinking,
} else if se.Delta.Type == "thinking_delta" {
*thinkContent += se.Delta.Thinking
// thinking 独立于 tool_use,不附带正在构建的 tool_calls
if !push(&Event{
ID: NewEventID(),
Timestamp: time.Now(),
Think: se.Delta.Thinking,

}, nil) {
return true, currentToolCalls
}
} else if event.Delta.Type == "input_json_delta" {
// 处理 tool_use 的参数,直接取最后一个 tool_call
if len(currentToolCalls) > 0 && event.Delta.PartialJSON != "" {
} else if se.Delta.Type == "input_json_delta" {
// 处理 tool_use 的参数,直接取最后一个 tool_call
if len(currentToolCalls) > 0 && se.Delta.PartialJSON != "" {
lastIdx := len(currentToolCalls) - 1
// 跳过 thinking 相关字段(thinking_delta 伴随 input_json_delta 出现,但不属于 tool_use 参数)
if !strings.HasPrefix(strings.TrimSpace(event.Delta.PartialJSON), "\"thinking") {
// 跳过 thinking 相关字段(thinking_delta 伴随 input_json_delta 出现,但不属于 tool_use 参数)
if !strings.HasPrefix(strings.TrimSpace(se.Delta.PartialJSON), "\"thinking") {
currentToolCalls[lastIdx].Function.Arguments = append(
currentToolCalls[lastIdx].Function.Arguments,
event.Delta.PartialJSON...,
se.Delta.PartialJSON...,
)
}
}
}
case "content_block_stop":
// 内容块结束
case "message_delta":
stopReason := FinishReason(event.Delta.StopReason)
stopReason := FinishReason(se.Delta.StopReason)
if stopReason == "end_turn" {
stopReason = "stop"
} else if len(currentToolCalls) > 0 { // 检查是否有 tool_calls
} else if len(currentToolCalls) > 0 {
stopReason = "tool_calls" // 为了兼容 OpenAI
}
// 发送完成信号
ch <- StreamResult{
ToolCalls: currentToolCalls,
FinishReason: stopReason,
Usage: event.Usage.toUsage(),
if !push(&Event{
ID: NewEventID(),
Timestamp: time.Now(),
ToolCalls: currentToolCalls,
StopReason: stopReason,
Usage: se.Usage.toUsage(),

}, nil) {
return true, currentToolCalls
}
// 写入交互日志
if logDir != "" {
go LogInteraction(logDir, "anthropic", &InteractionLog{
Model: model,
Messages: messages,
Tools: tools,
Usage: event.Usage.toUsage(),
Usage: se.Usage.toUsage(),
Response: currentText.String(),
ToolCalls: currentToolCalls,
Think: *thinkContent,
StopReason: string(stopReason),
})
}
case "message_stop": // 在 message_delta 后会跟一个message_stop,里面没有实际信息
ch <- StreamResult{
Done: true,
push(&Event{
ID: NewEventID(),
Timestamp: time.Now(),
Done: true,
ToolCalls: currentToolCalls,
}
}, nil)
return true, currentToolCalls
case "message_start":
if event.Message != nil {
ch <- StreamResult{
Model: event.Message.Model,
ResponseID: event.Message.ID,
}
if se.Message != nil {
push(&Event{
ID: NewEventID(),
Timestamp: time.Now(),
Model: se.Message.Model,
ResponseID: se.Message.ID,

}, nil)
}
// 忽略
case "ping":
// 忽略
default:
logger().Infow("unknown anthropic event type", "type", event.Type)
logger().Infow("unknown anthropic event type", "type", se.Type)
}
return false, currentToolCalls
}
Expand Down
19 changes: 6 additions & 13 deletions pkg/services/llm/anthropic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,22 +204,15 @@ func TestAnthropicProviderStreamChat(t *testing.T) {
model: "claude-3-5-sonnet-20241022",
}

ch, err := p.StreamChat(context.Background(), cfg, []Message{
var results []*Event
for event, err := range p.StreamChat(context.Background(), cfg, []Message{
{Role: RoleUser, Content: "Hi"},
}, nil)

if err != nil {
t.Errorf("StreamChat() error = %v", err)
return
}

var results []StreamResult
for result := range ch {
if result.Error != nil {
t.Errorf("stream error = %v", result.Error)
}, nil) {
if err != nil {
t.Errorf("stream error = %v", err)
break
}
results = append(results, result)
results = append(results, event)
}

if len(results) == 0 {
Expand Down
11 changes: 6 additions & 5 deletions pkg/services/llm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"iter"
"strings"
)

Expand All @@ -14,8 +15,8 @@ var ErrUnsupportedProvider = errors.New("unsupported provider")
type Client interface {
// Chat 发送聊天请求,返回完整响应
Chat(ctx context.Context, messages []Message, tools []ToolDefinition) (*ChatResult, error)
// StreamChat 发送流式聊天请求,返回流式响应
StreamChat(ctx context.Context, messages []Message, tools []ToolDefinition) (<-chan StreamResult, error)
// StreamChat 发送流式聊天请求,返回 iter.Seq2 事件流
StreamChat(ctx context.Context, messages []Message, tools []ToolDefinition) iter.Seq2[*Event, error]
// Generate 简单文本生成(用于关键词提取等)
Generate(ctx context.Context, prompt string) (string, *Usage, error)
// Embedding 向量化文本
Expand All @@ -31,7 +32,7 @@ type client struct {
// provider 接口定义
type provider interface {
Chat(ctx context.Context, cfg *config, messages []Message, tools []ToolDefinition) (*ChatResult, error)
StreamChat(ctx context.Context, cfg *config, messages []Message, tools []ToolDefinition) (<-chan StreamResult, error)
StreamChat(ctx context.Context, cfg *config, messages []Message, tools []ToolDefinition) iter.Seq2[*Event, error]
Generate(ctx context.Context, cfg *config, prompt string) (string, *Usage, error)
Embedding(ctx context.Context, cfg *config, texts []string) ([]float64, error)
}
Expand Down Expand Up @@ -68,8 +69,8 @@ func (c *client) Chat(ctx context.Context, messages []Message, tools []ToolDefin
return c.provider.Chat(ctx, c.cfg, messages, tools)
}

// StreamChat 发送流式聊天请求
func (c *client) StreamChat(ctx context.Context, messages []Message, tools []ToolDefinition) (<-chan StreamResult, error) {
// StreamChat 发送流式聊天请求,返回 iter.Seq2 事件流
func (c *client) StreamChat(ctx context.Context, messages []Message, tools []ToolDefinition) iter.Seq2[*Event, error] {
return c.provider.StreamChat(ctx, c.cfg, messages, tools)
}

Expand Down
Loading
Loading