-
Notifications
You must be signed in to change notification settings - Fork 0
/
cron.go
172 lines (144 loc) · 4.27 KB
/
cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package cron
import (
"context"
"fmt"
"log"
"sort"
"time"
"github.com/gorilla/mux"
"github.com/podcreep/server/rss"
"github.com/podcreep/server/store"
"github.com/podcreep/server/util"
)
var (
// Jobs maps a cron job's name to the function that executes it. Within this file it is
// (re)initialised by Setup, consulted by RunCronJob to find the function for a stored job, and
// enumerated by GetCronJobNames.
Jobs map[string]func(context.Context) error
)
// cronCheckUpdates checks for updates to our podcasts. To decide which podcasts to update, we look
// at how long it has been since the last update: we update all podcasts that have not been updated
// in at least the last hour.
//
// A podcast that fails to update does not stop the remaining stale podcasts from being refreshed.
// UpdatePodcast only bumps LastFetchTime after a successful update, so a broken podcast sorts to
// the front of the list again on the next run; returning on its error would starve every other
// podcast. Instead each failure is logged, the loop carries on, and an error wrapping the first
// failure is returned once every stale podcast has been attempted.
//
// Returns the error from loading the podcast list, the context's error if ctx is cancelled while
// iterating, or a wrapped error if at least one podcast failed to update.
//
// TODO: allow us to configure the refresh frequency on a per-podcast basis.
func cronCheckUpdates(ctx context.Context) error {
	podcasts, err := store.LoadPodcasts(ctx)
	if err != nil {
		return err
	}
	if len(podcasts) == 0 {
		log.Printf("No podcasts.")
		return nil
	}

	// Sort the podcasts by LastFetchTime, so that the first podcast in the list is the one that
	// we haven't fetched for the longest time.
	sort.Slice(podcasts, func(i, j int) bool {
		return podcasts[i].LastFetchTime.Before(podcasts[j].LastFetchTime)
	})

	var firstErr error
	numFailed := 0

	// Loop through all the podcasts, and stop when we get one that was updated in the last
	// hour: because of the sort above, everything after it is at least as fresh.
	for _, p := range podcasts {
		// Once the context is done every further update would fail too, so give up early.
		if err := ctx.Err(); err != nil {
			return err
		}

		if p.LastFetchTime.After(time.Now().Add(-1 * time.Hour)) {
			log.Printf("This podcast ('%s') was only updated at %v, not updating again.", p.Title, p.LastFetchTime)
			break
		}

		log.Printf("Updating podcast %s, LastFetchTime = %v", p.Title, p.LastFetchTime)
		numUpdated, err := UpdatePodcast(ctx, p, 0 /*flags*/)
		if err != nil {
			// Only the first failure is returned, so log each one to keep them all visible.
			log.Printf(" - error updating podcast %s: %v", p.Title, err)
			numFailed++
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		log.Printf(" - updated %d episodes", numUpdated)
	}

	if firstErr != nil {
		return fmt.Errorf("error updating podcast: %d failed, first error: %w", numFailed, firstErr)
	}
	return nil
}
// UpdatePodcast updates a single podcast by delegating to rss.UpdatePodcast, then records the
// time of the fetch.
//
// The podcast passed in is expected to come from the podcast list and so will not have its
// episodes populated. They are loaded here: only the latest 10 episodes are fetched from the
// store and assigned to podcast.Episodes; anything older is ignored entirely.
//
// On success podcast.LastFetchTime is set to the current time and the podcast is saved.
//
// Returns the number of updated episodes reported by rss.UpdatePodcast. If loading the episodes
// or the update itself fails, 0 and a wrapped error are returned and LastFetchTime is left
// untouched. If only the final save fails, the count is still returned alongside the wrapped
// error.
func UpdatePodcast(ctx context.Context, podcast *store.Podcast, flags rss.UpdatePodcastFlags) (int, error) {
	episodes, err := store.LoadEpisodes(ctx, podcast.ID, 10)
	if err != nil {
		return 0, fmt.Errorf("error loading episodes for podcast '%s': %w", podcast.Title, err)
	}
	podcast.Episodes = episodes

	// Actually do the update.
	numUpdated, err := rss.UpdatePodcast(ctx, podcast, flags)
	if err != nil {
		return 0, fmt.Errorf("error updating podcast '%s': %w", podcast.Title, err)
	}

	// Update the last fetch time.
	podcast.LastFetchTime = time.Now()
	if _, err := store.SavePodcast(ctx, podcast); err != nil {
		return numUpdated, fmt.Errorf("error saving podcast '%s': %w", podcast.Title, err)
	}
	return numUpdated, nil
}
// RunCronJob runs the function registered in Jobs under job.Name and, if it succeeds, computes
// the job's next run time from its schedule.
//
// The job is modified in place but not saved; persisting it is up to the caller:
//   - if no function is registered under job.Name, job.Enabled is set to false and an error is
//     returned;
//   - if the function returns an error, that error is wrapped and returned, and the job is left
//     unchanged (in particular NextRun is not advanced);
//   - if job.Schedule cannot be parsed, job.Enabled is set to false and a wrapped error is
//     returned;
//   - otherwise job.NextRun is set to the schedule's next time after now.
func RunCronJob(ctx context.Context, now time.Time, job *store.CronJob) error {
	fn, ok := Jobs[job.Name]
	if !ok {
		job.Enabled = false
		return fmt.Errorf("job does not exist: %s", job.Name)
	}

	if err := fn(ctx); err != nil {
		return fmt.Errorf("error running job: %w", err)
	}

	sched, err := util.ParseSchedule(job.Schedule)
	if err != nil {
		job.Enabled = false
		return fmt.Errorf("job has invalid schedule, cannot reschedule: %w", err)
	}

	nextRun := sched.NextTime(now)
	job.NextRun = &nextRun
	return nil
}
// cronIterate is run in a goroutine to actually execute the cron tasks: it loads every cron job
// that is pending as of the current time, runs each one via RunCronJob and saves it afterwards.
//
// A job is saved even when RunCronJob fails, because RunCronJob reports some failures (unknown
// job name, unparseable schedule) by setting job.Enabled to false, and that change is lost unless
// the job is written back. A failing job also does not prevent the remaining pending jobs from
// running.
//
// Returns the error from loading the pending jobs, or otherwise the first error encountered while
// running or saving a job (later errors are logged), or nil if everything succeeded.
func cronIterate() error {
	ctx := context.Background()
	now := time.Now()

	jobs, err := store.LoadPendingCronJobs(ctx, now)
	if err != nil {
		return err
	}

	var firstErr error
	// record keeps the first error to return to the caller and logs any later ones so that
	// they are not silently dropped.
	record := func(err error) {
		if firstErr == nil {
			firstErr = err
			return
		}
		log.Printf("Additional cron error: %v", err)
	}

	for _, job := range jobs {
		if err := RunCronJob(ctx, now, job); err != nil {
			record(fmt.Errorf("cron job %s: %w", job.Name, err))
		}
		// Save unconditionally so that any change RunCronJob made to the job is persisted.
		if err := store.SaveCronJob(ctx, job); err != nil {
			record(fmt.Errorf("error saving cron job %s: %w", job.Name, err))
		}
	}
	return firstErr
}
// runCronIterate performs one cycle of the cron loop: it sleeps for the duration reported by
// store.GetTimeToNextCronJob, runs a single cronIterate pass, and then starts a fresh goroutine
// running runCronIterate so the cycle repeats.
func runCronIterate() {
	delay := store.GetTimeToNextCronJob(context.Background(), time.Now())
	log.Printf("Waiting %v to next cron job", delay)
	time.Sleep(delay)

	// A failed pass is only logged; the next cycle is scheduled regardless.
	if err := cronIterate(); err != nil {
		log.Printf("Error running cronIterate: %v", err)
	}

	// Hand over to a new goroutine for the next cycle; this one then exits.
	go runCronIterate()
}
// GetCronJobNames returns the names of all cron jobs registered in Jobs, sorted in ascending
// order so that the result is stable from call to call (map iteration order is random).
// It returns nil when no jobs are registered.
func GetCronJobNames() []string {
	if len(Jobs) == 0 {
		return nil
	}

	names := make([]string, 0, len(Jobs))
	for name := range Jobs {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
// Setup is called from server.go. It (re)builds the Jobs registry with the known cron jobs and
// starts the goroutine that runs them. The router argument is not used here, and the returned
// error is always nil.
func Setup(r *mux.Router) error {
	Jobs = map[string]func(context.Context) error{
		"check-updates": cronCheckUpdates,
	}

	// Start the cron goroutine straight away.
	go runCronIterate()
	return nil
}