From 6aee925c3fe176b1cef82ea8877c6d06732608cd Mon Sep 17 00:00:00 2001 From: Prasad Lohakpure Date: Fri, 17 Apr 2026 15:27:11 +0530 Subject: [PATCH 1/3] FEAT: Pagination support for /jobs endpoint --- internal/pkg/heimdall/job_dal.go | 41 ++++++++++++++++--- .../pkg/heimdall/queries/job/select_jobs.sql | 3 -- internal/pkg/heimdall/result.go | 4 +- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index fc474415..ac83a3e1 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -6,6 +6,8 @@ import ( _ "embed" "encoding/json" "fmt" + "strconv" + "strings" "time" "github.com/hladush/go-telemetry/pkg/telemetry" @@ -88,6 +90,9 @@ var ( Join: `, `, Value: `js.job_status_name in ({{ .Slice }})`, }, + `cursor`: { + Value: `j.system_job_id < $%d`, + }, }, } ) @@ -218,13 +223,28 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) { } +const ( + defaultPageSize = 101 + maxPageSize = 1000 +) + func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) { - // Track DB connection for jobs list operation defer getJobsMethod.RecordLatency(time.Now()) getJobsMethod.CountRequest() - // open connection + // extract limit before rendering WHERE clause (it is not a filter condition) + pageSize := defaultPageSize + if v, ok := (*f)[`limit`]; ok { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + if n > maxPageSize { + n = maxPageSize + } + pageSize = n + } + delete(*f, `limit`) + } + sess, err := h.Database.NewSession(false) if err != nil { getJobsMethod.LogAndCountError(err, "new_session") @@ -238,6 +258,10 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) return nil, err } + // append LIMIT; fetch one extra row to detect whether a next page exists + query = fmt.Sprintf("%s\nlimit $%d", strings.TrimRight(query, "\n; \t"), len(args)+1) + args = append(args, pageSize+1) + rows, err := sess.Query(query, args...) if err != nil { getJobsMethod.LogAndCountError(err, "query") @@ -245,7 +269,7 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } defer rows.Close() - result := make([]*job.Job, 0, 100) + result := make([]*job.Job, 0, pageSize+1) for rows.Next() { @@ -267,10 +291,15 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) } + rs := &resultset{Data: result} + if len(result) > pageSize { + rs.HasMore = true + rs.NextCursor = result[pageSize-1].SystemID + rs.Data = result[:pageSize] + } + getJobsMethod.CountSuccess() - return &resultset{ - Data: result, - }, nil + return rs, nil } diff --git a/internal/pkg/heimdall/queries/job/select_jobs.sql b/internal/pkg/heimdall/queries/job/select_jobs.sql index 1e6449a8..74b26ac2 100644 --- a/internal/pkg/heimdall/queries/job/select_jobs.sql +++ b/internal/pkg/heimdall/queries/job/select_jobs.sql @@ -26,6 +26,3 @@ where {{ .Clause }}{{end}} order by j.system_job_id desc -limit - 101 -; diff --git a/internal/pkg/heimdall/result.go b/internal/pkg/heimdall/result.go index fdd42d76..bf39e19c 100644 --- a/internal/pkg/heimdall/result.go +++ b/internal/pkg/heimdall/result.go @@ -1,5 +1,7 @@ package heimdall type resultset struct { - Data any `yaml:"data,omitempty" json:"data,omitempty"` + Data any `json:"data,omitempty"` + HasMore bool `json:"has_more,omitempty"` + NextCursor int64 `json:"next_cursor,omitempty"` } From 263bfd01aa2adc32bfd6be5108a358036d45bd8d Mon Sep 17 00:00:00 2001 From: Prasad Lohakpure Date: Fri, 17 Apr 2026 15:44:07 +0530 Subject: [PATCH 2/3] Use of single page size --- internal/pkg/heimdall/job_dal.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index ac83a3e1..f8df5476 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -223,13 +223,11 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) { } -const ( - defaultPageSize = 101 - maxPageSize = 1000 -) +const defaultPageSize = 101 func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) { + // Track DB connection for jobs list operation defer getJobsMethod.RecordLatency(time.Now()) getJobsMethod.CountRequest() @@ -237,14 +235,15 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) pageSize := defaultPageSize if v, ok := (*f)[`limit`]; ok { if n, err := strconv.Atoi(v); err == nil && n > 0 { - if n > maxPageSize { - n = maxPageSize + if n > defaultPageSize { + n = defaultPageSize } pageSize = n } delete(*f, `limit`) } + // open connection sess, err := h.Database.NewSession(false) if err != nil { getJobsMethod.LogAndCountError(err, "new_session") From 27307a463dd358a249b1a559c8d9df28376dff45 Mon Sep 17 00:00:00 2001 From: Prasad Lohakpure Date: Mon, 20 Apr 2026 15:14:58 +0530 Subject: [PATCH 3/3] Filter using job tags --- internal/pkg/heimdall/job_dal.go | 38 ++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/internal/pkg/heimdall/job_dal.go b/internal/pkg/heimdall/job_dal.go index f8df5476..0fdc1e30 100644 --- a/internal/pkg/heimdall/job_dal.go +++ b/internal/pkg/heimdall/job_dal.go @@ -95,6 +95,11 @@ var ( }, }, } + + // tag filters use correlated EXISTS — one clause per value, all must match (AND semantics) + jobsTagsFilterConfig = map[string]string{ + `tags`: `exists (select 1 from job_tags jt where jt.system_job_id = j.system_job_id and jt.job_tag = $%d)`, + } ) type jobRequest struct { @@ -225,6 +230,35 @@ func (h *Heimdall) getJob(ctx context.Context, j *jobRequest) (any, error) { const defaultPageSize = 101 +func injectTagsFilter(f *database.Filter, key, existsTemplate string, query string, args []any) (string, []any) { + v, ok := (*f)[key] + if !ok { + return query, args + } + delete(*f, key) + + var clauses []string + for _, t := range strings.Split(v, `,`) { + if t = strings.TrimSpace(t); t != `` { + clauses = append(clauses, fmt.Sprintf(existsTemplate, len(args)+1)) + args = append(args, t) + } + } + if len(clauses) == 0 { + return query, args + } + + idx := strings.Index(query, "\norder by") + if idx < 0 { + return query, args + } + before, after, clause := query[:idx], query[idx:], strings.Join(clauses, " and\n ") + if strings.Contains(before, "where") { + return before + " and\n " + clause + after, args + } + return before + "\nwhere\n " + clause + after, args +} + func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) { // Track DB connection for jobs list operation @@ -257,6 +291,10 @@ func (h *Heimdall) getJobs(ctx context.Context, f *database.Filter) (any, error) return nil, err } + for key, tmpl := range jobsTagsFilterConfig { + query, args = injectTagsFilter(f, key, tmpl, query, args) + } + // append LIMIT; fetch one extra row to detect whether a next page exists query = fmt.Sprintf("%s\nlimit $%d", strings.TrimRight(query, "\n; \t"), len(args)+1) args = append(args, pageSize+1)