Skip to content

Commit

Permalink
clean up attachment cache disk storage (#24423)
Browse files Browse the repository at this point in the history
  • Loading branch information
mmaxim committed Jan 13, 2021
1 parent 90c1462 commit ce81c7b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 21 deletions.
2 changes: 1 addition & 1 deletion go/chat/attachment_httpsrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,7 @@ func NewCachingAttachmentFetcher(g *globals.Context, store attachments.Store, si
Contextified: globals.NewContextified(g),
DebugLabeler: utils.NewDebugLabeler(g.ExternalG(), "CachingAttachmentFetcher", false),
store: store,
diskLRU: disklru.NewDiskLRU("attachments", 1, size),
diskLRU: disklru.NewDiskLRU("attachments", 2, size),
}
}

Expand Down
82 changes: 62 additions & 20 deletions go/chat/attachments/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"sync"
"time"

disklru "github.com/keybase/client/go/lru"

Expand All @@ -27,6 +28,7 @@ import (
const (
uploadedPreviewsDir = "uploadedpreviews"
uploadedFullsDir = "uploadedfulls"
uploadedTempsDir = "uploadtemps"
)

type uploaderTask struct {
Expand Down Expand Up @@ -176,6 +178,7 @@ type Uploader struct {
s3signer s3.Signer
uploads map[string]*activeUpload
previewsLRU, fullsLRU *disklru.DiskLRU
versionUploaderTemps int

// testing
tempDir string
Expand All @@ -185,17 +188,25 @@ var _ types.AttachmentUploader = (*Uploader)(nil)

func NewUploader(g *globals.Context, store Store, s3signer s3.Signer,
ri func() chat1.RemoteInterface, size int) *Uploader {
return &Uploader{
Contextified: globals.NewContextified(g),
DebugLabeler: utils.NewDebugLabeler(g.ExternalG(), "Attachments.Uploader", false),
store: store,
ri: ri,
s3signer: s3signer,
uploads: make(map[string]*activeUpload),
taskStorage: newUploaderTaskStorage(g),
previewsLRU: disklru.NewDiskLRU(uploadedPreviewsDir, 1, size),
fullsLRU: disklru.NewDiskLRU(uploadedFullsDir, 1, size),
}
u := &Uploader{
Contextified: globals.NewContextified(g),
DebugLabeler: utils.NewDebugLabeler(g.ExternalG(), "Attachments.Uploader", false),
store: store,
ri: ri,
s3signer: s3signer,
uploads: make(map[string]*activeUpload),
taskStorage: newUploaderTaskStorage(g),
previewsLRU: disklru.NewDiskLRU(uploadedPreviewsDir, 2, size),
fullsLRU: disklru.NewDiskLRU(uploadedFullsDir, 2, size),
versionUploaderTemps: 1,
}

// make sure local state is clean
mctx := libkb.NewMetaContextTODO(g.ExternalG())
go u.clearOldUploaderTempDirs(context.Background(), 8*time.Second)
go disklru.CleanOutOfSyncWithDelay(mctx, u.previewsLRU, u.getPreviewsDir(), 10*time.Second)
go disklru.CleanOutOfSyncWithDelay(mctx, u.fullsLRU, u.getFullsDir(), 10*time.Second)
return u
}

func (u *Uploader) SetPreviewTempDir(dir string) {
Expand All @@ -221,8 +232,25 @@ func (u *Uploader) Complete(ctx context.Context, outboxID chat1.OutboxID) {
u.clearTempDirFromOutboxID(ctx, outboxID)
}

func (u *Uploader) clearOldUploaderTempDirs(ctx context.Context, delay time.Duration) {
u.Debug(ctx, "clearOldUploaderTempDirs: cleaning in %v", delay)
select {
case <-ctx.Done():
u.Debug(ctx, "clearOldUploaderTempDirs: context canceled, bailing")
return
case <-time.After(delay):
}

defer u.Trace(ctx, nil, "clearOldUploaderTempDirs")()
for i := 0; i < u.versionUploaderTemps; i++ {
dir := u.getUploadTempBaseDir(i)
u.Debug(ctx, "clearOldUploaderTempDirs: cleaning: %s", dir)
os.RemoveAll(dir)
}
}

func (u *Uploader) clearTempDirFromOutboxID(ctx context.Context, outboxID chat1.OutboxID) {
dir := u.getUploadTempDir(outboxID)
dir := u.getUploadTempDir(u.versionUploaderTemps, outboxID)
u.Debug(ctx, "clearTempDirFromOutboxID: clearing: %s", dir)
os.RemoveAll(dir)
}
Expand Down Expand Up @@ -638,12 +666,21 @@ func (u *Uploader) upload(ctx context.Context, uid gregor1.UID, convID chat1.Con
return upload.uploadResult, nil
}

func (u *Uploader) getUploadTempDir(outboxID chat1.OutboxID) string {
return filepath.Join(u.G().GetSharedCacheDir(), "uploadtemps", outboxID.String())
func (u *Uploader) getUploadTempBaseDir(version int) string {
base := filepath.Join(u.G().GetSharedCacheDir(), uploadedTempsDir)
// version 0 didn't have the naming scheme ready, so special case it
if version == 0 {
return base
}
return fmt.Sprintf("%s_v%d", base, version)
}

func (u *Uploader) getUploadTempDir(version int, outboxID chat1.OutboxID) string {
return filepath.Join(u.getUploadTempBaseDir(version), outboxID.String())
}

func (u *Uploader) GetUploadTempFile(ctx context.Context, outboxID chat1.OutboxID, filename string) (string, error) {
dir := u.getUploadTempDir(outboxID)
dir := u.getUploadTempDir(u.versionUploaderTemps, outboxID)
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return "", err
}
Expand Down Expand Up @@ -671,14 +708,19 @@ func (u *Uploader) CancelUploadTempFile(ctx context.Context, outboxID chat1.Outb
return nil
}

func (u *Uploader) getPreviewsDir() string {
return filepath.Join(u.getBaseDir(), uploadedPreviewsDir)
}

func (u *Uploader) getFullsDir() string {
return filepath.Join(u.getBaseDir(), uploadedFullsDir)
}

func (u *Uploader) OnDbNuke(mctx libkb.MetaContext) error {
baseDir := u.getBaseDir()
previewsDir := filepath.Join(baseDir, uploadedPreviewsDir)
if err := u.previewsLRU.CleanOutOfSync(mctx, previewsDir); err != nil {
if err := u.previewsLRU.CleanOutOfSync(mctx, u.getPreviewsDir()); err != nil {
u.Debug(mctx.Ctx(), "unable to run clean for uploadedPreviews: %v", err)
}
fullsDir := filepath.Join(baseDir, uploadedFullsDir)
if err := u.fullsLRU.CleanOutOfSync(mctx, fullsDir); err != nil {
if err := u.fullsLRU.CleanOutOfSync(mctx, u.getFullsDir()); err != nil {
u.Debug(mctx.Ctx(), "unable to run clean for uploadedFulls: %v", err)
}
return nil
Expand Down

0 comments on commit ce81c7b

Please sign in to comment.