-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtimestamp2offset.go
More file actions
126 lines (105 loc) · 3.26 KB
/
Copy pathtimestamp2offset.go
File metadata and controls
126 lines (105 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package kinesis2sse
import (
"cmp"
"errors"
"fmt"
"sync"
"time"
"modernc.org/b/v2"
)
// Timestamp2Offset is a map from offsets to timestamps. It's not thread-safe by default. Callers should use the embedded mutex.
type Timestamp2Offset struct {
*sync.Mutex
// capacity is the capacity of Timestamp2Offset.
capacity int
// lastOffset is the last added offset (used for error checking).
lastOffset int
// offset2Timestamp stores the mapping from offsets to timestamps.
offset2Timestamp map[int]time.Time
// timestamp2Offsets stores the mapping from timestamps to unordered offsets.
timestamp2Offsets *b.Tree[timestamp2OffsetsKey, struct{}]
}
type timestamp2OffsetsKey struct {
timestamp time.Time
offset int
}
func timestamp2OffsetsKeyCmp(a, b timestamp2OffsetsKey) int {
c := a.timestamp.Compare(b.timestamp)
if c == 0 {
c = cmp.Compare(a.offset, b.offset)
}
return c
}
// NewTimestamp2Offset returns a new Timestamp2Offset with the specified capacity.
func NewTimestamp2Offset(capacity int) (*Timestamp2Offset, error) {
if capacity <= 0 {
return nil, errors.New("capacity must be greater than 1")
}
return &Timestamp2Offset{
Mutex: &sync.Mutex{},
capacity: capacity,
lastOffset: -1,
offset2Timestamp: make(map[int]time.Time),
timestamp2Offsets: b.TreeNew[timestamp2OffsetsKey, struct{}](timestamp2OffsetsKeyCmp),
}, nil
}
// NearestOffset returns the smallest offset since the specified timestamp. If there is no smallest timestamp since
// the specified timestamp, it returns the next earliest offset, if any.
func (m *Timestamp2Offset) NearestOffset(timestamp time.Time) (int, bool) {
// Go forward…
e, _ := m.timestamp2Offsets.Seek(timestamp2OffsetsKey{
timestamp: timestamp,
offset: 0,
})
if k, _, err := e.Next(); err == nil {
return k.offset, true
}
// Go backward…
e, _ = m.timestamp2Offsets.Seek(timestamp2OffsetsKey{
timestamp: timestamp,
offset: 0,
})
if k, _, err := e.Prev(); err == nil {
return k.offset, true
}
return -1, false
}
// Add adds an offset and its timestamp. Offsets must be added in order.
func (m *Timestamp2Offset) Add(offset int, timestamp time.Time) error {
if offset < 0 {
return errors.New("offsets must be non-negative")
}
n := len(m.offset2Timestamp)
if n == 0 {
// Set the initial offset.
m.lastOffset = offset
} else if m.lastOffset != offset-1 {
return fmt.Errorf("cannot add offset %d when last offset was %d", offset, m.lastOffset)
}
if n == m.capacity {
// We are at capacity. Remove the oldest entry oldestOffset.
oldestOffset := offset - m.capacity
oldestOffsetTimestamp, ok := m.offset2Timestamp[oldestOffset]
if !ok {
return fmt.Errorf("old offset to remove %d not found", oldestOffset)
}
if ok = m.timestamp2Offsets.Delete(timestamp2OffsetsKey{
timestamp: oldestOffsetTimestamp,
offset: oldestOffset,
}); !ok {
return fmt.Errorf("timestamp %s for old offset to remove %d not found", oldestOffsetTimestamp.Format(time.RFC3339), oldestOffset)
}
delete(m.offset2Timestamp, oldestOffset)
}
// Add the newest entry offset.
m.offset2Timestamp[offset] = timestamp
m.timestamp2Offsets.Set(
timestamp2OffsetsKey{
timestamp: timestamp,
offset: offset,
},
struct{}{},
)
m.lastOffset = offset
return nil
}