Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zgxme committed Jan 18, 2022
1 parent b44b062 commit 15e4feb
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 17 deletions.
25 changes: 21 additions & 4 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@ import (
"fmt"
"log"
"net/http"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rickzhen/hey/snapshot"
"github.com/rickzhen/hey/utils"
)

var rwlock sync.RWMutex

type Miner struct {
stopChan chan struct{}
inerval time.Duration
host string
port int
start time.Duration
snapshot snapshot.Report
snapshot *snapshot.Report

average prometheus.Gauge
}

func NewMiner() *Miner {
Expand All @@ -26,9 +32,15 @@ func NewMiner() *Miner {

func (m *Miner) Init() {
m.stopChan = make(chan struct{})
m.inerval = 1
m.inerval = 500
m.host = "localhost"
m.port = 1010
m.snapshot = &snapshot.Report{}
m.average = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "hey_average",
Help: "hey average"},
)
prometheus.MustRegister(m.average)
}

func (m *Miner) Run() {
Expand All @@ -43,11 +55,14 @@ func (m *Miner) Run() {
}
}()
for {
rwlock.RLock()
//TODO add prometheus report
//parse snapshot
m.average.Set(m.snapshot.Average)
fmt.Printf("snap: +%#v\n", m.snapshot)
// fmt.Printf("total: %f\n", m.snapshot.Fastest)
time.Sleep(time.Second * m.inerval)
rwlock.RUnlock()
time.Sleep(time.Millisecond * m.inerval)
select {
case <-m.stopChan:
return
Expand All @@ -61,6 +76,8 @@ func (m *Miner) Stop() {
m.stopChan <- struct{}{}
}

func (m *Miner) SetSnapshot(snap snapshot.Report) {
func (m *Miner) SetSnapshot(snap *snapshot.Report) {
rwlock.Lock()
m.snapshot = snap
rwlock.Unlock()
}
59 changes: 59 additions & 0 deletions requester/miner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package requester

import "github.com/rickzhen/hey/snapshot"

func (w *Work) runMiner() {
// cap := min(w.N, maxRes)
snapshot := &snapshot.Report{
StatusCodes: make([]int, 0),
}
var avgTotal float64
var avgConn float64
var avgDelay float64
var avgDNS float64
var avgReq float64
var avgRes float64
var succNum int64
for res := range w.snapshots {
//TODO refresh total
snapshot.NumRes += 1
if res.err != nil {
// snapshot.ErrorDist[res.err.Error()]++
continue
} else {
avgTotal += res.duration.Seconds()
avgConn += res.connDuration.Seconds()
avgDelay += res.delayDuration.Seconds()
avgDNS += res.dnsDuration.Seconds()
avgReq += res.reqDuration.Seconds()
avgRes += res.resDuration.Seconds()
succNum += 1
}
if res.contentLength > 0 {
snapshot.SizeTotal += res.contentLength
}
snapshot.Total = now() - w.start
snapshot.Rps = float64(snapshot.NumRes) / snapshot.Total.Seconds()
snapshot.Average = avgTotal / float64(succNum)
snapshot.AvgConn = avgConn / float64(succNum)
snapshot.AvgDelay = avgDelay / float64(succNum)
snapshot.AvgDNS = avgDNS / float64(succNum)
snapshot.AvgReq = avgReq / float64(succNum)
snapshot.AvgRes = avgRes / float64(succNum)
snapshot.SizeReq = snapshot.SizeTotal / succNum
w.miner.SetSnapshot(snapshot)
}
}

// func fmax(a, b float64) float64 {
// if a > b {
// return a
// }
// return b
// }
// func fmin(a, b float64) float64 {
// if a < b {
// return a
// }
// return b
// }
20 changes: 7 additions & 13 deletions requester/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type Work struct {

initOnce sync.Once
results chan *result
snapshots chan struct{}
snapshots chan *result
stopCh chan struct{}
start time.Duration

Expand All @@ -114,7 +114,7 @@ func (b *Work) writer() io.Writer {
func (b *Work) Init() {
b.initOnce.Do(func() {
b.results = make(chan *result, min(b.C*1000, maxResult))
b.snapshots = make(chan struct{}, min(b.C*1000, maxResult))
b.snapshots = make(chan *result, min(b.C*1000, maxResult))
b.stopCh = make(chan struct{}, b.C)
})
}
Expand All @@ -128,10 +128,10 @@ func (b *Work) Run() {
b.miner = metrics.NewMiner()
// Run the reporter first, it polls the result channel until it is closed.
go func() {
b.runMiner()
b.miner.Run()
}()
go func() {
b.miner.Run()
b.runMiner()
}()
go func() {
runReporter(b.report)
Expand Down Expand Up @@ -204,7 +204,7 @@ func (b *Work) makeRequest(c *http.Client) {
t := now()
resDuration = t - resStart
finish := t - s
b.results <- &result{
r := &result{
offset: s,
statusCode: code,
duration: finish,
Expand All @@ -216,7 +216,8 @@ func (b *Work) makeRequest(c *http.Client) {
resDuration: resDuration,
delayDuration: delayDuration,
}
b.snapshots <- struct{}{}
b.results <- r
b.snapshots <- r
}

func (b *Work) runWorker(client *http.Client, n int) {
Expand Down Expand Up @@ -298,10 +299,3 @@ func min(a, b int) int {
}
return b
}

func (w *Work) runMiner() {
for range w.snapshots {
//TODO refresh total
w.miner.SetSnapshot(w.report.snapshot())
}
}
8 changes: 8 additions & 0 deletions utils/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package utils

func min(a, b int) int {
if a < b {
return a
}
return b
}

0 comments on commit 15e4feb

Please sign in to comment.