/
concurrent_list.go
87 lines (74 loc) · 2.32 KB
/
concurrent_list.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package offset
import (
"sync"
)
// ConcurrentList is a list that maintains kafka offsets with thread-safe insert and setToHighestContiguous operations
type ConcurrentList struct {
// offsets holds the recorded offsets in the order they were appended; it is not kept sorted.
offsets []int64
// mutex guards offsets; the methods of this type lock it around every read or write of the slice.
mutex sync.Mutex
}
// newConcurrentList returns a list whose only entry is minOffset.
func newConcurrentList(minOffset int64) *ConcurrentList {
	list := &ConcurrentList{}
	list.offsets = []int64{minOffset}
	return list
}
// insert appends offset to the end of the list in amortized O(1) time.
// The append is performed while holding the list's mutex, so insert may be
// called from multiple goroutines.
func (s *ConcurrentList) insert(offset int64) {
	s.mutex.Lock()
	s.offsets = append(s.offsets, offset)
	s.mutex.Unlock()
}
// setToHighestContiguous drops every offset below the highest contiguous
// offset and returns that offset.
// The highest contiguous offset is the largest value reachable from the
// smallest stored offset by steps of one; for e.g., if the list is
// [1, 2, 3, 5], the result is 3 and the list becomes [3, 5].
// This is a O(n) operation.
//
// The list is detached under the mutex and scanned without holding it, so
// insert is not blocked by the scan; offsets inserted meanwhile are kept
// because the surviving entries are appended to whatever the list holds when
// the mutex is taken again.
// While the scan runs the list is temporarily empty, so an overlapping call to
// this method could hand getHighestContiguous an empty slice, which it indexes
// at position 0. The list must also be non-empty on entry for the same reason.
func (s *ConcurrentList) setToHighestContiguous() int64 {
	s.mutex.Lock()
	snapshot := s.offsets
	s.offsets = nil
	s.mutex.Unlock()

	highest := getHighestContiguous(snapshot)

	// Keep the highest contiguous offset itself plus everything above it.
	var retained []int64
	for i := range snapshot {
		if snapshot[i] < highest {
			continue
		}
		retained = append(retained, snapshot[i])
	}

	s.mutex.Lock()
	s.offsets = append(s.offsets, retained...)
	s.mutex.Unlock()

	return highest
}
// getHighestContiguous returns the largest value that can be reached from the
// smallest element of offsets by repeatedly adding one while the result is
// still present in offsets. Order and duplicates in offsets do not matter.
// offsets must be non-empty; its first element is read unconditionally.
func getHighestContiguous(offsets []int64) int64 {
	seen := make(map[int64]struct{}, len(offsets))
	lowest := offsets[0]
	for i := range offsets {
		value := offsets[i]
		seen[value] = struct{}{}
		if value < lowest {
			lowest = value
		}
	}

	// Walk upwards from the minimum until the next integer is missing.
	top := lowest
	for {
		if _, found := seen[top+1]; !found {
			return top
		}
		top++
	}
}