Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change source reporter interfaces to not return errors #3900

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Update common implementations
  • Loading branch information
mcastorina committed Feb 11, 2025
commit 2edb879222146fb7cfdd9da6a7501739a509a8e2
21 changes: 10 additions & 11 deletions pkg/sources/legacy_reporters.go
Original file line number Diff line number Diff line change
@@ -12,13 +12,12 @@ type ChanReporter struct {
Ch chan<- *Chunk
}

func (c ChanReporter) ChunkOk(ctx context.Context, chunk Chunk) error {
return common.CancellableWrite(ctx, c.Ch, &chunk)
func (c ChanReporter) ChunkOk(ctx context.Context, chunk Chunk) {
_ = common.CancellableWrite(ctx, c.Ch, &chunk)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you leave a comment explaining what kind of errors can get swallowed here and why that's ok to do?

}

func (ChanReporter) ChunkErr(ctx context.Context, err error) error {
func (ChanReporter) ChunkErr(ctx context.Context, err error) {
ctx.Logger().Error(err, "error chunking")
return ctx.Err()
}

var _ UnitReporter = (*VisitorReporter)(nil)
@@ -27,18 +26,18 @@ var _ UnitReporter = (*VisitorReporter)(nil)
// finding units and reporting errors. VisitErr is optional; if unset it will
// log the error.
type VisitorReporter struct {
VisitUnit func(context.Context, SourceUnit) error
VisitErr func(context.Context, error) error
VisitUnit func(context.Context, SourceUnit)
VisitErr func(context.Context, error)
}

func (v VisitorReporter) UnitOk(ctx context.Context, unit SourceUnit) error {
return v.VisitUnit(ctx, unit)
func (v VisitorReporter) UnitOk(ctx context.Context, unit SourceUnit) {
v.VisitUnit(ctx, unit)
}

func (v VisitorReporter) UnitErr(ctx context.Context, err error) error {
func (v VisitorReporter) UnitErr(ctx context.Context, err error) {
if v.VisitErr == nil {
ctx.Logger().Error(err, "error enumerating")
return ctx.Err()
return
}
return v.VisitErr(ctx, err)
v.VisitErr(ctx, err)
}
14 changes: 6 additions & 8 deletions pkg/sources/source_manager.go
Original file line number Diff line number Diff line change
@@ -649,16 +649,15 @@ type mgrUnitReporter struct {

// UnitOk implements the UnitReporter interface by recording the unit in the
// report and sending it on the SourceUnit channel.
func (s *mgrUnitReporter) UnitOk(ctx context.Context, unit SourceUnit) error {
func (s *mgrUnitReporter) UnitOk(ctx context.Context, unit SourceUnit) {
s.report.ReportUnit(unit)
return common.CancellableWrite(ctx, s.unitCh, unit)
_ = common.CancellableWrite(ctx, s.unitCh, unit)
}

// UnitErr implements the UnitReporter interface by recording the error in the
// report.
func (s *mgrUnitReporter) UnitErr(ctx context.Context, err error) error {
func (s *mgrUnitReporter) UnitErr(ctx context.Context, err error) {
s.report.ReportError(err)
return nil
}

// mgrChunkReporter implements the ChunkReporter interface.
@@ -672,14 +671,13 @@ type mgrChunkReporter struct {

// ChunkOk implements the ChunkReporter interface by recording the chunk and
// its associated unit in the report and sending it on the Chunk channel.
func (s *mgrChunkReporter) ChunkOk(ctx context.Context, chunk Chunk) error {
func (s *mgrChunkReporter) ChunkOk(ctx context.Context, chunk Chunk) {
s.report.ReportChunk(s.unit, &chunk)
return common.CancellableWrite(ctx, s.chunkCh, &chunk)
_ = common.CancellableWrite(ctx, s.chunkCh, &chunk)
}

// ChunkErr implements the ChunkReporter interface by recording the error and
// its associated unit in the report.
func (s *mgrChunkReporter) ChunkErr(ctx context.Context, err error) error {
func (s *mgrChunkReporter) ChunkErr(ctx context.Context, err error) {
s.report.ReportError(ChunkError{s.unit, err})
return nil
}
10 changes: 4 additions & 6 deletions pkg/sources/sources.go
Original file line number Diff line number Diff line change
@@ -110,20 +110,18 @@ type baseUnitReporter struct {
progress *JobProgress
}

func (b baseUnitReporter) UnitOk(ctx context.Context, unit SourceUnit) error {
func (b baseUnitReporter) UnitOk(ctx context.Context, unit SourceUnit) {
b.progress.ReportUnit(unit)
if b.child != nil {
return b.child.UnitOk(ctx, unit)
b.child.UnitOk(ctx, unit)
}
return nil
}

func (b baseUnitReporter) UnitErr(ctx context.Context, err error) error {
func (b baseUnitReporter) UnitErr(ctx context.Context, err error) {
b.progress.ReportError(err)
if b.child != nil {
return b.child.UnitErr(ctx, err)
b.child.UnitErr(ctx, err)
}
return nil
}

// UnitReporter defines the interface a source will use to report whether a