fix: handle LineReader buffer wraparound in LineProtocolFilter (#502) #503

Open: wants to merge 1 commit into base: main
43 changes: 29 additions & 14 deletions pkg/csv2lp/lp_reader.go
@@ -13,6 +13,10 @@ type LineProtocolFilterReader struct {
lineReader *LineReader
// LineNumber represents line number of csv.Reader, 1 is the first
LineNumber int
// line buffer
line []byte
// wraparound buffer
wrapBuf []byte
}

// LineProtocolFilter creates a reader wrapper that parses points, skipping if invalid
@@ -21,24 +25,35 @@ func LineProtocolFilter(reader io.Reader) *LineProtocolFilterReader {
lineReader.LineNumber = 1 // start counting from 1
return &LineProtocolFilterReader{
lineReader: lineReader,
line: make([]byte, 4096),

Review comment (Contributor):
4096 should be the defaultBufSize constant, not a literal

Review comment (Contributor):
Note: I am suggesting elsewhere that this buffer is unnecessary.

wrapBuf: make([]byte, 4096),

Review comment (Contributor):
4096 should be the defaultBufSize constant, not a literal

Review comment (Contributor, @davidby-influx, Mar 22, 2024):
Note: I am suggesting elsewhere that this buffer is not necessary.

}
}

func (state *LineProtocolFilterReader) Read(b []byte) (int, error) {
for {
bytesRead, err := state.lineReader.Read(b)
if err != nil {
return bytesRead, err
}
state.LineNumber = state.lineReader.LastLineNumber
buf := b[0:bytesRead]
pts, err := models.ParsePoints(buf) // any time precision because we won't actually use this point
if err != nil {
log.Printf("invalid point on line %d: %v\n", state.LineNumber, err)
continue
} else if len(pts) == 0 { // no points on this line
continue
bytesRead, err := state.lineReader.Read(state.line)
if err != nil && bytesRead == 0 {
return 0, err

Review comment (Contributor):
This conditional changes the behavior; in the case where bytes are read, but an error is returned (perhaps EOF from a final line without a newline), this will continue, whereas the existing code will return the bytesRead and the error.

Review comment (Contributor):
Is this a fix for a problem you found, or an inadvertent change in behavior?

}

//read again when we read a partial line at the end of the line reader buffer
if bytesRead > 0 && state.line[bytesRead-1] != '\n' {
wrapBytesRead, _ := state.lineReader.Read(state.wrapBuf[0:])

Review comment (Contributor):
This code ignores any errors from lineReader.Read. Errors need to be handled and returned up the stack.

if wrapBytesRead > 0 {
copy(state.line[bytesRead:], state.wrapBuf[0:wrapBytesRead])
bytesRead = bytesRead + wrapBytesRead
}
return bytesRead, nil
}

state.LineNumber = state.lineReader.LastLineNumber
pts, err := models.ParsePoints(state.line[0:bytesRead]) // any time precision because we won't actually use this point
if err != nil {
log.Printf("invalid point on line %d: %v\n", state.LineNumber, err)
return 0, nil
} else if len(pts) == 0 {
return 0, nil
}

copy(b, state.line[0:bytesRead])

Review comment (Contributor):
This code has an extra copy for every read, and sometimes two extra copies on a line wrap.

Are those copies necessary? Would it be possible to do all the reads into the passed-in buffer, b, passing b[bytesRead:] to the second read (when no \n is found), or something like that?

return bytesRead, nil
}
65 changes: 62 additions & 3 deletions pkg/csv2lp/lp_reader_test.go
@@ -8,6 +8,59 @@ import (
"github.com/stretchr/testify/require"
)

var testData = []string{
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.10149335861206055 1680843957087735646",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.10599446296691895 1680844070301214389",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.11674189567565918 1680844094394325888",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.12114214897155762 1680844139344799828",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.11209964752197266 1680844301203178190",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.1039726734161377 1680844347876425153",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.1268465518951416 1680844370106414825",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.12303543090820312 1680844646311648005",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.11537313461303711 1680844695335169551",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.10717368125915527 1680845112709821861",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.1027822494506836 1680845196950133820",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.12188148498535156 1680845320198611842",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.10436582565307617 1680845335147764695",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.11761021614074707 1680845473279430934",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.10162472724914551 1680845611228521315",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.12307000160217285 1680845680031549517",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.11197781562805176 1680845736813021569",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.11208724975585938 1680846223638448981",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.0956425666809082 1680846370123147298",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.1171119213104248 1680846439166725830",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.1193227767944336 1680846639652055730",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.11297607421875 1680846714839454444",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.11377334594726562 1680846991135713316",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.10156655311584473 1680847055803239552",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.09974861145019531 1680847194100005178",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration=0.1182551383972168 1680847198099239798",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680843957087735646",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680844070301214389",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680844094394325888",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680844139344799828",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680844301203178190",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680844347876425153",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680844370106414825",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680844646311648005",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680844695335169551",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680845112709821861",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680845196950133820",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680845320198611842",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680845335147764695",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680845473279430934",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680845611228521315",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680845680031549517",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680845736813021569",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680846223638448981",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680846370123147298",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680846439166725830",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680846639652055730",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680846714839454444",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680846991135713316",
"celery_task_duration,env=MOOO,host=foobar-batch,queue=high,service=BATCH,task=monitoring.tasks.celery_throughput_high duration_limit_exceeded=0i 1680847055803239552",
}

func TestLineProtocolFilter(t *testing.T) {
var tests = []struct {
input string
@@ -55,14 +108,20 @@ func TestLineProtocolFilter(t *testing.T) {
"weather,location=us-central temperature=31 1465839830100400205",
}, "\n"),
},
{
strings.Join(testData, "\n"),
strings.Join(testData, "\n"),
},
}
for _, tt := range tests {
reader := LineProtocolFilter(strings.NewReader(tt.input))
b, err := io.ReadAll(reader)
if err != nil {
// test data should fit in b
b := make([]byte, 163984)
bytesRead, err := io.ReadFull(reader, b)
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
t.Errorf("failed reading: %v", err)
continue
}
require.Equal(t, strings.TrimSpace(string(b)), strings.TrimSpace(tt.expected))
require.Equal(t, strings.TrimSpace(tt.expected), strings.TrimSpace(string(b[0:bytesRead])))
}
}