forked from grafana/loki
/
table.go
264 lines (218 loc) · 7.94 KB
/
table.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package compactor
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/prometheus/common/model"
chunk_util "github.com/grafana/loki/pkg/storage/chunk/client/util"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/compactor/retention"
"github.com/grafana/loki/pkg/storage/stores/indexshipper/storage"
util_log "github.com/grafana/loki/pkg/util/log"
)
const (
	// uploadIndexSetsConcurrency bounds how many per-user index sets are
	// finalized (uploaded + source cleanup) in parallel in table.done().
	uploadIndexSetsConcurrency = 10
	// gzipExtension is the extension of compressed index files.
	gzipExtension = ".gz"
)

// errRetentionFileCountNotOne is returned when retention is attempted on an
// index set that does not hold exactly one source index file (see
// openCompactedIndexForRetention, which can only open a single file).
var errRetentionFileCountNotOne = fmt.Errorf("can't apply retention when index file count is not one")
// tableExpirationChecker reports whether a table's time interval may contain
// expired chunks for a given user, letting applyRetention skip index sets
// that cannot have anything to delete.
type tableExpirationChecker interface {
	IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool
}
// IndexCompactor builds TableCompactors for compacting tables and opens
// already-compacted index files so that retention can be applied to them.
type IndexCompactor interface {
	// NewTableCompactor returns a new TableCompactor for compacting a table.
	// commonIndexSet refers to common index files or in other words multi-tenant index.
	// existingUserIndexSet refers to existing user specific index files in the storage.
	// makeEmptyUserIndexSetFunc can be used for creating an empty indexSet for a user
	// who does not have an index for it in existingUserIndexSet.
	// periodConfig holds the PeriodConfig for the table.
	NewTableCompactor(
		ctx context.Context,
		commonIndexSet IndexSet,
		existingUserIndexSet map[string]IndexSet,
		makeEmptyUserIndexSetFunc MakeEmptyUserIndexSetFunc,
		periodConfig config.PeriodConfig,
	) TableCompactor

	// OpenCompactedIndexFile opens a compressed index file at given path.
	OpenCompactedIndexFile(
		ctx context.Context,
		path,
		tableName,
		userID,
		workingDir string,
		periodConfig config.PeriodConfig,
		logger log.Logger,
	) (
		CompactedIndex,
		error,
	)
}
// TableCompactor compacts the index files of a single table.
type TableCompactor interface {
	// CompactTable compacts the table.
	// After compaction is done successfully, it should set the new/updated CompactedIndex for relevant IndexSets.
	CompactTable() (err error)
}
// MakeEmptyUserIndexSetFunc creates an empty IndexSet for a user who does not
// already have one in storage (see IndexCompactor.NewTableCompactor).
type MakeEmptyUserIndexSetFunc func(userID string) (IndexSet, error)
// table coordinates compaction and retention for a single index table: it
// tracks one index set per user plus a common (multi-tenant) index set keyed
// by the empty string.
type table struct {
	name               string // derived from the base of workingDirectory
	workingDirectory   string // local scratch dir; removed when compact() finishes
	indexStorageClient storage.Client
	indexCompactor     IndexCompactor
	tableMarker        retention.TableMarker
	expirationChecker  tableExpirationChecker
	periodConfig       config.PeriodConfig

	baseUserIndexSet, baseCommonIndexSet storage.IndexSet
	// indexSets maps userID -> index set; the "" key holds the common index set.
	indexSets             map[string]*indexSet
	usersWithPerUserIndex []string
	logger                log.Logger

	ctx context.Context
}
// newTable builds a table ready for compaction. It ensures workingDirectory
// exists; the table name is taken from the directory's base name and the
// returned table's logger is tagged with it.
func newTable(ctx context.Context, workingDirectory string, indexStorageClient storage.Client,
	indexCompactor IndexCompactor, periodConfig config.PeriodConfig,
	tableMarker retention.TableMarker, expirationChecker tableExpirationChecker,
) (*table, error) {
	if err := chunk_util.EnsureDirectory(workingDirectory); err != nil {
		return nil, err
	}

	t := &table{
		ctx:                ctx,
		name:               filepath.Base(workingDirectory),
		workingDirectory:   workingDirectory,
		indexStorageClient: indexStorageClient,
		indexCompactor:     indexCompactor,
		tableMarker:        tableMarker,
		expirationChecker:  expirationChecker,
		periodConfig:       periodConfig,
		indexSets:          map[string]*indexSet{},
		baseUserIndexSet:   storage.NewIndexSet(indexStorageClient, true),
		baseCommonIndexSet: storage.NewIndexSet(indexStorageClient, false),
	}
	t.logger = log.With(util_log.Logger, "table-name", t.name)

	return t, nil
}
// compact lists the table's index files, compacts them via the configured
// IndexCompactor, optionally applies retention, and finally uploads the
// results and cleans up source files (done()). The local working directory
// is removed when this returns, whether or not compaction succeeded.
func (t *table) compact(applyRetention bool) error {
	indexFiles, usersWithPerUserIndex, err := t.indexStorageClient.ListFiles(t.ctx, t.name, false)
	if err != nil {
		return err
	}

	// Empty table: nothing to compact.
	if len(indexFiles) == 0 && len(usersWithPerUserIndex) == 0 {
		level.Info(t.logger).Log("msg", "no common index files and user index found")
		return nil
	}

	t.usersWithPerUserIndex = usersWithPerUserIndex

	level.Info(t.logger).Log("msg", "listed files", "count", len(indexFiles))

	// Always release per-indexSet resources and remove the local working
	// directory, even if compaction fails partway through.
	defer func() {
		for _, is := range t.indexSets {
			is.cleanup()
		}

		if err := os.RemoveAll(t.workingDirectory); err != nil {
			level.Error(t.logger).Log("msg", fmt.Sprintf("failed to remove working directory %s", t.workingDirectory), "err", err)
		}
	}()

	// The common (multi-tenant) index set lives under the empty-string key.
	t.indexSets[""], err = newCommonIndexSet(t.ctx, t.name, t.baseCommonIndexSet, t.workingDirectory, t.logger)
	if err != nil {
		return err
	}

	// userIndexSets is just for passing it to NewTableCompactor since go considers map[string]*indexSet different type than map[string]IndexSet
	userIndexSets := make(map[string]IndexSet, len(t.usersWithPerUserIndex))
	for _, userID := range t.usersWithPerUserIndex {
		var err error
		t.indexSets[userID], err = newUserIndexSet(t.ctx, t.name, userID, t.baseUserIndexSet, filepath.Join(t.workingDirectory, userID), t.logger)
		if err != nil {
			return err
		}
		userIndexSets[userID] = t.indexSets[userID]
	}

	// protect indexSets with mutex so that we are concurrency safe if the TableCompactor calls MakeEmptyUserIndexSetFunc concurrently
	indexSetsMtx := sync.Mutex{}
	tableCompactor := t.indexCompactor.NewTableCompactor(t.ctx, t.indexSets[""], userIndexSets, func(userID string) (IndexSet, error) {
		indexSetsMtx.Lock()
		defer indexSetsMtx.Unlock()

		var err error
		// Lazily create an index set for a user first seen during compaction.
		t.indexSets[userID], err = newUserIndexSet(t.ctx, t.name, userID, t.baseUserIndexSet, filepath.Join(t.workingDirectory, userID), t.logger)
		return t.indexSets[userID], err
	}, t.periodConfig)

	err = tableCompactor.CompactTable()
	if err != nil {
		return err
	}

	if applyRetention {
		err := t.applyRetention()
		if err != nil {
			return err
		}
	}

	return t.done()
}
// done uploads the compacted dbs and cleans up the source index files for
// every index set. Per-user index sets are finalized first (concurrently, up
// to uploadIndexSetsConcurrency at a time); the common index set goes last:
// its files are also a source of index for the user index sets, so cleaning
// it up before the user uploads succeed could lose data.
func (t *table) done() error {
	userIDs := make([]string, 0, len(t.indexSets))
	for userID := range t.indexSets {
		// The "" key is the common index set; it is intentionally deferred
		// until after all user index sets have been finalized.
		if userID != "" {
			userIDs = append(userIDs, userID)
		}
	}

	if err := concurrency.ForEachJob(t.ctx, len(userIDs), uploadIndexSetsConcurrency, func(ctx context.Context, idx int) error {
		return t.indexSets[userIDs[idx]].done()
	}); err != nil {
		return err
	}

	commonIndexSet, ok := t.indexSets[""]
	if !ok {
		return nil
	}
	return commonIndexSet.done()
}
// applyRetention applies retention on the index sets of this table, skipping
// sets whose interval cannot contain expired chunks for the owning user.
func (t *table) applyRetention() error {
	tableInterval := retention.ExtractIntervalFromTableName(t.name)

	for userID, idxSet := range t.indexSets {
		// Skip the common index set if compaction moved all of its contents
		// into per-user indexes: nothing is left to apply retention on.
		compactedAway := userID == "" && idxSet.compactedIndex == nil &&
			idxSet.removeSourceObjects && !idxSet.uploadCompactedDB
		if compactedAway || !t.expirationChecker.IntervalMayHaveExpiredChunks(tableInterval, userID) {
			continue
		}

		// compactedIndex is only populated when files were actually compacted,
		// so open the existing single compacted file if retention needs it.
		if idxSet.compactedIndex == nil && len(idxSet.ListSourceFiles()) == 1 {
			if err := t.openCompactedIndexForRetention(idxSet); err != nil {
				return err
			}
		}

		if err := idxSet.runRetention(t.tableMarker); err != nil {
			return err
		}
	}
	return nil
}
// openCompactedIndexForRetention downloads the index set's single source file
// and opens it as a CompactedIndex so retention can run against it. It
// returns errRetentionFileCountNotOne unless exactly one source file exists.
func (t *table) openCompactedIndexForRetention(idxSet *indexSet) error {
	srcFiles := idxSet.ListSourceFiles()
	if len(srcFiles) != 1 {
		return errRetentionFileCountNotOne
	}

	localPath, err := idxSet.GetSourceFile(srcFiles[0])
	if err != nil {
		return err
	}

	compactedIndex, err := t.indexCompactor.OpenCompactedIndexFile(
		t.ctx,
		localPath,
		t.name,
		idxSet.userID,
		filepath.Join(t.workingDirectory, idxSet.userID),
		t.periodConfig,
		idxSet.logger,
	)
	if err != nil {
		return err
	}

	// Keep the opened index but do not re-upload or delete sources here;
	// the original file in storage is already the compacted one.
	idxSet.setCompactedIndex(compactedIndex, false, false)
	return nil
}