-
Notifications
You must be signed in to change notification settings - Fork 239
/
directory.go
231 lines (188 loc) · 6.11 KB
/
directory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package sqlite
import (
"fmt"
"os"
"path/filepath"
"strings"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/yaml"
"github.com/operator-framework/operator-registry/pkg/registry"
)
// ClusterServiceVersionKind is the Kind value that marks a decoded manifest as a
// ClusterServiceVersion (CSV). The bundle walk compares each file's kind against it.
const ClusterServiceVersionKind = "ClusterServiceVersion"
// SQLPopulator is the interface satisfied by loaders such as DirectoryLoader.
type SQLPopulator interface {
// Populate runs the load and reports any failure through the returned error.
Populate() error
}
// DirectoryLoader loads a directory of resources into the database
type DirectoryLoader struct {
// store receives the bundles and package manifests found while walking directory.
store registry.Load
// directory is the root path that Populate walks.
directory string
}
// Compile-time assertion that *DirectoryLoader implements SQLPopulator.
var _ SQLPopulator = &DirectoryLoader{}
// NewSQLLoaderForDirectory returns a DirectoryLoader that, when populated,
// walks directory and adds what it finds to store.
func NewSQLLoaderForDirectory(store registry.Load, directory string) *DirectoryLoader {
	loader := new(DirectoryLoader)
	loader.store = store
	loader.directory = directory
	return loader
}
// Populate walks the loader's directory twice: the first pass loads bundles,
// the second loads packages. Errors reported by the walk functions, and any
// error returned by filepath.Walk itself, are gathered and returned together
// as one aggregate error.
func (d *DirectoryLoader) Populate() error {
	log := logrus.WithField("dir", d.directory)

	// The passes run in this order: bundles first, then packages.
	passes := []struct {
		announce string
		visit    filepath.WalkFunc
	}{
		{announce: "loading Bundles", visit: d.LoadBundleWalkFunc},
		{announce: "loading Packages and Entries", visit: d.LoadPackagesWalkFunc},
	}

	var collected []error
	for _, pass := range passes {
		log.Info(pass.announce)
		if walkErr := filepath.Walk(d.directory, collectWalkErrs(pass.visit, &collected)); walkErr != nil {
			collected = append(collected, walkErr)
		}
	}

	return utilerrors.NewAggregate(collected)
}
// collectWalkErrs wraps walk so that one failing entry does not abort the whole traversal.
// A nil result or filepath.SkipDir is passed through unchanged; any other error is appended
// to *errs and replaced by nil so that the walk carries on.
func collectWalkErrs(walk filepath.WalkFunc, errs *[]error) filepath.WalkFunc {
	return func(path string, f os.FileInfo, err error) error {
		switch result := walk(path, f, err); result {
		case nil, filepath.SkipDir:
			// Control signals for filepath.Walk: hand them back untouched.
			return result
		default:
			*errs = append(*errs, result)
			return nil
		}
	}
}
// LoadBundleWalkFunc is the filepath.WalkFunc for the bundle pass. Hidden files and hidden
// directories (names starting with ".") are skipped. For every other file it decodes the
// first YAML/JSON document; when that document's kind is ClusterServiceVersion, the files in
// the same directory are loaded as a bundle and added to the store.
//
// Files that cannot be decoded, or whose kind is not ClusterServiceVersion, are ignored and
// yield a nil error. When f is nil and the walk supplied an error, that error is returned
// wrapped with the path so the cause is not lost. Failures while loading, validating or
// storing a bundle are returned together as one aggregate error.
func (d *DirectoryLoader) LoadBundleWalkFunc(path string, f os.FileInfo, err error) error {
	if f == nil {
		if err != nil {
			// Keep the underlying cause instead of reporting only "<nil>".
			return fmt.Errorf("invalid file %s: %w", path, err)
		}
		return fmt.Errorf("invalid file: %v", f)
	}

	log := logrus.WithFields(logrus.Fields{"dir": d.directory, "file": f.Name(), "load": "bundles"})
	hidden := strings.HasPrefix(f.Name(), ".")

	if f.IsDir() {
		if hidden {
			log.Info("skipping hidden directory")
			return filepath.SkipDir
		}
		if err != nil {
			// filepath.Walk reports a directory it could not list through err and does not
			// descend into it. Make that visible without failing the walk.
			log.WithError(err).Warn("unable to read directory")
		}
		log.Info("directory")
		return nil
	}

	if hidden {
		log.Info("skipping hidden file")
		return nil
	}

	fileReader, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("unable to load file %s: %s", path, err)
	}
	defer fileReader.Close()

	decoder := yaml.NewYAMLOrJSONDecoder(fileReader, 30)
	csv := unstructured.Unstructured{}

	// Best effort: anything that does not decode is simply not a CSV for this pass.
	if err = decoder.Decode(&csv); err != nil {
		return nil
	}

	if csv.GetKind() != ClusterServiceVersionKind {
		return nil
	}

	log.Info("found csv, loading bundle")

	var errs []error
	bundle, err := loadBundle(csv.GetName(), filepath.Dir(path))
	if err != nil {
		errs = append(errs, fmt.Errorf("error loading objs in directory: %s", err))
	}

	// Nothing to validate or store; report what has been collected so far.
	if bundle == nil || bundle.Size() == 0 {
		errs = append(errs, fmt.Errorf("no bundle objects found"))
		return utilerrors.NewAggregate(errs)
	}

	if err := bundle.AllProvidedAPIsInBundle(); err != nil {
		errs = append(errs, fmt.Errorf("error checking provided apis in bundle %s: %s", bundle.Name, err))
	}

	if err := d.store.AddOperatorBundle(bundle); err != nil {
		// The version is only used to enrich the message, so a lookup failure is ignored.
		version, _ := bundle.Version()
		errs = append(errs, fmt.Errorf("error adding operator bundle %s/%s/%s: %s", csv.GetName(), version, bundle.BundleImage, err))
	}

	return utilerrors.NewAggregate(errs)
}
// LoadPackagesWalkFunc is the filepath.WalkFunc for the package pass. Hidden files and hidden
// directories (names starting with ".") are skipped. Every other file is decoded into a
// PackageManifest; a manifest with a non-empty package name is added to the loader's store.
//
// A file that decodes to an empty package name is ignored and yields a nil error. A file that
// cannot be opened or decoded, or a manifest the store rejects, yields an error. When f is
// nil and the walk supplied an error, that error is returned wrapped with the path so the
// cause is not lost.
func (d *DirectoryLoader) LoadPackagesWalkFunc(path string, f os.FileInfo, err error) error {
	if f == nil {
		if err != nil {
			// Keep the underlying cause instead of reporting only "<nil>".
			return fmt.Errorf("invalid file %s: %w", path, err)
		}
		return fmt.Errorf("invalid file: %v", f)
	}

	log := logrus.WithFields(logrus.Fields{"dir": d.directory, "file": f.Name(), "load": "package"})
	hidden := strings.HasPrefix(f.Name(), ".")

	if f.IsDir() {
		if hidden {
			log.Info("skipping hidden directory")
			return filepath.SkipDir
		}
		if err != nil {
			// filepath.Walk reports a directory it could not list through err and does not
			// descend into it. Make that visible without failing the walk.
			log.WithError(err).Warn("unable to read directory")
		}
		log.Info("directory")
		return nil
	}

	if hidden {
		log.Info("skipping hidden file")
		return nil
	}

	fileReader, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("unable to load package from file %s: %s", path, err)
	}
	defer fileReader.Close()

	decoder := yaml.NewYAMLOrJSONDecoder(fileReader, 30)
	manifest := registry.PackageManifest{}
	if err = decoder.Decode(&manifest); err != nil {
		return fmt.Errorf("could not decode contents of file %s into package: %s", path, err)
	}

	// An empty package name means the file decoded but is not a package manifest.
	if manifest.PackageName == "" {
		return nil
	}

	if err := d.store.AddPackageChannels(manifest); err != nil {
		return fmt.Errorf("error loading package into db: %s", err)
	}

	return nil
}
// loadBundle takes the directory that a CSV is in and assumes the rest of the objects in that directory
// are part of the bundle.
func loadBundle(csvName string, dir string) (*registry.Bundle, error) {
log := logrus.WithFields(logrus.Fields{"dir": dir, "load": "bundle", "name": csvName})
files, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
var errs []error
bundle := ®istry.Bundle{
Name: csvName,
}
for _, f := range files {
log = log.WithField("file", f.Name())
if f.IsDir() {
log.Info("skipping directory")
continue
}
if strings.HasPrefix(f.Name(), ".") {
log.Info("skipping hidden file")
continue
}
log.Info("loading bundle file")
path := filepath.Join(dir, f.Name())
fileReader, err := os.Open(path)
if err != nil {
errs = append(errs, fmt.Errorf("unable to load file %s: %s", path, err))
continue
}
defer fileReader.Close()
decoder := yaml.NewYAMLOrJSONDecoder(fileReader, 30)
obj := &unstructured.Unstructured{}
if err = decoder.Decode(obj); err != nil {
logrus.WithError(err).Debugf("could not decode file contents for %s", path)
continue
}
// Don't include other CSVs in the bundle
if obj.GetKind() == "ClusterServiceVersion" && obj.GetName() != csvName {
continue
}
if obj.Object != nil {
bundle.Add(obj)
}
}
return bundle, utilerrors.NewAggregate(errs)
}