forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
/
replica_sideload.go
262 lines (236 loc) · 9.93 KB
/
replica_sideload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
// Copyright 2017 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.
package storage
import (
"runtime/debug"
"time"
"golang.org/x/net/context"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/coreos/etcd/raft/raftpb"
"github.com/pkg/errors"
)
// bulkIOWriteLimiterLongWait is the wait above which limitBulkIOWrite logs a
// warning (together with the current goroutine's stack).
const bulkIOWriteLimiterLongWait = 500 * time.Millisecond

// limitBulkIOWrite blocks until st.BulkIOWriteLimiter admits a write of the
// given cost. The requested cost is clamped to cluster.BulkIOWriteLimiterBurst.
//
// Rate limiting is best effort. An error from the limiter is logged and not
// returned. Waits longer than bulkIOWriteLimiterLongWait are logged as
// warnings.
func limitBulkIOWrite(ctx context.Context, st *cluster.Settings, cost int) {
	// The limiter disallows anything greater than its burst (set to
	// BulkIOWriteLimiterBurst), so cap the batch size if it would overflow.
	//
	// TODO(dan): This obviously means the limiter is no longer accounting for
	// the full cost. I've tried calling WaitN in a loop to fully cover the
	// cost, but that doesn't seem to be as smooth in practice (TPCH-10 restores
	// on azure local disks), I think because the file is written all at once at
	// the end. This could be fixed by writing the file in chunks, which also
	// would likely help the overall smoothness, too.
	admitted := cost
	if admitted > cluster.BulkIOWriteLimiterBurst {
		admitted = cluster.BulkIOWriteLimiterBurst
	}

	start := timeutil.Now()
	waitErr := st.BulkIOWriteLimiter.WaitN(ctx, admitted)
	if waitErr != nil {
		log.Errorf(ctx, "error rate limiting bulk io write: %+v", waitErr)
	}

	waited := timeutil.Since(start)
	if waited <= bulkIOWriteLimiterLongWait {
		return
	}
	log.Warningf(ctx, "bulk io write limiter took %s (>%s):\n%s",
		waited, bulkIOWriteLimiterLongWait, debug.Stack())
}
// errSideloadedFileNotFound is the error sideloadStorage implementations
// return from Get and Purge when the requested file is not present.
var errSideloadedFileNotFound = errors.New("sideloaded file not found")

// sideloadStorage is the interface used for Raft SSTable sideloading.
// Implementations do not need to be thread safe.
type sideloadStorage interface {
	// Dir returns the directory in which the sideloaded files are stored.
	// The directory may or may not exist.
	Dir() string
	// Filename returns an absolute path to the file whose contents Get would
	// return for the given index and term. It does not check whether the
	// file actually exists.
	Filename(_ context.Context, index, term uint64) (string, error)
	// PutIfNotExists writes contents to the file identified by the given
	// index and term. If that file already exists, nothing is written.
	PutIfNotExists(_ context.Context, index, term uint64, contents []byte) error
	// Get loads the file at the given index and term, returning
	// errSideloadedFileNotFound when no such file is present.
	Get(_ context.Context, index, term uint64) ([]byte, error)
	// Purge removes the file at the given index and term. It may (but is not
	// required to) also remove leftover files at the same index with earlier
	// terms. When no file exists at the given index and term, it returns
	// errSideloadedFileNotFound.
	Purge(_ context.Context, index, term uint64) error
	// TruncateTo removes all files belonging to an index strictly smaller
	// than the given one.
	TruncateTo(_ context.Context, index uint64) error
	// Clear removes files that may have been written by this sideloadStorage.
	Clear(context.Context) error
}
// maybeSideloadEntriesRaftMuLocked must be handed a slice of "fat" entries
// before they are appended to the Raft log. Sideloadable entries are
// sideloaded here: fat proposals come in, thin proposals go out. This has to
// happen before the log modifications are persisted. Doing it the other way
// around is incorrect, because an ill-timed crash would leave thin proposals
// with no files backing them.
//
// The passed-in slice is not mutated. The work is delegated to
// maybeSideloadEntriesImpl. Commands found in r.mu.proposals (looked up under
// r.mu) are reused so they don't have to be unmarshaled again.
func (r *Replica) maybeSideloadEntriesRaftMuLocked(
	ctx context.Context, entriesToAppend []raftpb.Entry,
) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) {
	// TODO(tschottdorf): allocating this closure could be expensive. If so make
	// it a method on Replica.
	lookupInFlight := func(id storagebase.CmdIDKey) (storagebase.RaftCommand, bool) {
		r.mu.Lock()
		defer r.mu.Unlock()
		if proposal, found := r.mu.proposals[id]; found {
			return proposal.command, true
		}
		return storagebase.RaftCommand{}, false
	}
	storage := r.raftMu.sideloaded
	return maybeSideloadEntriesImpl(ctx, entriesToAppend, storage, lookupInFlight)
}
// maybeSideloadEntriesImpl iterates through the provided slice of entries. If
// no sideloadable entries are found, it returns the same slice. Otherwise, it
// returns a new slice in which all applicable entries have been sideloaded to
// the specified sideloadStorage. The passed-in slice itself is never mutated.
// The second return value is the total number of payload bytes handed to
// sideloaded.PutIfNotExists.
//
// maybeRaftCommand is called when sideloading is necessary. It can optionally
// supply a pre-unmarshaled RaftCommand, which is usually provided by the
// Replica's in-flight proposal map.
func maybeSideloadEntriesImpl(
	ctx context.Context,
	entriesToAppend []raftpb.Entry,
	sideloaded sideloadStorage,
	maybeRaftCommand func(storagebase.CmdIDKey) (storagebase.RaftCommand, bool),
) (_ []raftpb.Entry, sideloadedEntriesSize int64, _ error) {
	cow := false
	for i := range entriesToAppend {
		if !sniffSideloadedRaftCommand(entriesToAppend[i].Data) {
			continue
		}
		log.Event(ctx, "sideloading command in append")
		if !cow {
			// Avoid mutating the passed-in entries directly. The caller
			// wants them to remain "fat". Copy the slice once, on the first
			// entry that needs rewriting.
			log.Eventf(ctx, "copying entries slice of length %d", len(entriesToAppend))
			cow = true
			entriesToAppend = append([]raftpb.Entry(nil), entriesToAppend...)
		}

		ent := &entriesToAppend[i]
		cmdID, data := DecodeRaftCommand(ent.Data) // cheap

		strippedCmd, ok := maybeRaftCommand(cmdID)
		if ok {
			// Happy case: we have this proposal locally (i.e. we proposed
			// it). In this case, we can save unmarshalling the fat proposal
			// because it's already in-memory.
			if strippedCmd.ReplicatedEvalResult.AddSSTable == nil {
				log.Fatalf(ctx, "encountered sideloaded non-AddSSTable command: %+v", strippedCmd)
			}
			log.Eventf(ctx, "command already in memory")
			// The raft proposal is immutable. To respect that, shallow-copy
			// the (nullable) AddSSTable struct which we intend to modify.
			addSSTableCopy := *strippedCmd.ReplicatedEvalResult.AddSSTable
			strippedCmd.ReplicatedEvalResult.AddSSTable = &addSSTableCopy
		} else {
			// Bad luck: we didn't have the proposal in-memory, so we'll
			// have to unmarshal it.
			log.Event(ctx, "proposal not already in memory; unmarshaling")
			if err := strippedCmd.Unmarshal(data); err != nil {
				return nil, 0, errors.Wrap(err, "while unmarshalling sideloaded command")
			}
		}

		if strippedCmd.ReplicatedEvalResult.AddSSTable == nil {
			// Still no AddSSTable; someone must've proposed a v2 command
			// but not because it contains an inlined SSTable. Strange, but
			// let's be future proof and leave the entry as it is.
			log.Warning(ctx, "encountered sideloaded Raft command without inlined payload")
			continue
		}

		// Actually strip the command.
		dataToSideload := strippedCmd.ReplicatedEvalResult.AddSSTable.Data
		strippedCmd.ReplicatedEvalResult.AddSSTable.Data = nil

		strippedData, err := strippedCmd.Marshal()
		if err != nil {
			return nil, 0, errors.Wrap(err, "while marshalling stripped sideloaded command")
		}
		ent.Data = encodeRaftCommandV2(cmdID, strippedData)

		log.Eventf(ctx, "writing payload at index=%d term=%d", ent.Index, ent.Term)
		err = sideloaded.PutIfNotExists(ctx, ent.Index, ent.Term, dataToSideload)
		if err != nil {
			return nil, 0, err
		}
		sideloadedEntriesSize += int64(len(dataToSideload))
	}
	return entriesToAppend, sideloadedEntriesSize, nil
}
// sniffSideloadedRaftCommand reports whether the first byte of data (the
// command encoding version) equals raftVersionSideloaded. Empty data is never
// considered sideloaded.
func sniffSideloadedRaftCommand(data []byte) (sideloaded bool) {
	if len(data) == 0 {
		return false
	}
	sideloaded = data[0] == byte(raftVersionSideloaded)
	return sideloaded
}
// maybeInlineSideloadedRaftCommand takes an entry and inspects it. If its
// command encoding version indicates a sideloaded entry, it uses the entryCache
// or sideloadStorage to inline the payload. It returns a new entry (which must
// be treated as immutable by the caller), or nil if inlining does not apply.
//
// Inlining does not apply to entries that are not sideloaded. It also does not
// apply to sideloaded-version entries whose command carries no AddSSTable;
// maybeSideloadEntriesImpl tolerates such entries and leaves them untouched.
//
// If a payload is missing, returns an error whose Cause() is
// errSideloadedFileNotFound.
func maybeInlineSideloadedRaftCommand(
	ctx context.Context,
	rangeID roachpb.RangeID,
	ent raftpb.Entry,
	sideloaded sideloadStorage,
	entryCache *raftEntryCache,
) (*raftpb.Entry, error) {
	if !sniffSideloadedRaftCommand(ent.Data) {
		return nil, nil
	}
	log.Event(ctx, "inlining sideloaded SSTable")
	// We could unmarshal this yet again, but if it's committed we
	// are very likely to have appended it recently, in which case
	// we can save work.
	cachedSingleton, _, _ := entryCache.getEntries(
		nil, rangeID, ent.Index, ent.Index+1, 1<<20,
	)
	// Only use the cached entry if it matches the (index, term) we were asked
	// to inline. The sideloaded storage lookup below is keyed the same way.
	if len(cachedSingleton) > 0 && cachedSingleton[0].Term == ent.Term {
		log.Event(ctx, "using cache hit")
		return &cachedSingleton[0], nil
	}

	// ent is a by-value parameter. Reassigning ent.Data below replaces the
	// slice header only, so the caller's entry is left untouched.
	log.Event(ctx, "inlined entry not cached")
	// Out of luck, for whatever reason the inlined proposal isn't in the cache.
	cmdID, data := DecodeRaftCommand(ent.Data)

	var command storagebase.RaftCommand
	if err := command.Unmarshal(data); err != nil {
		return nil, errors.Wrap(err, "unmarshalling sideloaded command")
	}

	addSSTable := command.ReplicatedEvalResult.AddSSTable
	if addSSTable == nil {
		// A sideloaded-version command without an AddSSTable has no payload
		// to inline. Bail out instead of dereferencing a nil pointer.
		log.Warning(ctx, "encountered sideloaded Raft command without inlined payload")
		return nil, nil
	}
	if len(addSSTable.Data) > 0 {
		// The entry we started out with was already "fat". This happens when
		// the entry reached us through a preemptive snapshot (when we didn't
		// have a ReplicaID yet).
		log.Event(ctx, "entry already inlined")
		return &ent, nil
	}

	sideloadedData, err := sideloaded.Get(ctx, ent.Index, ent.Term)
	if err != nil {
		return nil, errors.Wrap(err, "loading sideloaded data")
	}
	// command was freshly unmarshaled above, so mutating it is safe.
	addSSTable.Data = sideloadedData

	inlinedData, err := command.Marshal()
	if err != nil {
		return nil, errors.Wrap(err, "marshalling inlined command")
	}
	ent.Data = encodeRaftCommandV2(cmdID, inlinedData)
	return &ent, nil
}