// Package crawler provides helper methods and defines an interface for launching
// source repository crawlers that retrieve files from a source and forward them
// to a channel for indexing and retrieval.
package crawler

import (
	"context"
	"fmt"
	"log"
	"os"
	"sync"

	_ "github.com/gomodule/redigo/redis"

	"sigs.k8s.io/kustomize/api/internal/crawl/doc"
	"sigs.k8s.io/kustomize/api/internal/crawl/index"
	"sigs.k8s.io/kustomize/api/internal/crawl/utils"
)

var (
logger = log.New(os.Stdout, "Crawler: ", log.LstdFlags|log.LUTC|log.Llongfile)
)

// Crawler forwards documents from source repositories to the index, which
// stores them for searching. Each crawler is responsible for querying its
// source of information, and for forwarding files that have not been seen
// before or that need updating.
type Crawler interface {
// Crawl returns when it is done processing. This method does not take
// ownership of the channel. The channel is write only, and it
// designates where the crawler should forward the documents.
Crawl(ctx context.Context, output chan<- CrawledDocument, seen utils.SeenMap) error
	// FetchDocument gets the document data given the FilePath, Repo, and
	// Ref/Tag/Branch.
	FetchDocument(context.Context, *doc.Document) error
	// SetCreated records the document's creation time on the document.
	SetCreated(context.Context, *doc.Document) error
	// SetDefaultBranch sets the document's default branch.
	SetDefaultBranch(*doc.Document)
	// Match reports whether this crawler can handle the given document.
	Match(*doc.Document) bool
}
type CrawledDocument interface {
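	// ID returns the unique identifier of the document.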
ID() string
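	// GetDocument returns the underlying doc.Document.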
GetDocument() *doc.Document
	// GetResources returns all the Documents directly referred to in a Document.
	// For a Document representing a non-kustomization file, an empty slice will
	// be returned.
	// For a Document representing a kustomization file:
	// the `includeResources` parameter determines whether the documents referred
	// to in the `resources` field are returned;
	// the `includeTransformers` parameter determines whether the documents
	// referred to in the `transformers` field are returned;
	// the `includeGenerators` parameter determines whether the documents referred
	// to in the `generators` field are returned.
GetResources(includeResources, includeTransformers, includeGenerators bool) ([]*doc.Document, error)
WasCached() bool
}
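
// CrawlSeed is a list of documents to crawl, typically loaded from an
// existing index.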
type CrawlSeed []*doc.Document
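
// IndexFunc inserts, updates, or deletes a crawled document in the index,
// depending on the given index.Mode.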
type IndexFunc func(CrawledDocument, index.Mode) error
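
// Converter converts a raw doc.Document into a CrawledDocument that can be
// indexed and whose references can be resolved.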
type Converter func(*doc.Document) (CrawledDocument, error)
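
// logIfErr logs err if it is non-nil.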
func logIfErr(err error) {
if err == nil {
return
}
logger.Println("error: ", err)
}
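
// findMatch returns the first crawler in crawlers that matches d, or nil if
// no crawler matches.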
func findMatch(d *doc.Document, crawlers []Crawler) Crawler {
for _, crawl := range crawlers {
if crawl.Match(d) {
return crawl
}
}
return nil
}
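
// addBranches marks cdoc as seen, sets its default branch, inserts or updates
// it in the index, and pushes any of its referred documents that have not
// been seen yet onto stack for later crawling.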
func addBranches(cdoc CrawledDocument, match Crawler, indx IndexFunc,
seen utils.SeenMap, stack *CrawlSeed) {
seen.Set(cdoc.ID(), cdoc.GetDocument().FileType)
match.SetDefaultBranch(cdoc.GetDocument())
// Insert into index
if err := indx(cdoc, index.InsertOrUpdate); err != nil {
logger.Printf("Failed to insert or update doc(%s): %v",
cdoc.GetDocument().Path(), err)
return
}
deps, err := cdoc.GetResources(true, true, true)
if err != nil {
logger.Println(err)
return
}
for _, dep := range deps {
if seen.Seen(dep.ID()) && seen.Value(dep.ID()) == dep.FileType {
continue
}
*stack = append(*stack, dep)
}
}
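
// doCrawl pops documents off (*docsPtr) one at a time and crawls each of them:
// it finds a matching crawler, fetches the document if needed, sets its
// creation time, converts it, and indexes it via addBranches, which appends
// any newly discovered references to (*stack).
// refreshDoc forces setting the default branch, re-fetching the document data,
// and re-setting the creation time even when they are already populated.
// updateFileType allows a previously seen document to be crawled again when
// its FileType differs from the recorded one.
// (*stack) may alias (*docsPtr), in which case newly discovered references are
// crawled within the same loop.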
func doCrawl(ctx context.Context, docsPtr *CrawlSeed, crawlers []Crawler, conv Converter, indx IndexFunc,
seen utils.SeenMap, stack *CrawlSeed, refreshDoc bool, updateFileType bool) {
UpdatedDocCount := 0
seenDocCount := 0
cachedDocCount := 0
findMatchErrCount := 0
FetchDocumentErrCount := 0
SetCreatedErrCount := 0
convErrCount := 0
deleteDocCount := 0
crawledDocCount := 0
	// During the execution of the for loop, more Documents may be added to (*docsPtr).
	for len(*docsPtr) > 0 {
		// Get the last Document in (*docsPtr); it will be crawled in this iteration.
		tail := (*docsPtr)[len(*docsPtr)-1]
		// Remove the last Document from (*docsPtr).
		*docsPtr = (*docsPtr)[:len(*docsPtr)-1]
crawledDocCount++
logger.Printf("Crawling doc %d: %s", crawledDocCount, tail.Path())
if seen.Seen(tail.ID()) {
if !updateFileType || seen.Value(tail.ID()) == tail.FileType {
logger.Printf("this doc has been seen before")
seenDocCount++
continue
}
}
if tail.WasCached() {
logger.Printf("doc(%s) is cached already", tail.Path())
cachedDocCount++
continue
}
match := findMatch(tail, crawlers)
if match == nil {
logIfErr(fmt.Errorf("%v could not match any crawler", tail))
findMatchErrCount++
continue
}
if tail.User == "" {
tail.User = doc.UserName(tail.RepositoryURL)
}
		// If the Document represents a kustomization root, FetchDocument will change
		// the `filePath` field of the Document by appending `kustomization.yaml`,
		// `kustomization.yml`, or `kustomization` to the field.
		// Therefore, it is necessary to add the ID of the Document into seen before
		// calling FetchDocument. Otherwise, the binary may enter an infinite loop
		// if a kustomization file points to its own kustomization root in its
		// `resources` or `bases` field.
seen.Set(tail.ID(), tail.FileType)
if refreshDoc || tail.DefaultBranch == "" {
match.SetDefaultBranch(tail)
}
if refreshDoc || tail.DocumentData == "" {
if err := match.FetchDocument(ctx, tail); err != nil {
logger.Printf("FetchDocument failed on doc(%s): %v", tail.Path(), err)
FetchDocumentErrCount++
// delete the document from the index
cdoc := &doc.KustomizationDocument{
Document: *tail,
}
seen.Set(cdoc.ID(), tail.FileType)
if err := indx(cdoc, index.Delete); err != nil {
logger.Printf("Failed to delete doc(%s): %v", cdoc.Path(), err)
}
deleteDocCount++
continue
}
}
if refreshDoc || tail.CreationTime == nil {
if err := match.SetCreated(ctx, tail); err != nil {
logger.Printf("SetCreated failed on doc(%s): %v", tail.Path(), err)
SetCreatedErrCount++
}
}
cdoc, err := conv(tail)
// If conv returns an error, cdoc can still be added into the index so that
// cdoc.Document can be searched.
if err != nil {
logger.Printf("conv failed on doc(%s): %v", tail.Path(), err)
convErrCount++
}
UpdatedDocCount++
addBranches(cdoc, match, indx, seen, stack)
}
logger.Printf("Summary of doCrawl:\n")
logger.Printf("\t%d documents were updated\n", UpdatedDocCount)
logger.Printf("\t%d documents were seen by the crawler already and skipped\n", seenDocCount)
logger.Printf("\t%d documents were cached already and skipped\n", cachedDocCount)
logger.Printf("\t%d documents didn't have a matching crawler and skipped\n", findMatchErrCount)
logger.Printf("\t%d documents cannot be fetched, %d out of them are deleted\n",
FetchDocumentErrCount, deleteDocCount)
logger.Printf("\t%d documents cannot update its creation time but still were inserted or updated in the index\n", SetCreatedErrCount)
logger.Printf("\t%d documents cannot be converted but still were inserted or updated in the index\n", convErrCount)
}
// CrawlFromSeedIterator iterates over all the documents in the index and calls
// CrawlFromSeed for each document.
func CrawlFromSeedIterator(ctx context.Context, it *index.KustomizeIterator, crawlers []Crawler,
conv Converter, indx IndexFunc, seen utils.SeenMap) {
docCount := 0
for it.Next() {
for _, hit := range it.Value().Hits.Hits {
docCount++
logger.Printf("updating document %d from seed\n", docCount)
singleSeed := CrawlSeed{&(hit.Document.Document)}
CrawlFromSeed(ctx, singleSeed, crawlers, conv, indx, seen)
}
}
if err := it.Err(); err != nil {
log.Fatalf("Error iterating the index: %v\n", err)
}
}
// CrawlFromSeed updates all the documents in seed, and crawls all the new
// documents referred to in the seed.
func CrawlFromSeed(ctx context.Context, seed CrawlSeed, crawlers []Crawler,
conv Converter, indx IndexFunc, seen utils.SeenMap) {
	// stack tracks the documents directly referred to in the seed.
	stack := make(CrawlSeed, 0)
	// Each unique document in seed will be crawled once.
	doCrawl(ctx, &seed, crawlers, conv, indx, seen, &stack, true, false)
	logger.Printf("crawling %d new documents referred to in the seed\n", len(stack))
	// While crawling each document in stack, the documents directly referred to
	// in that document will be added to stack.
	// After this call is done, stack will be empty.
	doCrawl(ctx, &stack, crawlers, conv, indx, seen, &stack, false, true)
}
// CrawlGithubRunner is a blocking function and only returns once all of the
// crawlers are finished with execution.
//
// This function uses the output channel to forward kustomization documents
// from a list of crawlers. The output is to be consumed by a database/search
// indexer for later retrieval.
//
// The return value is an array of errors in which each index represents the
// index of the crawler that emitted the error. Although the errors themselves
// can be nil, the array will always be exactly the size of the crawlers array.
//
// CrawlGithubRunner also takes in seen, the set of document IDs that are
// already stored in the index. It is passed to each crawler, which may use it
// to skip documents that do not need to be crawled again.
func CrawlGithubRunner(ctx context.Context, output chan<- CrawledDocument,
crawlers []Crawler, seen utils.SeenMap) []error {
errs := make([]error, len(crawlers))
wg := sync.WaitGroup{}
for i, crawler := range crawlers {
// Crawler implementations get their own channels to prevent a
// crawler from closing the main output channel.
docs := make(chan CrawledDocument)
wg.Add(2)
// Forward all of the documents from this crawler's channel to
// the main output channel.
go func(docs <-chan CrawledDocument) {
defer wg.Done()
for d := range docs {
output <- d
}
}(docs)
// Run this crawler and capture its returned error.
go func(idx int, crawler Crawler,
docs chan<- CrawledDocument) {
defer func() {
wg.Done()
if r := recover(); r != nil {
errs[idx] = fmt.Errorf(
"%+v panicked: %v, additional error %v",
crawler, r, errs[idx],
)
}
}()
defer close(docs)
errs[idx] = crawler.Crawl(ctx, docs, seen)
}(i, crawler, docs) // Copies the index and the crawler
}
wg.Wait()
return errs
}
// CrawlGithub crawls all the kustomization files on Github.
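//
// A minimal usage sketch (githubCrawler, conv, and indexFn are illustrative
// placeholders for a concrete Crawler, Converter, and IndexFunc; this wiring
// is an assumption, not prescribed by this package):
//
//	seen := utils.NewSeenMap() // assumes a NewSeenMap constructor in utils
//	CrawlGithub(ctx, []Crawler{githubCrawler}, conv, indexFn, seen)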
func CrawlGithub(ctx context.Context, crawlers []Crawler, conv Converter,
indx IndexFunc, seen utils.SeenMap) {
	// ch is the channel to which all the crawlers send the crawled documents.
ch := make(chan CrawledDocument, 1<<10)
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
docCount := 0
for cdoc := range ch {
docCount++
logger.Printf("Processing doc %d found on Github", docCount)
			// All the docs here are kustomization files found by querying Github, and
			// their `FileType` fields should all be empty.
if seen.Seen(cdoc.ID()) {
logger.Printf("the doc has been seen before")
continue
}
match := findMatch(cdoc.GetDocument(), crawlers)
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", cdoc))
continue
}
			// stack tracks the documents directly referred to in the document.
stack := make(CrawlSeed, 0)
addBranches(cdoc, match, indx, seen, &stack)
if len(stack) > 0 {
				// Here the documents referred to in a kustomization file are crawled
				// separately, to avoid accumulating all the referred documents into a
				// single, gigantic, memory-intensive stack.
				logger.Printf("crawling the %d new documents referred to in doc %d",
					len(stack), docCount)
doCrawl(ctx, &stack, crawlers, conv, indx, seen, &stack, false, true)
}
}
}()
logger.Println("processing the documents found from crawling github")
if errs := CrawlGithubRunner(ctx, ch, crawlers, seen); errs != nil {
for _, err := range errs {
logIfErr(err)
}
}
close(ch)
wg.Wait()
}