-
Notifications
You must be signed in to change notification settings - Fork 7
/
cursor.go
119 lines (101 loc) · 3.38 KB
/
cursor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package rpatterns
import (
"context"
"strconv"
"sync"
"github.com/luno/reflex"
)
// ReadThroughCursorStore returns a cursor store that serves reads from the
// primary and, when the primary holds no cursor for a consumer, reads from the
// fallback instead. A cursor found only in the fallback is copied into the
// primary before it is returned. All other CursorStore methods, including
// cursor updates, go directly to the primary.
//
// Use cases:
//   - Migrating cursor stores: use the new cursor store as the primary and
//     the old cursor store as the fallback. Revert to just the new cursor
//     store after the migration.
//   - Programmatic seeding of a cursor: use a MemCursorStore seeded via
//     WithMemCursor as the fallback and the target cursor store as the
//     primary. Revert to just the target cursor store afterwards.
func ReadThroughCursorStore(primary, fallback reflex.CursorStore) reflex.CursorStore {
	store := new(readThroughCursorStore)
	store.CursorStore = primary
	store.fallback = fallback
	return store
}
// readThroughCursorStore is the implementation behind ReadThroughCursorStore.
// The primary store is embedded, so every method that is not overridden below
// is served by the primary directly.
type readThroughCursorStore struct {
	reflex.CursorStore // Primary
	fallback           reflex.CursorStore
}

// GetCursor returns the cursor for consumerName from the primary store. If the
// primary returns an empty cursor, the fallback store is queried instead; a
// non-empty fallback cursor is written to the primary before being returned.
// An error from any of the underlying calls is returned as-is with an empty
// cursor.
func (c *readThroughCursorStore) GetCursor(ctx context.Context, consumerName string) (string, error) {
	primary := c.CursorStore

	// Prefer whatever the primary already holds.
	found, err := primary.GetCursor(ctx, consumerName)
	if err != nil {
		return "", err
	}
	if found != "" {
		return found, nil
	}

	// Nothing in the primary, so consult the fallback.
	seed, err := c.fallback.GetCursor(ctx, consumerName)
	switch {
	case err != nil:
		return "", err
	case seed == "":
		// Neither store has a cursor for this consumer.
		return "", nil
	}

	// Copy the fallback value into the primary.
	//
	// This is open to a race: SetCursor() may have been called with a later
	// cursor value since the primary was read above. The SetCursor() below
	// may then fail, but on retry the primary read will return a value.
	if err := primary.SetCursor(ctx, consumerName, seed); err != nil {
		return "", err
	}
	return seed, nil
}
// MemCursorStore returns a cursor store that keeps its cursors in memory
// only, so nothing is persisted. The given options are applied in order to
// the new store before it is returned.
//
// Use cases:
//   - Testing
//   - Programmatic seeding of a cursor: see ReadThroughCursorStore.
func MemCursorStore(opts ...MemOpt) reflex.CursorStore {
	store := &memCursorStore{
		cursors: map[string]string{},
	}
	for i := range opts {
		opts[i](store)
	}
	return store
}
// memCursorStore is a mutex-guarded map from consumer name to cursor value.
type memCursorStore struct {
	mu      sync.Mutex
	cursors map[string]string
}

// GetCursor returns the cursor stored for consumerName, or an empty string if
// none has been set. The returned error is always nil.
func (s *memCursorStore) GetCursor(_ context.Context, consumerName string) (string, error) {
	s.mu.Lock()
	cursor := s.cursors[consumerName]
	s.mu.Unlock()
	return cursor, nil
}

// SetCursor stores cursor for consumerName, replacing any previous value.
// The returned error is always nil.
func (s *memCursorStore) SetCursor(_ context.Context, consumerName string, cursor string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.cursors == nil {
		// Lets a zero-value store accept writes.
		s.cursors = map[string]string{}
	}
	s.cursors[consumerName] = cursor
	return nil
}

// Flush does nothing and returns nil.
func (s *memCursorStore) Flush(_ context.Context) error {
	return nil
}
// MemOpt is a functional option for MemCursorStore. Each option receives the
// newly created in-memory store and may modify it before it is returned.
type MemOpt func(*memCursorStore)
// WithMemCursor returns an option that stores cursor for the consumer called
// name in the MemCursorStore.
func WithMemCursor(name, cursor string) MemOpt {
	return func(m *memCursorStore) {
		// The in-memory SetCursor ignores its context and always returns nil,
		// so the error is deliberately discarded. A non-nil context is passed
		// because a nil Context must never be handed to a function.
		_ = m.SetCursor(context.Background(), name, cursor)
	}
}
// WithMemCursorInt returns an option that stores the integer cursor, formatted
// in base 10, for the consumer called name in the MemCursorStore.
func WithMemCursorInt(name string, cursor int64) MemOpt {
	return func(m *memCursorStore) {
		// The in-memory SetCursor ignores its context and always returns nil,
		// so the error is deliberately discarded. A non-nil context is passed
		// because a nil Context must never be handed to a function.
		_ = m.SetCursor(context.Background(), name, strconv.FormatInt(cursor, 10))
	}
}