Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change source reporter interfaces to not return errors #3900

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pkg/handlers/handlers.go
Original file line number Diff line number Diff line change
@@ -429,9 +429,7 @@ func handleChunksWithError(
if len(dataOrErr.Data) > 0 {
chunk := *chunkSkel
chunk.Data = dataOrErr.Data
if err := reporter.ChunkOk(ctx, chunk); err != nil {
return fmt.Errorf("error reporting chunk: %w", err)
}
reporter.ChunkOk(ctx, chunk)
}
case <-ctx.Done():
return ctx.Err()
5 changes: 2 additions & 3 deletions pkg/handlers/handlers_test.go
Original file line number Diff line number Diff line change
@@ -785,12 +785,11 @@ func getGitCommitHash(t *testing.T, gitDir string) string {

type mockReporter struct{ reportedChunks int }

func (m *mockReporter) ChunkOk(context.Context, sources.Chunk) error {
func (m *mockReporter) ChunkOk(context.Context, sources.Chunk) {
m.reportedChunks++
return nil
}

func (m *mockReporter) ChunkErr(context.Context, error) error { return nil }
func (m *mockReporter) ChunkErr(context.Context, error) {}

func TestHandleChunksWithError(t *testing.T) {
tests := []struct {
27 changes: 11 additions & 16 deletions pkg/sources/filesystem/filesystem.go
Original file line number Diff line number Diff line change
@@ -195,21 +195,18 @@ func (s *Source) Enumerate(ctx context.Context, reporter sources.UnitReporter) e
for _, path := range s.paths {
fileInfo, err := os.Lstat(filepath.Clean(path))
if err != nil {
if err := reporter.UnitErr(ctx, err); err != nil {
return err
}
reporter.UnitErr(ctx, err)
continue
}
if !fileInfo.IsDir() {
item := sources.CommonSourceUnit{ID: path}
if err := reporter.UnitOk(ctx, item); err != nil {
return err
}
reporter.UnitOk(ctx, item)
continue
}
err = fs.WalkDir(os.DirFS(path), ".", func(relativePath string, d fs.DirEntry, err error) error {
if err != nil {
return reporter.UnitErr(ctx, err)
reporter.UnitErr(ctx, err)
return nil
Comment on lines -212 to +209
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the cases where we were previously returning the reporter's error, I changed the code to return `nil`. This would be a behavior change for context cancellations, so maybe we should return `ctx.Err()` instead. Thoughts?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that returning `ctx.Err()` instead is a good idea!

}
if d.IsDir() {
return nil
@@ -219,12 +216,11 @@ func (s *Source) Enumerate(ctx context.Context, reporter sources.UnitReporter) e
return nil
}
item := sources.CommonSourceUnit{ID: fullPath}
return reporter.UnitOk(ctx, item)
reporter.UnitOk(ctx, item)
return nil
})
if err != nil {
if err := reporter.UnitErr(ctx, err); err != nil {
return err
}
reporter.UnitErr(ctx, err)
}
}
return nil
@@ -238,7 +234,8 @@ func (s *Source) ChunkUnit(ctx context.Context, unit sources.SourceUnit, reporte
cleanPath := filepath.Clean(path)
fileInfo, err := os.Lstat(cleanPath)
if err != nil {
return reporter.ChunkErr(ctx, fmt.Errorf("unable to get file info: %w", err))
reporter.ChunkErr(ctx, fmt.Errorf("unable to get file info: %w", err))
return nil
}

ch := make(chan *sources.Chunk)
@@ -259,16 +256,14 @@ func (s *Source) ChunkUnit(ctx context.Context, unit sources.SourceUnit, reporte
if chunk == nil {
continue
}
if err := reporter.ChunkOk(ctx, *chunk); err != nil {
return err
}
reporter.ChunkOk(ctx, *chunk)
}

if scanErr != nil && !errors.Is(scanErr, io.EOF) {
if !errors.Is(scanErr, skipSymlinkErr) {
logger.Error(scanErr, "error scanning filesystem")
}
return reporter.ChunkErr(ctx, scanErr)
reporter.ChunkErr(ctx, scanErr)
}
return nil
}
40 changes: 14 additions & 26 deletions pkg/sources/git/git.go
Original file line number Diff line number Diff line change
@@ -299,7 +299,7 @@ func (s *Source) scanRepo(ctx context.Context, repoURI string, reporter sources.
return s.git.ScanRepo(ctx, repo, path, s.scanOptions, reporter)
}()
if err != nil {
return reporter.ChunkErr(ctx, err)
reporter.ChunkErr(ctx, err)
}
return nil
}
@@ -330,7 +330,8 @@ func (s *Source) scanDir(ctx context.Context, gitDir string, reporter sources.Ch
// try paths instead of url
repo, err := RepoFromPath(gitDir, s.scanOptions.Bare)
if err != nil {
return reporter.ChunkErr(ctx, err)
reporter.ChunkErr(ctx, err)
return nil
}

err = func() error {
@@ -341,7 +342,7 @@ func (s *Source) scanDir(ctx context.Context, gitDir string, reporter sources.Ch
return s.git.ScanRepo(ctx, repo, gitDir, s.scanOptions, reporter)
}()
if err != nil {
return reporter.ChunkErr(ctx, err)
reporter.ChunkErr(ctx, err)
}
return nil
}
@@ -626,9 +627,7 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string
Data: []byte(sb.String()),
Verify: s.verify,
}
if err := reporter.ChunkOk(ctx, chunk); err != nil {
return err
}
reporter.ChunkOk(ctx, chunk)
}

fileName := diff.PathB
@@ -701,7 +700,8 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string
Data: data,
Verify: s.verify,
}
return reporter.ChunkOk(ctx, chunk)
reporter.ChunkOk(ctx, chunk)
return nil
}
if err := chunkData(diff); err != nil {
return err
@@ -739,10 +739,7 @@ func (s *Git) gitChunk(ctx context.Context, diff *gitparse.Diff, fileName, email
Data: append([]byte{}, newChunkBuffer.Bytes()...),
Verify: s.verify,
}
if err := reporter.ChunkOk(ctx, chunk); err != nil {
// TODO: Return error.
return
}
reporter.ChunkOk(ctx, chunk)

newChunkBuffer.Reset()
lastOffset = offset
@@ -759,10 +756,7 @@ func (s *Git) gitChunk(ctx context.Context, diff *gitparse.Diff, fileName, email
Data: line,
Verify: s.verify,
}
if err := reporter.ChunkOk(ctx, chunk); err != nil {
// TODO: Return error.
return
}
reporter.ChunkOk(ctx, chunk)
continue
}
}
@@ -783,10 +777,7 @@ func (s *Git) gitChunk(ctx context.Context, diff *gitparse.Diff, fileName, email
Data: append([]byte{}, newChunkBuffer.Bytes()...),
Verify: s.verify,
}
if err := reporter.ChunkOk(ctx, chunk); err != nil {
// TODO: Return error.
return
}
reporter.ChunkOk(ctx, chunk)
}
}

@@ -903,7 +894,8 @@ func (s *Git) ScanStaged(ctx context.Context, repo *git.Repository, path string,
Data: data,
Verify: s.verify,
}
return reporter.ChunkOk(ctx, chunk)
reporter.ChunkOk(ctx, chunk)
return nil
}
if err := chunkData(diff); err != nil {
return err
@@ -1302,18 +1294,14 @@ func (s *Source) Enumerate(ctx context.Context, reporter sources.UnitReporter) e
continue
}
unit := SourceUnit{ID: repo, Kind: UnitDir}
if err := reporter.UnitOk(ctx, unit); err != nil {
return err
}
reporter.UnitOk(ctx, unit)
}
for _, repo := range s.conn.GetRepositories() {
if repo == "" {
continue
}
unit := SourceUnit{ID: repo, Kind: UnitRepo}
if err := reporter.UnitOk(ctx, unit); err != nil {
return err
}
reporter.UnitOk(ctx, unit)
}
return nil
}
66 changes: 22 additions & 44 deletions pkg/sources/github/github.go
Original file line number Diff line number Diff line change
@@ -341,9 +341,7 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, tar
// We don't care about handling enumerated values as they happen during
// the normal Chunks flow because we enumerate and scan in two steps.
noopReporter := sources.VisitorReporter{
VisitUnit: func(context.Context, sources.SourceUnit) error {
return nil
},
VisitUnit: func(context.Context, sources.SourceUnit) {},
}
err := s.Enumerate(ctx, noopReporter)
if err != nil {
@@ -361,18 +359,18 @@ func (s *Source) Enumerate(ctx context.Context, reporter sources.UnitReporter) e
seenUnits := make(map[sources.SourceUnit]struct{})
// Wrapper reporter to deduplicate and filter found units.
dedupeReporter := sources.VisitorReporter{
VisitUnit: func(ctx context.Context, su sources.SourceUnit) error {
VisitUnit: func(ctx context.Context, su sources.SourceUnit) {
// Only report units that passed the user configured filter.
name := su.Display()
if !s.filteredRepoCache.Exists(name) {
return ctx.Err()
return
}
// Only report a unit once.
if _, ok := seenUnits[su]; ok {
return ctx.Err()
return
}
seenUnits[su] = struct{}{}
return reporter.UnitOk(ctx, su)
reporter.UnitOk(ctx, su)
},
VisitErr: reporter.UnitErr,
}
@@ -383,13 +381,9 @@ func (s *Source) Enumerate(ctx context.Context, reporter sources.UnitReporter) e
url, _ := s.filteredRepoCache.Get(name)
url, err := s.ensureRepoInfoCache(ctx, url, &unitErrorReporter{reporter})
if err != nil {
if err := dedupeReporter.UnitErr(ctx, err); err != nil {
return err
}
}
if err := dedupeReporter.UnitOk(ctx, RepoUnit{Name: name, URL: url}); err != nil {
return err
dedupeReporter.UnitErr(ctx, err)
}
dedupeReporter.UnitOk(ctx, RepoUnit{Name: name, URL: url})
}

// I'm not wild about switching on the connector type here (as opposed to dispatching to the connector itself) but
@@ -420,7 +414,7 @@ func (s *Source) Enumerate(ctx context.Context, reporter sources.UnitReporter) e
repo, err := s.ensureRepoInfoCache(ctx, repo, &unitErrorReporter{reporter})
if err != nil {
ctx.Logger().Error(err, "error caching repo info")
_ = dedupeReporter.UnitErr(ctx, fmt.Errorf("error caching repo info: %w", err))
dedupeReporter.UnitErr(ctx, fmt.Errorf("error caching repo info: %w", err))
}
s.repos = append(s.repos, repo)
}
@@ -693,9 +687,7 @@ func (s *Source) scanRepo(ctx context.Context, repoURL string, reporter sources.
// Ignore "Repository not found" errors.
// It's common for GitHub's API to say a repo has a wiki when it doesn't.
if !strings.Contains(err.Error(), "not found") {
if err := reporter.ChunkErr(ctx, fmt.Errorf("error scanning wiki: %w", err)); err != nil {
return err
}
reporter.ChunkErr(ctx, fmt.Errorf("error scanning wiki: %w", err))
}

// Don't return, it still might be possible to scan comments.
@@ -706,9 +698,7 @@ func (s *Source) scanRepo(ctx context.Context, repoURL string, reporter sources.
if s.includeGistComments || s.includeIssueComments || s.includePRComments {
if err := s.scanComments(ctx, repoURL, repoInfo, reporter); err != nil {
err := fmt.Errorf("error scanning comments: %w", err)
if err := reporter.ChunkErr(ctx, err); err != nil {
return err
}
reporter.ChunkErr(ctx, err)
}
}

@@ -745,25 +735,25 @@ var (

// errorReporter is an interface that captures just the error reporting functionality
type errorReporter interface {
Err(ctx context.Context, err error) error
Err(ctx context.Context, err error)
}

// wrapper to adapt UnitReporter to errorReporter
type unitErrorReporter struct {
reporter sources.UnitReporter
}

func (u unitErrorReporter) Err(ctx context.Context, err error) error {
return u.reporter.UnitErr(ctx, err)
func (u unitErrorReporter) Err(ctx context.Context, err error) {
u.reporter.UnitErr(ctx, err)
}

// wrapper to adapt ChunkReporter to errorReporter
type chunkErrorReporter struct {
reporter sources.ChunkReporter
}

func (c chunkErrorReporter) Err(ctx context.Context, err error) error {
return c.reporter.ChunkErr(ctx, err)
func (c chunkErrorReporter) Err(ctx context.Context, err error) {
c.reporter.ChunkErr(ctx, err)
}

// handleRateLimit handles GitHub API rate limiting with an optional error reporter.
@@ -814,7 +804,7 @@ func (s *Source) handleRateLimit(ctx context.Context, errIn error, reporters ...
ctx.Logger().Info(fmt.Sprintf("exceeded %s rate limit", limitType), "retry_after", retryAfter.String(), "resume_time", rateLimitResumeTime.Format(time.RFC3339))
// Only report the error if a reporter was provided
for _, reporter := range reporters {
_ = reporter.Err(ctx, fmt.Errorf("exceeded %s rate limit", limitType))
reporter.Err(ctx, fmt.Errorf("exceeded %s rate limit", limitType))
}
} else {
retryAfter = (5 * time.Minute) + jitter
@@ -874,9 +864,7 @@ func (s *Source) addUserGistsToCache(ctx context.Context, user string, reporter
for _, gist := range gists {
s.filteredRepoCache.Set(gist.GetID(), gist.GetGitPullURL())
s.cacheGistInfo(gist)
if err := reporter.UnitOk(ctx, GistUnit{Name: gist.GetID(), URL: gist.GetGitPullURL()}); err != nil {
return err
}
reporter.UnitOk(ctx, GistUnit{Name: gist.GetID(), URL: gist.GetGitPullURL()})
}

if res == nil || res.NextPage == 0 {
@@ -1184,9 +1172,7 @@ func (s *Source) chunkGistComments(ctx context.Context, gistURL string, gistInfo
Verify: s.verify,
}

if err := reporter.ChunkOk(ctx, chunk); err != nil {
return err
}
reporter.ChunkOk(ctx, chunk)
}
return nil
}
@@ -1293,9 +1279,7 @@ func (s *Source) chunkIssues(ctx context.Context, repoInfo repoInfo, issues []*g
Verify: s.verify,
}

if err := reporter.ChunkOk(ctx, chunk); err != nil {
return err
}
reporter.ChunkOk(ctx, chunk)
}
return nil
}
@@ -1360,9 +1344,7 @@ func (s *Source) chunkIssueComments(ctx context.Context, repoInfo repoInfo, comm
Verify: s.verify,
}

if err := reporter.ChunkOk(ctx, chunk); err != nil {
return err
}
reporter.ChunkOk(ctx, chunk)
}
return nil
}
@@ -1456,9 +1438,7 @@ func (s *Source) chunkPullRequests(ctx context.Context, repoInfo repoInfo, prs [
Verify: s.verify,
}

if err := reporter.ChunkOk(ctx, chunk); err != nil {
return err
}
reporter.ChunkOk(ctx, chunk)
}
return nil
}
@@ -1492,9 +1472,7 @@ func (s *Source) chunkPullRequestComments(ctx context.Context, repoInfo repoInfo
Verify: s.verify,
}

if err := reporter.ChunkOk(ctx, chunk); err != nil {
return err
}
reporter.ChunkOk(ctx, chunk)
}
return nil
}
Loading
Oops, something went wrong.
Loading
Oops, something went wrong.