-
Notifications
You must be signed in to change notification settings - Fork 1k
/
stream.go
110 lines (96 loc) · 2.55 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package client
import (
"encoding/json"
"time"
pb "github.com/micro/micro/v3/proto/events"
"github.com/micro/micro/v3/service/client"
"github.com/micro/micro/v3/service/context"
"github.com/micro/micro/v3/service/events"
"github.com/micro/micro/v3/service/events/util"
log "github.com/micro/micro/v3/service/logger"
)
// NewStream returns an initialized stream service
func NewStream() events.Stream {
return new(stream)
}
type stream struct {
Client pb.StreamService
}
func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOption) error {
// parse the options
options := events.PublishOptions{
Timestamp: time.Now(),
}
for _, o := range opts {
o(&options)
}
// encode the message if it's not already encoded
var payload []byte
if p, ok := msg.([]byte); ok {
payload = p
} else {
p, err := json.Marshal(msg)
if err != nil {
return events.ErrEncodingMessage
}
payload = p
}
// execute the RPC
_, err := s.client().Publish(context.DefaultContext, &pb.PublishRequest{
Topic: topic,
Payload: payload,
Metadata: options.Metadata,
Timestamp: options.Timestamp.Unix(),
}, client.WithAuthToken())
return err
}
func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan events.Event, error) {
// parse options
options := events.ConsumeOptions{AutoAck: true}
for _, o := range opts {
o(&options)
}
subReq := &pb.ConsumeRequest{
Topic: topic,
Group: options.Group,
Offset: options.Offset.Unix(),
AutoAck: options.AutoAck,
AckWait: options.AckWait.Nanoseconds(),
RetryLimit: int64(options.GetRetryLimit()),
}
// start the stream
stream, err := s.client().Consume(context.DefaultContext, subReq, client.WithAuthToken())
if err != nil {
return nil, err
}
evChan := make(chan events.Event)
go func() {
for {
ev, err := stream.Recv()
if err != nil {
log.Errorf("Error receiving from stream %s", err)
close(evChan)
return
}
evt := util.DeserializeEvent(ev)
if !options.AutoAck {
evt.SetNackFunc(func() error {
return stream.SendMsg(&pb.AckRequest{Id: evt.ID, Success: false})
})
evt.SetAckFunc(func() error {
return stream.SendMsg(&pb.AckRequest{Id: evt.ID, Success: true})
})
}
evChan <- evt
}
}()
return evChan, nil
}
// this is a tmp solution since the client isn't initialized when NewStream is called. There is a
// fix in the works in another PR.
func (s *stream) client() pb.StreamService {
if s.Client == nil {
s.Client = pb.NewStreamService("events", client.DefaultClient)
}
return s.Client
}