package wsHelpers

import (
	"fmt"
	"os"
	"path/filepath"
	"sort"

	"github.com/pixlise/core/v4/api/filepaths"
	"github.com/pixlise/core/v4/api/services"
	"github.com/pixlise/core/v4/core/errorwithstatus"
	"github.com/pixlise/core/v4/core/fileaccess"
	protos "github.com/pixlise/core/v4/generated-protos"
	"google.golang.org/protobuf/proto"
)

// These readers use a local file cache because the same file is often read
// many times in short bursts. Cached files expire after MaxFileCacheAgeSec,
// and the total cache size is capped at MaxFileCacheSizeBytes (oldest files
// are evicted first).
type fileCacheItem struct {
	localPath        string
	fileSize         uint64
	timestampUnixSec int64
}

var fileCache = map[string]fileCacheItem{}
var MaxFileCacheAgeSec = int64(60 * 5)                // 5 minutes
var MaxFileCacheSizeBytes = uint64(200 * 1024 * 1024) // 200MB
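
// ReadDatasetFile returns the decoded dataset (Experiment) protobuf for the
// given scan. It serves from the local file cache when a fresh copy exists,
// otherwise it downloads the file from the datasets bucket and caches it locally.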
func ReadDatasetFile(scanId string, svcs *services.APIServices) (*protos.Experiment, error) {
	cacheId := "scan-" + scanId
	fileBytes := checkCache(cacheId, "scan", svcs)

	// If we don't have data by now, download it and add to our cache
	var err error
	if fileBytes == nil {
		s3Path := filepaths.GetScanFilePath(scanId, filepaths.DatasetFileName)
		fmt.Printf("Downloading file: s3://%v/%v\n", svcs.Config.DatasetsBucket, s3Path)

		fileBytes, err = svcs.FS.ReadObject(svcs.Config.DatasetsBucket, s3Path)
		if err != nil {
			// Doesn't seem to exist?
			if svcs.FS.IsNotFoundError(err) {
				return nil, errorwithstatus.MakeNotFoundError(scanId)
			}

			svcs.Log.Errorf("Failed to load scan data for %v, from: s3://%v/%v, error was: %v.", scanId, svcs.Config.DatasetsBucket, s3Path, err)
			return nil, err
		}

		// Write locally
		addToCache(cacheId, "-dataset.bin", fmt.Sprintf("s3://%v/%v", svcs.Config.DatasetsBucket, s3Path), fileBytes, svcs)
	}

	// Now decode the data & return it
	datasetPB := &protos.Experiment{}
	err = proto.Unmarshal(fileBytes, datasetPB)
	if err != nil {
		svcs.Log.Errorf("Failed to decode scan data for scan: %v. Error: %v", scanId, err)
		return nil, err
	}

	return datasetPB, nil
}
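
// ReadQuantificationFile returns the decoded Quantification protobuf for the
// given quant id. It serves from the local file cache when a fresh copy exists,
// otherwise it downloads quantPath from the users bucket and caches it locally.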
func ReadQuantificationFile(quantId string, quantPath string, svcs *services.APIServices) (*protos.Quantification, error) {
	cacheId := "quant-" + quantId
	fileBytes := checkCache(cacheId, "quant", svcs)

	// If we don't have data by now, download it and add to our cache
	var err error
	if fileBytes == nil {
		fmt.Printf("Downloading file: s3://%v/%v\n", svcs.Config.UsersBucket, quantPath)

		fileBytes, err = svcs.FS.ReadObject(svcs.Config.UsersBucket, quantPath)
		if err != nil {
			// Doesn't seem to exist?
			if svcs.FS.IsNotFoundError(err) {
				return nil, errorwithstatus.MakeNotFoundError(quantId)
			}

			svcs.Log.Errorf("Failed to load quant data for %v, from: s3://%v/%v, error was: %v.", quantId, svcs.Config.UsersBucket, quantPath, err)
			return nil, err
		}

		// Write locally
		addToCache(cacheId, "-quant.bin", fmt.Sprintf("s3://%v/%v", svcs.Config.UsersBucket, quantPath), fileBytes, svcs)
	}

	// Now decode the data & return it
	quantPB := &protos.Quantification{}
	err = proto.Unmarshal(fileBytes, quantPB)
	if err != nil {
		svcs.Log.Errorf("Failed to decode quant data for quant: %v. Error: %v", quantId, err)
		return nil, err
	}

	return quantPB, nil
}
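
// ReadDiffractionFile returns the decoded Diffraction protobuf for the given
// scan. It serves from the local file cache when a fresh copy exists, otherwise
// it downloads the diffraction DB file from the datasets bucket and caches it locally.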
func ReadDiffractionFile(scanId string, svcs *services.APIServices) (*protos.Diffraction, error) {
	cacheId := "diffraction-" + scanId
	fileBytes := checkCache(cacheId, "diffraction", svcs)

	// If we don't have data by now, download it and add to our cache
	var err error
	if fileBytes == nil {
		s3Path := filepaths.GetScanFilePath(scanId, filepaths.DiffractionDBFileName)
		fmt.Printf("Downloading file: s3://%v/%v\n", svcs.Config.DatasetsBucket, s3Path)

		fileBytes, err = svcs.FS.ReadObject(svcs.Config.DatasetsBucket, s3Path)
		if err != nil {
			// Doesn't seem to exist?
			if svcs.FS.IsNotFoundError(err) {
				return nil, errorwithstatus.MakeNotFoundError(scanId)
			}

			svcs.Log.Errorf("Failed to load diffraction data for %v, from: s3://%v/%v, error was: %v.", scanId, svcs.Config.DatasetsBucket, s3Path, err)
			return nil, err
		}

		// Write locally
		addToCache(cacheId, "-diffraction.bin", fmt.Sprintf("s3://%v/%v", svcs.Config.DatasetsBucket, s3Path), fileBytes, svcs)
	}

	// Now decode the data & return it
	diffPB := &protos.Diffraction{}
	err = proto.Unmarshal(fileBytes, diffPB)
	if err != nil {
		svcs.Log.Errorf("Failed to decode diffraction data for scan: %v. Error: %v", scanId, err)
		return nil, err
	}

	return diffPB, nil
}
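
// checkCache returns the bytes of the locally cached file for the given cache
// id, or nil if there is no cache entry, the entry is older than
// MaxFileCacheAgeSec, or the local file could not be read (in which case the
// entry is evicted so the caller falls through to a fresh download).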
func checkCache(id string, fileTypeName string, svcs *services.APIServices) []byte {
	var fileBytes []byte
	var err error

	lfs := fileaccess.FSAccess{}

	// Check if it's cached
	if item, ok := fileCache[id]; ok {
		// We have a cached file, use if not too old
		now := svcs.TimeStamper.GetTimeNowSec()
		if item.timestampUnixSec > now-MaxFileCacheAgeSec {
			// Read the file from local cache
			fmt.Printf("Reading local file: %v\n", item.localPath)
			fileBytes, err = lfs.ReadObject("", item.localPath)
			if err != nil {
				// Failed to read locally, delete this cache item
				svcs.Log.Errorf("Failed to read locally cached %v for %v, path: %v, error was: %v. Download will be attempted.", fileTypeName, id, item.localPath, err)
				delete(fileCache, id)
				fileBytes = nil
			}
		}
	}

	return fileBytes
}
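
// addToCache writes the downloaded bytes to a file in the OS temp dir and
// records it in the cache map, then evicts the oldest files if the cache has
// grown past MaxFileCacheSizeBytes. A failed local write is logged but not
// fatal, because the caller still holds the downloaded bytes.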
func addToCache(id string, fileSuffix string, srcPath string, fileBytes []byte, svcs *services.APIServices) {
	cacheRoot := os.TempDir()
	cachePath := filepath.Join(cacheRoot, id+fileSuffix)

	lfs := fileaccess.FSAccess{}
	err := lfs.WriteObject("", cachePath, fileBytes)
	if err != nil {
		svcs.Log.Errorf("Failed to cache %v to local file system: %v", srcPath, err)
		// But don't die here, we can still service the request with the file bytes we downloaded
	} else {
		// Write to cache
		fileCache[id] = fileCacheItem{
			localPath:        cachePath,
			fileSize:         uint64(len(fileBytes)),
			timestampUnixSec: svcs.TimeStamper.GetTimeNowSec(),
		}

		// Now we remove files that would make us over-extend our cache space
		removeOldFileCacheItems(fileCache)
	}
}
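
// orderCacheItems returns the cache entries sorted newest-first (so the oldest
// items sit at the end of the slice), along with the total size in bytes of
// all cached files.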
func orderCacheItems(cache map[string]fileCacheItem) ([]fileCacheItem, uint64) {
	// Calc total cache size and sort by age, oldest last
	totalSize := uint64(0)
	itemsByAge := []fileCacheItem{}
	for _, item := range cache {
		totalSize += item.fileSize
		itemsByAge = append(itemsByAge, item)
	}

	sort.Slice(itemsByAge, func(i, j int) bool {
		return itemsByAge[i].timestampUnixSec > itemsByAge[j].timestampUnixSec
	})

	return itemsByAge, totalSize
}
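
// removeOldFileCacheItems deletes cached files, oldest first, until the total
// cache size drops below MaxFileCacheSizeBytes.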
func removeOldFileCacheItems(cache map[string]fileCacheItem) {
	itemsByAge, totalSize := orderCacheItems(cache)

	if len(itemsByAge) == 0 {
		return
	}

	// Loop through, oldest to newest, delete until we satisfy cache size limit
	for c := len(itemsByAge) - 1; c >= 0; c-- {
		if totalSize < MaxFileCacheSizeBytes {
			// Cache is small enough now, stop here
			break
		}

		// Try to delete this file
		item := itemsByAge[c]
		err := os.Remove(item.localPath)
		if err == nil {
			// That worked, so remember our cache is smaller now
			totalSize -= item.fileSize

			// And remove it from the cache map too, so we don't try to read a deleted file later
			for id, cached := range cache {
				if cached.localPath == item.localPath {
					delete(cache, id)
					break
				}
			}
		}
	}
}
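
// Example usage sketch (hypothetical caller; the scan id and svcs are assumed
// to come from the surrounding request handler):
//
//	exprPB, err := wsHelpers.ReadDatasetFile(scanId, svcs)
//	if err != nil {
//		return err
//	}
//	// exprPB is a *protos.Experiment decoded from the cached or downloaded file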