Skip to content

Commit

Permalink
Merge pull request #848 from vdaas/test/internal/add_s3_test-revision
Browse files Browse the repository at this point in the history
Revise internal/db/storage/s3 tests in #837
  • Loading branch information
hlts2 committed Nov 19, 2020
2 parents 250df9c + c3e28c1 commit e046d53
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 217 deletions.
30 changes: 0 additions & 30 deletions internal/db/storage/blob/s3/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"

"github.com/vdaas/vald/internal/backoff"
"github.com/vdaas/vald/internal/db/storage/blob/s3/reader"
"github.com/vdaas/vald/internal/db/storage/blob/s3/writer"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/unit"
)
Expand All @@ -33,34 +31,6 @@ type Option func(c *client) error
var (
defaultOpts = []Option{
WithErrGroup(errgroup.Get()),

func(c *client) error {
c.readerFunc = func(key string) (reader.Reader, error) {
return reader.New(
reader.WithErrGroup(c.eg),
reader.WithService(c.service),
reader.WithBucket(c.bucket),
reader.WithKey(key),
reader.WithMaxChunkSize(c.maxChunkSize),
reader.WithBackoff(c.readerBackoffEnabled),
reader.WithBackoffOpts(c.readerBackoffOpts...),
)
}
return nil
},

func(c *client) error {
c.writerFunc = func(key string) writer.Writer {
return writer.New(
writer.WithErrGroup(c.eg),
writer.WithService(c.service),
writer.WithBucket(c.bucket),
writer.WithKey(key),
writer.WithMaxPartSize(c.maxPartSize),
)
}
return nil
},
}
)

Expand Down
41 changes: 22 additions & 19 deletions internal/db/storage/blob/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ import (
"github.com/vdaas/vald/internal/errors"
)

var (
reader_New = reader.New
writer_New = writer.New
)

type client struct {
eg errgroup.Group
session *session.Session
Expand All @@ -42,9 +47,6 @@ type client struct {

readerBackoffEnabled bool
readerBackoffOpts []backoff.Option

readerFunc func(key string) (reader.Reader, error)
writerFunc func(key string) writer.Writer
}

// New returns blob.Bucket implementation if no error occurs.
Expand All @@ -60,14 +62,6 @@ func New(opts ...Option) (blob.Bucket, error) {
return nil, errors.NewErrInvalidOption("session", c.session)
}

if c.readerFunc == nil {
return nil, errors.NewErrInvalidOption("readerFunc", c.readerFunc)
}

if c.writerFunc == nil {
return nil, errors.NewErrInvalidOption("writerFunc", c.writerFunc)
}

c.service = s3.New(c.session)

return c, nil
Expand All @@ -86,10 +80,15 @@ func (c *client) Close() error {
// Reader creates reader.Reader implementation and returns it.
// An error will be returned if the reader initializes fails and if an error occurs in reader.Open.
func (c *client) Reader(ctx context.Context, key string) (io.ReadCloser, error) {
if c.readerFunc == nil {
return nil, errors.ErrNilObject
}
r, err := c.readerFunc(key)
r, err := reader_New(
reader.WithErrGroup(c.eg),
reader.WithService(c.service),
reader.WithBucket(c.bucket),
reader.WithKey(key),
reader.WithMaxChunkSize(c.maxChunkSize),
reader.WithBackoff(c.readerBackoffEnabled),
reader.WithBackoffOpts(c.readerBackoffOpts...),
)
if err != nil {
return nil, err
}
Expand All @@ -100,9 +99,13 @@ func (c *client) Reader(ctx context.Context, key string) (io.ReadCloser, error)
// Writer creates writer.Writer implementation and returns it.
// An error will be returned if the writer initializes fails and if an error occurs in writer.Open.
func (c *client) Writer(ctx context.Context, key string) (io.WriteCloser, error) {
if c.writerFunc == nil {
return nil, errors.ErrNilObject
}
w := c.writerFunc(key)
w := writer_New(
writer.WithErrGroup(c.eg),
writer.WithService(c.service),
writer.WithBucket(c.bucket),
writer.WithKey(key),
writer.WithMaxPartSize(c.maxPartSize),
)

return w, w.Open(ctx)
}
Loading

0 comments on commit e046d53

Please sign in to comment.