-
Notifications
You must be signed in to change notification settings - Fork 2
/
message.go
99 lines (83 loc) · 1.69 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package queue
import (
"context"
"sync"
"github.com/mss-boot-io/redisqueue/v2"
"github.com/mss-boot-io/mss-boot-admin/storage"
)
// Message extends redisqueue.Message (embedded, so its fields are promoted
// onto this type) with an error counter, a mutex and an attached context.
//
// Because the struct contains a sync.RWMutex it must not be copied after first
// use; every method in this file uses a pointer receiver.
type Message struct {
redisqueue.Message
// ErrorCount is exposed through SetErrorCount and GetErrorCount. What counts
// as an error is decided by the callers, which are not visible in this file.
ErrorCount int
// mux is locked by the accessors that touch Stream and Values.
mux sync.RWMutex
// ctx is the value stored by SetContext and returned by GetContext.
ctx context.Context
}
// GetID returns the message ID.
//
// The read happens under the read lock because SetValues assigns m.ID while
// holding m.mux; an unguarded read here would race with that write.
func (m *Message) GetID() string {
	m.mux.RLock()
	defer m.mux.RUnlock()
	return m.ID
}
// GetStream returns the message's Stream field.
//
// Only a read lock is taken: the method does not modify the message, so
// concurrent readers need not exclude each other.
func (m *Message) GetStream() string {
	m.mux.RLock()
	defer m.mux.RUnlock()
	return m.Stream
}
// GetValues returns a newly allocated map holding a shallow copy of m.Values
// plus two extra entries: "__id" (the message ID) and "__steam" (the stream).
// The extra entries are written last, so they replace payload entries that use
// the same keys. Mutating the returned map does not affect the message.
//
// The stream key is spelled "__steam" here and in SetValues; it is kept as is
// because SetValues reads the value back under exactly that key.
//
// Only a read lock is taken, since the message itself is not modified.
func (m *Message) GetValues() map[string]interface{} {
	m.mux.RLock()
	defer m.mux.RUnlock()

	// Room for every payload entry plus the two metadata keys.
	data := make(map[string]interface{}, len(m.Values)+2)
	for k, v := range m.Values {
		data[k] = v
	}
	data["__id"] = m.ID
	data["__steam"] = m.Stream
	return data
}
// SetID replaces the message ID with id.
//
// The write happens under m.mux because GetValues and SetValues access m.ID
// while holding that lock; an unguarded write here would race with them.
func (m *Message) SetID(id string) {
	m.mux.Lock()
	defer m.mux.Unlock()
	m.ID = id
}
// SetStream overwrites the message's Stream field with stream while holding
// the write lock.
func (m *Message) SetStream(stream string) {
	m.mux.Lock()
	m.Stream = stream
	m.mux.Unlock()
}
// SetValues replaces the message payload with the entries of values, minus the
// reserved keys "__id" and "__steam".
//
// When the message has no ID (or no Stream) yet, it is taken from the
// corresponding reserved key if that entry holds a string; an ID or Stream
// that is already set is never overwritten.
//
// The payload is stored as a copy: the caller's map is neither modified nor
// retained, so later writes to it cannot bypass m.mux. A nil values map leaves
// m.Values nil.
func (m *Message) SetValues(values map[string]interface{}) {
	m.mux.Lock()
	defer m.mux.Unlock()

	// The assertion results are deliberately ignored: on a missing or
	// non-string entry the field receives "", which is the value it already
	// holds inside these branches.
	if m.ID == "" {
		m.ID, _ = values["__id"].(string)
	}
	if m.Stream == "" {
		m.Stream, _ = values["__steam"].(string)
	}

	if values == nil {
		m.Values = nil
		return
	}
	payload := make(map[string]interface{}, len(values))
	for k, v := range values {
		if k == "__id" || k == "__steam" {
			continue
		}
		payload[k] = v
	}
	m.Values = payload
}
// SetContext stores ctx on the message. The value is stored exactly as given
// (including nil) and m.mux is not taken.
func (m *Message) SetContext(ctx context.Context) {
m.ctx = ctx
}
// GetPrefix returns the string stored in m.Values under storage.PrefixKey.
// It returns "" when Values is nil, when the key is absent, or when the stored
// value is not a string.
//
// Only a read lock is taken, since the message is not modified.
func (m *Message) GetPrefix() (prefix string) {
	m.mux.RLock()
	defer m.mux.RUnlock()

	// Indexing a nil map is legal and yields a nil interface; the failed type
	// assertion then leaves prefix at "", so no separate nil check is needed.
	prefix, _ = m.Values[storage.PrefixKey].(string)
	return prefix
}
// SetPrefix records prefix in m.Values under storage.PrefixKey, creating the
// Values map first when the message has none. The write lock is held for the
// duration of the update.
func (m *Message) SetPrefix(prefix string) {
	m.mux.Lock()
	defer m.mux.Unlock()

	if m.Values != nil {
		m.Values[storage.PrefixKey] = prefix
		return
	}
	m.Values = map[string]interface{}{storage.PrefixKey: prefix}
}
// SetErrorCount overwrites the ErrorCount field with count. No validation is
// applied and m.mux is not taken.
func (m *Message) SetErrorCount(count int) {
m.ErrorCount = count
}
// GetErrorCount returns the current value of the ErrorCount field. m.mux is
// not taken.
func (m *Message) GetErrorCount() int {
return m.ErrorCount
}
// GetContext returns the context stored on the message.
//
// When no context has been stored (or a nil one was), context.Background() is
// returned instead of nil, so the result can always be handed to APIs that
// reject a nil Context (for example context.WithCancel panics on a nil parent).
func (m *Message) GetContext() context.Context {
	if m.ctx == nil {
		return context.Background()
	}
	return m.ctx
}