Skip to content

Commit

Permalink
metrics: remove original from exported metrics (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
waltzofpearls committed Sep 1, 2021
1 parent c365e6b commit 9874deb
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 24 deletions.
19 changes: 9 additions & 10 deletions metric/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,6 @@ func (d *Delegate) LabelValues() []string {

func (d *Delegate) ValuesFrom(modelName string, nearest time.Time) map[model.Column]float64 {
values := make(map[model.Column]float64)
original := d.original.Values.Nearest(nearest)
if !original.IsZero() {
values[model.Original] = original.Value
}
if forecasts, exists := d.forecasts[modelName]; exists && len(forecasts) > 0 {
forecast := forecasts.Nearest(nearest.Unix())
if !forecast.IsZero() {
Expand All @@ -106,10 +102,11 @@ func (d *Delegate) Query() string {
}

func (d *Delegate) Train(ctx context.Context, module *python3.PyObject) {
logger := d.logger.With(zap.String("metric_name", d.original.Name), zap.Any("metric_labels", d.original.Labels))
logger := d.logger.With(zap.String("prom_url", d.config.PromClientURL),
zap.String("metric_name", d.original.Name), zap.Any("metric_labels", d.original.Labels))

if !d.sem.TryAcquire(1) {
logger.Info("skip training - metric delegate is already busy")
logger.Info("skip training - metric delegate is already busy", zap.String("metric", d.Key()))
return
}
defer d.sem.Release(1)
Expand All @@ -121,15 +118,17 @@ func (d *Delegate) Train(ctx context.Context, module *python3.PyObject) {
query := d.Query()
from := time.Now().UTC().Add(-duration)
to := time.Now().UTC()

fromField, toField := zap.Time("from", from), zap.Time("to", to)
logger.Info("query prometheus and fetch metrics data", fromField, toField)

data, err := d.client.GetMetricRangeData(ctx, query, from, to, 0)
if err != nil {
logger.Error("failed querying prometheus data range",
zap.String("query", query), zap.Time("from", from), zap.Time("to", to), zap.Error(err))
logger.Error("failed querying prometheus data range", fromField, toField, zap.Error(err))
return
}
if len(data) == 0 {
logger.Info("no data from range query",
zap.String("query", query), zap.Time("from", from), zap.Time("to", to), zap.Error(err))
logger.Info("no data from range query", fromField, toField, zap.Error(err))
return
}
if err := d.original.Append(data[0], d.config.RollingWindow); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion metric/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewScheduler(cf *config.Config, lg *zap.Logger, cl *prom.Client, st *Store)
func (s *Scheduler) Start(ctx context.Context, module *python3.PyObject) func() error {
return func() error {
s.store.ForEach(func(key string, delegate *Delegate) {
s.logger.Info("schedule initial model training")
s.logger.Info("schedule initial model training", zap.String("metric", key))
go func() {
delegate.Train(ctx, module)
s.store.Save(key, delegate)
Expand Down
1 change: 0 additions & 1 deletion model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const (
Yhat Column = "yhat"
YhatUpper Column = "yhat_upper"
YhatLower Column = "yhat_lower"
Original Column = "original"
)

type Forecast struct {
Expand Down
19 changes: 10 additions & 9 deletions model/prophet.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ func NewProphet(lg *zap.Logger) Prophet {
}

func (p Prophet) Train(ctx context.Context, module *python3.PyObject, data prom.Metric, duration time.Duration) Forecasts {
p.logger.Info("train model with data", zap.Int("length", len(data.Values)), zap.String("duration", duration.String()))
logger := p.logger.With(zap.String("metric_name", data.Name), zap.Any("metric_labels", data.Labels))
logger.Info("train model with data", zap.Int("length", len(data.Values)), zap.String("duration", duration.String()))

runtime.LockOSThread()
s := python3.PyGILState_Ensure()
Expand All @@ -43,7 +44,7 @@ func (p Prophet) Train(ctx context.Context, module *python3.PyObject, data prom.
sampleValue.DecRef()
samplePair.DecRef()
argData.DecRef()
p.logger.Error("error setting sample")
logger.Error("error setting sample")
return nil
}
sampleTime = nil
Expand All @@ -56,7 +57,7 @@ func (p Prophet) Train(ctx context.Context, module *python3.PyObject, data prom.
if args == nil {
argData.DecRef()
argDuration.DecRef()
p.logger.Error("error creating args tuple")
logger.Error("error creating args tuple")
return nil
}
defer args.DecRef()
Expand All @@ -67,7 +68,7 @@ func (p Prophet) Train(ctx context.Context, module *python3.PyObject, data prom.
argData.DecRef()
argDuration.DecRef()
argData = nil
p.logger.Error("error setting args tuple argData")
logger.Error("error setting args tuple argData")
return nil
}
argData = nil
Expand All @@ -76,33 +77,33 @@ func (p Prophet) Train(ctx context.Context, module *python3.PyObject, data prom.
python3.PyErr_Print()
}
argDuration.DecRef()
p.logger.Error("error setting args tuple argDuration")
logger.Error("error setting args tuple argDuration")
return nil
}
argDuration = nil

dict := python3.PyModule_GetDict(module) // return value: borrowed
if !(dict != nil && python3.PyErr_Occurred() == nil) {
python3.PyErr_Print()
p.logger.Error("could not get dict for module")
logger.Error("could not get dict for module")
return nil
}
train := python3.PyDict_GetItemString(dict, "train")
if !(train != nil && python3.PyCallable_Check(train)) { // return value: borrowed
p.logger.Error("could not find function train()")
logger.Error("could not find function train()")
return nil
}
returned := train.CallObject(args)
if !(returned != nil && python3.PyErr_Occurred() == nil) { // return value: new reference
python3.PyErr_Print()
p.logger.Error("error calling function detect")
logger.Error("error calling function detect")
return nil
}
defer returned.DecRef()

forecasts, err := toForecastsList(returned)
if err != nil {
p.logger.Error("error converting python dict to go map", zap.Error(err))
logger.Error("error converting python dict to go map", zap.Error(err))
return nil
}

Expand Down
11 changes: 8 additions & 3 deletions prom/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,24 @@ func RegisterExporterEndpoints() {

func (e *Exporter) Start(ctx context.Context) func() error {
return func() error {
e.server = &http.Server{Addr: e.config.PromExporterAddr}
e.server = &http.Server{
Addr: e.config.PromExporterAddr,
}
addrField := zap.String("addr", e.config.PromExporterAddr)
e.logger.Info("starting prometheus exporter server...", addrField)
if err := e.server.ListenAndServe(); err != http.ErrServerClosed {
return fmt.Errorf("prometheus exporter server stopped: %w", err)
}
e.logger.Info("prometheus exporter server stopped")
e.logger.Info("prometheus exporter server stopped", addrField)
return nil
}
}

func (e *Exporter) Shutdown(ctx context.Context) func() error {
return func() error {
<-ctx.Done()
e.logger.Info("shutting down prometheus exporter server")
e.logger.Info("shutting down prometheus exporter server",
zap.String("addr", e.config.PromExporterAddr))

timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down

0 comments on commit 9874deb

Please sign in to comment.