-
Notifications
You must be signed in to change notification settings - Fork 0
/
state.go
250 lines (205 loc) · 5.57 KB
/
state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package logd
import (
"errors"
"fmt"
"os"
"github.com/jeffrom/logd/protocol"
)
var ErrProcessing = errors.New("message already being processed")
// StatePusher saves recently written offsets for later processing.
type StatePusher interface {
Push(off uint64) error
}
// Backlogger saves failed batch writes so they can be attempted later.
type Backlogger interface {
// Backlog() should return a channel with the desired backfill size. The client
// should attempt to write failed batches to the channel, but if the channel is
// full, it will log the error and continue.
Backlog() chan *Backlog
}
type Backlog struct {
Batch *protocol.Batch
Err error
}
type ErrorHandler interface {
HandleError(err error)
}
// StatePuller pulls messages from the state and marks them completed or
// failed.
type StatePuller interface {
// Get returns the oldest message offset and delta that isn't already being
// processed.
Get() (uint64, uint64, error)
// Start marks a message as being processed. It should return
// ErrProcessing if the message is already being processed.
Start(off, delta uint64) error
// Complete marks a message as completed or failed. Failure is written when
// err is not nil.
Complete(off, delta uint64, err error) error
}
// NoopStatePusher discards input
type NoopStatePusher struct {
}
// Push implements StatePusher
func (m *NoopStatePusher) Push(off uint64) error {
return nil
}
// Close implements StatePusher
func (m *NoopStatePusher) Close() error {
return nil
}
type NoopBacklogger struct{}
func (bl *NoopBacklogger) Backlog() chan *Backlog { return nil }
type NoopErrorHandler struct{}
func (eh *NoopErrorHandler) HandleError(err error) {}
// func (bl *NoopBacklogger) GetBacklog() (*protocol.Batch, error) {
// return nil, nil
// }
// StateOutputter writes offsets to a file. Intended for use by command line
// applications.
type StateOutputter struct {
f *os.File
}
// NewStateOutputter returns a new oneee
func NewStateOutputter(f *os.File) *StateOutputter {
return &StateOutputter{
f: f,
}
}
// Push implements StatePusher
func (m *StateOutputter) Push(off uint64) error {
_, err := fmt.Fprintf(m.f, "%d\n", off)
return err
}
// Close implements StatePusher
func (m *StateOutputter) Close() error {
return nil
}
// MockStatePusher saves pushed state so it can be read in tests
type MockStatePusher struct {
offs []uint64
errs []error
batches []*protocol.Batch
n int
serr error
}
// NewMockStatePusher returns a new instance of MockStatePusher
func NewMockStatePusher() *MockStatePusher {
return &MockStatePusher{
offs: make([]uint64, 0),
errs: make([]error, 0),
batches: make([]*protocol.Batch, 0),
}
}
// Push implements StatePusher
func (m *MockStatePusher) Push(off uint64) error {
m.offs = append(m.offs, off)
// m.errs = append(m.errs, oerr)
// m.batches = append(m.batches, batch)
return m.serr
}
// SetError sets the error to be returned from calls to Push
func (m *MockStatePusher) SetError(err error) {
m.serr = err
}
// Next returns the next offset, error, and batch, starting from the first. if
// there are no more pushed states, the last return value will be false
func (m *MockStatePusher) Next() (uint64, bool) {
if m.n >= len(m.offs) {
return 0, false
}
off := m.offs[m.n]
m.n++
return off, true
}
// ErrNoState should be returned by StatePullers when the state hasn't
// stored any offset information yet.
var ErrNoState = errors.New("state uninitialized")
type NoopStatePuller int
// Get implements StatePuller interface.
func (h NoopStatePuller) Get() (uint64, uint64, error) {
return 0, 0, ErrNoState
}
// Start implements StatePuller interface.
func (h NoopStatePuller) Start(off, delta uint64) error {
return nil
}
// Complete implements StatePuller interface.
func (h NoopStatePuller) Complete(off, delta uint64, err error) error {
return nil
}
// FileStatePuller tracks offset state in a file.
type FileStatePuller struct {
conf *Config
name string
f *os.File
}
// NewFileStatePuller returns a new instance of *FileStatePuller.
func NewFileStatePuller(conf *Config, name string) *FileStatePuller {
return &FileStatePuller{
name: name,
conf: conf,
}
}
func (m *FileStatePuller) isReady() bool {
if m.f == nil {
return false
}
return true
}
// Setup implements internal.LifecycleManager.
func (m *FileStatePuller) Setup() error {
panic("not implemented")
return nil
}
// Shutdown implements internal.LifecycleManager.
func (m *FileStatePuller) Shutdown() error {
if m.f != nil {
return m.f.Close()
}
return nil
}
// Get implements StatePuller interface.
func (m *FileStatePuller) Get() (uint64, uint64, error) {
panic("not implemented")
return 0, 0, nil
}
func (m *FileStatePuller) Start(off, delta uint64) error {
panic("not implemented")
return nil
}
// Complete implements StatePuller interface.
func (m *FileStatePuller) Complete(off, delta uint64) error {
panic("not implemented")
return nil
}
type MemoryStatePuller struct {
conf *Config
off uint64
delta uint64
initialized bool
err error
}
func NewMemoryStatePuller(conf *Config) *MemoryStatePuller {
return &MemoryStatePuller{
conf: conf,
}
}
// Get implements StatePuller interface
func (m *MemoryStatePuller) Get() (uint64, uint64, error) {
if !m.initialized {
return 0, 0, ErrNoState
}
return m.off, m.delta, nil
}
func (m *MemoryStatePuller) Start(off, delta uint64) error {
return nil
}
// Complete implements StatePuller interface
func (m *MemoryStatePuller) Complete(off, delta uint64, err error) error {
m.initialized = true
m.off = off
m.delta = delta
m.err = err
return nil
}