Skip to content

Commit

Permalink
Merge pull request #51 from bitleak/feature/pusher-metrics
Browse files Browse the repository at this point in the history
ADD: pusher metrics
  • Loading branch information
git-hulk committed Sep 2, 2020
2 parents 36adb63 + 0881ac4 commit 54b1cfd
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 1 deletion.
1 change: 1 addition & 0 deletions push/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func Setup(conf *config.Config, logger *logrus.Logger) error {
if cli.Ping().Err() != nil {
return errors.New("can not connect to admin redis")
}
setupMetrics()
_manager, err = NewManger(cli, logger)
return err
}
Expand Down
53 changes: 53 additions & 0 deletions push/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package push

import "github.com/prometheus/client_golang/prometheus"

type PerformanceMetrics struct {
ConsumeLatencies *prometheus.SummaryVec
PushLatencies *prometheus.SummaryVec
PushHTTPCodes *prometheus.CounterVec
}

var metrics *PerformanceMetrics

func setupMetrics() {
metrics = &PerformanceMetrics{}
consumeLatencies := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "infra",
Subsystem: "lmstfy_pusher",
Name: "consume_latency_milliseconds",
Help: "latencies",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.001},
},
[]string{"pool", "namespace", "queue"},
)

pushLatencies := prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Namespace: "infra",
Subsystem: "lmstfy_pusher",
Name: "push_latency_milliseconds",
Help: "push http latencies",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.001},
},
[]string{"pool", "namespace", "queue"},
)

httpCodes := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "infra",
Subsystem: "lmstfy_pusher",
Name: "push_http_codes",
Help: "push response code",
},
[]string{"pool", "namespace", "queue", "code"},
)

prometheus.MustRegister(consumeLatencies)
prometheus.MustRegister(pushLatencies)
prometheus.MustRegister(httpCodes)
metrics.ConsumeLatencies = consumeLatencies
metrics.PushLatencies = pushLatencies
metrics.PushHTTPCodes = httpCodes
}
22 changes: 21 additions & 1 deletion push/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -75,6 +76,7 @@ func (p *Pusher) pollQueue() {

engine := engine.GetEngine(p.Pool)
for {
now := time.Now()
job, err := engine.ConsumeByPush(p.Namespace, p.Queue, p.Timeout, 3)
if err != nil {
p.logger.WithFields(logrus.Fields{
Expand All @@ -86,6 +88,10 @@ func (p *Pusher) pollQueue() {
}
select {
case p.jobCh <- job:
metrics.ConsumeLatencies.WithLabelValues(
p.Pool,
p.Namespace,
p.Queue).Observe(time.Since(now).Seconds() * 1000)
/* do nothing */
case <-p.stopCh:
p.logger.WithFields(logrus.Fields{
Expand Down Expand Up @@ -151,6 +157,19 @@ func (p *Pusher) startWorker(num int) {
}

func (p *Pusher) sendJobToUser(job engine.Job) error {
var statusCode int
defer func(t time.Time) {
metrics.PushLatencies.WithLabelValues(
p.Pool,
p.Namespace,
p.Queue).Observe(time.Since(t).Seconds() * 1000)
metrics.PushHTTPCodes.WithLabelValues(
p.Pool,
p.Namespace,
p.Queue,
strconv.Itoa(statusCode),
).Inc()
}(time.Now())
jobBytes, _ := job.MarshalText()
req, err := http.NewRequest(http.MethodPost, p.Endpoint, bytes.NewReader(jobBytes))
if err != nil {
Expand All @@ -163,7 +182,8 @@ func (p *Pusher) sendJobToUser(job engine.Job) error {
}
ioutil.ReadAll(resp.Body)
resp.Body.Close()
if resp.StatusCode >= http.StatusOK && resp.StatusCode < http.StatusMultipleChoices {
statusCode = resp.StatusCode
if statusCode >= http.StatusOK && statusCode < http.StatusMultipleChoices {
e := engine.GetEngine(p.Pool)
return e.Delete(p.Namespace, p.Queue, job.ID())
}
Expand Down

0 comments on commit 54b1cfd

Please sign in to comment.