/
client.go
90 lines (76 loc) · 2.46 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package sse
import (
"context"
"log"
"golang.org/x/sync/errgroup"
)
// CtxWorkerID is the dedicated type for context keys defined by this package.
// Using a named type instead of a plain string keeps the key from colliding
// with context keys set by other packages.
type CtxWorkerID string

// CtxWorkerIDKey is the context key under which Client.Start stores the index
// of the consumer worker the context was created for.
const CtxWorkerIDKey = CtxWorkerID("workerID")
// MiddlewareHandler is the interface form of a middleware: an object that can
// wrap a HandlerFunc and return the wrapped HandlerFunc.
// NOTE(review): nothing in the visible part of this file references this
// interface; Client works with the Middleware func type instead.
type MiddlewareHandler interface {
// Process receives a handler and returns the handler that should be used in its place.
Process(handler HandlerFunc) HandlerFunc
}
// Middleware is a function that takes a handler and returns the handler that
// should be used in its place. Client.RegisterMiddleware collects values of
// this type and Client.RegisterHandler applies them.
type Middleware func(handler HandlerFunc) HandlerFunc
// Client is a facade that provides a convenient interface for processing data from the stream
// and unites Streamer and Consumer under one implementation. The Client also allows registering global
// middleware, which RegisterHandler applies to the handlers passed to it.
type Client struct {
// Streamer is the producing side: Start runs its FillStream, and
// RegisterHandler registers event types on it.
Streamer *Streamer
// Consumer is the consuming side: Start runs its Run method once per worker,
// and RegisterHandler registers handlers on it.
Consumer *Consumer
// EventStream is the channel handed to both Streamer.FillStream and
// Consumer.Run. NewClient creates it with a buffer of 10; Stop closes it.
EventStream chan RawEvent
// streamErrors is handed to Streamer.FillStream and drained by StreamErrorHandler.
streamErrors chan error
// consumerErrors is handed to Consumer.Run and drained by ConsumerErrorHandler.
consumerErrors chan error
// StreamErrorHandler is started by Start in its own goroutine with the
// stream error channel. NewClient sets it to a handler that logs every error.
StreamErrorHandler func(<-chan error)
// ConsumerErrorHandler is started by Start in its own goroutine with the
// consumer error channel. NewClient sets it to a handler that logs every error.
ConsumerErrorHandler func(<-chan error)
// middlewares holds the global middlewares in registration order.
middlewares []Middleware
// WorkersCount is the number of Consumer.Run goroutines Start launches.
// NewClient sets it to 1.
WorkersCount int
}
// NewClient builds a Client for url with the package defaults: a Streamer from
// DefaultStreamer(url), a Consumer from NewConsumer, an event channel buffered
// for 10 events, error channels buffered for one error each, error handlers
// that log every error, and a single consumer worker.
func NewClient(url string) *Client {
	const (
		eventBuffer = 10 // capacity of EventStream
		errorBuffer = 1  // capacity of each error channel
	)

	client := new(Client)
	client.Streamer = DefaultStreamer(url)
	client.Consumer = NewConsumer()
	client.EventStream = make(chan RawEvent, eventBuffer)
	client.streamErrors = make(chan error, errorBuffer)
	client.consumerErrors = make(chan error, errorBuffer)
	client.StreamErrorHandler = logErrors
	client.ConsumerErrorHandler = logErrors
	client.WorkersCount = 1
	return client
}
// Start launches the streamer and the consumer workers and blocks until all of
// them have returned.
//
// One goroutine runs p.Streamer.FillStream with lastEventID, p.EventStream and
// the stream error channel. p.WorkersCount goroutines run p.Consumer.Run on
// the same event channel with the consumer error channel. All of them receive
// a context derived from ctx through errgroup.WithContext, which is cancelled
// as soon as one of them returns a non-nil error. Each worker's context also
// carries the worker's index (an int) under CtxWorkerIDKey.
//
// Both error channels are drained in background goroutines by
// StreamErrorHandler and ConsumerErrorHandler. A nil handler is replaced by
// logErrors, because starting a goroutine from a nil function value crashes
// the program. Start does not wait for these goroutines; they run until the
// handler returns (for logErrors, when the channel is closed, which Stop does).
//
// Start returns the first non-nil error returned by the streamer or a worker,
// or nil when all of them finished without error.
func (p *Client) Start(ctx context.Context, lastEventID int) error {
	// The handlers are exported fields and may have been reset to nil by the
	// caller; fall back to the same default NewClient installs.
	streamHandler := p.StreamErrorHandler
	if streamHandler == nil {
		streamHandler = logErrors
	}
	consumerHandler := p.ConsumerErrorHandler
	if consumerHandler == nil {
		consumerHandler = logErrors
	}

	// Start draining first so the error channels already have a reader when
	// the producers below begin to run.
	go streamHandler(p.streamErrors)
	go consumerHandler(p.consumerErrors)

	groupErrs, ctx := errgroup.WithContext(ctx)

	groupErrs.Go(func() error {
		return p.Streamer.FillStream(ctx, lastEventID, p.EventStream, p.streamErrors)
	})

	for i := 0; i < p.WorkersCount; i++ {
		// Declared inside the loop so every worker closure captures its own context.
		workerCtx := context.WithValue(ctx, CtxWorkerIDKey, i)
		groupErrs.Go(func() error {
			return p.Consumer.Run(workerCtx, p.EventStream, p.consumerErrors)
		})
	}

	return groupErrs.Wait()
}
// RegisterMiddleware adds one to the end of the client's global middleware
// list. RegisterHandler reads that list at the moment it is called, so a
// middleware only affects handlers registered after it.
func (p *Client) RegisterMiddleware(one Middleware) {
	extended := append(p.middlewares, one)
	p.middlewares = extended
}
// RegisterHandler registers eventType with the Streamer and installs handler,
// wrapped in every middleware registered so far, on the Consumer for that
// event type.
func (p *Client) RegisterHandler(eventType EventType, handler HandlerFunc) {
	p.Streamer.RegisterEvent(eventType)

	// Wrap from the last registered middleware down to the first. The first
	// middleware thereby becomes the outermost layer and is the first one to
	// run when the handler is invoked.
	wrapped := handler
	for n := len(p.middlewares); n > 0; n-- {
		wrapped = p.middlewares[n-1](wrapped)
	}

	p.Consumer.RegisterHandler(eventType, wrapped)
}
// logErrors is the default error handler: it writes every error received from
// source to the standard logger and returns once source has been closed and
// drained.
func logErrors(source <-chan error) {
	for {
		err, open := <-source
		if !open {
			return
		}
		log.Println(err)
	}
}
// Stop closes the event channel and then both error channels. Closing the
// error channels lets handlers that range over them, such as logErrors,
// return.
//
// Stop must be called at most once per Client: closing an already closed
// channel panics.
// NOTE(review): the channels are closed here rather than by their senders, so
// Stop presumably has to be called only after Start has returned; otherwise a
// send on a closed channel would panic. Confirm against Streamer and Consumer.
func (p *Client) Stop() {
	close(p.EventStream)
	for _, errs := range []chan error{p.streamErrors, p.consumerErrors} {
		close(errs)
	}
}