/
stream.go
142 lines (119 loc) · 3.05 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package elasticsearch
import (
"context"
"encoding/json"
"sync"
"time"
statuserr "k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
)
type pit struct {
ID string `json:"id"`
}
type stream struct {
usePIT bool
rest *elasticsearchREST
refreshRate time.Duration
ch chan watch.Event
done chan bool
pit pit
}
func (s *stream) errorAndAbort(err error) {
status := statuserr.NewBadRequest(err.Error()).Status()
s.ch <- watch.Event{
Type: watch.Error,
Object: &status,
}
}
func (s *stream) Start(ctx context.Context, options *metainternalversion.ListOptions) {
if s.usePIT {
res, err := s.rest.es.OpenPointInTime([]string{s.rest.opts.Backend.Index}, "5m")
if err != nil {
s.errorAndAbort(err)
return
}
if err := json.NewDecoder(res.Body).Decode(&s.pit); err != nil {
s.errorAndAbort(err)
return
}
}
var wg sync.WaitGroup
wg.Add(1)
batchResults := make(chan esResults, 2)
// read ahead buffer
go func() {
for {
klog.Info("start list query", "options", options)
query, err := queryFromListOptions(ctx, options, s.rest)
if err != nil {
return
}
if s.pit.ID != "" {
query["pit"] = map[string]interface{}{
"id": s.pit.ID,
"keep_alive": "5m",
}
}
esResults, err := s.rest.fetch(ctx, query, options)
if err != nil {
s.errorAndAbort(err)
return
}
batchResults <- esResults
if len(esResults.Hits.Hits) != int(s.rest.opts.Backend.BulkSize) && s.refreshRate == 0 {
klog.Info("All objects consumed from stream")
s.done <- true
wg.Done()
break
}
if len(esResults.Hits.Hits) != int(s.rest.opts.Backend.BulkSize) {
klog.InfoS("wait for next check", "sleep", s.refreshRate.String())
time.Sleep(s.refreshRate)
}
// The continue token represents the last sort value from the last hit.
// Which itself gets used in the next es query as search_after
// If there is no hit there will be no continue token as this means we reached the end of available results
if len(esResults.Hits.Hits) > 0 {
hit := esResults.Hits.Hits[len(esResults.Hits.Hits)-1]
if len(hit.Sort) > 0 {
b, err := json.Marshal(hit.Sort)
if err != nil {
s.errorAndAbort(err)
return
}
options.Continue = string(b)
}
}
// For the next search request the PIT from the previous search response needs to be taken as it can change over time
if s.pit.ID != "" {
s.pit.ID = esResults.PitID
}
}
}()
// loop over batched results
for esResults := range batchResults {
var hit esHit
for _, hit = range esResults.Hits.Hits {
decodedObj, err := s.rest.decodeFrom(hit)
if err != nil {
break
}
s.ch <- watch.Event{
Type: watch.Added,
Object: decodedObj,
}
}
}
wg.Wait()
}
func (s *stream) Stop() {
_, err := s.rest.es.ClosePointInTime()
if err != nil {
klog.ErrorS(err, "failed to close pit")
}
}
func (s *stream) ResultChan() <-chan watch.Event {
return s.ch
}