-
Notifications
You must be signed in to change notification settings - Fork 18
/
events_log.go
149 lines (126 loc) · 3.47 KB
/
events_log.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package eventslog
import (
"fmt"
"io"
"strconv"
"time"
)
type EventType string
type EventStatus string
type EventsLogRecordId string
type Level string
const (
LevelInfo Level = "info"
LevelWarning Level = "warn"
LevelError Level = "error"
LevelDebug Level = "debug"
)
const (
EventTypeIncoming EventType = "incoming"
EventTypeProcessed EventType = "bulker_stream"
EventTypeBatch EventType = "bulker_batch"
)
type EventsLogFilter struct {
Start time.Time
End time.Time
BeforeId EventsLogRecordId
Filter func(event any) bool
}
type EventsLogRecord struct {
Id EventsLogRecordId `json:"id"`
Date time.Time `json:"date"`
Content any `json:"content"`
}
type ActorEvent struct {
EventType EventType
Level Level
ActorId string
Event any
Timestamp time.Time
}
type EventsLogService interface {
io.Closer
// PostEvent posts event to the events log
// actorId – id of entity of event origin. E.g. for 'incoming' event - id of site, for 'processed' event - id of destination
PostEvent(event *ActorEvent) (id EventsLogRecordId, err error)
PostAsync(event *ActorEvent)
GetEvents(eventType EventType, actorId string, level string, filter *EventsLogFilter, limit int) ([]EventsLogRecord, error)
}
type MultiEventsLogService struct {
Services []EventsLogService
}
func (m *MultiEventsLogService) PostEvent(event *ActorEvent) (id EventsLogRecordId, err error) {
for _, service := range m.Services {
id, err = service.PostEvent(event)
if err != nil {
return
}
}
return
}
func (m *MultiEventsLogService) PostAsync(event *ActorEvent) {
for _, service := range m.Services {
service.PostAsync(event)
}
}
func (m *MultiEventsLogService) GetEvents(eventType EventType, actorId string, level string, filter *EventsLogFilter, limit int) ([]EventsLogRecord, error) {
for _, service := range m.Services {
events, _ := service.GetEvents(eventType, actorId, level, filter, limit)
if len(events) > 0 {
return events, nil
}
}
return nil, nil
}
func (m *MultiEventsLogService) Close() error {
for _, service := range m.Services {
_ = service.Close()
}
return nil
}
// GetStartAndEndIds returns end and start ids for the stream
func (f *EventsLogFilter) GetStartAndEndIds() (start, end string, err error) {
end = "+"
start = "-"
if f == nil {
return
}
var endTime int64
if f.BeforeId != "" {
end = fmt.Sprintf("(%s", f.BeforeId)
tsTime, err := parseTimestamp(string(f.BeforeId))
if err != nil {
return "", "", err
}
endTime = tsTime.UnixMilli()
}
if !f.End.IsZero() {
if endTime == 0 || f.End.UnixMilli() < endTime {
end = fmt.Sprint(f.End.UnixMilli())
}
}
if !f.Start.IsZero() {
start = fmt.Sprint(f.Start.UnixMilli())
}
return
}
func parseTimestamp(id string) (time.Time, error) {
match := redisStreamIdTimestampPart.FindStringSubmatch(id)
if match == nil {
return time.Time{}, fmt.Errorf("failed to parse beforeId [%s] it is expected to start with timestamp", id)
}
ts, _ := strconv.ParseInt(match[0], 10, 64)
return time.UnixMilli(ts), nil
}
type DummyEventsLogService struct{}
func (d *DummyEventsLogService) PostAsync(_ *ActorEvent) {
}
func (d *DummyEventsLogService) PostEvent(_ *ActorEvent) (id EventsLogRecordId, err error) {
return "", nil
}
func (d *DummyEventsLogService) GetEvents(eventType EventType, actorId string, level string, filter *EventsLogFilter, limit int) ([]EventsLogRecord, error) {
return nil, nil
}
func (d *DummyEventsLogService) Close() error {
return nil
}