-
-
Notifications
You must be signed in to change notification settings - Fork 2.2k
/
log_read.go
139 lines (119 loc) · 3.78 KB
/
log_read.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package log_buffer
import (
"bytes"
"fmt"
"time"
"google.golang.org/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
var (
ResumeError = fmt.Errorf("resume")
ResumeFromDiskError = fmt.Errorf("resumeFromDisk")
)
type MessagePosition struct {
time.Time // this is the timestamp of the message
BatchIndex int64 // this is only used when the timestamp is not enough to identify the next message, when the timestamp is in the previous batch.
}
func NewMessagePosition(tsNs int64, batchIndex int64) MessagePosition {
return MessagePosition{
Time: time.Unix(0, tsNs).UTC(),
BatchIndex: batchIndex,
}
}
func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition MessagePosition, stopTsNs int64,
waitForDataFn func() bool, eachLogDataFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
// loop through all messages
var bytesBuf *bytes.Buffer
var batchIndex int64
lastReadPosition = startPosition
var entryCounter int64
defer func() {
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
// println("LoopProcessLogData", readerName, "sent messages total", entryCounter)
}()
for {
if bytesBuf != nil {
logBuffer.ReleaseMemory(bytesBuf)
}
bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition)
if err == ResumeFromDiskError {
time.Sleep(1127 * time.Millisecond)
return lastReadPosition, isDone, ResumeFromDiskError
}
readSize := 0
if bytesBuf != nil {
readSize = bytesBuf.Len()
}
glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
if bytesBuf == nil {
if batchIndex >= 0 {
lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)
}
if stopTsNs != 0 {
isDone = true
return
}
logBuffer.RLock()
lastTsNs := logBuffer.LastTsNs
logBuffer.RUnlock()
loopTsNs := lastTsNs // make a copy
for lastTsNs == loopTsNs {
if waitForDataFn() {
// Update loopTsNs and loop again
logBuffer.RLock()
loopTsNs = logBuffer.LastTsNs
logBuffer.RUnlock()
continue
} else {
isDone = true
return
}
}
if logBuffer.IsStopping() {
isDone = true
return
}
continue
}
buf := bytesBuf.Bytes()
// fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf))
batchSize := 0
for pos := 0; pos+4 < len(buf); {
size := util.BytesToUint32(buf[pos : pos+4])
if pos+4+int(size) > len(buf) {
err = ResumeError
glog.Errorf("LoopProcessLogData: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf))
return
}
entryData := buf[pos+4 : pos+4+int(size)]
logEntry := &filer_pb.LogEntry{}
if err = proto.Unmarshal(entryData, logEntry); err != nil {
glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
pos += 4 + int(size)
continue
}
if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
isDone = true
// println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
return
}
lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex)
if isDone, err = eachLogDataFn(logEntry); err != nil {
glog.Errorf("LoopProcessLogData: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
return
}
if isDone {
glog.V(0).Infof("LoopProcessLogData2: %s process log entry %d", readerName, batchSize+1)
return
}
pos += 4 + int(size)
batchSize++
entryCounter++
}
glog.V(4).Infof("%s sent messages ts[%+v,%+v] size %d\n", readerName, startPosition, lastReadPosition, batchSize)
}
}