-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
117 lines (97 loc) · 3.04 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package qp
import (
"github.com/stretchr/slog"
)
// Event defines all the fields and information
// included as part of a Event to a request.
type Event struct {
// From is the instance ID of the sender.
From string `json:"from"`
// Data is the payload of the event.
Data interface{} `json:"data"`
}
// Publisher represents types capable of publishing events.
type Publisher interface {
// Publish publishes the object on the specified channel.
Publish(channel string, obj interface{}) error
}
// publisher allows events to be published.
type publisher struct {
name string
instanceID string
uniqueID string
codec Codec
transport PubSubTransport
}
// NewPublisher makes a new publisher capable of Publishing events.
func NewPublisher(name, instanceID string, codec Codec, transport PubSubTransport) Publisher {
return &publisher{
name: name,
instanceID: instanceID,
uniqueID: name + "." + instanceID,
codec: codec,
transport: transport,
}
}
func (p *publisher) Publish(channel string, obj interface{}) error {
event := &Event{From: p.uniqueID, Data: obj}
data, err := p.codec.Marshal(event)
if err != nil {
return err
}
if err := p.transport.Publish(channel, data); err != nil {
return err
}
return nil
}
// EventHandler represents types capable of handling Requests.
type EventHandler interface {
Handle(*Event)
}
// EventHandlerFunc represents functions capable of handling
// Requests.
type EventHandlerFunc func(*Event)
// Handle calls the EventHandlerFunc in order to handle
// the specific Event.
func (f EventHandlerFunc) Handle(r *Event) {
f(r)
}
// Subscriber represents types capable of subscribing to
// events.
type Subscriber interface {
// Subscribe binds the handler to the specified channel.
Subscribe(channel string, handler EventHandler) error
// SubscribeFunc binds the EventHandlerFunc to the specified channel.
SubscribeFunc(channel string, fn EventHandlerFunc) error
}
// subscriber allows events to be subscribed to.
type subscriber struct {
codec Codec
transport PubSubTransport
log slog.Logger
}
// NewSubscriber creates a Subscriber object capable of subscribing
// to events.
func NewSubscriber(codec Codec, transport PubSubTransport) Subscriber {
return NewSubscriberLogger(codec, transport, slog.NilLogger)
}
// NewSubscriberLogger creates a Subscriber object capable of subscribing
// to events, while logging errors to the specified logger.
func NewSubscriberLogger(codec Codec, transport PubSubTransport, logger slog.Logger) Subscriber {
return &subscriber{codec: codec, transport: transport, log: logger}
}
func (s *subscriber) Subscribe(channel string, handler EventHandler) error {
return s.transport.Subscribe(channel, HandlerFunc(func(msg *Message) {
var event Event
if err := s.codec.Unmarshal(msg.Data, &event); err != nil {
if s.log.Err() {
s.log.Err("Unmarshal error in Subscribe:", err)
}
return
}
handler.Handle(&event)
}))
}
func (s *subscriber) SubscribeFunc(channel string, fn EventHandlerFunc) error {
return s.Subscribe(channel, fn)
}