Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 38 additions & 31 deletions cmd/anna/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,12 @@ func runServer(ctx context.Context, s *setupResult, listFn channel.ModelListFunc
// Link codes are shared between admin panel and channel bots.
linkCodes := auth.NewLinkCodeStore()

// Admin server is always created so channel stop functions can be registered
// even when the panel is disabled.
adminSrv := admin.New(s.store, as, engine, s.mem, s.db, linkCodes)

// Start admin panel server.
if adminPort > 0 {
adminSrv := admin.New(s.store, as, engine, s.mem, s.db, linkCodes)
ln, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", adminPort))
if err != nil {
return fmt.Errorf("admin listen: %w", err)
Expand Down Expand Up @@ -110,10 +113,10 @@ func runServer(ctx context.Context, s *setupResult, listFn channel.ModelListFunc
}

// Load channel configs from DB.
tgCfg := loadChannelConfig[telegramChannelConfig](s.store, "telegram")
qqCfg := loadChannelConfig[qqChannelConfig](s.store, "qq")
fsCfg := loadChannelConfig[feishuChannelConfig](s.store, "feishu")
wxCfg := loadChannelConfig[weixinChannelConfig](s.store, "weixin")
tgCfg := loadChannelConfig[telegramChannelConfig](s.store, channel.PlatformTelegram)
qqCfg := loadChannelConfig[qqChannelConfig](s.store, channel.PlatformQQ)
fsCfg := loadChannelConfig[feishuChannelConfig](s.store, channel.PlatformFeishu)
wxCfg := loadChannelConfig[weixinChannelConfig](s.store, channel.PlatformWeixin)

// --- Telegram ---
if tgCfg != nil && tgCfg.Token != "" {
Expand All @@ -127,12 +130,13 @@ func runServer(ctx context.Context, s *setupResult, listFn channel.ModelListFunc
telegram.WithAuth(as, engine, linkCodes),
)
if err != nil {
return fmt.Errorf("create telegram bot: %w", err)
}

channels = append(channels, tgBot)
if tgCfg.EnableNotify {
s.notifier.Register(tgBot)
slog.Warn("telegram bot disabled", "error", err)
} else {
channels = append(channels, tgBot)
adminSrv.RegisterChannelStop(channel.PlatformTelegram, tgBot.Stop)
if tgCfg.EnableNotify {
s.notifier.Register(tgBot)
}
}
}

Expand All @@ -148,12 +152,13 @@ func runServer(ctx context.Context, s *setupResult, listFn channel.ModelListFunc
qq.WithAuth(as, engine, linkCodes),
)
if err != nil {
return fmt.Errorf("create qq bot: %w", err)
}

channels = append(channels, qqBot)
if qqCfg.EnableNotify {
s.notifier.Register(qqBot)
slog.Warn("qq bot disabled", "error", err)
} else {
channels = append(channels, qqBot)
adminSrv.RegisterChannelStop(channel.PlatformQQ, qqBot.Stop)
if qqCfg.EnableNotify {
s.notifier.Register(qqBot)
}
}
}

Expand All @@ -172,12 +177,13 @@ func runServer(ctx context.Context, s *setupResult, listFn channel.ModelListFunc
feishu.WithAuth(as, engine, linkCodes),
)
if err != nil {
return fmt.Errorf("create feishu bot: %w", err)
}

channels = append(channels, fsBot)
if fsCfg.EnableNotify {
s.notifier.Register(fsBot)
slog.Warn("feishu bot disabled", "error", err)
} else {
channels = append(channels, fsBot)
adminSrv.RegisterChannelStop(channel.PlatformFeishu, fsBot.Stop)
if fsCfg.EnableNotify {
s.notifier.Register(fsBot)
}
}
}

Expand All @@ -194,12 +200,13 @@ func runServer(ctx context.Context, s *setupResult, listFn channel.ModelListFunc
weixin.WithAuth(as, engine, linkCodes),
)
if err != nil {
return fmt.Errorf("create weixin bot: %w", err)
}

channels = append(channels, wxBot)
if wxCfg.EnableNotify {
s.notifier.Register(wxBot)
slog.Warn("weixin bot disabled", "error", err)
} else {
channels = append(channels, wxBot)
adminSrv.RegisterChannelStop(channel.PlatformWeixin, wxBot.Stop)
if wxCfg.EnableNotify {
s.notifier.Register(wxBot)
}
}
}

Expand Down Expand Up @@ -363,10 +370,10 @@ type weixinChannelConfig struct {
}

// loadChannelConfig loads a channel's JSON config from the store and
// deserializes it into the given type. Returns nil if not found.
// deserializes it into the given type. Returns nil if not found or disabled.
func loadChannelConfig[T any](store config.Store, channelID string) *T {
ch, err := store.GetChannel(context.Background(), channelID)
if err != nil {
if err != nil || !ch.Enabled {
return nil
}
var cfg T
Expand Down
3 changes: 3 additions & 0 deletions internal/admin/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,8 @@ func (s *Server) updateChannel(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
if !ch.Enabled {
s.stopChannel(platform)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Treat manual channel stops as non-fatal in server loop

Calling s.stopChannel(platform) when a channel is disabled now invokes each bot's Stop(), but runServer still treats any non-nil Start return as a fatal error. For QQ, Feishu, and Weixin, Stop() cancels an internal context and Start() returns context.Canceled, so disabling one of these channels through /api/channels/{platform} will cause the errgroup to fail and shut down the whole server instead of only stopping that channel.

Useful? React with πŸ‘Β / πŸ‘Ž.

}
writeData(w, http.StatusOK, ch)
}
3 changes: 2 additions & 1 deletion internal/admin/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strconv"

"github.com/vaayne/anna/internal/auth"
"github.com/vaayne/anna/internal/channel"
)

// listProfileIdentities handles GET /api/auth/profile/identities.
Expand Down Expand Up @@ -105,7 +106,7 @@ func (s *Server) generateLinkCode(w http.ResponseWriter, r *http.Request) {
}

switch body.Platform {
case "telegram", "qq", "feishu":
case channel.PlatformTelegram, channel.PlatformQQ, channel.PlatformFeishu:
// valid
default:
writeError(w, http.StatusBadRequest, "platform must be telegram, qq, or feishu")
Expand Down
27 changes: 27 additions & 0 deletions internal/admin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"log/slog"
"net/http"
"sync"

"github.com/vaayne/anna/internal/auth"
"github.com/vaayne/anna/internal/config"
Expand All @@ -26,6 +27,9 @@ type Server struct {
mux *http.ServeMux
log *slog.Logger
corsOriginV string // cached CORS origin

channelMu sync.RWMutex
channelStop map[string]func() // platform β†’ stop function for running channel
}

// New creates an admin server with all API routes mounted.
Expand All @@ -50,6 +54,7 @@ func New(store config.Store, authStore auth.AuthStore, engine *auth.PolicyEngine
mux: http.NewServeMux(),
log: slog.With("component", "admin"),
corsOriginV: corsOrigin,
channelStop: make(map[string]func()),
}

// Serve static assets (JS modules).
Expand Down Expand Up @@ -182,6 +187,28 @@ func (s *Server) LinkCodes() *auth.LinkCodeStore {
return s.linkCodes
}

// RegisterChannelStop registers a stop function for a running channel so the
// admin panel can stop it when it is disabled via the UI.
func (s *Server) RegisterChannelStop(platform string, stop func()) {
s.channelMu.Lock()
s.channelStop[platform] = stop
s.channelMu.Unlock()
}

// stopChannel stops a running channel if one is registered for the platform.
func (s *Server) stopChannel(platform string) {
s.channelMu.RLock()
stop, ok := s.channelStop[platform]
s.channelMu.RUnlock()
if ok {
s.log.Info("stopping channel", "platform", platform)
stop()
s.channelMu.Lock()
delete(s.channelStop, platform)
s.channelMu.Unlock()
}
}

// Handler returns the HTTP handler with CORS, JSON, and auth middleware applied.
func (s *Server) Handler() http.Handler {
return s.corsMiddleware(s.authMiddleware(s.jsonMiddleware(s.mux)))
Expand Down
2 changes: 1 addition & 1 deletion internal/admin/ui/pages/channels.templ
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ templ channelBlock(name string, platform string) {
<input
type="checkbox"
:checked={ "channelData." + platform + ".enabled" }
@change={ "channelData." + platform + ".enabled = $event.target.checked" }
@change={ "channelData." + platform + ".enabled = $event.target.checked; saveChannel('" + platform + "')" }
class="toggle toggle-primary toggle-sm"
/>
<span class="text-sm">Enable</span>
Expand Down
4 changes: 2 additions & 2 deletions internal/admin/ui/pages/channels_templ.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions internal/admin/weixin_qr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/vaayne/anna/internal/auth"
"github.com/vaayne/anna/internal/channel"
"github.com/vaayne/anna/internal/channel/weixin"
"github.com/vaayne/anna/internal/config"
)
Expand Down Expand Up @@ -66,11 +67,11 @@ func (s *Server) pollWeixinQRStatus(w http.ResponseWriter, r *http.Request) {
externalID = status.ILinkBotID
}
if externalID != "" && s.authStore != nil {
if _, err := s.authStore.GetIdentityByPlatform(r.Context(), "weixin", externalID); err != nil {
if _, err := s.authStore.GetIdentityByPlatform(r.Context(), channel.PlatformWeixin, externalID); err != nil {
// Identity doesn't exist yet β€” create it.
if _, err := s.authStore.CreateIdentity(r.Context(), auth.Identity{
UserID: info.UserID,
Platform: "weixin",
Platform: channel.PlatformWeixin,
ExternalID: externalID,
}); err != nil {
s.log.Warn("create weixin identity", "user_id", info.UserID, "error", err)
Expand All @@ -86,10 +87,10 @@ func (s *Server) pollWeixinQRStatus(w http.ResponseWriter, r *http.Request) {
// channel config in the DB.
func (s *Server) saveWeixinCredentials(ctx context.Context, status *weixin.QRCodeStatusResponse) error {
var raw map[string]any
ch, err := s.store.GetChannel(ctx, "weixin")
ch, err := s.store.GetChannel(ctx, channel.PlatformWeixin)
if err != nil {
raw = make(map[string]any)
ch = config.Channel{ID: "weixin", Enabled: true}
ch = config.Channel{ID: channel.PlatformWeixin, Enabled: true}
} else {
if err := json.Unmarshal([]byte(ch.Config), &raw); err != nil {
raw = make(map[string]any)
Expand All @@ -107,7 +108,7 @@ func (s *Server) saveWeixinCredentials(ctx context.Context, status *weixin.QRCod
}

return s.store.UpsertChannel(ctx, config.Channel{
ID: "weixin",
ID: channel.PlatformWeixin,
Enabled: ch.Enabled,
Config: string(data),
})
Expand Down
2 changes: 1 addition & 1 deletion internal/channel/cli/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func newChatModel(ctx context.Context, pool *agent.Pool, provider, model string,
return m
}

const cliChannel = "cli"
const cliChannel = channel.PlatformCLI

// resolveSession returns the most recently active CLI session ID,
// or creates a new session if none exist.
Expand Down
2 changes: 1 addition & 1 deletion internal/channel/feishu/feishu.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (b *Bot) fetchBotOpenID(ctx context.Context) error {
}

// Name returns the backend name. Implements channel.Backend.
func (b *Bot) Name() string { return "feishu" }
func (b *Bot) Name() string { return channel.PlatformFeishu }

// Notify sends a notification message. Implements channel.Backend.
// Supports both chat IDs (oc_ prefix) and user open IDs (ou_ prefix).
Expand Down
2 changes: 1 addition & 1 deletion internal/channel/feishu/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (b *Bot) onMessage(ctx context.Context, event *larkim.P2MessageReceiveV1) e

// Try link code before anything else.
if b.authStore != nil && b.linkCodes != nil && text != "" {
if resp, ok := channel.TryLinkCode(b.ctx, b.authStore, b.linkCodes, text, "feishu", openID, ""); ok {
if resp, ok := channel.TryLinkCode(b.ctx, b.authStore, b.linkCodes, text, channel.PlatformFeishu, openID, ""); ok {
replyFn(resp)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/channel/feishu/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (b *Bot) resolveWithThread(openID, chatID, chatType, rootID string) (*chann
b.store,
b.authStore,
b.engine,
"feishu",
channel.PlatformFeishu,
openID,
"",
chatID,
Expand Down
9 changes: 9 additions & 0 deletions internal/channel/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@ package channel

import "context"

// Platform identifiers for each messaging channel.
const (
PlatformTelegram = "telegram"
PlatformQQ = "qq"
PlatformFeishu = "feishu"
PlatformWeixin = "weixin"
PlatformCLI = "cli"
)

// Channel is a messaging platform that receives user messages and sends notifications.
type Channel interface {
// Name returns a unique identifier (e.g. "telegram", "qq").
Expand Down
2 changes: 1 addition & 1 deletion internal/channel/qq/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (b *Bot) c2cMessageHandler() event.C2CMessageEventHandler {
// Try link code before anything else.
text := strings.TrimSpace(msg.Content)
if b.authStore != nil && b.linkCodes != nil && text != "" {
if resp, ok := channel.TryLinkCode(b.ctx, b.authStore, b.linkCodes, text, "qq", authorID, ""); ok {
if resp, ok := channel.TryLinkCode(b.ctx, b.authStore, b.linkCodes, text, channel.PlatformQQ, authorID, ""); ok {
b.replyC2C(b.ctx, authorID, msg.ID, resp)
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions internal/channel/qq/qq.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (b *Bot) Stop() {
}

// Name returns the channel name. Implements channel.Channel.
func (b *Bot) Name() string { return "qq" }
func (b *Bot) Name() string { return channel.PlatformQQ }

// Notify sends a notification message. Implements channel.Channel.
func (b *Bot) Notify(ctx context.Context, n channel.Notification) error {
Expand Down Expand Up @@ -201,7 +201,7 @@ func (b *Bot) resolve(authorID, groupID string) (*channel.ResolvedChat, error) {
b.store,
b.authStore,
b.engine,
"qq",
channel.PlatformQQ,
authorID,
"",
groupID,
Expand Down
2 changes: 1 addition & 1 deletion internal/channel/telegram/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (b *Bot) handleText(c tele.Context) error {
if name == "" {
name = sender.Username
}
if resp, ok := channel.TryLinkCode(b.ctx, b.authStore, b.linkCodes, text, "telegram", strconv.FormatInt(sender.ID, 10), name); ok {
if resp, ok := channel.TryLinkCode(b.ctx, b.authStore, b.linkCodes, text, channel.PlatformTelegram, strconv.FormatInt(sender.ID, 10), name); ok {
return c.Send(resp)
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/channel/telegram/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (b *Bot) Stop() {
}

// Name returns the channel name. Implements channel.Channel.
func (b *Bot) Name() string { return "telegram" }
func (b *Bot) Name() string { return channel.PlatformTelegram }

// Notify sends a message to the specified chat. Implements channel.Channel.
func (b *Bot) Notify(_ context.Context, n channel.Notification) error {
Expand Down Expand Up @@ -255,7 +255,7 @@ func (b *Bot) resolve(c tele.Context) (*channel.ResolvedChat, error) {
b.store,
b.authStore,
b.engine,
"telegram",
channel.PlatformTelegram,
senderID,
name,
chatID,
Expand Down
2 changes: 1 addition & 1 deletion internal/channel/weixin/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (b *Bot) handleText(msg WeixinMessage, text string) {

// Try link code before anything else.
if b.authStore != nil && b.linkCodes != nil {
if resp, ok := channel.TryLinkCode(b.ctx, b.authStore, b.linkCodes, text, "weixin", msg.FromUserID, ""); ok {
if resp, ok := channel.TryLinkCode(b.ctx, b.authStore, b.linkCodes, text, channel.PlatformWeixin, msg.FromUserID, ""); ok {
reply(resp)
return
}
Expand Down
Loading
Loading