Skip to content

Commit

Permalink
Crawler performance improvements, better structure
Browse files Browse the repository at this point in the history
Refactored the crawler implementation to make the whole thing more
testable. Added a document interface to make the crawler generic.
This will be useful for collecting plugins, and other documents.
  • Loading branch information
damienr74 committed Aug 30, 2019
1 parent a66808a commit c2d6f09
Show file tree
Hide file tree
Showing 13 changed files with 1,250 additions and 98 deletions.
170 changes: 165 additions & 5 deletions internal/tools/crawler/crawler.go
Expand Up @@ -6,11 +6,19 @@ package crawler
import (
"context"
"fmt"
"log"
"os"
"sync"

_ "github.com/gomodule/redigo/redis"

"sigs.k8s.io/kustomize/internal/tools/doc"
)

var (
logger = log.New(os.Stdout, "Crawler: ", log.LstdFlags|log.LUTC|log.Llongfile)
)

// Crawler forwards documents from source repositories to index and store them
// for searching. Each crawler is responsible for querying it's source of
// information, and forwarding files that have not been seen before or that need
Expand All @@ -19,7 +27,152 @@ type Crawler interface {
// Crawl returns when it is done processing. This method does not take
// ownership of the channel. The channel is write only, and it
// designates where the crawler should forward the documents.
Crawl(ctx context.Context, output chan<- *doc.KustomizationDocument) error
Crawl(ctx context.Context, output chan<- CrawlerDocument) error

// Get the document data given the FilePath, Repo, and Ref/Tag/Branch.
FetchDocument(context.Context, *doc.Document) error
// Write to the document what the created time is.
SetCreated(context.Context, *doc.Document) error

Match(*doc.Document) bool
}

type CrawlerDocument interface {
ID() string
GetDocument() *doc.Document
GetResources() ([]*doc.Document, error)
WasCached() bool
}

type CrawlerSeed []*doc.Document

type IndexFunc func(CrawlerDocument, Crawler) error
type Converter func(*doc.Document) (CrawlerDocument, error)

// Cleaner, more efficient, and more extensible crawler implementation.
// The seed must include the ids of each document in the index.
func CrawlFromSeed(ctx context.Context, seed CrawlerSeed,
crawlers []Crawler, conv Converter, indx IndexFunc) {

seen := make(map[string]struct{})

logIfErr := func(err error) {
if err == nil {
return
}
logger.Println("error: ", err)
}

stack := make(CrawlerSeed, 0)

findMatch := func(d *doc.Document) Crawler {
for _, crawl := range crawlers {
if crawl.Match(d) {
return crawl
}
}

return nil
}

addBranches := func(cdoc CrawlerDocument, match Crawler) {
if _, ok := seen[cdoc.ID()]; ok {
return
}

seen[cdoc.ID()] = struct{}{}
// Insert into index
err := indx(cdoc, match)
logIfErr(err)
if err != nil {
return
}

deps, err := cdoc.GetResources()
logIfErr(err)
if err != nil {
return
}
for _, dep := range deps {
if _, ok := seen[dep.ID()]; ok {
continue
}
stack = append(stack, dep)
}
}

doCrawl := func(docsPtr *CrawlerSeed) {
for len(*docsPtr) > 0 {
back := len(*docsPtr) - 1
next := (*docsPtr)[back]
*docsPtr = (*docsPtr)[:back]

match := findMatch(next)
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", next))
continue
}

err := match.FetchDocument(ctx, next)
logIfErr(err)
// If there was no change or there is an error, we don't have
// to branch out, since the dependencies are already in the
// index, or we cannot find the document.
if err != nil || next.WasCached() {
continue
}

cdoc, err := conv(next)
logIfErr(err)
if err != nil {
continue
}

addBranches(cdoc, match)
}
}
// Exploit seed to update bulk of corpus.
logger.Printf("updating %d documents from seed\n", len(seed))
doCrawl(&seed)
// Traverse any new links added while updating corpus.
logger.Printf("crawling %d new documents found in the seed\n", len(stack))
doCrawl(&stack)

ch := make(chan CrawlerDocument, 1<<10)
wg := sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
for cdoc := range ch {
if _, ok := seen[cdoc.ID()]; ok {
continue
}
match := findMatch(cdoc.GetDocument())
if match == nil {
logIfErr(fmt.Errorf(
"%v could not match any crawler", cdoc))
continue
}
addBranches(cdoc, match)
}
}()

// Exploration through APIs.
errs := CrawlerRunner(ctx, ch, crawlers)
if errs != nil {
for _, err := range errs {
logIfErr(err)
}
}
close(ch)
logger.Println("Processing the new documents from the crawlers' exploration.")
wg.Wait()
// Handle deps of newly discovered documents.
logger.Printf("crawling the %d new documents from the crawlers' exploration.",
len(stack))
doCrawl(&stack)
}

// CrawlerRunner is a blocking function and only returns once all of the
Expand All @@ -32,21 +185,28 @@ type Crawler interface {
// The return value is an array of errors in which each index represents the
// index of the crawler that emitted the error. Although the errors themselves
// can be nil, the array will always be exactly the size of the crawlers array.
//
// Crawler Runner takes in a seed, which represents the documents stored in an
// index somewhere. The document data is not required to be populated. If there
// are many documents, this is preferable. The order of iteration over the seed
// is not garanteed, but the CrawlerRunner does guarantee that every element
// from the seed will be processed before any other documents from the
// crawlers.
func CrawlerRunner(ctx context.Context,
output chan<- *doc.KustomizationDocument, crawlers []Crawler) []error {
output chan<- CrawlerDocument, crawlers []Crawler) []error {

errs := make([]error, len(crawlers))
wg := sync.WaitGroup{}

for i, crawler := range crawlers {
// Crawler implementations get their own channels to prevent a
// crawler from closing the main output channel.
docs := make(chan *doc.KustomizationDocument)
docs := make(chan CrawlerDocument)
wg.Add(2)

// Forward all of the documents from this crawler's channel to
// the main output channel.
go func(docs <-chan *doc.KustomizationDocument) {
go func(docs <-chan CrawlerDocument) {
defer wg.Done()
for doc := range docs {
output <- doc
Expand All @@ -55,7 +215,7 @@ func CrawlerRunner(ctx context.Context,

// Run this crawler and capture its returned error.
go func(idx int, crawler Crawler,
docs chan<- *doc.KustomizationDocument) {
docs chan<- CrawlerDocument) {

defer func() {
wg.Done()
Expand Down

0 comments on commit c2d6f09

Please sign in to comment.