Skip to content

Commit

Permalink
fix: return and respect cursor errors (#24791) (#24846) (#24847)
Browse files Browse the repository at this point in the history
ArrayCursors were ignoring errors, which led to panics when nil
cursors were operated on. This fix passes errors back up the stack
and uses them to enforce healthy cursor creation.

Closes #24789
---------
Co-authored-by: Stuart Carnie <stuart.carnie@gmail.com>

(cherry picked from commit fe6c64b)

closes #24836

(cherry picked from commit 49d0bef)

closes #24826
  • Loading branch information
davidby-influx committed Mar 27, 2024
1 parent d2af577 commit 8f09e36
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 93 deletions.
40 changes: 25 additions & 15 deletions storage/reads/array_cursor.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,15 @@ func (c *floatMultiShardArrayCursor) nextArrayCursor() bool {

var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
cur, err = itr.Next(c.ctx, c.req)
}

c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.FloatArrayCursor
next, ok = cur.(cursors.FloatArrayCursor)
if !ok {
Expand Down Expand Up @@ -1196,13 +1198,15 @@ func (c *integerMultiShardArrayCursor) nextArrayCursor() bool {

var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
cur, err = itr.Next(c.ctx, c.req)
}

c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.IntegerArrayCursor
next, ok = cur.(cursors.IntegerArrayCursor)
if !ok {
Expand Down Expand Up @@ -2077,13 +2081,15 @@ func (c *unsignedMultiShardArrayCursor) nextArrayCursor() bool {

var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
cur, err = itr.Next(c.ctx, c.req)
}

c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.UnsignedArrayCursor
next, ok = cur.(cursors.UnsignedArrayCursor)
if !ok {
Expand Down Expand Up @@ -2958,13 +2964,15 @@ func (c *stringMultiShardArrayCursor) nextArrayCursor() bool {

var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
cur, err = itr.Next(c.ctx, c.req)
}

c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.StringArrayCursor
next, ok = cur.(cursors.StringArrayCursor)
if !ok {
Expand Down Expand Up @@ -3384,13 +3392,15 @@ func (c *booleanMultiShardArrayCursor) nextArrayCursor() bool {

var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
cur, err = itr.Next(c.ctx, c.req)
}

c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.BooleanArrayCursor
next, ok = cur.(cursors.BooleanArrayCursor)
if !ok {
Expand Down
10 changes: 6 additions & 4 deletions storage/reads/array_cursor.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,15 @@ func (c *{{.name}}MultiShardArrayCursor) nextArrayCursor() bool {

var itr cursors.CursorIterator
var cur cursors.Cursor
for cur == nil && len(c.itrs) > 0 {
var err error
for cur == nil && len(c.itrs) > 0 && err == nil {
itr, c.itrs = c.itrs[0], c.itrs[1:]
cur, _ = itr.Next(c.ctx, c.req)
}
cur, err = itr.Next(c.ctx, c.req)
}

c.err = err
var ok bool
if cur != nil {
if cur != nil && err == nil {
var next cursors.{{.Name}}ArrayCursor
next, ok = cur.(cursors.{{.Name}}ArrayCursor)
if !ok {
Expand Down
5 changes: 3 additions & 2 deletions storage/reads/array_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ func (m *multiShardArrayCursors) createCursor(row SeriesRow) cursors.Cursor {

var shard cursors.CursorIterator
var cur cursors.Cursor
var err error
for cur == nil && len(row.Query) > 0 {
shard, row.Query = row.Query[0], row.Query[1:]
cur, _ = shard.Next(m.ctx, &m.req)
cur, err = shard.Next(m.ctx, &m.req)
}

if cur == nil {
if cur == nil || err != nil {
return nil
}

Expand Down
90 changes: 70 additions & 20 deletions tsdb/engine/tsm1/array_cursor.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,23 @@ func newFloatArrayAscendingCursor() *floatArrayAscendingCursor {
return c
}

func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *floatArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
return c.cache.values[i].UnixNano() >= seek
})

c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}

func (c *floatArrayAscendingCursor) Err() error { return nil }
Expand Down Expand Up @@ -182,7 +187,8 @@ func newFloatArrayDescendingCursor() *floatArrayDescendingCursor {
return c
}

func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
Expand All @@ -194,11 +200,15 @@ func (c *floatArrayDescendingCursor) reset(seek, end int64, cacheValues Values,
c.cache.pos--

c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadFloatArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}

func (c *floatArrayDescendingCursor) Err() error { return nil }
Expand Down Expand Up @@ -321,18 +331,23 @@ func newIntegerArrayAscendingCursor() *integerArrayAscendingCursor {
return c
}

func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *integerArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
return c.cache.values[i].UnixNano() >= seek
})

c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}

func (c *integerArrayAscendingCursor) Err() error { return nil }
Expand Down Expand Up @@ -464,7 +479,8 @@ func newIntegerArrayDescendingCursor() *integerArrayDescendingCursor {
return c
}

func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
Expand All @@ -476,11 +492,15 @@ func (c *integerArrayDescendingCursor) reset(seek, end int64, cacheValues Values
c.cache.pos--

c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadIntegerArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}

func (c *integerArrayDescendingCursor) Err() error { return nil }
Expand Down Expand Up @@ -603,18 +623,23 @@ func newUnsignedArrayAscendingCursor() *unsignedArrayAscendingCursor {
return c
}

func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *unsignedArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
return c.cache.values[i].UnixNano() >= seek
})

c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}

func (c *unsignedArrayAscendingCursor) Err() error { return nil }
Expand Down Expand Up @@ -746,7 +771,8 @@ func newUnsignedArrayDescendingCursor() *unsignedArrayDescendingCursor {
return c
}

func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
Expand All @@ -758,11 +784,15 @@ func (c *unsignedArrayDescendingCursor) reset(seek, end int64, cacheValues Value
c.cache.pos--

c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadUnsignedArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}

func (c *unsignedArrayDescendingCursor) Err() error { return nil }
Expand Down Expand Up @@ -885,18 +915,23 @@ func newStringArrayAscendingCursor() *stringArrayAscendingCursor {
return c
}

func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *stringArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
return c.cache.values[i].UnixNano() >= seek
})

c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}

func (c *stringArrayAscendingCursor) Err() error { return nil }
Expand Down Expand Up @@ -1028,7 +1063,8 @@ func newStringArrayDescendingCursor() *stringArrayDescendingCursor {
return c
}

func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
Expand All @@ -1040,11 +1076,15 @@ func (c *stringArrayDescendingCursor) reset(seek, end int64, cacheValues Values,
c.cache.pos--

c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadStringArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}

func (c *stringArrayDescendingCursor) Err() error { return nil }
Expand Down Expand Up @@ -1167,18 +1207,23 @@ func newBooleanArrayAscendingCursor() *booleanArrayAscendingCursor {
return c
}

func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *booleanArrayAscendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
c.end = end
c.cache.values = cacheValues
c.cache.pos = sort.Search(len(c.cache.values), func(i int) bool {
return c.cache.values[i].UnixNano() >= seek
})

c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] >= seek
})
return nil
}

func (c *booleanArrayAscendingCursor) Err() error { return nil }
Expand Down Expand Up @@ -1310,7 +1355,8 @@ func newBooleanArrayDescendingCursor() *booleanArrayDescendingCursor {
return c
}

func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) {
func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values, tsmKeyCursor *KeyCursor) error {
var err error
// Search for the time value greater than the seek time (not included)
// and then move our position back one which will include the values in
// our time range.
Expand All @@ -1322,11 +1368,15 @@ func (c *booleanArrayDescendingCursor) reset(seek, end int64, cacheValues Values
c.cache.pos--

c.tsm.keyCursor = tsmKeyCursor
c.tsm.values, _ = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf)
c.tsm.values, err = c.tsm.keyCursor.ReadBooleanArrayBlock(c.tsm.buf)
if err != nil {
return err
}
c.tsm.pos = sort.Search(c.tsm.values.Len(), func(i int) bool {
return c.tsm.values.Timestamps[i] > seek
})
c.tsm.pos--
return nil
}

func (c *booleanArrayDescendingCursor) Err() error { return nil }
Expand Down
Loading

0 comments on commit 8f09e36

Please sign in to comment.