-
Notifications
You must be signed in to change notification settings - Fork 59
/
statsmgr.go
145 lines (133 loc) · 3.61 KB
/
statsmgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package stats
import (
"fmt"
"time"
"github.com/vesoft-inc/nebula-importer/v3/pkg/base"
"github.com/vesoft-inc/nebula-importer/v3/pkg/config"
"github.com/vesoft-inc/nebula-importer/v3/pkg/logger"
"github.com/vesoft-inc/nebula-importer/v3/pkg/reader"
)
// StatsMgr aggregates import statistics. NewStatsMgr starts a background
// worker goroutine (startWorker) that reads messages from StatsCh and folds
// them into Stats.
type StatsMgr struct {
// OutputStatsCh receives a copy of Stats each time the worker handles a
// base.OUTPUT message.
OutputStatsCh chan Stats
// StatsCh is the channel the worker reads base.Stats messages from; the
// worker returns once this channel is closed.
StatsCh chan base.Stats
// DoneCh receives true when the worker's count of remaining files, which is
// decremented on every base.FILEDONE message, reaches zero.
DoneCh chan bool
// Stats holds the running totals.
Stats Stats
// Done is set to true at the end of Close.
Done bool
// CountFileDone is set after CountFileBytes completes without error, which
// turns later CountFileBytes calls into no-ops.
CountFileDone bool
// runnerLogger is used for the manager's lifecycle and progress messages.
runnerLogger *logger.RunnerLogger
}
// Stats is the set of running totals kept by StatsMgr. A copy of it is what
// gets sent on StatsMgr.OutputStatsCh, and every field carries a JSON tag.
type Stats struct {
// NumFailed is the sum of BatchSize over base.FAILURE messages.
NumFailed int64 `json:"numFailed"`
// NumReadFailed is included in the progress log line. No code visible in
// this file writes it — presumably it is updated elsewhere; verify.
NumReadFailed int64 `json:"numReadFailed"`
// TotalCount is the sum of BatchSize over both base.SUCCESS and
// base.FAILURE messages.
TotalCount int64 `json:"totalCount"`
// TotalBatches is the number of base.SUCCESS and base.FAILURE messages
// processed.
TotalBatches int64 `json:"totalBatches"`
// TotalLatency is the sum of Latency over base.SUCCESS messages. The log
// line labels its per-batch average with "us".
TotalLatency int64 `json:"totalLatency"`
// TotalReqTime is the sum of ReqTime over base.SUCCESS messages. The log
// line labels its per-batch average with "us".
TotalReqTime int64 `json:"totalReqTime"`
// TotalBytes is the sum of DataReader.TotalBytes() over the readers passed
// to StatsMgr.CountFileBytes.
TotalBytes int64 `json:"totalBytes"`
// TotalImportedBytes is the sum of ImportedBytes over both base.SUCCESS and
// base.FAILURE messages.
TotalImportedBytes int64 `json:"totalImportedBytes"`
}
// NewStatsMgr builds a StatsMgr with unbuffered channels and all counters at
// zero, then launches its background worker goroutine.
//
// len(files) is handed to the worker as the number of files still being read;
// runnerLogger is stored for the manager's own log messages. The returned
// pointer is the same instance the worker operates on.
func NewStatsMgr(files []*config.File, runnerLogger *logger.RunnerLogger) *StatsMgr {
	// Stats is deliberately left out of the literal: its zero value already
	// has every counter set to 0.
	mgr := &StatsMgr{
		OutputStatsCh: make(chan Stats),
		StatsCh:       make(chan base.Stats),
		DoneCh:        make(chan bool),
		runnerLogger:  runnerLogger,
	}
	go mgr.startWorker(len(files))
	return mgr
}
// Close logs the shutdown, closes StatsCh, DoneCh and OutputStatsCh in that
// order, and finally sets Done to true.
//
// Closing StatsCh is what makes the worker loop in startWorker return. Close
// must be called at most once: closing an already-closed channel panics.
func (s *StatsMgr) Close() {
s.runnerLogger.Infof("Stats manager closing")
close(s.StatsCh)
// NOTE(review): the worker sends on DoneCh and OutputStatsCh; a send that is
// still pending when these are closed would panic — confirm callers only
// invoke Close after the worker has gone idle.
close(s.DoneCh)
close(s.OutputStatsCh)
// NOTE(review): Done is written without synchronization — confirm no other
// goroutine reads it concurrently.
s.Done = true
s.runnerLogger.Infof("Stats manager closed")
}
// updateStat folds one successful batch report into the running totals: it
// counts the batch, adds its BatchSize to TotalCount, and accumulates the
// batch's latency, request time and imported bytes.
func (s *StatsMgr) updateStat(stat base.Stats) {
	totals := &s.Stats
	totals.TotalBatches += 1
	totals.TotalCount += int64(stat.BatchSize)
	totals.TotalLatency += stat.Latency
	totals.TotalReqTime += stat.ReqTime
	totals.TotalImportedBytes += stat.ImportedBytes
}
// updateFailed folds one failed batch report into the running totals: it
// counts the batch, adds its BatchSize to both TotalCount and NumFailed, and
// accumulates its imported bytes. Latency and request time are not touched
// for failed batches.
func (s *StatsMgr) updateFailed(stat base.Stats) {
	rows := int64(stat.BatchSize)
	totals := &s.Stats
	totals.TotalBatches += 1
	totals.TotalCount += rows
	totals.NumFailed += rows
	totals.TotalImportedBytes += stat.ImportedBytes
}
// outputStats publishes a copy of the current totals on OutputStatsCh. The
// send blocks until a receiver takes the value.
func (s *StatsMgr) outputStats() {
	snapshot := s.Stats
	s.OutputStatsCh <- snapshot
}
// print logs a one-line progress summary through logger.Log.
//
// prefix is placed at the start of the line. now is the reference instant the
// elapsed time is measured from (the worker passes its start time). Nothing is
// logged while TotalCount is zero.
//
// The summary contains the elapsed seconds, the total and failed row counts,
// the read-failure count, the per-batch averages of TotalLatency and
// TotalReqTime, and the rows-per-second rate over the elapsed time.
func (s *StatsMgr) print(prefix string, now time.Time) {
	if s.Stats.TotalCount == 0 {
		return
	}
	secs := time.Since(now).Seconds()

	// The averages divide by TotalBatches, not by the TotalCount checked above.
	// Stats is an exported, mutable field, so guard the divisor explicitly: an
	// integer division by zero would panic and take down the worker goroutine.
	var avgLatency, avgReq int64
	if batches := s.Stats.TotalBatches; batches != 0 {
		avgLatency = s.Stats.TotalLatency / batches
		avgReq = s.Stats.TotalReqTime / batches
	}

	rps := float64(s.Stats.TotalCount) / secs
	logger.Log.Infof("%s: Time(%.2fs), Finished(%d), Failed(%d), Read Failed(%d), Latency AVG(%dus), Batches Req AVG(%dus), Rows AVG(%.2f/s)",
		prefix, secs, s.Stats.TotalCount, s.Stats.NumFailed, s.Stats.NumReadFailed, avgLatency, avgReq, rps)
}
// CountFileBytes sums DataReader.TotalBytes() over the given readers into
// Stats.TotalBytes, skipping nil entries.
//
// The work is done at most once: after a successful pass CountFileDone is set
// and later calls return nil immediately. If any reader reports an error, that
// error is returned as-is, CountFileDone stays false, and TotalBytes keeps the
// partial sum accumulated so far.
func (s *StatsMgr) CountFileBytes(freaders []*reader.FileReader) error {
	if s.CountFileDone {
		return nil
	}

	s.Stats.TotalBytes = 0
	for i := range freaders {
		fr := freaders[i]
		if fr == nil {
			continue
		}
		size, err := fr.DataReader.TotalBytes()
		if err != nil {
			return err
		}
		s.Stats.TotalBytes += size
	}

	s.CountFileDone = true
	return nil
}
// startWorker is the manager's event loop; NewStatsMgr runs it in its own
// goroutine. It returns when StatsCh is closed.
//
// numReadingFiles is the number of files still being read. It is decremented
// on every base.FILEDONE message, and true is sent on DoneCh when it reaches
// zero.
//
// Every 5 seconds a "Tick" progress line is printed. Incoming messages are
// dispatched on their Type:
//   - base.SUCCESS:  fold the batch into the totals (updateStat)
//   - base.FAILURE:  fold the failed batch into the totals (updateFailed)
//   - base.FILEDONE: print a "Done(<file>)" line and update the file countdown
//   - base.OUTPUT:   publish a snapshot of the totals on OutputStatsCh
//   - anything else: log an error
func (s *StatsMgr) startWorker(numReadingFiles int) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	// now is the start instant; print reports elapsed time relative to it.
	now := time.Now()
	for {
		select {
		case <-ticker.C:
			s.print("Tick", now)
		case stat, ok := <-s.StatsCh:
			if !ok {
				// StatsCh was closed (see Close): shut the worker down.
				return
			}
			switch stat.Type {
			case base.SUCCESS:
				s.updateStat(stat)
			case base.FAILURE:
				s.updateFailed(stat)
			case base.FILEDONE:
				s.print(fmt.Sprintf("Done(%s)", stat.Filename), now)
				numReadingFiles--
				s.runnerLogger.Infof("Remaining read files %d", numReadingFiles)
				if numReadingFiles == 0 {
					// This send blocks the loop until a receiver takes it.
					s.DoneCh <- true
				}
			case base.OUTPUT:
				s.outputStats()
			default:
				// %v rather than %s: it renders correctly whatever the
				// underlying kind of stat.Type is, whereas %s on a
				// non-string value without a String method prints "%!s(...)".
				logger.Log.Errorf("Error stats type: %v", stat.Type)
			}
		}
	}
}