-
-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement smr queue #119
Conversation
b78e8bd
to
2c6dd84
Compare
0c803e5
to
a709ac1
Compare
pkg/bots/slackbot/slackbot.go
Outdated
if !errors.Is(err, context.Canceled) { | ||
s.logger.WithField("error", err.Error()).Error("slack bot server shutdown failed") | ||
return err | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不论如何都打印。cancelled 说明 graceful 耗时太长了,如果是正常情况,是不是要改 stop timeout 阈值?
pkg/bots/slackbot/slackbot.go
Outdated
|
||
func (s *BotService) Stop(ctx context.Context) error { | ||
err := s.server.Shutdown(ctx) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
没必要多空一行?
pkg/bots/slackbot/slackbot.go
Outdated
|
||
func (s *BotService) Stop(ctx context.Context) error { | ||
err := s.server.Shutdown(ctx) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pkg/bots/discordbot/discordbot.go
Outdated
b.logger.Info("discord: shutting down...") | ||
b.Close(ctx) | ||
b.webhookStarted = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
给 health check 用的没必要设回 false,要不然会触发告警的
internal/services/smr/utils.go
Outdated
|
||
parsedURL, err2 := url.Parse(urlString) | ||
if err2 != nil { | ||
return ErrParse |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return ErrParse | |
return fmt.Errorf("%w: %w", err2, ErrParse) |
internal/services/smr/utils.go
Outdated
return ErrParse | ||
} | ||
if parsedURL.Scheme == "" || !lo.Contains([]string{"http", "https"}, parsedURL.Scheme) { | ||
return ErrScheme |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return ErrScheme | |
return fmt.Errorf("%w: %w", err2, ErrScheme) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
要正确传递 err 链,要不然上层日志打印的时候没办法排错了
internal/services/smr/taskmgr.go
Outdated
return info, err | ||
} | ||
|
||
err = json.Unmarshal([]byte(res[1]), &info) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
好像没保证 res
有 len() == 2
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
以及为什么是 index 1
?
internal/services/smr/taskmgr.go
Outdated
close(s.closeChan) | ||
} | ||
|
||
func (s *Service) getTask() (types.TaskInfo, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
用 pointer pattern 省去调用方解 err
internal/services/smr/taskmgr.go
Outdated
s.closeChan <- struct{}{} | ||
close(s.closeChan) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
用 context.CancelFunc 关?
internal/services/smr/taskmgr.go
Outdated
if needToClose { | ||
break | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
可以放 select 上面方便读
internal/services/services.go
Outdated
@@ -12,5 +13,6 @@ func NewModules() fx.Option { | |||
fx.Provide(health.NewHealth()), | |||
fx.Provide(pprof.NewPprof()), | |||
fx.Provide(autorecap.NewAutoRecapService()), | |||
fx.Provide(smr.NewService()), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
其实我没有太理解为什么要单独加一个 service,是为了避免并发造成的 OOM 吗?
defer cancel() | ||
err = h.smrService.AddTask(types.TaskInfo{ | ||
Platform: smr.FromPlatformTelegram, | ||
Url: urlString, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
URL
var body receivedCommandInfo | ||
if err := ctx.Bind(&body); err != nil { | ||
ctx.AbortWithStatus(http.StatusBadRequest) | ||
h.logger.WithField("error", err.Error()).Warn("failed to bind request body, maybe slack request definition changed") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
h.logger.WithField("error", err.Error()).Warn("failed to bind request body, maybe slack request definition changed") | |
h.logger.WithField("error", err.Error()).Warn("failed to bind request body, type definition of slack request body may have changed") |
h.logger.WithFields(logrus.Fields{ | ||
"user_id": body.UserID, | ||
"channel_id": body.ChannelID, | ||
}).Infof("slack: command received: /smr %s", body.Text) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
}).Infof("slack: command received: /smr %s", body.Text) | |
}).Trace("slack: command received: /smr %s", body.Text) |
err := smrutils.CheckUrl(urlString) | ||
if err != nil { | ||
if smrutils.IsUrlCheckError(err) { | ||
ctx.JSON(http.StatusOK, slackbot.NewSlackWebhookMessage(err.Error())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这种也可以用 AbortWithStatus 么?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
不可以,Slack 会提示 smr 因为 dispatch 错误而失败
internal/services/smr/error.go
Outdated
ErrNoLink = errors.New("没有找到链接,可以发送一个有效的链接吗?用法:/smr <链接>") | ||
ErrParse = errors.New("你发来的链接无法被理解,可以重新发一个试试。用法:/smr <链接>") | ||
ErrScheme = errors.New("你发来的链接无法被理解,可以重新发一个试试。用法:/smr <链接>") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
最好不要把这种有程序错误作为上游的错误文案放到 error 里面,文案应该是业务 specific 的。比如 Telegram 的 /smr <链接>
应该用 <code></code>
扩起来,其他平台不行。
internal/services/smr/processor.go
Outdated
return result.FormatSummarizationAsDiscordMarkdown() | ||
} | ||
|
||
return "" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
直接写 default 吧
internal/services/smr/processor.go
Outdated
|
||
_, err := s.tgBot.Send(msgEdit) | ||
if err != nil { | ||
s.logger.WithError(err).WithField("platform", info.Platform).Warn("smr service: failed to send result message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
太长了,链式调用换个行吧
internal/services/smr/processor.go
Outdated
|
||
_, err := s.tgBot.Send(msgEdit) | ||
if err != nil { | ||
s.logger.WithError(err).WithField("platform", info.Platform).Warn("smr service: failed to send result message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s.logger.WithError(err).WithField("platform", info.Platform).Warn("smr service: failed to send result message") | |
s.logger.WithError(err). | |
WithField("platform", info.Platform). | |
Warn("smr service: failed to send result message") |
internal/services/smr/processor.go
Outdated
First(context.Background()) | ||
|
||
if err != nil { | ||
s.logger.WithError(err).WithField("platform", info.Platform).Warn("smr service: failed to get team's access token") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s.logger.WithError(err).WithField("platform", info.Platform).Warn("smr service: failed to get team's access token") | |
s.logger.WithError(err). | |
WithField("platform", info.Platform). | |
Warn("smr service: failed to get team's access token") |
internal/services/smr/processor.go
Outdated
) | ||
|
||
if err != nil { | ||
s.logger.WithError(err).WithField("platform", info.Platform).Warn("smr service: failed to send result message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s.logger.WithError(err).WithField("platform", info.Platform).Warn("smr service: failed to send result message") | |
s.logger.WithError(err). | |
WithField("platform", info.Platform). | |
Warn("smr service: failed to send result message") |
internal/services/smr/processor.go
Outdated
) | ||
|
||
if err != nil { | ||
s.logger.WithError(err).WithField("platform", info.Platform).Warn("smr service: failed to send result message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s.logger.WithError(err).WithField("platform", info.Platform).Warn("smr service: failed to send result message") | |
s.logger.WithError(err). | |
WithField("platform", info.Platform). | |
Warn("smr service: failed to send result message") |
internal/services/smr/processor.go
Outdated
} | ||
} | ||
|
||
func (s *Service) botExists(platform smr.FromPlatform) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
返回 bool 的函数用动词 verb 开头,isBotExists 或者 hasBot
internal/services/smr/smr.go
Outdated
closeChan chan struct{} | ||
alreadyClosed bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
用 lazy init 的 cancelFunc context.CancelFunc
来解决呗?
internal/services/smr/smr.go
Outdated
case <-s.closeChan: | ||
s.logger.WithField("last tasks count", s.queue.Count()).Info("smr service: received stop signal, waiting for all tasks done") | ||
|
||
needToClose = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
用 context.Context.Done()
可以吗
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
确实,这样优雅一些
type OngoingTaskPool struct { | ||
tasks []TaskInfo | ||
mu *sync.RWMutex | ||
} | ||
|
||
func NewOngoingTaskPool() *OngoingTaskPool { | ||
return &OngoingTaskPool{ | ||
mu: &sync.RWMutex{}, | ||
} | ||
} | ||
|
||
func (t *OngoingTaskPool) Add(info TaskInfo) { | ||
t.mu.Lock() | ||
t.tasks = append(t.tasks, info) | ||
t.mu.Unlock() | ||
} | ||
|
||
func (t *OngoingTaskPool) Remove() { | ||
t.mu.Lock() | ||
if len(t.tasks) == 0 { | ||
t.mu.Unlock() | ||
return | ||
} | ||
|
||
t.tasks = t.tasks[1:] | ||
t.mu.Unlock() | ||
} | ||
|
||
func (t *OngoingTaskPool) Len() int { | ||
t.mu.RLock() | ||
l := len(t.tasks) | ||
t.mu.RUnlock() | ||
|
||
return l | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
internal/services/smr/smr.go
L100 又定义了一个 pool,和这个有啥区别?
internal/services/smr/types/types.go
Outdated
Platform smr.FromPlatform `json:"platform"` | ||
Url string `json:"url"` // url to summarize | ||
|
||
ChatID int64 `json:"chatID"` // only for telegram | ||
MessageID int `json:"messageID"` // used to edit the reply message of request, not work in slack or discordbot currently | ||
|
||
ChannelID string `json:"channelID"` // for slack and discordbot | ||
|
||
TeamID string `json:"teamID"` // only for slack, used to query access token and refresh token |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
能拆成三个附属结构体的吗?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
那样的话创建这个结构体实例时没那么方便了,会多出好几行
if urlString == "" { | ||
return smr.ErrNoLink | ||
} | ||
|
||
parsedURL, err2 := url.Parse(urlString) | ||
if err2 != nil { | ||
return smr.ErrParse | ||
} | ||
if parsedURL.Scheme == "" || !lo.Contains([]string{"http", "https"}, parsedURL.Scheme) { | ||
return smr.ErrScheme | ||
} | ||
|
||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
能正确传递 err 链吗?写成这样的话外面怎么打日志呢?
ctx.JSON(http.StatusOK, slackbot.NewSlackWebhookMessage(smrutils.FormatUrlCheckError(err, smr.FromPlatformSlack))) | ||
return | ||
} | ||
|
||
ctx.JSON(http.StatusOK, slackbot.NewSlackWebhookMessage("出现了一些问题,可以再试试?")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
能用 ctx.AbortWithStatusJSON()
么?
ctx.JSON(http.StatusOK, slackbot.NewSlackWebhookMessage("本应用没有权限向这个频道发送消息,尝试重新安装一下?")) | ||
return | ||
} | ||
|
||
ctx.JSON(http.StatusOK, slackbot.NewSlackWebhookMessage("出现了一些问题,可以再试试?")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
能用 ctx.AbortWithStatusJSON()
么?
}) | ||
if err != nil { | ||
h.logger.WithError(err).Warn("slack: failed to add task") | ||
ctx.JSON(http.StatusOK, slackbot.NewSlackWebhookMessage("量子速读请求发送失败了,可以再试试?")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
能用 ctx.AbortWithStatusJSON()
么?
internal/bots/slack/slack.go
Outdated
ctx, cancel := context.WithTimeout(ctx, time.Second*5) | ||
defer cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lifecycle 本身有 timeout 信号的,不用自己写
f593da7
to
17b77b5
Compare
* fix: add line break because invoke chain too long * fix: rename function botExists to isBotExists * fix: use cancel func to stop service * fix: return origin error when check url * fix: unit test * fix: lint issue * fix: remove conc * fix: require pop result length equals to 2 * fix: format error text instead * fix: change some error handling method in telegram smr command * fix: change slack command receive log level * fix: grammar issue in log text * fix: rename fields in TaskInfo * fix: remove code will cause health check alert * fix: remove redundant blank line * fix: add timeout for slack webhook server shutdown, do log for any error when shutdown * fix: rename some identifier Co-authored-by: Neko Ayaka <neko@ayaka.moe>
17b77b5
to
20a23c3
Compare
No description provided.