/
family.go
360 lines (317 loc) · 10.8 KB
/
family.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
// Licensed to LinDB under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. LinDB licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kv
import (
"fmt"
"path/filepath"
"sync"
"go.uber.org/atomic"
"github.com/lindb/lindb/kv/table"
"github.com/lindb/lindb/kv/version"
"github.com/lindb/lindb/pkg/fileutil"
"github.com/lindb/lindb/pkg/logger"
"github.com/lindb/lindb/pkg/timeutil"
)
//go:generate mockgen -source ./family.go -destination=./family_mock.go -package kv
// Function hooks held in package-level variables so that tests can replace them.
var (
// newCompactJobFunc is the default factory copied into each family for creating compact jobs.
newCompactJobFunc = newCompactJob
// removeDirFunc removes a path from disk; used when deleting table files.
removeDirFunc = fileutil.RemoveDir
)
// Family represents a column family; each family keeps its data isolated from the other families.
type Family interface {
// ID returns the family's id.
ID() version.FamilyID
// Name returns the family's name.
Name() string
// NewFlusher creates a flusher for saving data to the family.
NewFlusher() Flusher
// GetSnapshot returns the current version's snapshot.
GetSnapshot() version.Snapshot
// Compact triggers a compaction of the level0 files.
Compact()
// getStore returns the store the family was created with.
getStore() Store
// familyInfo returns the family info.
familyInfo() string
// getFamilyVersion returns the family version.
getFamilyVersion() version.FamilyVersion
// commitEditLog persists the edit log into the manifest file,
// returns true on success and false on failure.
commitEditLog(editLog version.EditLog) bool
// newTableBuilder creates a table builder instance for storing kv data.
newTableBuilder() (table.Builder, error)
// needCompact reports whether the level0 files need a compact job.
needCompact() bool
// compact does the compaction job.
compact()
// getNewMerger returns the function which creates a new merger; the merger needs to implement the Merger interface.
getNewMerger() NewMerger
// addPendingOutput registers the number of a file which is currently being written.
addPendingOutput(fileNumber table.FileNumber)
// removePendingOutput removes a pending output file after compact or flush.
removePendingOutput(fileNumber table.FileNumber)
// needRollup checks if it needs to rollup the source family data.
needRollup() bool
// rollup does the rollup job.
rollup()
// cleanReferenceFiles cleans the target family's reference files after the source family's rollup files are deleted.
cleanReferenceFiles(sourceFamily Family, sourceFiles []table.FileNumber)
// doRollupWork does the rollup job, merging the source family data into the target family.
doRollupWork(sourceFamily Family, rollup Rollup, sourceFiles []table.FileNumber) (err error)
// deleteObsoleteFiles deletes obsolete files.
deleteObsoleteFiles()
// close closes the family; it waits for the background jobs to complete before releasing resources.
close()
}
// family implements the Family interface.
type family struct {
// store is the store passed to newFamily.
store Store
// name is the family name, taken from option.Name.
name string
// familyPath is the family directory: the store path joined with the family name.
familyPath string
// option holds the options the family was created with.
option FamilyOption
// merger is the merger factory looked up from the registered mergers by option.Merger.
merger NewMerger
// familyVersion is created by the store when the family is created.
familyVersion version.FamilyVersion
// maxFileSize is option.MaxFileSize when positive, otherwise defaultMaxFileSize.
maxFileSize uint32
pendingOutputs sync.Map // keep all pending output files, includes flush/compact/rollup.
// newCompactJobFunc creates the compact job; copied from the package-level hook of the same name.
newCompactJobFunc func(family Family, state *compactionState, rollup Rollup) CompactJob
// NOTE(review): rolluping is not used in this file; presumably marks a running rollup job - confirm.
rolluping atomic.Bool
// lastRollupTime is initialised with the current time when the family is created.
lastRollupTime *atomic.Int64
// compacting is true while a background compaction job is running.
compacting atomic.Bool
condition sync.WaitGroup // counts outstanding flushers and background compaction jobs; close waits on it.
}
// newFamily creates a new family, or opens an existing one, under the store's path.
//
// The merger named by option.Merger is resolved before anything touches the
// file system, so an invalid option is rejected without leaving an empty
// family directory behind. The family directory is created when it does not
// exist yet. option.MaxFileSize overrides defaultMaxFileSize when positive.
//
// It returns an error when the merger is not registered or the family
// directory cannot be created.
func newFamily(store Store, option FamilyOption) (Family, error) {
	// validate the option first: no side effects on failure
	merger, ok := mergers[MergerType(option.Merger)]
	if !ok {
		return nil, fmt.Errorf("merger of option not implement Merger interface, merger is [%s]", option.Merger)
	}

	name := option.Name
	familyPath := filepath.Join(store.Path(), name)
	if !fileutil.Exist(familyPath) {
		if err := mkDirFunc(familyPath); err != nil {
			return nil, fmt.Errorf("mkdir family path error:%w", err)
		}
	}

	maxFileSize := defaultMaxFileSize
	if option.MaxFileSize > 0 {
		maxFileSize = option.MaxFileSize
	}

	f := &family{
		familyPath:        familyPath,
		store:             store,
		name:              name,
		option:            option,
		merger:            merger,
		maxFileSize:       maxFileSize,
		newCompactJobFunc: newCompactJobFunc,
		familyVersion:     store.createFamilyVersion(name, version.FamilyID(option.ID)),
		lastRollupTime:    atomic.NewInt64(timeutil.Now()),
	}

	kvLogger.Info("create new family successfully", logger.String("family", f.familyInfo()))
	return f, nil
}
// ID returns the family's id, converted from the id configured in the family option.
func (f *family) ID() version.FamilyID {
	id := f.option.ID
	return version.FamilyID(id)
}
// Name returns the name the family was created with.
func (f *family) Name() string {
	return f.name
}
// getStore returns the store the family was created with.
func (f *family) getStore() Store {
	return f.store
}
// NewFlusher creates a flusher for saving data to the family.
//
// The flusher is counted in the family's wait group until its release
// callback is invoked, so close waits for outstanding flushers.
func (f *family) NewFlusher() Flusher {
	f.condition.Add(1)
	// handed to the flusher; calling it un-counts the flusher again
	release := func() {
		f.condition.Done()
	}
	return newStoreFlusher(f, release)
}
// GetSnapshot returns a snapshot obtained from the family version.
func (f *family) GetSnapshot() version.Snapshot {
	fv := f.familyVersion
	return fv.GetSnapshot()
}
// familyInfo returns the family info, which is the path of the family directory.
func (f *family) familyInfo() string {
	return f.familyPath
}
// newTableBuilder creates a table builder instance for storing kv data.
//
// A new file number is taken from the store and registered as a pending
// output before the builder is created. When creating the builder fails
// the registration is rolled back, so the file number does not stay
// pending forever and any partially created file can be cleaned up by
// deleteObsoleteFiles.
func (f *family) newTableBuilder() (table.Builder, error) {
	fileNumber := f.store.nextFileNumber()
	// NOTE: need add pending output before create writer
	f.addPendingOutput(fileNumber)

	fileName := filepath.Join(f.familyPath, version.Table(fileNumber))
	builder, err := table.NewStoreBuilder(fileNumber, fileName)
	if err != nil {
		// nobody owns this file number any more, release it
		f.removePendingOutput(fileNumber)
		return nil, err
	}
	return builder, nil
}
// commitEditLog persists the edit log into the manifest file via the store.
// A nil or empty edit log is treated as success (only a warning is logged).
// It returns true when the commit succeeds and false when it fails.
func (f *family) commitEditLog(editLog version.EditLog) bool {
	nothingToCommit := editLog == nil || editLog.IsEmpty()
	if nothingToCommit {
		kvLogger.Warn("edit log is empty", logger.String("family", f.familyInfo()))
		return true
	}

	err := f.store.commitFamilyEditLog(f.name, editLog)
	if err == nil {
		return true
	}
	kvLogger.Error("commit edit log error:", logger.String("family", f.familyInfo()), logger.Error(err))
	return false
}
// Compact triggers a compaction of the level0 files.
// Nothing happens when a compaction job is already running or when level0
// holds at most one file.
func (f *family) Compact() {
	if f.compacting.Load() {
		// a compaction job is already in progress
		return
	}

	// the snapshot is only needed to count the level0 files, release it right away
	snapshot := f.GetSnapshot()
	level0Files := snapshot.GetCurrent().NumberOfFilesInLevel(0)
	snapshot.Close()

	if level0Files <= 1 {
		return
	}
	f.compact()
}
// needCompact reports whether a compact job should be started for level0.
//
// It returns false while a compaction job is running. Otherwise it returns
// true when level0 holds at least one file and the number of level0 files
// has reached the effective threshold: option.CompactThreshold, or
// defaultCompactThreshold when the option is not positive.
func (f *family) needCompact() bool {
	// has compaction job doing
	if f.compacting.Load() {
		return false
	}

	threshold := f.option.CompactThreshold
	if threshold <= 0 {
		threshold = defaultCompactThreshold
	}

	snapshot := f.GetSnapshot()
	defer snapshot.Close()

	numberOfFiles := snapshot.GetCurrent().NumberOfFilesInLevel(0)
	if numberOfFiles <= 0 || numberOfFiles < threshold {
		return false
	}

	// log the threshold that was actually compared against, not the raw option
	kvLogger.Info("need to compact level0 files", logger.String("family", f.familyInfo()),
		logger.Any("numOfFiles", numberOfFiles), logger.Any("threshold", threshold))
	return true
}
// compact starts a background compaction job unless one is already running.
//
// The compacting flag is claimed with a compare-and-swap, so at most one
// job runs at a time. The job is counted in the family's wait group so that
// close waits for it. When the job ends the compacting flag is cleared
// before the wait group is released; anyone woken up by the wait group
// therefore never observes a stale "compacting" state.
func (f *family) compact() {
	if !f.compacting.CAS(false, true) {
		// another compaction job is running
		return
	}

	f.condition.Add(1)
	go func() {
		// deferred calls run in reverse order: clear the flag first, then signal completion
		defer f.condition.Done()
		defer f.compacting.Store(false)

		if err := f.backgroundCompactionJob(); err != nil {
			kvLogger.Error("do compact job error",
				logger.String("family", f.familyInfo()), logger.Error(err), logger.Stack())
		}
	}()
}
// backgroundCompactionJob picks a level0 compaction from the current
// snapshot and runs it. It returns nil when there is nothing to compact,
// otherwise the result of the compact job.
// On return the snapshot is closed and obsolete files are deleted.
func (f *family) backgroundCompactionJob() error {
	snapshot := f.GetSnapshot()
	defer func() {
		snapshot.Close()
		// clean up unused files, maybe some file not used
		f.deleteObsoleteFiles()
	}()

	picked := snapshot.GetCurrent().PickL0Compaction(f.option.CompactThreshold)
	if picked == nil {
		// no compaction job need to do
		return nil
	}

	state := newCompactionState(f.maxFileSize, snapshot, picked)
	job := f.newCompactJobFunc(f, state, nil)
	return job.Run()
}
// addPendingOutput registers the number of a file which is currently being written.
func (f *family) addPendingOutput(fileNumber table.FileNumber) {
	outputs := &f.pendingOutputs
	outputs.Store(fileNumber, dummy)
}
// removePendingOutput unregisters a pending output file after compact or flush.
func (f *family) removePendingOutput(fileNumber table.FileNumber) {
	outputs := &f.pendingOutputs
	outputs.Delete(fileNumber)
}
// deleteSST deletes the table file with the given number from the family
// directory, e.g. a temp sst file left behind when flush or compact fails.
// It returns the error of the remove operation, if any.
func (f *family) deleteSST(fileNumber table.FileNumber) error {
	sstPath := filepath.Join(f.familyPath, version.Table(fileNumber))
	return removeDirFunc(sstPath)
}
// getFamilyVersion returns the family version held by the family.
func (f *family) getFamilyVersion() version.FamilyVersion {
	return f.familyVersion
}
// getNewMerger returns the function which creates a new merger for the family;
// the merger needs to implement the Merger interface.
func (f *family) getNewMerger() NewMerger {
	return f.merger
}
// deleteObsoleteFiles deletes the table files in the family directory which
// are no longer referenced.
//
// A table file is kept when its file number is one of:
//   - a pending output (a file currently being written),
//   - an active file of the family version,
//   - a live rollup file of the family version.
//
// Entries whose name cannot be parsed and files which are not of table type
// are left alone. Failures are logged together with the underlying error
// and never abort the clean up of the remaining files.
func (f *family) deleteObsoleteFiles() {
	sstFiles, err := listDirFunc(f.familyPath)
	if err != nil {
		kvLogger.Error("list sst file fail when delete obsolete files",
			logger.String("family", f.familyInfo()), logger.Error(err))
		return
	}

	// build the set of all live files
	liveFiles := make(map[table.FileNumber]struct{})
	f.pendingOutputs.Range(func(key, _ interface{}) bool {
		if fileNumber, ok := key.(table.FileNumber); ok {
			liveFiles[fileNumber] = struct{}{}
		}
		return true
	})
	activeFiles := f.familyVersion.GetAllActiveFiles()
	for idx := range activeFiles {
		liveFiles[activeFiles[idx].GetFileNumber()] = struct{}{}
	}
	// add live rollup files, maybe some rollup files is not alive in current family version,
	// but those files cannot delete, because need read those files when do rollup job
	for fileNumber := range f.familyVersion.GetLiveRollupFiles() {
		liveFiles[fileNumber] = struct{}{}
	}

	for _, fileName := range sstFiles {
		fileDesc := version.ParseFileName(fileName)
		if fileDesc == nil || fileDesc.FileType != version.TypeTable {
			// only table files are candidates for deletion
			continue
		}
		fileNumber := fileDesc.FileNumber
		if _, live := liveFiles[fileNumber]; live {
			continue
		}

		// drop the file from the store's cache before removing it from disk
		f.store.evictFamilyFile(fileNumber)
		if err := f.deleteSST(fileNumber); err != nil {
			kvLogger.Error("delete sst file fail",
				logger.String("family", f.familyInfo()), logger.Any("fileNumber", fileNumber), logger.Error(err))
			continue
		}
		kvLogger.Info("delete sst file successfully",
			logger.String("family", f.familyInfo()), logger.Any("fileNumber", fileNumber))
	}
}
// close closes the family. It blocks until every outstanding flusher and
// background job counted in the family's wait group has completed.
func (f *family) close() {
	jobs := &f.condition
	// wait background job completed.
	jobs.Wait()
}