Skip to content

Commit

Permalink
Add timeout to queryLoki (#8449) (#8451)
Browse files Browse the repository at this point in the history
* Add timeout to queryLoki to prevent hanging

* Improve comments for max logs consts

* Get TestQueryLoki passing

* Test timeout logic

* Server waits for the request to time out instead of sleeping
  • Loading branch information
Albert committed Dec 16, 2022
1 parent 24c925f commit 169178d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 23 deletions.
37 changes: 24 additions & 13 deletions src/server/debug/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,9 +777,6 @@ func (s *debugServer) collectPipelineDumpFunc(limit int64) collectPipelineFunc {
}
if s.env.Config().LokiHost != "" {
if err := s.forEachWorkerLoki(ctx, pipelineInfo, func(pod string) error {
// Loki requests can hang if the size of the log lines is too big, so we set a timeout
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
workerPrefix := join(podPrefix, pod)
if len(prefix) > 0 {
workerPrefix = join(prefix[0], workerPrefix)
Expand Down Expand Up @@ -843,12 +840,13 @@ func (s *debugServer) getWorkerPodsLoki(ctx context.Context, pipelineInfo *pps.P
return pods, nil
}

// This used to be 5,000 but a comment said this was too few logs, so now it's 30,000. If it still
// seems too small, bump it up again. 5,000 remains the maximum value of "limit" in queries that
// Loki seems to accept.
const (
maxLogs = 30000
serverMaxLogs = 5000
// maxLogs used to be 5,000 but a comment said this was too few logs, so now it's 30,000.
// If it still seems too small, bump it up again.
maxLogs = 30000
// 5,000 is the maximum value of "limit" in queries that Loki seems to accept.
// We set serverMaxLogs below the actual limit because of a bug in Loki where it hangs when you request too much data from it.
serverMaxLogs = 1000
)

type lokiLog struct {
Expand All @@ -857,6 +855,12 @@ type lokiLog struct {
}

func (s *debugServer) queryLoki(ctx context.Context, queryStr string) ([]lokiLog, error) {
sortLogs := func(logs []lokiLog) {
sort.Slice(logs, func(i, j int) bool {
return logs[i].Entry.Timestamp.Before(logs[j].Entry.Timestamp)
})
}

c, err := s.env.GetLokiClient()
if err != nil {
return nil, errors.EnsureStack(errors.Errorf("get loki client: %v", err))
Expand All @@ -877,10 +881,18 @@ func (s *debugServer) queryLoki(ctx context.Context, queryStr string) ([]lokiLog
start := time.Now().Add(-30 * 24 * time.Hour) // 30 days. (Loki maximum range is 30 days + 1 hour.)

for numLogs := 0; (end.IsZero() || start.Before(end)) && numLogs < maxLogs; {
resp, err := c.QueryRange(ctx, queryStr, serverMaxLogs, start, end, "BACKWARD", 0, 0, true)
// Loki requests can hang if the size of the log lines is too big
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
resp, err := c.QueryRange(ctx, queryStr, serverMaxLogs, start, end, "BACKWARD", 0 /* step */, 0 /* interval */, true /* quiet */)
if err != nil {
// Note: the error from QueryRange has a stack.
return nil, errors.Errorf("query range (query=%v, maxLogs=%v, start=%v, end=%v): %+v", queryStr, maxLogs, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
if errors.Is(err, context.DeadlineExceeded) {
log.Debugf("query range timed out (query=%v, limit=%v, start=%v, end=%v): %+v", queryStr, serverMaxLogs, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
sortLogs(result)
return result, nil
}
return nil, errors.Errorf("query range (query=%v, limit=%v, start=%v, end=%v): %+v", queryStr, serverMaxLogs, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
}

streams, ok := resp.Data.Result.(loki.Streams)
Expand Down Expand Up @@ -912,10 +924,9 @@ func (s *debugServer) queryLoki(ctx context.Context, queryStr string) ([]lokiLog
if readThisIteration == 0 {
break
}
cancel()
}
sort.Slice(result, func(i, j int) bool {
return result[i].Entry.Timestamp.Before(result[j].Entry.Timestamp)
})
sortLogs(result)
return result, nil
}

Expand Down
76 changes: 66 additions & 10 deletions src/server/debug/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,20 @@ func mustParseQuerystringInt64(r *http.Request, field string) int64 {
}

type fakeLoki struct {
entries []loki.Entry // Must be sorted by time ascending.
entries []loki.Entry // Must be sorted by time ascending.
page int // Keep track of the current page.
sleepAtPage int // Which page to put the server to sleep. 0 means don't sleep.
}

func (l *fakeLoki) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Simulate the bug where Loki server hangs due to large logs.
l.page++
if l.sleepAtPage > 0 && l.page >= l.sleepAtPage {
// wait for request to time out on purpose
<-r.Context().Done()
return
}

var (
start, end time.Time
limit int
Expand Down Expand Up @@ -135,6 +145,7 @@ func (l *fakeLoki) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func TestQueryLoki(t *testing.T) {
testData := []struct {
name string
sleepAtPage int
buildEntries func() []loki.Entry
buildWant func() []int
}{
Expand All @@ -147,7 +158,7 @@ func TestQueryLoki(t *testing.T) {
name: "all logs",
buildEntries: func() []loki.Entry {
var entries []loki.Entry
for i := -99; i >= 0; i++ {
for i := -99; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(-1) * time.Second),
Line: fmt.Sprintf("%v", i),
Expand All @@ -157,7 +168,7 @@ func TestQueryLoki(t *testing.T) {
},
buildWant: func() []int {
var want []int
for i := -99; i >= 0; i++ {
for i := -99; i <= 0; i++ {
want = append(want, i)
}
return want
Expand Down Expand Up @@ -208,7 +219,7 @@ func TestQueryLoki(t *testing.T) {
buildWant: func() []int {
var want []int
want = append(want, -2)
for i := 0; i < 4999; i++ {
for i := 0; i < serverMaxLogs-1; i++ {
want = append(want, -1)
}
want = append(want, 0)
Expand All @@ -230,7 +241,7 @@ func TestQueryLoki(t *testing.T) {
Line: "-1",
})
}
for i := 0; i < 5000; i++ {
for i := 0; i < serverMaxLogs; i++ {
entries = append(entries, loki.Entry{
Timestamp: start,
Line: "0",
Expand All @@ -241,31 +252,76 @@ func TestQueryLoki(t *testing.T) {
buildWant: func() []int {
var want []int
want = append(want, -2)
for i := 0; i < 5000; i++ {
for i := 0; i < serverMaxLogs; i++ {
want = append(want, -1)
}
for i := 0; i < 5000; i++ {
for i := 0; i < serverMaxLogs; i++ {
want = append(want, 0)
}
return want
},
},
{
name: "timeout on 1st page",
sleepAtPage: 1,
buildEntries: func() []loki.Entry {
var entries []loki.Entry
for i := -10000; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(-1) * time.Second),
Line: fmt.Sprintf("%v", i),
})
}
return entries
},
buildWant: func() []int {
// server should've timed out right away, so no results
return nil
},
},
{
name: "timeout on 2nd page",
sleepAtPage: 2,
buildEntries: func() []loki.Entry {
var entries []loki.Entry
for i := -10000; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(-1) * time.Second),
Line: fmt.Sprintf("%v", i),
})
}
return entries
},
buildWant: func() []int {
// expect only the first page due to server timing out
var want []int
for i := -999; i <= 0; i++ {
want = append(want, i)
}
return want
},
},
}

for _, test := range testData {
t.Run(test.name, func(t *testing.T) {
entries := test.buildEntries()
want := test.buildWant()

s := httptest.NewServer(&fakeLoki{entries: entries})
s := httptest.NewServer(&fakeLoki{
entries: entries,
sleepAtPage: test.sleepAtPage,
})
d := &debugServer{
env: &serviceenv.TestServiceEnv{
LokiClient: &loki.Client{Address: s.URL},
},
}

var got []int
out, err := d.queryLoki(context.Background(), `{foo="bar"}`)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
out, err := d.queryLoki(ctx, `{foo="bar"}`)
if err != nil {
t.Fatalf("query loki: %v", err)
}
Expand All @@ -279,7 +335,7 @@ func TestQueryLoki(t *testing.T) {

if diff := cmp.Diff(got, want); diff != "" {
t.Errorf(`result differs:
first | last | len
first | last | len
+--------+--------+-------
got | %6d | %6d | %6d
want | %6d | %6d | %6d
Expand Down

0 comments on commit 169178d

Please sign in to comment.