-
Notifications
You must be signed in to change notification settings - Fork 16
/
cleanup.go
220 lines (178 loc) · 5.74 KB
/
cleanup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package store
import (
"errors"
"log/slog"
"os"
"strings"
"time"
"github.com/spf13/viper"
"github.com/turbot/flowpipe/internal/filepaths"
"github.com/turbot/pipe-fittings/constants"
"github.com/turbot/pipe-fittings/perr"
putils "github.com/turbot/pipe-fittings/utils"
)
func cleanupFlowpipeDB(currentTime time.Time, offset time.Duration) (int, error) {
slog.Debug("Cleaning up flowpipe db")
db, err := OpenFlowpipeDB()
if err != nil {
slog.Error("error opening flowpipe db", "error", err)
return -1, perr.InternalWithMessage("error opening flowpipe db")
}
defer db.Close()
timeLimit := currentTime.Add(offset)
// TODO: how do we cleanup orphaned pipeline runs? We can filter this to just 'finished', 'cancelled' and 'failed' states
// but then the orphan pipeline will never be cleaned. Should we have a hard limit?
cleanupQuery := `delete from pipeline_run where updated_at < ?;`
timeAsString := timeLimit.Format(putils.RFC3339WithMS)
result, err := db.Exec(cleanupQuery, timeAsString)
if err != nil {
slog.Error("error cleaning up flowpipe db", "error", err)
return -1, perr.InternalWithMessage("error cleaning up flowpipe db")
}
rowsAffected, err := result.RowsAffected()
if err != nil {
slog.Error("error cleaning up flowpipe db", "error", err)
return -1, perr.InternalWithMessage("error cleaning up flowpipe db")
}
slog.Debug("Cleaned up flowpipe db", "rowsAffected", rowsAffected)
sql := `select value from internal where name = 'last_cleanup'`
rows, err := db.Query(sql)
if err != nil {
slog.Error("error getting last cleanup time", "error", err)
return -1, perr.InternalWithMessage("error getting last cleanup time")
}
var lastCleanupTime string
for rows.Next() {
err = rows.Scan(&lastCleanupTime)
if err != nil {
slog.Error("error getting last cleanup time", "error", err)
return -1, perr.InternalWithMessage("error getting last cleanup time")
}
}
defer rows.Close()
currentTimeStringFormat := currentTime.Format(putils.RFC3339WithMS)
if lastCleanupTime != "" {
sql = `update internal set value = ?, updated_at = ? where name = 'last_cleanup'`
_, err = db.Exec(sql, currentTimeStringFormat, currentTimeStringFormat)
if err != nil {
slog.Error("error updating last cleanup time", "error", err)
return -1, perr.InternalWithMessage("error updating last cleanup time")
}
} else {
sql = `insert into internal (name, value, created_at) values ('last_cleanup', ?, ?)`
_, err = db.Exec(sql, currentTimeStringFormat, currentTimeStringFormat)
if err != nil {
slog.Error("error inserting last cleanup time", "error", err)
return -1, perr.InternalWithMessage("error inserting last cleanup time")
}
}
return int(rowsAffected), nil
}
func CleanupRunner() {
currentTime := time.Now().UTC()
retentionInSecond := viper.GetInt(constants.ArgProcessRetention)
if retentionInSecond == -1 {
slog.Debug("Skipping cleanup as retention is set to -1")
return
}
offset := time.Duration(-1*retentionInSecond) * time.Second
rowsAffected, err := cleanupFlowpipeDB(currentTime, offset)
if err != nil {
slog.Error("error cleaning up flowpipe db", "error", err)
return
}
slog.Info("Cleaned up flowpipe db", "rowsAffected", rowsAffected)
deleteOldJsonlFiles(filepaths.EventStoreDir(), offset)
}
// Force cleanup run if we haven't run it more than 1 day
func ForceCleanup() {
// can only clean up if flowpipe.db exist
dbPath := filepaths.FlowpipeDBFileName()
_, err := os.Stat(dbPath)
if os.IsNotExist(err) {
slog.Debug("Skipping force cleanup as flowpipe.db does not exist")
return
}
slog.Debug("Checking if cleanup must be run")
sql := `select value from internal where name = 'last_cleanup'`
db, err := OpenFlowpipeDB()
if err != nil {
slog.Error("error opening flowpipe db", "error", err)
return
}
defer db.Close()
rows, err := db.Query(sql)
if err != nil {
slog.Error("error getting last cleanup time", "error", err)
return
}
var lastCleanupTime string
for rows.Next() {
err = rows.Scan(&lastCleanupTime)
if err != nil {
slog.Error("error getting last cleanup time", "error", err)
return
}
}
defer rows.Close()
runCleanup := false
if lastCleanupTime == "" {
runCleanup = true
} else {
lastCleanupTime, err := time.Parse(putils.RFC3339WithMS, lastCleanupTime)
if err != nil {
slog.Error("error parsing last cleanup time", "error", err)
runCleanup = true
}
// force run cleanup if we haven't run cleanup for 1 day
if time.Now().UTC().Sub(lastCleanupTime) > 24*time.Hour {
runCleanup = true
}
}
if !runCleanup {
slog.Debug("Skipping force cleanup")
return
}
slog.Debug("Running force cleanup")
CleanupRunner()
}
// This function should be removed eventually. SQLite store is out in v0.3.
func deleteOldJsonlFiles(dir string, olderThan time.Duration) {
// Read files in directory
entries, err := os.ReadDir(dir)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return
}
slog.Error("error reading directory", "error", err, "dir", dir)
return
}
// Current time
now := time.Now()
for _, entry := range entries {
// Ignore directories and non-jsonl files
if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".jsonl") {
continue
}
// Get FileInfo for the file
info, err := entry.Info()
if err != nil {
slog.Error("error getting info for file", "error", err, "file", entry.Name())
continue
}
// Calculate the file's age
fileAge := now.Sub(info.ModTime())
// Check if the file is older than the specified duration
if fileAge > olderThan {
// Construct file path
filePath := dir + "/" + entry.Name()
// Delete the file
err := os.Remove(filePath)
if err != nil {
slog.Error("error deleting file", "error", err, "file", filePath)
} else {
slog.Debug("Deleted file", "file", filePath)
}
}
}
}