Skip to content

Commit

Permalink
Merge branch 'add-sink-metric' of https://github.com/odpf/meteor into…
Browse files Browse the repository at this point in the history
… add-sink-metric
  • Loading branch information
GrayFlash committed Jun 24, 2022
2 parents 7c501d5 + 7ea3050 commit 7738293
Showing 1 changed file with 12 additions and 11 deletions.
23 changes: 12 additions & 11 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,13 @@ func (r *Agent) setupProcessor(ctx context.Context, pr recipe.PluginRecipe, str

func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *stream, recipe recipe.Recipe) (err error) {
var sink plugins.Syncer
success := true
pluginType := "sink"

if sink, err = r.sinkFactory.Get(sr.Name); err != nil {
return errors.Wrapf(err, "could not find sink \"%s\"", sr.Name)
}
if err = sink.Init(ctx, sr.Config); err != nil {
return errors.Wrapf(err, "could not initiate sink \"%s\"", sr.Name)
}

retryNotification := func(e error, d time.Duration) {
r.logger.Info(
fmt.Sprintf("retrying sink in %d", d),
Expand All @@ -256,19 +254,22 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s
err := sink.Sink(ctx, records)
return err
}, retryNotification)
success = err == nil
r.monitor.RecordPlugin(recipe.Name, sr.Name, pluginType, success)

// error (after exhausted retries) will just be skipped and logged
var success bool
if err != nil {
// once it reaches here, it means that the retry has been exhausted and still got error
success = false
r.logger.Error("error running sink", "sink", sr.Name, "error", err.Error())
if !r.stopOnSinkError {
err = nil
}
return err
} else {
success = true
r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipe.Name)
}
r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipe.Name)

r.monitor.RecordPlugin(recipe.Name, sr.Name, "sink", success)

if !r.stopOnSinkError {
err = nil
}
// TODO: create a new error to signal stopping stream.
// returning nil so stream wont stop.
return err
Expand Down

0 comments on commit 7738293

Please sign in to comment.