forked from wal-g/wal-g
/
uploader.go
131 lines (116 loc) · 3.48 KB
/
uploader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package internal
import (
"io"
"path"
"path/filepath"
"sync"
"sync/atomic"
"github.com/wal-g/storages/storage"
"github.com/tinsane/tracelog"
"github.com/wal-g/wal-g/internal/compression"
"github.com/wal-g/wal-g/utility"
)
// Uploader contains fields associated with uploading tarballs.
// Multiple tarballs can share one uploader.
type Uploader struct {
UploadingFolder storage.Folder // destination folder objects are written to
Compressor compression.Compressor // compressor applied before upload
waitGroup *sync.WaitGroup // tracks in-flight uploads; Clone replaces it
deltaFileManager *DeltaFileManager // non-nil enables WAL delta recording (see getUseWalDelta)
ArchiveStatusManager ArchiveStatusManager
Failed atomic.Value // holds bool; set true by Upload on any failure
tarSize *int64 // running byte count of uploaded data; shared across clones
}
// UploadObject pairs a destination path with a reader supplying the
// content to store there; consumed by UploadMultiple.
type UploadObject struct {
Path string
Content io.Reader
}
// getUseWalDelta reports whether WAL delta recording is enabled, which is
// the case exactly when a delta file manager was supplied at construction.
func (uploader *Uploader) getUseWalDelta() bool {
return uploader.deltaFileManager != nil
}
// NewUploader constructs an Uploader that writes compressed data to
// uploadingLocation. Passing a non-nil deltaFileManager enables WAL delta
// recording. The failure flag starts out false and the byte counter at zero.
func NewUploader(
compressor compression.Compressor,
uploadingLocation storage.Folder,
deltaFileManager *DeltaFileManager,
) *Uploader {
var uploadedBytes int64
newUploader := &Uploader{
UploadingFolder:  uploadingLocation,
Compressor:       compressor,
waitGroup:        &sync.WaitGroup{},
deltaFileManager: deltaFileManager,
tarSize:          &uploadedBytes,
}
newUploader.Failed.Store(false)
return newUploader
}
// finish blocks until every pending upload has completed. If any upload
// failed along the way, an alert is written to the error log.
func (uploader *Uploader) finish() {
uploader.waitGroup.Wait()
uploadFailed := uploader.Failed.Load().(bool)
if uploadFailed {
tracelog.ErrorLogger.Printf("WAL-G could not complete upload.\n")
}
}
// Clone creates a similar Uploader with a fresh WaitGroup so the copy can
// track its own in-flight uploads. All other state (folder, compressor,
// delta manager, failure flag snapshot, shared size counter) is carried over.
func (uploader *Uploader) Clone() *Uploader {
// Named fields instead of a positional literal: order-independent and
// robust against future field additions to Uploader.
return &Uploader{
UploadingFolder:      uploader.UploadingFolder,
Compressor:           uploader.Compressor,
waitGroup:            &sync.WaitGroup{},
deltaFileManager:     uploader.deltaFileManager,
ArchiveStatusManager: uploader.ArchiveStatusManager,
Failed:               uploader.Failed,
tarSize:              uploader.tarSize,
}
}
// UploadWalFile uploads a single WAL segment. When delta recording is
// enabled and the file name looks like a WAL segment, the stream is routed
// through a delta recording reader; if that reader cannot be created, the
// raw file is uploaded unchanged.
// TODO : unit tests
func (uploader *Uploader) UploadWalFile(file NamedReader) error {
walFileName := path.Base(file.Name())
walReader := io.Reader(file)
if uploader.getUseWalDelta() && isWalFilename(walFileName) {
if recorder, err := NewWalDeltaRecordingReader(file, walFileName, uploader.deltaFileManager); err == nil {
// Closed when this function returns, i.e. after UploadFile finishes.
defer utility.LoggedClose(recorder, "")
walReader = recorder
}
}
return uploader.UploadFile(NewNamedReaderImpl(walReader, file.Name()))
}
// UploadFile compresses a file and uploads it.
// The destination name is the sanitized base name of the source file plus
// the compressor's extension.
// TODO : unit tests
func (uploader *Uploader) UploadFile(file NamedReader) error {
compressedReader := CompressAndEncrypt(file, uploader.Compressor, ConfigureCrypter())
uploadName := utility.SanitizePath(filepath.Base(file.Name()) + "." + uploader.Compressor.FileExtension())
uploadErr := uploader.Upload(uploadName, compressedReader)
tracelog.InfoLogger.Println("FILE PATH:", uploadName)
return uploadErr
}
// Upload streams content to objectPath inside the uploading folder, counting
// the bytes read toward tarSize. On failure the uploader is marked Failed and
// the error is both logged and returned.
// TODO : unit tests
func (uploader *Uploader) Upload(objectPath string, content io.Reader) error {
// Parameter renamed from "path": it shadowed the imported "path" package.
err := uploader.UploadingFolder.PutObject(objectPath, &WithSizeReader{content, uploader.tarSize})
if err != nil {
uploader.Failed.Store(true)
tracelog.ErrorLogger.Printf(tracelog.GetErrorFormatter()+"\n", err)
return err
}
return nil
}
// UploadMultiple uploads the given objects in order and returns the first
// error encountered, if any. The operation is not atomic: objects uploaded
// before a failure remain uploaded.
// TODO : unit tests
func (uploader *Uploader) UploadMultiple(objects []UploadObject) error {
for i := range objects {
if err := uploader.Upload(objects[i].Path, objects[i].Content); err != nil {
// possibly do a retry here
return err
}
}
return nil
}