-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathgitrepository_watcher.go
100 lines (80 loc) · 2.93 KB
/
gitrepository_watcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
Copyright 2022 The Flux authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"os"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/fluxcd/pkg/http/fetch"
"github.com/fluxcd/pkg/tar"
sourcev1 "github.com/fluxcd/source-controller/api/v1"
)
// GitRepositoryWatcher watches GitRepository objects for revision changes
type GitRepositoryWatcher struct {
client.Client
artifactFetcher *fetch.ArchiveFetcher
HttpRetry int
}
func (r *GitRepositoryWatcher) SetupWithManager(mgr ctrl.Manager) error {
r.artifactFetcher = fetch.New(
fetch.WithRetries(r.HttpRetry),
fetch.WithMaxDownloadSize(tar.UnlimitedUntarSize),
fetch.WithUntar(tar.WithMaxUntarSize(tar.UnlimitedUntarSize)),
fetch.WithHostnameOverwrite(os.Getenv("SOURCE_CONTROLLER_LOCALHOST")),
fetch.WithLogger(nil),
)
return ctrl.NewControllerManagedBy(mgr).
For(&sourcev1.GitRepository{}, builder.WithPredicates(GitRepositoryRevisionChangePredicate{})).
Complete(r)
}
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories,verbs=get;list;watch
// +kubebuilder:rbac:groups=source.toolkit.fluxcd.io,resources=gitrepositories/status,verbs=get
func (r *GitRepositoryWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
// get source object
var repository sourcev1.GitRepository
if err := r.Get(ctx, req.NamespacedName, &repository); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
artifact := repository.Status.Artifact
log.Info("New revision detected", "revision", artifact.Revision)
// create tmp dir
tmpDir, err := os.MkdirTemp("", repository.Name)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to create temp dir, error: %w", err)
}
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
log.Error(err, "unable to remove temp dir")
}
}(tmpDir)
// download and extract artifact
if err := r.artifactFetcher.Fetch(artifact.URL, artifact.Digest, tmpDir); err != nil {
log.Error(err, "unable to fetch artifact")
return ctrl.Result{}, err
}
// list artifact content
files, err := os.ReadDir(tmpDir)
if err != nil {
return ctrl.Result{}, fmt.Errorf("failed to list files, error: %w", err)
}
// do something with the artifact content
for _, f := range files {
log.Info("Processing " + f.Name())
}
return ctrl.Result{}, nil
}