forked from keybase/client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
rekey_queue.go
129 lines (115 loc) · 3.86 KB
/
rekey_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Copyright 2016 Keybase Inc. All rights reserved.
// Use of this source code is governed by a BSD
// license that can be found in the LICENSE file.
package libkbfs
import (
"sync"
"github.com/keybase/client/go/kbfs/tlf"
"github.com/keybase/client/go/logger"
"golang.org/x/net/context"
"golang.org/x/time/rate"
)
// When provisioning a new device from an existing device, the provisionee
// needs one of the existing devices to rekey for it, or it has to use paperkey
// for the rekey. For the case where an existing device does the rekey, there
// are three routines which eventually all go through this rekey queue. These
// three rekey routines are:
//
// 1. When a new device is added, the service on provisioner calls an RPC into
// KBFS, notifying the latter about the new device (provisionee) and that it
// needs rekey.
// 2. On KBFS client, a background routine runs once per hour. It asks the
// mdserver to check for TLFs that needs rekey. Note that this happens on all
// KBFS devices, no matter it has rekey capability or now.
//
// Both 1 and 2 do this by calling MDServerRemote.CheckForRekeys to send back a
// FoldersNeedRekey request.
//
// 3. When the provisionee gets provisioned, it goes through all TLFs and sends
// a MD update for each one of them, by merely copying (since it doesn't have
// access to the key yet) the existing MD revision while setting the rekey bit
// in the flag.
const (
numConcurrentRekeys = 64
rekeysPerSecond rate.Limit = 16
)
// RekeyQueueStandard implements the RekeyQueue interface.
type RekeyQueueStandard struct {
config Config
log logger.Logger
queue chan tlf.ID
limiter *rate.Limiter
cancel context.CancelFunc
mu sync.RWMutex // guards everything below
pendings map[tlf.ID]bool
}
// Test that RekeyQueueStandard fully implements the RekeyQueue interface.
var _ RekeyQueue = (*RekeyQueueStandard)(nil)
// NewRekeyQueueStandard creates a new rekey queue.
func NewRekeyQueueStandard(config Config) (rkq *RekeyQueueStandard) {
ctx, cancel := context.WithCancel(context.Background())
rkq = &RekeyQueueStandard{
config: config,
log: config.MakeLogger("RQ"),
queue: make(chan tlf.ID, config.Mode().RekeyQueueSize()),
limiter: rate.NewLimiter(rekeysPerSecond, numConcurrentRekeys),
pendings: make(map[tlf.ID]bool),
cancel: cancel,
}
if config.Mode().RekeyWorkers() > 0 {
rkq.start(ctx)
}
return rkq
}
// start spawns a goroutine that dispatches rekey requests to correct folder
// branch ops while conforming to the rater limiter.
func (rkq *RekeyQueueStandard) start(ctx context.Context) {
go func() {
for {
select {
case id := <-rkq.queue:
if err := rkq.limiter.Wait(ctx); err != nil {
rkq.log.Debug("Waiting on rate limiter for tlf=%v error: %v", id, err)
return
}
rkq.config.KBFSOps().RequestRekey(context.Background(), id)
func(id tlf.ID) {
rkq.mu.Lock()
defer rkq.mu.Unlock()
delete(rkq.pendings, id)
}(id)
case err := <-ctx.Done():
rkq.log.Debug("Rekey queue background routine context done: %v", err)
return
}
}
}()
}
// Enqueue implements the RekeyQueue interface for RekeyQueueStandard.
func (rkq *RekeyQueueStandard) Enqueue(id tlf.ID) {
rkq.mu.Lock()
defer rkq.mu.Unlock()
rkq.pendings[id] = true
select {
case rkq.queue <- id:
default:
// The queue is full; drop this one for now until the next
// request to the server for more rekeys.
rkq.log.Debug("Rekey queue is full; dropping %s", id)
}
}
// IsRekeyPending implements the RekeyQueue interface for RekeyQueueStandard.
func (rkq *RekeyQueueStandard) IsRekeyPending(id tlf.ID) bool {
rkq.mu.RLock()
defer rkq.mu.RUnlock()
return rkq.pendings[id]
}
// Shutdown implements the RekeyQueue interface for RekeyQueueStandard.
func (rkq *RekeyQueueStandard) Shutdown() {
rkq.mu.Lock()
defer rkq.mu.Unlock()
if rkq.cancel != nil {
rkq.cancel()
rkq.cancel = nil
}
}