-
Notifications
You must be signed in to change notification settings - Fork 568
/
common.go
109 lines (95 loc) · 2.51 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package pfsdb
import (
"context"
"fmt"
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/stream"
)
// sortOrder is the direction keyword written after a column name when an
// ORDER BY clause is rendered.
type sortOrder string

// Supported sort directions.
const (
	// SortOrderNone is the empty string, i.e. no direction keyword.
	SortOrderNone sortOrder = ""
	// SortOrderAsc is the "ASC" direction keyword.
	SortOrderAsc sortOrder = "ASC"
	// SortOrderDesc is the "DESC" direction keyword.
	SortOrderDesc sortOrder = "DESC"
)
// ModelType is the type constraint used by the generic helpers in this file.
// It admits exactly Repo, Commit, Branch, or Project, and additionally
// requires the type to provide GetCreatedAtUpdatedAt.
type ModelType interface {
	Repo | Commit | Branch | Project
	GetCreatedAtUpdatedAt() CreatedAtUpdatedAt
}

// ColumnName is the type constraint for values that may be used as the column
// of an OrderByColumn: a plain string or one of projectColumn, branchColumn,
// commitColumn, or repoColumn.
type ColumnName interface {
	string | projectColumn | branchColumn | commitColumn | repoColumn
}
// OrderByColumn pairs a column with a sort direction; OrderByQuery renders
// each one as a "<column> <order>" term.
type OrderByColumn[T ColumnName] struct {
	// Column is the column to sort by.
	Column T
	// Order is the direction keyword written after the column.
	Order sortOrder
}
// OrderByQuery renders the given columns as a SQL ORDER BY clause of the form
// "ORDER BY <column> <order>, <column> <order>, ...", preserving argument
// order. It returns the empty string when no columns are supplied.
//
// Columns and orders are formatted directly into the clause text (not passed
// as bind parameters).
func OrderByQuery[T ColumnName](orderBys ...OrderByColumn[T]) string {
	if len(orderBys) == 0 {
		return ""
	}
	var clause strings.Builder
	clause.WriteString("ORDER BY ")
	for idx, ob := range orderBys {
		if idx > 0 {
			clause.WriteString(", ")
		}
		// Same "%s %s" formatting for every term: column, one space, order.
		fmt.Fprintf(&clause, "%s %s", ob.Column, ob.Order)
	}
	return clause.String()
}
// pageIterator walks the results of a SQL query one page at a time, fetching
// each page with LIMIT/OFFSET and handing rows out individually.
type pageIterator[T ModelType] struct {
	// query is the base SQL text; a LIMIT/OFFSET suffix is appended per page.
	query string
	// values are the arguments passed along with query on every page fetch.
	values []any
	// limit is the page size used in the LIMIT clause.
	limit uint64
	// offset is the OFFSET of the next page to fetch; it grows by limit after
	// each successful fetch.
	offset uint64
	// page holds the rows of the most recently fetched page.
	page []T
	// pageIdx is the index within page of the next row to hand out.
	pageIdx int
	// lastTimestamp is the latest CreatedAt value seen so far by next.
	lastTimestamp time.Time
	// revision is incremented by next each time a row with a later CreatedAt
	// than lastTimestamp is encountered.
	revision int64
}
// newPageIterator builds a pageIterator over query/values that fetches
// pageSize rows at a time, beginning at page number startPage (so the initial
// offset is startPage * pageSize). No query is executed here; ctx is accepted
// but not used by this constructor.
func newPageIterator[T ModelType](ctx context.Context, query string, values []any, startPage, pageSize uint64) pageIterator[T] {
	var it pageIterator[T]
	it.query = query
	it.values = values
	it.limit = pageSize
	it.offset = startPage * pageSize
	// first revision should be 0 and we increment before returning.
	it.revision = -1
	return it
}
// nextPage fetches the next page of rows. It runs the iterator's query with a
// "LIMIT <limit> OFFSET <offset>" suffix and, when rows come back, stores them
// as the current page, resets the in-page index, and advances the offset by
// one page.
//
// It returns a wrapped error if the query fails and stream.EOS() if the query
// yields no rows; in both cases the iterator's fields are left unmodified.
func (i *pageIterator[T]) nextPage(ctx context.Context, extCtx sqlx.ExtContext) (err error) {
	var rows []T
	paged := fmt.Sprintf("%s\nLIMIT %d OFFSET %d", i.query, i.limit, i.offset)
	if err = sqlx.SelectContext(ctx, extCtx, &rows, paged, i.values...); err != nil {
		return errors.Wrap(err, "getting page")
	}
	if len(rows) == 0 {
		return stream.EOS()
	}
	i.page, i.pageIdx = rows, 0
	i.offset += i.limit
	return nil
}
// hasNext reports whether the currently loaded page still contains a row that
// has not been handed out yet. It does not fetch anything.
func (i *pageIterator[T]) hasNext() bool {
	return len(i.page) > i.pageIdx
}
// next returns a pointer to a copy of the next row together with the
// iterator's current revision. When the loaded page is exhausted it first
// fetches another page via nextPage, and any error from that fetch (including
// stream.EOS() when there are no more rows) is returned as-is with a nil row
// and a zero revision.
//
// The revision is incremented whenever the row's CreatedAt is strictly later
// than the latest CreatedAt seen so far.
func (i *pageIterator[T]) next(ctx context.Context, extCtx sqlx.ExtContext) (*T, int64, error) {
	// Current page is used up (or nothing has been loaded yet): load more.
	if i.pageIdx >= len(i.page) {
		if err := i.nextPage(ctx, extCtx); err != nil {
			return nil, 0, err
		}
	}
	item := i.page[i.pageIdx]
	if created := item.GetCreatedAtUpdatedAt().CreatedAt; created.After(i.lastTimestamp) {
		i.revision++
		i.lastTimestamp = created
	}
	i.pageIdx++
	return &item, i.revision, nil
}
// IsNotFoundError reports whether errors.As matches err against any of the
// package's not-found error types: RepoNotFoundError, ProjectNotFoundError,
// CommitNotFoundError, or BranchNotFoundError.
func IsNotFoundError(err error) bool {
	// Case expressions are evaluated in order and stop at the first match.
	switch {
	case errors.As(err, &RepoNotFoundError{}),
		errors.As(err, &ProjectNotFoundError{}),
		errors.As(err, &CommitNotFoundError{}),
		errors.As(err, &BranchNotFoundError{}):
		return true
	}
	return false
}