[exporterhelper] Fix invalid write index updates in the persistent queue (#8963)

**Description:**
Fixing a bug where the in-memory value of the persistent queue's write
index would be updated even if writing to the storage failed. This
normally wouldn't have any negative effect other than inflating the
queue size temporarily, as the read loop would simply skip over the
nonexistent record. However, in the case where the storage doesn't have
any available space, the in-memory and in-storage write index could
become significantly different, at which point a collector restart would
leave the queue in an inconsistent state.
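
The drift described above can be reproduced with a toy model. This is only a minimal sketch: `fakeStore`, `queue`, `putBuggy`, `putFixed`, and `drift` are hypothetical names made up for illustration, and the store counts entries rather than bytes. It is not the collector's storage client, just the same ordering problem in miniature.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoSpace is a hypothetical error standing in for a full storage backend.
var errNoSpace = errors.New("no space left in storage")

// fakeStore is a toy store holding items plus a persisted write index.
// capacity is counted in items (an assumption made to keep the sketch small).
type fakeStore struct {
	items      map[string]struct{}
	writeIndex uint64
	capacity   int
}

// batch stores the item and the new write index together, or neither.
func (s *fakeStore) batch(itemKey string, newIndex uint64) error {
	if len(s.items) >= s.capacity {
		return errNoSpace
	}
	s.items[itemKey] = struct{}{}
	s.writeIndex = newIndex
	return nil
}

// queue keeps an in-memory write index that should mirror the stored one.
type queue struct {
	store      *fakeStore
	writeIndex uint64
}

// putBuggy bumps the in-memory index before writing, so a failed write
// leaves the in-memory index ahead of the stored one.
func (q *queue) putBuggy() error {
	key := fmt.Sprint(q.writeIndex)
	q.writeIndex++
	return q.store.batch(key, q.writeIndex)
}

// putFixed commits the new index in memory only after the write succeeded.
func (q *queue) putFixed() error {
	newIndex := q.writeIndex + 1
	if err := q.store.batch(fmt.Sprint(q.writeIndex), newIndex); err != nil {
		return err
	}
	q.writeIndex = newIndex
	return nil
}

// drift performs n puts against a store with room for capacity items and
// returns how far the in-memory index ended up ahead of the stored index.
func drift(put func(*queue) error, capacity, n int) uint64 {
	q := &queue{store: &fakeStore{items: map[string]struct{}{}, capacity: capacity}}
	for i := 0; i < n; i++ {
		_ = put(q) // errors are expected once the store is full
	}
	return q.writeIndex - q.store.writeIndex
}
```

Calling `drift((*queue).putBuggy, 2, 5)` and `drift((*queue).putFixed, 2, 5)` from a `main` function compares the two orderings: with the buggy ordering every failed write widens the gap between the two indexes, while the fixed ordering keeps them equal.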

Worth noting that the same issue affects reading from the queue, but in
that case the writes are very small, and in practice the storage will
almost always have enough space to carry them out.

**Link to tracking Issue:** #8115

**Testing:**
The `TestPersistentQueue_StorageFull` test actually only passed by
accident. Writing would leave one additional item in the put channel,
then the first read would fail (as there is not enough space to do the
read index and dispatched items writes), but subsequent reads would
succeed, so the bugs would cancel out. I modified this test to check for
the number of items in the queue after inserting them, and also to
expect one fewer item to be returned.
swiatekm committed Nov 27, 2023
1 parent b7f49f1 commit c0deae5
Showing 3 changed files with 43 additions and 5 deletions.
25 changes: 25 additions & 0 deletions .chloggen/fix_persistentstorage_index-updates.yaml
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix invalid write index updates in the persistent queue

# One or more tracking issues or pull requests related to the change
issues: [8115]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
17 changes: 12 additions & 5 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -215,20 +215,27 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
 	}
 
 	itemKey := getItemKey(pq.writeIndex)
-	pq.writeIndex++
+	newIndex := pq.writeIndex + 1
 
 	reqBuf, err := pq.marshaler(req)
 	if err != nil {
 		return err
 	}
-	err = pq.client.Batch(ctx,
-		storage.SetOperation(writeIndexKey, itemIndexToBytes(pq.writeIndex)),
-		storage.SetOperation(itemKey, reqBuf))
+
+	// Carry out a transaction where we both add the item and update the write index
+	ops := []storage.Operation{
+		storage.SetOperation(writeIndexKey, itemIndexToBytes(newIndex)),
+		storage.SetOperation(itemKey, reqBuf),
+	}
+	if storageErr := pq.client.Batch(ctx, ops...); storageErr != nil {
+		return storageErr
+	}
 
+	pq.writeIndex = newIndex
 	// Inform the loop that there's some data to process
 	pq.putChan <- struct{}{}
 
-	return err
+	return nil
 }
 
 // getNextItem pulls the next available item from the persistent storage along with a callback function that should be
6 changes: 6 additions & 0 deletions exporter/exporterhelper/internal/persistent_queue_test.go
@@ -626,6 +626,9 @@ func TestPersistentQueue_StorageFull(t *testing.T) {
 		reqCount++
 	}
 
+	// Check that the size is correct
+	require.Equal(t, reqCount, ps.Size(), "Size must be equal to the number of items inserted")
+
 	// Manually set the storage to only have a small amount of free space left
 	newMaxSize := client.GetSizeInBytes() + freeSpaceInBytes
 	client.SetMaxSizeInBytes(newMaxSize)
@@ -634,6 +637,9 @@ func TestPersistentQueue_StorageFull(t *testing.T) {
 	require.Error(t, ps.Offer(context.Background(), req))
 
 	// Take out all the items
+	// Getting the first item fails, as we can't update the state in storage, so we just delete it without returning it
+	// Subsequent items succeed, as deleting the first item frees enough space for the state update
+	reqCount--
 	for i := reqCount; i > 0; i-- {
 		require.True(t, ps.Consume(func(context.Context, ptrace.Traces) {}))
 	}
