forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
global_state.go
171 lines (144 loc) · 4.1 KB
/
global_state.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package wal
import (
"bufio"
"encoding/gob"
"fmt"
"io"
"math"
"os"
)
// GlobalState is the bookkeeping record of the write-ahead log. The
// exported fields are what write serializes with encoding/gob; path is
// unexported, so gob skips it and it only ever lives in memory.
//
// The order and names of the exported fields are kept stable because
// they determine the persisted layout.
type GlobalState struct {
	// Used for creating index entries.
	CurrentFileSuffix uint32
	CurrentFileOffset int64

	// Tracks request numbers: getNextRequestNumber increments this
	// value and hands out the result.
	LargestRequestNumber uint32

	// Used for rollover.
	FirstSuffix uint32

	// Last sequence number used, keyed by shard id.
	ShardLastSequenceNumber map[uint32]uint64

	// Committed request number, keyed by server id.
	ServerLastRequestNumber map[uint32]uint32

	// Path to the state file.
	path string
}
// oldGlobalState mirrors the on-disk layout used up to version 0.7,
// where the Suffix fields were ints rather than uint32s. It exists only
// so that read can decode a legacy state file and convert it.
//
// The fields are listed flat on purpose. encoding/gob matches struct
// fields by name and treats an embedded struct as a single nested field
// named after its type, so embedding GlobalState here would leave every
// field except the two suffixes unset after decoding a legacy file.
type oldGlobalState struct {
	// Legacy int-typed counterpart of GlobalState.CurrentFileSuffix.
	CurrentFileSuffix int
	CurrentFileOffset int64

	LargestRequestNumber uint32

	// Legacy int-typed counterpart of GlobalState.FirstSuffix.
	FirstSuffix int

	ShardLastSequenceNumber map[uint32]uint64
	ServerLastRequestNumber map[uint32]uint32
}
// newGlobalState builds the GlobalState backed by the state file at
// path.
//
// A missing file is not an error: a fresh state with empty maps is
// returned. Any other failure to open the file, or a failure to read
// it, yields a nil state and the error.
func newGlobalState(path string) (*GlobalState, error) {
	state := &GlobalState{
		ServerLastRequestNumber: map[uint32]uint32{},
		ShardLastSequenceNumber: map[uint32]uint64{},
		path:                    path,
	}

	file, err := os.Open(path)
	switch {
	case os.IsNotExist(err):
		// Nothing persisted yet; start from the empty state.
		return state, nil
	case err != nil:
		return nil, err
	}
	defer file.Close()

	if readErr := state.read(file); readErr != nil {
		return nil, readErr
	}
	// Re-assert the path after loading so the returned state always
	// points at the file it was read from.
	state.path = path
	return state, nil
}
// writeToFile persists the state to the file at s.path.
//
// The state is first written to a sibling "<path>.new" file, which is
// synced and closed before being renamed over the real path. The
// previous state file is deliberately NOT removed beforehand: os.Rename
// replaces an existing destination, and deleting the old file first
// would open a window in which a crash leaves no state file at all
// (newGlobalState would then silently start from an empty state).
//
// It returns the first error encountered; on error the previous state
// file is left untouched.
func (s *GlobalState) writeToFile() error {
	tmpPath := s.path + ".new"
	newFile, err := os.OpenFile(tmpPath, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0644)
	if err != nil {
		return err
	}
	// Safety net for the early returns below. After the explicit Close
	// succeeds this second Close only reports "already closed", which
	// is intentionally ignored.
	defer newFile.Close()

	if err := s.write(newFile); err != nil {
		return err
	}
	// Flush to stable storage before the rename makes the file live.
	if err := newFile.Sync(); err != nil {
		return err
	}
	if err := newFile.Close(); err != nil {
		return err
	}
	return os.Rename(tmpPath, s.path)
}
// write serializes the state to w: a single text line holding the
// format version (currently 1) followed by the gob encoding of the
// exported fields.
//
// It returns the first write or encoding error. A failure to emit the
// version line is reported rather than ignored, since a file without
// that line cannot be read back.
func (s *GlobalState) write(w io.Writer) error {
	if _, err := fmt.Fprintf(w, "%d\n", 1); err != nil { // write the version
		return err
	}
	return gob.NewEncoder(w).Encode(s)
}
// read loads the state from r, which is expected to be positioned at
// the start of a state file: one version line followed by a
// gob-encoded state.
//
// The current layout is tried first. If that decode fails, the file is
// rewound and decoded again as the pre-0.8 layout (see oldGlobalState),
// whose Suffix fields were ints, and the result is converted.
//
// It returns an error if the version line cannot be read, if the file
// cannot be rewound, or if the legacy decode fails as well.
func (s *GlobalState) read(r *os.File) error {
	reader := bufio.NewReader(r)
	// The version line is consumed but its value is not interpreted.
	if _, err := reader.ReadString('\n'); err != nil {
		return err
	}
	if err := gob.NewDecoder(reader).Decode(s); err == nil {
		return nil
	}

	// From version 0.7 to 0.8 the type of the Suffix variables changed
	// to uint32. Start over from the beginning of the file and decode
	// into the legacy struct instead.
	if _, err := r.Seek(0, io.SeekStart); err != nil {
		return err
	}
	reader = bufio.NewReader(r)
	if _, err := reader.ReadString('\n'); err != nil {
		return err
	}
	old := &oldGlobalState{}
	if err := gob.NewDecoder(reader).Decode(old); err != nil {
		return err
	}

	s.CurrentFileOffset = old.CurrentFileOffset
	s.CurrentFileSuffix = uint32(old.CurrentFileSuffix)
	s.LargestRequestNumber = old.LargestRequestNumber
	s.FirstSuffix = uint32(old.FirstSuffix)
	// Only take over maps that were actually decoded. Replacing an
	// initialised map with nil would make later map writes (for example
	// in setCurrentSequenceNumber) panic.
	if old.ShardLastSequenceNumber != nil {
		s.ShardLastSequenceNumber = old.ShardLastSequenceNumber
	}
	if old.ServerLastRequestNumber != nil {
		s.ServerLastRequestNumber = old.ServerLastRequestNumber
	}
	return nil
}
// recover raises the recorded last sequence number of shardId to
// sequenceNumber. The stored value only ever moves forward: a
// sequenceNumber that is not greater than the recorded one (or than
// zero for an unknown shard) leaves the map untouched.
func (s *GlobalState) recover(shardId uint32, sequenceNumber uint64) {
	if s.ShardLastSequenceNumber[shardId] >= sequenceNumber {
		return
	}
	s.ShardLastSequenceNumber[shardId] = sequenceNumber
}
// getNextRequestNumber advances LargestRequestNumber by one and returns
// the new value. Being uint32 arithmetic, the counter wraps around to 0
// after the maximum value.
func (s *GlobalState) getNextRequestNumber() uint32 {
	next := s.LargestRequestNumber + 1
	s.LargestRequestNumber = next
	return next
}
// getCurrentSequenceNumber reports the last sequence number recorded
// for shardId, or 0 when nothing has been recorded for that shard. The
// lookup never adds an entry to the map.
func (s *GlobalState) getCurrentSequenceNumber(shardId uint32) uint64 {
	if seq, known := s.ShardLastSequenceNumber[shardId]; known {
		return seq
	}
	return 0
}
// setCurrentSequenceNumber unconditionally records sequenceNumber as
// the last sequence number of shardId, overwriting any previous value
// (unlike recover, it may also move the value backwards).
func (s *GlobalState) setCurrentSequenceNumber(shardId uint32, sequenceNumber uint64) {
	sequences := s.ShardLastSequenceNumber
	sequences[shardId] = sequenceNumber
}
// commitRequestNumber records requestNumber as the committed request
// number of serverId, replacing whatever was stored before. No ordering
// check is performed.
//
// TODO: we need a way to verify the request numbers. A plain
// "requestNumber >= current" assertion such as
//
//	if current := s.ServerLastRequestNumber[serverId]; requestNumber < current {
//		panic(fmt.Errorf("Expected rn %d to be >= %d", requestNumber, current))
//	}
//
// won't work, because it breaks when the request numbers roll over.
func (s *GlobalState) commitRequestNumber(serverId, requestNumber uint32) {
	committed := s.ServerLastRequestNumber
	committed[serverId] = requestNumber
}
// LowestCommitedRequestNumber returns the smallest committed request
// number across all servers in ServerLastRequestNumber. When no server
// has an entry it returns math.MaxUint32.
func (s *GlobalState) LowestCommitedRequestNumber() uint32 {
	var lowest uint32 = math.MaxUint32
	for _, committed := range s.ServerLastRequestNumber {
		if committed >= lowest {
			continue
		}
		lowest = committed
	}
	return lowest
}