-
Notifications
You must be signed in to change notification settings - Fork 0
/
daemon.go
125 lines (106 loc) · 3.09 KB
/
daemon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"context"
"log"
"sync"
"time"
"github.com/buildkite/go-buildkite/v3/buildkite"
"go.opentelemetry.io/otel/trace"
)
// daemon holds the state shared by the goroutines of the long-lived process
// started through Exec.
type daemon struct {
// lastFinishedAt is the cut-off sent as FinishedFrom when listing builds;
// processBuildKite moves it forward to the newest FinishedAt it has seen.
lastFinishedAt time.Time
// tracer is the OpenTelemetry tracer handed to NewDaemon. It is not used by
// the methods visible here — presumably consumed by processBuild; confirm.
tracer trace.Tracer
// buildKite is the BuildKite API client used to list builds per pipeline.
buildKite *buildkite.Client
// pipelines lists the pipelines polled on every cycle; each entry is passed
// to Builds.ListByPipeline.
pipelines []string
// wg tracks the goroutines started during one cycle; Exec waits on it before
// sleeping.
wg *sync.WaitGroup
// cacheFilePath is the path given to NewCache for the build-ID cache.
cacheFilePath string
// sleepDuration is how long Exec pauses between two polling cycles.
sleepDuration time.Duration
}
// NewDaemon builds a daemon that can be run as a long-lived process via Exec.
//
// The initial lastFinishedAt cut-off is HoneycombMaxRetention before the
// current time; it is meant to be moved forward by subsequent runs.
func NewDaemon(
	tracer trace.Tracer,
	buildKite *buildkite.Client,
	pipelines []string,
	sleepDuration time.Duration,
	cacheFilePath string,
) *daemon {
	d := new(daemon)

	// Dependencies and configuration supplied by the caller.
	d.tracer = tracer
	d.buildKite = buildKite
	d.pipelines = pipelines
	d.sleepDuration = sleepDuration
	d.cacheFilePath = cacheFilePath

	// Internal state: a fresh wait group and the first-run cut-off.
	d.wg = new(sync.WaitGroup)
	d.lastFinishedAt = time.Now().Add(-HoneycombMaxRetention)

	return d
}
// Exec runs the daemon as a long-lived process.
//
// Each cycle starts one processBuildKite goroutine per configured pipeline,
// waits for the wait group to drain, then pauses for sleepDuration. Exec
// returns once ctx is done, either before a cycle starts or while pausing, so
// a caller can get a graceful shutdown by cancelling ctx (for example from a
// SIGTERM/SIGINT handler; SIGKILL cannot be intercepted).
func (d *daemon) Exec(ctx context.Context) {
	for {
		// Do not start a new cycle once shutdown has been requested.
		if err := ctx.Err(); err != nil {
			log.Printf("stopping daemon: %v", err)
			return
		}

		for _, pipeline := range d.pipelines {
			d.wg.Add(1)
			go d.processBuildKite(ctx, pipeline)
		}
		d.wg.Wait()

		log.Printf("sleeping for %s", d.sleepDuration)

		// A timer instead of time.Sleep so that cancellation interrupts the pause.
		timer := time.NewTimer(d.sleepDuration)
		select {
		case <-ctx.Done():
			timer.Stop()
			log.Printf("stopping daemon: %v", ctx.Err())
			return
		case <-timer.C:
		}
	}
}
// processBuildKite pages through the finished builds of one pipeline and hands
// every build that is not yet in the cache to processBuild in its own goroutine.
//
// It must be started after d.wg.Add(1); the matching Done is deferred so the
// wait group is released on every return path. Pagination stops at the last
// page, on the first API error, or when ctx is done. After an API error the
// remaining pages are left for the caller's next cycle instead of being retried
// in a tight loop. The build-ID cache is written back in all of those cases.
//
// NOTE(review): Exec starts one of these goroutines per pipeline and each one
// reads and writes d.lastFinishedAt without synchronization; guarding it would
// need a mutex on daemon.
func (d *daemon) processBuildKite(ctx context.Context, pipeline string) {
	// Deferred first so it runs last, after the cache file has been closed.
	defer d.wg.Done()

	cache := NewCache(d.cacheFilePath)
	defer cache.fileStore.Close()
	cachedBuildIDs := cache.loadCache()

	buildListOptions := &buildkite.BuildsListOptions{
		// Only query from last run's cut off point to limit the number of
		// requests needed on subsequent runs.
		FinishedFrom: d.lastFinishedAt,
		// Possible values are: running, scheduled, passed, failed, canceled, skipped and not_run.
		// filters for only 'finished' states
		State: []string{"passed", "failed", "canceled", "skipped"},
		// Pagination options
		ListOptions: buildkite.ListOptions{
			Page:    1,
			PerPage: BuildKiteMaxPagination,
		},
	}

	// Newest FinishedAt seen during this run. It is only committed to
	// d.lastFinishedAt when every page was fetched; otherwise builds on the
	// pages that were not reached could fall before the new cut-off and be
	// skipped by the next query.
	latest := d.lastFinishedAt
	completed := false

	for {
		if err := ctx.Err(); err != nil {
			log.Printf("Stopping pagination for pipeline %s: %v\n", pipeline, err)
			break
		}

		log.Println("Calling API on page", buildListOptions.Page)
		builds, resp, err := d.buildKite.Builds.ListByPipeline(BuildKiteOrgName, pipeline, buildListOptions)
		if err != nil {
			// Give up for this cycle rather than hammering the API with
			// immediate retries of the same page.
			log.Printf("Issues calling BuildKite API: %v\n", err)
			break
		}

		for _, b := range builds {
			if b.ID == nil {
				// Without an ID the build can neither be cached nor deduplicated.
				log.Println("Skipping build without an ID")
				continue
			}
			if _, ok := cachedBuildIDs[*b.ID]; ok {
				// build ID is in cache, skip processing
				log.Println("Skipping build:", *b.ID)
				continue
			}
			// add build ID to cache
			cachedBuildIDs[*b.ID] = struct{}{}

			if b.FinishedAt != nil && b.FinishedAt.After(latest) {
				latest = b.FinishedAt.Time
			}

			// Registered before this goroutine's own Done, so the wait group
			// cannot reach zero while the build is still being processed.
			d.wg.Add(1)
			go d.processBuild(ctx, b)
		}

		// use buildkite response header to determine next page
		if resp.NextPage == 0 {
			completed = true
			break
		}
		buildListOptions.Page = resp.NextPage
	}

	if completed && latest.After(d.lastFinishedAt) {
		d.lastFinishedAt = latest
	}

	// store all build IDs each run into cache
	if err := cache.writeCache(cachedBuildIDs); err != nil {
		log.Fatalf("error writing cache: %v", err)
	}
}