-
Notifications
You must be signed in to change notification settings - Fork 3
/
scheduler.go
169 lines (145 loc) · 5.1 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package scheduler
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
"github.com/khulnasoft-lab/package-feeds/pkg/feeds"
"github.com/khulnasoft-lab/package-feeds/pkg/publisher"
)
// Scheduler is a registry of feeds that should be run on a schedule.
//
// It bundles the feeds to poll, the publisher handed to every feed group,
// and the port on which Run serves HTTP.
type Scheduler struct {
// registry holds the feeds supplied to New; Run passes it to buildSchedules.
// NOTE(review): keys are presumably feed names — confirm against the caller.
registry map[string]feeds.ScheduledFeed
// publisher is forwarded to buildSchedules, which hands it to each FeedGroup.
publisher publisher.Publisher
// httpPort is the TCP port the HTTP server started by Run listens on.
httpPort int
}
// New builds a Scheduler from the given feeds, publisher and HTTP port.
//
// feedsMap becomes the scheduler's feed registry, pub is the publisher later
// handed to the feed groups, and httpPort is the port Run listens on.
func New(feedsMap map[string]feeds.ScheduledFeed, pub publisher.Publisher, httpPort int) *Scheduler {
	sched := new(Scheduler)
	sched.registry = feedsMap
	sched.publisher = pub
	sched.httpPort = httpPort
	return sched
}
// pollResult groups a feed with the packages and errors associated with it.
//
// NOTE(review): this type is not referenced in this file; presumably it
// carries the outcome of polling a single feed elsewhere in the package —
// confirm against its users.
type pollResult struct {
// name is a string label for the result (presumably the feed name — TODO confirm).
name string
// feed is the scheduled feed this result belongs to.
feed feeds.ScheduledFeed
// packages is the list of packages attached to this result.
packages []*feeds.Package
// errs is the list of errors attached to this result.
errs []error
}
// healthCheckHandler answers health probes on the HTTP server.
//
// It always sets the Content-Type header to application/json and replies with
// a 200 status and a small JSON document containing a status and a message.
// If the document cannot be serialized, the failure is logged and a 500 status
// with an empty body is sent instead. A failure to write the body is logged
// only, because the status line has already been sent at that point.
func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
	// healthStatus is the JSON shape of the health check reply.
	type healthStatus struct {
		Status  string `json:"status"`
		Message string `json:"message"`
	}

	w.Header().Set("Content-Type", "application/json")

	payload, marshalErr := json.Marshal(healthStatus{
		Status:  "OK",
		Message: "Service is up and running",
	})
	if marshalErr != nil {
		// Nothing useful can be sent to the client; record the problem and signal a server error.
		log.Errorf("Failed to marshal health check response: %v", marshalErr)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// Send the status line first, then the JSON body.
	w.WriteHeader(http.StatusOK)
	if _, writeErr := w.Write(payload); writeErr != nil {
		log.Errorf("Failed to write health check response: %v", writeErr)
	}
}
// Run starts the services that make up the scheduler and blocks until the
// HTTP server stops.
//
// Services include: cron polling via FeedGroups and HTTP serving of
// FeedGroupsHandler (on "/") plus a health check (on "/health").
//
// initialCutoff is forwarded to buildSchedules and also defines the interval
// of the default "@every" schedule. enableDefaultTimer decides what happens to
// feeds without a schedule of their own: when true they run on the default
// schedule, when false they are only handed to the HTTP handler.
//
// Run returns an error if the schedules cannot be built, if a schedule cannot
// be registered with cron, or when the HTTP server exits. The cron scheduler
// is stopped before Run returns.
func (s *Scheduler) Run(initialCutoff time.Duration, enableDefaultTimer bool) error {
	defaultSchedule := fmt.Sprintf("@every %s", initialCutoff.String())

	schedules, err := buildSchedules(s.registry, s.publisher, initialCutoff)
	if err != nil {
		return err
	}

	var feedGroups []*FeedGroup
	var pollFeedNames []string

	// Configure cron job for scheduled polling.
	cronJob := cron.New(
		cron.WithLogger(cron.PrintfLogger(log.StandardLogger())),
		cron.WithParser(cron.NewParser(
			cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor,
		)))

	for schedule, feedGroup := range schedules {
		var feedNames []string
		for _, f := range feedGroup.feeds {
			feedNames = append(feedNames, f.feed.GetName())
		}

		if schedule == "" {
			if !enableDefaultTimer {
				// Without the default timer enabled, undefined schedules depend on HTTP request polling.
				// This avoids race conditions where the cron based request is in flight when an HTTP
				// request is made (or vice versa).
				feedGroups = append(feedGroups, feedGroup)
				pollFeedNames = append(pollFeedNames, feedNames...)
				continue
			}
			// Undefined schedules will follow the default schedule, if the default timer is enabled.
			schedule = defaultSchedule
		}

		// SkipIfStillRunning drops a tick when the previous run of the same group has not finished.
		job := cron.NewChain(
			cron.SkipIfStillRunning(cron.VerbosePrintfLogger(log.StandardLogger())),
		).Then(feedGroup)
		if _, err := cronJob.AddJob(schedule, job); err != nil {
			return fmt.Errorf("failed to parse schedule `%s`: %w", schedule, err)
		}
		log.Printf("Running a timer for %s with schedule %s", strings.Join(feedNames, ", "), schedule)
	}

	cronJob.Start()
	// Stop the cron scheduler on the way out so its goroutine does not outlive
	// a failed HTTP server.
	defer cronJob.Stop()

	// Start http server for polling via HTTP requests.
	pollServer := NewFeedGroupsHandler(feedGroups)
	log.Infof("Listening on port %v for %s", s.httpPort, strings.Join(pollFeedNames, ", "))

	// Use a dedicated mux rather than http.DefaultServeMux: registering the
	// same pattern twice on a mux panics, so a private mux keeps repeated Run
	// calls safe and serves only the routes declared here.
	mux := http.NewServeMux()
	mux.Handle("/", pollServer)
	mux.HandleFunc("/health", healthCheckHandler)

	server := &http.Server{
		Addr:    fmt.Sprintf(":%v", s.httpPort),
		Handler: mux,
		// default 60s timeout used from nginx
		// https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6
		ReadHeaderTimeout: 60 * time.Second,
	}

	// ListenAndServe blocks and always returns a non-nil error when it exits.
	return server.ListenAndServe()
}
// buildSchedules groups the registered feeds into FeedGroups keyed by cron schedule.
//
// A feed whose options carry a PollRate is placed under the key
// "@every <PollRate>", and that group is created with the parsed PollRate as
// its cutoff. A feed without a PollRate is placed under the empty key "",
// whose group is created with initialCutoff. Every group is created with pub
// as its publisher.
//
// An error is returned, together with a nil map, as soon as a PollRate fails
// to parse as a duration.
//
//nolint:lll
func buildSchedules(registry map[string]feeds.ScheduledFeed, pub publisher.Publisher, initialCutoff time.Duration) (map[string]*FeedGroup, error) {
	groups := make(map[string]*FeedGroup)

	for _, scheduled := range registry {
		// Defaults for feeds that do not configure their own poll rate.
		spec, lookback := "", initialCutoff

		if rate := scheduled.GetFeedOptions().PollRate; rate != "" {
			parsed, parseErr := time.ParseDuration(rate)
			if parseErr != nil {
				return nil, fmt.Errorf("failed to parse `%s` as duration: %w", rate, parseErr)
			}
			spec, lookback = "@every "+rate, parsed
		}

		// Create the group lazily the first time a schedule is seen.
		group, seen := groups[spec]
		if !seen {
			group = NewFeedGroup([]feeds.ScheduledFeed{}, pub, lookback)
			groups[spec] = group
		}
		group.AddFeed(scheduled)
	}

	return groups, nil
}