/
memory.go
979 lines (852 loc) · 24.2 KB
/
memory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
package store
import (
"context"
"errors"
"fmt"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/docker/go-events"
"github.com/docker/go-metrics"
"github.com/docker/swarmkit/api"
pb "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/watch"
gogotypes "github.com/gogo/protobuf/types"
memdb "github.com/hashicorp/go-memdb"
)
const (
indexID = "id"
indexName = "name"
indexRuntime = "runtime"
indexServiceID = "serviceid"
indexNodeID = "nodeid"
indexSlot = "slot"
indexDesiredState = "desiredstate"
indexTaskState = "taskstate"
indexRole = "role"
indexMembership = "membership"
indexNetwork = "network"
indexSecret = "secret"
indexConfig = "config"
indexKind = "kind"
indexCustom = "custom"
prefix = "_prefix"
// MaxChangesPerTransaction is the number of changes after which a new
// transaction should be started within Batch.
MaxChangesPerTransaction = 200
// MaxTransactionBytes is the maximum serialized transaction size.
MaxTransactionBytes = 1.5 * 1024 * 1024
)
var (
// ErrExist is returned by create operations if the provided ID is already
// taken.
ErrExist = errors.New("object already exists")
// ErrNotExist is returned by altering operations (update, delete) if the
// provided ID is not found.
ErrNotExist = errors.New("object does not exist")
// ErrNameConflict is returned by create/update if the object name is
// already in use by another object.
ErrNameConflict = errors.New("name conflicts with an existing object")
// ErrInvalidFindBy is returned if an unrecognized type is passed to Find.
ErrInvalidFindBy = errors.New("invalid find argument type")
// ErrSequenceConflict is returned when trying to update an object
// whose sequence information does not match the object in the store's.
ErrSequenceConflict = errors.New("update out of sequence")
objectStorers []ObjectStoreConfig
schema = &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{},
}
errUnknownStoreAction = errors.New("unknown store action")
// WedgeTimeout is the maximum amount of time the store lock may be
// held before declaring a suspected deadlock.
WedgeTimeout = 30 * time.Second
// update()/write tx latency timer.
updateLatencyTimer metrics.Timer
// view()/read tx latency timer.
viewLatencyTimer metrics.Timer
// lookup() latency timer.
lookupLatencyTimer metrics.Timer
// Batch() latency timer.
batchLatencyTimer metrics.Timer
// timer to capture the duration for which the memory store mutex is locked.
storeLockDurationTimer metrics.Timer
)
func init() {
ns := metrics.NewNamespace("swarm", "store", nil)
updateLatencyTimer = ns.NewTimer("write_tx_latency",
"Raft store write tx latency.")
viewLatencyTimer = ns.NewTimer("read_tx_latency",
"Raft store read tx latency.")
lookupLatencyTimer = ns.NewTimer("lookup_latency",
"Raft store read latency.")
batchLatencyTimer = ns.NewTimer("batch_latency",
"Raft store batch latency.")
storeLockDurationTimer = ns.NewTimer("memory_store_lock_duration",
"Duration for which the raft memory store lock was held.")
metrics.Register(ns)
}
func register(os ObjectStoreConfig) {
objectStorers = append(objectStorers, os)
schema.Tables[os.Table.Name] = os.Table
}
// timedMutex wraps a sync.Mutex, and keeps track of when it was locked.
type timedMutex struct {
sync.Mutex
lockedAt atomic.Value
}
func (m *timedMutex) Lock() {
m.Mutex.Lock()
m.lockedAt.Store(time.Now())
}
// Unlocks the timedMutex and captures the duration
// for which it was locked in a metric.
func (m *timedMutex) Unlock() {
unlockedTimestamp := m.lockedAt.Load()
m.lockedAt.Store(time.Time{})
m.Mutex.Unlock()
lockedFor := time.Since(unlockedTimestamp.(time.Time))
storeLockDurationTimer.Update(lockedFor)
}
func (m *timedMutex) LockedAt() time.Time {
lockedTimestamp := m.lockedAt.Load()
if lockedTimestamp == nil {
return time.Time{}
}
return lockedTimestamp.(time.Time)
}
// MemoryStore is a concurrency-safe, in-memory implementation of the Store
// interface.
type MemoryStore struct {
// updateLock must be held during an update transaction.
updateLock timedMutex
memDB *memdb.MemDB
queue *watch.Queue
proposer state.Proposer
}
// NewMemoryStore returns an in-memory store. The argument is an optional
// Proposer which will be used to propagate changes to other members in a
// cluster.
func NewMemoryStore(proposer state.Proposer) *MemoryStore {
memDB, err := memdb.NewMemDB(schema)
if err != nil {
// This shouldn't fail
panic(err)
}
return &MemoryStore{
memDB: memDB,
queue: watch.NewQueue(),
proposer: proposer,
}
}
// Close closes the memory store and frees its associated resources.
func (s *MemoryStore) Close() error {
return s.queue.Close()
}
func fromArgs(args ...interface{}) ([]byte, error) {
if len(args) != 1 {
return nil, fmt.Errorf("must provide only a single argument")
}
arg, ok := args[0].(string)
if !ok {
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
}
// Add the null character as a terminator
arg += "\x00"
return []byte(arg), nil
}
func prefixFromArgs(args ...interface{}) ([]byte, error) {
val, err := fromArgs(args...)
if err != nil {
return nil, err
}
// Strip the null terminator, the rest is a prefix
n := len(val)
if n > 0 {
return val[:n-1], nil
}
return val, nil
}
// ReadTx is a read transaction. Note that transaction does not imply
// any internal batching. It only means that the transaction presents a
// consistent view of the data that cannot be affected by other
// transactions.
type ReadTx interface {
lookup(table, index, id string) api.StoreObject
get(table, id string) api.StoreObject
find(table string, by By, checkType func(By) error, appendResult func(api.StoreObject)) error
}
type readTx struct {
memDBTx *memdb.Txn
}
// View executes a read transaction.
func (s *MemoryStore) View(cb func(ReadTx)) {
defer metrics.StartTimer(viewLatencyTimer)()
memDBTx := s.memDB.Txn(false)
readTx := readTx{
memDBTx: memDBTx,
}
cb(readTx)
memDBTx.Commit()
}
// Tx is a read/write transaction. Note that transaction does not imply
// any internal batching. The purpose of this transaction is to give the
// user a guarantee that its changes won't be visible to other transactions
// until the transaction is over.
type Tx interface {
ReadTx
create(table string, o api.StoreObject) error
update(table string, o api.StoreObject) error
delete(table, id string) error
}
type tx struct {
readTx
curVersion *api.Version
changelist []api.Event
}
// changelistBetweenVersions returns the changes after "from" up to and
// including "to".
func (s *MemoryStore) changelistBetweenVersions(from, to api.Version) ([]api.Event, error) {
if s.proposer == nil {
return nil, errors.New("store does not support versioning")
}
changes, err := s.proposer.ChangesBetween(from, to)
if err != nil {
return nil, err
}
var changelist []api.Event
for _, change := range changes {
for _, sa := range change.StoreActions {
event, err := api.EventFromStoreAction(sa, nil)
if err != nil {
return nil, err
}
changelist = append(changelist, event)
}
changelist = append(changelist, state.EventCommit{Version: change.Version.Copy()})
}
return changelist, nil
}
// ApplyStoreActions updates a store based on StoreAction messages.
func (s *MemoryStore) ApplyStoreActions(actions []api.StoreAction) error {
s.updateLock.Lock()
memDBTx := s.memDB.Txn(true)
tx := tx{
readTx: readTx{
memDBTx: memDBTx,
},
}
for _, sa := range actions {
if err := applyStoreAction(&tx, sa); err != nil {
memDBTx.Abort()
s.updateLock.Unlock()
return err
}
}
memDBTx.Commit()
for _, c := range tx.changelist {
s.queue.Publish(c)
}
if len(tx.changelist) != 0 {
s.queue.Publish(state.EventCommit{})
}
s.updateLock.Unlock()
return nil
}
func applyStoreAction(tx Tx, sa api.StoreAction) error {
for _, os := range objectStorers {
err := os.ApplyStoreAction(tx, sa)
if err != errUnknownStoreAction {
return err
}
}
return errors.New("unrecognized action type")
}
func (s *MemoryStore) update(proposer state.Proposer, cb func(Tx) error) error {
defer metrics.StartTimer(updateLatencyTimer)()
s.updateLock.Lock()
memDBTx := s.memDB.Txn(true)
var curVersion *api.Version
if proposer != nil {
curVersion = proposer.GetVersion()
}
var tx tx
tx.init(memDBTx, curVersion)
err := cb(&tx)
if err == nil {
if proposer == nil {
memDBTx.Commit()
} else {
var sa []api.StoreAction
sa, err = tx.changelistStoreActions()
if err == nil {
if len(sa) != 0 {
err = proposer.ProposeValue(context.Background(), sa, func() {
memDBTx.Commit()
})
} else {
memDBTx.Commit()
}
}
}
}
if err == nil {
for _, c := range tx.changelist {
s.queue.Publish(c)
}
if len(tx.changelist) != 0 {
if proposer != nil {
curVersion = proposer.GetVersion()
}
s.queue.Publish(state.EventCommit{Version: curVersion})
}
} else {
memDBTx.Abort()
}
s.updateLock.Unlock()
return err
}
func (s *MemoryStore) updateLocal(cb func(Tx) error) error {
return s.update(nil, cb)
}
// Update executes a read/write transaction.
func (s *MemoryStore) Update(cb func(Tx) error) error {
return s.update(s.proposer, cb)
}
// Batch provides a mechanism to batch updates to a store.
type Batch struct {
tx tx
store *MemoryStore
// applied counts the times Update has run successfully
applied int
// transactionSizeEstimate is the running count of the size of the
// current transaction.
transactionSizeEstimate int
// changelistLen is the last known length of the transaction's
// changelist.
changelistLen int
err error
}
// Update adds a single change to a batch. Each call to Update is atomic, but
// different calls to Update may be spread across multiple transactions to
// circumvent transaction size limits.
func (batch *Batch) Update(cb func(Tx) error) error {
if batch.err != nil {
return batch.err
}
if err := cb(&batch.tx); err != nil {
return err
}
batch.applied++
for batch.changelistLen < len(batch.tx.changelist) {
sa, err := api.NewStoreAction(batch.tx.changelist[batch.changelistLen])
if err != nil {
return err
}
batch.transactionSizeEstimate += sa.Size()
batch.changelistLen++
}
if batch.changelistLen >= MaxChangesPerTransaction || batch.transactionSizeEstimate >= (MaxTransactionBytes*3)/4 {
if err := batch.commit(); err != nil {
return err
}
// Yield the update lock
batch.store.updateLock.Unlock()
runtime.Gosched()
batch.store.updateLock.Lock()
batch.newTx()
}
return nil
}
func (batch *Batch) newTx() {
var curVersion *api.Version
if batch.store.proposer != nil {
curVersion = batch.store.proposer.GetVersion()
}
batch.tx.init(batch.store.memDB.Txn(true), curVersion)
batch.transactionSizeEstimate = 0
batch.changelistLen = 0
}
func (batch *Batch) commit() error {
if batch.store.proposer != nil {
var sa []api.StoreAction
sa, batch.err = batch.tx.changelistStoreActions()
if batch.err == nil {
if len(sa) != 0 {
batch.err = batch.store.proposer.ProposeValue(context.Background(), sa, func() {
batch.tx.memDBTx.Commit()
})
} else {
batch.tx.memDBTx.Commit()
}
}
} else {
batch.tx.memDBTx.Commit()
}
if batch.err != nil {
batch.tx.memDBTx.Abort()
return batch.err
}
for _, c := range batch.tx.changelist {
batch.store.queue.Publish(c)
}
if len(batch.tx.changelist) != 0 {
batch.store.queue.Publish(state.EventCommit{})
}
return nil
}
// Batch performs one or more transactions that allow reads and writes
// It invokes a callback that is passed a Batch object. The callback may
// call batch.Update for each change it wants to make as part of the
// batch. The changes in the batch may be split over multiple
// transactions if necessary to keep transactions below the size limit.
// Batch holds a lock over the state, but will yield this lock every
// it creates a new transaction to allow other writers to proceed.
// Thus, unrelated changes to the state may occur between calls to
// batch.Update.
//
// This method allows the caller to iterate over a data set and apply
// changes in sequence without holding the store write lock for an
// excessive time, or producing a transaction that exceeds the maximum
// size.
//
// If Batch returns an error, no guarantees are made about how many updates
// were committed successfully.
func (s *MemoryStore) Batch(cb func(*Batch) error) error {
defer metrics.StartTimer(batchLatencyTimer)()
s.updateLock.Lock()
batch := Batch{
store: s,
}
batch.newTx()
if err := cb(&batch); err != nil {
batch.tx.memDBTx.Abort()
s.updateLock.Unlock()
return err
}
err := batch.commit()
s.updateLock.Unlock()
return err
}
func (tx *tx) init(memDBTx *memdb.Txn, curVersion *api.Version) {
tx.memDBTx = memDBTx
tx.curVersion = curVersion
tx.changelist = nil
}
func (tx tx) changelistStoreActions() ([]api.StoreAction, error) {
var actions []api.StoreAction
for _, c := range tx.changelist {
sa, err := api.NewStoreAction(c)
if err != nil {
return nil, err
}
actions = append(actions, sa)
}
return actions, nil
}
// lookup is an internal typed wrapper around memdb.
func (tx readTx) lookup(table, index, id string) api.StoreObject {
defer metrics.StartTimer(lookupLatencyTimer)()
j, err := tx.memDBTx.First(table, index, id)
if err != nil {
return nil
}
if j != nil {
return j.(api.StoreObject)
}
return nil
}
// create adds a new object to the store.
// Returns ErrExist if the ID is already taken.
func (tx *tx) create(table string, o api.StoreObject) error {
if tx.lookup(table, indexID, o.GetID()) != nil {
return ErrExist
}
copy := o.CopyStoreObject()
meta := copy.GetMeta()
if err := touchMeta(&meta, tx.curVersion); err != nil {
return err
}
copy.SetMeta(meta)
err := tx.memDBTx.Insert(table, copy)
if err == nil {
tx.changelist = append(tx.changelist, copy.EventCreate())
o.SetMeta(meta)
}
return err
}
// Update updates an existing object in the store.
// Returns ErrNotExist if the object doesn't exist.
func (tx *tx) update(table string, o api.StoreObject) error {
oldN := tx.lookup(table, indexID, o.GetID())
if oldN == nil {
return ErrNotExist
}
meta := o.GetMeta()
if tx.curVersion != nil {
if oldN.GetMeta().Version != meta.Version {
return ErrSequenceConflict
}
}
copy := o.CopyStoreObject()
if err := touchMeta(&meta, tx.curVersion); err != nil {
return err
}
copy.SetMeta(meta)
err := tx.memDBTx.Insert(table, copy)
if err == nil {
tx.changelist = append(tx.changelist, copy.EventUpdate(oldN))
o.SetMeta(meta)
}
return err
}
// Delete removes an object from the store.
// Returns ErrNotExist if the object doesn't exist.
func (tx *tx) delete(table, id string) error {
n := tx.lookup(table, indexID, id)
if n == nil {
return ErrNotExist
}
err := tx.memDBTx.Delete(table, n)
if err == nil {
tx.changelist = append(tx.changelist, n.EventDelete())
}
return err
}
// Get looks up an object by ID.
// Returns nil if the object doesn't exist.
func (tx readTx) get(table, id string) api.StoreObject {
o := tx.lookup(table, indexID, id)
if o == nil {
return nil
}
return o.CopyStoreObject()
}
// findIterators returns a slice of iterators. The union of items from these
// iterators provides the result of the query.
func (tx readTx) findIterators(table string, by By, checkType func(By) error) ([]memdb.ResultIterator, error) {
switch by.(type) {
case byAll, orCombinator: // generic types
default: // all other types
if err := checkType(by); err != nil {
return nil, err
}
}
switch v := by.(type) {
case byAll:
it, err := tx.memDBTx.Get(table, indexID)
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case orCombinator:
var iters []memdb.ResultIterator
for _, subBy := range v.bys {
it, err := tx.findIterators(table, subBy, checkType)
if err != nil {
return nil, err
}
iters = append(iters, it...)
}
return iters, nil
case byName:
it, err := tx.memDBTx.Get(table, indexName, strings.ToLower(string(v)))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byIDPrefix:
it, err := tx.memDBTx.Get(table, indexID+prefix, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byNamePrefix:
it, err := tx.memDBTx.Get(table, indexName+prefix, strings.ToLower(string(v)))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byRuntime:
it, err := tx.memDBTx.Get(table, indexRuntime, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byNode:
it, err := tx.memDBTx.Get(table, indexNodeID, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byService:
it, err := tx.memDBTx.Get(table, indexServiceID, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case bySlot:
it, err := tx.memDBTx.Get(table, indexSlot, v.serviceID+"\x00"+strconv.FormatUint(v.slot, 10))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byDesiredState:
it, err := tx.memDBTx.Get(table, indexDesiredState, strconv.FormatInt(int64(v), 10))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byTaskState:
it, err := tx.memDBTx.Get(table, indexTaskState, strconv.FormatInt(int64(v), 10))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byRole:
it, err := tx.memDBTx.Get(table, indexRole, strconv.FormatInt(int64(v), 10))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byMembership:
it, err := tx.memDBTx.Get(table, indexMembership, strconv.FormatInt(int64(v), 10))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byReferencedNetworkID:
it, err := tx.memDBTx.Get(table, indexNetwork, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byReferencedSecretID:
it, err := tx.memDBTx.Get(table, indexSecret, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byReferencedConfigID:
it, err := tx.memDBTx.Get(table, indexConfig, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byKind:
it, err := tx.memDBTx.Get(table, indexKind, string(v))
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byCustom:
var key string
if v.objType != "" {
key = v.objType + "|" + v.index + "|" + v.value
} else {
key = v.index + "|" + v.value
}
it, err := tx.memDBTx.Get(table, indexCustom, key)
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
case byCustomPrefix:
var key string
if v.objType != "" {
key = v.objType + "|" + v.index + "|" + v.value
} else {
key = v.index + "|" + v.value
}
it, err := tx.memDBTx.Get(table, indexCustom+prefix, key)
if err != nil {
return nil, err
}
return []memdb.ResultIterator{it}, nil
default:
return nil, ErrInvalidFindBy
}
}
// find selects a set of objects calls a callback for each matching object.
func (tx readTx) find(table string, by By, checkType func(By) error, appendResult func(api.StoreObject)) error {
fromResultIterators := func(its ...memdb.ResultIterator) {
ids := make(map[string]struct{})
for _, it := range its {
for {
obj := it.Next()
if obj == nil {
break
}
o := obj.(api.StoreObject)
id := o.GetID()
if _, exists := ids[id]; !exists {
appendResult(o.CopyStoreObject())
ids[id] = struct{}{}
}
}
}
}
iters, err := tx.findIterators(table, by, checkType)
if err != nil {
return err
}
fromResultIterators(iters...)
return nil
}
// Save serializes the data in the store.
func (s *MemoryStore) Save(tx ReadTx) (*pb.StoreSnapshot, error) {
var snapshot pb.StoreSnapshot
for _, os := range objectStorers {
if err := os.Save(tx, &snapshot); err != nil {
return nil, err
}
}
return &snapshot, nil
}
// Restore sets the contents of the store to the serialized data in the
// argument.
func (s *MemoryStore) Restore(snapshot *pb.StoreSnapshot) error {
return s.updateLocal(func(tx Tx) error {
for _, os := range objectStorers {
if err := os.Restore(tx, snapshot); err != nil {
return err
}
}
return nil
})
}
// WatchQueue returns the publish/subscribe queue.
func (s *MemoryStore) WatchQueue() *watch.Queue {
return s.queue
}
// ViewAndWatch calls a callback which can observe the state of this
// MemoryStore. It also returns a channel that will return further events from
// this point so the snapshot can be kept up to date. The watch channel must be
// released with watch.StopWatch when it is no longer needed. The channel is
// guaranteed to get all events after the moment of the snapshot, and only
// those events.
func ViewAndWatch(store *MemoryStore, cb func(ReadTx) error, specifiers ...api.Event) (watch chan events.Event, cancel func(), err error) {
// Using Update to lock the store and guarantee consistency between
// the watcher and the the state seen by the callback. snapshotReadTx
// exposes this Tx as a ReadTx so the callback can't modify it.
err = store.Update(func(tx Tx) error {
if err := cb(tx); err != nil {
return err
}
watch, cancel = state.Watch(store.WatchQueue(), specifiers...)
return nil
})
if watch != nil && err != nil {
cancel()
cancel = nil
watch = nil
}
return
}
// WatchFrom returns a channel that will return past events from starting
// from "version", and new events until the channel is closed. If "version"
// is nil, this function is equivalent to
//
// state.Watch(store.WatchQueue(), specifiers...).
//
// If the log has been compacted and it's not possible to produce the exact
// set of events leading from "version" to the current state, this function
// will return an error, and the caller should re-sync.
//
// The watch channel must be released with watch.StopWatch when it is no
// longer needed.
func WatchFrom(store *MemoryStore, version *api.Version, specifiers ...api.Event) (chan events.Event, func(), error) {
if version == nil {
ch, cancel := state.Watch(store.WatchQueue(), specifiers...)
return ch, cancel, nil
}
if store.proposer == nil {
return nil, nil, errors.New("store does not support versioning")
}
var (
curVersion *api.Version
watch chan events.Event
cancelWatch func()
)
// Using Update to lock the store
err := store.Update(func(tx Tx) error {
// Get current version
curVersion = store.proposer.GetVersion()
// Start the watch with the store locked so events cannot be
// missed
watch, cancelWatch = state.Watch(store.WatchQueue(), specifiers...)
return nil
})
if watch != nil && err != nil {
cancelWatch()
return nil, nil, err
}
if curVersion == nil {
cancelWatch()
return nil, nil, errors.New("could not get current version from store")
}
changelist, err := store.changelistBetweenVersions(*version, *curVersion)
if err != nil {
cancelWatch()
return nil, nil, err
}
ch := make(chan events.Event)
stop := make(chan struct{})
cancel := func() {
close(stop)
}
go func() {
defer cancelWatch()
matcher := state.Matcher(specifiers...)
for _, change := range changelist {
if matcher(change) {
select {
case ch <- change:
case <-stop:
return
}
}
}
for {
select {
case <-stop:
return
case e := <-watch:
ch <- e
}
}
}()
return ch, cancel, nil
}
// touchMeta updates an object's timestamps when necessary and bumps the version
// if provided.
func touchMeta(meta *api.Meta, version *api.Version) error {
// Skip meta update if version is not defined as it means we're applying
// from raft or restoring from a snapshot.
if version == nil {
return nil
}
now, err := gogotypes.TimestampProto(time.Now())
if err != nil {
return err
}
meta.Version = *version
// Updated CreatedAt if not defined
if meta.CreatedAt == nil {
meta.CreatedAt = now
}
meta.UpdatedAt = now
return nil
}
// Wedged returns true if the store lock has been held for a long time,
// possibly indicating a deadlock.
func (s *MemoryStore) Wedged() bool {
lockedAt := s.updateLock.LockedAt()
if lockedAt.IsZero() {
return false
}
return time.Since(lockedAt) > WedgeTimeout
}