Implementation of basic crawler organisation.
The `crawler.Crawler` interface is defined: a crawler must implement a
`Crawl` method that forwards documents it finds to a channel.

A helper function is also implemented that launches a list of crawlers
concurrently, merges their channels into one main output channel, and
forwards any errors.

Finally, a test is provided that verifies the correctness and concurrency
of the helper function.
damienr74 committed Aug 16, 2019
1 parent c02b4f3 commit ca41674
Showing 2 changed files with 200 additions and 0 deletions.
76 changes: 76 additions & 0 deletions internal/search/crawler/crawler.go
@@ -0,0 +1,76 @@
// Package crawler provides helper methods and defines an interface for
// launching source repository crawlers that retrieve files from a source and
// forward them to a channel for indexing and retrieval.
package crawler

import (
    "context"
    "fmt"
    "sync"

    "sigs.k8s.io/kustomize/internal/search/doc"
)

// Crawler forwards documents from source repositories to index and store them
// for searching. Each crawler is responsible for querying its source of
// information, and forwarding files that have not been seen before or that need
// updating.
type Crawler interface {
    // Crawl returns when it is done processing. This method does not take
    // ownership of the channel. The channel is write only, and it
    // designates where the crawler should forward the documents.
    Crawl(ctx context.Context, output chan<- *doc.KustomizationDocument) error
}

// CrawlerRunner is a blocking function; it returns only once all of the
// crawlers have finished executing.
//
// This function uses the output channel to forward kustomization documents
// from a list of crawlers. The output is to be consumed by a database/search
// indexer for later retrieval.
//
// The return value is a slice of errors in which each element corresponds to
// the crawler at the same index. Although individual errors may be nil, the
// returned slice always has exactly the same length as the crawlers slice.
func CrawlerRunner(ctx context.Context,
    output chan<- *doc.KustomizationDocument, crawlers []Crawler) []error {

    errs := make([]error, len(crawlers))
    wg := sync.WaitGroup{}

    for i, crawler := range crawlers {
        // Crawler implementations get their own channels to prevent a
        // crawler from closing the main output channel.
        docs := make(chan *doc.KustomizationDocument)
        wg.Add(2)

        // Forward all of the documents from this crawler's channel to
        // the main output channel.
        go func(docs <-chan *doc.KustomizationDocument) {
            defer wg.Done()
            for doc := range docs {
                output <- doc
            }
        }(docs)

        // Run this crawler and capture its returned error.
        go func(idx int, crawler Crawler,
            docs chan<- *doc.KustomizationDocument) {

            defer func() {
                wg.Done()
                if r := recover(); r != nil {
                    errs[idx] = fmt.Errorf(
                        "%+v panicked: %v, additional error %v",
                        crawler, r, errs[idx],
                    )
                }
            }()
            defer close(docs)
            errs[idx] = crawler.Crawl(ctx, docs)
        }(i, crawler, docs) // Copies the index and the crawler.
    }

    wg.Wait()
    return errs
}
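
As a usage sketch (not part of this commit): the snippet below shows how a
concrete crawler might satisfy the interface and how a caller would drain the
merged output while `CrawlerRunner` blocks. The `staticCrawler` type, the
`main` function, and the placement inside the kustomize module (required for
the `internal/` imports to resolve) are all assumptions for illustration.

package main

import (
    "context"
    "fmt"

    "sigs.k8s.io/kustomize/internal/search/crawler"
    "sigs.k8s.io/kustomize/internal/search/doc"
)

// staticCrawler is a hypothetical Crawler that emits a fixed set of
// documents and then returns.
type staticCrawler struct {
    docs []doc.KustomizationDocument
}

func (c staticCrawler) Crawl(ctx context.Context,
    output chan<- *doc.KustomizationDocument) error {

    for i := range c.docs {
        select {
        case output <- &c.docs[i]:
        case <-ctx.Done():
            // Stop early if the caller cancels the context.
            return ctx.Err()
        }
    }
    return nil
}

func main() {
    output := make(chan *doc.KustomizationDocument)
    crawlers := []crawler.Crawler{
        staticCrawler{docs: []doc.KustomizationDocument{
            {FilePath: "example/kustomization.yaml"},
        }},
    }

    // CrawlerRunner blocks until every crawler is done, so the merged
    // output must be drained concurrently.
    done := make(chan struct{})
    go func() {
        defer close(done)
        for d := range output {
            fmt.Println("indexing", d.FilePath)
        }
    }()

    errs := crawler.CrawlerRunner(context.Background(), output, crawlers)
    close(output) // Safe: all crawlers have returned by this point.
    <-done

    for i, err := range errs {
        if err != nil {
            fmt.Printf("crawler %d failed: %v\n", i, err)
        }
    }
}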
124 changes: 124 additions & 0 deletions internal/search/crawler/crawler_test.go
@@ -0,0 +1,124 @@
package crawler

import (
    "context"
    "errors"
    "reflect"
    "sort"
    "sync"
    "testing"

    "sigs.k8s.io/kustomize/internal/search/doc"
)

// Simple crawler that forwards its list of documents to a provided channel and
// returns its error to the caller.
type testCrawler struct {
    docs []doc.KustomizationDocument
    err  error
}

// Crawl implements the Crawler interface for testing.
func (c testCrawler) Crawl(ctx context.Context,
    output chan<- *doc.KustomizationDocument) error {

    for i := range c.docs {
        output <- &c.docs[i]
    }
    return c.err
}

// sortableDocs sorts documents into a deterministic order before comparison.
// This is needed since the documents are sent concurrently and may arrive in
// any order.
type sortableDocs []doc.KustomizationDocument

func (s sortableDocs) Less(i, j int) bool {
    return s[i].FilePath < s[j].FilePath
}

func (s sortableDocs) Swap(i, j int) {
    s[i], s[j] = s[j], s[i]
}

func (s sortableDocs) Len() int {
    return len(s)
}

func TestCrawlerRunner(t *testing.T) {
    tests := []struct {
        tc   []Crawler
        errs []error
        docs sortableDocs
    }{
        {
            tc: []Crawler{
                testCrawler{
                    docs: []doc.KustomizationDocument{
                        {FilePath: "crawler1/doc1"},
                        {FilePath: "crawler1/doc2"},
                        {FilePath: "crawler1/doc3"},
                    },
                },
                testCrawler{err: errors.New("crawler2")},
                testCrawler{},
                testCrawler{
                    docs: []doc.KustomizationDocument{
                        {FilePath: "crawler4/doc1"},
                        {FilePath: "crawler4/doc2"},
                    },
                    err: errors.New("crawler4"),
                },
            },
            errs: []error{
                nil,
                errors.New("crawler2"),
                nil,
                errors.New("crawler4"),
            },
            docs: sortableDocs{
                {FilePath: "crawler1/doc1"},
                {FilePath: "crawler1/doc2"},
                {FilePath: "crawler1/doc3"},
                {FilePath: "crawler4/doc1"},
                {FilePath: "crawler4/doc2"},
            },
        },
    }

    for _, test := range tests {
        output := make(chan *doc.KustomizationDocument)
        wg := sync.WaitGroup{}
        wg.Add(1)

        // Run the crawler runner with a list of crawlers.
        go func() {
            defer close(output)
            defer wg.Done()

            errs := CrawlerRunner(context.Background(), output,
                test.tc)

            // Check that errors are returned as they should be.
            if !reflect.DeepEqual(errs, test.errs) {
                t.Errorf("Expected errs (%v) to equal (%v)",
                    errs, test.errs)
            }
        }()

        // Iterate over the output channel of the crawler runner.
        returned := make(sortableDocs, 0, len(test.docs))
        for doc := range output {
            returned = append(returned, *doc)
        }

        // Check that all documents are received.
        sort.Sort(returned)
        if !reflect.DeepEqual(returned, test.docs) {
            t.Errorf("Expected docs (%v) to equal (%v)\n",
                returned, test.docs)
        }

        wg.Wait()
    }
}
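
A usage note beyond the commit itself: because the helper's guarantees are
concurrency-sensitive, running this test under Go's standard race detector
(`go test -race`) exercises the fan-in goroutines for data races in addition
to checking the merged output and error slice.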
