forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
write_buffer.go
171 lines (151 loc) · 5.04 KB
/
write_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package cluster
import (
"protocol"
"reflect"
"time"
log "code.google.com/p/log4go"
)
// WriteBuffer acts as a buffer for writes. Requests passed to Write are queued
// on a bounded channel and forwarded to the underlying Writer by a background
// goroutine (started in NewWriteBuffer). When the queue is full, requests are
// dropped and later replayed from the WAL.
type WriteBuffer struct {
	// writer is the destination each buffered request is forwarded to.
	writer Writer
	// wal is used to commit requests once they are written and to replay
	// requests that were dropped because the buffer was full.
	wal WAL
	// serverId is passed to wal.Commit and included in write-error logs.
	serverId uint32
	// writes is the queue of pending requests; its capacity is bufferSize.
	writes chan *protocol.Request
	// stoppedWrites carries the request number of a dropped request to the
	// background goroutine, which reacts by replaying from the WAL.
	stoppedWrites chan uint32
	// bufferSize is the capacity the writes channel was created with.
	bufferSize int
	// shardIds is the set of shard ids seen by write(); these are the shards
	// asked for when replaying from the WAL.
	shardIds map[uint32]bool
	// shardLastRequestNumber maps a shard id to the number of the last request
	// passed to Write for that shard.
	shardLastRequestNumber map[uint32]uint32
	// shardCommitedRequestNumber maps a shard id to the number of the last
	// request that was successfully written for that shard.
	shardCommitedRequestNumber map[uint32]uint32
	// writerInfo is a label used in log messages.
	writerInfo string
}
// Writer is the destination a WriteBuffer forwards its buffered requests to.
type Writer interface {
	// Write delivers a single request. When it returns a non-nil error the
	// WriteBuffer retries the same request after a short fixed sleep.
	Write(request *protocol.Request) error
}
// NewWriteBuffer creates a WriteBuffer that forwards requests to writer, using
// wal for commits and replays, and starts the background goroutine that drains
// the buffer.
//
// writerInfo is a label used in log messages, serverId is the id passed to the
// WAL on commit, and bufferSize is the capacity of the pending-request queue.
func NewWriteBuffer(writerInfo string, writer Writer, wal WAL, serverId uint32, bufferSize int) *WriteBuffer {
	log.Info("%s: Initializing write buffer with buffer size of %d", writerInfo, bufferSize)

	wb := new(WriteBuffer)
	wb.writerInfo = writerInfo
	wb.writer = writer
	wb.wal = wal
	wb.serverId = serverId
	wb.bufferSize = bufferSize

	// Pending requests are bounded by bufferSize; a single slot is enough to
	// signal that at least one request was dropped.
	wb.writes = make(chan *protocol.Request, bufferSize)
	wb.stoppedWrites = make(chan uint32, 1)

	wb.shardIds = map[uint32]bool{}
	wb.shardLastRequestNumber = make(map[uint32]uint32)
	wb.shardCommitedRequestNumber = make(map[uint32]uint32)

	go wb.handleWrites()
	return wb
}
// ShardsRequestNumber returns the map from shard id to the number of the last
// request passed to Write for that shard. The returned map is the buffer's
// internal map, not a copy.
func (self *WriteBuffer) ShardsRequestNumber() map[uint32]uint32 {
	return self.shardLastRequestNumber
}
// HasUncommitedWrites reports whether the per-shard committed request numbers
// differ from the per-shard request numbers last passed to Write.
func (self *WriteBuffer) HasUncommitedWrites() bool {
	committed := self.shardCommitedRequestNumber
	latest := self.shardLastRequestNumber

	inSync := reflect.DeepEqual(committed, latest)
	return !inSync
}
// Write queues request for asynchronous delivery. This method never blocks.
//
// The request number is first recorded as the latest one seen for the
// request's shard. If the buffer has room the request is queued. Otherwise the
// request is dropped on the floor and its number is posted to stoppedWrites so
// that the background goroutine replays from the WAL. If a drop notification
// is already pending, nothing more is sent.
func (self *WriteBuffer) Write(request *protocol.Request) {
	// Use the getters rather than dereferencing the pointer fields so that a
	// request without a number does not panic on the drop path (assumes the
	// generated getters are nil-safe, as protobuf getters conventionally are).
	requestNumber := request.GetRequestNumber()
	shardId := request.GetShardId()
	self.shardLastRequestNumber[shardId] = requestNumber

	select {
	case self.writes <- request:
		log.Debug("Buffering %d:%d for %s", requestNumber, shardId, self.writerInfo)
		return
	default:
		// Buffer is full; fall through and signal the drop.
	}

	select {
	case self.stoppedWrites <- requestNumber:
	default:
		// A replay has already been requested; it will pick this one up too.
	}
}
// handleWrites is the background loop started by NewWriteBuffer. It forwards
// each buffered request to the writer and, whenever a dropped request is
// signalled, replays from the WAL. It never returns.
func (self *WriteBuffer) handleWrites() {
	for {
		select {
		case req := <-self.writes:
			self.write(req)
		case dropped := <-self.stoppedWrites:
			self.replayAndRecover(dropped)
		}
	}
}
// write forwards request to the underlying writer, retrying until it succeeds.
//
// The request's shard is recorded in shardIds so that later replays include
// it. After a successful write, a request that carries a request number is
// marked as committed for its shard and committed in the WAL; a request
// without a number is written but not committed. Failures are logged on the
// first attempt and then on every 100th attempt.
func (self *WriteBuffer) write(request *protocol.Request) {
	// Read the shard id through the getter, as the commit bookkeeping below
	// does, instead of dereferencing the pointer field (assumes the generated
	// getter is nil-safe, as protobuf getters conventionally are).
	shardId := request.GetShardId()
	self.shardIds[shardId] = true

	for attempts := 0; ; attempts++ {
		err := self.writer.Write(request)
		if err == nil {
			if request.RequestNumber == nil {
				return
			}
			requestNumber := *request.RequestNumber
			self.shardCommitedRequestNumber[shardId] = requestNumber
			log.Debug("Commiting %d:%d for %s", requestNumber, shardId, self.writerInfo)
			self.wal.Commit(requestNumber, self.serverId)
			return
		}

		if attempts%100 == 0 {
			log.Error("%s: WriteBuffer: error on write to server %d: %s", self.writerInfo, self.serverId, err)
		}
		// backoff happens in the writer, just sleep for a small fixed amount of time before retrying
		time.Sleep(time.Millisecond * 100)
	}
}
// replayAndRecover catches the writer up after requests were dropped because
// the buffer was full.
//
// It drains whatever is currently buffered, remembers the oldest drained
// request, and asks the WAL to replay every known shard starting from that
// request's number, writing each replayed request through write(). It then
// discards buffered requests until it reaches the one that was replayed last,
// and repeats the whole replay if another drop was signalled in the meantime.
//
// missedRequest is the number of the dropped request that triggered the
// replay. It is currently unused: the replay always starts from the oldest
// buffered request, and if the buffer is already empty the function logs an
// error and returns without replaying.
func (self *WriteBuffer) replayAndRecover(missedRequest uint32) {
	var req *protocol.Request

	// empty out the buffer before the replay so new writes can buffer while we're replaying
	channelLen := len(self.writes)
	// This is the first run through the replay. Start from the start of the write queue
	for i := 0; i < channelLen; i++ {
		r := <-self.writes
		if req == nil {
			req = r
		}
	}
	if req == nil {
		log.Error("%s: REPLAY: emptied channel, but no request set", self.writerInfo)
		return
	}
	log.Debug("%s: REPLAY: Emptied out channel", self.writerInfo)

	shardIds := make([]uint32, 0, len(self.shardIds))
	for shardId := range self.shardIds {
		shardIds = append(shardIds, shardId)
	}

	// while we're behind keep replaying from WAL
	for {
		log.Info("%s: REPLAY: Replaying dropped requests...", self.writerInfo)
		log.Debug("%s: REPLAY: from request %d. Shards: %v", self.writerInfo, req.GetRequestNumber(), shardIds)
		// The request number is read through the getter (here and in the drain
		// loop below) so a request without a number does not cause a nil
		// dereference; write() already treats RequestNumber as optional.
		self.wal.RecoverServerFromRequestNumber(req.GetRequestNumber(), shardIds, func(request *protocol.Request, shardId uint32) error {
			log.Debug("%s: REPLAY: writing request number: %d", self.writerInfo, request.GetRequestNumber())
			// Track the most recently replayed request; it marks where the
			// buffered requests stop being duplicates of the replay.
			req = request
			request.ShardId = &shardId
			self.write(request)
			return nil
		})

		log.Info("%s: REPLAY: Emptying out reqeusts from buffer that we've already replayed", self.writerInfo)
		lastReplayed := req.GetRequestNumber()

	RequestLoop:
		for {
			select {
			case newReq := <-self.writes:
				if newReq.GetRequestNumber() == lastReplayed {
					break RequestLoop
				}
			default:
				log.Error("%s: REPLAY: Got to the end of the write buffer without getting to the last written request.", self.writerInfo)
				break RequestLoop
			}
		}

		log.Info("%s: REPLAY: done.", self.writerInfo)

		// now make sure that no new writes were dropped. If so, do the replay again from this place.
		select {
		case <-self.stoppedWrites:
			log.Info("%s: REPLAY: Buffer backed up while replaying, going again.", self.writerInfo)
			continue
		default:
			return
		}
	}
}