Skip to content

Commit

Permalink
Add code to always process yesterday
Browse files Browse the repository at this point in the history
  • Loading branch information
Gregory Russell committed Nov 10, 2019
1 parent 5c0d71c commit 96176f9
Showing 1 changed file with 37 additions and 1 deletion.
38 changes: 37 additions & 1 deletion reproc/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,50 @@ queueLoop:
return maxDate, nil
}

func findNextRecentDay(start time.Time, skip int) time.Time {
yesterday := time.Now().Add(-8 * time.Hour).Truncate(24 * time.Hour)
if skip == 0 {
return yesterday
}

nextCurrentDate := start
next := nextCurrentDate.AddDate(0, 0, 1+skip)

for next.Before(yesterday) {
nextCurrentDate = next
next = nextCurrentDate.AddDate(0, 0, 1+skip)
}

return nextCurrentDate
}

// doDispatchLoop just sequences through archives in date order.
// It will generally be blocked on the queues.
// It will start processing at startDate, and when it catches up to "now" it will restart at restartDate.
func doDispatchLoop(ctx context.Context, handler *TaskHandler, bucket string, exp string, startDate time.Time, restartDate time.Time, dateSkip int) {
log.Println("(Re)starting at", startDate)
next := startDate

nextRecent := findNextRecentDay(startDate, dateSkip)
next := startDate.Truncate(24 * time.Hour)

for {
// First check if we need to process yesterday.
if time.Since(nextRecent) > 30*time.Hour {
// Only process if next isn't same or later date.
// TODO - should we unit test this?
if nextRecent.After(next.Add(time.Hour)) {
prefix := fmt.Sprintf("gs://%s/%s/", bucket, exp) + next.Format("2006/01/02/")

log.Println("Processing yesterday:", nextRecent)
// Note that this blocks until a queue is available.
err := handler.AddTask(ctx, prefix)
if err != nil {
// Only error expected here is ErrTerminating
log.Println(err)
}
}
}

prefix := fmt.Sprintf("gs://%s/%s/", bucket, exp) + next.Format("2006/01/02/")

// Note that this blocks until a queue is available.
Expand Down

0 comments on commit 96176f9

Please sign in to comment.