Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,24 @@ func (c *Client) loop() {

buf.Reset()

var encodeFailures []any
for _, x := range buffer {
if err := jsonEnc.Encode(x); err != nil {
slog.Error("quickwit: failed to encode record, discarding", "error", err)
c.invokeOnDiscard(x)
encodeFailures = append(encodeFailures, x)
continue
}
}

// All items were unencodable — discard them and report success so the
// caller clears the buffer and does not retry with the same items.
if buf.Len() == 0 {
for _, x := range encodeFailures {
c.invokeOnDiscard(x)
}
return true
}

ctx := context.Background()
if t := c.getIngestTimeout(); t != 0 {
var cancel context.CancelFunc
Expand Down Expand Up @@ -255,6 +265,11 @@ func (c *Client) loop() {
return false
}

// HTTP succeeded — now safe to discard items that failed encoding.
for _, x := range encodeFailures {
c.invokeOnDiscard(x)
}

if !resetBatchSizeAfter.IsZero() && time.Now().After(resetBatchSizeAfter) {
beforeSize := batchSize
batchSize = c.getBatchSize()
Expand Down
77 changes: 77 additions & 0 deletions quickwit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,83 @@ func TestIngest_JSONEncodeError_CallsOnDiscard(t *testing.T) {
}
}

// Regression #11: unencodable items caused OnDiscard to fire on every retry when the HTTP
// request also failed — the callback must fire exactly once per item.
func TestIngest_EncodeError_DiscardedExactlyOnce_OnHTTPFailure(t *testing.T) {
attempts := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
attempts++
if attempts < 3 {
w.WriteHeader(http.StatusInternalServerError)
return
}
io.Copy(io.Discard, r.Body)
w.WriteHeader(http.StatusOK)
}))
defer server.Close()

var mu sync.Mutex
var discarded []any

c := quickwit.NewClient(server.URL + "/api/v1/test")
c.SetConcurrent(1)
c.OnDiscard(func(data any) {
mu.Lock()
discarded = append(discarded, data)
mu.Unlock()
})

ch := make(chan int) // not JSON-encodable
c.Ingest(map[string]any{"ok": true}, ch)
c.Close()

mu.Lock()
n := len(discarded)
mu.Unlock()

if n != 1 {
t.Errorf("OnDiscard called %d times, want exactly 1 (got %d HTTP attempts)", n, attempts)
}
}

// Regression #12: when all items in a batch fail to encode, flush sent an empty HTTP body.
// If the server rejects an empty body, retryFlush looped indefinitely re-discarding items.
func TestIngest_AllEncodeErrors_NoHTTPRequest(t *testing.T) {
requestCount := 0
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
requestCount++
w.WriteHeader(http.StatusBadRequest) // server rejects empty body
}))
defer server.Close()

var mu sync.Mutex
var discarded []any

c := quickwit.NewClient(server.URL + "/api/v1/test")
c.SetConcurrent(1)
c.OnDiscard(func(data any) {
mu.Lock()
discarded = append(discarded, data)
mu.Unlock()
})

ch1 := make(chan int)
ch2 := make(chan int)
c.Ingest(ch1, ch2)
c.Close()

mu.Lock()
n := len(discarded)
mu.Unlock()

if requestCount != 0 {
t.Errorf("expected no HTTP requests when all items are unencodable, got %d", requestCount)
}
if n != 2 {
t.Errorf("OnDiscard called %d times, want 2", n)
}
}

// Regression #10: when a 413 triggered batch-size reduction, re-flushing the oversized
// buffer in smaller chunks silently reversed or lost the tail of the record sequence.
func TestIngest_OversizeBatchPreservesOrder(t *testing.T) {
Expand Down
Loading