Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom S3 CSV billing export endpoint #1991

Merged
merged 4 commits into from
Aug 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 62 additions & 21 deletions pkg/filemanager/filemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -34,20 +35,23 @@ type FileManager interface {
// - s3://bucket-name/path/to/file.csv
// - gs://bucket-name/path/to/file.csv
// - https://azblobaccount.blob.core.windows.net/containerName/path/to/file.csv
// - alts3://fqdn:port/bucket-name/path/to/file.csv
// - local/file/path.csv

func NewFileManager(path string) (FileManager, error) {
func NewFileManager(filePath string) (FileManager, error) {
switch {
case strings.HasPrefix(path, "s3://"):
return NewS3File(path)
case strings.HasPrefix(path, "gs://"):
return NewGCSStorageFile(path)
case strings.Contains(path, "blob.core.windows.net"):
return NewAzureBlobFile(path)
case path == "":
case strings.HasPrefix(filePath, "s3://"):
return NewS3File(filePath)
case strings.HasPrefix(filePath, "gs://"):
return NewGCSStorageFile(filePath)
case strings.Contains(filePath, "blob.core.windows.net"):
return NewAzureBlobFile(filePath)
case strings.HasPrefix(filePath, "alts3://"):
return NewAltS3File(filePath)
case filePath == "":
return nil, errors.New("empty path")
default:
return NewSystemFile(path), nil
return NewSystemFile(filePath), nil
}
}

Expand Down Expand Up @@ -85,8 +89,8 @@ type S3File struct {
key string
}

func NewS3File(path string) (*S3File, error) {
u, err := url.Parse(path)
func NewS3File(filePath string) (*S3File, error) {
u, err := url.Parse(filePath)
if err != nil {
return nil, err
}
Expand All @@ -95,7 +99,7 @@ func NewS3File(path string) (*S3File, error) {
key := strings.TrimPrefix(u.Path, "/")

if bucket == "" || key == "" {
return nil, fmt.Errorf("invalid s3 path: %s", path)
return nil, fmt.Errorf("invalid s3 path: %s", filePath)
}

cfg, err := config.LoadDefaultConfig(context.Background())
Expand All @@ -110,6 +114,43 @@ func NewS3File(path string) (*S3File, error) {
}, nil
}

func NewAltS3File(filePath string) (*S3File, error) {
u, err := url.Parse(filePath)
if err != nil {
return nil, err
}

clPath := path.Clean(u.Path)

if len(strings.Split(clPath, "/")) < 3 {
return nil, fmt.Errorf("invalid s3 path: %s", filePath)
}

// Extract bucket and path from url
bucket, key, _ := strings.Cut(strings.TrimLeft(clPath, "/"), "/")

if bucket == "" || key == "" {
return nil, fmt.Errorf("invalid s3 path: %s", filePath)
}

cfg, err := config.LoadDefaultConfig(context.Background())
if err != nil {
return nil, err
}

return &S3File{
s3Client: s3.NewFromConfig(cfg, func(o *s3.Options) {
// Always use https for the endpoint when using an alternative s3 url.
// NOTE: From service/s3 v1.38.0 and onwards use EndpointResolverV2 as described in the AWS SDK docs.
o.EndpointResolver = s3.EndpointResolverFromURL(fmt.Sprintf("https://%v", u.Host), func(e *aws.Endpoint) {
e.HostnameImmutable = true
})
}),
bucket: bucket, // bucket
key: key, // path/to/file.csv
}, nil
}

func (c *S3File) Download(ctx context.Context, f *os.File) error {
_, err := manager.NewDownloader(c.s3Client).Download(ctx, f, &s3.GetObjectInput{
Bucket: aws.String(c.bucket),
Expand Down Expand Up @@ -140,9 +181,9 @@ type GCSStorageFile struct {
client *storage.Client
}

func NewGCSStorageFile(path string) (*GCSStorageFile, error) {
path = strings.TrimPrefix(path, "gs://")
parts := strings.SplitN(path, "/", 2)
func NewGCSStorageFile(filePath string) (*GCSStorageFile, error) {
filePath = strings.TrimPrefix(filePath, "gs://")
parts := strings.SplitN(filePath, "/", 2)
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
return nil, errors.New("invalid GCS path")
}
Expand Down Expand Up @@ -184,16 +225,16 @@ func (g *GCSStorageFile) Upload(ctx context.Context, f *os.File) error {
return w.Close()
}

func NewSystemFile(path string) *SystemFile {
return &SystemFile{path: path}
func NewSystemFile(filePath string) *SystemFile {
return &SystemFile{filePath: filePath}
}

type SystemFile struct {
path string
filePath string
}

func (s *SystemFile) Download(ctx context.Context, f *os.File) error {
sFile, err := os.Open(s.path)
sFile, err := os.Open(s.filePath)
if err != nil {
if os.IsNotExist(err) {
return ErrNotFound
Expand All @@ -215,7 +256,7 @@ func (s *SystemFile) Upload(ctx context.Context, f *os.File) error {
if err != nil {
return err
}
tmpFilePath := filepath.Join(filepath.Dir(s.path), fmt.Sprintf(".tmp-%d", time.Now().UnixNano()))
tmpFilePath := filepath.Join(filepath.Dir(s.filePath), fmt.Sprintf(".tmp-%d", time.Now().UnixNano()))
tmpF, err := os.Create(tmpFilePath)
if err != nil {
return err
Expand All @@ -226,7 +267,7 @@ func (s *SystemFile) Upload(ctx context.Context, f *os.File) error {
if err != nil {
return err
}
err = os.Rename(tmpF.Name(), s.path)
err = os.Rename(tmpF.Name(), s.filePath)
if err != nil {
return err
}
Expand Down
Loading