-
Notifications
You must be signed in to change notification settings - Fork 451
/
fs.go
193 lines (170 loc) · 5.92 KB
/
fs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package storage
import (
"sync"
"time"
"github.com/m3db/m3/src/dbnode/persist/fs/commitlog"
"github.com/m3db/m3/src/x/instrument"
"go.uber.org/zap"
)
// fileOpStatus is the state of a file system operation. It is used both for
// the run state of fileSystemManager and for the per-block WarmStatus kept in
// fileOpState.
type fileOpStatus int

const (
	// fileOpNotStarted is the zero value; fileSystemManager also returns to
	// this state once a run finishes.
	fileOpNotStarted fileOpStatus = iota
	// fileOpInProgress is set by fileSystemManager.Run while work is running.
	fileOpInProgress
	// fileOpSuccess is not assigned in this file; by its name it presumably
	// records a completed operation (set elsewhere — confirm).
	fileOpSuccess
	// fileOpFailed is not assigned in this file; by its name it presumably
	// records an operation that ended in error (set elsewhere — confirm).
	fileOpFailed
)
// fileOpState records how far the data of a block has been persisted, for
// warm writes and cold writes separately, as described on each field.
type fileOpState struct {
	// WarmStatus is the status of data persistence for WarmWrites only.
	// Each block will only be warm-flushed once, so not keeping track of a
	// version here is okay. This is used in the buffer Tick to determine when
	// a warm bucket is evictable from memory.
	WarmStatus fileOpStatus
	// ColdVersionRetrievable keeps track of data persistence for ColdWrites only.
	// Each block can be cold-flushed multiple times, so this tracks which
	// version of the flush completed successfully. This is ultimately used in
	// the buffer Tick to determine which buckets are evictable. Note the distinction
	// between ColdVersionRetrievable and ColdVersionFlushed described below.
	ColdVersionRetrievable int
	// ColdVersionFlushed is the same as ColdVersionRetrievable except in some cases it will be
	// higher than ColdVersionRetrievable. ColdVersionFlushed will be higher than
	// ColdVersionRetrievable in the situation that a cold flush has completed successfully but
	// signaling the update to the BlockLeaseManager hasn't completed yet. As a result, the
	// ColdVersionRetrievable can't be incremented yet because a concurrent tick could evict the
	// block before it becomes queryable via the block retriever / seeker manager, however, the
	// BlockLeaseVerifier needs to know that a higher cold flush version exists on disk so that
	// it can approve the SeekerManager's request to open a lease on the latest version.
	//
	// In other words ColdVersionRetrievable is used to keep track of the latest cold version that has
	// been successfully flushed and can be queried via the block retriever / seeker manager and
	// as a result is safe to evict, while ColdVersionFlushed is used to keep track of the latest
	// cold version that has been flushed and to validate lease requests from the SeekerManager when it
	// receives a signal to open a new lease.
	ColdVersionFlushed int
	// NumFailures is not read or written in this file; presumably it counts
	// failed persistence attempts for the block — confirm against the flush path.
	NumFailures int
}
// runType selects where fileSystemManager.Run executes its cleanup and flush
// work.
type runType int

const (
	// syncRun executes the work on the caller's goroutine; Run returns only
	// after it has finished.
	syncRun runType = iota
	// asyncRun executes the work on a newly started goroutine and lets Run
	// return immediately.
	asyncRun
)
// forceType controls whether fileSystemManager.Run honours its gating checks.
type forceType int

const (
	// noForce makes Run start only when the manager is enabled, idle and the
	// database is bootstrapped.
	noForce forceType = iota
	// force makes Run skip those checks and start unconditionally.
	force
)
// fileSystemManager combines a flush manager and a cleanup manager (both
// embedded) and sequences a cleanup followed by a flush through Run, using
// status and enabled to decide whether a new run may start.
type fileSystemManager struct {
	databaseFlushManager
	databaseCleanupManager
	// RWMutex protects status and enabled; every access to them in this file
	// holds it.
	sync.RWMutex

	log      *zap.Logger
	database database
	opts     Options
	// status is fileOpInProgress while Run's work executes and is reset to
	// fileOpNotStarted when that work finishes.
	status fileOpStatus
	// enabled gates non-forced runs; it is toggled by Enable and Disable.
	enabled bool
}
// newFileSystemManager builds a fileSystemManager whose flush and cleanup
// managers both report metrics under the "fs" sub-scope of the instrument
// options carried by opts. The returned manager starts enabled and idle
// (fileOpNotStarted).
func newFileSystemManager(
	database database,
	commitLog commitlog.CommitLog,
	opts Options,
) databaseFileSystemManager {
	iOpts := opts.InstrumentOptions()
	fsScope := iOpts.MetricsScope().SubScope("fs")

	// Composite-literal operands are evaluated left to right, so the flush
	// manager is created before the cleanup manager, as before.
	mgr := &fileSystemManager{
		databaseFlushManager:   newFlushManager(database, commitLog, fsScope),
		databaseCleanupManager: newCleanupManager(database, commitLog, fsScope),
		log:                    iOpts.Logger(),
		database:               database,
		opts:                   opts,
		status:                 fileOpNotStarted,
		enabled:                true,
	}
	return mgr
}
// Disable stops future non-forced runs from starting and returns the status
// observed at the moment of the call. A run that is already executing is not
// interrupted.
func (m *fileSystemManager) Disable() fileOpStatus {
	m.Lock()
	defer m.Unlock()

	prev := m.status
	m.enabled = false
	return prev
}
// Enable allows future non-forced runs to start again and returns the status
// observed at the moment of the call.
func (m *fileSystemManager) Enable() fileOpStatus {
	m.Lock()
	defer m.Unlock()

	prev := m.status
	m.enabled = true
	return prev
}
// Status returns the current run status, read under the shared lock.
func (m *fileSystemManager) Status() fileOpStatus {
	m.RLock()
	defer m.RUnlock()
	return m.status
}
// Run performs a cleanup pass followed by a flush pass for time t.
//
// With forceType == noForce, Run does nothing and returns false unless the
// manager is enabled, no run is in progress and the database is bootstrapped.
// With forceType == force those checks are skipped, so a forced run may
// overlap a run that is already in progress.
//
// When Run returns true the status has been set to fileOpInProgress. With
// syncRun the work has completed by the time Run returns; with asyncRun it
// continues on a new goroutine. In both cases the status is reset to
// fileOpNotStarted once the work ends.
func (m *fileSystemManager) Run(
	t time.Time,
	runType runType,
	forceType forceType,
) bool {
	m.Lock()
	if forceType == noForce && !m.shouldRunWithLock() {
		m.Unlock()
		return false
	}
	m.status = fileOpInProgress
	m.Unlock()

	// NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks.
	flushFn := func() {
		// Reset the status in a defer so that a panic raised by Cleanup, Flush
		// or the invariant reporter (which panics in CI) cannot leave the
		// manager stuck in fileOpInProgress. Otherwise every later non-forced
		// Run would return false.
		defer func() {
			m.Lock()
			m.status = fileOpNotStarted
			m.Unlock()
		}()

		// NB(r): Use invariant here since flush errors were introduced
		// and not caught in CI or integration tests.
		// When an invariant occurs in CI tests it panics so as to fail
		// the build.
		if err := m.Cleanup(t, m.database.IsBootstrapped()); err != nil {
			instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
				func(l *zap.Logger) {
					l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err))
				})
		}
		if err := m.Flush(t); err != nil {
			instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
				func(l *zap.Logger) {
					l.Error("error when flushing data", zap.Time("time", t), zap.Error(err))
				})
		}
	}

	if runType == syncRun {
		flushFn()
		return true
	}
	go flushFn()
	return true
}
// Report asks both embedded managers to report, cleanup first and then flush.
// Both embedded types define Report, so each call must be made explicitly.
func (m *fileSystemManager) Report() {
	cleanupMgr, flushMgr := m.databaseCleanupManager, m.databaseFlushManager
	cleanupMgr.Report()
	flushMgr.Report()
}
// shouldRunWithLock reports whether a non-forced run may start: the manager
// must be enabled, idle, and the database bootstrapped. Run calls it while
// holding m's write lock. IsBootstrapped is consulted only when the cheaper
// local checks pass.
func (m *fileSystemManager) shouldRunWithLock() bool {
	switch {
	case !m.enabled:
		return false
	case m.status == fileOpInProgress:
		return false
	default:
		return m.database.IsBootstrapped()
	}
}