-
Notifications
You must be signed in to change notification settings - Fork 111
/
s3.go
134 lines (117 loc) · 3.45 KB
/
s3.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package s3
import (
"context"
"fmt"
"net/url"
"os"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/mitchellh/mapstructure"
"github.com/rilldata/rill/runtime/connectors"
"github.com/rilldata/rill/runtime/pkg/fileutil"
)
// init makes the S3 connector available through the connectors registry
// under the name "s3" as soon as this package is loaded.
func init() {
	connectors.Register("s3", connector{})
}
// spec is the static description of the S3 connector that connector.Spec
// returns. It declares three properties:
//
//   - "path" (required): the s3:// URI of the object to import.
//   - "aws.region" (optional): the region of the bucket.
//   - "aws.credentials": informational only; it carries a hint and a
//     documentation link rather than a value.
//
// The "path" and "aws.region" keys match the mapstructure tags on Config.
var spec = connectors.Spec{
	DisplayName: "Amazon S3",
	Description: "Connect to AWS S3 Storage.",
	Properties: []connectors.PropertySchema{
		{
			Key:         "path",
			DisplayName: "S3 URI",
			// The value is an S3 URI (see Placeholder), not a path on the local disk.
			Description: "URI of the file in the S3 bucket.",
			Placeholder: "s3://bucket-name/path/to/file.csv",
			Type:        connectors.StringPropertyType,
			Required:    true,
			Hint:        "Note that glob patterns aren't yet supported",
		},
		{
			Key:         "aws.region",
			DisplayName: "AWS region",
			Description: "AWS Region for the bucket.",
			Placeholder: "us-east-1",
			Type:        connectors.StringPropertyType,
			Required:    false,
			Hint:        "Rill will use the default region in your local AWS config, unless set here.",
		},
		{
			Key:         "aws.credentials",
			DisplayName: "AWS credentials",
			Description: "AWS credentials inferred from your local environment.",
			Type:        connectors.InformationalPropertyType,
			Hint:        "Set your local credentials: <code>aws configure</code> Click to learn more.",
			Href:        "https://docs.rilldata.com/import-data#setting-amazon-s3-credentials",
		},
	},
}
// Config holds the connector properties that ParseConfig decodes from a
// source's property map.
type Config struct {
	// Path is the value of the "path" property.
	Path string `mapstructure:"path"`
	// AWSRegion is the value of the "aws.region" property; it may be empty.
	AWSRegion string `mapstructure:"aws.region"`
}
// ParseConfig decodes props into a Config using mapstructure.Decode.
// It returns the decoding error unchanged, together with a nil Config,
// when decoding fails.
func ParseConfig(props map[string]any) (*Config, error) {
	var parsed Config
	if err := mapstructure.Decode(props, &parsed); err != nil {
		return nil, err
	}
	return &parsed, nil
}
// connector is the S3 connector type; an instance of it is registered under
// the name "s3" in init. It has no fields.
type connector struct{}
// Spec returns the package-level spec value describing this connector
// and its properties.
func (c connector) Spec() connectors.Spec {
	return spec
}
// ConsumeAsFile downloads the S3 object named by the source's "path" property
// into a newly created temporary file and returns that file's name.
//
// The temporary file lives in os.TempDir() and is named after source.Name
// with the object's extension appended. The caller owns the returned file.
// On any failure after the file has been created it is removed again and an
// empty name is returned.
//
// ctx bounds the download: cancelling it aborts the transfer. env is
// currently unused.
func (c connector) ConsumeAsFile(ctx context.Context, env *connectors.Env, source *connectors.Source) (string, error) {
	conf, err := ParseConfig(source.Properties)
	if err != nil {
		return "", fmt.Errorf("failed to parse config: %w", err)
	}

	// The session the S3 downloader will use.
	sess, err := getAwsSessionConfig(conf)
	if err != nil {
		return "", fmt.Errorf("failed to start session: %w", err)
	}

	// Create a downloader with the session and default options.
	downloader := s3manager.NewDownloader(sess)

	bucket, key, extension, err := getAwsUrlParts(conf.Path)
	if err != nil {
		return "", fmt.Errorf("failed to parse path %s, %w", conf.Path, err)
	}

	f, err := os.CreateTemp(
		os.TempDir(),
		fmt.Sprintf("%s*%s", source.Name, extension),
	)
	if err != nil {
		return "", fmt.Errorf("os.Create: %w", err)
	}
	name := f.Name()

	// Write the contents of the S3 object to f, honouring ctx cancellation.
	_, err = downloader.DownloadWithContext(ctx, f, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})

	// Close before any cleanup: an open file cannot be removed on every
	// platform, and a close error can signal that buffered data was lost.
	closeErr := f.Close()

	if err != nil {
		_ = os.Remove(name) // best-effort cleanup; the download error is what matters
		return "", fmt.Errorf("failed to download f, %w", err)
	}
	if closeErr != nil {
		_ = os.Remove(name) // best-effort cleanup; the close error is what matters
		return "", fmt.Errorf("failed to close %s: %w", name, closeErr)
	}

	return name, nil
}
// getAwsSessionConfig creates the AWS session used to download from S3.
//
// Shared config loading is always enabled, so credentials and profile
// settings are resolved the same way whether or not a region is configured.
// When conf.AWSRegion is non-empty it is applied as an explicit region on the
// session options; otherwise the region is left for the SDK to resolve from
// the environment.
func getAwsSessionConfig(conf *Config) (*session.Session, error) {
	opts := session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}
	if conf.AWSRegion != "" {
		opts.Config.Region = aws.String(conf.AWSRegion)
	}
	return session.NewSessionWithOptions(opts)
}
func getAwsUrlParts(path string) (string, string, string, error) {
u, err := url.Parse(path)
if err != nil {
return "", "", "", err
}
return u.Host, u.Path, fileutil.FullExt(u.Path), nil
}