Skip to content

Commit

Permalink
给 发表评论/更新评论 接口加上了节流阀
Browse files Browse the repository at this point in the history
这应该是多年来想做又一直没做的功能?
  • Loading branch information
movsb committed May 11, 2024
1 parent c59d1f7 commit a91e125
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 0 deletions.
34 changes: 34 additions & 0 deletions modules/utils/debounce.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package utils
import (
"sync"
"time"

"github.com/phuslu/lru"
)

// 防抖神器。
Expand All @@ -12,6 +14,8 @@ import (
// 比如:我需要在模板文件更新后刷新模板,但是切分支的时候会有收到大量文件修改通知,
// 此时不能每收到一个文件就触发一次刷新,需要在“稳定”(分支切换完全后、几秒内没有文件再有修改)
// 的情况下方可刷新。可避免掉大量无意义的刷新。
//
// NOTE:回调函数 fn 是在独立的线程中被调用的。
func NewDebouncer(interval time.Duration, fn func()) *_Debouncer {
if interval < time.Second {
panic(`invalid debouncer interval`)
Expand Down Expand Up @@ -66,3 +70,33 @@ func (d *_Debouncer) wait() {
}()
}
}

// 节流神器。
//
// 仅在动作完成一定时间后才允许再度执行。
// 基于时间,而不是漏桶🪣或令牌。
//
// 比如:十秒钟内只允许评论一次。
func NewThrottler[Key comparable]() *Throttler[Key] {
t := &Throttler[Key]{
// TODO:不是很清楚这个容量满了会是怎样?
// 如果满了就被迫删除,那岂不是仍然可以通过刷入大量 key 的
// 情况下强制失效后使旧 key 再度合法?
cache: lru.NewTTLCache[Key, time.Time](1024),
}
return t
}

type Throttler[Key comparable] struct {
cache *lru.TTLCache[Key, time.Time]
}

// 检测并更新
func (t *Throttler[Key]) Throttled(key Key, interval time.Duration) bool {
last, ok := t.cache.Get(key)
if ok && time.Since(last) < interval {
return true
}
t.cache.Set(key, time.Now(), interval)
return false
}
17 changes: 17 additions & 0 deletions modules/utils/debouncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,20 @@ func TestDebouncer(t *testing.T) {
d.Enter()
time.Sleep(time.Second)
}

func TestThrottler(t *testing.T) {
l := utils.NewThrottler[int]()

expect := func(b, e bool) {
if b != e {
t.Fatal(b, e)
}
}

expect(l.Throttled(0, time.Second), false)
expect(l.Throttled(0, time.Second), true)
time.Sleep(time.Millisecond * 100)
expect(l.Throttled(0, time.Second), true)
time.Sleep(time.Second)
expect(l.Throttled(0, time.Second), false)
}
1 change: 1 addition & 0 deletions modules/utils/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func (l *TemplateLoader) parseNamed() {
l.lock.Lock()
defer l.lock.Unlock()
names, _ := fs.Glob(l.fsys, `[^_]*.html`)
l.named = make(map[string]*template.Template)
for _, name := range names {
// NOTE: name 如果包含 pattern 字符的话,这里大概率会出错。奇怪为什么没有按 name parse 的。
t2, err := template.New(name).Funcs(l.funcs).ParseFS(l.fsys, name)
Expand Down
2 changes: 2 additions & 0 deletions service/comment.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func in5min(t int32) bool {
//
// NOTE:只支持更新评论内容。
// NOTE:带上时间戳,防止异地多次更新的冲突(太严格了吧!)
// NOTE:带节流。
func (s *Service) UpdateComment(ctx context.Context, req *protocols.UpdateCommentRequest) (*protocols.Comment, error) {
ac := auth.Context(ctx)
cmtOld := s.getComment2(req.Comment.Id)
Expand Down Expand Up @@ -315,6 +316,7 @@ const (
// Content 自动由 source 生成。
//
// NOTE: 默认的 modified 修改时间为 0,表示从未被修改过。
// NOTE: 带节流。
func (s *Service) CreateComment(ctx context.Context, in *protocols.Comment) (*protocols.Comment, error) {
ac := auth.Context(ctx)

Expand Down
53 changes: 53 additions & 0 deletions service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"html/template"
"log"
"net"
"net/netip"
"os"
"strings"
"sync/atomic"
Expand All @@ -17,6 +18,7 @@ import (
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/movsb/taoblog/cmd/config"
"github.com/movsb/taoblog/modules/auth"
"github.com/movsb/taoblog/modules/utils"
"github.com/movsb/taoblog/modules/version"
"github.com/movsb/taoblog/protocols"
"github.com/movsb/taoblog/service/modules/cache"
Expand All @@ -31,6 +33,28 @@ import (
"google.golang.org/grpc/status"
)

// 请求节流器限流信息。
// 由于没有用户系统,目前根据 IP 限流。
// 这样会对网吧、办公网络非常不友好。
type _RequestThrottlerKey struct {
UserID int
IP netip.Addr
Method string // 指 RPC 方法,用路径代替。
}

func throttlerKeyOf(ctx context.Context) _RequestThrottlerKey {
ac := auth.Context(ctx)
method, ok := grpc.Method(ctx)
if !ok {
panic(status.Error(codes.Internal, "没有找到调用方法。"))
}
return _RequestThrottlerKey{
UserID: int(ac.User.ID),
IP: ac.RemoteAddr,
Method: method,
}
}

// Service implements IServer.
type Service struct {
cfg *config.Config
Expand All @@ -51,6 +75,9 @@ type Service struct {
postContentCaches *lru.TTLCache[_PostContentCacheKey, string]
postCaches *cache.RelativeCacheKeys[int64, _PostContentCacheKey]

// 请求节流器。
throttler *utils.Throttler[_RequestThrottlerKey]

avatarCache *AvatarCache

// 搜索引擎启动需要时间,所以如果网站一运行即搜索,则可能出现引擎不可用
Expand All @@ -73,6 +100,7 @@ func NewService(cfg *config.Config, db *sql.DB, auther *auth.Auth) *Service {
cache: lru.NewTTLCache[string, any](1024),
postContentCaches: lru.NewTTLCache[_PostContentCacheKey, string](1024),
postCaches: cache.NewRelativeCacheKeys[int64, _PostContentCacheKey](),
throttler: utils.NewThrottler[_RequestThrottlerKey](),
}

s.cmtntf = &comment_notify.CommentNotifier{
Expand All @@ -93,6 +121,7 @@ func NewService(cfg *config.Config, db *sql.DB, auther *auth.Auth) *Service {
grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(exceptionRecoveryHandler)),
s.auth.UserFromGatewayUnaryInterceptor(),
s.auth.UserFromClientTokenUnaryInterceptor(),
s.throttlerGatewayInterceptor,
grpcLoggerUnary,
),
grpc_middleware.WithStreamServerChain(
Expand Down Expand Up @@ -244,3 +273,27 @@ func (s *Service) CreateCustomThemeApplyFunc() func() string {
return w.String()
}
}

var methodThrottlerInfo = map[string]struct {
Interval time.Duration
Message string
}{
`/protocols.TaoBlog/CreateComment`: {
Interval: time.Second * 10,
Message: `评论发表过于频繁,请稍等几秒后再试。`,
},
`/protocols.TaoBlog/UpdateComment`: {
Interval: time.Second * 5,
Message: `评论更新过于频繁,请稍等几秒后再试。`,
},
}

func (s *Service) throttlerGatewayInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
if info, ok := methodThrottlerInfo[info.FullMethod]; ok {
if s.throttler.Throttled(throttlerKeyOf(ctx), info.Interval) {
msg := utils.IIF(info.Message != "", info.Message, `你被节流了,请稍候再试。You've been throttled.`)
return nil, status.Error(codes.Aborted, msg)
}
}
return handler(ctx, req)
}

0 comments on commit a91e125

Please sign in to comment.