-
Notifications
You must be signed in to change notification settings - Fork 1
/
server.go
67 lines (56 loc) · 1.74 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package task
import (
"context"
"fmt"
"github.com/hibiken/asynq"
"golang.org/x/exp/slog"
)
// Server wraps an asynq.Server together with the asynq.Config it is built from.
type Server struct {
// cfg is the asynq configuration; ServerOption functions modify it before
// srv is created in NewServer.
cfg *asynq.Config
// srv is the underlying asynq server that Start and Stop delegate to.
srv *asynq.Server
}
// ServerOption customizes a Server; NewServer applies each option to the
// Server before the underlying asynq server is created.
type ServerOption func(s *Server)
// WithGroupAggregator returns a ServerOption that stores groupAggregator in
// the GroupAggregator field of the server's asynq configuration.
func WithGroupAggregator(groupAggregator asynq.GroupAggregator) ServerOption {
	apply := func(target *Server) {
		target.cfg.GroupAggregator = groupAggregator
	}
	return apply
}
// WithErrorHandler returns a ServerOption that stores errorHandler in the
// ErrorHandler field of the server's asynq configuration, replacing whatever
// handler was set there before the option ran.
func WithErrorHandler(errorHandler asynq.ErrorHandler) ServerOption {
	apply := func(target *Server) {
		target.cfg.ErrorHandler = errorHandler
	}
	return apply
}
// NewServer builds a Server whose asynq configuration is populated from cfg,
// applies options in order, and then creates the underlying asynq server for
// redisConnOpt.
//
// logger is wrapped as the asynq logger, is handed to the level helper to
// choose the asynq log level, and is used by the default error handler, which
// logs the error together with the task type and payload. Options run after
// these defaults are set, so WithErrorHandler replaces the default handler.
//
// Nil entries in options are skipped rather than invoked, so callers may
// assemble the option list conditionally. cfg is dereferenced and must be
// non-nil.
func NewServer(cfg *ServerConfig, redisConnOpt asynq.RedisConnOpt, logger *slog.Logger, options ...ServerOption) *Server {
	// Default handler, used unless an option overrides cfg.ErrorHandler.
	logTaskError := func(ctx context.Context, task *asynq.Task, err error) {
		logger.Error("handle task error", "err", err, "task", task.Type(), "payload", task.Payload())
	}

	srv := &Server{
		cfg: &asynq.Config{
			Concurrency:              cfg.Concurrency,
			Queues:                   cfg.Queues,
			StrictPriority:           cfg.StrictPriority,
			HealthCheckInterval:      cfg.HealthCheckInterval,
			DelayedTaskCheckInterval: cfg.DelayedTaskCheckInterval,
			GroupGracePeriod:         cfg.GroupGracePeriod,
			GroupMaxDelay:            cfg.GroupMaxDelay,
			GroupMaxSize:             cfg.GroupMaxSize,
			ShutdownTimeout:          cfg.GracefulTimeout,
			Logger:                   &asynqLogger{logger: logger},
			LogLevel:                 level(context.Background(), logger),
			ErrorHandler:             asynq.ErrorHandlerFunc(logTaskError),
		},
	}

	for _, option := range options {
		// Calling a nil func value panics; treat a nil option as "no-op".
		if option == nil {
			continue
		}
		option(srv)
	}

	// The configuration is passed by value, so every option must have been
	// applied before this call.
	srv.srv = asynq.NewServer(redisConnOpt, *srv.cfg)
	return srv
}
// Start starts the underlying asynq server with handler. When starting fails,
// the error is wrapped with OpServerStart and sent on errCh; when it succeeds,
// nothing is sent.
func (s *Server) Start(handler asynq.Handler, errCh chan<- error) {
	startErr := s.srv.Start(handler)
	if startErr == nil {
		return
	}
	errCh <- fmt.Errorf("%s %w", OpServerStart, startErr)
}
// Stop calls Stop and then Shutdown on the underlying asynq server, in that
// order. Per the asynq documentation, Stop only makes the server stop pulling
// new tasks, and Shutdown must still be called to actually shut it down.
func (s *Server) Stop() {
	server := s.srv
	server.Stop()
	server.Shutdown()
}