-
Notifications
You must be signed in to change notification settings - Fork 7
/
memory.go
80 lines (73 loc) · 2.15 KB
/
memory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package memory
import (
"context"
"fmt"
"time"
"github.com/lunarway/release-manager/internal/broker"
"github.com/lunarway/release-manager/internal/log"
)
type Broker struct {
logger *log.Logger
queue chan broker.Publishable
}
// New allocates and returns an in-memory Broker with provided queue size.
func New(logger *log.Logger, queueSize int) *Broker {
return &Broker{
logger: logger,
queue: make(chan broker.Publishable, queueSize),
}
}
func (b *Broker) Close() error {
close(b.queue)
return nil
}
func (b *Broker) Publish(ctx context.Context, event broker.Publishable) error {
b.logger.WithFields("message", event).Info("Publishing message")
now := time.Now()
b.queue <- event
duration := time.Since(now).Milliseconds()
b.logger.With(
"eventType", event.Type(),
"res", map[string]interface{}{
"status": "ok",
"responseTime": duration,
}).Info("[publisher] [OK] Published message successfully")
return nil
}
func (b *Broker) StartConsumer(handlers map[string]func([]byte) error) error {
for msg := range b.queue {
logger := b.logger.With(
"eventType", msg.Type(),
)
logger.WithFields("message", msg).Infof("Received message type=%s", msg.Type())
handler, ok := handlers[msg.Type()]
if !ok {
logger.With("res", map[string]interface{}{
"status": "failed",
"error": "unprocessable",
}).Errorf("[consumer] [UNPROCESSABLE] Failed to handle message: no handler registered for event type '%s': dropping it", msg.Type)
continue
}
body, err := msg.Marshal()
if err != nil {
logger.Errorf("[consumer] [UNPROCESSABLE] Could not get body of message: %v", err)
continue
}
now := time.Now()
err = handler(body)
duration := time.Since(now).Milliseconds()
if err != nil {
logger.With("res", map[string]interface{}{
"status": "failed",
"responseTime": duration,
"error": fmt.Sprintf("%+v", err),
}).Errorf("[consumer] [FAILED] Failed to handle message: nacking and requeing: %v", err)
continue
}
logger.With("res", map[string]interface{}{
"status": "ok",
"responseTime": duration,
}).Info("[OK] Event handled successfully")
}
return broker.ErrBrokerClosed
}