Skip to content

Commit

Permalink
Print a notice when the file reader reaches EOF
Browse files Browse the repository at this point in the history
  • Loading branch information
jmacd committed May 20, 2024
1 parent c623b97 commit d155204
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 6 deletions.
6 changes: 5 additions & 1 deletion collector/receiver/filereceiver/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

// stringReader is the only function we use from *bufio.Reader. We define it
Expand All @@ -38,12 +39,14 @@ type fileReader struct {
unmarshaler unmarshaler
consumer consumerType
timer *replayTimer
logger *zap.Logger
}

func newFileReader(consumer consumerType, file *os.File, timer *replayTimer, format string, compression string) fileReader {
func newFileReader(consumer consumerType, file *os.File, timer *replayTimer, format string, compression string, logger *zap.Logger) fileReader {
fr := fileReader{
consumer: consumer,
timer: timer,
logger: logger,
}

if compression == compressionTypeZSTD {
Expand Down Expand Up @@ -96,6 +99,7 @@ func (fr fileReader) readAllLines(ctx context.Context) error {

if err != nil {
if errors.Is(err, io.EOF) {
fr.logger.Info("reached end-of-file")
return nil
}
return err
Expand Down
9 changes: 5 additions & 4 deletions collector/receiver/filereceiver/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)

func TestFileReader_Readline(t *testing.T) {
Expand All @@ -23,7 +24,7 @@ func TestFileReader_Readline(t *testing.T) {
}
f, err := os.Open(filepath.Join("testdata", "metrics.json"))
require.NoError(t, err)
fr := newFileReader(cons, f, newReplayTimer(0), "json", "none")
fr := newFileReader(cons, f, newReplayTimer(0), "json", "none", zap.Must(zap.NewDevelopment()))
err = fr.readMetricLine(context.Background())
require.NoError(t, err)
assert.Equal(t, 1, len(tc.consumed))
Expand All @@ -43,7 +44,7 @@ func TestFileReader_ReadChunk(t *testing.T) {
}
f, err := os.Open(filepath.Join("testdata", "metrics.pb"))
require.NoError(t, err)
fr := newFileReader(cons, f, newReplayTimer(0), "proto", "")
fr := newFileReader(cons, f, newReplayTimer(0), "proto", "", zap.Must(zap.NewDevelopment()))
err = fr.readMetricChunk(context.Background())
require.NoError(t, err)
assert.Equal(t, 1, len(tc.consumed))
Expand Down Expand Up @@ -84,7 +85,7 @@ func TestFileReader_ReadAll(t *testing.T) {
throttle: 2,
sleepFunc: sleeper.fakeSleep,
}
fr := newFileReader(cons, f, rt, "json", "")
fr := newFileReader(cons, f, rt, "json", "", zap.Must(zap.NewDevelopment()))
err = fr.readAllLines(context.Background())
require.NoError(t, err)
const expectedSleeps = 10
Expand All @@ -110,7 +111,7 @@ func TestFileReader_ReadAllChunks(t *testing.T) {
throttle: 2,
sleepFunc: sleeper.fakeSleep,
}
fr := newFileReader(cons, f, rt, "proto", "")
fr := newFileReader(cons, f, rt, "proto", "", zap.Must(zap.NewDevelopment()))
err = fr.readAllChunks(context.Background())
require.NoError(t, err)
const expectedSleeps = 10
Expand Down
2 changes: 1 addition & 1 deletion collector/receiver/filereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (r *fileReceiver) Start(ctx context.Context, _ component.Host) error {
return fmt.Errorf("failed to open file %q: %w", r.path, err)
}

fr := newFileReader(r.consumer, file, newReplayTimer(r.throttle), r.format, r.compression)
fr := newFileReader(r.consumer, file, newReplayTimer(r.throttle), r.format, r.compression, r.logger)
go func() {
var err error
if r.format == formatTypeProto {
Expand Down

0 comments on commit d155204

Please sign in to comment.