Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for s3/writer and Refactor. #672

Merged
merged 17 commits into from Sep 24, 2020
8 changes: 8 additions & 0 deletions internal/db/storage/blob/s3/sdk/s3/s3.go
@@ -0,0 +1,8 @@
package s3

import (
"github.com/aws/aws-sdk-go/service/s3"
)

// S3 is type alias for s3.S3.
type S3 = s3.S3
6 changes: 6 additions & 0 deletions internal/db/storage/blob/s3/sdk/s3/s3iface/s3iface.go
@@ -0,0 +1,6 @@
package s3iface
hlts2 marked this conversation as resolved.
Show resolved Hide resolved

import "github.com/aws/aws-sdk-go/service/s3/s3iface"

// S3API is type alias for s3iface.S3API.
type S3API = s3iface.S3API
38 changes: 38 additions & 0 deletions internal/db/storage/blob/s3/sdk/s3/s3manager/s3manager.go
@@ -0,0 +1,38 @@
package s3manager
hlts2 marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface"
)

type (
// Uploader is type alias of s3manager.Uploader.
Uploader = s3manager.Uploader
// UploadInput is type alias of s3manager.UploadInput.
UploadInput = s3manager.UploadInput
// UploadOutput is type alias of s3manager.UploadOutput.
UploadOutput = s3manager.UploadOutput
)

// UploadClient represents an interface to upload to s3.
type UploadClient interface {
UploadWithContext(ctx aws.Context, input *UploadInput, opts ...func(*Uploader)) (*UploadOutput, error)
}

// S3Manager represents an interface to create object of s3manager package.
type S3Manager interface {
NewUploaderWithClient(svc s3iface.S3API, options ...func(*Uploader)) UploadClient
}

type s3mngr struct{}

// New returns S3Manager implementation.
func New() S3Manager {
return new(s3mngr)
}

// NewUploaderWithClient returns UploadClient implementation.
func (*s3mngr) NewUploaderWithClient(svc s3iface.S3API, options ...func(*Uploader)) UploadClient {
return s3manager.NewUploaderWithClient(svc, options...)
}
39 changes: 39 additions & 0 deletions internal/db/storage/blob/s3/writer/mock_test.go
@@ -0,0 +1,39 @@
package writer

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3manager"
)

// S3Manager represents mock of s3manager.S3Manager.
type MockS3Manager struct {
NewUploaderWithClientFunc func(s3iface.S3API, ...func(*s3manager.Uploader)) s3manager.UploadClient
}

// NewUploaderWithClient calls NewUNewUploaderWithClientFunc.
func (m *MockS3Manager) NewUploaderWithClient(svc s3iface.S3API, opts ...func(*s3manager.Uploader)) s3manager.UploadClient {
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
return m.NewUploaderWithClientFunc(svc, opts...)
}

type MockUploadClient struct {
UploadWithContextFunc func(aws.Context, *s3manager.UploadInput, ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
}

func (m *MockUploadClient) UploadWithContext(ctx aws.Context, input *s3manager.UploadInput, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) {
hlts2 marked this conversation as resolved.
Show resolved Hide resolved
return m.UploadWithContextFunc(ctx, input, opts...)
}

// MockWriteCloser represents mock of io.WriteCloser.
type MockWriteCloser struct {
WriteFunc func(p []byte) (n int, err error)
CloseFunc func() error
}

func (m *MockWriteCloser) Write(p []byte) (n int, err error) {
return m.WriteFunc(p)
}

func (m *MockWriteCloser) Close() error {
return m.CloseFunc()
}
27 changes: 18 additions & 9 deletions internal/db/storage/blob/s3/writer/writer.go
Expand Up @@ -23,19 +23,20 @@ import (
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3manager"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/safety"
)

type writer struct {
eg errgroup.Group
service *s3.S3
bucket string
key string
eg errgroup.Group
s3manager s3manager.S3Manager
service s3iface.S3API
bucket string
key string

contentType string
maxPartSize int64
Expand All @@ -44,13 +45,17 @@ type writer struct {
wg *sync.WaitGroup
}

// Writer represents an interface to write to s3.
type Writer interface {
Open(ctx context.Context) error
io.WriteCloser
}

// New returns Writer implementation.
func New(opts ...Option) Writer {
w := new(writer)
w := &writer{
s3manager: s3manager.New(),
}
for _, opt := range append(defaultOpts, opts...) {
if err := opt(w); err != nil {
log.Warn(errors.ErrOptionFailed(err, reflect.ValueOf(opt)))
Expand All @@ -60,6 +65,8 @@ func New(opts ...Option) Writer {
return w
}

// Open creates io.Pipe. When the write method is called, the written data will be uploaded to s3.
// Open method returns an error to align the interface, but it doesn't actually return an error.
func (w *writer) Open(ctx context.Context) (err error) {
w.wg = new(sync.WaitGroup)

Expand All @@ -79,6 +86,7 @@ func (w *writer) Open(ctx context.Context) (err error) {
return err
}

// Close closes the writer.
func (w *writer) Close() error {
if w.pw != nil {
return w.pw.Close()
Expand All @@ -91,6 +99,7 @@ func (w *writer) Close() error {
return nil
}

// Write writes len(p) bytes from p to the underlying data stream. The written data will be uploaded to s3.
func (w *writer) Write(p []byte) (n int, err error) {
if w.pw == nil {
return 0, errors.ErrStorageWriterNotOpened
Expand All @@ -100,7 +109,7 @@ func (w *writer) Write(p []byte) (n int, err error) {
}

func (w *writer) upload(ctx context.Context, body io.Reader) (err error) {
uploader := s3manager.NewUploaderWithClient(
client := w.s3manager.NewUploaderWithClient(
w.service,
func(u *s3manager.Uploader) {
u.PartSize = w.maxPartSize
Expand All @@ -113,7 +122,7 @@ func (w *writer) upload(ctx context.Context, body io.Reader) (err error) {
ContentType: aws.String(w.contentType),
}

res, err := uploader.UploadWithContext(ctx, input)
res, err := client.UploadWithContext(ctx, input)
if err != nil {
log.Error("upload failed with error: ", err)
return err
Expand Down