-
-
Notifications
You must be signed in to change notification settings - Fork 737
/
migrate_blobs.go
129 lines (103 loc) · 2.83 KB
/
migrate_blobs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package task
import (
"context"
"fmt"
"github.com/stashapp/stash/pkg/job"
"github.com/stashapp/stash/pkg/logger"
"github.com/stashapp/stash/pkg/txn"
)
// BlobStoreMigrator is the set of blob store operations that MigrateBlobsJob
// relies on to enumerate and migrate blobs.
type BlobStoreMigrator interface {
	// Count returns a number of blobs; the job uses it as the progress total
	// and skips the migration entirely when it is zero.
	Count(ctx context.Context) (int, error)
	// FindBlobs returns up to n blob checksums. The job passes the last
	// checksum it processed as lastChecksum (empty for the first call) and
	// stops once an empty slice is returned.
	// NOTE(review): presumably results are ordered by checksum and start
	// after lastChecksum - confirm against the implementation.
	FindBlobs(ctx context.Context, n uint, lastChecksum string) ([]string, error)
	// MigrateBlob migrates the blob identified by checksum.
	// NOTE(review): deleteOld presumably removes the blob from its previous
	// location after migration - confirm against the implementation.
	MigrateBlob(ctx context.Context, checksum string, deleteOld bool) error
}
// Vacuumer is implemented by anything that can vacuum the database.
// MigrateBlobsJob calls it after a successful migration to reclaim space.
type Vacuumer interface {
	// Vacuum runs the vacuum operation and reports any failure.
	Vacuum(ctx context.Context) error
}
// MigrateBlobsJob is a job that counts the blobs to migrate, migrates them in
// batches, and then vacuums the database.
type MigrateBlobsJob struct {
	// TxnManager is used to wrap every blob store call in a transaction.
	TxnManager txn.Manager
	// BlobStore provides the count, lookup and migrate operations.
	BlobStore BlobStoreMigrator
	// Vacuumer is invoked once migration has completed without error.
	Vacuumer Vacuumer
	// DeleteOld is passed through unchanged to BlobStore.MigrateBlob.
	DeleteOld bool
}
// Execute runs the migration in three phases, each reported as a task on
// progress: counting the blobs, migrating them, and vacuuming the database.
//
// It returns early (after logging) when counting fails, when there is nothing
// to migrate, when the job was cancelled, or when migration returns an error.
// A vacuum failure is only logged; the final "finished" message is still
// emitted in that case.
func (j *MigrateBlobsJob) Execute(ctx context.Context, progress *job.Progress) {
	var (
		total    int
		countErr error
	)

	progress.ExecuteTask("Counting blobs", func() {
		total, countErr = j.countBlobs(ctx)
		progress.SetTotal(total)
	})

	switch {
	case countErr != nil:
		logger.Errorf("Error counting blobs: %s", countErr.Error())
		return
	case total == 0:
		logger.Infof("No blobs to migrate")
		return
	}

	logger.Infof("Migrating %d blobs", total)

	var migrateErr error
	progress.ExecuteTask(fmt.Sprintf("Migrating %d blobs", total), func() {
		migrateErr = j.migrateBlobs(ctx, progress)
	})

	// Cancellation takes precedence over any error returned by the migration.
	switch {
	case job.IsCancelled(ctx):
		logger.Info("Cancelled migrating blobs")
		return
	case migrateErr != nil:
		logger.Errorf("Error migrating blobs: %v", migrateErr)
		return
	}

	// run a vacuum to reclaim space
	progress.ExecuteTask("Vacuuming database", func() {
		if vacuumErr := j.Vacuumer.Vacuum(ctx); vacuumErr != nil {
			logger.Errorf("Error vacuuming database: %v", vacuumErr)
		}
	})

	logger.Infof("Finished migrating blobs")
}
// countBlobs asks the blob store for its blob count inside a read
// transaction. On any error the returned count is 0.
func (j *MigrateBlobsJob) countBlobs(ctx context.Context) (int, error) {
	total := 0

	readCount := func(ctx context.Context) error {
		n, err := j.BlobStore.Count(ctx)
		if err != nil {
			return err
		}
		total = n
		return nil
	}

	err := txn.WithReadTxn(ctx, j.TxnManager, readCount)
	if err != nil {
		return 0, err
	}

	return total, nil
}
// migrateBlobs walks the blob store batch by batch, migrating each blob in
// its own transaction and incrementing progress once per blob.
//
// A failure to migrate an individual blob is logged and does not stop the
// run. The function returns the error from fetching a batch, if any, and nil
// when the batches are exhausted or the context has been cancelled.
func (j *MigrateBlobsJob) migrateBlobs(ctx context.Context, progress *job.Progress) error {
	// cursor is the checksum of the most recently handled blob; it is passed
	// to getBatch to request the next batch.
	cursor := ""

	for {
		batch, err := j.getBatch(ctx, cursor)
		if err != nil {
			return err
		}
		if len(batch) == 0 || ctx.Err() != nil {
			return nil
		}

		for _, checksum := range batch {
			if ctx.Err() != nil {
				return nil
			}

			cursor = checksum

			progress.ExecuteTask("Migrating blob "+checksum, func() {
				defer progress.Increment()

				migrateErr := txn.WithTxn(ctx, j.TxnManager, func(ctx context.Context) error {
					return j.BlobStore.MigrateBlob(ctx, checksum, j.DeleteOld)
				})
				if migrateErr != nil {
					logger.Errorf("Error migrating blob %s: %v", checksum, migrateErr)
				}
			})
		}
	}
}
// getBatch fetches up to batchSize blob checksums from the blob store inside
// a read transaction, passing lastChecksum through to FindBlobs. It returns
// whatever FindBlobs produced together with the transaction error.
func (j *MigrateBlobsJob) getBatch(ctx context.Context, lastChecksum string) ([]string, error) {
	const batchSize = 1000

	var checksums []string

	fetch := func(ctx context.Context) (err error) {
		checksums, err = j.BlobStore.FindBlobs(ctx, batchSize, lastChecksum)
		return err
	}

	txnErr := txn.WithReadTxn(ctx, j.TxnManager, fetch)
	return checksums, txnErr
}