Skip to content


pkg/rt: try to clean up Subscriber.Run/query boundary
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbourgon committed Jul 22, 2019
1 parent 83efc10 commit 6eb0f36
Showing 1 changed file with 25 additions and 28 deletions.
53 changes: 25 additions & 28 deletions pkg/rt/subscriber.go
Expand Up @@ -107,62 +107,59 @@ func (s *Subscriber) Run(ctx context.Context) error {
return ctx.Err()

var err error
ts, err = s.runOnce(ctx, ts)
if err != nil {
return err
name, result, delay, newts, fatal := s.query(ctx, ts)
s.metrics.RealtimeAPIRequestsTotal.WithLabelValues(s.serviceID, name, string(result)).Inc()
if fatal != nil {
return fatal
if delay > 0 {
contextSleep(ctx, delay)
ts = newts

// runOnce queries for the service ID represented by the
// subscriber and with the provided timestamp. On success, it processes the
// received data and updates the metrics, returning the new timestamp value that
// should be used in the next call.
// query for the service ID represented by the subscriber, and
// with the provided starting timestamp. The function may block for several
// seconds; cancel the context to provoke early termination. On success, the
// received real-time data is processed, and the Prometheus metrics related to
// the Fastly service are updated.
// Non-fatal errors are logged. Some non-fatal errors cause runOnce to block for
// a period of seconds before returning. Any non-nil error returned by runOnce
// should be considered fatal to the subscriber.
func (s *Subscriber) runOnce(ctx context.Context, ts uint64) (newts uint64, err error) {
// Returns the current name of the service, the broad class of result of the API
// request, any delay that should pass before query is invoked again, the new
// timestamp that should be provided to the next call to query, and an error.
// Recoverable errors are logged internally and not returned, so any non-nil
// error returned by this method should be considered fatal to the subscriber.
func (s *Subscriber) query(ctx context.Context, ts uint64) (currentName string, result rtResult, delay time.Duration, newts uint64, fatal error) {
name, ver, found := s.provider.Metadata(s.serviceID)
version := strconv.Itoa(ver)
if !found {
name, version = s.serviceID, "unknown"

result := rtResultUnknown
defer func() {
s.metrics.RealtimeAPIRequestsTotal.WithLabelValues(s.serviceID, name, string(result)).Inc()

// blocks until it has data to return.
// It's safe to call in a (single-threaded!) hot loop.
u := fmt.Sprintf("", url.QueryEscape(s.serviceID), ts)
req, err := http.NewRequest("GET", u, nil)
if err != nil {
result = rtResultError
return ts, errors.Wrap(err, "error constructing real-time stats API request") // fatal for sure
return name, rtResultError, 0, ts, errors.Wrap(err, "error constructing real-time stats API request")

req.Header.Set("User-Agent", s.userAgent)
req.Header.Set("Fastly-Key", s.token)
req.Header.Set("Accept", "application/json")
resp, err := s.client.Do(req.WithContext(ctx))
if err != nil {
result = rtResultError
level.Error(s.logger).Log("during", "execute request", "err", err)
contextSleep(ctx, time.Second)
return ts, nil
return name, rtResultError, time.Second, ts, nil

var rt realtimeResponse
if err := jsoniterAPI.NewDecoder(resp.Body).Decode(&rt); err != nil {
result = rtResultError
level.Error(s.logger).Log("during", "decode response", "err", err)
contextSleep(ctx, time.Second)
return ts, nil
return name, rtResultError, time.Second, ts, nil

Expand All @@ -185,15 +182,15 @@ func (s *Subscriber) runOnce(ctx context.Context, ts uint64) (newts uint64, err
case http.StatusUnauthorized, http.StatusForbidden:
result = rtResultError
level.Error(s.logger).Log("status_code", resp.StatusCode, "response_ts", rt.Timestamp, "err", rterr, "msg", "token may be invalid")
contextSleep(ctx, 15*time.Second)
delay = 15 * time.Second

result = rtResultUnknown
level.Error(s.logger).Log("status_code", resp.StatusCode, "response_ts", rt.Timestamp, "err", rterr)
contextSleep(ctx, 5*time.Second)
delay = 5 * time.Second

return rt.Timestamp, nil
return name, result, delay, rt.Timestamp, nil

Expand Down

0 comments on commit 6eb0f36

Please sign in to comment.