Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
### Changed
- Rename worker memory concept to worker constraints to avoid conflicts with the agent's built-in memory feature
- Rename bee coordinator memory management to constraint management to avoid conflicts with the agent's built-in memory feature
- Feishu and DingTalk reaction add/recall now retry up to 5 times with exponential backoff (500ms base delay) on network failures, improving resilience to transient connectivity issues.

### Fixed
- Fix agent execution error message not being delivered to IM
Expand Down
46 changes: 26 additions & 20 deletions internal/platform/dingtalk/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/theopenbee/openbee/internal/infra/logger"
"github.com/theopenbee/openbee/internal/infra/media"
"github.com/theopenbee/openbee/internal/platform"
retryutil "github.com/theopenbee/openbee/internal/utils"
)

var log = logger.With(zap.String("component", "dingtalk"))
Expand Down Expand Up @@ -948,46 +949,51 @@ func buildEmojiPayload(cfg config.DingTalkConfig, data *chatbot.BotCallbackDataM
return payload
}

func doEmojiRequest(ctx context.Context, cfg config.DingTalkConfig, data *chatbot.BotCallbackDataModel, url string, timeout time.Duration, action string) {
token, err := getAccessToken(cfg.ClientID, cfg.ClientSecret)
if err != nil {
log.Warn("failed to get access token for emoji "+action, zap.Error(err))
return
}

payload := buildEmojiPayload(cfg, data)

ctx, cancel := context.WithTimeout(ctx, timeout)
func doEmojiRequest(ctx context.Context, token string, payload []byte, url string, timeout time.Duration, action string) error {
reqCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, url, bytes.NewReader(payload))
if err != nil {
log.Warn("failed to create emoji "+action+" request", zap.Error(err))
return
return fmt.Errorf("create emoji %s request: %w", action, err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-acs-dingtalk-access-token", token)

resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Warn("failed to "+action+" emoji reaction", zap.Error(err))
return
return fmt.Errorf("%s emoji reaction: %w", action, err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Warn("emoji "+action+" returned non-200", zap.Int("status", resp.StatusCode))
return fmt.Errorf("emoji %s returned non-200: %d", action, resp.StatusCode)
}
return nil
}

func doEmojiRequestWithRetry(ctx context.Context, cfg config.DingTalkConfig, data *chatbot.BotCallbackDataModel, url string, timeout time.Duration, action string, logFn func(string, ...zap.Field)) {
token, err := getAccessToken(cfg.ClientID, cfg.ClientSecret)
if err != nil {
logFn("get access token for emoji "+action, zap.Error(err))
return
}
payload := buildEmojiPayload(cfg, data)
retryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := retryutil.RetryWithBackoff(retryCtx, func() error {
return doEmojiRequest(retryCtx, token, payload, url, timeout, action)
}, retryutil.DefaultRetryCount, retryutil.DefaultRetryDelay); err != nil {
logFn(action+" emoji failed after retries", zap.Error(err))
}
}

// addThinkingEmoji adds a thinking emoji reaction to the user's message.
func addThinkingEmoji(ctx context.Context, cfg config.DingTalkConfig, data *chatbot.BotCallbackDataModel) {
doEmojiRequest(ctx, cfg, data, "https://api.dingtalk.com/v1.0/robot/emotion/reply", 5*time.Second, "reply")
doEmojiRequestWithRetry(ctx, cfg, data, "https://api.dingtalk.com/v1.0/robot/emotion/reply", 5*time.Second, "reply", log.Error)
}

// recallThinkingEmoji recalls the thinking emoji reaction from the user's message.
func recallThinkingEmoji(ctx context.Context, cfg config.DingTalkConfig, data *chatbot.BotCallbackDataModel) {
doEmojiRequest(ctx, cfg, data, "https://api.dingtalk.com/v1.0/robot/emotion/recall", 3*time.Second, "recall")
doEmojiRequestWithRetry(ctx, cfg, data, "https://api.dingtalk.com/v1.0/robot/emotion/recall", 3*time.Second, "recall", log.Warn)
}

var _ platform.Platform = (*DingTalkPlatform)(nil)
Expand Down
68 changes: 45 additions & 23 deletions internal/platform/feishu/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/theopenbee/openbee/internal/infra/media"
"github.com/theopenbee/openbee/internal/platform"
"github.com/theopenbee/openbee/internal/infra/utils"
retryutil "github.com/theopenbee/openbee/internal/utils"
)

var log = logger.With(zap.String("component", "feishu"))
Expand Down Expand Up @@ -135,22 +136,35 @@ func (r *FeishuReceiver) Start(ctx context.Context, dispatch func(platform.Inbou
defer time.AfterFunc(10*time.Minute, func() {
r.pendingReactions.Delete(*msg.MessageId)
})
resp, err := r.larkClient.Im.MessageReaction.Create(ctx,
larkim.NewCreateMessageReactionReqBuilder().
MessageId(*msg.MessageId).
Body(larkim.NewCreateMessageReactionReqBodyBuilder().
ReactionType(larkim.NewEmojiBuilder().
EmojiType("Typing").
Build()).
req := larkim.NewCreateMessageReactionReqBuilder().
MessageId(*msg.MessageId).
Body(larkim.NewCreateMessageReactionReqBodyBuilder().
ReactionType(larkim.NewEmojiBuilder().
EmojiType("Typing").
Build()).
Build())
if err != nil || !resp.Success() {
log.Error("add reaction error", zap.Error(err), zap.Any("resp", resp))
Build()).
Build()
addCtx, addCancel := context.WithTimeout(ctx, 30*time.Second)
defer addCancel()
var reactionID string
err := retryutil.RetryWithBackoff(addCtx, func() error {
resp, e := r.larkClient.Im.MessageReaction.Create(addCtx, req)
if e != nil {
return e
}
if !resp.Success() {
return fmt.Errorf("add reaction: %w", resp.CodeError)
}
if resp.Data != nil && resp.Data.ReactionId != nil {
reactionID = *resp.Data.ReactionId
}
return nil
}, retryutil.DefaultRetryCount, retryutil.DefaultRetryDelay)
if err != nil {
log.Error("add reaction failed after retries", zap.Error(err))
close(reactionCh)
return
}
if resp.Data != nil && resp.Data.ReactionId != nil {
reactionCh <- *resp.Data.ReactionId
} else if reactionID != "" {
reactionCh <- reactionID
} else {
close(reactionCh)
}
Expand Down Expand Up @@ -464,20 +478,28 @@ func (s *FeishuSender) Send(ctx context.Context, msg platform.OutboundMessage) e
if val, ok := s.pendingReactions.LoadAndDelete(messageID); ok {
if ch, ok := val.(chan string); ok {
go func() {
recallCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select {
case reactionID, received := <-ch:
if received && reactionID != "" {
recallCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
resp, err := s.larkClient.Im.MessageReaction.Delete(recallCtx,
larkim.NewDeleteMessageReactionReqBuilder().
MessageId(messageID).
ReactionId(reactionID).
Build())
cancel()
if err != nil || !resp.Success() {
log.Warn("recall reaction error", zap.Error(err), zap.Any("resp", resp))
req := larkim.NewDeleteMessageReactionReqBuilder().
MessageId(messageID).
ReactionId(reactionID).
Build()
if err := retryutil.RetryWithBackoff(recallCtx, func() error {
resp, e := s.larkClient.Im.MessageReaction.Delete(recallCtx, req)
if e != nil {
return e
}
if !resp.Success() {
return fmt.Errorf("recall reaction: %w", resp.CodeError)
}
return nil
}, retryutil.DefaultRetryCount, retryutil.DefaultRetryDelay); err != nil {
log.Warn("recall reaction failed after retries", zap.Error(err))
}
}
case <-timer.C:
Expand Down
35 changes: 35 additions & 0 deletions internal/utils/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package utils

import (
"context"
"time"
)

const (
DefaultRetryCount = 5
DefaultRetryDelay = 500 * time.Millisecond
)

func RetryWithBackoff(ctx context.Context, fn func() error, maxRetries int, baseDelay time.Duration) error {
if maxRetries <= 0 {
return fn()
}
var err error
delay := baseDelay
for i := range maxRetries {
if err = fn(); err == nil {
return nil
}
if i < maxRetries-1 {
t := time.NewTimer(delay)
select {
case <-ctx.Done():
t.Stop()
return ctx.Err()
case <-t.C:
}
delay *= 2
}
}
return err
}
87 changes: 87 additions & 0 deletions internal/utils/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package utils_test

import (
"context"
"errors"
"testing"
"time"

"github.com/theopenbee/openbee/internal/utils"
)

var errFake = errors.New("fake error")

func TestRetryWithBackoff_SuccessOnFirst(t *testing.T) {
calls := 0
err := utils.RetryWithBackoff(context.Background(), func() error {
calls++
return nil
}, 5, time.Millisecond)
if err != nil {
t.Fatalf("expected nil, got %v", err)
}
if calls != 1 {
t.Fatalf("expected 1 call, got %d", calls)
}
}

func TestRetryWithBackoff_SuccessOnThird(t *testing.T) {
calls := 0
err := utils.RetryWithBackoff(context.Background(), func() error {
calls++
if calls < 3 {
return errFake
}
return nil
}, 5, time.Millisecond)
if err != nil {
t.Fatalf("expected nil, got %v", err)
}
if calls != 3 {
t.Fatalf("expected 3 calls, got %d", calls)
}
}

func TestRetryWithBackoff_AllFail(t *testing.T) {
calls := 0
err := utils.RetryWithBackoff(context.Background(), func() error {
calls++
return errFake
}, 5, time.Millisecond)
if !errors.Is(err, errFake) {
t.Fatalf("expected errFake, got %v", err)
}
if calls != 5 {
t.Fatalf("expected 5 calls, got %d", calls)
}
}

func TestRetryWithBackoff_ContextCancelled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
calls := 0
err := utils.RetryWithBackoff(ctx, func() error {
calls++
cancel() // cancel after first failure
return errFake
}, 5, time.Millisecond)
if !errors.Is(err, context.Canceled) {
t.Fatalf("expected context.Canceled, got %v", err)
}
if calls != 1 {
t.Fatalf("expected 1 call before cancel, got %d", calls)
}
}

func TestRetryWithBackoff_ZeroMaxRetries(t *testing.T) {
called := false
err := utils.RetryWithBackoff(context.Background(), func() error {
called = true
return errFake
}, 0, time.Millisecond)
if !errors.Is(err, errFake) {
t.Fatalf("expected errFake, got %v", err)
}
if !called {
t.Fatal("expected fn to be called once")
}
}