/
event.go
119 lines (98 loc) · 2.63 KB
/
event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package sse
import (
"strconv"
"strings"
"sync"
)
// Tokens used when parsing the lines of an incoming SSE event.
const (
	// sseDelimiter separates a field name from its value. A line that starts
	// with it is treated as a comment.
	sseDelimiter = ":"

	// Field names recognized when building an event.
	sseID    = "id"
	sseEvent = "event"
	sseData  = "data"
	sseRetry = "retry"
)
// RawEvent exposes the properties of an incoming SSE event.
type RawEvent interface {
	// ID returns the event id.
	ID() string
	// Event returns the event type.
	Event() string
	// Data returns the data associated with the event.
	Data() string
	// Retry returns the expected retry time.
	Retry() int64

	// IsEmpty reports whether the event contains no id, event type and data.
	IsEmpty() bool
	// IsError reports whether the message is an error.
	IsError() bool
}
// RawEventImpl holds the fields of an incoming SSE event.
type RawEventImpl struct {
	id    string // event id
	event string // event type
	data  string // data associated with the event
	retry int64  // expected retry time
}

// ID returns the id of the event.
func (e *RawEventImpl) ID() string {
	return e.id
}

// Event returns the type of the event.
func (e *RawEventImpl) Event() string {
	return e.event
}

// Data returns the data associated with the event.
func (e *RawEventImpl) Data() string {
	return e.data
}

// Retry returns the expected retry time.
func (e *RawEventImpl) Retry() int64 {
	return e.retry
}

// IsError reports whether the event type is "error".
func (e *RawEventImpl) IsError() bool {
	const errorEvent = "error"
	return e.event == errorEvent
}

// IsEmpty reports whether the event has no id, no event type and no data.
// The retry value is not taken into account.
func (e *RawEventImpl) IsEmpty() bool {
	if e.id != "" || e.event != "" {
		return false
	}
	return e.data == ""
}
// EventBuilder accumulates the lines of an incoming event and builds a
// RawEventImpl out of them.
type EventBuilder interface {
	// AddLine registers a line belonging to the event being processed.
	AddLine(line string)
	// Build processes the registered lines and returns the resulting event.
	Build() *RawEventImpl
}
// EventBuilderImpl implements the EventBuilder interface. It is used to parse
// incoming event lines.
type EventBuilderImpl struct {
	mutex sync.Mutex // guards lines
	lines []string   // lines collected for the event being processed
}
// AddLine appends line to the lines of the event currently being processed.
// Lines that start with the delimiter are comments and are discarded.
func (b *EventBuilderImpl) AddLine(line string) {
	if isComment := strings.HasPrefix(line, sseDelimiter); isComment {
		return
	}

	b.mutex.Lock()
	b.lines = append(b.lines, line)
	b.mutex.Unlock()
}
// Build processes all the added lines and builds the event.
//
// Every line is split at the first delimiter into a field name and a value,
// and the value is stripped of surrounding whitespace. Lines without a
// delimiter and lines with an unknown field name are skipped. For the id and
// event fields the last occurrence wins. Multiple data lines are joined with
// "\n", as the SSE event stream format specifies, instead of keeping only the
// last one. A retry value that is not a valid base-10 int64 is ignored, so it
// can neither reset a previously parsed retry nor store the out-of-range value
// returned by strconv.ParseInt.
//
// When no lines were added the returned event is empty. Build does not clear
// the accumulated lines.
func (b *EventBuilderImpl) Build() *RawEventImpl {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	e := &RawEventImpl{}
	var dataLines []string
	for _, line := range b.lines {
		splitted := strings.SplitN(line, sseDelimiter, 2)
		if len(splitted) != 2 {
			// TODO: log invalid line.
			continue
		}
		value := strings.TrimSpace(splitted[1])
		switch splitted[0] {
		case sseID:
			e.id = value
		case sseData:
			// Collected rather than assigned so earlier data lines are kept.
			dataLines = append(dataLines, value)
		case sseEvent:
			e.event = value
		case sseRetry:
			// On failure ParseInt returns 0 or a clamped value; keep the
			// previous retry instead of storing either of those.
			if retry, err := strconv.ParseInt(value, 10, 64); err == nil {
				e.retry = retry
			}
		}
	}
	e.data = strings.Join(dataLines, "\n")
	return e
}
// Reset discards every line accepted so far, leaving the builder with a
// fresh, empty list of lines.
func (b *EventBuilderImpl) Reset() {
	b.mutex.Lock()
	b.lines = make([]string, 0)
	b.mutex.Unlock()
}
// NewEventBuilder returns a new event builder that holds no lines yet.
func NewEventBuilder() *EventBuilderImpl {
	builder := new(EventBuilderImpl)
	builder.lines = make([]string, 0)
	return builder
}