metrics: register, record and expose reckon runtime metrics (#22)
waltzofpearls committed Sep 9, 2021
1 parent 4b08a07 commit 7a02009
Showing 14 changed files with 1,046 additions and 107 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -28,4 +28,4 @@ jobs:
- name: Run go unit tests
run: |
export PKG_CONFIG_PATH=/opt/hostedtoolcache/Python/${{ matrix.python_version }}/x64/lib/pkgconfig:$PKG_CONFIG_PATH
go test -cover ./...
go test -cover -race ./...
2 changes: 1 addition & 1 deletion Makefile
@@ -37,7 +37,7 @@ run: venv build

.PHONY: test
test:
go test -cover ./...
go test -cover -race ./...

gen:
mockgen -package=mocks -mock_names=Logger=Logger \
65 changes: 57 additions & 8 deletions README.md
@@ -18,18 +18,25 @@ exposed from reckon:
```
# HELP sensehat_humidity_prophet Prophet forecasted metric value
# TYPE sensehat_humidity_prophet gauge
sensehat_humidity_prophet{column="original",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 63.694435119628906
sensehat_humidity_prophet{column="yhat",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 65.39311782093829
sensehat_humidity_prophet{column="yhat_lower",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 64.17657873755101
sensehat_humidity_prophet{column="yhat_upper",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 66.55323480537575
sensehat_humidity_prophet{column="yhat",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 60.491915201576944
sensehat_humidity_prophet{column="yhat_lower",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 59.233345022648194
sensehat_humidity_prophet{column="yhat_upper",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 61.69595781236965
# HELP sensehat_temperature_prophet Prophet forecasted metric value
# TYPE sensehat_temperature_prophet gauge
sensehat_temperature_prophet{column="original",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 28.072917938232422
sensehat_temperature_prophet{column="yhat",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 27.972340899541923
sensehat_temperature_prophet{column="yhat_lower",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 27.675004226891883
sensehat_temperature_prophet{column="yhat_upper",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 28.28264431712968
sensehat_temperature_prophet{column="yhat",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 26.479665209525724
sensehat_temperature_prophet{column="yhat_lower",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 26.17373164147707
sensehat_temperature_prophet{column="yhat_upper",instance="sensehat.rpi.topbass.studio:8000",job="sensehat_exporter"} 26.767488257211966
```

Each metric configured in reckon has a forecasted counterpart generated and exported in 3 dimensions (labels):

- `yhat`: forecasted value
- `yhat_upper` and `yhat_lower`: [uncertainty interval](https://en.wikipedia.org/wiki/Confidence_interval)
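The sample exposition output above can be reproduced with a short, self-contained sketch. The helper below is illustrative only (not reckon's actual code); it shows how the `column` label carries the forecast dimensions in Prometheus text exposition format:

```go
package main

import "fmt"

// forecastLines renders one Prometheus exposition line per forecast
// column, mirroring the sample output above. The helper name and the
// fixed label set are hypothetical, not reckon's real implementation.
func forecastLines(metric, instance, job string, values map[string]float64) []string {
	lines := make([]string, 0, len(values))
	for _, column := range []string{"yhat", "yhat_lower", "yhat_upper"} {
		v, ok := values[column]
		if !ok {
			continue
		}
		lines = append(lines, fmt.Sprintf(
			`%s{column=%q,instance=%q,job=%q} %g`, metric, column, instance, job, v))
	}
	return lines
}

func main() {
	for _, l := range forecastLines("sensehat_humidity_prophet",
		"sensehat.rpi.topbass.studio:8000", "sensehat_exporter",
		map[string]float64{"yhat": 60.49, "yhat_lower": 59.23, "yhat_upper": 61.70}) {
		fmt.Println(l)
	}
}
```

In the real exporter these series come from `prometheus.MustNewConstMetric` calls rather than string formatting, but the label layout is the same.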

<p align="center">
<img alt="reckon+grafana" src="./docs/reckon+grafana.png">
</p>

## Try it

Gather the following info before starting:
@@ -65,3 +72,45 @@ PROM_CLIENT_URL={prometheus_server_address} \
WATCH_LIST={comma_separated_metric_names} \
make run
```

## Configure it

Reckon can be configured with the following environment variables:

| Environment variable | Required? | Default value | Description |
| ---------------------------------- | :---------: | ------------------- | ----------------------------------------------- |
| `SCHEDULE` | Yes | `@every 120m` | schedule for model training |
| `TIMEZONE` | Yes | `America/Vancouver` | timezone for calculating schedule |
| `WATCH_LIST` | Yes | | list of metrics (comma separated) to scrape |
| `MODELS` | Yes | `Prophet` | ML models for training and forecasting |
| `PROM_EXPORTER_ADDR` | Yes | `:8080` | address for reckon to expose forecasted metrics |
| `PROM_CLIENT_URL` | Yes | | reckon will scrape metrics from this URL |
| `PROM_CLIENT_TLS_CA` | No | | CA cert if `PROM_CLIENT_URL` is https |
| `PROM_CLIENT_TLS_CERT` | No | | TLS cert if `PROM_CLIENT_URL` is https |
| `PROM_CLIENT_TLS_KEY` | No | | TLS key if `PROM_CLIENT_URL` is https |
| `PROM_CLIENT_INSECURE_SKIP_VERIFY` | No | | skip TLS verification on `PROM_CLIENT_URL` |
| `DEFAULT_CHUNK_SIZE` | Yes | `120m` | duration of original data to scrape |
| `ROLLING_WINDOW` | Yes | `72h` | duration of original data to keep for training |

## Monitor it

In addition to forecasted metrics, reckon also exposes runtime metrics to help monitor reckon itself.

| Metric | Type | Description |
| ---------------------------------------------- | ------- | ---------------------------------------------------------------- |
| `reckon_prometheus_client_scrape_time_seconds` | Gauge   | timestamp of the last Prometheus client scrape                       |
| `reckon_exporter_scraped_time_seconds`         | Gauge   | timestamp of the last time the reckon exporter was scraped by Prometheus |
| `reckon_model_train_time_seconds`              | Gauge   | timestamp of the last reckon model training                          |
| `reckon_forecast_data_received_time_seconds`   | Gauge   | timestamp of the last time forecast data was received                |
| `reckon_model_train_total`                     | Counter | number of model training runs                                        |
| `reckon_model_train_duration_seconds`          | Gauge   | duration in seconds of the last model training                       |
| `reckon_model_train_errors_total`              | Counter | number of model training errors                                      |
| `reckon_prometheus_client_scrape_errors_total` | Counter | number of Prometheus client scraping errors                          |
| `reckon_exporter_scraped_total`                | Counter | number of times the reckon exporter was scraped by Prometheus        |
| `reckon_prometheus_client_scrape_total`        | Counter | number of Prometheus client scrapes                                  |
| `reckon_data_scraped_duration_minutes`         | Gauge   | duration covered by data scraped from Prometheus                     |
| `reckon_forecast_data_duration_minutes`        | Gauge   | duration covered by data kept in memory                              |
| `reckon_training_data_duration_minutes`        | Gauge   | duration covered by data sent to the model for training              |
| `reckon_data_scraped_values`                   | Gauge   | number of data points in the last scrape                             |
| `reckon_forecast_data_values`                  | Gauge   | number of data points currently kept in memory                       |
| `reckon_training_data_values`                  | Gauge   | number of data points most recently sent to the model for training   |
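The bookkeeping behind this table can be sketched as a small map keyed by `{model}::{metric}`, with `inc`/`set`/`now` operations like those the delegate calls. The types and method bodies below are an illustrative approximation of the commit's unexported `runtimeRegistry`, not its exact API:

```go
package main

import (
	"fmt"
	"time"
)

// runtimeMetric holds the current value of one runtime metric for one model.
type runtimeMetric struct {
	value float64
}

// registry maps "{model}::{metric}" keys to their current values.
// Hypothetical simplification of reckon's runtimeRegistry.
type registry map[string]*runtimeMetric

func (r registry) get(model, metric string) *runtimeMetric {
	k := model + "::" + metric
	if _, ok := r[k]; !ok {
		r[k] = &runtimeMetric{}
	}
	return r[k]
}

// inc increments a counter-style metric such as reckon_model_train_total.
func (r registry) inc(model, metric string) {
	r.get(model, metric).value++
}

// set overwrites a gauge-style metric such as reckon_training_data_values.
func (r registry) set(model, metric string, v float64) {
	r.get(model, metric).value = v
}

// now records the current Unix timestamp, as the *_time_seconds gauges do.
func (r registry) now(model, metric string) {
	r.set(model, metric, float64(time.Now().Unix()))
}

func main() {
	reg := registry{}
	reg.inc("Prophet", "reckon_model_train_total")
	reg.inc("Prophet", "reckon_model_train_total")
	reg.set("Prophet", "reckon_training_data_values", 1440)
	fmt.Println(reg.get("Prophet", "reckon_model_train_total").value) // 2
}
```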
Binary file added docs/reckon+grafana.png
38 changes: 27 additions & 11 deletions metric/collector.go
@@ -23,27 +23,43 @@ func NewCollector(cf *config.Config, lg *zap.Logger, st *Store) prometheus.Colle
}

func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
c.logger.Info("describe prometheus metrics")
c.store.ForEach(func(key string, delegate *Delegate) {
for _, desc := range delegate.Descs() {
c.logger.Info("describe reckon exporter metrics")
c.store.ForEach(func(key string, del *delegate) {
for _, desc := range del.descs {
ch <- desc
}
})
}

func (c *Collector) Collect(ch chan<- prometheus.Metric) {
c.logger.Info("collect prometheus metrics")
c.store.ForEach(func(key string, delegate *Delegate) {
for modelName, desc := range delegate.Descs() {
values := delegate.ValuesFrom(modelName, time.Now().In(c.config.Location()))
for column, value := range values {
c.logger.Info("scrape reckon exporter metrics")
c.store.ForEach(func(key string, del *delegate) {
for modelOrRuntimeMetric, desc := range del.descs {
if runtimeMetric, exists := del.runtimeRegistry[modelOrRuntimeMetric]; exists {
// desc is a runtime metric
// modelOrRuntimeMetric = {model_name}::{runtime_metric_name}
ch <- prometheus.MustNewConstMetric(
desc,
prometheus.GaugeValue,
value,
append(delegate.LabelValues(), string(column))...,
runtimeMetric.typ,
runtimeMetric.into(),
append(del.labelValues(), runtimeMetric.labels()...)...,
)
} else {
// desc is a forecast metric
modelName := modelOrRuntimeMetric
values := del.valuesFrom(modelName, time.Now().In(c.config.Location()))
for column, value := range values {
ch <- prometheus.MustNewConstMetric(
desc,
prometheus.GaugeValue,
value,
append(del.labelValues(), string(column))...,
)
}
}
}
del.runtimeRegistry.nowAll(del.modelNames, "reckon_exporter_scraped_time_seconds")
del.runtimeRegistry.incAll(del.modelNames, "reckon_exporter_scraped_total")
c.store.Save(key, del)
})
}
84 changes: 59 additions & 25 deletions metric/delegate.go
@@ -16,7 +16,7 @@ import (
"golang.org/x/sync/semaphore"
)

type Delegate struct {
type delegate struct {
logger *zap.Logger
config *config.Config
client *prom.Client
@@ -30,9 +30,12 @@ type Delegate struct {
descs map[string]*prometheus.Desc
models map[string]model.Trainer
forecasts map[string]model.Forecasts

modelNames []string
runtimeRegistry runtimeRegistry
}

func NewDelegate(lg *zap.Logger, cf *config.Config, cl *prom.Client, data prom.Metric) *Delegate {
func newDelegate(lg *zap.Logger, cf *config.Config, cl *prom.Client, data prom.Metric) *delegate {
labels := data.LabelNames()
descs := make(map[string]*prometheus.Desc)
models := make(map[string]model.Trainer)
@@ -51,33 +54,42 @@ func NewDelegate(lg *zap.Logger, cf *config.Config, cl *prom.Client, data prom.M
)
models[modelName] = mod
}
return &Delegate{
logger: lg,
config: cf,
client: cl,
original: data,
labels: labels,
key: data.Key(),
sem: semaphore.NewWeighted(1),
registry := newRuntimeRegistry(data.Name, cf.Models)
for key, metric := range registry {
descs[key] = prometheus.NewDesc(
metric.metric,
metric.help,
append(labels, "original_metric", "reckon_model", "reckon_host"),
nil,
)
}
return &delegate{
logger: lg,
config: cf,
client: cl,

original: data,
labels: labels,
key: data.Key(),

sem: semaphore.NewWeighted(1),

descs: descs,
models: models,
forecasts: make(map[string]model.Forecasts),
}
}

func (d *Delegate) Key() string {
return d.key
}

func (d *Delegate) Descs() map[string]*prometheus.Desc {
return d.descs
modelNames: cf.Models,
// runtime metrics from each delegate
// record number of model trainings, errors, durations and timestamps
runtimeRegistry: registry,
}
}

func (d *Delegate) LabelValues() []string {
func (d *delegate) labelValues() []string {
return d.original.LabelValuesFor(d.labels)
}

func (d *Delegate) ValuesFrom(modelName string, nearest time.Time) map[model.Column]float64 {
func (d *delegate) valuesFrom(modelName string, nearest time.Time) map[model.Column]float64 {
values := make(map[model.Column]float64)
if forecasts, exists := d.forecasts[modelName]; exists && len(forecasts) > 0 {
forecast := forecasts.Nearest(nearest.Unix())
@@ -90,7 +102,7 @@ func (d *Delegate) ValuesFrom(modelName string, nearest time.Time) map[model.Col
return values
}

func (d *Delegate) Query() string {
func (d *delegate) query() string {
if len(d.original.Labels) == 0 {
return d.original.Name
}
@@ -101,12 +113,12 @@ func (d *Delegate) Query() string {
return fmt.Sprintf("%s{%s}", d.original.Name, strings.Join(labels, ","))
}

func (d *Delegate) Train(ctx context.Context, module *python3.PyObject) {
func (d *delegate) train(ctx context.Context, module *python3.PyObject) {
logger := d.logger.With(zap.String("prom_url", d.config.PromClientURL),
zap.String("metric_name", d.original.Name), zap.Any("metric_labels", d.original.Labels))

if !d.sem.TryAcquire(1) {
logger.Info("skip training - metric delegate is already busy", zap.String("metric", d.Key()))
logger.Info("skip training - metric delegate is already busy", zap.String("metric", d.key))
return
}
defer d.sem.Release(1)
@@ -115,27 +127,34 @@ func (d *Delegate) Train(ctx context.Context, module *python3.PyObject) {
if len(d.forecasts) == 0 { // initial run
duration = d.config.RollingWindow
}
query := d.Query()
query := d.query()
from := time.Now().UTC().Add(-duration)
to := time.Now().UTC()

fromField, toField := zap.Time("from", from), zap.Time("to", to)
logger.Info("query prometheus and fetch metrics data", fromField, toField)
d.runtimeRegistry.nowAll(d.modelNames, "reckon_prometheus_client_scrape_time_seconds")
d.runtimeRegistry.incAll(d.modelNames, "reckon_prometheus_client_scrape_total")
d.runtimeRegistry.setAll(d.modelNames, "reckon_data_scraped_duration_minutes", duration.Minutes())

data, err := d.client.GetMetricRangeData(ctx, query, from, to, 0)
if err != nil {
logger.Error("failed querying prometheus data range", fromField, toField, zap.Error(err))
d.runtimeRegistry.incAll(d.modelNames, "reckon_prometheus_client_scrape_errors_total")
return
}
if len(data) == 0 {
logger.Info("no data from range query", fromField, toField, zap.Error(err))
return
}
d.runtimeRegistry.setAll(d.modelNames, "reckon_data_scraped_values", float64(len(data[0].Values)))
if err := d.original.Append(data[0], d.config.RollingWindow); err != nil {
logger.Error("failed appending new data to original metric data",
zap.String("other_name", data[0].Name), zap.Any("other_labels", data[0].Labels), zap.Error(err))
d.runtimeRegistry.incAll(d.modelNames, "reckon_prometheus_client_scrape_errors_total")
return
}
d.runtimeRegistry.setAll(d.modelNames, "reckon_training_data_values", float64(len(d.original.Values)))

var wg sync.WaitGroup
for modelName, mod := range d.models {
@@ -144,12 +163,27 @@ func (d *Delegate) Train(ctx context.Context, module *python3.PyObject) {
zap.Int("length", len(d.original.Values)), zap.String("rolling_window", d.config.RollingWindow.String()),
zap.String("data_range", duration.String()), zap.String("start", d.original.Start().String()),
zap.String("end", d.original.End().String()))
d.runtimeRegistry.now(modelName, "reckon_model_train_time_seconds")
d.runtimeRegistry.inc(modelName, "reckon_model_train_total")
d.runtimeRegistry.set(modelName, "reckon_training_data_duration_minutes", duration.Minutes())
now := time.Now()
wg.Add(1)
go func(modelName string, mod model.Trainer) {
defer wg.Done()
d.forecasts[modelName] = mod.Train(ctx, module, d.original, duration)
defer func() {
d.runtimeRegistry.set(modelName, "reckon_model_train_duration_seconds", time.Since(now).Seconds())
}()
forecasts, err := mod.Train(ctx, module, d.original, duration)
if err != nil {
logger.Error("unable to train model", zap.Error(err))
d.runtimeRegistry.inc(modelName, "reckon_model_train_errors_total")
return
}
d.forecasts[modelName] = forecasts
logger.Info("received forecasted data", zap.Int("length", len(forecasts)))
d.runtimeRegistry.set(modelName, "reckon_forecast_data_duration_minutes", duration.Minutes())
d.runtimeRegistry.set(modelName, "reckon_forecast_data_values", float64(len(forecasts)))
}(modelName, mod)
d.runtimeRegistry.now(modelName, "reckon_forecast_data_received_time_seconds")
}
wg.Wait()
}
