// Copyright 2016 Keybase Inc. All rights reserved.
// Use of this source code is governed by a BSD
// license that can be found in the LICENSE file.
package libkbfs
import (
"fmt"
pathlib "path"
"time"
"github.com/keybase/client/go/kbfs/data"
"github.com/keybase/client/go/kbfs/idutil"
"github.com/keybase/client/go/kbfs/kbfscodec"
"github.com/keybase/client/go/kbfs/kbfssync"
"github.com/keybase/client/go/kbfs/libkey"
"github.com/keybase/client/go/kbfs/tlf"
"github.com/keybase/client/go/kbfs/tlfhandle"
"github.com/keybase/client/go/libkb"
"github.com/keybase/client/go/logger"
"github.com/keybase/client/go/protocol/keybase1"
"github.com/pkg/errors"
"golang.org/x/net/context"
"golang.org/x/sync/errgroup"
)
type overallBlockState int
const (
// cleanState: no outstanding local writes.
cleanState overallBlockState = iota
// dirtyState: there are outstanding local writes that haven't yet been
// synced.
dirtyState
)
const (
// numBlockSizeWorkersMax is the max number of workers to use when
// fetching a set of block sizes.
numBlockSizeWorkersMax = 50
// How many pointers to fetch sizes for in a single block size call.
numBlockSizesPerChunk = 20
// truncateExtendCutoffPoint is the amount of data in extending
// truncate that will trigger the extending with a hole algorithm.
truncateExtendCutoffPoint = 128 * 1024
)
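// mdToCleanIfUnused pairs a read-only MD revision with its associated
// block put state, so that those blocks can be cleaned up later if the
// MD ends up unused.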
type mdToCleanIfUnused struct {
md ReadOnlyRootMetadata
bps blockPutStateCopiable
}
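// syncInfo tracks the in-progress sync state for a single dirty file:
// the file's old block info, the syncOp being accumulated, block infos
// to be unreferenced, the pending block put state, ref/unref byte
// counts, and any MDs (with their block put states) to clean up if
// they end up unused.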
type syncInfo struct {
oldInfo data.BlockInfo
op *syncOp
unrefs []data.BlockInfo
bps blockPutStateCopiable
refBytes uint64
unrefBytes uint64
toCleanIfUnused []mdToCleanIfUnused
}
func (si *syncInfo) DeepCopy(
ctx context.Context, codec kbfscodec.Codec) (newSi *syncInfo, err error) {
newSi = &syncInfo{
oldInfo: si.oldInfo,
refBytes: si.refBytes,
unrefBytes: si.unrefBytes,
}
newSi.unrefs = make([]data.BlockInfo, len(si.unrefs))
copy(newSi.unrefs, si.unrefs)
if si.bps != nil {
newSi.bps, err = si.bps.deepCopy(ctx)
if err != nil {
return nil, err
}
}
if si.op != nil {
err := kbfscodec.Update(codec, &newSi.op, si.op)
if err != nil {
return nil, err
}
}
newSi.toCleanIfUnused = make([]mdToCleanIfUnused, len(si.toCleanIfUnused))
for i, toClean := range si.toCleanIfUnused {
// It might be overkill to deep-copy these MDs and bpses,
// which are probably immutable, but for now let's do the safe
// thing.
copyMd, err := toClean.md.deepCopy(codec)
if err != nil {
return nil, err
}
newSi.toCleanIfUnused[i].md = copyMd.ReadOnly()
newSi.toCleanIfUnused[i].bps, err = toClean.bps.deepCopy(ctx)
if err != nil {
return nil, err
}
}
return newSi, nil
}
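// removeReplacedBlock removes ptr from the list of blocks to be ref'd
// by this sync's op, along with any matching pending unref, because
// the block has since been replaced by a newer version.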
func (si *syncInfo) removeReplacedBlock(ctx context.Context,
log logger.Logger, ptr data.BlockPointer) {
for i, ref := range si.op.RefBlocks {
if ref == ptr {
log.CDebugf(ctx, "Replacing old ref %v", ptr)
si.op.RefBlocks = append(si.op.RefBlocks[:i],
si.op.RefBlocks[i+1:]...)
for j, unref := range si.unrefs {
if unref.BlockPointer == ptr {
si.unrefs = append(si.unrefs[:j], si.unrefs[j+1:]...)
}
}
break
}
}
}
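// mergeUnrefCache adds all of this sync's pending unref'd block infos
// to the given MD's unref list.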
func (si *syncInfo) mergeUnrefCache(md *RootMetadata) {
for _, info := range si.unrefs {
// it's ok if we push the same ptr.ID/RefNonce multiple times,
// because the subsequent ones should have a QuotaSize of 0.
md.AddUnrefBlock(info)
}
}
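// deferredState tracks, for a single file, the writes and truncates
// that were deferred while a sync was in progress, the dirty-cache
// deletions that must happen before they are replayed, and the
// associated count of dirty bytes.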
type deferredState struct {
// Writes and truncates for blocks that were being sync'd, and
// need to be replayed after the sync finishes on top of the new
// versions of the blocks.
writes []func(
context.Context, *kbfssync.LockState, KeyMetadataWithRootDirEntry,
data.Path) error
// Blocks that need to be deleted from the dirty cache before any
// deferred writes are replayed.
dirtyDeletes []data.BlockPointer
waitBytes int64
}
// folderBlockOps contains all the fields that must be synchronized by
// blockLock. It will eventually also contain all the methods that
// must be synchronized by blockLock, so that folderBranchOps will
// have no knowledge of blockLock.
//
// -- And now, a primer on tracking dirty bytes --
//
// The DirtyBlockCache tracks the number of bytes that are dirtied
// system-wide, as the number of bytes that haven't yet been synced
// ("unsynced"), and a number of bytes that haven't yet been resolved
// yet because the overall file Sync hasn't finished yet ("total").
// This data helps us decide when we need to block incoming Writes, in
// order to keep memory usage from exploding.
//
// It's the responsibility of folderBlockOps (and its helper struct
// dirtyFile) to update these totals in DirtyBlockCache for the
// individual files within this TLF. This is complicated by a few things:
// * New writes to a file are "deferred" while a Sync is happening, and
// are replayed after the Sync finishes.
// * Syncs can be canceled or error out halfway through syncing the blocks,
// leaving the file in a dirty state until the next Sync.
// * Syncs can fail with a /recoverable/ error, in which case they get
// retried automatically by folderBranchOps. In that case, the retried
// Sync also sucks in any outstanding deferred writes.
//
// With all that in mind, here is the rough breakdown of how this
// bytes-tracking is implemented:
// * On a Write/Truncate to a block, folderBranchOps counts all the
// newly-dirtied bytes in a file as "unsynced". That is, if the block was
// already in the dirty cache (and not already being synced), only
// extensions to the block count as "unsynced" bytes.
// * When a Sync starts, dirtyFile remembers the total of bytes being synced,
// and the size of each block being synced.
// * When each block put finishes successfully, dirtyFile subtracts the size
// of that block from "unsynced".
// * When a Sync finishes successfully, the total sum of bytes in that sync
// is subtracted from the "total" dirty bytes outstanding.
// * If a Sync fails, but some blocks were put successfully, those blocks
// are "re-dirtied", which means they count as unsynced bytes again.
// dirtyFile handles this.
// * When a Write/Truncate is deferred due to an ongoing Sync, its bytes
// still count towards the "unsynced" total. In fact, this essentially
// creates a new copy of those blocks, and the whole size of each such block
// (not just the newly-dirtied bytes) counts toward the total. However,
// when the write gets replayed, folderBlockOps first subtracts those bytes
// from the system-wide numbers, since they are about to be replayed.
// * When a Sync is retried after a recoverable failure, dirtyFile adds
// the newly-dirtied deferred bytes to the system-wide numbers, since they
// are now being assimilated into this Sync.
// * dirtyFile also exposes a concept of "orphaned" blocks. These are child
// blocks being synced that are now referenced via a new, permanent block
// ID from the parent indirect block. This matters when hard failures
// occur during a Sync -- the blocks will no longer be accessible under
// their previous pointers, and so dirtyFile needs to know their old
// bytes can be cleaned up now.
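//
// As a purely illustrative walk-through of the rules above (numbers
// hypothetical): a user writes 1 MB of new data to a file, and those
// bytes are counted as "unsynced". A Sync starts; as each block is put
// successfully, dirtyFile subtracts that block's size from "unsynced".
// If the user writes to a block that is still being synced, the write
// is deferred, and the whole size of the re-dirtied block counts
// toward the totals until the deferred write is replayed and those
// bytes are subtracted back out. Only when the Sync finishes
// successfully is the sum of its bytes subtracted from the
// outstanding "total".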
type folderBlockOps struct {
config Config
log logger.Logger
vlog *libkb.VDebugLog
folderBranch data.FolderBranch
observers *observerList
// forceSyncChan can be sent on to trigger an immediate
// Sync(). It is a blocking channel.
forceSyncChan chan<- struct{}
// protects access to blocks in this folder and all fields
// below.
blockLock blockLock
// Which files are currently dirty and have dirty blocks that are either
// currently syncing, or waiting to be sync'd.
dirtyFiles map[data.BlockPointer]*data.DirtyFile
// For writes and truncates, track the unsynced to-be-unref'd
// block infos, per-path.
unrefCache map[data.BlockRef]*syncInfo
// dirtyDirs tracks which directories are currently dirty in this
// TLF.
dirtyDirs map[data.BlockPointer][]data.BlockInfo
dirtyDirsSyncing bool
deferredDirUpdates []func(lState *kbfssync.LockState) error
// dirtyRootDirEntry is a DirEntry representing the root of the
// TLF (to be copied into the RootMetadata on a sync).
dirtyRootDirEntry *data.DirEntry
chargedTo keybase1.UserOrTeamID
// Track deferred operations on a per-file basis.
deferred map[data.BlockRef]deferredState
// set to true if this write or truncate should be deferred
doDeferWrite bool
// While this channel is non-nil and non-closed, writes get blocked.
holdNewWritesCh <-chan struct{}
// nodeCache itself is goroutine-safe, but write/truncate must
// call PathFromNode() only under blockLock (see nodeCache
// comments in folder_branch_ops.go).
nodeCache NodeCache
}
// Only exported methods of folderBlockOps should be used outside of this
// file.
//
// Although, temporarily, folderBranchOps is allowed to reach in and
// manipulate folderBlockOps fields and methods directly.
func (fbo *folderBlockOps) id() tlf.ID {
return fbo.folderBranch.Tlf
}
func (fbo *folderBlockOps) branch() data.BranchName {
return fbo.folderBranch.Branch
}
func (fbo *folderBlockOps) isSyncedTlf() bool {
return fbo.branch() == data.MasterBranch && fbo.config.IsSyncedTlf(fbo.id())
}
// GetState returns the overall block state of this TLF.
func (fbo *folderBlockOps) GetState(
lState *kbfssync.LockState) overallBlockState {
fbo.blockLock.RLock(lState)
defer fbo.blockLock.RUnlock(lState)
if len(fbo.dirtyFiles) == 0 && len(fbo.dirtyDirs) == 0 &&
fbo.dirtyRootDirEntry == nil {
return cleanState
}
return dirtyState
}
// getCleanEncodedBlockSizesLocked retrieves the encoded sizes and
// block statuses of the clean blocks pointed to by each of the block
// pointers in `ptrs`, which must be valid, either from the cache or
// from the server. If `rtype` is `blockReadParallel`, it's assumed
// that some coordinating goroutine is holding the correct locks, and
// in that case `lState` must be `nil`.
func (fbo *folderBlockOps) getCleanEncodedBlockSizesLocked(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata,
ptrs []data.BlockPointer, branch data.BranchName,
rtype data.BlockReqType, assumeCacheIsLive bool) (
sizes []uint32, statuses []keybase1.BlockStatus, err error) {
if rtype != data.BlockReadParallel {
if rtype == data.BlockWrite {
panic("Cannot get the size of a block for writing")
}
fbo.blockLock.AssertAnyLocked(lState)
} else if lState != nil {
panic("Non-nil lState passed to getCleanEncodedBlockSizeLocked " +
"with blockReadParallel")
}
sizes = make([]uint32, len(ptrs))
statuses = make([]keybase1.BlockStatus, len(ptrs))
var toFetchIndices []int
var ptrsToFetch []data.BlockPointer
for i, ptr := range ptrs {
if !ptr.IsValid() {
return nil, nil, InvalidBlockRefError{ptr.Ref()}
}
if assumeCacheIsLive {
// If we're assuming all blocks in the cache are live, we just
// need to get the block size, which we can do from either one
// of the caches.
if block, err := fbo.config.BlockCache().Get(ptr); err == nil {
sizes[i] = block.GetEncodedSize()
statuses[i] = keybase1.BlockStatus_LIVE
continue
}
if diskBCache := fbo.config.DiskBlockCache(); diskBCache != nil {
cacheType := DiskBlockAnyCache
if fbo.isSyncedTlf() {
cacheType = DiskBlockSyncCache
}
if buf, _, _, err := diskBCache.Get(
ctx, fbo.id(), ptr.ID, cacheType); err == nil {
sizes[i] = uint32(len(buf))
statuses[i] = keybase1.BlockStatus_LIVE
continue
}
}
}
if err := checkDataVersion(fbo.config, data.Path{}, ptr); err != nil {
return nil, nil, err
}
// Fetch this block from the server.
ptrsToFetch = append(ptrsToFetch, ptr)
toFetchIndices = append(toFetchIndices, i)
}
defer func() {
fbo.vlog.CLogf(
ctx, libkb.VLog1, "GetEncodedSizes ptrs=%v sizes=%d statuses=%s: "+
"%+v", ptrs, sizes, statuses, err)
// In certain testing situations, a block might be represented
// with a 0 size in our journal or be missing from our local
// data stores, and we need to reconstruct the size using the
// cache in order to make the accounting work out for the test.
for i, ptr := range ptrs {
if sizes[i] == 0 {
if block, cerr := fbo.config.BlockCache().Get(
ptr); cerr == nil {
fbo.vlog.CLogf(
ctx, libkb.VLog1,
"Fixing encoded size of %v with cached copy", ptr)
sizes[i] = block.GetEncodedSize()
}
}
}
}()
// Unlock the blockLock while we wait for the network, only if
// it's locked for reading by a single goroutine. If it's locked
// for writing, that indicates we are performing an atomic write
// operation, and we need to ensure that nothing else comes in and
// modifies the blocks, so don't unlock.
//
// If there may be multiple goroutines fetching blocks under the
// same lState, we can't safely unlock since some of the other
// goroutines may be operating on the data assuming they have the
// lock.
bops := fbo.config.BlockOps()
var fetchedSizes []uint32
var fetchedStatuses []keybase1.BlockStatus
if rtype != data.BlockReadParallel && rtype != data.BlockLookup {
fbo.blockLock.DoRUnlockedIfPossible(lState, func(*kbfssync.LockState) {
fetchedSizes, fetchedStatuses, err = bops.GetEncodedSizes(
ctx, kmd, ptrsToFetch)
})
} else {
fetchedSizes, fetchedStatuses, err = bops.GetEncodedSizes(
ctx, kmd, ptrsToFetch)
}
if err != nil {
return nil, nil, err
}
for i, j := range toFetchIndices {
sizes[j] = fetchedSizes[i]
statuses[j] = fetchedStatuses[i]
}
return sizes, statuses, nil
}
// getBlockHelperLocked retrieves the block pointed to by ptr, which
// must be valid, either from the cache or from the server. If
// notifyPath is valid and the block isn't cached, trigger a read
// notification. If `rtype` is `blockReadParallel`, it's assumed that
// some coordinating goroutine is holding the correct locks, and
// in that case `lState` must be `nil`.
//
// This must be called only by get{File,Dir}BlockHelperLocked().
func (fbo *folderBlockOps) getBlockHelperLocked(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, ptr data.BlockPointer,
branch data.BranchName, newBlock makeNewBlock, lifetime data.BlockCacheLifetime,
notifyPath data.Path, rtype data.BlockReqType) (data.Block, error) {
if rtype != data.BlockReadParallel {
fbo.blockLock.AssertAnyLocked(lState)
} else if lState != nil {
panic("Non-nil lState passed to getBlockHelperLocked " +
"with blockReadParallel")
}
if !ptr.IsValid() {
return nil, InvalidBlockRefError{ptr.Ref()}
}
if block, err := fbo.config.DirtyBlockCache().Get(
ctx, fbo.id(), ptr, branch); err == nil {
return block, nil
}
if block, lifetime, err := fbo.config.BlockCache().GetWithLifetime(ptr); err == nil {
if lifetime != data.PermanentEntry {
// If the block was cached in the past, and is not a permanent
// block (i.e., currently being written by the user), we need
// to handle it as if it's an on-demand request so that its
// downstream prefetches are triggered correctly according to
// the new on-demand fetch priority.
action := fbo.config.Mode().DefaultBlockRequestAction()
if fbo.isSyncedTlf() {
action = action.AddSync()
}
prefetchStatus := fbo.config.PrefetchStatus(ctx, fbo.id(), ptr)
fbo.config.BlockOps().Prefetcher().ProcessBlockForPrefetch(ctx, ptr,
block, kmd, defaultOnDemandRequestPriority-1, lifetime,
prefetchStatus, action)
}
return block, nil
}
if err := checkDataVersion(fbo.config, notifyPath, ptr); err != nil {
return nil, err
}
if notifyPath.IsValidForNotification() {
fbo.config.Reporter().Notify(ctx, readNotification(notifyPath, false))
defer fbo.config.Reporter().Notify(ctx,
readNotification(notifyPath, true))
}
// Unlock the blockLock while we wait for the network, only if
// it's locked for reading by a single goroutine. If it's locked
// for writing, that indicates we are performing an atomic write
// operation, and we need to ensure that nothing else comes in and
// modifies the blocks, so don't unlock.
//
// If there may be multiple goroutines fetching blocks under the
// same lState, we can't safely unlock since some of the other
// goroutines may be operating on the data assuming they have the
// lock.
// fetch the block, and add to cache
block := newBlock()
bops := fbo.config.BlockOps()
var err error
if rtype != data.BlockReadParallel && rtype != data.BlockLookup {
fbo.blockLock.DoRUnlockedIfPossible(lState, func(*kbfssync.LockState) {
err = bops.Get(ctx, kmd, ptr, block, lifetime, fbo.branch())
})
} else {
err = bops.Get(ctx, kmd, ptr, block, lifetime, fbo.branch())
}
if err != nil {
return nil, err
}
return block, nil
}
// getFileBlockHelperLocked retrieves the block pointed to by ptr,
// which must be valid, either from an internal cache, the block
// cache, or from the server. An error is returned if the retrieved
// block is not a file block. If `rtype` is `blockReadParallel`, it's
// assumed that some coordinating goroutine is holding the correct
// locks, and in that case `lState` must be `nil`.
//
// This must be called only by GetFileBlockForReading(),
// getFileBlockLocked(), and getFileLocked().
//
// p is used only when reporting errors and sending read
// notifications, and can be empty.
func (fbo *folderBlockOps) getFileBlockHelperLocked(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, ptr data.BlockPointer,
branch data.BranchName, p data.Path, rtype data.BlockReqType) (
*data.FileBlock, error) {
if rtype != data.BlockReadParallel {
fbo.blockLock.AssertAnyLocked(lState)
} else if lState != nil {
panic("Non-nil lState passed to getFileBlockHelperLocked " +
"with blockReadParallel")
}
block, err := fbo.getBlockHelperLocked(
ctx, lState, kmd, ptr, branch, data.NewFileBlock, data.TransientEntry, p, rtype)
if err != nil {
return nil, err
}
fblock, ok := block.(*data.FileBlock)
if !ok {
return nil, NotFileBlockError{ptr, branch, p}
}
return fblock, nil
}
// GetCleanEncodedBlocksSizeSum retrieves the sum of the encoded sizes
// of the blocks pointed to by ptrs, all of which must be valid,
// either from the cache or from the server.
//
// The caller can specify a set of pointers using
// `ignoreRecoverableForRemovalErrors` for which "recoverable" fetch
// errors are tolerated. In that case, the returned sum will not
// include the size for any pointers in the
// `ignoreRecoverableForRemovalErrors` set that hit such an error.
//
// This should be called for "internal" operations, like conflict
// resolution and state checking, which don't know what kind of block
// the pointers refer to. Any downloaded blocks will not be cached,
// if they weren't in the cache already.
//
// If `onlyCountIfLive` is true, the sum only includes blocks that the
// bserver thinks are currently reachable from the merged branch
// (i.e., un-archived).
func (fbo *folderBlockOps) GetCleanEncodedBlocksSizeSum(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, ptrs []data.BlockPointer,
ignoreRecoverableForRemovalErrors map[data.BlockPointer]bool,
branch data.BranchName, onlyCountIfLive bool) (uint64, error) {
fbo.blockLock.RLock(lState)
defer fbo.blockLock.RUnlock(lState)
ptrCh := make(chan []data.BlockPointer, len(ptrs))
sumCh := make(chan uint32, len(ptrs))
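// Both channels are buffered to the maximum number of possible sends,
// so the workers never block: every chunk is queued on ptrCh before
// the workers start, and every per-pointer size fits in sumCh until it
// is drained after eg.Wait() below.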
numChunks := (len(ptrs) + numBlockSizesPerChunk - 1) /
numBlockSizesPerChunk
numWorkers := numBlockSizeWorkersMax
if numChunks < numWorkers {
numWorkers = numChunks
}
currChunk := make([]data.BlockPointer, 0, numBlockSizesPerChunk)
for _, ptr := range ptrs {
currChunk = append(currChunk, ptr)
if len(currChunk) == numBlockSizesPerChunk {
ptrCh <- currChunk
currChunk = make([]data.BlockPointer, 0, numBlockSizesPerChunk)
}
}
if len(currChunk) > 0 {
ptrCh <- currChunk
}
// If we don't care if something's live or not, there's no reason
// not to use the cached block.
assumeCacheIsLive := !onlyCountIfLive
eg, groupCtx := errgroup.WithContext(ctx)
for i := 0; i < numWorkers; i++ {
eg.Go(func() error {
for ptrs := range ptrCh {
sizes, statuses, err := fbo.getCleanEncodedBlockSizesLocked(
groupCtx, nil, kmd, ptrs, branch,
data.BlockReadParallel, assumeCacheIsLive)
for i, ptr := range ptrs {
// TODO: we might be able to recover the size of the
// top-most block of a removed file using the merged
// directory entry, the same way we do in
// `folderBranchOps.unrefEntry`.
if isRecoverableBlockErrorForRemoval(err) &&
ignoreRecoverableForRemovalErrors[ptr] {
fbo.log.CDebugf(
groupCtx, "Hit an ignorable, recoverable "+
"error for block %v: %v", ptr, err)
continue
}
if err != nil {
return err
}
if onlyCountIfLive &&
statuses[i] != keybase1.BlockStatus_LIVE {
sumCh <- 0
} else {
sumCh <- sizes[i]
}
}
}
return nil
})
}
close(ptrCh)
if err := eg.Wait(); err != nil {
return 0, err
}
close(sumCh)
var sum uint64
for size := range sumCh {
sum += uint64(size)
}
return sum, nil
}
// getDirBlockHelperLocked retrieves the block pointed to by ptr, which
// must be valid, either from the cache or from the server. An error
// is returned if the retrieved block is not a dir block.
//
// This must be called only by GetDirBlockForReading() and
// getDirLocked().
//
// p is used only when reporting errors, and can be empty.
func (fbo *folderBlockOps) getDirBlockHelperLocked(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, ptr data.BlockPointer,
branch data.BranchName, p data.Path, rtype data.BlockReqType) (*data.DirBlock, error) {
if rtype != data.BlockReadParallel {
fbo.blockLock.AssertAnyLocked(lState)
}
// Check data version explicitly here, with the right path, since
// we pass an empty path below.
if err := checkDataVersion(fbo.config, p, ptr); err != nil {
return nil, err
}
// Pass in an empty notify path because notifications should only
// trigger for file reads.
block, err := fbo.getBlockHelperLocked(
ctx, lState, kmd, ptr, branch, data.NewDirBlock, data.TransientEntry,
data.Path{}, rtype)
if err != nil {
return nil, err
}
dblock, ok := block.(*data.DirBlock)
if !ok {
return nil, NotDirBlockError{ptr, branch, p}
}
return dblock, nil
}
// GetFileBlockForReading retrieves the block pointed to by ptr, which
// must be valid, either from the cache or from the server. An error
// is returned if the retrieved block is not a file block.
//
// This should be called for "internal" operations, like conflict
// resolution and state checking. "Real" operations should use
// getFileBlockLocked() and getFileLocked() instead.
//
// p is used only when reporting errors, and can be empty.
func (fbo *folderBlockOps) GetFileBlockForReading(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, ptr data.BlockPointer,
branch data.BranchName, p data.Path) (*data.FileBlock, error) {
fbo.blockLock.RLock(lState)
defer fbo.blockLock.RUnlock(lState)
return fbo.getFileBlockHelperLocked(
ctx, lState, kmd, ptr, branch, p, data.BlockRead)
}
// GetDirBlockForReading retrieves the block pointed to by ptr, which
// must be valid, either from the cache or from the server. An error
// is returned if the retrieved block is not a dir block.
//
// This should be called for "internal" operations, like conflict
// resolution and state checking. "Real" operations should use
// getDirLocked() instead.
//
// p is used only when reporting errors, and can be empty.
func (fbo *folderBlockOps) GetDirBlockForReading(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, ptr data.BlockPointer,
branch data.BranchName, p data.Path) (*data.DirBlock, error) {
fbo.blockLock.RLock(lState)
defer fbo.blockLock.RUnlock(lState)
return fbo.getDirBlockHelperLocked(
ctx, lState, kmd, ptr, branch, p, data.BlockRead)
}
// getFileBlockLocked retrieves the block pointed to by ptr, which
// must be valid, either from the cache or from the server. An error
// is returned if the retrieved block is not a file block.
//
// The given path must be valid, and the given pointer must be its
// tail pointer or an indirect pointer from it. A read notification is
// triggered for the given path only if the block isn't in the cache.
//
// This shouldn't be called for "internal" operations, like conflict
// resolution and state checking -- use GetFileBlockForReading() for
// those instead.
//
// When rtype == blockWrite and the cached version of the block is
// currently clean, or the block is currently being synced, this
// method makes a copy of the file block and returns it. If this
// method might be called again for the same block within a single
// operation, it is the caller's responsibility to write that block
// back to the cache as dirty.
//
// Note that blockLock must be locked exactly when rtype ==
// blockWrite, and must be r-locked when rtype == blockRead. (This
// differs from getDirLocked.) This is because a write operation
// (like write, truncate and sync which lock blockLock) fetching a
// file block will almost always need to modify that block, and so
// will pass in blockWrite. If rtype == blockReadParallel, it's
// assumed that some coordinating goroutine is holding the correct
// locks, and in that case `lState` must be `nil`.
//
// file is used only when reporting errors and sending read
// notifications, and can be empty except that file.Branch must be set
// correctly.
//
// This method also returns whether the block was already dirty.
func (fbo *folderBlockOps) getFileBlockLocked(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, ptr data.BlockPointer,
file data.Path, rtype data.BlockReqType) (
fblock *data.FileBlock, wasDirty bool, err error) {
switch rtype {
case data.BlockRead:
fbo.blockLock.AssertRLocked(lState)
case data.BlockWrite:
fbo.blockLock.AssertLocked(lState)
case data.BlockReadParallel:
// This goroutine might not be the official lock holder, so
// don't make any assertions.
if lState != nil {
panic("Non-nil lState passed to getFileBlockLocked " +
"with blockReadParallel")
}
case data.BlockLookup:
panic("blockLookup should only be used for directory blocks")
default:
panic(fmt.Sprintf("Unknown block req type: %d", rtype))
}
fblock, err = fbo.getFileBlockHelperLocked(
ctx, lState, kmd, ptr, file.Branch, file, rtype)
if err != nil {
return nil, false, err
}
wasDirty = fbo.config.DirtyBlockCache().IsDirty(fbo.id(), ptr, file.Branch)
if rtype == data.BlockWrite {
// Copy the block if it's for writing, and either the
// block is not yet dirty or the block is currently
// being sync'd and needs a copy even though it's
// already dirty.
df := fbo.dirtyFiles[file.TailPointer()]
if !wasDirty || (df != nil && df.BlockNeedsCopy(ptr)) {
fblock = fblock.DeepCopy()
}
}
return fblock, wasDirty, nil
}
// getFileLocked is getFileBlockLocked called with file.TailPointer().
func (fbo *folderBlockOps) getFileLocked(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, file data.Path,
rtype data.BlockReqType) (*data.FileBlock, error) {
// Callers should have already done this check, but it doesn't
// hurt to do it again.
if !file.IsValid() {
return nil, errors.WithStack(InvalidPathError{file})
}
fblock, _, err := fbo.getFileBlockLocked(
ctx, lState, kmd, file.TailPointer(), file, rtype)
return fblock, err
}
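// getIndirectFileBlockInfosLocked is the version of
// GetIndirectFileBlockInfos that expects blockLock to already be
// r-locked by the caller.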
func (fbo *folderBlockOps) getIndirectFileBlockInfosLocked(
ctx context.Context, lState *kbfssync.LockState, kmd libkey.KeyMetadata,
file data.Path) ([]data.BlockInfo, error) {
fbo.blockLock.AssertRLocked(lState)
var id keybase1.UserOrTeamID // Data reads don't depend on the id.
fd := fbo.newFileData(lState, file, id, kmd)
return fd.GetIndirectFileBlockInfos(ctx)
}
// GetIndirectFileBlockInfos returns a list of BlockInfos for all
// indirect blocks of the given file. If the returned error is a
// recoverable one (as determined by
// isRecoverableBlockErrorForRemoval), the returned list may still be
// non-empty, and holds all the BlockInfos for all found indirect
// blocks.
func (fbo *folderBlockOps) GetIndirectFileBlockInfos(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, file data.Path) (
[]data.BlockInfo, error) {
fbo.blockLock.RLock(lState)
defer fbo.blockLock.RUnlock(lState)
return fbo.getIndirectFileBlockInfosLocked(ctx, lState, kmd, file)
}
// GetIndirectDirBlockInfos returns a list of BlockInfos for all
// indirect blocks of the given directory. If the returned error is a
// recoverable one (as determined by
// isRecoverableBlockErrorForRemoval), the returned list may still be
// non-empty, and holds all the BlockInfos for all found indirect
// blocks.
func (fbo *folderBlockOps) GetIndirectDirBlockInfos(
ctx context.Context, lState *kbfssync.LockState, kmd libkey.KeyMetadata,
dir data.Path) ([]data.BlockInfo, error) {
fbo.blockLock.RLock(lState)
defer fbo.blockLock.RUnlock(lState)
var id keybase1.UserOrTeamID // Data reads don't depend on the id.
fd := fbo.newDirDataLocked(lState, dir, id, kmd)
return fd.GetIndirectDirBlockInfos(ctx)
}
// GetIndirectFileBlockInfosWithTopBlock returns a list of BlockInfos
// for all indirect blocks of the given file, starting from the given
// top-most block. If the returned error is a recoverable one (as
// determined by isRecoverableBlockErrorForRemoval), the returned list
// may still be non-empty, and holds all the BlockInfos for all found
// indirect blocks. (This will be relevant when we handle multiple
// levels of indirection.)
func (fbo *folderBlockOps) GetIndirectFileBlockInfosWithTopBlock(
ctx context.Context, lState *kbfssync.LockState, kmd libkey.KeyMetadata, file data.Path,
topBlock *data.FileBlock) (
[]data.BlockInfo, error) {
fbo.blockLock.RLock(lState)
defer fbo.blockLock.RUnlock(lState)
var id keybase1.UserOrTeamID // Data reads don't depend on the id.
fd := fbo.newFileData(lState, file, id, kmd)
return fd.GetIndirectFileBlockInfosWithTopBlock(ctx, topBlock)
}
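// getChargedToLocked returns the user or team ID that should be
// charged for new blocks in this TLF, computing it from the TLF handle
// and caching the result for future calls.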
func (fbo *folderBlockOps) getChargedToLocked(
ctx context.Context, lState *kbfssync.LockState, kmd libkey.KeyMetadata) (
keybase1.UserOrTeamID, error) {
fbo.blockLock.AssertAnyLocked(lState)
if !fbo.chargedTo.IsNil() {
return fbo.chargedTo, nil
}
chargedTo, err := chargedToForTLF(
ctx, fbo.config.KBPKI(), fbo.config.KBPKI(), fbo.config,
kmd.GetTlfHandle())
if err != nil {
return keybase1.UserOrTeamID(""), err
}
fbo.chargedTo = chargedTo
return chargedTo, nil
}
// ClearChargedTo clears out the cached chargedTo UID for this FBO.
func (fbo *folderBlockOps) ClearChargedTo(lState *kbfssync.LockState) {
fbo.blockLock.Lock(lState)
defer fbo.blockLock.Unlock(lState)
fbo.chargedTo = keybase1.UserOrTeamID("")
}
// deepCopyFileLocked makes a complete copy of the given file, deduping leaf
// blocks and making new random BlockPointers for all indirect blocks.
// It returns the new top pointer of the copy, and all the new child
// pointers in the copy. It takes a custom DirtyBlockCache, which
// directs where the resulting block copies are stored.
func (fbo *folderBlockOps) deepCopyFileLocked(
ctx context.Context, lState *kbfssync.LockState, kmd libkey.KeyMetadata, file data.Path,
dirtyBcache data.DirtyBlockCacheSimple, dataVer data.Ver) (
newTopPtr data.BlockPointer, allChildPtrs []data.BlockPointer, err error) {
// Deep copying doesn't alter any data in use; it only makes a copy,
// so only a read lock is needed.
fbo.blockLock.AssertRLocked(lState)
chargedTo, err := chargedToForTLF(
ctx, fbo.config.KBPKI(), fbo.config.KBPKI(), fbo.config,
kmd.GetTlfHandle())
if err != nil {
return data.BlockPointer{}, nil, err
}
fd := fbo.newFileDataWithCache(
lState, file, chargedTo, kmd, dirtyBcache)
return fd.DeepCopy(ctx, dataVer)
}
func (fbo *folderBlockOps) cacheHashBehavior() data.BlockCacheHashBehavior {
return cacheHashBehavior(fbo.config, fbo.config, fbo.id())
}
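// UndupChildrenInCopy takes blockLock for writing and hands off to
// fileData.UndupChildrenInCopy for the given copied file, using the
// cached charged-to ID, the given block put state, and the given dirty
// block cache.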
func (fbo *folderBlockOps) UndupChildrenInCopy(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, file data.Path, bps blockPutState,
dirtyBcache data.DirtyBlockCacheSimple, topBlock *data.FileBlock) (
[]data.BlockInfo, error) {
fbo.blockLock.Lock(lState)
defer fbo.blockLock.Unlock(lState)
chargedTo, err := fbo.getChargedToLocked(ctx, lState, kmd)
if err != nil {
return nil, err
}
fd := fbo.newFileDataWithCache(
lState, file, chargedTo, kmd, dirtyBcache)
return fd.UndupChildrenInCopy(ctx, fbo.config.BlockCache(),
fbo.config.BlockOps(), bps, topBlock, fbo.cacheHashBehavior())
}
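// ReadyNonLeafBlocksInCopy takes blockLock for reading and hands off
// to fileData.ReadyNonLeafBlocksInCopy for the given copied file,
// readying its non-leaf (indirect) blocks into the given block put
// state.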
func (fbo *folderBlockOps) ReadyNonLeafBlocksInCopy(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, file data.Path, bps blockPutState,
dirtyBcache data.DirtyBlockCacheSimple, topBlock *data.FileBlock) (
[]data.BlockInfo, error) {
fbo.blockLock.RLock(lState)
defer fbo.blockLock.RUnlock(lState)
chargedTo, err := fbo.getChargedToLocked(ctx, lState, kmd)
if err != nil {
return nil, err
}
fd := fbo.newFileDataWithCache(
lState, file, chargedTo, kmd, dirtyBcache)
return fd.ReadyNonLeafBlocksInCopy(ctx, fbo.config.BlockCache(),
fbo.config.BlockOps(), bps, topBlock, fbo.cacheHashBehavior())
}
// getDirLocked retrieves the block pointed to by the tail pointer of
// the given path, which must be valid, either from the cache or from
// the server. An error is returned if the retrieved block is not a
// dir block.
//
// This shouldn't be called for "internal" operations, like conflict
// resolution and state checking -- use GetDirBlockForReading() for
// those instead.
//
// When rtype == blockWrite and the cached version of the block is
// currently clean, this method makes a copy of the directory block
// and returns it. If this method might be called again for the same
// block within a single operation, it is the caller's responsibility
// to write that block back to the cache as dirty.
//
// Note that blockLock must be either r-locked or locked, but
// independently of rtype. (This differs from getFileLocked and
// getFileBlockLocked.) File write operations (which lock blockLock)
// don't need a copy of parent dir blocks, and non-file write
// operations do need to copy dir blocks for modifications.
func (fbo *folderBlockOps) getDirLocked(ctx context.Context,
lState *kbfssync.LockState, kmd libkey.KeyMetadata, ptr data.BlockPointer, dir data.Path,
rtype data.BlockReqType) (*data.DirBlock, bool, error) {
switch rtype {
case data.BlockRead, data.BlockWrite, data.BlockLookup:
fbo.blockLock.AssertAnyLocked(lState)
case data.BlockReadParallel:
// This goroutine might not be the official lock holder, so
// don't make any assertions.
if lState != nil {
panic("Non-nil lState passed to getFileBlockLocked " +
"with blockReadParallel")
}
default:
panic(fmt.Sprintf("Unknown block req type: %d", rtype))
}
// Callers should have already done this check, but it doesn't
// hurt to do it again.
if !dir.IsValid() {
return nil, false, errors.WithStack(InvalidPathError{dir})
}
// Get the block for the last element in the path.
dblock, err := fbo.getDirBlockHelperLocked(
ctx, lState, kmd, ptr, dir.Branch, dir, rtype)
if err != nil {
return nil, false, err
}
wasDirty := fbo.config.DirtyBlockCache().IsDirty(fbo.id(), ptr, dir.Branch)
if rtype == data.BlockWrite && !wasDirty {
// Copy the block if it's for writing and the block is
// not yet dirty.
dblock = dblock.DeepCopy()
}
return dblock, wasDirty, nil
}
// GetDir retrieves the block pointed to by the tail pointer of the
// given path, which must be valid, either from the cache or from the
// server. An error is returned if the retrieved block is not a dir
// block.
//
// This shouldn't be called for "internal" operations, like conflict
// resolution and state checking -- use GetDirBlockForReading() for
// those instead.
//
// When rtype == blockWrite and the cached version of the block is
// currently clean, this method makes a copy of the directory block
// and returns it. If this method might be called again for the same
// block within a single operation, it is the caller's responsibility
// to write that block back to the cache as dirty.
func (fbo *folderBlockOps) GetDir(
ctx context.Context, lState *kbfssync.LockState, kmd libkey.KeyMetadata, dir data.Path,
rtype data.BlockReqType) (*data.DirBlock, error) {
fbo.blockLock.RLock(lState)
defer fbo.blockLock.RUnlock(lState)
dblock, _, err := fbo.getDirLocked(
ctx, lState, kmd, dir.TailPointer(), dir, rtype)
return dblock, err
}
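// dirCacheUndoFn is a function that, given the current lock state,
// undoes a previously-applied update to the dirty directory cache.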
type dirCacheUndoFn func(lState *kbfssync.LockState)