Skip to content

Commit

Permalink
:white_check_makr: ♻️ Add test for s3/writer and Refactor. (#672)
Browse files Browse the repository at this point in the history
* feat: wrapper for s3 manager

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: refactor

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: s3 type

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix typo

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: apply suggestion

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* add test for writer

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* delete unncessary file

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* delete unncessary code

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: apply feedback

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: add comment

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: fatal error

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: fatal error

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: fatal error

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: fatal error

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: apply suggestion

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* fix: apply suggestion

Signed-off-by: hlts2 <hiroto.funakoshi.hiroto@gmail.com>

* 🤖 Update license headers / Format go codes and yaml files

Signed-off-by: vdaas-ci <ci@vdaas.org>

Co-authored-by: vdaas-ci <ci@vdaas.org>
  • Loading branch information
hlts2 and vdaas-ci committed Sep 24, 2020
1 parent 7cf4b77 commit 34853a0
Show file tree
Hide file tree
Showing 13 changed files with 437 additions and 226 deletions.
23 changes: 23 additions & 0 deletions internal/db/storage/blob/s3/sdk/s3/s3.go
@@ -0,0 +1,23 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package s3

import (
"github.com/aws/aws-sdk-go/service/s3"
)

// S3 is type alias for s3.S3.
type S3 = s3.S3
21 changes: 21 additions & 0 deletions internal/db/storage/blob/s3/sdk/s3/s3iface/s3iface.go
@@ -0,0 +1,21 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package s3iface

import "github.com/aws/aws-sdk-go/service/s3/s3iface"

// S3API is type alias for s3iface.S3API.
type S3API = s3iface.S3API
53 changes: 53 additions & 0 deletions internal/db/storage/blob/s3/sdk/s3/s3manager/s3manager.go
@@ -0,0 +1,53 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package s3manager

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface"
)

type (
// Uploader is type alias of s3manager.Uploader.
Uploader = s3manager.Uploader
// UploadInput is type alias of s3manager.UploadInput.
UploadInput = s3manager.UploadInput
// UploadOutput is type alias of s3manager.UploadOutput.
UploadOutput = s3manager.UploadOutput
)

// UploadClient represents an interface to upload to s3.
type UploadClient interface {
UploadWithContext(ctx aws.Context, input *UploadInput, opts ...func(*Uploader)) (*UploadOutput, error)
}

// S3Manager represents an interface to create object of s3manager package.
type S3Manager interface {
NewUploaderWithClient(svc s3iface.S3API, options ...func(*Uploader)) UploadClient
}

type s3mngr struct{}

// New returns S3Manager implementation.
func New() S3Manager {
return new(s3mngr)
}

// NewUploaderWithClient returns UploadClient implementation.
func (*s3mngr) NewUploaderWithClient(svc s3iface.S3API, options ...func(*Uploader)) UploadClient {
return s3manager.NewUploaderWithClient(svc, options...)
}
54 changes: 54 additions & 0 deletions internal/db/storage/blob/s3/writer/mock_test.go
@@ -0,0 +1,54 @@
//
// Copyright (C) 2019-2020 Vdaas.org Vald team ( kpango, rinx, kmrmt )
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package writer

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3manager"
)

// S3Manager represents mock of s3manager.S3Manager.
type MockS3Manager struct {
NewUploaderWithClientFunc func(s3iface.S3API, ...func(*s3manager.Uploader)) s3manager.UploadClient
}

// NewUploaderWithClient calls NewUNewUploaderWithClientFunc.
func (m *MockS3Manager) NewUploaderWithClient(svc s3iface.S3API, opts ...func(*s3manager.Uploader)) s3manager.UploadClient {
return m.NewUploaderWithClientFunc(svc, opts...)
}

type MockUploadClient struct {
UploadWithContextFunc func(aws.Context, *s3manager.UploadInput, ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error)
}

func (m *MockUploadClient) UploadWithContext(ctx aws.Context, input *s3manager.UploadInput, opts ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) {
return m.UploadWithContextFunc(ctx, input, opts...)
}

// MockWriteCloser represents mock of io.WriteCloser.
type MockWriteCloser struct {
WriteFunc func(p []byte) (n int, err error)
CloseFunc func() error
}

func (m *MockWriteCloser) Write(p []byte) (n int, err error) {
return m.WriteFunc(p)
}

func (m *MockWriteCloser) Close() error {
return m.CloseFunc()
}
27 changes: 18 additions & 9 deletions internal/db/storage/blob/s3/writer/writer.go
Expand Up @@ -23,19 +23,20 @@ import (
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3iface"
"github.com/vdaas/vald/internal/db/storage/blob/s3/sdk/s3/s3manager"
"github.com/vdaas/vald/internal/errgroup"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/safety"
)

type writer struct {
eg errgroup.Group
service *s3.S3
bucket string
key string
eg errgroup.Group
s3manager s3manager.S3Manager
service s3iface.S3API
bucket string
key string

contentType string
maxPartSize int64
Expand All @@ -44,13 +45,17 @@ type writer struct {
wg *sync.WaitGroup
}

// Writer represents an interface to write to s3.
type Writer interface {
Open(ctx context.Context) error
io.WriteCloser
}

// New returns Writer implementation.
func New(opts ...Option) Writer {
w := new(writer)
w := &writer{
s3manager: s3manager.New(),
}
for _, opt := range append(defaultOpts, opts...) {
if err := opt(w); err != nil {
log.Warn(errors.ErrOptionFailed(err, reflect.ValueOf(opt)))
Expand All @@ -60,6 +65,8 @@ func New(opts ...Option) Writer {
return w
}

// Open creates io.Pipe. When the write method is called, the written data will be uploaded to s3.
// Open method returns an error to align the interface, but it doesn't actually return an error.
func (w *writer) Open(ctx context.Context) (err error) {
w.wg = new(sync.WaitGroup)

Expand All @@ -79,6 +86,7 @@ func (w *writer) Open(ctx context.Context) (err error) {
return err
}

// Close closes the writer.
func (w *writer) Close() error {
if w.pw != nil {
return w.pw.Close()
Expand All @@ -91,6 +99,7 @@ func (w *writer) Close() error {
return nil
}

// Write writes len(p) bytes from p to the underlying data stream. The written data will be uploaded to s3.
func (w *writer) Write(p []byte) (n int, err error) {
if w.pw == nil {
return 0, errors.ErrStorageWriterNotOpened
Expand All @@ -100,7 +109,7 @@ func (w *writer) Write(p []byte) (n int, err error) {
}

func (w *writer) upload(ctx context.Context, body io.Reader) (err error) {
uploader := s3manager.NewUploaderWithClient(
client := w.s3manager.NewUploaderWithClient(
w.service,
func(u *s3manager.Uploader) {
u.PartSize = w.maxPartSize
Expand All @@ -113,7 +122,7 @@ func (w *writer) upload(ctx context.Context, body io.Reader) (err error) {
ContentType: aws.String(w.contentType),
}

res, err := uploader.UploadWithContext(ctx, input)
res, err := client.UploadWithContext(ctx, input)
if err != nil {
log.Error("upload failed with error: ", err)
return err
Expand Down

0 comments on commit 34853a0

Please sign in to comment.