// Copyright 2018 The LUCI Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dsmapper
import (
"context"
"fmt"
"math"
"sync"
"time"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
"go.chromium.org/luci/gae/service/datastore"
"go.chromium.org/luci/common/clock"
"go.chromium.org/luci/common/errors"
"go.chromium.org/luci/common/logging"
"go.chromium.org/luci/common/retry/transient"
"go.chromium.org/luci/common/sync/parallel"
"go.chromium.org/luci/server/tq"
"go.chromium.org/luci/server/dsmapper/dsmapperpb"
"go.chromium.org/luci/server/dsmapper/internal/splitter"
"go.chromium.org/luci/server/dsmapper/internal/tasks"
// Need this to enqueue tasks inside Datastore transactions.
_ "go.chromium.org/luci/server/tq/txn/datastore"
)
// ID identifies a mapper registered in the controller.
//
// It will be passed across processes, so all processes that execute mapper jobs
// should register the same mappers under the same IDs.
//
// The safest approach is to keep mapper IDs in the app unique, e.g. do NOT
// reuse them when adding new mappers or significantly changing existing ones.
type ID string
// Mapper applies some function to the given slice of entities, given by
// their keys.
//
// May be called multiple times for the same key (thus should be idempotent).
//
// Returning a transient error indicates that the processing of this batch of
// keys should be retried (even if some keys were processed successfully).
//
// Returning a fatal error causes the entire shard (and eventually the entire
// job) to be marked as failed. The processing of the failed shard stops right
// away, but other shards are kept running until completion (or their own
// failure).
//
// The function is called outside of any transactions, so it can start its own
// if needed.
type Mapper func(ctx context.Context, keys []*datastore.Key) error
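
// Below is a minimal Mapper sketch, for illustration only; it is not part of
// this package. It assumes a hypothetical "Account" entity and simply loads
// and re-puts each batch of entities (e.g. to backfill a newly added field).
// Errors are tagged as transient so the page is retried.
//
//	func touchAccounts(ctx context.Context, keys []*datastore.Key) error {
//		accounts := make([]*Account, len(keys))
//		for i, k := range keys {
//			accounts[i] = &Account{ID: k.IntID()}
//		}
//		if err := datastore.Get(ctx, accounts); err != nil {
//			return transient.Tag.Apply(err)
//		}
//		return transient.Tag.Apply(datastore.Put(ctx, accounts))
//	}
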
// Factory knows how to construct instances of Mapper.
//
// Factory is supplied by the users of the library and registered in the
// controller via RegisterFactory call.
//
// It is used to get a mapper to process a set of pages within a shard. It takes
// a Job (including its Config and Params) and a shard index, so it can prepare
// the mapper for processing this specific shard.
//
// Returning a transient error triggers an eventual retry. Returning a fatal
// error causes the shard (eventually the entire job) to be marked as failed.
type Factory func(ctx context.Context, j *Job, shardIdx int) (Mapper, error)
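
// A hypothetical Factory sketch that pairs with the Mapper sketch above: any
// per-shard setup (e.g. decoding job parameters) happens here, and the
// returned closure does the actual mapping.
//
//	func accountFixerFactory(ctx context.Context, j *Job, shardIdx int) (Mapper, error) {
//		// Per-shard preparation can go here, e.g. interpreting j.Config.Params.
//		return func(ctx context.Context, keys []*datastore.Key) error {
//			logging.Infof(ctx, "shard %d: processing %d keys", shardIdx, len(keys))
//			return touchAccounts(ctx, keys)
//		}, nil
//	}
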
// Controller is responsible for starting, progressing and finishing mapping
// jobs.
//
// It should be treated as a global singleton object. Having more than one
// controller in the production application is a bad idea (they'll collide with
// each other since they use the global datastore namespace). It's still useful
// to instantiate multiple controllers in unit tests.
type Controller struct {
// MapperQueue is the name of the Cloud Tasks queue to use for mapping jobs.
//
// This queue will perform all "heavy" tasks. It should be configured
// appropriately to allow the desired number of shards to run in parallel.
//
// For example, if the largest submitted job is expected to have 128 shards,
// the max_concurrent_requests setting of the mapper queue should be at least
// 128, otherwise some shards will be stalled waiting for others to finish
// (defeating the purpose of having a large number of shards).
//
// If empty, "default" is used.
MapperQueue string
// ControlQueue is the name of the Cloud Tasks queue to use for control signals.
//
// This queue is used very lightly when starting and stopping jobs (roughly
// 2*Shards tasks overall per job). The default queue.yaml settings for such a
// queue should be sufficient (unless you run a lot of different jobs at
// once).
//
// If empty, "default" is used.
ControlQueue string
m sync.RWMutex
mappers map[ID]Factory
disp *tq.Dispatcher
}
// Install registers task queue task handlers in the given task queue
// dispatcher.
//
// This must be done before Controller is used.
//
// There can be at most one Controller installed into an instance of TQ
// dispatcher. Installing more will cause panics.
//
// If you need multiple different controllers for some reason, create multiple
// tq.Dispatchers (with different base URLs, so they don't conflict with each
// other) and install them all into the router.
func (ctl *Controller) Install(disp *tq.Dispatcher) {
ctl.m.Lock()
defer ctl.m.Unlock()
if ctl.disp != nil {
panic("mapper.Controller is already installed into a tq.Dispatcher")
}
ctl.disp = disp
controlQueue := ctl.ControlQueue
if controlQueue == "" {
controlQueue = "default"
}
mapperQueue := ctl.MapperQueue
if mapperQueue == "" {
mapperQueue = "default"
}
disp.RegisterTaskClass(tq.TaskClass{
ID: "dsmapper-split-and-launch",
Prototype: &tasks.SplitAndLaunch{},
Kind: tq.Transactional,
Queue: controlQueue,
Handler: ctl.splitAndLaunchHandler,
Quiet: true,
})
disp.RegisterTaskClass(tq.TaskClass{
ID: "dsmapper-fan-out-shards",
Prototype: &tasks.FanOutShards{},
Kind: tq.Transactional,
Queue: controlQueue,
Handler: ctl.fanOutShardsHandler,
Quiet: true,
})
disp.RegisterTaskClass(tq.TaskClass{
ID: "dsmapper-process-shard",
Prototype: &tasks.ProcessShard{},
Kind: tq.FollowsContext,
Queue: mapperQueue,
Handler: ctl.processShardHandler,
Quiet: true,
})
disp.RegisterTaskClass(tq.TaskClass{
ID: "dsmapper-request-job-state-update",
Prototype: &tasks.RequestJobStateUpdate{},
Kind: tq.Transactional,
Queue: controlQueue,
Handler: ctl.requestJobStateUpdateHandler,
Quiet: true,
})
disp.RegisterTaskClass(tq.TaskClass{
ID: "dsmapper-update-job-state",
Prototype: &tasks.UpdateJobState{},
Kind: tq.NonTransactional,
Queue: controlQueue,
Handler: ctl.updateJobStateHandler,
Quiet: true,
})
}
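
// A hypothetical wiring sketch (the queue names and the mapper ID below are
// illustrative assumptions, not defaults of this package):
//
//	var ctl = dsmapper.Controller{
//		MapperQueue:  "dsmapper-jobs",
//		ControlQueue: "dsmapper-control",
//	}
//
//	func init() {
//		ctl.RegisterFactory("fix-accounts", accountFixerFactory)
//		ctl.Install(&tq.Default)
//	}
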
// tq returns a dispatcher set in Install or panics if not set yet.
//
// Grabs the reader lock inside.
func (ctl *Controller) tq() *tq.Dispatcher {
ctl.m.RLock()
defer ctl.m.RUnlock()
if ctl.disp == nil {
panic("mapper.Controller wasn't installed into tq.Dispatcher yet")
}
return ctl.disp
}
// RegisterFactory adds the given mapper factory to the internal registry.
//
// Intended to be used during init() time or early during process
// initialization. Panics if a factory with such an ID has already been
// registered.
//
// The mapper ID will be used internally to identify which mapper a job should
// be using. If a factory disappears while the job is running (e.g. if the
// service binary is updated and the new binary doesn't have the mapper registered
// anymore), the job ends with a failure.
func (ctl *Controller) RegisterFactory(id ID, m Factory) {
ctl.m.Lock()
defer ctl.m.Unlock()
if _, ok := ctl.mappers[id]; ok {
panic(fmt.Sprintf("mapper %q is already registered", id))
}
if ctl.mappers == nil {
ctl.mappers = make(map[ID]Factory, 1)
}
ctl.mappers[id] = m
}
// getFactory returns a registered mapper factory or an error.
//
// Grabs the reader lock inside. Can return only fatal errors.
func (ctl *Controller) getFactory(id ID) (Factory, error) {
ctl.m.RLock()
defer ctl.m.RUnlock()
if m, ok := ctl.mappers[id]; ok {
return m, nil
}
return nil, errors.Reason("no mapper factory with ID %q registered", id).Err()
}
// initMapper instantiates a Mapper through a registered factory.
//
// May return fatal and transient errors.
func (ctl *Controller) initMapper(ctx context.Context, j *Job, shardIdx int) (Mapper, error) {
f, err := ctl.getFactory(j.Config.Mapper)
if err != nil {
return nil, errors.Annotate(err, "when initializing mapper").Err()
}
m, err := f(ctx, j, shardIdx)
if err != nil {
return nil, errors.Annotate(err, "error from mapper factory %q", j.Config.Mapper).Err()
}
return m, nil
}
// LaunchJob launches a new mapping job, returning its ID (that can be used to
// control it or query its status).
//
// Launches a datastore transaction inside.
func (ctl *Controller) LaunchJob(ctx context.Context, j *JobConfig) (JobID, error) {
disp := ctl.tq()
if err := j.Validate(); err != nil {
return 0, errors.Annotate(err, "bad job config").Err()
}
if _, err := ctl.getFactory(j.Mapper); err != nil {
return 0, errors.Annotate(err, "bad job config").Err()
}
// Prepare and store the job entity, generate its key. Launch a tq task that
// subdivides the key space and launches individual shards. We do it
// asynchronously since this can potentially be slow (for a large number of
// shards).
var job Job
err := runTxn(ctx, func(ctx context.Context) error {
now := clock.Now(ctx).UTC()
job = Job{
Config: *j,
State: dsmapperpb.State_STARTING,
Created: now,
Updated: now,
}
if err := datastore.Put(ctx, &job); err != nil {
return errors.Annotate(err, "failed to store Job entity").Tag(transient.Tag).Err()
}
return disp.AddTask(ctx, &tq.Task{
Title: fmt.Sprintf("split:job-%d", job.ID),
Payload: &tasks.SplitAndLaunch{
JobId: int64(job.ID),
},
})
})
if err != nil {
return 0, err
}
return job.ID, nil
}
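
// A hypothetical launch sketch (the kind, mapper ID and tuning numbers are
// illustrative assumptions):
//
//	jobID, err := ctl.LaunchJob(ctx, &dsmapper.JobConfig{
//		Query:      dsmapper.Query{Kind: "Account"},
//		Mapper:     "fix-accounts",
//		ShardCount: 64,
//		PageSize:   256,
//	})
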
// GetJob fetches a previously launched job given its ID.
//
// Returns ErrNoSuchJob if not found. All other possible errors are transient
// and they are marked as such.
func (ctl *Controller) GetJob(ctx context.Context, id JobID) (*Job, error) {
// Even though we could have made getJob public, we want to force API users
// to use Controller as a single facade.
return getJob(ctx, id)
}
// AbortJob aborts a job and returns its most recent state.
//
// Silently does nothing if the job is finished or already aborted.
//
// Returns ErrNoSuchJob if there's no such job at all. All other possible errors
// are transient and they are marked as such.
func (ctl *Controller) AbortJob(ctx context.Context, id JobID) (job *Job, err error) {
err = runTxn(ctx, func(ctx context.Context) error {
var err error
switch job, err = getJob(ctx, id); {
case err != nil:
return err
case isFinalState(job.State) || job.State == dsmapperpb.State_ABORTING:
return nil // nothing to abort, already done
case job.State == dsmapperpb.State_STARTING:
// Shards haven't been launched yet. Kill the job right away.
job.State = dsmapperpb.State_ABORTED
case job.State == dsmapperpb.State_RUNNING:
// Running shards will discover that the job is aborting and will
// eventually move into ABORTED state (notifying the job about it). Once
// all shards report they are done, the job itself will switch into
// ABORTED state.
job.State = dsmapperpb.State_ABORTING
}
job.Updated = clock.Now(ctx).UTC()
return errors.Annotate(datastore.Put(ctx, job), "failed to store Job entity").Tag(transient.Tag).Err()
})
if err != nil {
job = nil // don't return bogus data in case txn failed to land
}
return
}
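
// A hypothetical status-check sketch using GetJob:
//
//	job, err := ctl.GetJob(ctx, jobID)
//	switch {
//	case err != nil:
//		// Handle ErrNoSuchJob or a transient error.
//	case job.State == dsmapperpb.State_SUCCESS:
//		logging.Infof(ctx, "job %d finished successfully", jobID)
//	default:
//		logging.Infof(ctx, "job %d is in state %s", jobID, job.State)
//	}
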
////////////////////////////////////////////////////////////////////////////////
// Task queue tasks handlers.
// errJobAborted is used internally as shard failure status when the job is
// being aborted.
//
// It causes the shard to switch into ABORTED state instead of FAIL.
var errJobAborted = errors.New("the job has been aborted")
// splitAndLaunchHandler splits the job into shards and enqueues tasks that
// process shards.
func (ctl *Controller) splitAndLaunchHandler(ctx context.Context, payload proto.Message) error {
msg := payload.(*tasks.SplitAndLaunch)
now := clock.Now(ctx).UTC()
// Fetch job details. Make sure it isn't canceled and isn't running already.
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_STARTING)
if err != nil || job == nil {
return errors.Annotate(err, "in SplitAndLaunch").Err()
}
// Figure out key ranges for shards. There may be fewer shards than requested
// if there are too few entities.
dq := job.Config.Query.ToDatastoreQuery()
ranges, err := splitter.SplitIntoRanges(ctx, dq, splitter.Params{
Shards: job.Config.ShardCount,
Samples: 512, // should be enough for everyone...
})
if err != nil {
return errors.Annotate(err, "failed to split the query into shards").Tag(transient.Tag).Err()
}
// Create entities that hold the shards' state. Each one is in its own entity
// group, since the combined write rate to them is O(ShardCount), which can
// exceed the limits of a single entity group.
shards := make([]*shard, len(ranges))
for idx, rng := range ranges {
shards[idx] = &shard{
JobID: job.ID,
Index: idx,
State: dsmapperpb.State_STARTING,
Range: rng,
ExpectedCount: -1,
Created: now,
Updated: now,
}
}
// Calculate the number of entities in each shard to track shard processing
// progress. Note that this can be very slow if there are many entities.
if job.Config.TrackProgress {
logging.Infof(ctx, "Estimating the size of each shard...")
if err := fetchShardSizes(ctx, dq, shards); err != nil {
return errors.Annotate(err, "when estimating shard sizes").Err()
}
}
// We use auto-generated keys for shards to make sure a crashed SplitAndLaunch
// task retries cleanly, even if the underlying key space we are mapping over
// changes between the retries (making a naive put using "<job-id>:<index>"
// key non-idempotent!).
logging.Infof(ctx, "Instantiating shards...")
if err := datastore.Put(ctx, shards); err != nil {
return errors.Annotate(err, "failed to store shards").Tag(transient.Tag).Err()
}
// Prepare shardList which is basically a manual fully consistent index for
// Job -> [Shard] relation. We can't use a regular index, since shards are all
// in different entity groups (see O(ShardCount) argument above).
//
// Log the resulting shards along the way.
shardsEnt := shardList{
Parent: datastore.KeyForObj(ctx, job),
Shards: make([]int64, len(shards)),
}
for idx, s := range shards {
shardsEnt.Shards[idx] = s.ID
l, r := "-inf", "+inf"
if s.Range.Start != nil {
l = s.Range.Start.String()
}
if s.Range.End != nil {
r = s.Range.End.String()
}
count := ""
if s.ExpectedCount != -1 { // -1 means the count wasn't calculated
count = fmt.Sprintf(" (%d entities)", s.ExpectedCount)
}
logging.Infof(ctx, "Shard #%d is %d: %s - %s%s", idx, s.ID, l, r, count)
}
// Transactionally associate shards with the job and launch the TQ task that
// kicks off the processing of each individual shard. We use an intermediary
// task for this since transactionally launching O(ShardCount) tasks hits TQ
// transaction limits.
//
// If SplitAndLaunch crashes before this transaction lands, there'll be some
// orphaned Shard entities, no big deal.
logging.Infof(ctx, "Updating the job and launching the fan out task...")
return runTxn(ctx, func(ctx context.Context) error {
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_STARTING)
if err != nil || job == nil {
return errors.Annotate(err, "in SplitAndLaunch txn").Err()
}
job.State = dsmapperpb.State_RUNNING
job.Updated = now
if err := datastore.Put(ctx, job, &shardsEnt); err != nil {
return errors.Annotate(err,
"when storing Job %d and ShardList with %d shards", job.ID, len(shards),
).Tag(transient.Tag).Err()
}
return ctl.tq().AddTask(ctx, &tq.Task{
Title: fmt.Sprintf("fanout:job-%d", job.ID),
Payload: &tasks.FanOutShards{
JobId: int64(job.ID),
},
})
})
}
// fetchShardSizes makes a bunch of Count() queries to figure out the size of each
// shard.
//
// Updates ExpectedCount in-place.
func fetchShardSizes(ctx context.Context, baseQ *datastore.Query, shards []*shard) error {
ctx, cancel := clock.WithTimeout(ctx, 10*time.Minute)
defer cancel()
err := parallel.WorkPool(32, func(tasks chan<- func() error) {
for _, sh := range shards {
sh := sh
tasks <- func() error {
n, err := datastore.CountBatch(ctx, 1024, sh.Range.Apply(baseQ))
if err == nil {
sh.ExpectedCount = n
}
return errors.Annotate(err, "for shard #%d", sh.Index).Err()
}
}
})
return transient.Tag.Apply(err)
}
// fanOutShardsHandler fetches a list of shards from the job and launches
// named ProcessShard tasks, one per shard.
func (ctl *Controller) fanOutShardsHandler(ctx context.Context, payload proto.Message) error {
msg := payload.(*tasks.FanOutShards)
// Make sure the job is still present. If it is aborted, we still need to
// launch the shards, so they notice they are being aborted. We could try
// to abort all shards right here and now, but it basically means implementing
// an alternative shard abort flow. It seems simpler to just let the regular
// flow proceed.
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_RUNNING, dsmapperpb.State_ABORTING)
if err != nil || job == nil {
return errors.Annotate(err, "in FanOutShards").Err()
}
// Grab the list of shards created in SplitAndLaunch. It must exist at this
// point, since the job is in Running state.
shardIDs, err := job.fetchShardIDs(ctx)
if err != nil {
return errors.Annotate(err, "in FanOutShards").Err()
}
// Enqueue a bunch of named ProcessShard tasks (one per shard) to actually
// launch shard processing. This is an idempotent operation, so if FanOutShards
// crashes midway and is later retried, nothing bad happens.
eg, ctx := errgroup.WithContext(ctx)
tq := ctl.tq()
for _, sid := range shardIDs {
task := makeProcessShardTask(job.ID, sid, 0, true)
eg.Go(func() error { return tq.AddTask(ctx, task) })
}
return eg.Wait()
}
// processShardHandler reads a bunch of entities (up to PageSize), and hands
// them to the mapper.
//
// After doing this in a loop for 1 min, it checkpoints the state and reenqueues
// itself to resume mapping in another instance of the task. This makes each
// processing TQ task relatively small, so it doesn't eat a lot of memory or
// produce gigantic unreadable logs. It also makes TQ's "Pause queue" button
// more handy.
func (ctl *Controller) processShardHandler(ctx context.Context, payload proto.Message) error {
msg := payload.(*tasks.ProcessShard)
// Grab the shard. This returns (nil, nil) if this Task Queue task is stale
// (based on taskNum) and should be silently skipped.
sh, err := getActiveShard(ctx, msg.ShardId, msg.TaskNum)
if err != nil || sh == nil {
return errors.Annotate(err, "when fetching shard state").Err()
}
ctx = logging.SetField(ctx, "shardIdx", sh.Index)
logging.Infof(ctx,
"Resuming processing of the shard (launched %s ago)",
clock.Now(ctx).Sub(sh.Created))
// Grab the job config, make sure the job is still active.
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_RUNNING, dsmapperpb.State_ABORTING)
if err != nil || job == nil {
return errors.Annotate(err, "in ProcessShard").Err()
}
// If the job is being killed, kill the shard as well. This will eventually
// notify the job about shard's completion. Once all shards are done, the
// job will switch into ABORTED state.
if job.State == dsmapperpb.State_ABORTING {
return ctl.finishShard(ctx, sh.ID, 0, errJobAborted)
}
// Prepare the mapper by giving the factory job parameters.
mapper, err := ctl.initMapper(ctx, job, sh.Index)
switch {
case transient.Tag.In(err):
return errors.Annotate(err, "transient error when instantiating a mapper").Err()
case err != nil:
// Kill the shard if the factory returns a fatal error.
return ctl.finishShard(ctx, sh.ID, 0, err)
}
baseQ := job.Config.Query.ToDatastoreQuery()
lastKey := sh.ResumeFrom
keys := make([]*datastore.Key, 0, job.Config.PageSize)
shardDone := false // true when finished processing the shard
pageCount := 0 // how many pages processed successfully
itemCount := int64(0) // how many entities processed successfully
// A soft deadline for when to checkpoint the progress and reenqueue the
// processing task. We never abort processing of a page midway (causes too
// many complications), so if the mapper is extremely slow, it may end up
// running longer than this deadline.
dur := time.Minute
if job.Config.TaskDuration > 0 {
dur = job.Config.TaskDuration
}
deadline := clock.Now(ctx).Add(dur)
// Optionally also put a limit on the number of processed pages. Useful if the
// mapper is somehow leaking resources (not sure it is possible in Go, but
// it was definitely possible in Python).
pageCountLimit := math.MaxInt32
if job.Config.PagesPerTask > 0 {
pageCountLimit = job.Config.PagesPerTask
}
for clock.Now(ctx).Before(deadline) && pageCount < pageCountLimit {
rng := sh.Range
if lastKey != nil {
rng.Start = lastKey
}
if rng.IsEmpty() {
shardDone = true
break
}
// Fetch next batch of keys. Return an error to the outer scope where it
// eventually will bubble up to TQ (so the task is retried with exponential
// backoff).
logging.Infof(ctx, "Fetching the next batch...")
q := rng.Apply(baseQ).Limit(int32(job.Config.PageSize)).KeysOnly(true)
keys = keys[:0]
if err = datastore.GetAll(ctx, q, &keys); err != nil {
err = errors.Annotate(err, "when querying for keys").Tag(transient.Tag).Err()
break
}
// No results within the range? Processing of the shard is complete!
if len(keys) == 0 {
shardDone = true
break
}
// Let the mapper do its thing. Remember where to resume from.
logging.Infof(ctx,
"Processing %d entities: %s - %s",
len(keys),
keys[0].String(),
keys[len(keys)-1].String())
if err = mapper(ctx, keys); err != nil {
err = errors.Annotate(err, "while mapping %d keys", len(keys)).Err()
break
}
lastKey = keys[len(keys)-1]
pageCount++
itemCount += int64(len(keys))
// Note: at this point we might try to checkpoint the progress, but we must
// be careful not to exceed 1 transaction per second limit. Considering we
// also MUST checkpoint the progress at the end of the task, it is a bit
// tricky to guarantee no two checkpoints are closer than 1 sec. We can do
// silly things like sleep 1 sec before the last checkpoint, but they
// provide no guarantees.
//
// So instead we store the progress after the deadline is up. If the task
// crashes midway, up to 1 min of work will be retried. No big deal.
}
// We are done with the shard when we have either processed its entire range or
// failed with a fatal error. finishShard takes care of notifying the parent
// job about the shard's completion.
if shardDone || (err != nil && !transient.Tag.In(err)) {
return ctl.finishShard(ctx, sh.ID, itemCount, err)
}
if lastKey != nil {
logging.Infof(ctx, "The shard processing will resume from %s", lastKey)
} else {
logging.Infof(ctx, "The shard processing will resume from scratch")
}
// If the shard isn't done and we made no progress at all, then we hit
// a transient error. Ask TQ to retry.
if pageCount == 0 {
return err
}
// Otherwise we need to checkpoint the progress and either retry this task
// (on transient errors, to get an exponential backoff from TQ) or start
// a new task.
txnErr := shardTxn(ctx, sh.ID, func(ctx context.Context, sh *shard) (bool, error) {
switch {
case sh.ProcessTaskNum != msg.TaskNum:
logging.Warningf(ctx, "Unexpected shard state: its ProcessTaskNum is %d != %d", sh.ProcessTaskNum, msg.TaskNum)
return false, nil // some other task is already running
case sh.ResumeFrom != nil && lastKey.Less(sh.ResumeFrom):
logging.Warningf(ctx, "Unexpected shard state: its ResumeFrom is %s >= %s", sh.ResumeFrom, lastKey)
return false, nil // someone already claimed to process further, let them proceed
}
sh.State = dsmapperpb.State_RUNNING
sh.ResumeFrom = lastKey
sh.ProcessedCount += itemCount
// If the processing failed, just store the progress, but do not start a
// new TQ task. Retry the current task instead (to get exponential backoff).
if err != nil {
return true, nil
}
// Otherwise launch a new task in the chain. This essentially "resets"
// the exponential backoff counter.
sh.ProcessTaskNum++
return true, ctl.tq().AddTask(ctx,
makeProcessShardTask(sh.JobID, sh.ID, sh.ProcessTaskNum, false))
})
switch {
case err != nil && txnErr == nil:
return err
case err == nil && txnErr != nil:
return errors.Annotate(txnErr, "when storing shard progress").Err()
case err != nil && txnErr != nil:
return errors.Annotate(txnErr, "when storing shard progress after a transient error (%s)", err).Err()
default: // (nil, nil)
return nil
}
}
// finishShard marks the shard as finished (with status based on shardErr) and
// emits a task to update the parent job's status.
func (ctl *Controller) finishShard(ctx context.Context, shardID, processedCount int64, shardErr error) error {
err := shardTxn(ctx, shardID, func(ctx context.Context, sh *shard) (save bool, err error) {
runtime := clock.Now(ctx).Sub(sh.Created)
switch {
case shardErr == errJobAborted:
logging.Warningf(ctx, "The job has been aborted, aborting the shard after it has been running %s", runtime)
sh.State = dsmapperpb.State_ABORTED
sh.Error = errJobAborted.Error()
case shardErr != nil:
logging.Errorf(ctx, "The shard processing failed in %s with error: %s", runtime, shardErr)
sh.State = dsmapperpb.State_FAIL
sh.Error = shardErr.Error()
default:
logging.Infof(ctx, "The shard processing finished successfully in %s", runtime)
sh.State = dsmapperpb.State_SUCCESS
}
sh.ProcessedCount += processedCount
return true, ctl.requestJobStateUpdate(ctx, sh.JobID, sh.ID)
})
return errors.Annotate(err, "when marking the shard as finished").Err()
}
// makeProcessShardTask creates a ProcessShard tq.Task.
//
// If 'named' is true, assigns it a name. Tasks are named based on their shard
// IDs and an index in the chain of ProcessShard tasks (task number), so that
// on retries we don't rekick already finished tasks.
func makeProcessShardTask(job JobID, shardID, taskNum int64, named bool) *tq.Task {
// Note: strictly speaking, including the job ID in the task name is redundant,
// since shardID is already globally unique, but it doesn't hurt. It is useful
// for debugging when looking at logs and pending TQ tasks.
t := &tq.Task{
Title: fmt.Sprintf("map:job-%d-shard-%d-task-%d", job, shardID, taskNum),
Payload: &tasks.ProcessShard{
JobId: int64(job),
ShardId: shardID,
TaskNum: taskNum,
},
}
if named {
t.DeduplicationKey = fmt.Sprintf("v1-%d-%d-%d", job, shardID, taskNum)
}
return t
}
// requestJobStateUpdate submits RequestJobStateUpdate task, which eventually
// causes updateJobStateHandler to execute.
func (ctl *Controller) requestJobStateUpdate(ctx context.Context, jobID JobID, shardID int64) error {
return ctl.tq().AddTask(ctx, &tq.Task{
Title: fmt.Sprintf("notify:job-%d-shard-%d", jobID, shardID),
Payload: &tasks.RequestJobStateUpdate{
JobId: int64(jobID),
ShardId: shardID,
},
})
}
// requestJobStateUpdateHandler is called whenever state of some shard changes.
//
// It forwards this notification to the job (specifically updateJobStateHandler)
// throttling the rate to ~0.5 QPS to avoid overwhelming the job's entity group
// with a high write rate.
func (ctl *Controller) requestJobStateUpdateHandler(ctx context.Context, payload proto.Message) error {
msg := payload.(*tasks.RequestJobStateUpdate)
// Throttle to once per 2 sec (and make sure it is always in the future). We
// rely here on a pretty good (< .5s maximum skew) clock sync on servers.
eta := clock.Now(ctx).Unix()
eta = (eta/2 + 1) * 2
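// Worked example: for now == 1000000001 this gives (500000000+1)*2 ==
// 1000000002, i.e. the next even Unix second strictly in the future. All
// notifications landing in the same 2 sec window produce the same dedupKey
// below and collapse into a single UpdateJobState task.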
dedupKey := fmt.Sprintf("update-job-state-v1:%d:%d", msg.JobId, eta)
err := ctl.tq().AddTask(ctx, &tq.Task{
DeduplicationKey: dedupKey,
Title: fmt.Sprintf("update:job-%d", msg.JobId),
ETA: time.Unix(eta, 0),
Payload: &tasks.UpdateJobState{JobId: msg.JobId},
})
return errors.Annotate(err, "when adding UpdateJobState task").Err()
}
// updateJobStateHandler is called some time later after one or more shards have
// changed state.
//
// It calculates overall job state based on the state of its shards.
func (ctl *Controller) updateJobStateHandler(ctx context.Context, payload proto.Message) error {
msg := payload.(*tasks.UpdateJobState)
// Get the job and all its shards in their most recent state.
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_RUNNING, dsmapperpb.State_ABORTING)
if err != nil || job == nil {
return errors.Annotate(err, "in UpdateJobState").Err()
}
shards, err := job.fetchShards(ctx)
if err != nil {
return errors.Annotate(err, "failed to fetch shards").Err()
}
// Switch the job into a final state only when all shards are done running.
perState := make(map[dsmapperpb.State]int, len(dsmapperpb.State_name))
finished := 0
for _, sh := range shards {
logging.Infof(ctx, "Shard #%d (%d) is in state %s", sh.Index, sh.ID, sh.State)
perState[sh.State]++
if isFinalState(sh.State) {
finished++
}
}
if finished != len(shards) {
return nil
}
jobState := dsmapperpb.State_SUCCESS
switch {
case perState[dsmapperpb.State_ABORTED] != 0:
jobState = dsmapperpb.State_ABORTED
case perState[dsmapperpb.State_FAIL] != 0:
jobState = dsmapperpb.State_FAIL
}
return runTxn(ctx, func(ctx context.Context) error {
job, err := getJobInState(ctx, JobID(msg.JobId), dsmapperpb.State_RUNNING, dsmapperpb.State_ABORTING)
if err != nil || job == nil {
return errors.Annotate(err, "in UpdateJobState txn").Err()
}
// Make sure an aborting job ends up in the ABORTED state, even if all its
// shards managed to finish. It looks weird when an ABORTING job moves
// into e.g. the SUCCESS state.
if job.State == dsmapperpb.State_ABORTING {
job.State = dsmapperpb.State_ABORTED
} else {
job.State = jobState
}
job.Updated = clock.Now(ctx).UTC()
runtime := job.Updated.Sub(job.Created)
switch job.State {
case dsmapperpb.State_SUCCESS:
logging.Infof(ctx, "The job finished successfully in %s", runtime)
case dsmapperpb.State_FAIL:
logging.Errorf(ctx, "The job finished with %d shards failing in %s", perState[dsmapperpb.State_FAIL], runtime)
for _, sh := range shards {
if sh.State == dsmapperpb.State_FAIL {
logging.Errorf(ctx, "Shard #%d (%d) error - %s", sh.Index, sh.ID, sh.Error)
}
}
case dsmapperpb.State_ABORTED:
logging.Warningf(ctx, "The job has been aborted after %s: %d shards succeeded, %d shards failed, %d shards aborted",
runtime, perState[dsmapperpb.State_SUCCESS], perState[dsmapperpb.State_FAIL], perState[dsmapperpb.State_ABORTED])
}
return transient.Tag.Apply(datastore.Put(ctx, job))
})
}