Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.4.x] Add timeout to queryLoki (#8449) #8451

Merged
merged 1 commit into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 24 additions & 13 deletions src/server/debug/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,9 +777,6 @@ func (s *debugServer) collectPipelineDumpFunc(limit int64) collectPipelineFunc {
}
if s.env.Config().LokiHost != "" {
if err := s.forEachWorkerLoki(ctx, pipelineInfo, func(pod string) error {
// Loki requests can hang if the size of the log lines is too big, so we set a timeout
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()
workerPrefix := join(podPrefix, pod)
if len(prefix) > 0 {
workerPrefix = join(prefix[0], workerPrefix)
Expand Down Expand Up @@ -843,12 +840,13 @@ func (s *debugServer) getWorkerPodsLoki(ctx context.Context, pipelineInfo *pps.P
return pods, nil
}

// This used to be 5,000 but a comment said this was too few logs, so now it's 30,000. If it still
// seems too small, bump it up again. 5,000 remains the maximum value of "limit" in queries that
// Loki seems to accept.
const (
maxLogs = 30000
serverMaxLogs = 5000
// maxLogs used to be 5,000 but a comment said this was too few logs, so now it's 30,000.
// If it still seems too small, bump it up again.
maxLogs = 30000
// 5,000 is the maximum value of "limit" in queries that Loki seems to accept.
// We set serverMaxLogs below the actual limit because of a bug in Loki where it hangs when you request too much data from it.
serverMaxLogs = 1000
)

type lokiLog struct {
Expand All @@ -857,6 +855,12 @@ type lokiLog struct {
}

func (s *debugServer) queryLoki(ctx context.Context, queryStr string) ([]lokiLog, error) {
sortLogs := func(logs []lokiLog) {
sort.Slice(logs, func(i, j int) bool {
return logs[i].Entry.Timestamp.Before(logs[j].Entry.Timestamp)
})
}

c, err := s.env.GetLokiClient()
if err != nil {
return nil, errors.EnsureStack(errors.Errorf("get loki client: %v", err))
Expand All @@ -877,10 +881,18 @@ func (s *debugServer) queryLoki(ctx context.Context, queryStr string) ([]lokiLog
start := time.Now().Add(-30 * 24 * time.Hour) // 30 days. (Loki maximum range is 30 days + 1 hour.)

for numLogs := 0; (end.IsZero() || start.Before(end)) && numLogs < maxLogs; {
resp, err := c.QueryRange(ctx, queryStr, serverMaxLogs, start, end, "BACKWARD", 0, 0, true)
// Loki requests can hang if the size of the log lines is too big
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
resp, err := c.QueryRange(ctx, queryStr, serverMaxLogs, start, end, "BACKWARD", 0 /* step */, 0 /* interval */, true /* quiet */)
if err != nil {
// Note: the error from QueryRange has a stack.
return nil, errors.Errorf("query range (query=%v, maxLogs=%v, start=%v, end=%v): %+v", queryStr, maxLogs, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
if errors.Is(err, context.DeadlineExceeded) {
log.Debugf("query range timed out (query=%v, limit=%v, start=%v, end=%v): %+v", queryStr, serverMaxLogs, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
sortLogs(result)
return result, nil
}
return nil, errors.Errorf("query range (query=%v, limit=%v, start=%v, end=%v): %+v", queryStr, serverMaxLogs, start.Format(time.RFC3339), end.Format(time.RFC3339), err)
}

streams, ok := resp.Data.Result.(loki.Streams)
Expand Down Expand Up @@ -912,10 +924,9 @@ func (s *debugServer) queryLoki(ctx context.Context, queryStr string) ([]lokiLog
if readThisIteration == 0 {
break
}
cancel()
}
sort.Slice(result, func(i, j int) bool {
return result[i].Entry.Timestamp.Before(result[j].Entry.Timestamp)
})
sortLogs(result)
return result, nil
}

Expand Down
76 changes: 66 additions & 10 deletions src/server/debug/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,20 @@ func mustParseQuerystringInt64(r *http.Request, field string) int64 {
}

type fakeLoki struct {
entries []loki.Entry // Must be sorted by time ascending.
entries []loki.Entry // Must be sorted by time ascending.
page int // Keep track of the current page.
sleepAtPage int // Which page to put the server to sleep. 0 means don't sleep.
}

func (l *fakeLoki) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Simulate the bug where Loki server hangs due to large logs.
l.page++
if l.sleepAtPage > 0 && l.page >= l.sleepAtPage {
// wait for request to time out on purpose
<-r.Context().Done()
return
}

var (
start, end time.Time
limit int
Expand Down Expand Up @@ -135,6 +145,7 @@ func (l *fakeLoki) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func TestQueryLoki(t *testing.T) {
testData := []struct {
name string
sleepAtPage int
buildEntries func() []loki.Entry
buildWant func() []int
}{
Expand All @@ -147,7 +158,7 @@ func TestQueryLoki(t *testing.T) {
name: "all logs",
buildEntries: func() []loki.Entry {
var entries []loki.Entry
for i := -99; i >= 0; i++ {
for i := -99; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(-1) * time.Second),
Line: fmt.Sprintf("%v", i),
Expand All @@ -157,7 +168,7 @@ func TestQueryLoki(t *testing.T) {
},
buildWant: func() []int {
var want []int
for i := -99; i >= 0; i++ {
for i := -99; i <= 0; i++ {
want = append(want, i)
}
return want
Expand Down Expand Up @@ -208,7 +219,7 @@ func TestQueryLoki(t *testing.T) {
buildWant: func() []int {
var want []int
want = append(want, -2)
for i := 0; i < 4999; i++ {
for i := 0; i < serverMaxLogs-1; i++ {
want = append(want, -1)
}
want = append(want, 0)
Expand All @@ -230,7 +241,7 @@ func TestQueryLoki(t *testing.T) {
Line: "-1",
})
}
for i := 0; i < 5000; i++ {
for i := 0; i < serverMaxLogs; i++ {
entries = append(entries, loki.Entry{
Timestamp: start,
Line: "0",
Expand All @@ -241,31 +252,76 @@ func TestQueryLoki(t *testing.T) {
buildWant: func() []int {
var want []int
want = append(want, -2)
for i := 0; i < 5000; i++ {
for i := 0; i < serverMaxLogs; i++ {
want = append(want, -1)
}
for i := 0; i < 5000; i++ {
for i := 0; i < serverMaxLogs; i++ {
want = append(want, 0)
}
return want
},
},
{
name: "timeout on 1st page",
sleepAtPage: 1,
buildEntries: func() []loki.Entry {
var entries []loki.Entry
for i := -10000; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(-1) * time.Second),
Line: fmt.Sprintf("%v", i),
})
}
return entries
},
buildWant: func() []int {
// server should've timed out right away, so no results
return nil
},
},
{
name: "timeout on 2nd page",
sleepAtPage: 2,
buildEntries: func() []loki.Entry {
var entries []loki.Entry
for i := -10000; i <= 0; i++ {
entries = append(entries, loki.Entry{
Timestamp: time.Now().Add(time.Duration(-1) * time.Second),
Line: fmt.Sprintf("%v", i),
})
}
return entries
},
buildWant: func() []int {
// expect only the first page due to server timing out
var want []int
for i := -999; i <= 0; i++ {
want = append(want, i)
}
return want
},
},
}

for _, test := range testData {
t.Run(test.name, func(t *testing.T) {
entries := test.buildEntries()
want := test.buildWant()

s := httptest.NewServer(&fakeLoki{entries: entries})
s := httptest.NewServer(&fakeLoki{
entries: entries,
sleepAtPage: test.sleepAtPage,
})
d := &debugServer{
env: &serviceenv.TestServiceEnv{
LokiClient: &loki.Client{Address: s.URL},
},
}

var got []int
out, err := d.queryLoki(context.Background(), `{foo="bar"}`)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
out, err := d.queryLoki(ctx, `{foo="bar"}`)
if err != nil {
t.Fatalf("query loki: %v", err)
}
Expand All @@ -279,7 +335,7 @@ func TestQueryLoki(t *testing.T) {

if diff := cmp.Diff(got, want); diff != "" {
t.Errorf(`result differs:
first | last | len
first | last | len
+--------+--------+-------
got | %6d | %6d | %6d
want | %6d | %6d | %6d
Expand Down