Skip to content

Commit

Permalink
report worker metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
henrod committed Oct 2, 2018
1 parent 66f9926 commit b34b3b4
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 3 deletions.
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@
[[constraint]]
name = "github.com/prometheus/client_golang"
version = "0.8.0"

[[override]]
name = "github.com/topfreegames/go-workers"
branch = "feature/stats"
4 changes: 4 additions & 0 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,10 @@ func initSysRemotes() {
func periodicMetrics() {
period := app.config.GetDuration("pitaya.metrics.periodicMetrics.period")
go metrics.ReportSysMetrics(app.metricsReporters, period)

if app.worker.Started() {
go worker.Report(app.metricsReporters, period)
}
}

// Start starts the app
Expand Down
6 changes: 6 additions & 0 deletions metrics/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,10 @@ var (
HeapSize = "heapsize"
// HeapObjects reports the number of allocated heap objects
HeapObjects = "heapobjects"
// WorkerJobsTotal reports the number of executed jobs
WorkerJobsTotal = "worker_jobs_total"
// WorkerJobsRetry reports the number of retried jobs
WorkerJobsRetry = "worker_jobs_retry_total"
// WorkerQueueSize reports the queue size on worker
WorkerQueueSize = "worker_queue_size"
)
33 changes: 33 additions & 0 deletions metrics/prometheus_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,39 @@ func (p *PrometheusReporter) registerMetrics(
additionalLabelsKeys,
)

p.gaugeReportersMap[WorkerJobsRetry] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pitaya",
Subsystem: "worker",
Name: WorkerJobsRetry,
Help: "the current number of job retries",
ConstLabels: constLabels,
},
additionalLabelsKeys,
)

p.gaugeReportersMap[WorkerQueueSize] = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pitaya",
Subsystem: "worker",
Name: WorkerQueueSize,
Help: "the current queue size",
ConstLabels: constLabels,
},
append([]string{"queue"}, additionalLabelsKeys...),
)

p.countReportersMap[WorkerJobsTotal] = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "pitaya",
Subsystem: "worker",
Name: WorkerJobsTotal,
Help: "the total executed jobs",
ConstLabels: constLabels,
},
append([]string{"status"}, additionalLabelsKeys...),
)

toRegister := make([]prometheus.Collector, 0)
for _, c := range p.countReportersMap {
toRegister = append(toRegister, c)
Expand Down
62 changes: 62 additions & 0 deletions worker/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package worker

import (
"strconv"
"time"

"github.com/topfreegames/pitaya/logger"
"github.com/topfreegames/pitaya/metrics"

workers "github.com/topfreegames/go-workers"
)

// Report sends periodic of worker reports
func Report(reporters []metrics.Reporter, period time.Duration) {
for {
time.Sleep(period)

workerStats := workers.GetStats()
for _, r := range reporters {
reportJobsRetry(r, workerStats.Retries)
reportQueueSizes(r, workerStats.Enqueued)
reportJobsTotal(r, workerStats.Failed, workerStats.Processed)
}
}
}

func reportJobsRetry(r metrics.Reporter, retries int64) {
err := r.ReportGauge(metrics.WorkerJobsRetry, map[string]string{}, float64(retries))
checkReportErr(metrics.WorkerJobsRetry, err)
}

func reportQueueSizes(r metrics.Reporter, queues map[string]string) {
for queue, size := range queues {
tags := map[string]string{"queue": queue}
sizeFlt, err := strconv.ParseFloat(size, 64)
if err != nil {
logger.Log.Errorf("queue size is not int: queue=%s size=%s", queue, size)
continue
}

err = r.ReportGauge(metrics.WorkerQueueSize, tags, sizeFlt)
checkReportErr(metrics.WorkerQueueSize, err)
}
}

func reportJobsTotal(r metrics.Reporter, failed, processed int) {
err := r.ReportCount(metrics.WorkerJobsTotal, map[string]string{
"status": "failed",
}, float64(failed))
checkReportErr(metrics.WorkerJobsTotal, err)

err = r.ReportCount(metrics.WorkerJobsTotal, map[string]string{
"status": "ok",
}, float64(processed))
checkReportErr(metrics.WorkerJobsTotal, err)
}

func checkReportErr(metric string, err error) {
if err != nil {
logger.Log.Errorf("failed to report to %s: %q\n", metric, err)
}
}
87 changes: 87 additions & 0 deletions worker/report_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package worker

import (
"errors"
"testing"

"github.com/golang/mock/gomock"
"github.com/topfreegames/pitaya/metrics"
"github.com/topfreegames/pitaya/metrics/mocks"
)

func TestReportJobsRetry(t *testing.T) {
t.Parallel()

t.Run("success", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockReporter := mocks.NewMockReporter(ctrl)
mockReporter.EXPECT().ReportGauge(
metrics.WorkerJobsRetry,
map[string]string{},
float64(10))

reportJobsRetry(mockReporter, 10)
})

t.Run("error", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockReporter := mocks.NewMockReporter(ctrl)
mockReporter.EXPECT().ReportGauge(
metrics.WorkerJobsRetry,
map[string]string{},
float64(10)).Return(errors.New("err"))

reportJobsRetry(mockReporter, 10)
})
}

func TestReportQueueSizes(t *testing.T) {
t.Parallel()

var (
queue1 = "queuename1"
queue2 = "queuename2"
)

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockReporter := mocks.NewMockReporter(ctrl)
mockReporter.EXPECT().ReportGauge(
metrics.WorkerQueueSize,
map[string]string{"queue": queue1},
float64(10))
mockReporter.EXPECT().ReportGauge(
metrics.WorkerQueueSize,
map[string]string{"queue": queue2},
float64(20))

reportQueueSizes(mockReporter, map[string]string{
queue1: "10",
queue2: "20",
})
}

func TestReportJobsTotal(t *testing.T) {
t.Parallel()

ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockReporter := mocks.NewMockReporter(ctrl)
mockReporter.EXPECT().ReportCount(
metrics.WorkerJobsTotal,
map[string]string{"status": "failed"},
float64(10))

mockReporter.EXPECT().ReportCount(
metrics.WorkerJobsTotal,
map[string]string{"status": "ok"},
float64(20))

reportJobsTotal(mockReporter, 10, 20)
}
9 changes: 9 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Worker struct {
concurrency int
registered bool
opts *EnqueueOpts
config *config.Config
started bool
}

// NewWorker configures and returns a *Worker
Expand All @@ -56,6 +58,7 @@ func NewWorker(config *config.Config) (*Worker, error) {
return &Worker{
concurrency: config.GetInt("pitaya.worker.concurrency"),
opts: NewEnqueueOpts(config),
config: config,
}, nil
}

Expand All @@ -67,6 +70,12 @@ func (w *Worker) SetLogger(logger Logger) {
// Start starts worker in another gorotine
func (w *Worker) Start() {
go workers.Start()
w.started = true
}

// Started returns true if worker was started
func (w *Worker) Started() bool {
return w != nil && w.started
}

// EnqueueRPC enqueues rpc job to worker
Expand Down

0 comments on commit b34b3b4

Please sign in to comment.