-
Notifications
You must be signed in to change notification settings - Fork 1
/
filemanager.go
128 lines (112 loc) · 4.43 KB
/
filemanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
//go:generate mockgen -destination=mock_filemanager/mock_filemanager.go -package mock_filemanager github.com/rudderlabs/rudder-go-kit/filemanager FileManager
package filemanager
import (
"context"
"errors"
"fmt"
"os"
"time"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
)
// defaultTimeout is a two-minute (120 second) duration.
// NOTE(review): it is not referenced in this part of the file; presumably it is
// the fallback timeout for file manager operations — confirm against its usages.
const defaultTimeout = 2 * time.Minute
// Sentinel errors exposed by the package. Compare against them with errors.Is,
// since they may be wrapped (New wraps ErrInvalidServiceProvider with %w).
var (
// ErrKeyNotFound is a sentinel error whose message is "NoSuchKey".
// NOTE(review): presumably returned when a requested object key does not
// exist in the storage provider — confirm against the provider implementations.
ErrKeyNotFound = errors.New("NoSuchKey")
// ErrInvalidServiceProvider is wrapped in the error returned by New when
// Settings.Provider is not one of the supported provider names.
ErrInvalidServiceProvider = errors.New("service provider not supported")
// ErrPreConditionFailed is a sentinel error whose message is "precondition failed".
// NOTE(review): presumably reported when a conditional operation (such as an
// upload-if-not-exists) is rejected by the provider — confirm against usages.
ErrPreConditionFailed = errors.New("precondition failed")
)
// Factory is a function that returns a new file manager built from the given
// settings, or an error if one cannot be created. New has this signature.
type Factory func(settings *Settings) (FileManager, error)
// UploadedFile contains information about the uploaded file. It is the value
// returned by FileManager.Upload.
type UploadedFile struct {
// Location of the uploaded file.
// NOTE(review): presumably the object location url that
// GetObjectNameFromLocation accepts — confirm against the implementations.
Location string
// ObjectName is the name of the uploaded object.
// NOTE(review): presumably the object name/key inside the storage provider — confirm.
ObjectName string
}
// FileInfo contains information about a file. ListSession.Next returns its
// results as a slice of pointers to FileInfo.
type FileInfo struct {
// Key identifies the file.
// NOTE(review): presumably the object key within the storage provider — confirm.
Key string
// LastModified is a timestamp for the file.
// NOTE(review): presumably the last modification time reported by the provider — confirm.
LastModified time.Time
}
// FileManager is able to manage files in a storage provider.
// Instances are obtained from New, which selects the implementation based on
// Settings.Provider.
type FileManager interface {
// ListFilesWithPrefix starts a list session for files with given prefix.
// The returned ListSession yields the files in batches through Next.
// NOTE(review): startAfter and maxItems presumably control where listing
// begins and how many items are returned — confirm against the implementations.
ListFilesWithPrefix(ctx context.Context, startAfter, prefix string, maxItems int64) ListSession
// Download downloads the file with given key (the string argument) to the
// passed in file (the *os.File argument).
Download(context.Context, *os.File, string) error
// Upload uploads the passed in file to the file manager and returns
// information about the uploaded file.
// NOTE(review): the meaning of the variadic string arguments is not visible
// here; presumably key prefixes — confirm against the implementations.
Upload(context.Context, *os.File, ...string) (UploadedFile, error)
// Delete deletes the file(s) with given key(s)
Delete(ctx context.Context, keys []string) error
// Prefix returns the prefix for the file manager
Prefix() string
// SetTimeout overrides the default timeout for the file manager
SetTimeout(timeout time.Duration)
// GetObjectNameFromLocation gets the object name/key name from the object
// location url, or an error if it cannot be determined.
GetObjectNameFromLocation(string) (string, error)
// GetDownloadKeyFromFileLocation gets the download key from the object
// location url. Unlike GetObjectNameFromLocation it has no error return.
GetDownloadKeyFromFileLocation(string) string
}
// ListSession is a session for listing files. A session is started with
// FileManager.ListFilesWithPrefix.
type ListSession interface {
// Next returns the next batch of files, until there are no more files for this session.
// A non-nil err reports a failure to fetch the batch.
Next() (fileObjects []*FileInfo, err error)
}
// Settings for file manager. It is the input to New.
type Settings struct {
// Provider selects the storage backend. New recognises "S3_DATALAKE", "S3",
// "GCS", "AZURE_BLOB", "MINIO" and "DIGITAL_OCEAN_SPACES"; any other value
// makes New return an error wrapping ErrInvalidServiceProvider.
Provider string
// Config is passed as-is to the constructor of the selected provider.
Config map[string]interface{}
// Logger is optional; when nil, New creates a child logger named "filemanager".
Logger logger.Logger
// Conf is optional; when nil, New uses config.Default. It is the source of
// the timeout settings read by getDefaultTimeout.
Conf *config.Config
// when GCSUploadIfNotExist is set to true, the client uploads to GCS storage
// only if a file with the same name doesn't exist already
// (New only reads this field for the "GCS" provider).
GCSUploadIfNotExist bool
}
// New returns a file manager backed by the provider named in settings.Provider.
//
// Optional fields are defaulted: a nil settings.Logger is replaced by a child
// logger named "filemanager", and a nil settings.Conf by config.Default. Every
// provider constructor receives settings.Config, the logger, and the lazy
// timeout lookup built by getDefaultTimeout for that provider.
//
// New returns an error when settings is nil, and an error wrapping
// ErrInvalidServiceProvider when the provider is not one of "S3", "S3_DATALAKE",
// "GCS", "AZURE_BLOB", "MINIO" or "DIGITAL_OCEAN_SPACES".
func New(settings *Settings) (FileManager, error) {
	if settings == nil {
		// Report bad input as an error instead of panicking on the
		// dereferences below.
		return nil, errors.New("filemanager settings must not be nil")
	}

	log := settings.Logger
	if log == nil {
		log = logger.NewLogger().Child("filemanager")
	}
	conf := settings.Conf
	if conf == nil {
		conf = config.Default
	}
	// Building the lookup has no side effects; the config is only read when
	// the returned function is invoked.
	timeout := getDefaultTimeout(conf, settings.Provider)

	switch settings.Provider {
	case "S3", "S3_DATALAKE":
		// Both provider names are served by the same S3 implementation.
		return NewS3Manager(settings.Config, log, timeout)
	case "GCS":
		return NewGCSManager(settings.Config, log, timeout,
			WithGCSUploadIfObjectNotExist(settings.GCSUploadIfNotExist),
		)
	case "AZURE_BLOB":
		return NewAzureBlobManager(settings.Config, log, timeout)
	case "MINIO":
		return NewMinioManager(settings.Config, log, timeout)
	case "DIGITAL_OCEAN_SPACES":
		return NewDigitalOceanManager(settings.Config, log, timeout)
	}
	return nil, fmt.Errorf("%w: %s", ErrInvalidServiceProvider, settings.Provider)
}
// getDefaultTimeout returns a function that resolves the file manager timeout
// for destType from conf. The configuration is consulted on every call of the
// returned function, not when getDefaultTimeout itself is called.
//
// Keys are checked in this order and the first one that is set is used:
//
//  1. FileManager.<destType>.timeout
//  2. FileManager.timeout
//  3. BatchRouter.<destType>.timeout (legacy key used in rudder-server)
//  4. BatchRouter.timeout (legacy key used in rudder-server)
//
// The last key is read without an IsSet check; every read passes a default of
// 120 with a time scale of one second to GetDuration.
func getDefaultTimeout(conf *config.Config, destType string) func() time.Duration {
	const (
		key            = "timeout"
		defaultSeconds = int64(120)
		timeScale      = time.Second
	)

	// destType is fixed for the lifetime of the returned function, so the key
	// names are built once here rather than on every lookup.
	overrides := [...]string{
		"FileManager." + destType + "." + key,
		"FileManager." + key,
		"BatchRouter." + destType + "." + key, // legacy key used in rudder-server
	}
	fallback := "BatchRouter." + key // legacy key used in rudder-server

	return func() time.Duration {
		for _, name := range overrides {
			if conf.IsSet(name) {
				return conf.GetDuration(name, defaultSeconds, timeScale)
			}
		}
		return conf.GetDuration(fallback, defaultSeconds, timeScale)
	}
}