/
goroutine_request_consumer.go
50 lines (40 loc) · 1.23 KB
/
goroutine_request_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package bamboo
import (
"context"
"log/slog"
"google.golang.org/protobuf/proto"
"github.com/pecolynx/bamboo/internal"
pb "github.com/pecolynx/bamboo/proto"
)
type goroutineBambooRequestConsumer struct {
queue <-chan []byte
}
func NewGoroutineBambooRequestConsumer(queue <-chan []byte) BambooRequestConsumer {
return &goroutineBambooRequestConsumer{
queue: queue,
}
}
func (c *goroutineBambooRequestConsumer) Consume(ctx context.Context) (*pb.WorkerParameter, error) {
logger := GetLoggerFromContext(ctx, BambooRequestConsumerLoggerContextKey)
ctx = WithLoggerName(ctx, BambooRequestConsumerLoggerContextKey)
logger.DebugContext(ctx, "start consuming loop")
for {
select {
case <-ctx.Done():
return nil, internal.Errorf("ctx.Done(). stop consuming loop. err: %w", ErrContextCanceled)
case reqBytes := <-c.queue:
req := pb.WorkerParameter{}
if err := proto.Unmarshal(reqBytes, &req); err != nil {
logger.WarnContext(ctx, "invalid parameter. failed to proto.Unmarshal.", slog.Any("err", err))
continue
}
return &req, nil
}
}
}
func (c *goroutineBambooRequestConsumer) Ping(ctx context.Context) error {
return nil
}
func (c *goroutineBambooRequestConsumer) Close(ctx context.Context) error {
return nil
}