forked from goadapp/goad
/
aggregation.go
131 lines (115 loc) · 4.31 KB
/
aggregation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package queue
import (
"time"
"github.com/aws/aws-sdk-go/aws"
)
// AggData holds aggregated request statistics. The same type is used for a
// single incoming result, for the running totals of one region, and for the
// grand total produced by SumRegionResults.
type AggData struct {
	// TotalReqs is the number of requests counted so far.
	TotalReqs int `json:"total-reqs"`
	// TotalTimedOut is the number of requests counted as timed out.
	TotalTimedOut int `json:"total-timed-out"`
	// TotalConnectionError is the number of requests counted as connection errors.
	TotalConnectionError int `json:"total-conn-error"`
	// AveTimeToFirst is an average weighted by the number of requests that
	// neither timed out nor hit a connection error (see addResult).
	// NOTE(review): presumably time to first byte; the unit is not visible here.
	AveTimeToFirst int64 `json:"ave-time-to-first"`
	// TotBytesRead is the summed byte count.
	TotBytesRead int `json:"tot-bytes-read"`
	// Statuses maps a status key to the number of times it was seen.
	Statuses map[string]int `json:"statuses"`
	// AveTimeForReq is averaged with the same weighting as AveTimeToFirst.
	AveTimeForReq int64 `json:"ave-time-for-req"`
	// AveReqPerSec is a weighted average within a region and a plain sum
	// across regions (see the isFinalSum flag of addResult).
	AveReqPerSec float32 `json:"ave-req-per-sec"`
	// AveKBytesPerSec is combined the same way as AveReqPerSec.
	AveKBytesPerSec float32 `json:"ave-kbytes-per-sec"`
	// Slowest is the largest value seen across merged results.
	Slowest int64 `json:"slowest"`
	// Fastest is the smallest positive value seen; zero means "not set yet".
	Fastest int64 `json:"fastest"`
	// Region names the region this data belongs to.
	Region string `json:"region"`
	// FatalError carries an error message; it is not touched by the
	// aggregation code in this file.
	FatalError string `json:"fatal-error"`
	// Finished marks a result as the final one from its sender; addResult
	// counts such results in FinishedLambdas.
	Finished bool `json:"finished"`
	// FinishedLambdas is the number of merged results that had Finished set.
	FinishedLambdas int `json:"finished-lambdas"`
}
// RegionsAggData groups the per-region aggregates together with the values
// used to decide when aggregation is complete.
//
// The field order matters: aggregate builds this struct with an unkeyed
// composite literal.
type RegionsAggData struct {
	// Regions maps a region name to the data aggregated for that region.
	Regions map[string]AggData
	// TotalExpectedRequests is the request count at which aggregation is
	// considered complete; zero or less disables that check.
	TotalExpectedRequests int
	// lambdasByRegion is the FinishedLambdas value a region must equal to
	// count as finished.
	lambdasByRegion int
}
// allRequestsReceived reports whether aggregation can stop. That is the case
// when either
//
//   - a positive TotalExpectedRequests is set and the requests summed over
//     all regions equal it exactly, or
//   - every region's FinishedLambdas equals lambdasByRegion (which is
//     trivially true when there are no regions at all).
func (d *RegionsAggData) allRequestsReceived() bool {
	received, done := 0, 0
	for _, agg := range d.Regions {
		received += agg.TotalReqs
		if agg.FinishedLambdas == d.lambdasByRegion {
			done++
		}
	}
	if d.TotalExpectedRequests > 0 && received == d.TotalExpectedRequests {
		return true
	}
	return done == len(d.Regions)
}
// addResult merges result into data in place.
//
// Counters (TotalReqs, TotalTimedOut, TotalConnectionError, TotBytesRead and
// the per-key Statuses counts) are summed. AveTimeToFirst and AveTimeForReq
// become averages weighted by the number of "ok" requests on each side, where
// ok means TotalReqs minus timed-out and connection-error requests; the
// division is integer division. Slowest keeps the maximum and Fastest the
// smallest positive value. FinishedLambdas is incremented once when
// result.Finished is set; result.FinishedLambdas itself is not added.
//
// isFinalSum selects how the throughput fields are combined: when true,
// AveReqPerSec and AveKBytesPerSec are added together; when false they are
// averaged with the same ok-request weighting as the timings.
//
// If neither side has any ok request the averages and throughput fields are
// left untouched. data.Statuses may be nil; it is allocated on demand.
func addResult(data *AggData, result *AggData, isFinalSum bool) {
	// The ok counts must be taken before the totals below are updated.
	initCountOk := int64(data.TotalReqs - data.TotalTimedOut - data.TotalConnectionError)
	addCountOk := int64(result.TotalReqs - result.TotalTimedOut - result.TotalConnectionError)
	totalCountOk := initCountOk + addCountOk

	data.TotalReqs += result.TotalReqs
	data.TotalTimedOut += result.TotalTimedOut
	data.TotalConnectionError += result.TotalConnectionError

	if totalCountOk > 0 {
		data.AveTimeToFirst = (data.AveTimeToFirst*initCountOk + result.AveTimeToFirst*addCountOk) / totalCountOk
		data.AveTimeForReq = (data.AveTimeForReq*initCountOk + result.AveTimeForReq*addCountOk) / totalCountOk
		if isFinalSum {
			data.AveReqPerSec += result.AveReqPerSec
			data.AveKBytesPerSec += result.AveKBytesPerSec
		} else {
			data.AveReqPerSec = (data.AveReqPerSec*float32(initCountOk) + result.AveReqPerSec*float32(addCountOk)) / float32(totalCountOk)
			data.AveKBytesPerSec = (data.AveKBytesPerSec*float32(initCountOk) + result.AveKBytesPerSec*float32(addCountOk)) / float32(totalCountOk)
		}
	}

	data.TotBytesRead += result.TotBytesRead

	// Writing to a nil map panics, so allocate the destination lazily when
	// the caller handed in an AggData without a Statuses map.
	if data.Statuses == nil && len(result.Statuses) > 0 {
		data.Statuses = make(map[string]int, len(result.Statuses))
	}
	for key, value := range result.Statuses {
		data.Statuses[key] += value
	}

	if result.Slowest > data.Slowest {
		data.Slowest = result.Slowest
	}
	// A Fastest of zero means "no measurement", on either side.
	if result.Fastest > 0 && (data.Fastest == 0 || result.Fastest < data.Fastest) {
		data.Fastest = result.Fastest
	}
	if result.Finished {
		data.FinishedLambdas++
	}
}
// SumRegionResults folds the aggregates of every region in regionData into a
// single, freshly allocated AggData and returns it. Each region is merged
// with addResult in final-sum mode. The input is not modified.
func SumRegionResults(regionData *RegionsAggData) *AggData {
	totals := &AggData{Statuses: make(map[string]int)}
	for name := range regionData.Regions {
		regional := regionData.Regions[name]
		addResult(totals, &regional, true)
	}
	return totals
}
// Aggregate starts a background goroutine that collects results from the
// queue at queueURL and returns the unbuffered channel on which the updated
// per-region totals are published after every received result. The goroutine
// closes the channel when it stops collecting.
//
// totalExpectedRequests and lambdasByRegion are handed through unchanged to
// the collector, which uses them to decide when it is done.
func Aggregate(awsConfig *aws.Config, queueURL string, totalExpectedRequests int, lambdasByRegion int) chan RegionsAggData {
	updates := make(chan RegionsAggData)
	go aggregate(updates, awsConfig, queueURL, totalExpectedRequests, lambdasByRegion)
	return updates
}
func aggregate(results chan RegionsAggData, awsConfig *aws.Config, queueURL string, totalExpectedRequests int, lambdasByRegion int) {
defer close(results)
data := RegionsAggData{make(map[string]AggData), totalExpectedRequests, lambdasByRegion}
adaptor := NewSQSAdapter(awsConfig, queueURL)
timeoutStart := time.Now()
for {
result := adaptor.Receive()
if result != nil {
regionData, ok := data.Regions[result.Region]
if !ok {
regionData.Statuses = make(map[string]int)
regionData.Region = result.Region
}
addResult(®ionData, result, false)
data.Regions[result.Region] = regionData
results <- data
if data.allRequestsReceived() {
break
}
timeoutStart = time.Now()
} else {
waited := time.Since(timeoutStart)
if waited.Seconds() > 20 {
break
}
}
}
}