mirrored from https://chromium.googlesource.com/infra/luci/luci-go
-
Notifications
You must be signed in to change notification settings - Fork 43
/
logStream.go
431 lines (375 loc) · 12.9 KB
/
logStream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
// Copyright 2015 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package coordinator
import (
"context"
"fmt"
"regexp"
"strings"
"time"
"google.golang.org/grpc/codes"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/timestamp"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/proto/google"
ds "go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/grpc/grpcutil"
"go.chromium.org/luci/logdog/api/logpb"
"go.chromium.org/luci/logdog/common/types"
)
// CurrentSchemaVersion is the current schema version of the LogStream.
// Changes that are not backward-compatible should update this constant so
// migration logic and scripts can translate appropriately. LogStream.Save
// writes this value into the Schema field of every entity it saves.
//
// History:
// 1 - Contained _Tags and _C queryable fields
// 2 - Removed _Tags and _C queryable fields and applied noindex to
// most fields, since query filtering is now implemented in-memory instead
// of via datastore filters.
// 3 - Removed all non-indexed fields which are redundant with content in
// Descriptor.
const CurrentSchemaVersion = "3"
// ErrPathNotFound is the canonical error returned when a Log Stream Path is not found.
// It is constructed with the gRPC codes.NotFound status code.
var ErrPathNotFound = grpcutil.Errf(codes.NotFound, "path not found")
// LogStream is the primary datastore model containing information and state of
// an individual log stream.
type LogStream struct {
// ID is the LogStream ID. It is generated from the stream's Prefix/Name
// fields (see LogStreamID); Validate rejects a LogStream whose ID does not
// match that derived value.
ID HashID `gae:"$id"`
// Schema is the datastore schema version for this object. This can be used
// to facilitate schema migrations.
//
// The current schema is CurrentSchemaVersion. Save overwrites this field with
// CurrentSchemaVersion on every write.
Schema string // index needed for batch conversions
// Prefix is this log stream's prefix value. Log streams with the same prefix
// are logically grouped. It must be a valid stream name.
//
// This value should not be changed once populated, as it will invalidate the
// ID.
Prefix string // index needed for Query RPC
// Name is the unique name of this log stream within the Prefix scope. It must
// be a valid stream name.
//
// This value should not be changed once populated, as it will invalidate the
// ID.
Name string `gae:",noindex"`
// Created is the time when this stream was created. Validation fails if it is
// the zero time.
Created time.Time // index needed for Query RPC
// Purged, if true, indicates that this log stream has been marked as purged.
// Non-administrative queries and requests for this stream will operate as
// if this entry doesn't exist.
Purged bool `gae:",noindex"`
// PurgedTime is the time when this stream was purged.
PurgedTime time.Time `gae:",noindex"`
// ProtoVersion is the version string of the protobuf, as reported by the
// Collector (and ultimately self-identified by the Butler).
ProtoVersion string `gae:",noindex"`
// Descriptor holds the binary-encoded LogStreamDescriptor protobuf.
// Validation fails if it cannot be unmarshalled.
Descriptor []byte `gae:",noindex"`
// extra causes datastore to ignore unrecognized fields and strip them in
// future writes.
extra ds.PropertyMap `gae:"-,extra"`
// noDSValidate is a testing parameter to instruct the LogStream not to
// validate before reading/writing to datastore. It can be controlled by
// calling SetDSValidate(). The zero value (false) means validation is on.
noDSValidate bool
}
// Compile-time assertion that *LogStream implements ds.PropertyLoadSaver.
var _ ds.PropertyLoadSaver = (*LogStream)(nil)
// LogStreamID computes the HashID identifying the log stream at path. The ID
// is produced by makeHashID over the full path string.
func LogStreamID(path types.StreamPath) HashID {
	raw := string(path)
	return makeHashID(raw)
}
// LogPrefix returns a keyed (but not loaded) LogPrefix struct for this
// LogStream's Prefix.
//
// The key is derived from the Prefix alone. It must not reuse s.ID, which is
// the hash of the full Prefix/Name path (Validate enforces
// s.ID == LogStreamID(s.Path())). Hashing only the prefix means that every
// stream sharing a prefix resolves to the same LogPrefix entity.
//
// NOTE(review): this must agree with however LogPrefix IDs are generated
// elsewhere in the package (e.g. a LogPrefixID helper, if one exists).
func (s *LogStream) LogPrefix() *LogPrefix {
	return &LogPrefix{ID: makeHashID(s.Prefix)}
}
// PopulateState binds lst to this LogStream by setting lst's Parent key to the
// datastore key of s. No other fields of lst are touched.
func (s *LogStream) PopulateState(c context.Context, lst *LogStreamState) {
	parentKey := ds.KeyForObj(c, s)
	lst.Parent = parentKey
}
// State returns a fresh LogStreamState whose key is parented to this
// LogStream. Only the key fields are populated; nothing is read from
// datastore.
func (s *LogStream) State(c context.Context) *LogStreamState {
	lst := &LogStreamState{}
	s.PopulateState(c, lst)
	return lst
}
// Path returns the full LogDog stream path, formed by joining this stream's
// Prefix and Name.
func (s *LogStream) Path() types.StreamPath {
	prefix, name := types.StreamName(s.Prefix), types.StreamName(s.Name)
	return prefix.Join(name)
}
// Load implements ds.PropertyLoadSaver.
//
// Legacy properties are dropped before loading to save memory:
//   - _C is derived entirely from Prefix and Name.
//   - _Tags is derived entirely from Descriptor.
//   - Tags is derived entirely from Descriptor (and briefly appeared in
//     schema version 2).
//
// Unless validation was disabled via SetDSValidate, the loaded entity is then
// validated. The HashID is not checked, because datastore has not populated
// the ID field at this point.
func (s *LogStream) Load(pmap ds.PropertyMap) error {
	for _, legacy := range []string{"_C", "_Tags", "Tags"} {
		delete(pmap, legacy)
	}

	if err := ds.GetPLS(s).Load(pmap); err != nil {
		return err
	}

	if s.noDSValidate {
		return nil
	}
	return s.validateImpl(false)
}
// Save implements ds.PropertyLoadSaver.
//
// Unless validation was disabled via SetDSValidate, the entity is fully
// validated first, including its HashID. The Schema field is then stamped with
// CurrentSchemaVersion before the properties are serialized.
func (s *LogStream) Save(withMeta bool) (ds.PropertyMap, error) {
	if !s.noDSValidate {
		if err := s.Validate(); err != nil {
			return nil, err
		}
	}

	s.Schema = CurrentSchemaVersion
	pls := ds.GetPLS(s)
	return pls.Save(withMeta)
}
// Validate checks the LogStream's state and data contents, including that its
// ID matches the HashID derived from its Prefix/Name path. It returns an
// error describing the first problem found, or nil if the LogStream is valid.
func (s *LogStream) Validate() error {
	const enforceHashID = true
	return s.validateImpl(enforceHashID)
}
// validateImpl performs the checks behind Validate. When enforceHashID is
// true, it additionally requires s.ID to equal the HashID computed from
// s.Path(). Load passes false because the ID is not yet populated there.
func (s *LogStream) validateImpl(enforceHashID bool) error {
	if enforceHashID {
		// The ID is derived from Prefix/Name, so the two must agree.
		expected := LogStreamID(s.Path())
		if expected != s.ID {
			return fmt.Errorf("hash IDs don't match (%q != %q)", expected, s.ID)
		}
	}

	prefixName, streamName := types.StreamName(s.Prefix), types.StreamName(s.Name)
	if err := prefixName.Validate(); err != nil {
		return fmt.Errorf("invalid prefix: %s", err)
	}
	if err := streamName.Validate(); err != nil {
		return fmt.Errorf("invalid name: %s", err)
	}

	if s.Created.IsZero() {
		return errors.New("created time is not set")
	}

	// The stored Descriptor must at least decode as a LogStreamDescriptor.
	if _, err := s.DescriptorProto(); err != nil {
		return fmt.Errorf("could not unmarshal descriptor: %v", err)
	}
	return nil
}
// LoadDescriptor validates desc and copies it into this LogStream, setting:
//   - Prefix, from desc.Prefix
//   - Name, from desc.Name
//   - Descriptor, the binary-marshalled form of desc
//
// If validation or marshalling fails, an error is returned and the LogStream
// is left unmodified.
func (s *LogStream) LoadDescriptor(desc *logpb.LogStreamDescriptor) error {
	if err := desc.Validate(true); err != nil {
		return fmt.Errorf("invalid descriptor: %v", err)
	}

	encoded, err := proto.Marshal(desc)
	if err != nil {
		return fmt.Errorf("failed to marshal descriptor: %v", err)
	}

	s.Prefix, s.Name, s.Descriptor = desc.Prefix, desc.Name, encoded
	return nil
}
// DescriptorProto decodes the stream's Descriptor bytes into a newly allocated
// LogStreamDescriptor. It returns an error if unmarshalling fails.
func (s *LogStream) DescriptorProto() (*logpb.LogStreamDescriptor, error) {
	desc := new(logpb.LogStreamDescriptor)
	err := proto.Unmarshal(s.Descriptor, desc)
	if err != nil {
		return nil, err
	}
	return desc, nil
}
// SetDSValidate toggles validation of this LogStream when it is loaded from or
// saved to datastore. Validation is enabled by default.
//
// This is a testing parameter, and should NOT be used in production code.
func (s *LogStream) SetDSValidate(v bool) {
	// The flag is stored inverted so that the zero value means "validate".
	if v {
		s.noDSValidate = false
	} else {
		s.noDSValidate = true
	}
}
// LogStreamQuery is a query over LogStream entities. It combines a datastore
// query with in-memory predicates that every returned LogStream must satisfy.
//
// Create one with NewLogStreamQuery, refine it with its constraint methods,
// and execute it with Run.
type LogStreamQuery struct {
// q is the underlying datastore query. NewLogStreamQuery creates it with an
// equality filter on Prefix, ordered by -Created.
q *ds.Query
// includePurged, when false, causes purged LogStreams to be filtered out.
includePurged bool
// checks are predicates evaluated against each LogStream result.
checks []func(*LogStream) bool
// descChecks are predicates evaluated against each result's decoded
// descriptor. A result whose descriptor cannot be decoded is rejected.
descChecks []func(*logpb.LogStreamDescriptor) bool
}
// NewLogStreamQuery returns a LogStreamQuery for pathGlob.
//
// The prefix portion of pathGlob must be non-empty, free of wildcards, and a
// valid stream name. It becomes a datastore equality filter on Prefix, and
// results are ordered newest-first by Created. The stream name portion may
// use `*` and `**` in any combination (an empty name is treated as `**`). It
// is matched in memory against each result's Name.
//
// Purged streams are excluded unless IncludePurged is called.
//
// An error is returned if pathGlob describes an invalid query.
func NewLogStreamQuery(pathGlob string) (*LogStreamQuery, error) {
	prefix, name := types.StreamPath(pathGlob).Split()

	switch {
	case prefix == "":
		return nil, errors.New("prefix invalid: empty")
	case strings.ContainsRune(string(prefix), '*'):
		return nil, errors.New("prefix invalid: contains wildcard `*`")
	}
	if err := prefix.Validate(); err != nil {
		return nil, errors.Annotate(err, "prefix invalid").Err()
	}

	if name == "" {
		name = "**"
	}
	// Substitute a legal character for each wildcard so the rest of the name's
	// syntax can be checked by the regular stream-name validator.
	concrete := types.StreamName(strings.Replace(string(name), "*", "a", -1))
	if err := concrete.Validate(); err != nil {
		return nil, errors.Annotate(err, "name invalid").Err()
	}

	lsq := &LogStreamQuery{
		q: ds.NewQuery("LogStream").Eq("Prefix", string(prefix)).Order("-Created"),
	}

	// Expand the glob into a regular expression. First quote every regexp
	// metacharacter, which also turns each `*` into `\*`. Then rewrite the
	// escaped wildcard runs.
	//
	// NOTE(review): `**` expands to a bare `(.*)` that absorbs adjacent
	// slashes, so e.g. "a/**/b" also matches "ax/b" and "**/b" matches "xb".
	// Confirm whether that looser matching is intended.
	expander := strings.NewReplacer(
		"/\\*\\*/", "(.*)/",
		"/\\*\\*", "(.*)",
		"\\*\\*/", "(.*)",
		"\\*\\*", "(.*)",
		"\\*", "([^/][^/]*)",
	)
	expanded := expander.Replace(regexp.QuoteMeta(string(name)))
	nameRE, err := regexp.Compile(fmt.Sprintf("^%s$", expanded))
	if err != nil {
		return nil, errors.Annotate(err, "compiling name regex").Err()
	}

	// A single predicate enforces both the purged exclusion and the name match.
	// It reads lsq.includePurged when results are filtered, so a later
	// IncludePurged call still takes effect.
	lsq.checks = append(lsq.checks, func(ls *LogStream) bool {
		if lsq.includePurged || !ls.Purged {
			return nameRE.MatchString(ls.Name)
		}
		return false
	})
	return lsq, nil
}
// SetCursor makes the LogStreamQuery resume from the given encoded datastore
// cursor. An empty cursor leaves the query unchanged. A cursor that fails to
// decode returns the decoding error and also leaves the query unchanged.
func (lsp *LogStreamQuery) SetCursor(ctx context.Context, cursor string) error {
	if cursor != "" {
		decoded, err := ds.DecodeCursor(ctx, cursor)
		if err != nil {
			return err
		}
		lsp.q = lsp.q.Start(decoded)
	}
	return nil
}
// OnlyContentType restricts results to LogStreams whose descriptor has exactly
// the given content type. An empty ctype adds no constraint.
func (lsp *LogStreamQuery) OnlyContentType(ctype string) {
	if ctype != "" {
		matchesContentType := func(desc *logpb.LogStreamDescriptor) bool {
			return desc.ContentType == ctype
		}
		lsp.descChecks = append(lsp.descChecks, matchesContentType)
	}
}
// OnlyStreamType restricts results to LogStreams whose descriptor has the given
// stream type. It returns an error, without adding a constraint, if stype is
// not a value listed in logpb.StreamType_name.
func (lsp *LogStreamQuery) OnlyStreamType(stype logpb.StreamType) error {
	_, known := logpb.StreamType_name[int32(stype)]
	if !known {
		return errors.New("unknown StreamType")
	}

	matchesStreamType := func(desc *logpb.LogStreamDescriptor) bool {
		return desc.StreamType == stype
	}
	lsp.descChecks = append(lsp.descChecks, matchesStreamType)
	return nil
}
// IncludePurged makes the LogStreamQuery return purged log streams in addition
// to non-purged ones.
func (lsp *LogStreamQuery) IncludePurged() {
	const include = true
	lsp.includePurged = include
}
// OnlyPurged restricts the LogStreamQuery to purged log streams.
//
// Purged streams are excluded by default, so this yields NO logs unless
// IncludePurged has also been called.
func (lsp *LogStreamQuery) OnlyPurged() {
	isPurged := func(ls *LogStream) bool { return ls.Purged }
	lsp.checks = append(lsp.checks, isPurged)
}
// TimeBound restricts results to LogStreams whose Created time is strictly
// after lower and strictly before upper. A nil bound is not applied.
//
// These bounds are datastore inequality filters rather than in-memory checks.
// The query is already ordered by -Created, so an inequality on that same
// property can be applied.
func (lsp *LogStreamQuery) TimeBound(lower, upper *timestamp.Timestamp) {
	q := lsp.q
	if lower != nil {
		q = q.Gt("Created", google.TimeFromProto(lower).UTC())
	}
	if upper != nil {
		q = q.Lt("Created", google.TimeFromProto(upper).UTC())
	}
	lsp.q = q
}
// MustHaveTags restricts results to LogStreams whose descriptor carries every
// key in tags. A non-empty value must also match exactly. An empty value only
// requires that the key be present.
func (lsp *LogStreamQuery) MustHaveTags(tags map[string]string) {
	hasAllTags := func(desc *logpb.LogStreamDescriptor) bool {
		for key, want := range tags {
			got, present := desc.Tags[key]
			if !present || (want != "" && want != got) {
				return false
			}
		}
		return true
	}
	lsp.descChecks = append(lsp.descChecks, hasAllTags)
}
// filter reports whether ls passes every configured predicate. The LogStream
// checks run first. The descriptor is decoded only when descriptor checks
// exist, and a stream whose descriptor cannot be decoded is rejected.
func (lsp *LogStreamQuery) filter(ls *LogStream) bool {
	for _, check := range lsp.checks {
		if !check(ls) {
			return false
		}
	}

	if len(lsp.descChecks) == 0 {
		return true
	}

	desc, err := ls.DescriptorProto()
	if err != nil {
		return false
	}
	for _, check := range lsp.descChecks {
		if !check(desc) {
			return false
		}
	}
	return true
}
// Run executes the LogStreamQuery. It invokes `cb` for each LogStream that
// passes every in-memory filter; filtered-out results are skipped silently.
//
// If `cb` returns ds.Stop, the query will stop with a nil error.
// If `cb` returns a different error, the query will stop with the returned
// error.
// If `cb` returns nil, the query continues until it exhausts.
func (lsp *LogStreamQuery) Run(ctx context.Context, cb func(*LogStream, ds.CursorCB) error) error {
	onResult := func(ls *LogStream, getCursor ds.CursorCB) error {
		if !lsp.filter(ls) {
			return nil
		}
		return cb(ls, getCursor)
	}
	return ds.Run(ctx, lsp.q, onResult)
}