forked from keybase/client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
state_checker.go
421 lines (381 loc) · 12.6 KB
/
state_checker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
// Copyright 2016 Keybase Inc. All rights reserved.
// Use of this source code is governed by a BSD
// license that can be found in the LICENSE file.
package libkbfs
import (
"errors"
"fmt"
"reflect"
"time"
"github.com/keybase/client/go/kbfs/kbfsblock"
"github.com/keybase/client/go/kbfs/kbfsmd"
"github.com/keybase/client/go/kbfs/tlf"
"github.com/keybase/client/go/logger"
"github.com/keybase/client/go/protocol/keybase1"
"golang.org/x/net/context"
)
// StateChecker verifies that the server-side state for KBFS is
// consistent. Useful mostly for testing because it isn't scalable
// and loads all the state in memory.
type StateChecker struct {
config Config
log logger.Logger
}
// NewStateChecker returns a new StateChecker instance.
func NewStateChecker(config Config) *StateChecker {
return &StateChecker{config, config.MakeLogger("")}
}
// findAllFileBlocks adds all file blocks found under this block to
// the blockSizes map, if the given path represents an indirect block.
func (sc *StateChecker) findAllFileBlocks(ctx context.Context,
lState *lockState, ops *folderBranchOps, kmd KeyMetadata,
file path, blockSizes map[BlockPointer]uint32) error {
infos, err := ops.blocks.GetIndirectFileBlockInfos(ctx, lState, kmd, file)
if err != nil {
return err
}
for _, info := range infos {
blockSizes[info.BlockPointer] = info.EncodedSize
}
return nil
}
// findAllDirBlocks adds all dir blocks found under this block to the
// blockSizes map, if the given path represents an indirect block.
func (sc *StateChecker) findAllDirBlocks(ctx context.Context,
lState *lockState, ops *folderBranchOps, kmd KeyMetadata,
dir path, blockSizes map[BlockPointer]uint32) error {
infos, err := ops.blocks.GetIndirectDirBlockInfos(ctx, lState, kmd, dir)
if err != nil {
return err
}
for _, info := range infos {
blockSizes[info.BlockPointer] = info.EncodedSize
}
return nil
}
// findAllBlocksInPath adds all blocks found within this directory to
// the blockSizes map, and then recursively checks all
// subdirectories.
func (sc *StateChecker) findAllBlocksInPath(ctx context.Context,
lState *lockState, ops *folderBranchOps, kmd KeyMetadata,
dir path, blockSizes map[BlockPointer]uint32) error {
children, err := ops.blocks.GetEntries(ctx, lState, kmd, dir)
if err != nil {
return err
}
err = sc.findAllDirBlocks(ctx, lState, ops, kmd, dir, blockSizes)
if err != nil {
return err
}
for name, de := range children {
if de.Type == Sym {
continue
}
blockSizes[de.BlockPointer] = de.EncodedSize
p := dir.ChildPath(name, de.BlockPointer)
if de.Type == Dir {
err := sc.findAllBlocksInPath(ctx, lState, ops, kmd, p, blockSizes)
if err != nil {
return err
}
} else {
// If it's a file, check to see if it's indirect.
err := sc.findAllFileBlocks(ctx, lState, ops, kmd, p, blockSizes)
if err != nil {
return err
}
}
}
return nil
}
func (sc *StateChecker) getLastGCData(ctx context.Context,
tlfID tlf.ID) (time.Time, kbfsmd.Revision) {
config, ok := sc.config.(*ConfigLocal)
if !ok {
return time.Time{}, kbfsmd.RevisionUninitialized
}
var latestTime time.Time
var latestRev kbfsmd.Revision
for _, c := range *config.allKnownConfigsForTesting {
ops := c.KBFSOps().(*KBFSOpsStandard).getOpsIfExists(
context.Background(), FolderBranch{tlfID, MasterBranch})
if ops == nil {
continue
}
rt, rev := ops.fbm.getLastQRData()
if rt.After(latestTime) && rev > latestRev {
latestTime = rt
latestRev = rev
}
}
if latestTime.IsZero() {
return latestTime, latestRev
}
sc.log.CDebugf(ctx, "Last qr data for TLF %s: revTime=%s, rev=%d",
tlfID, latestTime, latestRev)
return latestTime.Add(
-sc.config.Mode().QuotaReclamationMinUnrefAge()), latestRev
}
// CheckMergedState verifies that the state for the given tlf is
// consistent.
func (sc *StateChecker) CheckMergedState(ctx context.Context, tlfID tlf.ID) error {
// Blow away MD cache so we don't have any lingering re-embedded
// block changes (otherwise we won't be able to learn their sizes).
sc.config.SetMDCache(NewMDCacheStandard(defaultMDCacheCapacity))
// Fetch all the MD updates for this folder, and use the block
// change lists to build up the set of currently referenced blocks.
rmds, err := getMergedMDUpdates(ctx, sc.config, tlfID,
kbfsmd.RevisionInitial, nil)
if err != nil {
return err
}
if len(rmds) == 0 {
sc.log.CDebugf(ctx, "No state to check for folder %s", tlfID)
return nil
}
lState := makeFBOLockState()
// Re-embed block changes.
kbfsOps, ok := sc.config.KBFSOps().(*KBFSOpsStandard)
if !ok {
return errors.New("Unexpected KBFSOps type")
}
fb := FolderBranch{tlfID, MasterBranch}
ops := kbfsOps.getOps(context.Background(), fb, FavoritesOpNoChange)
lastGCRevisionTime, lastGCRev := sc.getLastGCData(ctx, tlfID)
// Build the expected block list.
expectedLiveBlocks := make(map[BlockPointer]bool)
expectedRef := uint64(0)
expectedMDRef := uint64(0)
archivedBlocks := make(map[BlockPointer]bool)
actualLiveBlocks := make(map[BlockPointer]uint32)
// See what the last GC op revision is. All unref'd pointers from
// that revision or earlier should be deleted from the block
// server.
gcRevision := kbfsmd.RevisionUninitialized
for _, rmd := range rmds {
// Don't process copies.
if rmd.IsWriterMetadataCopiedSet() {
continue
}
for _, op := range rmd.data.Changes.Ops {
GCOp, ok := op.(*GCOp)
if !ok {
continue
}
gcRevision = GCOp.LatestRev
}
}
for _, rmd := range rmds {
// Don't process copies.
if rmd.IsWriterMetadataCopiedSet() {
continue
}
// Unembedded block changes count towards the MD size.
if info := rmd.data.cachedChanges.Info; info.BlockPointer != zeroPtr {
sc.log.CDebugf(ctx, "Unembedded block change: %v, %d",
info.BlockPointer, info.EncodedSize)
actualLiveBlocks[info.BlockPointer] = info.EncodedSize
// Any child block change pointers?
file := path{FolderBranch{tlfID, MasterBranch},
[]pathNode{{
info.BlockPointer,
fmt.Sprintf("<MD with revision %d>", rmd.Revision())}}}
err := sc.findAllFileBlocks(ctx, lState, ops, rmd.ReadOnly(),
file, actualLiveBlocks)
if err != nil {
return err
}
}
var hasGCOp bool
updated := make(map[BlockPointer]bool)
for _, op := range rmd.data.Changes.Ops {
_, isGCOp := op.(*GCOp)
hasGCOp = hasGCOp || isGCOp
opRefs := make(map[BlockPointer]bool)
for _, ptr := range op.Refs() {
if ptr != zeroPtr {
expectedLiveBlocks[ptr] = true
opRefs[ptr] = true
}
}
if !isGCOp {
for _, ptr := range op.Unrefs() {
if updated[ptr] {
return fmt.Errorf(
"%s already updated in this revision %d",
ptr, rmd.Revision())
}
delete(expectedLiveBlocks, ptr)
if ptr != zeroPtr {
// If the revision has been garbage-collected,
// or if the pointer has been referenced and
// unreferenced within the same op (which
// indicates a failed and retried sync), the
// corresponding block should already be
// cleaned up.
if rmd.Revision() <= gcRevision || opRefs[ptr] {
delete(archivedBlocks, ptr)
} else {
archivedBlocks[ptr] = true
}
}
}
}
for _, update := range op.allUpdates() {
if update.Ref != update.Unref {
updated[update.Unref] = true
delete(expectedLiveBlocks, update.Unref)
}
if update.Unref != zeroPtr && update.Ref != update.Unref {
if rmd.Revision() <= gcRevision {
delete(archivedBlocks, update.Unref)
} else {
archivedBlocks[update.Unref] = true
}
}
if update.Ref != zeroPtr && update.Ref != update.Unref {
expectedLiveBlocks[update.Ref] = true
}
}
}
expectedRef += rmd.RefBytes()
expectedRef -= rmd.UnrefBytes()
expectedMDRef += rmd.MDRefBytes()
if len(rmd.data.Changes.Ops) == 1 && hasGCOp {
// Don't check GC status for GC revisions
continue
}
// Make sure that if this revision should be covered by a GC
// op, it is. Note that this assumes that if QR is ever run,
// it will be run completely and not left partially done due
// to there being too many pointers to collect in one sweep.
mtime := time.Unix(0, rmd.data.Dir.Mtime)
if !lastGCRevisionTime.Before(mtime) && rmd.Revision() <= lastGCRev &&
rmd.Revision() > gcRevision {
return fmt.Errorf("Revision %d happened on or before the last "+
"gc time %s rev %d, but was not included in the latest "+
"gc op revision %d", rmd.Revision(), lastGCRevisionTime,
lastGCRev, gcRevision)
}
}
sc.log.CDebugf(ctx, "Folder %v has %d expected live blocks, "+
"total %d bytes (%d MD bytes)", tlfID, len(expectedLiveBlocks),
expectedRef, expectedMDRef)
currMD := rmds[len(rmds)-1]
expectedUsage := currMD.DiskUsage()
if expectedUsage != expectedRef {
return fmt.Errorf("Expected ref bytes %d doesn't match latest disk "+
"usage %d", expectedRef, expectedUsage)
}
expectedMDUsage := currMD.MDDiskUsage()
if expectedMDUsage != expectedMDRef {
return fmt.Errorf("Expected MD ref bytes %d doesn't match latest disk "+
"MD usage %d", expectedMDRef, expectedMDUsage)
}
// Then, using the current MD head, start at the root of the FS
// and recursively walk the directory tree to find all the blocks
// that are currently accessible.
rootNode, _, _, err := ops.getRootNode(ctx)
if err != nil {
return err
}
rootPath := ops.nodeCache.PathFromNode(rootNode)
if g, e := rootPath.tailPointer(), currMD.data.Dir.BlockPointer; g != e {
return fmt.Errorf("Current MD root pointer %v doesn't match root "+
"node pointer %v", e, g)
}
actualLiveBlocks[rootPath.tailPointer()] = currMD.data.Dir.EncodedSize
if err := sc.findAllBlocksInPath(ctx, lState, ops, currMD.ReadOnly(),
rootPath, actualLiveBlocks); err != nil {
return err
}
sc.log.CDebugf(ctx, "Folder %v has %d actual live blocks",
tlfID, len(actualLiveBlocks))
// Compare the two and see if there are any differences. Don't use
// reflect.DeepEqual so we can print out exactly what's wrong.
var extraBlocks []BlockPointer
actualSize := uint64(0)
actualMDSize := uint64(0)
for ptr, size := range actualLiveBlocks {
if ptr.GetBlockType() == keybase1.BlockType_MD {
actualMDSize += uint64(size)
} else {
actualSize += uint64(size)
}
if !expectedLiveBlocks[ptr] {
extraBlocks = append(extraBlocks, ptr)
}
}
if len(extraBlocks) != 0 {
sc.log.CWarningf(ctx, "%v: Extra live blocks found: %v",
tlfID, extraBlocks)
return fmt.Errorf("Folder %v has inconsistent state", tlfID)
}
var missingBlocks []BlockPointer
for ptr := range expectedLiveBlocks {
if _, ok := actualLiveBlocks[ptr]; !ok {
missingBlocks = append(missingBlocks, ptr)
}
}
if len(missingBlocks) != 0 {
sc.log.CWarningf(ctx, "%v: Expected live blocks not found: %v",
tlfID, missingBlocks)
return fmt.Errorf("Folder %v has inconsistent state", tlfID)
}
if actualSize != expectedRef {
return fmt.Errorf("Actual size %d doesn't match expected size %d",
actualSize, expectedRef)
}
if actualMDSize != expectedMDRef {
return fmt.Errorf("Actual MD size %d doesn't match expected MD size %d",
actualMDSize, expectedMDRef)
}
// Check that the set of referenced blocks matches exactly what
// the block server knows about.
bserverLocal, ok := sc.config.BlockServer().(blockServerLocal)
if !ok {
if jbs, jok := sc.config.BlockServer().(journalBlockServer); jok {
bserverLocal, ok = jbs.BlockServer.(blockServerLocal)
if !ok {
sc.log.CDebugf(ctx, "Bad block server: %T", jbs.BlockServer)
}
}
}
if !ok {
return errors.New("StateChecker only works against BlockServerLocal")
}
bserverKnownBlocks, err := bserverLocal.getAllRefsForTest(ctx, tlfID)
if err != nil {
return err
}
blockRefsByID := make(map[kbfsblock.ID]blockRefMap)
for ptr := range expectedLiveBlocks {
if _, ok := blockRefsByID[ptr.ID]; !ok {
blockRefsByID[ptr.ID] = make(blockRefMap)
}
blockRefsByID[ptr.ID].put(ptr.Context, liveBlockRef, "")
}
for ptr := range archivedBlocks {
if _, ok := blockRefsByID[ptr.ID]; !ok {
blockRefsByID[ptr.ID] = make(blockRefMap)
}
blockRefsByID[ptr.ID].put(ptr.Context, archivedBlockRef, "")
}
if g, e := bserverKnownBlocks, blockRefsByID; !reflect.DeepEqual(g, e) {
for id, eRefs := range e {
if gRefs := g[id]; !reflect.DeepEqual(gRefs, eRefs) {
sc.log.CDebugf(ctx, "Refs for ID %v don't match. "+
"Got %v, expected %v", id, gRefs, eRefs)
}
}
for id, gRefs := range g {
if _, ok := e[id]; !ok {
sc.log.CDebugf(ctx, "Did not find matching expected "+
"ID for found block %v (with refs %v)", id, gRefs)
}
}
return fmt.Errorf("Folder %v has inconsistent state", tlfID)
}
// TODO: Check the archived and deleted blocks as well.
return nil
}