Skip to content

Commit

Permalink
Use aws-sdk-go-v2 S3 clients
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelSnowden committed Jun 15, 2023
1 parent 73189b7 commit 47301c5
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 5,303 deletions.
29 changes: 16 additions & 13 deletions common/archiver/s3store/historyArchiver.go
Expand Up @@ -34,12 +34,12 @@ import (
"strings"
"time"

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"go.temporal.io/api/serviceerror"

archiverspb "go.temporal.io/server/api/archiver/v1"
Expand Down Expand Up @@ -71,7 +71,7 @@ var (
type (
historyArchiver struct {
container *archiver.HistoryBootstrapContainer
s3cli s3iface.S3API
s3cli s3Client
// only set in test code
historyIterator archiver.HistoryIterator
}
Expand Down Expand Up @@ -105,19 +105,21 @@ func newHistoryArchiver(
if len(config.Region) == 0 {
return nil, errEmptyAwsRegion
}
s3Config := &aws.Config{
Endpoint: config.Endpoint,
Region: aws.String(config.Region),
S3ForcePathStyle: aws.Bool(config.S3ForcePathStyle),
}
sess, err := session.NewSession(s3Config)

cfg, err := awsconfig.LoadDefaultConfig(context.TODO(), awsconfig.WithRegion(config.Region))
if err != nil {
return nil, err
}
s3cli := s3.NewFromConfig(cfg, func(options *s3.Options) {
if config.Endpoint != nil {
options.EndpointResolver = s3.EndpointResolverFromURL(*config.Endpoint)
}
options.UsePathStyle = config.S3ForcePathStyle
})

return &historyArchiver{
container: container,
s3cli: s3.New(sess),
s3cli: s3cli,
historyIterator: historyIterator,
}, nil
}
Expand Down Expand Up @@ -365,13 +367,14 @@ func (h *historyArchiver) getHighestVersion(ctx context.Context, URI archiver.UR
ctx, cancel := ensureContextTimeout(ctx)
defer cancel()
var prefix = constructHistoryKeyPrefix(URI.Path(), request.NamespaceID, request.WorkflowID, request.RunID) + "/"
results, err := h.s3cli.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
results, err := h.s3cli.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(URI.Hostname()),
Prefix: aws.String(prefix),
Delimiter: aws.String("/"),
})
if err != nil {
if aerr, ok := err.(awserr.Error); ok && aerr.Code() == s3.ErrCodeNoSuchBucket {
var noSuchBucket *types.NoSuchBucket
if errors.As(err, &noSuchBucket) {
return nil, serviceerror.NewInvalidArgument(errBucketNotExists.Error())
}
return nil, err
Expand Down
50 changes: 25 additions & 25 deletions common/archiver/s3store/historyArchiver_test.go
Expand Up @@ -36,10 +36,11 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -50,7 +51,6 @@ import (
archiverspb "go.temporal.io/server/api/archiver/v1"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver"
"go.temporal.io/server/common/archiver/s3store/mocks"
"go.temporal.io/server/common/codec"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/log"
Expand All @@ -75,7 +75,7 @@ var testBranchToken = []byte{1, 2, 3}
type historyArchiverSuite struct {
*require.Assertions
suite.Suite
s3cli *mocks.MockS3API
s3cli *Mocks3Client
container *archiver.HistoryBootstrapContainer
testArchivalURI archiver.URI
historyBatchesV1 []*archiverspb.HistoryBlob
Expand Down Expand Up @@ -104,15 +104,15 @@ func (s *historyArchiverSuite) SetupTest() {
}

s.controller = gomock.NewController(s.T())
s.s3cli = mocks.NewMockS3API(s.controller)
s.s3cli = NewMocks3Client(s.controller)
setupFsEmulation(s.s3cli)
s.setupHistoryDirectory()
}

func setupFsEmulation(s3cli *mocks.MockS3API) {
func setupFsEmulation(s3cli *Mocks3Client) {

Check failure on line 112 in common/archiver/s3store/historyArchiver_test.go

View workflow job for this annotation

GitHub Actions / lint

cyclomatic: function setupFsEmulation has cyclomatic complexity 20 (> max enabled 15) (revive)
fs := make(map[string][]byte)

putObjectFn := func(_ aws.Context, input *s3.PutObjectInput, _ ...request.Option) (*s3.PutObjectOutput, error) {
putObjectFn := func(_ context.Context, input *s3.PutObjectInput, _ ...request.Option) (*s3.PutObjectOutput, error) {
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(input.Body); err != nil {
return nil, err
Expand All @@ -121,27 +121,27 @@ func setupFsEmulation(s3cli *mocks.MockS3API) {
return &s3.PutObjectOutput{}, nil
}

s3cli.EXPECT().ListObjectsV2WithContext(gomock.Any(), gomock.Any()).DoAndReturn(
s3cli.EXPECT().ListObjectsV2(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, input *s3.ListObjectsV2Input, opts ...request.Option) (*s3.ListObjectsV2Output, error) {
objects := make([]*s3.Object, 0)
objects := make([]types.Object, 0)
commonPrefixMap := map[string]bool{}
for k := range fs {
if strings.HasPrefix(k, *input.Bucket+*input.Prefix) {
key := k[len(*input.Bucket):]
keyWithoutPrefix := key[len(*input.Prefix):]
index := strings.Index(keyWithoutPrefix, "/")
if index == -1 || input.Delimiter == nil {
objects = append(objects, &s3.Object{
objects = append(objects, types.Object{
Key: aws.String(key),
})
} else {
commonPrefixMap[key[:len(*input.Prefix)+index]] = true
}
}
}
commonPrefixes := make([]*s3.CommonPrefix, 0)
commonPrefixes := make([]types.CommonPrefix, 0)
for k := range commonPrefixMap {
commonPrefixes = append(commonPrefixes, &s3.CommonPrefix{
commonPrefixes = append(commonPrefixes, types.CommonPrefix{
Prefix: aws.String(k),
})
}
Expand All @@ -150,8 +150,8 @@ func setupFsEmulation(s3cli *mocks.MockS3API) {
return *objects[i].Key < *objects[j].Key
})
maxKeys := 1000
if input.MaxKeys != nil {
maxKeys = int(*input.MaxKeys)
if input.MaxKeys != 0 {
maxKeys = int(input.MaxKeys)
}
start := 0
if input.ContinuationToken != nil {
Expand Down Expand Up @@ -195,14 +195,14 @@ func setupFsEmulation(s3cli *mocks.MockS3API) {
return &s3.ListObjectsV2Output{
CommonPrefixes: commonPrefixes,
Contents: objects,
IsTruncated: &isTruncated,
IsTruncated: isTruncated,
NextContinuationToken: nextContinuationToken,
}, nil
}).AnyTimes()
s3cli.EXPECT().PutObjectWithContext(gomock.Any(), gomock.Any()).DoAndReturn(putObjectFn).AnyTimes()
s3cli.EXPECT().PutObject(gomock.Any(), gomock.Any()).DoAndReturn(putObjectFn).AnyTimes()

s3cli.EXPECT().HeadObjectWithContext(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx aws.Context, input *s3.HeadObjectInput, options ...request.Option) (*s3.HeadObjectOutput, error) {
s3cli.EXPECT().HeadObject(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, input *s3.HeadObjectInput, options ...request.Option) (*s3.HeadObjectOutput, error) {
_, ok := fs[*input.Bucket+*input.Key]
if !ok {
return nil, awserr.New("NotFound", "", nil)
Expand All @@ -211,11 +211,11 @@ func setupFsEmulation(s3cli *mocks.MockS3API) {
return &s3.HeadObjectOutput{}, nil
}).AnyTimes()

s3cli.EXPECT().GetObjectWithContext(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx aws.Context, input *s3.GetObjectInput, options ...request.Option) (*s3.GetObjectOutput, error) {
s3cli.EXPECT().GetObject(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, input *s3.GetObjectInput, options ...request.Option) (*s3.GetObjectOutput, error) {
_, ok := fs[*input.Bucket+*input.Key]
if !ok {
return nil, awserr.New(s3.ErrCodeNoSuchKey, "", nil)
return nil, &types.NoSuchKey{}
}

return &s3.GetObjectOutput{
Expand Down Expand Up @@ -247,8 +247,8 @@ func (s *historyArchiverSuite) TestValidateURI() {
},
}

s.s3cli.EXPECT().HeadBucketWithContext(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx aws.Context, input *s3.HeadBucketInput, options ...request.Option) (*s3.HeadBucketOutput, error) {
s.s3cli.EXPECT().HeadBucket(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, input *s3.HeadBucketInput, options ...request.Option) (*s3.HeadBucketOutput, error) {
if *input.Bucket != s.testArchivalURI.Hostname() {
return nil, awserr.New("NotFound", "", nil)
}
Expand Down Expand Up @@ -770,7 +770,7 @@ func (s *historyArchiverSuite) writeHistoryBatchesForGetTest(historyBatches []*a
data, err := encoder.Encode(batch)
s.Require().NoError(err)
key := constructHistoryKey("", testNamespaceID, testWorkflowID, testRunID, version, i)
_, err = s.s3cli.PutObjectWithContext(context.Background(), &s3.PutObjectInput{
_, err = s.s3cli.PutObject(context.Background(), &s3.PutObjectInput{
Bucket: aws.String(testBucket),
Key: aws.String(key),
Body: bytes.NewReader(data),
Expand All @@ -780,7 +780,7 @@ func (s *historyArchiverSuite) writeHistoryBatchesForGetTest(historyBatches []*a
}

func (s *historyArchiverSuite) assertKeyExists(key string) {
_, err := s.s3cli.GetObjectWithContext(context.Background(), &s3.GetObjectInput{
_, err := s.s3cli.GetObject(context.Background(), &s3.GetObjectInput{
Bucket: aws.String(testBucket),
Key: aws.String(key),
})
Expand Down

0 comments on commit 47301c5

Please sign in to comment.