/
stream.go
84 lines (71 loc) · 1.71 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package sse
import (
"context"
"net/http"
"github.com/pkg/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/render/problem"
)
var (
// default error
errBadStream = errors.New("Unexpected stream error")
// known errors
ErrRateLimited = errors.New("Rate limit exceeded")
)
type Stream struct {
ctx context.Context
w http.ResponseWriter
done bool
eventsSent int
limit int
initialized bool
}
// NewStream creates a new stream against the provided response writer.
func NewStream(ctx context.Context, w http.ResponseWriter) *Stream {
return &Stream{
ctx: ctx,
w: w,
}
}
// Init function is only executed once. It writes the preamble event which includes the HTTP response code and a
// hello message. This should be called before any method that writes to the client to ensure that the preamble
// has been sent first.
func (s *Stream) Init() {
if !s.initialized {
s.initialized = true
ok := WritePreamble(s.ctx, s.w)
if !ok {
s.done = true
}
}
}
func (s *Stream) Send(e Event) {
s.Init()
WriteEvent(s.ctx, s.w, e)
s.eventsSent++
}
func (s *Stream) SetLimit(limit int) {
s.limit = limit
}
func (s *Stream) Done() {
s.Init()
WriteEvent(s.ctx, s.w, goodbyeEvent)
s.done = true
}
func (s *Stream) Err(err error) {
// We haven't initialized the stream, we should simply return the normal HTTP
// error because it means that we haven't sent the preamble.
if !s.initialized {
problem.Render(s.ctx, s.w, err)
return
}
if knownErr := problem.IsKnownError(err); knownErr != nil {
err = knownErr
} else {
log.Ctx(s.ctx).WithStack(err).Error(err)
err = errBadStream
}
s.Init()
WriteEvent(s.ctx, s.w, Event{Error: err})
s.done = true
}